在Rust中,创建线程或函数的超时,正确的方法是什么?

26
这是我的代码:
use std::net;
use std::thread;

fn scan_port(host: &str, port: u16) -> bool {
    let host = host.to_string();
    let port = port;
    let t = thread::spawn(move || net::TcpStream::connect((host.as_str(), port)).is_ok());

    t.join().unwrap()
}

如何创建一个情况,如果连接没有在N秒内完成,线程将被终止或杀死?

所有这一切的原因是Rust没有设置套接字连接超时的方法,因此我无法确保程序不会卡住。


3
如果你的示例代码更加简洁明了,只展示手头问题而不是一堆命令行参数解析和选项调整,那么可能会更好。 - Mike Cooper
在添加了终止或杀死线程的语言中,这是一个不好的想法。Rust 不会添加它。 - Shepmaster
@Ba7a7chy:我认为@MikeCooper的意思是一个MCVE,这意味着与tcp毫不相关的东西。只需编写一个执行loop {}的线程以及您试图终止它的尝试即可。 - oli_obk
1
@ker 我有一些希望,也许有人会看到这个 Rust 的例子,并会说“哦,你不能设置超时,但你可以做 x”,所以我会留下具体的 tcp socket 示例,希望这个愿望能成真 :) - Ba7a7chy
哦,好的,那很有道理。不过,导入 argparse 包并不是必要的。 - oli_obk
net2 crate可以根据alexcrichton的说法,为TCP情况提供更精细的控制。 - oli_obk
4个回答

13

@ker的答案将始终等待5秒钟,即使连接更快地完成。这里有一种类似的方法,其中超时和网络请求都在不同的线程上进行,并且第一个完成的线程获胜:

let (sender, receiver) = mpsc::channel();
let tsender = sender.clone();
let t = thread::spawn(move || {
    match sender.send(Ok(net::TcpStream::connect((host.as_str(), port)))) {
        Ok(()) => {}, // everything good
        Err(_) => {}, // we have been released, don't panic
    }
});
let timer = thread::spawn(move || {
  thread::sleep(Duration::from_millis(5000));
  match tsender.send(Err(MyTimeoutError)) {
    Ok(()) => {}, // oops, we timed out
    Err(_) => {}, // great, the request finished already
  }
});
return receiver.recv().unwrap();

但只要你这样做了,你也可以直接使用recv_timeout

let (sender, receiver) = mpsc::channel();
let t = thread::spawn(move || {
    match sender.send(net::TcpStream::connect((host.as_str(), port))) {
        Ok(()) => {}, // everything good
        Err(_) => {}, // we have been released, don't panic
    }
});
return receiver.recv_timeout(Duration::from_millis(5000));

12

正如@Shepmaster所指出的: 终止线程是一个不好的想法。

相反,您可以给线程一个Sender,通过它通知您它是否成功打开了连接(甚至可以通过发送句柄来通知您)。然后,您可以让主线程sleep等待所需的时间。当您的线程醒来时,它会检查其对应的Receiver是否有来自该线程的生命迹象。如果线程没有回答,只需释放它并丢弃JoinHandleReceiver。这不像它正在消耗CPU时间(它被阻塞),也不会消耗太多内存。如果它解除阻塞,它将检测到Sender未连接,并可以永久关闭。

当然,你不应该打开无数这样的线程,因为它们会消耗资源(内存和系统线程句柄),但在正常情况下,这并不是太大的问题。
例如:
use std::net;
use std::thread;
use std::sync::mpsc;

fn scan_port(host: &str, port: u16) -> bool {
    let host = host.to_string();
    let port = port;
    let (sender, receiver) = mpsc::channel();
    let t = thread::spawn(move || {
        match sender.send(net::TcpStream::connect((host.as_str(), port))) {
            Ok(()) => {}, // everything good
            Err(_) => {}, // we have been released, don't panic
        }
    });

    thread::sleep(std::time::Duration::new(5, 0));

    match receiver.try_recv() {
        Ok(Ok(handle)) => true, // we have a connection
        Ok(Err(_)) => false, // connecting failed
        Err(mpsc::TryRecvError::Empty) => {
            drop(receiver);
            drop(t);
            // connecting took more than 5 seconds
            false
        },
        Err(mpsc::TryRecvError::Disconnected) => unreachable!(),
    }
}

我现在尝试了一下,似乎接收器会一直阻塞直到它收到一个答案,如果真是这样的话,那么使用发送\接收时我就处于同样的位置。 - Ba7a7chy
2
我已经更新了一些代码。你需要使用try_recv,它是非阻塞接收方法。 - oli_obk

3
这里有一个抽象函数,可以让你在设定的时间内运行任何函数。
use std::{
    collections::HashMap,
    sync::mpsc::{self, RecvTimeoutError},
    thread,
    time::Duration,
};

#[derive(Debug)]
struct TimeoutError;

fn run_with_timeout<F, T>(f: F, timeout: Duration) -> Result<T, TimeoutError>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let _ = thread::spawn(move || {
        let result = f();
        match tx.send(result) {
            Ok(()) => {} // everything good
            Err(_) => {} // we have been released, don't panic
        }
    });

    match rx.recv_timeout(timeout) {
        Ok(result) => Ok(result),
        Err(RecvTimeoutError::Timeout) => Err(TimeoutError),
        Err(RecvTimeoutError::Disconnected) => unreachable!(),
    }
}

#[allow(dead_code)]
#[derive(Debug)]
struct Foo {
    bar: String,
    bar_vec: Vec<String>,
    bar_map: HashMap<String, String>,
}

fn return_foo() -> Foo {
    Foo {
        bar: "bar".to_string(),
        bar_vec: vec!["bar".to_string()],
        bar_map: [("bar".to_string(), "bar".to_string())]
            .iter()
            .cloned()
            .collect(),
    }
}

fn main() {
    // This will timeout
    let result = run_with_timeout(
        || {
            thread::sleep(Duration::from_secs(2));
            42
        },
        Duration::from_secs(1),
    );
    println!("Result: {:?}", result);

    // This will not timeout
    let result = run_with_timeout(
        || {
            thread::sleep(Duration::from_secs(2));
            42
        },
        Duration::from_secs(3),
    );
    println!("Result: {:?}", result);

    // This will timeout (Custom type)
    let result = run_with_timeout(
        || {
            thread::sleep(Duration::from_secs(2));
            return_foo()
        },
        Duration::from_secs(1),
    );
    println!("Result: {:?}", result);

    // This will not timeout (Custom type)
    let result = run_with_timeout(
        || {
            thread::sleep(Duration::from_secs(2));
            return_foo()
        },
        Duration::from_secs(3),
    );
    println!("Result: {:?}", result);
}


请注意,超时不会停止正在运行的线程。如果您想取消作业,请使用tokio。可以在此处找到示例:https://github.com/JadKHaddad/Run-With-Timeout - Jad Haddad

2

您可以使用tokio

use std::net;
use std::time::Duration;
use tokio::time::error::Elapsed;

async fn scan_port(host: &str, port: u16) -> Result<bool, Elapsed> {
    tokio::time::timeout(Duration::from_secs(10), async {
        net::TcpStream::connect((host, port)).is_ok()
    })
    .await
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接