如何使用futures.rs和Redis PubSub实现阻塞调用的future流?

7
我正在尝试创建一个系统,使我的应用程序能够从Redis PubSub频道接收流数据并处理它。我正在使用的Redis驱动程序以及我看到的所有其他Rust Redis驱动程序,都使用阻塞操作从通道获取数据,只有在接收到数据时才返回值:
let msg = match pubsub.get_message() {
        Ok(m) => m,
        Err(_) => panic!("Could not get message from pubsub!")
};
let payload: String = match msg.get_payload() {
    Ok(s) => s,
    Err(_) => panic!("Could not convert redis message to string!")
};

我希望使用futures-rs库来包装这个阻塞函数调用,以便在等待输入时可以执行应用程序中的其他任务。

我阅读了futures的教程,并尝试创建一个Stream,以便当PubSub接收到数据时发出信号,但我无法弄清楚如何实现。

如何为阻塞的pubsub.get_message()函数创建schedulepoll函数?


5
在大型公告的当天使用一个图书馆,多么雄心勃勃!^_^ - Shepmaster
1个回答

11

重要提示:我以前从未使用过这个库,对某些概念的低级了解略有欠缺。主要是通过阅读教程来学习。我相信任何做过异步工作的人都会嘲笑我的水平,但这可能对其他人是一个有用的起点。买家自负!


让我们从一些更简单的东西开始,演示一个Stream如何工作。我们可以将一个Result迭代器转换成流:

extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

这向我们展示了一种消费流的方法。我们使用and_then对每个有效载荷执行某些操作(这里只是打印出来),然后使用for_eachStream转换回Future。然后我们可以通过调用奇怪命名的forget方法运行未来。


接下来是将Redis库与混合体系结合起来,仅处理一个消息。由于get_message()方法是阻塞的,我们需要将一些线程引入到混合中。在此类异步系统中执行大量工作并不是一个好主意,因为其他所有工作都会被阻塞。例如

除非另有安排,否则应确保此函数的实现非常快速。

在理想的情况下,Redis库将建立在像futures这样的库之上,并本地公开所有内容。

extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

我在这里的理解有些模糊。在一个单独的线程中,我们会阻塞等待消息并在得到消息后将其推送到通道中。但我不明白为什么我们需要保留线程的句柄。我期望foo.forget会自己阻塞,一直等到流为空。

在连接Redis服务器的telnet中,发送以下内容:

publish rust awesome

你会看到它起作用了。添加打印语句表明,在线程被生成之前,foo.forget 语句已经运行了(对我而言)。


发送多条消息则更加棘手。为了防止生成方进度过快,Sender 将自身消耗掉。这是通过从 send 返回另一个 future 来实现的!我们需要将其传回来以在循环的下一次迭代中重复使用:

extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

我相信随着时间的推移,会有更多的生态系统来支持这种互操作性。例如,futures-cpupool crate 可能可以扩展以支持类似于这个用例。


感谢您的精彩回答!只有一个问题:加入redis_thread是否会抵消使结果读取过程非阻塞的所有努力?也许有些事情我没有理解。 - Ameo
1
我会期望foo.forget会阻塞自己,等待流变为空。实际上,未来对象并不一定提供“阻塞直到准备就绪”的方法。根据其描述,forget()方法是为了在未来对象被丢弃时防止自动取消,但它与等待无关。例如,在Scala中,未来对象没有这种方法,而是有一个单独的Await.ready/Await.result方法对,可以在某个超时时间内等待未来对象准备就绪。 - Vladimir Matveev
1
据我所了解,在 future-rs 中可以使用 Future::select 实现类似的功能,其中第二个 future 在固定超时时间后完成。 - Vladimir Matveev

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接