0%

Rxjs

current version: 6.5.2

关于响应式编程

知乎:响应式编程

Observable 的发布和订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Create simple observable that emits three values
const myObservable = of(1, 2, 3); // or from([1,2,3])

// Create observer object
const myObserver = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};

// Execute with the observer object
myObservable.subscribe(myObserver);
// Logs:
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3
// Observer got a complete notification

of(…items) from(iterable)是自带常用产生Observable对象的工具

Observable 可观察对象

在http请求中,可观察对象不是new构造的,而是由httpclient的get或post方法返回一个Observable\. 私以为是成功调用接口后就发布一个结果,这个结果可以用管道加工

参考 Observable 的操作符

1
2
3
4
5
6
7
8
9
10
11
getDict(dictname): Observable<DictItem[]> {
return this.http.get(`dict/${dictname}`).pipe(
map((res: HttpResponse) => {
if (res.status === 'ok') {
return res.data;
} else {
return [];
}
}),
);
}

获取这个结果就需要订阅(subscribe)这个Observable,这个Observable是匿名的,每次获取它需要调用函数来返回

QQs:为什么httpclient方法不需要取消订阅

据说这些方法被实现为只next一次

Subject

A Subject is a special type of Observable which shares a single execution path among observers.

被比喻成广播者

1
2
3
4
5
6
7
const subject = new Subject();

subject.subscribe(log('s1 subject'));
subject.subscribe(log('s2 subject'));

subject.next('r');
subject.next('x');

同步数据转Observable

场景:组件粒度小,在视图中多次实例化,或者重复初始化,为防止频繁调用后台接口应加入数据缓存,
基础数据缓存实现为,第一次调用,从httpclient获取接口数据的Observable对象,并用管道处理加入缓存,之后将缓存数据取出转为Observable对象
basicdata.service.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import { Observable, of, from } from 'rxjs';
import { map } from 'rxjs/operators';

users: User[];
getUserList(): Observable<User[]> {
if (this.users) {
return of(this.users);
} else {
return this.http.post(`user/search`, { isvalid: 1 }).pipe(
map((res) => {
this.users = res as User[];
return this.users;
})
);
}
}

app.component.ts
1
2
3
4
this.basicData.getUserList().subscribe(list => {
this.userlist = list;
this.getUserName();
});

Promise转Observable

起初为了用async await编码以及利用链式调用,很多异步操作封装成了Promise,Promise转Observable用from方法转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import { Observable, of, from } from 'rxjs';
import { map } from 'rxjs/operators';

getCurrentUserToken(): Observable<any> {
if (this.currentUser) {
return of(this.currentUser);
} else {
return from(this.ipcService.call('currentuser')).pipe(
map(arg => {
if (arg) {
return typeof arg === 'string' ? JSON.parse(arg) : arg;
} else {
return null;
}
})
);
}
}

Observable的链式调用

操作符

操作符实在太多了

实现一个乘10的operator

1
2
3
4
5
6
7
8
9
function multiplyByTen(input: Observable<any>): Observable<any> {
return Rx.Observable.create(observer => {
input.subscribe({
next: (v) => observer.next(10 * v),
error: (err) => observer.error(err),
complete: () => observer.complete()
});
});
}

创建操作符 of from interval等:

from

转化Promise对象、类数组对象、迭代器对象转化为 Observables
将数组转化为 Observable

1
2
3
4
5
6
var array = [10, 20, 30];
var result = Rx.Observable.from(array);
result.subscribe(x => console.log(x));

// 结果如下:
// 10 20 30

将一个无限的迭代器(来自于 generator)转化为 Observable。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
function* generateDoubles(seed) {
var i = seed;
while (true) {
yield i;
i = 2 * i; // double it
}
}

var iterator = generateDoubles(3);
var result = Rx.Observable.from(iterator).take(10);
result.subscribe(x => console.log(x));

// Results in the following:
// 3 6 12 24 48 96 192 384 768 1536

转化操作符 map mapTo merge mergeMap等:

