如何在 Reactor 中使用 flatMap() 和 Context?

11

我对Context的理解有些问题。文档说Context是:

通过上下文协议在组件之间(如操作器)传播的键/值存储。 上下文非常适合传输诸如跟踪或安全令牌之类的正交信息。

很好。

现在假设我们想使用Context传播一些东西,以便到处都能使用它。要调用另一个异步代码,我们只需使用flatMap()方法。

问题: 如何在被调用的方法内部访问上下文?

示例(简单)代码:

public class TestFlatMap {
    public static void main(final String ...args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
            .flatMap(TestFlatMap::nameToGreeting)
            .subscriberContext(context ->
                Context.of("greetingWord", "Hello")  // context initialized
            );
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.just("Hello " + name + " !!!");  // ALERT: we don't have Context here
    }
}

被调用的方法可以(并且很可能)在另一个类中。事先感谢您的帮助!编辑:删除了一些代码,使问题更加简洁明了。
2个回答

20

链接您的Publisher,愿Context与您同在

如果您连接了所有Publisher(包括在flatMap/concatMap和类似运算符中的连接),则Context将正确地在整个流程运行时传播。

要在nameToGreeting方法中访问Context,可以调用Mono.subscribeContext并检索存储的信息,即使这些方法看起来不相关。以下示例展示了上述概念:

public class TestFlatMap {
    public static void main(final String ...args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
                                           .flatMap(TestFlatMap::nameToGreeting)
                                           .subscriberContext(context ->
                                                   Context.of("greetingWord", "Hello")  // context initialized
                                           );
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.subscriberContext()
                   .filter(c -> c.hasKey("greetingWord"))
                   .map(c -> c.get("greetingWord"))
                   .flatMap(greetingWord -> Mono.just(greetingWord + " " + name + " " + "!!!"));// ALERT: we have Context here !!!
    }
}

同时,您也可以使用zip运算符以类似的方式进行操作,以便稍后组合结果:

public class TestFlatMap {
    public static void main(final String ...args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
                                           .flatMap(TestFlatMap::nameToGreeting)
                                           .subscriberContext(context ->
                                                   Context.of("greetingWord", "Hello")  // context initialized
                                           );
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.zip(
            Mono.subscriberContext()
                .filter(c -> c.hasKey("greetingWord"))
                .map(c -> c.get("greetingWord")), // ALERT: we have Context here !!!
            Mono.just(name),
            (greetingWord, receivedName) -> greetingWord + " " + receivedName + " " + "!!!"
        );
    }
}

那么,为什么它能工作?

从上面的示例中可以看出,nameToGreeting 在主 Flux 的上下文中被调用。在内部实现中->(这里是一些 FluxFlatMap 内部实现),每个映射的 Publisher 都由 FlatMapInner 订阅。如果我们查看 FlatMapInner 并查找 currentContext 重载,我们将看到,FlatMapInner 使用父级 Context,这意味着如果父级拥有 Reactor Context - 则此上下文将传播到每个内部的 Publisher

因此,返回的 Mono,由 nameToGreeting 方法生成,将具有与其父级相同的 Context


10

Reactor-Core v3.4引入了Mono.deferContextualFlux.deferContextual,它们取代了v3.3中引入的Mono.deferWithContextFlux.deferWithContext

使用这些方法,Oleh Dokukas zip example可以简化为:

public class TestFlatMap {
    public static void main(final String ...args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
                .flatMap(TestFlatMap::nameToGreeting)
                .subscriberContext(context ->
                        Context.of("greetingWord", "Hello"));  // context initialized
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.deferContextual(c -> Mono.just(name)
                .filter(x -> c.hasKey("greetingWord"))
                .map(n -> c.get("greetingWord") + " " + n + " " + "!!!"));
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接