
转换操作符
buffer
import { interval, fromEvent } from 'rxjs';
import { buffer } from 'rxjs/operators';
const myInterval = interval(1000);
const bufferBy = fromEvent(document, 'click');
/*
收集由 myInterval 发出的所有值,直到我们点击页面。此时 bufferBy 会发出值以完成缓存。
将自上次缓冲以来收集的所有值传递给数组。
*/
const myBufferedInterval = myInterval.pipe(buffer(bufferBy));
// 例如 输出: [1,2,3] ... [4,5,6,7,8]
const subscribe = myBufferedInterval.subscribe(val =>
console.log(' Buffered Values:', val)
);
bufferCount
bufferSize ),如果指定了第二个参数:startBufferEvery (上一轮索引),意味着每次新一轮的收集会从上一轮指定索引的位置开始复用。
import { interval } from 'rxjs';
import { bufferCount } from 'rxjs/operators';
const source = interval(1000);
// 在发出3个值后,将缓冲的值作为数组传递
const bufferThree = source.pipe(bufferCount(3));
// 输出: [0,1,2]...[3,4,5]
const subscribe = bufferThree.subscribe(val =>
console.log('Buffered Values:', val)
);
import { interval } from 'rxjs';
import { bufferCount } from 'rxjs/operators';
fromEvent(document, 'click')
.pipe(
pluck('clientX'),
/*
* 页面点击3次,达到缓存限度,输出长度为3的数组,类似:[743, 709, 1154]
* 以后每点击一次,服用上一次从位置1开始的数组,类似:[743, 709, 560]
* */
bufferCount(3, 1)
)
.subscribe(res => console.log(res));
bufferTime
bufferTimeSpan ),才将其作为数组发出,如果指定了第二个参数:bufferCreationInterval (间隔时间),就会等待这个时间再次发送下一个流。import { interval } from 'rxjs';
import { bufferTime } from 'rxjs/operators';
const source = interval(500);
// 2秒后,将缓冲值作为数组发出
const example = source.pipe(bufferTime(2000));
// 输出: [0,1,2] [3,4,5,6] [7,8,9,10]...
const subscribe = example.subscribe(val =>console.log(val));
import { interval } from 'rxjs';
import { bufferTime } from 'rxjs/operators';
interval(500)
.pipe(
/*
* 在1秒时推送第一个流:[0]
* 此后,每等待3秒推送下一个:[5,6,7] [11,12,13]...
* */
bufferTime(1000, 3000)
)
.subscribe(res => console.log(res));
bufferToggle
import { interval } from 'rxjs';
import { bufferToggle } from 'rxjs/operators';
const sourceInterval = interval(1000);
const startInterval = interval(5000);
const closingInterval = val => {
console.log(`${val} 开始缓冲! 3秒后关闭`);
return interval(3000);
};
// 每5秒会开启一个新的缓冲区以收集发出的值,3秒后发出缓冲的值,并关闭当前缓冲区
const bufferToggleInterval = sourceInterval.pipe(
bufferToggle(
startInterval,
closingInterval
)
);
const subscribe = bufferToggleInterval.subscribe(val =>
console.log('Emitted Buffer:', val)
);
bufferWhen
Observable 发出值。import { interval } from 'rxjs';
import { bufferWhen } from 'rxjs/operators';
const oneSecondInterval = interval(1000);
const fiveSecondInterval = () => interval(5000);
// 每5秒发出缓冲的值
const bufferWhenExample = oneSecondInterval.pipe(bufferWhen(fiveSecondInterval));
// 输出: [0,1,2,3] [4,5,6,7,8]...
const subscribe = bufferWhenExample.subscribe(val =>
console.log('Emitted Buffer: ', val)
);
map
map ,对源 observable 的每个值进行映射。import { from } from 'rxjs';
import { map } from 'rxjs/operators';
const source = from([1, 2, 3, 4, 5]);
const example = source.pipe(map(val => val + 10));
const subscribe = example.subscribe(val => console.log(val));
mapTo
observable 的每个值映射成一个指定的值。import { interval } from 'rxjs';
import { mapTo } from 'rxjs/operators';
const source = interval(2000);
// 将所有发出值映射成同一个值
const example = source.pipe(mapTo('HELLO WORLD!'));
// 输出: 'HELLO WORLD!'...'HELLO WORLD!'...'HELLO WORLD!'...
const subscribe = example.subscribe(val => console.log(val));
switchMap
observable 发出的每个值,经过处理,映射成新的 observable ,但每次都会取消上一次的 observable ,不管上次 observable 是否完成。此操作符可以取消正在进行中的网络请求!import { fromEvent, interval } from 'rxjs';
import { switchMap } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(switchMap((ev) => interval(1000)));
// 每次点击都会重新开始执行interval的流,从0开始打印值
result.subscribe(x => console.log(x));
switchMapTo
observable 发出的每个值,经过处理,映射成固定的值(不限于 observable )import { fromEvent } from 'rxjs';
import { switchMapTo } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(switchMapTo('hello'));
// 每次点击打印出'hello'
result.subscribe(x => console.log(x));
concatMap & mergeMap
observable 发出的每个值,按顺序映射成一个新的 observable。import { of } from 'rxjs';
import { concatMap, delay, mergeMap } from 'rxjs/operators';
const source = of(2000, 1000);
// 将内部 observable 映射成 source,当前一个完成时发出结果并订阅下一个
const example = source.pipe(
concatMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
);
// 输出: With concatMap: Delayed by: 2000ms, With concatMap: Delayed by: 1000ms
const subscribe = example.subscribe(val =>
console.log(`With concatMap: ${val}`)
);
// 展示 concatMap 和 mergeMap 之间的区别
const mergeMapExample = source
.pipe(
// 只是为了确保 meregeMap 的日志晚于 concatMap 示例
delay(5000),
mergeMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
)
.subscribe(val => console.log(`With mergeMap: ${val}`));
concatMap 和 mergeMap 之间的区别:concatMap 之前前一个内部 observable 完成后才会订阅下一个, source 中延迟 2000ms 值会先发出。对比的话, mergeMap 会立即订阅所有内部 observables, 延迟少的 observable (1000ms) 会先发出值,然后才是 2000ms 的 observable 。concatMapTo & mergeMapTo
observable 发出的每个值,按顺序映射成一个固定值(不限于 observable )。import { fromEvent, interval } from 'rxjs';
import { concatMapTo, mergeMapTo, take } from 'rxjs/operators';
fromEvent(document, 'click').pipe(
concatMapTo(interval(1000).pipe(take(2)))
).subscribe(
// 每次点击,隔1秒输出0,再间隔1秒输出1,流发射结束
res => console.log(res)
);
fromEvent(document, 'click').pipe(
mergeMapTo(interval(1000).pipe(take(2)))
).subscribe(
// 每次点击,都会出发interval发射流
res => console.log(res)
);
scan
observable 上应用累加器函数,每次累加后返回一个 observable 。// 随着时间的推移计算总数
import { of, Subject } from 'rxjs';
import { scan } from 'rxjs/operators';
const source = of(1, 2, 3);
// 基础的 scan 示例,从0开始,随着时间的推移计算总数
const example = source.pipe(scan((acc, curr) => acc + curr, 0));
// 输出每次累加值: 1,3,6
const subscribe = example.subscribe(val => console.log(val));
// 对对象进行合并
const subject = new Subject();
subject.pipe(
scan((acc, value) => Object.assign({}, acc, value), {})
).subscribe(
res => console.log(res)
);
// subject 发出的值会被添加成对象的属性
// {name: 'Joe'}
subject.next({ name: 'Joe' });
// {name: 'Joe', age: 30}
subject.next({ age: 30 });
// {name: 'Joe', age: 30, favoriteLanguage: 'JavaScript'}
subject.next({ favoriteLanguage: 'JavaScript' });
mergeScan
scan , 在源 Observable 上应用累加器函数,每次累加后返回一个 Observable 。import { of } from 'rxjs';
import { mergeScan } from 'rxjs/operators';
of(1, 2, 3).pipe(
mergeScan((acc, value) => of(acc + value), 0)
).subscribe(
res => console.log(res)
);
reduce
Array.prototype.reduce() , 也类似 scan ,只不过会等源 observable 完成时将结果发出。import { of } from 'rxjs';
import { reduce } from 'rxjs/operators';
const source = of(1, 2, 3);
const example = source.pipe(reduce((acc, val) => acc + val));
// 输出最后累加结果:6
const subscribe = example.subscribe(val => console.log('Sum:', val));
pluck
import { from } from 'rxjs';
import { pluck } from 'rxjs/operators';
const source = from([
{ name: 'Joe', age: 30, job: { title: 'Developer', language: 'JavaScript' } },
// 当找不到 job 属性的时候会返回 undefined
{ name: 'Sarah', age: 35 }
]);
// 提取 job 中的 title 属性
const example = source.pipe(pluck('job', 'title'));
// 输出: "Developer" , undefined
const subscribe = example.subscribe(val => console.log(val));
pairwise
import { interval } from 'rxjs';
import { pairwise } from 'rxjs/operators';
// 输出:[0, 1] [1, 2]...
interval(1000).pipe(pairwise()).subscribe(res => console.log(res));
groupBy
Observable 按条件按组发出。(这个就很有用了)import { from } from 'rxjs';
import { groupBy, mergeMap, toArray } from 'rxjs/operators';
const people = [
{ name: 'Sue', age: 25 },
{ name: 'Joe', age: 30 },
{ name: 'Frank', age: 25 },
{ name: 'Sarah', age: 35 }
];
// 发出每个 people
const source = from(people);
// 根据 age 分组
const example = source.pipe(
groupBy(person => person.age),
// 为每个分组返回一个数组
mergeMap(group => group.pipe(toArray()))
);
/*
输出:
[{age: 25, name: "Sue"},{age: 25, name: "Frank"}]
[{age: 30, name: "Joe"}]
[{age: 35, name: "Sarah"}]
*/
const subscribe = example.subscribe(val => console.log(val));
exhaust
Observable 为完成前,忽略所有其它的 Observable 。import { fromEvent, interval } from 'rxjs';
import { exhaust, map, take } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
map((ev) => interval(1000).pipe(take(5))),
);
const result = higherOrder.pipe(exhaust());
// 不管点击多少次,都会先完成上一次未完成的interval的流
result.subscribe(x => console.log(x));
exhaustMap
map + exhaust的结合,比如上个例子可简写为:import { fromEvent, interval } from 'rxjs';
import { exhaustMap, take } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
exhaustMap(ev => interval(1000).pipe(take(5)))
);
higherOrder.subscribe(x => console.log(x));


最新评论