如何使用Tokio在定期间隔内同时运行一组函数,而不会同时运行相同的函数?

4
我的目标是同时运行N个函数,但在所有函数完成之前不想再生成更多的函数。这是我到目前为止所拥有的
extern crate tokio;
extern crate futures;

use futures::future::lazy;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;

fn main() {
    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .for_each(|interval| {
            println!("Interval: {:?}", interval);
            for i in 0..5 {
                tokio::spawn(lazy(move || {
                    println!("Hello from task {}", i);
                    // mock delay (something blocking)
                    // thread::sleep(time::Duration::from_secs(3));
                    Command::new("sleep").arg("3").output().expect("failed to execute process");

                    Ok(())
                }));
            }
            Ok(())
        })
    .map_err(|e| panic!("interval errored; err={:?}", e));

    tokio::run(task);
}

每秒钟我生成5个函数,但现在我想等到所有的函数完成后再生成更多的函数。
据我理解(可能理解有误),我正在另一个 future 中返回一个 Future
task (Interval ----------------------+ (outer future)
    for i in 0..5 {                  |
        tokio::spawn(  ----+         | 
            // my function | (inner) |
            Ok(())         |         |
        )              ----+         |
    }                                |
    Ok(()) --------------------------+

我被困在等待内部未来完成的过程中。
2个回答

6

您可以通过将工作未来对象合并到一起,以便它们都在并行运行,但必须同时完成来实现这一点。然后,出于相同的原因,将其与1秒的延迟连接。将其包装成一个循环,以便永久运行它(或为演示运行5次迭代)。

Tokio 1.3

use futures::{future, future::BoxFuture, stream, FutureExt, StreamExt}; // 0.3.13
use std::time::{Duration, Instant};
use tokio::time; // 1.3.0

#[tokio::main]
async fn main() {
    let now = Instant::now();
    let forever = stream::unfold((), |()| async {
        eprintln!("Loop starting at {:?}", Instant::now());

        // Resolves when all pages are done
        let batch_of_pages = future::join_all(all_pages());

        // Resolves when both all pages and a delay of 1 second is done
        future::join(batch_of_pages, time::sleep(Duration::from_secs(1))).await;
        
        Some(((), ()))
    });

    forever.take(5).for_each(|_| async {}).await;
    eprintln!("Took {:?}", now.elapsed());
}

fn all_pages() -> Vec<BoxFuture<'static, ()>> {
    vec![page("a", 100).boxed(), page("b", 200).boxed()]
}

async fn page(name: &'static str, time_ms: u64) {
    eprintln!("page {} starting", name);
    time::sleep(Duration::from_millis(time_ms)).await;
    eprintln!("page {} done", name);
}

Loop starting at Instant { t: 1022680437923626 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022681444390534 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022682453240399 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022683469924126 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { t: 1022684493522592 }
page a starting
page b starting
page a done
page b done
Took 5.057315596s

Tokio 0.1

use futures::future::{self, Loop}; // 0.1.26
use std::time::{Duration, Instant};
use tokio::{prelude::*, timer::Delay};  // 0.1.18

fn main() {
    let repeat_count = Some(5);

    let forever = future::loop_fn(repeat_count, |repeat_count| {
        eprintln!("Loop starting at {:?}", Instant::now());

        // Resolves when all pages are done
        let batch_of_pages = future::join_all(all_pages());

        // Resolves when both all pages and a delay of 1 second is done
        let wait = Future::join(batch_of_pages, ez_delay_ms(1000));

        // Run all this again
        wait.map(move |_| {
            if let Some(0) = repeat_count {
                Loop::Break(())
            } else {
                Loop::Continue(repeat_count.map(|c| c - 1))
            }
        })
    });

    tokio::run(forever.map_err(drop));
}

fn all_pages() -> Vec<Box<dyn Future<Item = (), Error = ()> + Send + 'static>> {
    vec![Box::new(page("a", 100)), Box::new(page("b", 200))]
}

fn page(name: &'static str, time_ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    future::ok(())
        .inspect(move |_| eprintln!("page {} starting", name))
        .and_then(move |_| ez_delay_ms(time_ms))
        .inspect(move |_| eprintln!("page {} done", name))
}

fn ez_delay_ms(ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
    Delay::new(Instant::now() + Duration::from_millis(ms)).map_err(drop)
}

Loop starting at Instant { tv_sec: 4031391, tv_nsec: 806352322 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031392, tv_nsec: 807792559 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031393, tv_nsec: 809117958 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031394, tv_nsec: 813142458 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031395, tv_nsec: 814407116 }
page a starting
page b starting
page a done
page b done
Loop starting at Instant { tv_sec: 4031396, tv_nsec: 815342642 }
page a starting
page b starting
page a done
page b done

另请参阅:


