如何在另一个Tokio运行时内创建Tokio运行时,而不出现“无法从运行时内部启动运行时”的错误?

22

我正在使用rust_bert来进行文本摘要。我需要使用rust_bert::pipelines::summarization::SummarizationModel::new设置模型,该模型从互联网获取。它使用tokio异步执行此操作,而我遇到的问题(我认为)是我正在另一个Tokio运行时中运行Tokio运行时,如错误消息所示:

Downloading https://cdn.huggingface.co/facebook/bart-large-cnn/config.json to "/home/(censored)/.cache/.rustbert/bart-cnn/config.json"
thread 'main' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', /home/(censored)/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/runtime/enter.rs:38:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

我尝试过使用tokio::task::spawn_blockingtokio::task::block_in_place同步运行模型获取,但它们都对我没有用。 block_in_place 的错误与没有使用它时一样,而spawn_blocking 对我也不是很有用。 我还尝试了将summarize_text变为异步操作,但效果不大。

我遇到问题的代码如下:

Github Issuetokio-rs/tokio#2194和Reddit帖子"'Cannot start a runtime from within a runtime.' with Actix-Web And Postgresql"看起来相似(相同的错误信息),但它们并没有帮助我找到解决方案。

use egg_mode::tweet;
use rust_bert::pipelines::summarization::SummarizationModel;

fn summarize_text(model: SummarizationModel, text: &str) -> String {
    let output = model.summarize(&[text]);
    // @TODO: output summarization
    match output.is_empty() {
        false => "FALSE".to_string(),
        true => "TRUE".to_string(),
    }
}

#[tokio::main]
async fn main() {
    let model = SummarizationModel::new(Default::default()).unwrap();

    let token = egg_mode::auth::Token::Bearer("obviously not my token".to_string());
    let tweet_id = 1221552460768202756; // example tweet

    println!("Loading tweet [{id}]", id = tweet_id);
    let status = tweet::show(tweet_id, &token).await;
    match status {
        Err(err) => println!("Failed to fetch tweet: {}", err),
        Ok(tweet) => {
            println!(
                "Original tweet:\n{orig}\n\nSummarized tweet:\n{sum}",
                orig = tweet.text,
                sum = summarize_text(model, &tweet.text)
            );
        }
    }
}

很难回答你的问题,因为它没有包含一个 [MRE]。我们无法确定代码中存在哪些 crates(及其版本)、types、traits、fields等。如果您尝试在全新的Cargo项目中重现错误,然后[编辑]您的问题以包括完整的示例,那么这将使我们更容易帮助您。您可以使用Rust-specific MRE tips来缩小您要发布的原始代码。谢谢! - Shepmaster
MRE是否也适用于问题是由相关的箱子“引起”的情况?(rust_bert)-无论如何,我现在会添加一个。稍等片刻。(编辑:即使用rust_bert在MRE中是否可以?) - Mib
可以在MRE中使用Rust_BERT吗? - 可以,但是,就像我说的:crates(及其版本。要能够_重现_该问题,需要该版本。 - Shepmaster
2个回答

49

解决问题

这是一个简化的示例:

use tokio; // 1.0.2

#[tokio::main]
async fn inner_example() {}

#[tokio::main]
async fn main() {
    inner_example();
}

thread 'main' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.0.2/src/runtime/enter.rs:39:9

为了避免这种情况,您需要在完全独立的线程上运行创建第二个Tokio运行时的代码。最简单的方法是使用std::thread::spawn
use std::thread;

#[tokio::main]
async fn inner_example() {}

#[tokio::main]
async fn main() {
    thread::spawn(|| {
        inner_example();
    }).join().expect("Thread panicked")
}

为了提高性能,您可能希望使用线程池而不是每次创建新线程。方便地,通过spawn_blocking,Tokio本身提供了这样的线程池:

#[tokio::main]
async fn inner_example() {}

#[tokio::main]
async fn main() {
    tokio::task::spawn_blocking(|| {
        inner_example();
    }).await.expect("Task panicked")
}

在某些情况下,您不需要实际创建第二个Tokio运行时,而可以重复使用父运行时。要这样做,您需要向外部运行时传递一个Handle。如果您需要等待工作完成,您可以选择使用轻量级执行器,例如futures::executor来阻塞结果:
use tokio::runtime::Handle; // 1.0.2

fn inner_example(handle: Handle) {
    futures::executor::block_on(async {
        handle
            .spawn(async {
                // Do work here
            })
            .await
            .expect("Task spawned in Tokio executor panicked")
    })
}

#[tokio::main]
async fn main() {
    let handle = Handle::current();

    tokio::task::spawn_blocking(|| {
        inner_example(handle);
    })
    .await
    .expect("Blocking task panicked")
}

参见:

避免问题

更好的方法是首先避免创建嵌套的Tokio运行时。理想情况下,如果库使用异步执行器,则应该提供直接的异步函数,以便您可以使用自己的执行器。

值得查看API是否有非阻塞替代方案,如果没有,在项目存储库上提出问题。

您还可以重新组织代码,以便Tokio运行时不是嵌套而是顺序执行:

struct Data;

#[tokio::main]
async fn inner_example() -> Data {
    Data
}

#[tokio::main]
async fn core(_: Data) {}

fn main() {
    let data = inner_example();
    core(data);
}

2
这样做可以让我在生成的线程中检索变量吗? - Mib
1
根据“避免问题”/“查看API是否有非阻塞替代方法”的建议:这是rust_bert的问题吗?我做的第一件事就是寻找异步版本,但没有找到。(这种情况下似乎必须使用阻塞方式——在我的情况下,其他所有操作都依赖于它) - Mib
我也在使用 rust-bert 并尝试了上面的顺序模式,但仍然看到错误。@Mib 你能把你的工作代码发布在某个地方吗? - Alex Moore-Niemi
关于检索变量 - 您可以始终使用通道来来回传递数据。 - WhiteStork
你能否将“futures executor”与“tokio runtime”混合使用? - Brandon Ros
显示剩余8条评论

5

在将QA模型加载到tokio运行时时,我遇到了类似的问题,即使是顺序运行时也无法正常工作。但是,在rust-bert的github issues中找到了解决方案。解决方案很简单,就是将初始加载调用包装在task::spawn_blocking中。对我来说这很好,因为在加载完成之前我无法接受任何请求。下面是一个代码片段,以便帮助他人。

   78 fn with_model(
   79     qa: QaModel, // alias for Arc<Mutex<QuestionAnsweringModel>>
   80 ) -> impl Filter<Extract = (QaModel,), Error = std::convert::Infallible>       + Clone {
   81     warp::any().map(move || qa.clone())
   82 }
   83
   84 #[tokio::main]
   85 async fn main() {
   86     env_logger::init();
   87 
   88     // NOTE: have to download the model before booting up
>> 89     let qa_model: QaModel = task::spawn_blocking(move || {
   90         log::debug!("setting up qa model config");
   91         let c = qa_model_config();
   92         log::debug!("finished setting up qa model config");
   93 
   94         log::debug!("setting up qa model");
   95         let m = qa_model(c);
   96         log::debug!("finished setting up qa model");
   97         m
   98     })
   99     .await
  100     .expect("got model");
  101 
  102     let ask_handler = warp::path!("ask")
  103         .and(warp::get())
  104         .and(warp::query::<QaQuery>())
  105         .and(with_model(qa_model))
  106         .and_then(ask);
  107 
  108     warp::serve(ask_handler).run(([127, 0, 0, 1], 3030)).await;
  109 }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接