使用 Rust 阻塞任务直到队列不为空

3
我需要在一组任务之间分派工作。std::sync::deque 足以解决这个问题,但如果队列为空,我需要阻塞任务。
以下代码(在 GitHub gist 中可用)是使用std::sync::deque的工作示例:
extern crate time;

use std::io::timer::sleep;
use std::sync::deque::{BufferPool, Empty, Abort, Data};
use std::time::Duration;

fn main() {

  let start = time::precise_time_s();
  let pool = BufferPool::new();
  let (worker, stealer) = pool.deque();

  for task_id in range(1i, 5) {
    let sc = stealer.clone();
    spawn(proc() {
      loop {
        let elapse = time::precise_time_s() - start;
        match sc.steal() {
          Empty      => { println!("[{} @ {:#7.4}] No items", task_id, elapse); sleep(Duration::milliseconds(300)) },
          Abort      =>   println!("[{} @ {:#7.4}] ABORT. Retrying.", task_id, elapse),
          Data(item) =>   println!("[{} @ {:#7.4}] Found {}", task_id, elapse, item)
        }
      }
    });
  }

  for item in range(1i, 1000) {
    for n in range(1i, 20) {
      worker.push(item * n);
    }
    sleep(Duration::milliseconds(1000));
  }

}

我看到有一个std::sync::TaskPool,但是当前实现即使线程忙于旧任务,也会将工作发送到任务中。

我的问题是:如何最好地阻止任务,直到队列中有任何项目?

1个回答

4
一种可能的解决方案是使用信号量:
extern crate time;

use std::io::timer::sleep;
use std::sync::deque::{BufferPool, Empty, Abort, Data};
use std::sync::{Semaphore, Arc};
use std::time::Duration;

fn main() {

  let start = time::precise_time_s();
  let pool = BufferPool::new();
  let (worker, stealer) = pool.deque();
  let sem = Arc::new(Semaphore::new(0));

  for task_id in range(1i, 5) {
    let sc = stealer.clone();
    let s = sem.clone();
    spawn(proc() {
      loop {
        let elapse = time::precise_time_s() - start;
        s.acquire();
        match sc.steal() {
          Empty      => {
              println!("[{} @ {:#7.4}] No items", task_id, elapse);
              sleep(Duration::milliseconds(300))
          },
          Abort      =>   {
              println!("[{} @ {:#7.4}] ABORT. Retrying.", task_id, elapse);
              s.release();
          },
          Data(item) =>   println!("[{} @ {:#7.4}] Found {}", task_id, elapse, item)
        }
      }
    });
  }

  for item in range(1i, 1000) {
    for n in range(1i, 20) {
      worker.push(item * n);
      sem.release();
    }
    sleep(Duration::milliseconds(1000));
  }

}

你可以在这里看到,当每个值被生产时,你会释放一个信号量资源,并在从队列中获取值之前获取它。在这种情况下,返回的值永远不会为空,但仍然可能会中止,因为没有读取任何内容,但是该值仍然在队列中。

另一种可能的解决方案也是使用通道,在没有值时阻塞。为了提高性能,您需要对两种解决方案进行基准测试。


1
谢谢!最终,我为所有工作线程使用了全局信号量,并为每个工作任务使用了“release()”方法。代码在https://gist.github.com/ayosec/aee066d5e5a3f38c66b9#file-jobs_semaphore-rs中。 - Ayose
1
@Ayose:你忘记在“Abort”时释放一个资源了。这样,当任务阻塞时,队列中会留下一个值来表示每个中止。 - Arjan
或者只需在循环中调用 steal() 直到获取数据。 - Arjan
你说得对。感谢您的评论!我使用loop来包装steal(),为此问题更新了gist中的修复程序。差异在https://gist.github.com/ayosec/aee066d5e5a3f38c66b9/revisions中可见。 - Ayose

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接