将两个UDP套接字绑定到同一地址,并将它们连接到不同的地址。

3

我需要创建一个由多个进程组成的分布式系统;为了简化问题,我们假设它们有三个:A、B和C。每个进程都知道其他进程的IP地址和端口,以接收/发送UDP数据包。它们都使用相同的UDP端口(假设是端口8080)。

假设A的IP地址是192.168.1.1,B的IP地址是192.168.1.2,C的IP地址是192.168.1.3:

  • A有两个UDP套接字,都绑定到0.0.0.0:8080。第一个连接到192.168.1.2:8080,第二个连接到192.168.1.3:8080。它使用第一个套接字从/向B接收和发送数据报,并使用第二个套接字从/向C接收和发送数据报。
  • B有两个UDP套接字,都绑定到0.0.0.0:8080。第一个连接到192.168.1.1:8080,第二个连接到192.168.1.3:8080。它使用第一个套接字从/向A接收和发送数据报,并使用第二个套接字从/向C接收和发送数据报。
  • C有两个UDP套接字,都绑定到0.0.0.0:8080。第一个连接到192.168.1.1:8080,第二个连接到192.168.1.2:8080。它使用第一个套接字从/向A接收和发送数据报,并使用第二个套接字从/向B接收和发送数据报。

这些进程相互通信并拒绝来自其他进程的数据报

我想要实现的通信可以用以下元组描述。所有行都不同,因此它们应该描述一个有效的设置。

源地址 源端口 目标地址 目标端口 协议
192.168.1.1 8080 192.168.1.2 8080 UDP
192.168.1.1 8080 192.168.1.3 8080 UDP
192.168.1.2 8080 192.168.1.1 8080 UDP
192.168.1.2 8080 192.168.1.3 8080 UDP
192.168.1.3 8080 192.168.1.1 8080 UDP
192.168.1.3 8080 192.168.1.2 8080 UDP

在 Rust + Tokio 中,我应该怎样实现?

在 Rust 中,连接套接字 意味着设定默认的 send() 目标,并通过 recv 限制从指定地址读取数据包。这个概念不仅适用于 Rust:POSIX 套接字(如 UDP 套接字)也是同样的行为

以下是我的想法:

// A's startup code

let sockforb = UdpSocket::bind("0.0.0.0:8080").await?;
let sockforc = UdpSocket::bind("0.0.0.0:8080").await?;

let b_addr = "192.168.1.2:8080".parse::<SocketAddr>().unwrap();
let c_addr = "192.168.1.3:8080".parse::<SocketAddr>().unwrap();

sockforb.connect(b_addr).await?;
sockforc.connect(c_addr).await?;

上面的代码非常方便:我有两个不同的套接字变量,如果我在它们上调用send/recv,我就可以发送/接收数据报到/从所需的进程。
然而,该代码会产生以下错误:
Error: Os { code: 98, kind: AddrInUse, message: "Address already in use" }

作为一种解决方法,我可以只定义一个套接字变量,并将多个地址传递给connect方法(参数的类型为ToSocketAddrs)。这使我只能向指定进程发送/接收数据报。然而,尽管这个解决方案没有错误,但它不太方便,因为我只有一个单独的套接字变量,而不是针对不同进程的多个变量。我的意图是为每个进程在不同的结构体中放置不同的套接字变量。

在Rust + Tokio中,我如何实现这一点,可能使用可移植(非操作系统相关)的代码?


我认为你只需要一层抽象。创建一个类似于 struct SocketEndpoint 的东西,其中包含 updsocket 和地址字段。然后你可以拥有两个 SocketEndpoint,就像你想要的那样,但它们都可以绑定到同一个底层连接上。 - etchesketch
1个回答

1

在类Unix操作系统(如MacOS和Linux)上,可以设置SO_REUSEADDRESSSO_REUSEPORT。我不确定Windows是否支持,但似乎只需要SO_REUSEADDRESS。接下来,我将使用net2库在UdpBuilderUnixUdpBuilderExt中提供的安全抽象来实现:

use std::io;

use net2::{unix::UnixUdpBuilderExt, UdpBuilder};
use tokio::net::UdpSocket;

async fn build_socket(me: u16, to: u16) -> io::Result<std::net::UdpSocket> {
    tokio::task::spawn_blocking(move || -> io::Result<_> {
        let socket = UdpBuilder::new_v4()?
            .reuse_address(true)?
            .reuse_port(true)?
            .bind(("0.0.0.0", me))?;
        socket.set_nonblocking(true)?;
        socket.connect(("127.0.0.1", to))?;
        Ok(socket)
    })
    .await
    .unwrap()
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let handles = (0..3)
        .map(|i| {
            tokio::task::spawn(async move {
                let me = 8080 + i;
                let to1 = 8080 + (i + 1) % 3;
                let to2 = 8080 + (i + 2) % 3;
                let a = build_socket(me, to1).await.unwrap();
                let a = UdpSocket::from_std(a).unwrap();

                let b = build_socket(me, to2).await.unwrap();
                let b = UdpSocket::from_std(b).unwrap();

                a.send(format!("{me} -> {to1}").as_bytes()).await.unwrap();
                b.send(format!("{me} -> {to2}").as_bytes()).await.unwrap();
                let mut buf = [0; 1024];

                if let Ok(n) = a.recv(&mut buf).await {
                    println!("{me} received: {:?}", String::from_utf8_lossy(&buf[..n]));
                }
                if let Ok(n) = b.recv(&mut buf).await {
                    println!("{me} received: {:?}", String::from_utf8_lossy(&buf[..n]));
                }
            })
        })
        .collect::<Vec<_>>();

    for handle in handles {
        handle.await.unwrap();
    }
    println!("finished all sends, returning");
    std::process::exit(0);
}

注意:上面的代码使用同一台机器上的不同端口来表示不同的OP IP,以便更轻松地进行测试。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接