作者:属于小草的树洞 | 来源:互联网 | 2023-06-23 20:03
我的脑袋围绕着 Flux Sinks,无法理解更高层次的图片。使用时Sinks.Many tryEmitNext
,该函数会告诉我是否存在争用以及在失败的情况下我该怎么办(FailFast/Handler)。
但是有没有一个简单的结构可以让我从多个线程安全地发出元素。例如,与其让用户知道存在争用,我应该再试一次,不如将元素添加到队列(mpmc、mpsc 等)中,并且仅在队列已满时才通知。
现在我可以自己添加一个队列来缓解这个问题,但这似乎是一个常见的用例。我想我在这里遗漏了一点。
回答
我遇到了同样的问题,从支持多线程安全发射的处理器迁移。我使用这个自定义 EmitFailureHandler 来执行 EmitFailureHandler 文档建议的繁忙循环。
public static EmitFailureHandler etryOnNonSerializedElse(EmitFailureHandler fallback){
return (signalType, emitResult) -> {
if (emitResult == EmitResult.FAIL_NON_SERIALIZED) {
LockSupport.parkNanos(10);
return true;
} else
return fallback.onEmitFailure(signalType, emitResult);
};
}
关于 3.4.0 实现有各种令人困惑的方面
- 这意味着除非使用 Unsafe 变体,否则接收器支持序列化发射,但实际上所有序列化版本都会在并发发射的情况下快速失败。
- Flux.Create 提供的 Sink 确实支持线程安全发射。
我希望图书馆在某个时候会提供一个可靠的工程替代方案。