如何在不同时运行同一功能的情况下,使用标准 Rust 库定期运行一组函数?

5
我希望使用Rust创建一个简单的调度程序,以便在定义的时间运行多个并发函数,但如果它们还没有完成,则不要启动更多函数。
例如,如果定义的间隔为一秒钟,调度程序应该运行这些函数,并且如果之前的函数还没有返回,则不要启动更多函数。目标是防止多次运行同一函数。
我用Go创建了一个工作示例,方法如下:
package main

import (
    "fmt"
    "sync"
    "time"
)

func myFunc(wg *sync.WaitGroup) {
    fmt.Printf("now: %+s\n", time.Now())
    time.Sleep(3 * time.Second)
    wg.Done()
}

func main() {
    quit := make(chan bool)

    t := time.NewTicker(time.Second)
    go func() {
        for {
            select {
            case <-t.C:
                var wg sync.WaitGroup
                for i := 0; i <= 4; i++ {
                    wg.Add(1)
                    go myFunc(&wg)
                }
                wg.Wait()
                fmt.Printf("--- done ---\n\n")
            case <-quit:
                return
            }
        }
    }()

    <-time.After(time.Minute)
    close(quit)
}

由于在Rust标准库中没有像Go的NewTicker这样的东西,我使用了Tokio并且想出了这个方法

extern crate futures;
extern crate tokio;

use futures::future::lazy;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;

fn main() {
    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .for_each(|interval| {
            println!("Interval: {:?}", interval);
            for i in 0..5 {
                tokio::spawn(lazy(move || {
                    println!("I am i: {}", i);
                    thread::sleep(time::Duration::from_secs(3));
                    Ok(())
                }));
            }
            Ok(())
        })
        .map_err(|e| panic!("interval errored; err={:?}", e));

    tokio::run(task);
}

我对这种方法的问题在于任务不等待前一个函数被调用,因此函数会重新开始运行,即使之前它们已经在运行了,我在这里缺少像Go的sync.WaitGroup这样的东西。有什么可以用来实现与工作示例相同的结果吗?
是否可能只使用标准库就能实现这一点?这主要是为了学习目的,可能有一种非常简单的方法来做到这一点,我可以避免额外的复杂性。
最终,我想通过HTTP定期监视一些网站(只获取返回的状态码),但在获得所有响应之前不查询它们全部。

不要在future中使用thread::sleep为什么Future::select会首先选择休眠时间更长的future? - Shepmaster
如果您每秒运行一次,而函数需要10秒钟的时间,那么在函数再次运行之前是否应该有1秒钟的休息时间,还是立即再次运行? - Shepmaster
@Shepmaster,感谢您的所有评论,但由于我对RUST的理解和经验不足,这就是为什么我提出了您所描述的不一致的问题。如果有帮助的话,最终我只想知道RUST的做法是否与工作示例https://play.golang.org/p/ZLw6ESioqfu相似。我之所以询问如何仅使用std lib来完成它,是因为在其他编程语言中,这样做非常简单,并且可能被认为是“最佳实践”,但使用crate也完全可以,我只是想学习如何正确地做事 :-) - nbari
什么是RUST方式 - 为什么您相信只有一种正确的方法?您可能可以使用线程完成,也可能可以使用futures(以及可能的其他方式)。这些都不是“正确的方式”;这就是为什么此问题似乎过于宽泛的原因。如果您可以将其缩小到类似于“如何使用Rust的<此任务>此方面执行此操作”,那么它似乎更适合Stack Overflow上的主题。(以及它只是“Rust”,不是所有大写字母,不是所有小写字母)。 - Shepmaster
@Shepmaster明白了,感谢提醒。我希望能得到一些有建设性的回答,帮助我实现问题描述中的内容,例如... - nbari
1个回答

6

如果你想要并发且只使用标准库,那么你基本上必须使用线程。

在这里,我们为每个调度程序循环的每个函数启动一个线程,允许它们并行运行。然后等待所有函数完成,防止同时运行同一函数两次。

use std::{
    thread,
    time::{Duration, Instant},
};

fn main() {
    let scheduler = thread::spawn(|| {
        let wait_time = Duration::from_millis(500);

        // Make this an infinite loop
        // Or some control path to exit the loop
        for _ in 0..5 {
            let start = Instant::now();
            eprintln!("Scheduler starting at {:?}", start);

            let thread_a = thread::spawn(a);
            let thread_b = thread::spawn(b);

            thread_a.join().expect("Thread A panicked");
            thread_b.join().expect("Thread B panicked");

            let runtime = start.elapsed();

            if let Some(remaining) = wait_time.checked_sub(runtime) {
                eprintln!(
                    "schedule slice has time left over; sleeping for {:?}",
                    remaining
                );
                thread::sleep(remaining);
            }
        }
    });

    scheduler.join().expect("Scheduler panicked");
}

fn a() {
    eprintln!("a");
    thread::sleep(Duration::from_millis(100))
}
fn b() {
    eprintln!("b");
    thread::sleep(Duration::from_millis(200))
}

你可以使用Barrier,在执行函数之前将它们都放在一个线程中,并在执行结束时同步它们。
use std::{
    sync::{Arc, Barrier},
    thread,
    time::Duration,
};

fn main() {
    let scheduler = thread::spawn(|| {
        let barrier = Arc::new(Barrier::new(2));

        fn with_barrier(barrier: Arc<Barrier>, f: impl Fn()) -> impl Fn() {
            move || {
                // Make this an infinite loop
                // Or some control path to exit the loop
                for _ in 0..5 {
                    f();
                    barrier.wait();
                }
            }
        }

        let thread_a = thread::spawn(with_barrier(barrier.clone(), a));
        let thread_b = thread::spawn(with_barrier(barrier.clone(), b));

        thread_a.join().expect("Thread A panicked");
        thread_b.join().expect("Thread B panicked");
    });

    scheduler.join().expect("Scheduler panicked");
}

fn a() {
    eprintln!("a");
    thread::sleep(Duration::from_millis(100))
}
fn b() {
    eprintln!("b");
    thread::sleep(Duration::from_millis(200))
}

个人而言,我不会使用这两种解决方案。我会寻找一个木箱,在那里有其他人编写和测试我需要的代码。

另请参阅:


非常感谢,我会根据您的建议发布另一个关于使用Tokio的问题。 - nbari

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接