map类似于Array.prototype.map投射函数应用于每个值;mapTo相当于忽略实际订阅接受结果,替换为指定值;

merge将多个订阅捋直成一个订阅;mergeMap将投射函数应用于每个值,并将多个订阅捋直(啥?不懂)

take
filter
tap

Perform a side effect for every emission on the source Observable, but return an Observable that is identical to the source.对源可观察对象的每个‘发射’应用一个副作用,但仍然返回与源相同的可观察对象

1
2
3
4
tap<T>(nextOrObserver?: NextObserver<T> | ErrorObserver<T> | CompletionObserver<T> | (
(x: T) => void),
error?: (e: any) => void,
complete?: () => void): MonoTypeOperatorFunction<T>

参数可以是可观察对象或回调方法
常见于附上一个log操作
1
2
3
4
5
6
7
getHeroes(): Observable<Hero[]> {
return this.http.get<Hero[]>(this.heroesUrl)
.pipe(
tap(heroes => this.log(`fetched heroes`)),
catchError(this.handleError('getHeroes'))
) as Observable<Hero[]>;
}

BehaviorSubject

Subject 的作用是实现 Observable 的多播。由于其 Observable execution 是在多个订阅者之间共享的,所以它可以确保每个订阅者接收到的数据绝对相等。不仅使用 Subject 可以实现多播,RxJS 还提供了一些 Subject 的变体以应对不同场景,那就是:BehaviorSubject、ReplaySubject 以及 AsyncSubject。

BehaviorSubject 的特性就是它会存储“当前”的值。这意味着你始终可以直接拿到 BehaviorSubject 最后一次发出的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const subject = new Rx.BehaviorSubject(Math.random());

// 订阅者 A
subject.subscribe((data) => {
console.log('Subscriber A:', data);
});

subject.next(Math.random());

// 订阅者 B
subject.subscribe((data) => {
console.log('Subscriber B:', data);
});

subject.next(Math.random());

toPromise deprecated

Rxjs v8版本后toPromise方法弃用, 原因基于Observable与Promise前后返回值不一致的issue
RxJS heads up: toPromise is being deprecated

曾常使用的

1
2
3
4
5
public async loadCategories() {
this.categories = await this.inventoryService
.getCategories()
.toPromise()
}

变更为
1
2
3
4
5
6
import { lastValueFrom } from 'rxjs';
...
public async loadCategories() {
const categories$ = this.inventoryService.getCategories();
this.categories = await lastValueFrom(categories$);
}

rxjs 6中被废弃的toPromise

The lastValueFrom is almost exactly the same as toPromise() meaning that it will resolve with the last value that has arrived when the Observable completes, but with the difference in behavior when Observable completes without emitting a single value. When Observable completes without emitting, toPromise() will successfully resolve with undefined (thus the return type change), while the lastValueFrom will reject with the EmptyError. Thus, the return type of the lastValueFrom is Promise, just like toPromise() had in RxJS 6.lastValueFrom几乎与toPromise() 一个意思,在Observable complete的时候,lastValueFrom会带着Observable最后一个产生的值resolve,但是不同之处在于Observable不带值complete的情况下。
当Observable不产生值并且complete时,toPromise方法会成功resolve为undefined(也就是返回类型改变了,不再是T),但是lastValueFrom会带着EmptyErrorreject。因此,就如同在RxJS 6里一样,它的返回类型是Promise

However, you might want to take the first value as it arrives without waiting an Observable to complete, thus you can use firstValueFrom. The firstValueFrom will resolve a Promise with the first value that was emitted from the Observable and will immediately unsubscribe to retain resources. The firstValueFrom will also reject with an EmptyError if the Observable completes with no values emitted.然而,你可能不等Observable complete,想要使用其第一个生产的值,因此可以用firstValueFrom。firstValueFrom会将Observable第一个生产的值resolve并且立刻unsubscribe保存该值。firstValueFrom也会在Observable没有值产生而complete的情况下带着 EmptyErrorreject。