为了账号安全,请及时绑定邮箱和手机立即绑定

如何停止去抖动的 Rxjs Observable?

如何停止去抖动的 Rxjs Observable?

慕桂英3389331 2022-01-07 10:35:29
我创建了一个 observable,它将在最后一次更改后 3 秒触发,并调用publishChange服务。它可以工作,但我想创建一个doImmediateChange函数,它publishChange立即调用并停止去抖动的 observable。这怎么可能?我的组件:class MyComponent {    private updateSubject = new Subject<string>();    ngOnInit() {        this.updateSubject.pipe(            debounceTime(3000),            distinctUntilChanged()        ).subscribe(val => {            this.srv.publishChange(val);        });    }    doChange(val: string) {        this.updateSubject.next(val);    }    doImmediateChange(val: string) {        // Stop the current updateSubject if debounce is in progress and call publish immediately        // ??        this.srv.publishChange(val);    }}
查看完整描述

3 回答

?
墨色风雨

TA贡献1853条经验 获得超6个赞

您可以debounceTime使用switchMap和进行模拟delay。然后取消内部 ObservabletakeUntil以防止发出等待值。


private updateSubject = new Subject<string>();

private interrupt = new Subject();


ngOnInit() {

  this.updateSubject.pipe(

    switchMap(val => of(val).pipe(

      delay(3000),

      takeUntil(this.interrupt)

    ))

  ).subscribe(val => publish(val));

}


doChange(val: string) {

  this.updateSubject.next(val);

}


doImmediateChange(val: string) {

  this.interrupt.next();

  publish(val);

}

https://stackblitz.com/edit/rxjs-ya93fb


查看完整回答
反对 回复 2022-01-07
?
拉莫斯之舞

TA贡献1820条经验 获得超10个赞

使用竞赛运算符:


第一个完成的observable成为唯一订阅的observable,因此这个递归函数将在一次发射后完成take(1),然后重新订阅() => this.raceRecursive()。


private timed$ = new Subject<string>();

private event$ = new Subject<string>();


ngOnInit() {

  this.raceRecursive()

}


raceRecursive() {

  race(

    this.timed$.pipe(debounceTime(1000)),

    this.event$

  )

    .pipe(take(1)) // force it to complete

    .subscribe(

      val => console.log(val), // srv call here

      err => console.error(err),

      () => this.raceRecursive() // reset it once complete

    )

}


doChange(val: string) {

  this.timed$.next(val)

}


doImmediateChange(val: string) {

  this.event$.next(val)

}



查看完整回答
反对 回复 2022-01-07
?
茅侃侃

TA贡献1842条经验 获得超21个赞

您可以使用debounce和race实现此行为:

使用您提供的代码


private destroy$ = new Subject<void>();

private immediate$ = new Subject<void>();

private updateSubject$ = new Subject<string>();


constructor(private srv: PubSubService) {}


ngOnInit() {

  this.updateSubject$.pipe(

      takeUntil(this.destroy$),

      debounce(() => race(timer(3000), this.immediate$))

  ).subscribe(val => {

      this.srv.publishChange(val);

  });

}


doChange(val: string, immediate?: boolean) {

  this.updateSubject$.next(val);

  if (immediate) this.immediate$.next();

}


// don't forget to unsubscribe

ngOnDestroy() {

  this.destroy$.next();

}

立即发出更改将替换之前的正常更改(即去抖 3 秒),而不会延迟(感谢我们的种族可观察)。

查看完整回答
反对 回复 2022-01-07
  • 3 回答
  • 0 关注
  • 116 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信