在 Reactor 中并行分发 `groupBy` 分组

9

我正在学习Reactor,我想知道如何实现特定的行为。 假设我有一系列传入的消息。 每个消息都与某个实体相关联并包含一些数据。

interface Message {
    String getEntityId();
    Data getData();
}

不同实体相关的消息可以并行处理。然而,任何单个实体的消息必须一次处理一个,即在实体 "abc" 的消息 1 处理完成之前不能开始处理实体 "abc" 的消息 2。 在处理消息时,应缓冲该实体的其他消息。 其他实体的消息可以无阻地进行。 可以将其视为每个实体有一个线程运行以下代码:

public void run() {
    for (;;) {
        // Blocks until there's a message available
        Message msg = messageQueue.nextMessageFor(this.entityId);

        // Blocks until processing is finished
        processMessage(msg);
    }
}

如何在React中实现非阻塞操作?总消息速率可能很高,但每个实体的消息速率将非常低。实体集可能非常大,并且不一定事先知道。

我猜它可能看起来像这样,但我不确定。

{
    incomingMessages()
            .groupBy(Message::getEntityId)
            .flatMap(entityStream -> entityStream
                    /* ... */
                    .map(msg -> /* process the message */)))
                    /* ... */
}

public static Stream<Message> incomingMessages() { /* ... */ }
2个回答

6
使用ProjectReactor,您可以以以下方式解决此问题:
@Test
public void testMessages() {
    Flux.fromStream(incomingMessages())
            .groupBy(Message::getEntityId)
            .map(g -> g.publishOn(Schedulers.newParallel("groupByPool", 16))) //create new publisher for groups of messages
            .subscribe( //create consumer for main stream
                    stream ->
                            stream.subscribe(this::processMessage) // create consumer for group stream
            );
}

public Stream<Message> incomingMessages() {
    return IntStream.range(0, 100).mapToObj(i -> new Message(i, i % 10));
}

public void processMessage(Message message) {
    System.out.println(String.format("Message: %s processed by the thread: %s", message, Thread.currentThread().getName()));
}

private static class Message {
    private final int id;
    private final int entityId;

    public Message(int id, int entityId) {
        this.id = id;
        this.entityId = entityId;
    }

    public int getId() {
        return id;
    }

    public int getEntityId() {
        return entityId;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", entityId=" + entityId +
                '}';
    }
}

我认为类似的解决方案可以在 RxJava 中找到。


如果您将flux传递给调用publishOn的其他服务(例如Spring Reactive Mongo-它使用单独的线程保存到数据库),则不起作用。即使某些人再次在流上调用publishOn,是否有可能强制一次处理一个消息? - Mr Jedi

5

我们在项目中遇到了同样的问题。具有相同ID的实体必须按顺序处理,但具有不同ID的实体可以并行处理。

解决方案非常简单。我们开始使用concatMap而不是flatMap。从concatMap文档中可以看到:

 * Transform the elements emitted by this {@link Flux} asynchronously into Publishers,
 * then flatten these inner publishers into a single {@link Flux}, sequentially and
 * preserving order using concatenation.

代码示例:

public void receive(Flux<Data> data) {
    data
        .groupBy(Data::getPointID)
        .flatMap(service::process)
        .onErrorContinue(Logging::logError)
        .subscribe();

}

处理方法:

Flux<SomeEntity> process(Flux<Data> dataFlux) {
    return dataFlux
        .doOnNext(Logging::logReceived)
        .concatMap(this::proceedDefinitionsSearch)
        .doOnNext(Logging::logDefSearch)
        .flatMap(this::processData)
        .doOnNext(Logging::logDataProcessed)
        .concatMap(repository::save)
        .doOnNext(Logging::logSavedEntity);
}

我对这里的用例有点困惑,为什么我们要嵌套使用concatMap。根据我在这里看到的 - https://stackoverflow.com/questions/68542082/reactor-groupby-with-parallelism-runs-on-same-thread - 这对我来说是有道理的 - 如果我们同时使用groupBy和flatMap - 组内的项目会按顺序进行处理 - 根据我的测试结果。 - alext
@alext 我是在4年前,所以我不太记得这段代码,但是process方法只是个例子,整个重点是使用groupBy。 - Mr Jedi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接