The future created by an async block is not `Send` because of `*mut u8`

7

I was able to continue implementing my asynchronous UDP server. However, the following error shows up twice, because my variable data has type *mut u8, which is not Send:

error: future cannot be sent between threads safely
 help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `*mut u8`
note: captured value is not `Send`
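
To see where the error comes from in isolation: `task::spawn` requires its future to be `Send`, and raw pointers never are. The following is only a minimal sketch; `is_send` is a hypothetical helper written for illustration, not part of tokio or the standard library.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical helper: a call only compiles when `T` is `Send`.
fn is_send<T: Send>(_value: &T) -> bool {
    true
}

fn main() {
    let owned: Vec<u8> = vec![0; 16];
    let shared = Arc::new(Mutex::new(vec![0u8; 16]));
    let raw: *mut u8 = std::ptr::null_mut();

    println!("Vec<u8> is Send: {}", is_send(&owned));
    println!("Arc<Mutex<Vec<u8>>> is Send: {}", is_send(&shared));

    // Uncommenting the next line fails to compile, because `*mut u8`
    // does not implement `Send`:
    // is_send(&raw);
    let _ = raw;
}
```

Any value captured by the async block and held across an `.await` has to pass the same check, which is why a single `*mut u8` makes the whole future non-`Send`.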

Here is the code (MRE):

use std::error::Error;
use std::time::Duration;
use std::env;
use tokio::net::UdpSocket;
use tokio::{sync::mpsc, task, time}; // 1.4.0
use std::alloc::{alloc, Layout};
use std::mem;
use std::mem::MaybeUninit;
use std::net::SocketAddr;

const UDP_HEADER: usize = 8;
const IP_HEADER: usize = 20;
const AG_HEADER: usize = 4;
const MAX_DATA_LENGTH: usize = (64 * 1024 - 1) - UDP_HEADER - IP_HEADER;
const MAX_CHUNK_SIZE: usize = MAX_DATA_LENGTH - AG_HEADER;
const MAX_DATAGRAM_SIZE: usize = 0x10000;

/// A wrapper for [ptr::copy_nonoverlapping] with different argument order (same as original memcpy)
unsafe fn memcpy(dst_ptr: *mut u8, src_ptr: *const u8, len: usize) {
    std::ptr::copy_nonoverlapping(src_ptr, dst_ptr, len);
}

// Different from https://doc.rust-lang.org/std/primitive.u32.html#method.next_power_of_two
// Returns the [exponent] from the smallest power of two greater than or equal to n.
const fn next_power_of_two_exponent(n: u32) -> u32 {
    return 32 - (n - 1).leading_zeros();
}

async fn run_server(socket: UdpSocket) {
    let mut missing_indexes: Vec<u16> = Vec::new();
    let mut peer_addr = MaybeUninit::<SocketAddr>::uninit();
    let mut data = std::ptr::null_mut(); // ptr for the file bytes
    let mut len: usize = 0; // total len of bytes that will be written
    let mut layout = MaybeUninit::<Layout>::uninit();
    let mut buf = [0u8; MAX_DATA_LENGTH];
    let mut start = false;
    let (debounce_tx, mut debounce_rx) = mpsc::channel::<(usize, SocketAddr)>(3300);
    let (network_tx, mut network_rx) = mpsc::channel::<(usize, SocketAddr)>(3300);

    loop {
        // Listen for events
        let debouncer = task::spawn(async move {
            let duration = Duration::from_millis(3300);

            loop {
                match time::timeout(duration, debounce_rx.recv()).await {
                    Ok(Some((size, peer))) => {
                        eprintln!("Network activity");
                    }
                    Ok(None) => {
                        if start == true {
                            eprintln!("Debounce finished");
                            break;
                        }
                    }
                    Err(_) => {
                        eprintln!("{:?} since network activity", duration);
                    }
                }
            }
        });
        // Listen for network activity
        let server = task::spawn({
            // async{
            let debounce_tx = debounce_tx.clone();
            async move {
                while let Some((size, peer)) = network_rx.recv().await {
                    // Received a new packet
                    debounce_tx.send((size, peer)).await.expect("Unable to talk to debounce");
                    eprintln!("Received a packet {} from: {}", size, peer);

                    let packet_index: u16 = (buf[0] as u16) << 8 | buf[1] as u16;

                    if start == false { // first bytes of a new file: initialization // TODO: ADD A MUTEX to prevent many initializations
                        start = true;
                        let chunks_cnt: u32 = (buf[2] as u32) << 8 | buf[3] as u32;
                        let n: usize = MAX_DATAGRAM_SIZE << next_power_of_two_exponent(chunks_cnt);
                        unsafe {
                            layout.as_mut_ptr().write(Layout::from_size_align_unchecked(n, mem::align_of::<u8>()));
                            
                            
                            // /!\  data has type `*mut u8` which is not `Send`
                            data = alloc(layout.assume_init());
                            
                            peer_addr.as_mut_ptr().write(peer);
                        }
                        let a: Vec<u16> = vec![0; chunks_cnt as usize]; //(0..chunks_cnt).map(|x| x as u16).collect(); // create a sorted vector with all the required indexes
                        missing_indexes = a;
                    }
                    missing_indexes[packet_index as usize] = 1;
                    unsafe {
                        let dst_ptr = data.offset((packet_index as usize * MAX_CHUNK_SIZE) as isize);
                        memcpy(dst_ptr, &buf[AG_HEADER], size - AG_HEADER);
                    };
                    println!("receiving packet {} from: {}", packet_index, peer);
                }
            }
        });

        // Prevent deadlocks
        drop(debounce_tx);

        match socket.recv_from(&mut buf).await {
            Ok((size, src)) => {
                network_tx.send((size, src)).await.expect("Unable to talk to network");
            }
            Err(e) => {
                eprintln!("couldn't receive a datagram: {}", e);
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());
    let socket = UdpSocket::bind(&addr).await?;
    println!("Listening on: {}", socket.local_addr()?);
    run_server(socket).await; // without `.await` the future is never polled
    Ok(())
}

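Since I am converting synchronous code to asynchronous code, several threads could potentially write to data, which is probably why I get this error. But I don't know which syntax I could use to "clone" the mut ptr and make it unique for each thread (and the same for the buffer).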

As suggested by user4815162342, I think the best approach would be:

make the pointer Send by wrapping it in a struct and declaring unsafe impl Send for NewStruct {}.

Any help is greatly appreciated!

PS: The full code can be found in my GitHub repository.


1
Why use as_mut_ptr()? You can't Send that. You need another approach, preferably one where you can move it or clone/copy it. - tadman
1
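I'm not sure you can win this fight without changing your approach. What you have looks more like C, or maybe C++, style, and it ignores what Rust is supposed to provide for you. - tadman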
1
OK, I will try that and edit the post accordingly; it was on my TODO list anyway. - Antonin GAVREL
1
Not necessarily, it depends on the runtime you use. Both single-threaded and multi-threaded versions exist. - tadman
1
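You can make a pointer Send by wrapping it in a struct and declaring unsafe impl Send for NewStruct {}. After that, it is up to you to uphold the invariants yourself, such as readers ^ writers. - user4815162342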
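
For the "move it or clone it" route mentioned in the comments, one option that needs no unsafe code is to keep the file bytes in a `Vec<u8>` behind `Arc<Mutex<...>>` and give every writer its own clone of the `Arc`. This is only a sketch under assumptions: `CHUNK_SIZE` and `write_chunk` are hypothetical names, and plain threads stand in for tokio tasks so the example stays self-contained.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

const CHUNK_SIZE: usize = 4; // hypothetical, far smaller than MAX_CHUNK_SIZE

/// Copies `payload` into the slot of chunk `index`.
/// Returns false instead of writing out of bounds.
fn write_chunk(file: &Mutex<Vec<u8>>, index: usize, payload: &[u8]) -> bool {
    let mut bytes = file.lock().unwrap();
    let start = index * CHUNK_SIZE;
    match bytes.get_mut(start..start + payload.len()) {
        Some(slot) => {
            slot.copy_from_slice(payload);
            true
        }
        None => false,
    }
}

fn main() {
    // One shared buffer; the mutex enforces "one writer at a time".
    let file = Arc::new(Mutex::new(vec![0u8; 3 * CHUNK_SIZE]));

    let handles: Vec<_> = (0..3u8)
        .map(|i| {
            let file = Arc::clone(&file); // each thread gets its own handle
            thread::spawn(move || write_chunk(&file, i as usize, &[i + 1; CHUNK_SIZE]))
        })
        .collect();

    for handle in handles {
        assert!(handle.join().unwrap());
    }
    println!("{:?}", file.lock().unwrap());
}
```

The lock makes the readers ^ writers invariant something the compiler and runtime enforce, at the cost of serializing the writes.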
1 Answer

1

Short version

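Thanks to user4815162342's comment, I decided to wrap the mut ptr in a struct that implements Send and Sync, which allowed me to solve this part of the problem (there are still other issues, but they are beyond the scope of this question):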

pub struct FileBuffer {
    data: *mut u8,
}

unsafe impl Send for FileBuffer {}
unsafe impl Sync for FileBuffer {}

//let mut data = std::ptr::null_mut(); // ptr for the file bytes
let mut fileBuffer: FileBuffer = FileBuffer { data: std::ptr::null_mut() };
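
If you keep the raw pointer, it may help to make the wrapper own its allocation so the `Send` promise is easier to justify. The sketch below is a hypothetical variant, not the code from this answer: `OwnedFileBuffer`, `write_at` and `as_slice` are names made up for illustration, and a plain thread stands in for a tokio task.

```rust
use std::alloc::{alloc_zeroed, dealloc, handle_alloc_error, Layout};
use std::thread;

/// Hypothetical owning variant of `FileBuffer`: it remembers its layout
/// so it can bounds-check writes and free the memory on drop.
pub struct OwnedFileBuffer {
    data: *mut u8,
    layout: Layout,
}

// SAFETY (assumption to uphold): the pointer is uniquely owned by this
// struct and never copied out, so moving the struct moves the only handle.
unsafe impl Send for OwnedFileBuffer {}

impl OwnedFileBuffer {
    pub fn new(len: usize) -> Self {
        assert!(len > 0, "`alloc` does not allow zero-sized allocations");
        let layout = Layout::array::<u8>(len).expect("buffer too large");
        // SAFETY: `layout` has a non-zero size.
        let data = unsafe { alloc_zeroed(layout) };
        if data.is_null() {
            handle_alloc_error(layout);
        }
        OwnedFileBuffer { data, layout }
    }

    /// Copies `src` to `offset`; returns false if it would not fit.
    pub fn write_at(&mut self, offset: usize, src: &[u8]) -> bool {
        match offset.checked_add(src.len()) {
            Some(end) if end <= self.layout.size() => {
                // SAFETY: the range was bounds-checked, and `src` cannot
                // overlap memory that only this struct owns.
                unsafe {
                    std::ptr::copy_nonoverlapping(src.as_ptr(), self.data.add(offset), src.len())
                };
                true
            }
            _ => false,
        }
    }

    pub fn as_slice(&self) -> &[u8] {
        // SAFETY: `data` points to `layout.size()` zero-initialized bytes.
        unsafe { std::slice::from_raw_parts(self.data, self.layout.size()) }
    }
}

impl Drop for OwnedFileBuffer {
    fn drop(&mut self) {
        // SAFETY: `data` was allocated with exactly this layout.
        unsafe { dealloc(self.data, self.layout) };
    }
}

fn main() {
    let mut buffer = OwnedFileBuffer::new(8);
    // Moving the buffer into the thread compiles only because of the
    // `unsafe impl Send` above.
    let handle = thread::spawn(move || {
        buffer.write_at(2, &[1, 2, 3]);
        buffer
    });
    let buffer = handle.join().unwrap();
    println!("{:?}", buffer.as_slice());
}
```

This sketch deliberately leaves out `Sync`: moving the buffer between threads only needs `Send`, while `Sync` would additionally promise that shared references can be used from several threads at once.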


Long version

use std::error::Error;
use std::time::Duration;
use std::env;
use tokio::net::UdpSocket;
use tokio::{sync::mpsc, task, time}; // 1.4.0
use std::alloc::{alloc, Layout};
use std::mem;
use std::mem::MaybeUninit;
use std::net::SocketAddr;

const UDP_HEADER: usize = 8;
const IP_HEADER: usize = 20;
const AG_HEADER: usize = 4;
const MAX_DATA_LENGTH: usize = (64 * 1024 - 1) - UDP_HEADER - IP_HEADER;
const MAX_CHUNK_SIZE: usize = MAX_DATA_LENGTH - AG_HEADER;
const MAX_DATAGRAM_SIZE: usize = 0x10000;

/// A wrapper for [ptr::copy_nonoverlapping] with different argument order (same as original memcpy)
unsafe fn memcpy(dst_ptr: *mut u8, src_ptr: *const u8, len: usize) {
    std::ptr::copy_nonoverlapping(src_ptr, dst_ptr, len);
}

// Different from https://doc.rust-lang.org/std/primitive.u32.html#method.next_power_of_two
// Returns the [exponent] from the smallest power of two greater than or equal to n.
const fn next_power_of_two_exponent(n: u32) -> u32 {
    return 32 - (n - 1).leading_zeros();
}

pub struct FileBuffer {
    data: *mut u8,
}

unsafe impl Send for FileBuffer {}
unsafe impl Sync for FileBuffer {}

async fn run_server(socket: UdpSocket) {
    let mut missing_indexes: Vec<u16> = Vec::new();
    let mut peer_addr = MaybeUninit::<SocketAddr>::uninit();
    //let mut data = std::ptr::null_mut(); // ptr for the file bytes
    let mut fileBuffer: FileBuffer = FileBuffer { data:  std::ptr::null_mut() };
    let mut len: usize = 0; // total len of bytes that will be written
    let mut layout = MaybeUninit::<Layout>::uninit();
    let mut buf = [0u8; MAX_DATA_LENGTH];
    let mut start = false;
    let (debounce_tx, mut debounce_rx) = mpsc::channel::<(usize, SocketAddr)>(3300);
    let (network_tx, mut network_rx) = mpsc::channel::<(usize, SocketAddr)>(3300);

    loop {
        // Listen for events
        let debouncer = task::spawn(async move {
            let duration = Duration::from_millis(3300);

            loop {
                match time::timeout(duration, debounce_rx.recv()).await {
                    Ok(Some((size, peer))) => {
                        eprintln!("Network activity");
                    }
                    Ok(None) => {
                        if start == true {
                            eprintln!("Debounce finished");
                            break;
                        }
                    }
                    Err(_) => {
                        eprintln!("{:?} since network activity", duration);
                    }
                }
            }
        });
        // Listen for network activity
        let server = task::spawn({
            // async{
            let debounce_tx = debounce_tx.clone();
            async move {
                while let Some((size, peer)) = network_rx.recv().await {
                    // Received a new packet
                    debounce_tx.send((size, peer)).await.expect("Unable to talk to debounce");
                    eprintln!("Received a packet {} from: {}", size, peer);

                    let packet_index: u16 = (buf[0] as u16) << 8 | buf[1] as u16;

                    if start == false { // first bytes of a new file: initialization // TODO: ADD A MUTEX to prevent many initializations
                        start = true;
                        let chunks_cnt: u32 = (buf[2] as u32) << 8 | buf[3] as u32;
                        let n: usize = MAX_DATAGRAM_SIZE << next_power_of_two_exponent(chunks_cnt);
                        unsafe {
                            layout.as_mut_ptr().write(Layout::from_size_align_unchecked(n, mem::align_of::<u8>()));

                            // the raw `*mut u8` now lives inside `FileBuffer`, which is declared `Send`
                            fileBuffer.data = alloc(layout.assume_init());

                            peer_addr.as_mut_ptr().write(peer);
                        }
                        let a: Vec<u16> = vec![0; chunks_cnt as usize]; //(0..chunks_cnt).map(|x| x as u16).collect(); // create a sorted vector with all the required indexes
                        missing_indexes = a;
                    }
                    missing_indexes[packet_index as usize] = 1;
                    unsafe {
                        let dst_ptr = fileBuffer.data.offset((packet_index as usize * MAX_CHUNK_SIZE) as isize);
                        memcpy(dst_ptr, &buf[AG_HEADER], size - AG_HEADER);
                    };
                    println!("receiving packet {} from: {}", packet_index, peer);
                }
            }
        });

        // Prevent deadlocks
        drop(debounce_tx);

        match socket.recv_from(&mut buf).await {
            Ok((size, src)) => {
                network_tx.send((size, src)).await.expect("Unable to talk to network");
            }
            Err(e) => {
                eprintln!("couldn't receive a datagram: {}", e);
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());
    let socket = UdpSocket::bind(&addr).await?;
    println!("Listening on: {}", socket.local_addr()?);
    run_server(socket).await; // without `.await` the future is never polled
    Ok(())
}

7
"unsafe impl Send" is pretty much Rust's equivalent of "daredevil mode", but if it works, it works? - tadman