谢谢,但我还在努力理解tokio::spawn()的用法。你的回答帮助我了解了如何“延迟”而不是使用thread::sleep。但是,如果我想调用系统命令sleep N,有没有一种方法可以“分组并等待”多个tokio::spawn()的返回值,并在所有函数完成后继续执行下一批或者等待定义的时间间隔过去后再次调用所有函数? - nbari
例如,如何使用类似于 Command::new("sleep").arg("3").output().expect("failed to execute process"); 的方式进行模拟。 - nbari
@nbari 这样的问题没有意义,因为tokio::spawn返回的值“实际上没有提供任何功能”(https://docs.rs/tokio/0.1.20/tokio/executor/struct.Spawn.html)。 - Shepmaster
@nbari 我已经添加了关于如何运行通用阻塞代码的现有问答链接(简而言之:使用线程池),以及关于Command的链接(简而言之:使用tokio-process)。 - Shepmaster
谢谢,不过我还是不太清楚如何使用Tokio来以定义的间隔(调度器)无限运行某个任务,并同时并发地生成X个阻塞函数并等待它们完成后再次调用。最终,我只想在Rust中做类似这样的事情:https://play.golang.org/p/ZLw6ESioqfu。我知道比较语言是不公平的,但这有助于明确我想要实现的目标。 - nbari
@nbari 将对 page 的调用替换为对 tokio-threadpool 的调用,如链接的问答中所述。 - Shepmaster

1
从我的理解来看(我可能理解错误),我在另一个future中返回了一个Future。
您没有错,但是在您提供的代码中,唯一返回的future是实现IntoFuture的Ok(())。tokio::spawn只是将新任务生成到Tokio的DefaultExecutor中。
如果我从您的问题中理解正确,您想要在前一个批次完成后再生成下一个批次,但是如果前一个批次在1秒钟之前完成,则希望在生成下一个批次之前完成该1秒钟。
实现自己的future并自己处理poll会是更好的解决方案,但是这可以粗略地完成:
使用join_all收集批处理任务。这是一个等待收集的futures完成的新future。
对于1秒钟的等待,您可以使用原子状态。如果锁定了tick,则等待直到状态被释放。
这是代码(Playground):
extern crate futures;
extern crate tokio;

use futures::future::lazy;
use std::time::{self, Duration, Instant};

use tokio::prelude::*;
use tokio::timer::{Delay, Interval};

use futures::future::join_all;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

fn main() {
    let locker = Arc::new(AtomicBool::new(false));

    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .map_err(|e| panic!("interval errored; err={:?}", e))
        .for_each(move |interval| {
            let is_locked = locker.load(Ordering::SeqCst);
            println!("Interval: {:?} --- {:?}", interval, is_locked);

            if !is_locked {
                locker.store(true, Ordering::SeqCst);
                println!("locked");

                let futures: Vec<_> = (0..5)
                    .map(|i| {
                        lazy(move || {
                            println!("Running Task-{}", i);
                            // mock delay
                            Delay::new(Instant::now() + Duration::from_millis(100 - i))
                                .then(|_| Ok(()))
                        })
                        .and_then(move |_| {
                            println!("Task-{} is done", i);
                            Ok(())
                        })
                    })
                    .collect();

                let unlocker = locker.clone();
                tokio::spawn(join_all(futures).and_then(move |_| {
                    unlocker.store(false, Ordering::SeqCst);
                    println!("unlocked");

                    Ok(())
                }));
            }

            Ok(())
        });

    tokio::run(task.then(|_| Ok(())));
}

输出:
Interval: Instant { tv_sec: 4036783, tv_nsec: 211837425 } --- false
locked
Running Task-0
Running Task-1
Running Task-2
Running Task-3
Running Task-4
Task-4 is done
Task-3 is done
Task-2 is done
Task-1 is done
Task-0 is done
unlocked
Interval: Instant { tv_sec: 4036784, tv_nsec: 211837425 } --- false
locked
Running Task-0
Running Task-1
Running Task-2
Running Task-3
Running Task-4
Task-3 is done
Task-4 is done
Task-0 is done
Task-1 is done
Task-2 is done
unlocked

警告!:请检查{{link1:Shepmaster的评论}}

即使是为了演示,{{link2:你也不应该在futures中使用thread:sleep}}。有更好的替代方案。


即使是为了演示,你也不应该在future中使用thread:sleep。有更好的替代方案。 - Shepmaster
@Shepmaster 在问题中已经提到了,我直接复制了它。而且它被标记为模拟延迟,所以我在答案中没有提到它。 - Ömer Erden
1
原始问题还有其他问题,你没有直接复制(而是Stack Overflow的重点)。要意识到大多数人会将你的代码复制粘贴,几乎没有思考答案。更好的做法是在任何地方都鼓励良好的实践。 - Shepmaster
@ÖmerErden 谢谢,但是如何同时生成所有的函数呢?根据你的例子,它们是一个接一个地运行的。所以如果每个函数延迟5秒,那么下一批将会在N个函数 * 5秒之后运行,而不仅仅是5秒整体运行。 - nbari
@nbari 是的,由于使用了 thread::sleep,它并没有并行执行。如果你阅读评论中的帖子,你就可以看到原因。我更新了代码,使用适当的模拟延迟,并延迟了 Duration::from_millis(100 - i),以便查看任务以任意顺序完成。 - Ömer Erden
显示剩余5条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接