如何使用tokio异步TcpStream连接外部TCP服务器以连接Bevy游戏?

3

我想在游戏客户端和服务器之间发送事件,我已经实现了这一点,但我不知道如何在bevy中实现。

我必须使用tokios异步TcpStream,因为我需要能够使用stream.into_split()将流分成OwnedWriteHalfOwnedReadhalf

我的第一个想法是只需生成一个处理连接的线程,然后使用mpsc::channel将接收到的事件发送到队列中。

然后,我将此队列包括在一个bevy资源中,使用app.insert_resource(Queue)将其插入,并从游戏循环中提取事件。

队列:

use tokio::sync::mpsc;

pub enum Instruction {
    Push(GameEvent),
    Pull(mpsc::Sender<Option<GameEvent>>),
}

#[derive(Clone, Debug)]
pub struct Queue {
    sender: mpsc::Sender<Instruction>,
}
impl Queue {
    pub fn init() -> Self {
        let (tx, rx) = mpsc::channel(1024);
        init(rx);
        Self{sender: tx}
    }
    pub async fn send(&self, event: GameEvent) {
        self.sender.send(Instruction::Push(event)).await.unwrap();
    }
    pub async fn pull(&self) -> Option<GameEvent> {
        println!("new pull");
        let (tx, mut rx) = mpsc::channel(1);
        self.sender.send(Instruction::Pull(tx)).await.unwrap();
        rx.recv().await.unwrap()
    }
}

fn init(mut rx: mpsc::Receiver<Instruction>) {
    tokio::spawn(async move {
        let mut queue: Vec<GameEvent> = Vec::new();

        loop {
            match rx.recv().await.unwrap() {
                Instruction::Push(ev) => {
                    queue.push(ev);
                }
                Instruction::Pull(sender) => {
                    sender.send(queue.pop()).await.unwrap();
                }
            }
        }
    });
}

但由于这一切都必须是异步的,所以我在同步游戏循环中阻塞了pull()函数。 我使用futures-lite创建来实现:

fn event_pull(
    communication: Res<Communication>
) {
    let ev = future::block_on(communication.event_queue.pull());
    println!("got event: {:?}", ev);
}

这个代码可以正常工作,但是大约5秒后整个程序就会停止接收任何事件。

似乎future::block_on()会无限期地阻塞。

将构建和运行bevy::prelude::App的主函数改为异步的tokio::main函数也可能是一个问题。

最好将异步的TcpStream初始化、tokio::sync::mpsc::Sender以及Queue.pull都包装成同步函数,但我不知道如何实现。

有人能帮忙吗?

如何重现

该代码库可以在此处找到。

只需编译serverclient,然后按相同顺序运行两者即可。

1个回答

2

我只需将每个 tokio::sync::mpsc 替换为 crossbeam::channel,就使其正常工作了。但这可能会有问题,因为它会阻塞。

并且需要手动初始化 tokio 运行时。

因此,初始化代码如下:

pub struct Communicator {
    pub event_bridge: bridge::Bridge,
    pub event_queue: event_queue::Queue,
    _runtime: Runtime,
}
impl Communicator {
    pub fn init(ip: &str) -> Self {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_io()
            .build()
            .unwrap();
    
        let (bridge, queue, game_rx) = rt.block_on(async move {
            let socket = TcpStream::connect(ip).await.unwrap();
            let (read, write) = socket.into_split();
            let reader = TcpReader::new(read);
            let writer = TcpWriter::new(write);
        
            let (bridge, tcp_rx, game_rx) = bridge::Bridge::init(); 
            reader::init(bridge.clone(), reader);
            writer::init(tcp_rx, writer);
        
            let event_queue = event_queue::Queue::init();
        
            return (bridge, event_queue, game_rx);
        });
    
        // game of game_rx events to queue for game loop
        let eq_clone = queue.clone();
        rt.spawn(async move {
            loop {
                let event = game_rx.recv().unwrap(); 
                eq_clone.send(event);
            }
        });
    
        Self {
            event_bridge: bridge,
            event_queue: queue,
            _runtime: rt,
        }
    }
}

main.rs 文件看起来像这样:

fn main() {
    let communicator = communication::Communicator::init("0.0.0.0:8000");

    communicator.event_bridge.push_tcp(TcpEvent::Register{name: String::from("luca")});

    App::new()
        .insert_resource(communicator)
        .add_system(event_pull)
        .add_plugins(DefaultPlugins)
        .run();
}

fn event_pull(
    communication: Res<communication::Communicator>
) {
    let ev = communication.event_queue.pull();
    if let Some(ev) = ev {
        println!("ev");
    }
}

也许有更好的解决方案。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接