介绍
RxJS(Reactive Extensions for JavaScript)是一个用于处理异步数据流的库。它允许开发者使用观察者模式(Observer Pattern)来订阅(subscribe)数据流,并在数据到达时执行特定的操作。RxJS 是 ReactiveX 项目的一部分,该项目为多种编程语言提供了类似的库,如 Java、.NET、C++ 等等。
以下是关于 RxJS 的一些官方资源和高质量的教程推荐,这些资源可以帮助你从基础到高级逐步掌握 RxJS。
社区资源
1. 官方文档
2. 社区和生态
3. 教程和文章
核心概念
Observables(可观察对象):
- Observables 是 RxJS 中的核心概念,可以认为是能够发出多个值或事件的“容器”。这些值可以随着时间推移异步地发出。Observables 可以被订阅,当 Observable 发出新的值时,所有订阅者都会收到通知。
Observers(观察者):
- Observers 是订阅了 Observable 的对象。它们定义了如何响应来自 Observable 的通知,通常包括三个回调函数:
next
(接收每个值)、error
(处理错误)、complete
(当 Observable 完成时调用)。
- Observers 是订阅了 Observable 的对象。它们定义了如何响应来自 Observable 的通知,通常包括三个回调函数:
Operators(操作符):
- Operators 是 RxJS 中的一类方法,用于转换 Observable 的输出。例如,
map
操作符可以用来将 Observable 发出的每个值通过一个函数进行转换;filter
操作符可以用来过滤 Observable 发出的值。RxJS 提供了大量的内置操作符,支持各种复杂的异步数据流处理需求。
- Operators 是 RxJS 中的一类方法,用于转换 Observable 的输出。例如,
Subjects(主题):
- Subjects 是 RxJS 中同时充当 Observer 和 Observable 的特殊对象。作为 Observer,它可以订阅其他 Observable;作为 Observable,它可以被多个 Observer 订阅。Subjects 被用来实现多播(multicasting),即一个值可以被多个 Observer 同时接收。
Schedulers(调度器):
- Schedulers 控制 Observable 执行异步操作的方式和时机。不同的 Scheduler 可以控制任务是在当前线程立即执行、在未来的某个时间点执行还是在一个新的线程中执行。
基本使用
下面是一个简单的 RxJS 示例,演示了如何创建一个 Observable 并订阅它:
1 | import { Observable } from 'rxjs'; |
在这个例子中,我们创建了一个 Observable,它会立即发出三个值(1, 2, 3),然后在一秒钟后发出第四个值(4)并完成。我们还定义了一个 Observer 来监听 Observable 发出的值,并在控制台打印出来。
高级特性
1. 操作符组合
RxJS 提供了大量的操作符,可以通过组合这些操作符来实现复杂的数据流处理。常见的组合方式包括:
管道(Pipelines):使用
pipe
方法将多个操作符串联起来,形成一个数据处理管道。1
2
3
4
5
6
7
8
9
10import { of } from 'rxjs';
import { map, filter, tap } from 'rxjs/operators';
const source = of(1, 2, 3, 4, 5);
source.pipe(
filter(x => x % 2 === 0), // 过滤偶数
map(x => x * 2), // 将每个值乘以2
tap(x => console.log('Processed value:', x)) // 打印处理后的值
).subscribe(x => console.log('Final value:', x));
3. 多播(Multicasting)
多播允许一个 Observable 的输出被多个 Observer 共享,而不是为每个 Observer 重新生成数据。常用的多播操作符包括 share
, publish
, refCount
等。
**
share
**:使 Observable 多播,当第一个订阅者订阅时开始执行,后续订阅者共享同一个数据流。1
2
3
4
5
6
7
8
9
10import { interval } from 'rxjs';
import { share } from 'rxjs/operators';
// 创建一个每秒发出一个值的 Observable,并通过 share 操作符将其转换为热 Observable
const source = interval(1000).pipe(share());
const subscription1 = source.subscribe(value => console.log('Subscriber 1:', value));
setTimeout(() => {
const subscription2 = source.subscribe(value => console.log('Subscriber 2:', value));
}, 2500);
4. 调度器(Schedulers)
调度器控制 Observable 的执行时间和方式。常用的调度器包括 asyncScheduler
, queueScheduler
, animationFrameScheduler
等。
**
asyncScheduler
**:在下一个事件循环中执行任务。1
2
3
4import { of } from 'rxjs';
import { asyncScheduler } from 'rxjs';
of(1, 2, 3, 4, 5, asyncScheduler).subscribe(value => console.log(value));
最佳实践
避免内存泄漏:确保在不再需要时取消订阅,特别是在组件销毁时。
1
2
3
4
5
6
7
8
9
10
11
12
13import { Subscription } from 'rxjs';
let subscription: Subscription;
function subscribeToObservable() {
subscription = someObservable.subscribe(value => console.log(value));
}
function unsubscribeFromObservable() {
if (subscription) {
subscription.unsubscribe();
}
}使用
pipe
组合操作符:保持代码的可读性和可维护性。1
2
3
4
5
6
7
8
9import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
const source = of(1, 2, 3, 4, 5);
source.pipe(
filter(x => x % 2 === 0),
map(x => x * 2)
).subscribe(value => console.log(value));处理错误:使用
catchError
捕获和处理错误,确保应用程序的健壮性。1
2
3
4
5
6
7
8
9
10
11import { throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';
const source = throwError('An error occurred');
source.pipe(
catchError(error => {
console.error('Error caught:', error);
return of('Default value');
})
).subscribe(value => console.log(value));
模块化和代码组织
将 RxJS 代码模块化,按照功能和职责进行组织,可以提高代码的可读性和可维护性。
模块化示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47// services/data.service.ts
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';
import { map, catchError } from 'rxjs/operators';
({
providedIn: 'root'
})
export class DataService {
constructor(private http: HttpClient) {}
getData(): Observable<any> {
return this.http.get('/api/data').pipe(
map(response => response.data),
catchError(error => of('Error fetching data'))
);
}
}
// components/my-component.component.ts
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs';
import { DataService } from '../services/data.service';
({
selector: 'app-my-component',
template: `<div>{{ data }}</div>`
})
export class MyComponent implements OnInit, OnDestroy {
data: any;
private subscription: Subscription;
constructor(private dataService: DataService) {}
ngOnInit() {
this.subscription = this.dataService.getData().subscribe(data => {
this.data = data;
});
}
ngOnDestroy() {
if (this.subscription) {
this.subscription.unsubscribe();
}
}
}
这些最佳实践可以帮助你在使用 RxJS 时构建更加高效、可维护和健壮的应用程序。
实际应用案例
表单验证:在 Angular 中,RxJS 常用于处理表单验证逻辑,可以实时监听表单输入并进行验证。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20import { FormBuilder, FormGroup, Validators } from '@angular/forms';
import { Observable } from 'rxjs';
import { map, debounceTime, distinctUntilChanged } from 'rxjs/operators';
class MyComponent {
form: FormGroup;
username$: Observable<string>;
constructor(private fb: FormBuilder) {
this.form = this.fb.group({
username: ['', [Validators.required, Validators.minLength(3)]]
});
this.username$ = this.form.get('username').valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
map(username => (username.length < 3 ? 'Username must be at least 3 characters' : null))
);
}
}数据流管理:在 React 应用中,可以使用 RxJS 来管理复杂的数据流,特别是在需要处理多个异步请求的情况下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16import { of, from } from 'rxjs';
import { mergeMap, catchError, map } from 'rxjs/operators';
import axios from 'axios';
const fetchData = (id) => {
return from(axios.get(`/api/data/${id}`)).pipe(
map(response => response.data),
catchError(error => of(`Error fetching data for id ${id}: ${error.message}`))
);
};
const source = of(1, 2, 3, 4, 5).pipe(
mergeMap(id => fetchData(id))
);
source.subscribe(data => console.log(data));
高级主题
1. 背压(Backpressure)
背压是指在数据生产速度超过消费速度时,如何控制数据流的技术。RxJS 提供了几种处理背压的机制:
**
buffer
**:将数据缓冲到数组中,直到满足某些条件时再发出。1
2
3
4
5
6
7import { interval } from 'rxjs';
import { buffer, take } from 'rxjs/operators';
const source = interval(1000).pipe(take(10));
const bufferedSource = source.pipe(buffer(interval(5000)));
bufferedSource.subscribe(values => console.log('Buffered values:', values));**
throttleTime
**:在一定时间内忽略后续的值,只发出第一个值。1
2
3
4
5
6
7import { fromEvent } from 'rxjs';
import { throttleTime } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(throttleTime(1000));
result.subscribe(x => console.log(x));**
debounceTime
**:在一定时间内没有新值发出时才发出最后一个值。1
2
3
4
5
6
7import { fromEvent } from 'rxjs';
import { debounceTime } from 'rxjs/operators';
const keyups = fromEvent(document, 'keyup');
const result = keyups.pipe(debounceTime(300));
result.subscribe(x => console.log(x));
2. 虚拟时间调度器(Virtual Time Scheduler)
虚拟时间调度器用于测试异步代码,可以在一个测试环境中模拟时间的流逝。
**
TestScheduler
**:用于编写单元测试,可以精确控制时间。1
2
3
4
5
6
7
8
9
10
11
12
13import { TestScheduler } from 'rxjs/testing';
import { of } from 'rxjs';
import { delay } from 'rxjs/operators';
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
testScheduler.run(({ cold, expectObservable }) => {
const source = of(1, 2, 3).pipe(delay(1000));
expectObservable(source).toBe('1000ms (3|)', { 3: [1, 2, 3] });
});
与其他框架的集成
1. Angular
在 Angular 中,RxJS 是核心依赖之一,广泛用于处理异步数据流和表单验证。
- **
HttpClient
**:Angular 的 HTTP 客户端返回 Observable。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';
import { map, catchError } from 'rxjs/operators';
()
export class DataService {
constructor(private http: HttpClient) {}
getData(): Observable<any> {
return this.http.get('/api/data').pipe(
map(response => response.data),
catchError(error => of('Error fetching data'))
);
}
}
2. React
在 React 中,可以使用 useEffect
和 useMemo
等 Hook 来管理 RxJS 的订阅和取消订阅。
**
useEffect
**:管理订阅和取消订阅。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import React, { useEffect, useState } from 'react';
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
function App() {
const [count, setCount] = useState(0);
useEffect(() => {
const subscription = interval(1000).pipe(take(10)).subscribe(value => {
setCount(value);
});
return () => {
subscription.unsubscribe();
};
}, []);
return <div>{count}</div>;
}
export default App;
特定使用场景
1. WebSockets 和 Server-Sent Events (SSE)
RxJS 可以很好地处理 WebSocket 和 SSE 这样的实时数据流。
WebSocket:
1
2
3
4
5
6
7
8
9
10
11
12
13import { webSocket } from 'rxjs/webSocket';
const socket = webSocket('ws://localhost:8080');
// 发送消息
socket.next({ action: 'join', room: 'room1' });
// 接收消息
socket.subscribe(
msg => console.log('Message received:', msg),
err => console.error('Error:', err),
() => console.log('Complete')
);**Server-Sent Events (SSE)**:
1
2
3
4
5
6
7
8
9
10
11import { fromEvent } from 'rxjs';
import { map, filter } from 'rxjs/operators';
const eventSource = new EventSource('/sse-endpoint');
const sse$ = fromEvent(eventSource, 'message').pipe(
map(event => JSON.parse(event.data)),
filter(data => data.type === 'update')
);
sse$.subscribe(data => console.log('SSE Data:', data));
错误处理和恢复
1. retryWhen
retryWhen
允许自定义重试逻辑,例如在特定条件下重试。
**
retryWhen
**:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import { of, throwError } from 'rxjs';
import { retryWhen, delay, takeWhile } from 'rxjs/operators';
const source = of(1, 2, 3, 4, 5).pipe(
map(val => {
if (val === 3) {
throw new Error('Error at 3');
}
return val;
})
);
source.pipe(
retryWhen(errors => errors.pipe(
delay(1000),
takeWhile((_, index) => index < 2)
))
).subscribe(
value => console.log(value),
error => console.error('Final error:', error)
);
2. 错误处理
RxJS 提供了多种处理错误的方法,包括 catchError
和 retry
操作符。
**
catchError
**:捕获并处理错误,可以选择重新抛出错误或返回一个新的 Observable。1
2
3
4
5
6
7
8
9
10
11import { throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';
const source = throwError('An error occurred');
source.pipe(
catchError(error => {
console.error('Error caught:', error);
return of('Default value'); // 返回默认值
})
).subscribe(value => console.log(value));**
retry
**:在发生错误时自动重试指定次数。1
2
3
4
5
6
7
8
9
10
11
12
13import { of, throwError } from 'rxjs';
import { concat, retry } from 'rxjs/operators';
const source = of(1, 2, 3, 4, 5).pipe(
concat(throwError('An error occurred'))
);
source.pipe(
retry(2) // 重试2次
).subscribe({
next: value => console.log(value),
error: error => console.error('Final error:', error)
});
日志记录
在生产环境中,良好的错误处理和日志记录机制可以大大提高系统的稳定性和可维护性。
错误处理和日志记录:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16import { of } from 'rxjs';
import { map, catchError, tap } from 'rxjs/operators';
function fetchData(url: string): Observable<any> {
return of({ url, data: {} }).pipe(
map(response => response.data),
tap(data => console.log('Fetched data:', data)),
catchError(error => {
console.error('Error fetching data:', error);
return of('Error fetching data');
})
);
}
const data$ = fetchData('/api/data');
data$.subscribe(data => console.log(data));
设计模式
1. 单例模式(Singleton Pattern)
在 RxJS 中,可以使用 BehaviorSubject
或 ReplaySubject
来实现单例模式,确保全局状态的一致性。
BehaviorSubject
单例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30import { BehaviorSubject } from 'rxjs';
class AppState {
private static instance: AppState;
private subject = new BehaviorSubject<{ count: number }>({ count: 0 });
private constructor() {}
static getInstance(): AppState {
if (!AppState.instance) {
AppState.instance = new AppState();
}
return AppState.instance;
}
getState(): BehaviorSubject<{ count: number }> {
return this.subject;
}
increment() {
const currentState = this.subject.value;
this.subject.next({ count: currentState.count + 1 });
}
}
const appState = AppState.getInstance();
appState.getState().subscribe(state => console.log('State:', state));
appState.increment(); // State: { count: 1 }
appState.increment(); // State: { count: 2 }
2. 工厂模式(Factory Pattern)
工厂模式可以用于创建复杂的 Observable 对象,提高代码的可复用性和可维护性。
- Observable 工厂:
1
2
3
4
5
6
7
8
9
10
11
12import { of, Observable } from 'rxjs';
import { map, catchError } from 'rxjs/operators';
function createDataObservable(url: string): Observable<any> {
return of({ url, data: {} }).pipe(
map(response => response.data),
catchError(error => of('Error fetching data'))
);
}
const data$ = createDataObservable('/api/data');
data$.subscribe(data => console.log(data));
应用场景
- 用户界面事件:RxJS 非常适合处理用户界面中的事件,比如点击、滚动等,可以方便地组合多个事件源。
- 异步数据请求:处理 AJAX 请求,特别是需要合并、取消或者重试请求的情况。
- 实时数据流:处理 WebSocket 或者 Server-Sent Events (SSE) 这样的实时数据流。
RxJS 是一个功能强大且灵活的工具,适用于需要处理复杂异步逻辑的应用程序。然而,由于其学习曲线相对较陡峭,初学者可能需要一些时间来熟悉其核心概念和最佳实践。
1. 无限滚动
在无限滚动场景中,RxJS 可以帮助你处理滚动事件并动态加载更多数据。
无限滚动示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37import { fromEvent } from 'rxjs';
import { filter, map, throttleTime, switchMap, scan, takeWhile } from 'rxjs/operators';
import { HttpClient } from '@angular/common/http';
class InfiniteScrollComponent {
private page = 1;
private loading = false;
constructor(private http: HttpClient) {
fromEvent(window, 'scroll').pipe(
filter(() => this.isNearBottom()),
throttleTime(300),
map(() => this.page++),
switchMap(page => this.loadMoreData(page)),
scan((acc, data) => [...acc, ...data], []),
takeWhile(() => !this.loading)
).subscribe(data => {
this.appendData(data);
});
}
isNearBottom(): boolean {
return window.innerHeight + document.documentElement.scrollTop >= document.documentElement.offsetHeight - 100;
}
loadMoreData(page: number): Observable<any[]> {
this.loading = true;
return this.http.get<any[]>(`/api/data?page=${page}`).pipe(
tap(() => this.loading = false)
);
}
appendData(data: any[]) {
// 将数据添加到页面
console.log('Appending data:', data);
}
}
2. 复杂数据流处理
通过组合多个操作符,可以实现复杂的业务逻辑,例如处理多个数据源的合并和转换。
合并多个数据源:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import { combineLatest, of } from 'rxjs';
import { map, switchMap, catchError } from 'rxjs/operators';
import { HttpClient } from '@angular/common/http';
class DataService {
constructor(private http: HttpClient) {}
fetchCombinedData(): Observable<any> {
const user$ = this.http.get('/api/user');
const posts$ = this.http.get('/api/posts');
return combineLatest([user$, posts$]).pipe(
map(([user, posts]) => ({ user, posts })),
switchMap(combinedData => this.enrichData(combinedData)),
catchError(error => of('Error fetching combined data'))
);
}
enrichData(data: any): Observable<any> {
// 模拟数据增强
return of({ ...data, extra: 'Enriched data' }).pipe(delay(500));
}
}
const dataService = new DataService(httpClient);
dataService.fetchCombinedData().subscribe(data => console.log(data));
2. 条件处理
使用 iif
操作符可以根据条件选择不同的 Observable。
条件处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16import { iif, of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';
function fetchData(condition: boolean): Observable<any> {
return iif(
() => condition,
of({ data: 'Condition met' }),
of({ data: 'Condition not met' })
).pipe(
map(response => response.data),
catchError(error => of('Error fetching data'))
);
}
fetchData(true).subscribe(data => console.log(data)); // Condition met
fetchData(false).subscribe(data => console.log(data)); // Condition not met
1. concatMap
concatMap
用于顺序处理 Observable,确保前一个 Observable 完成后再处理下一个。
**
concatMap
**:1
2
3
4
5
6
7
8import { of } from 'rxjs';
import { concatMap } from 'rxjs/operators';
const source = of(1, 2, 3);
source.pipe(
concatMap(val => of(val * 10).pipe(delay(1000)))
).subscribe(value => console.log(value));
2. switchMap
switchMap
用于取消前一个 Observable 并切换到新的 Observable,适用于最新的请求覆盖旧的请求。
**
switchMap
**:1
2
3
4
5
6
7
8import { of } from 'rxjs';
import { switchMap } from 'rxjs/operators';
const source = of(1, 2, 3);
source.pipe(
switchMap(val => of(val * 10).pipe(delay(1000)))
).subscribe(value => console.log(value));
3. exhaustMap
exhaustMap
用于忽略新的 Observable,直到当前的 Observable 完成。
- **
exhaustMap
**:1
2
3
4
5
6
7
8import { of } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';
const source = of(1, 2, 3);
source.pipe(
exhaustMap(val => of(val * 10).pipe(delay(1000)))
).subscribe(value => console.log(value));
1. 组合多个操作符
通过组合多个操作符,可以实现复杂的业务逻辑。
组合多个操作符:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17import { of } from 'rxjs';
import { map, filter, switchMap, catchError, finalize } from 'rxjs/operators';
const source = of(1, 2, 3, 4, 5).pipe(
filter(x => x % 2 === 0), // 过滤偶数
map(x => x * 2), // 将每个值乘以2
switchMap(x => this.fetchData(x)), // 异步获取数据
catchError(error => {
console.error('Error:', error);
return of('Default value');
}),
finalize(() => {
console.log('Observable completed');
})
);
source.subscribe(value => console.log(value));
2. 使用 withLatestFrom
withLatestFrom
操作符可以将多个 Observable 的最新值组合在一起。
withLatestFrom
操作符:1
2
3
4
5
6
7
8
9
10import { of, interval } from 'rxjs';
import { withLatestFrom, map } from 'rxjs/operators';
const source1 = interval(1000);
const source2 = of(1, 2, 3);
source1.pipe(
withLatestFrom(source2),
map(([value1, value2]) => `Value1: ${value1}, Value2: ${value2}`)
).subscribe(value => console.log(value));
性能优化
1. 避免不必要的订阅
尽量减少不必要的订阅,特别是在组件销毁时取消订阅,以防止内存泄漏。
**使用
takeUntil
**:在组件销毁时自动取消订阅。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
class MyComponent {
private destroy$ = new Subject<void>();
ngOnInit() {
this.someObservable.pipe(takeUntil(this.destroy$)).subscribe(value => {
// 处理值
});
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
2. 使用 shareReplay
shareReplay
可以缓存最新的值并在新的订阅者订阅时立即发出,从而减少重复计算。
**
shareReplay
**:缓存最新的值并共享给新的订阅者。1
2
3
4
5
6
7import { of } from 'rxjs';
import { shareReplay } from 'rxjs/operators';
const source = of(1, 2, 3).pipe(shareReplay(1));
const subscription1 = source.subscribe(value => console.log('Subscriber 1:', value));
const subscription2 = source.subscribe(value => console.log('Subscriber 2:', value));
3. 使用 combineLatest
和 forkJoin
combineLatest
和 forkJoin
可以有效地管理多个 Observable 的并发执行,减少不必要的订阅和数据处理。
**
combineLatest
**:1
2
3
4
5
6
7
8
9import { combineLatest, of } from 'rxjs';
import { map } from 'rxjs/operators';
const obs1 = of(1, 2, 3);
const obs2 = of('a', 'b', 'c');
combineLatest([obs1, obs2]).pipe(
map(([num, letter]) => ({ num, letter }))
).subscribe(value => console.log(value));**
forkJoin
**:1
2
3
4
5
6
7
8
9import { forkJoin, of } from 'rxjs';
import { map } from 'rxjs/operators';
const obs1 = of(1, 2, 3);
const obs2 = of('a', 'b', 'c');
forkJoin([obs1, obs2]).pipe(
map(([num, letter]) => ({ num, letter }))
).subscribe(value => console.log(value));
2. 使用 asObservable
封装 Subject
asObservable
可以将 Subject
转换为 Observable
,隐藏内部实现,提高代码的安全性和可维护性。
**
asObservable
**:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19import { Subject } from 'rxjs';
class DataService {
private subject = new Subject<number>();
get data$(): Observable<number> {
return this.subject.asObservable();
}
sendData(value: number) {
this.subject.next(value);
}
}
const dataService = new DataService();
dataService.data$.subscribe(value => console.log('Received:', value));
dataService.sendData(1); // Received: 1
dataService.sendData(2); // Received: 2
特定领域的应用
1. 用户界面交互
RxJS 在处理用户界面交互方面非常强大,可以轻松处理复杂的用户输入和事件流。
鼠标拖动示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29import { fromEvent } from 'rxjs';
import { map, pairwise, filter, takeUntil } from 'rxjs/operators';
const mouseDown$ = fromEvent(document, 'mousedown');
const mouseMove$ = fromEvent(document, 'mousemove');
const mouseUp$ = fromEvent(document, 'mouseup');
const drag$ = mouseDown$.pipe(
map(event => ({
startX: event.clientX,
startY: event.clientY,
startEvent: event
})),
switchMap(start => mouseMove$.pipe(
map(moveEvent => ({
startX: start.startX,
startY: start.startY,
moveX: moveEvent.clientX,
moveY: moveEvent.clientY
})),
takeUntil(mouseUp$)
))
);
drag$.subscribe(dragEvent => {
const dx = dragEvent.moveX - dragEvent.startX;
const dy = dragEvent.moveY - dragEvent.startY;
console.log(`Drag: dx=${dx}, dy=${dy}`);
});
2. 定时任务
RxJS 可以轻松处理定时任务,例如定期轮询数据或执行定时任务。
- 定期轮询数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import { interval } from 'rxjs';
import { switchMap, catchError, retryWhen, delay, takeWhile } from 'rxjs/operators';
import { HttpClient } from '@angular/common/http';
class PollingService {
constructor(private http: HttpClient) {}
pollData(): Observable<any> {
return interval(5000).pipe(
switchMap(() => this.http.get('/api/data')),
retryWhen(errors => errors.pipe(
delay(1000),
takeWhile((_, index) => index < 3)
)),
catchError(error => of('Error fetching data'))
);
}
}
const pollingService = new PollingService(httpClient);
pollingService.pollData().subscribe(data => console.log(data));
高级调试技巧
1. 使用 debug
操作符
debug
操作符可以帮助你在开发过程中调试 Observable 的值。
debug
操作符:1
2
3
4
5
6
7
8import { of } from 'rxjs';
import { debug } from 'rxjs-debug';
const source = of(1, 2, 3).pipe(
debug('Source Observable')
);
source.subscribe(value => console.log(value));
2. 使用 tap
和 do
操作符
tap
操作符可以在不改变数据流的情况下插入调试代码,do
操作符在 RxJS 6 之后被 tap
替代。
tap
操作符:1
2
3
4
5
6
7
8import { of } from 'rxjs';
import { tap } from 'rxjs/operators';
const source = of(1, 2, 3).pipe(
tap(value => console.log('Processing value:', value))
);
source.subscribe(value => console.log('Final value:', value));
1. 使用 Marble Diagrams
Marble Diagrams 是一种可视化 RxJS 数据流的工具,可以帮助你更好地理解复杂的 Observable 操作。
Marble Diagrams:
1
2
3
4
5
6
7
8
9
10
11
12
13import { of } from 'rxjs';
import { marbleTesting } from 'rxjs-marbles/jest';
describe('Observable', () => {
it('should map values correctly', marbleTesting(({ cold, expectObservable }) => {
const source = cold('--a--b--c--|', { a: 1, b: 2, c: 3 });
const expected = '--x--y--z--|';
const result = source.pipe(map(val => val * 10));
expectObservable(result).toBe(expected, { x: 10, y: 20, z: 30 });
}));
});
2. 使用 rxjs-visualize
rxjs-visualize
是一个浏览器扩展,可以帮助你可视化 RxJS 的数据流。
**安装
rxjs-visualize
**:- 在 Chrome Web Store 中搜索并安装
rxjs-visualize
。 - 在代码中引入
rxjs-visualize
:1
import 'rxjs-visualize/dist/side-effect';
- 在 Chrome Web Store 中搜索并安装