如何创建一个Flux/Publisher以用于数据流处理?

5

我正在使用轮询方式定期获取数据,新数据可能随时到达。我希望向客户端提供一种反应式接口。因此,我想创建一个发布器(Flux?),当有新数据可用时,它会发布新数据并通知订阅者。我该怎么做呢?我看到的所有Flux示例都是针对已知/可用数据的情况。实际上,我想要像基于队列的Flux,并且我的轮询线程可以在发现新数据时继续填充队列。


1
好的,看起来我需要使用一个 sink。那么,也许是 Flux#create(sink) 或 Flux#push(sink)。 - Pratik
1个回答

5

如果您需要简单的东西,可以使用DirectProcessor。这并不是最复杂的Flux Sink,但它可以帮助您一部分。

我写了一个快速示例:

Flux<String> hot = DirectProcessor.create<String>()
hot.onNext("Hello")//not printed
hot.subscribe(it -> System.out.println(it))

hot.onNext("Goodbye")//printed
Thread.sleep(100)
hot.onNext("foo")//printed

DirectProcessor实现了Flux,因此您可以像使用Flux一样使用它。

正如您所看到的,在订阅hotsource之前添加的元素将不会传递给subscribe。

查看其他帖子,Flux#create和Flux#generate可能是一个很好的开始。Difference Between Flux.create and Flux.generate <- 这将为您提供更复杂和可控的Flux操作。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接