作者:半邪书生66_516 | 来源:互联网 | 2023-09-18 16:58
我有一个可观察的物体,它定期发射出元素。在这些元素上,我执行一种快速和一种缓慢的操作。我想要的是在忙时为慢速观察者添加新元素。有什么办法可以用Rx来实现这一目标,而不是将标志保持在慢速运行状态?
我在Reactive扩展中还很新,如果我的假设有任何问题,请更正我。
let tick = Observable.interval(.seconds(1),scheduler: SerialDispatchQueueScheduler(qos: .background)).share()
tick.subscribe {
print("fast observer \($0)")
}.disposed(by: disposeBag)
// observing in another queue so that it does not block the source
tick.observeon(SerialDispatchQueueScheduler(qos: .background))
.subscribe {
print("slow observer \($0)")
sleep(3) // cpu-intensive task
}.disposed(by: disposeBag)
为此,flatMap是您的朋友。每当您要删除事件时(无论是新事件时的当前事件,还是在处理当前事件时的后续事件),都应使用flatMap。有关更多信息,请参见我的文章:RxSwift’s Many Faces of FlatMap
您在这里:
let tick = Observable.interval(.seconds(1),scheduler: MainScheduler.instance).share()
func cpuLongRunningTask(_ input: Int) -> Observable {
return Observable.create { observer in
print("start task")
sleep(3)
print("finish task")
observer.onNext(input)
observer.onCompleted()
return Disposables.create { /* cancel the task if possible */ }
}
}
tick
.subscribe {
print("fast \($0)")
}
.disposed(by: disposeBag)
tick
.flatMapFirst {
// subscribing in another scheduler so that it does not block the source
cpuLongRunningTask($0)
.subscribeOn(SerialDispatchQueueScheduler(qos: .background))
}
.observeOn(MainScheduler.instance) // make sure the print happens on the main thread
.subscribe {
print("slow \($0)")
}
.disposed(by: disposeBag)
示例输出如下:
fast next(0)
start task
fast next(1)
fast next(2)
fast next(3)
finish task
slow next(0)
fast next(4)
start task
fast next(5)
fast next(6)
fast next(7)
finish task
slow next(4) <-- slow ignored the 1,2,and 3 values.
,
恐怕没有一个简单的解决方案。您描述的问题与背压有关,不幸的是,RxSwift不为此提供支持(Apple Combine提供)。通常,您将必须使用以下过滤运算符之一来手动处理这种情况:debounce
,throttle
或filter
。
通过使用debounce
或throttle
,您可能需要知道操作的确切持续时间,而实际情况可能并非总是如此。
如您所说,通过使用filter
,可以在开始长时间运行的操作之前检查设置的标志。