重要提示:我以前从未使用过这个库,对某些概念的低级了解略有欠缺。主要是通过阅读教程来学习。我相信任何做过异步工作的人都会嘲笑我的水平,但这可能对其他人是一个有用的起点。买家自负!
让我们从一些更简单的东西开始,演示一个Stream
如何工作。我们可以将一个Result
迭代器转换成流:
extern crate futures;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
let payloads = stream::iter(payloads.into_iter());
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
}
这向我们展示了一种消费流的方法。我们使用and_then
对每个有效载荷执行某些操作(这里只是打印出来),然后使用for_each
将Stream
转换回Future
。然后我们可以通过调用奇怪命名的forget
方法运行未来。
接下来是将Redis库与混合体系结合起来,仅处理一个消息。由于get_message()
方法是阻塞的,我们需要将一些线程引入到混合中。在此类异步系统中执行大量工作并不是一个好主意,因为其他所有工作都会被阻塞。例如:
除非另有安排,否则应确保此函数的实现非常快速。
在理想的情况下,Redis库将建立在像futures这样的库之上,并本地公开所有内容。
extern crate redis;
extern crate futures;
use std::thread;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let msg = pubsub.get_message().expect("Unable to get message");
let payload: Result<String, _> = msg.get_payload();
tx.send(payload).forget();
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
我在这里的理解有些模糊。在一个单独的线程中,我们会阻塞等待消息并在得到消息后将其推送到通道中。但我不明白为什么我们需要保留线程的句柄。我期望foo.forget
会自己阻塞,一直等到流为空。
在连接Redis服务器的telnet中,发送以下内容:
publish rust awesome
你会看到它起作用了。添加打印语句表明,在线程被生成之前,foo.forget
语句已经运行了(对我而言)。
发送多条消息则更加棘手。为了防止生成方进度过快,Sender
将自身消耗掉。这是通过从 send
返回另一个 future 来实现的!我们需要将其传回来以在循环的下一次迭代中重复使用:
extern crate redis;
extern crate futures;
use std::thread;
use std::sync::mpsc;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let mut tx = tx;
while let Ok(msg) = pubsub.get_message() {
let payload: Result<String, _> = msg.get_payload();
let (next_tx_tx, next_tx_rx) = mpsc::channel();
tx.send(payload).and_then(move |new_tx| {
next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
futures::finished(())
}).forget();
tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
}
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
我相信随着时间的推移,会有更多的生态系统来支持这种互操作性。例如,futures-cpupool crate 可能可以扩展以支持类似于这个用例。