无法在`Arc`中借用数据作为可变数据

16

我不知道接下来该做什么。看起来我可能误解了一些东西,或者可能还没有学习一些关键的主题。

use std::sync::Arc;

use reqwest::{Error, Response}; // 0.11.4
use tokio::sync::mpsc::{self, Receiver, Sender}; // 1.9.0

pub struct Task {
    pub id: u32,
    pub url: String,
}
pub enum Message {
    Failure(Task, Error),
    Success(Task, Response),
}

struct State {
    client: reqwest::Client,
    res_tx: Sender<Message>,
    res_rx: Receiver<Message>,
}

pub struct Proxy {
    state: Arc<State>,
    max_rps: u16,
    max_pending: u16,
    id: u32,
    parent_tx: Sender<String>,
}

async fn send_msg<T>(tx: &Sender<T>, msg: T) {
    match tx.send(msg).await {
        Err(error) => {
            eprintln!("{}", error)
        }
        _ => (),
    };
}

impl Proxy {
    // Starts loop for input channel
    async fn start_chin(&mut self) -> Sender<Task> {
        let (chin_tx, mut chin_rx) = mpsc::channel::<Task>(self.max_pending as usize + 1 as usize);
        let state_outer = self.state.clone();

        tokio::spawn(async move {
            loop {
                match chin_rx.recv().await {
                    Some(task) => {
                        let res_tx = state_outer.res_tx.clone();
                        let state = state_outer.clone();
                        tokio::spawn(async move {
                            match state.client.get(&task.url).send().await {
                                Ok(res) => send_msg(&res_tx, Message::Success(task, res)).await,
                                Err(err) => send_msg(&res_tx, Message::Failure(task, err)).await,
                            }
                        });
                    }
                    None => (),
                }
            }
        });
        chin_tx
    }

    async fn start_chres(&self) {
        let state = self.state.clone();

        tokio::spawn(async move {
            loop {
                match state.res_rx.recv().await { // LINE PRODUCES ERROR
                    Some(task) => {}
                    None => (),
                }
            }
        });
    }
}

impl Proxy {
    pub fn new(
        id: u32,
        parent_tx: Sender<String>,
        proxy_addr: &str,
        max_rps: u16,
        max_pending: u16,
    ) -> Result<Self, Error> {
        let client = reqwest::Client::builder();
        if proxy_addr != "none" {
            client = client.proxy(reqwest::Proxy::all(proxy_addr)?)
        }
        let (res_tx, res_rx) = mpsc::channel::<Message>(max_pending as usize + 1 as usize); // TODO: check size

        Ok(Proxy {
            id,
            state: Arc::new(State {
                client: client.build()?,
                res_tx,
                res_rx,
            }),
            max_rps,
            max_pending,
            parent_tx,
        })
    }
}

error[E0596]: cannot borrow data in an `Arc` as mutable
  --> src/lib.rs:69:23
   |
69 |                 match state.res_rx.recv().await {
   |                       ^^^^^^^^^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<State>`

2
你的问题可能已经在如何使用Arc在线程之间共享可变对象?的答案中得到了解答。如果没有,那么请**[编辑]**你的问题以解释其中的差异。否则,我们可以将此问题标记为已回答。 - Shepmaster
1
Arc的文档中,强调我的部分:在Rust中,共享引用默认情况下不允许修改,Arc也不例外:通常无法获得指向Arc内部某个内容的可变引用。如果需要通过Arc进行修改,请使用MutexRwLock或其中一个Atomic类型 - Shepmaster
请参见 tokio::sync::Mutex - Shepmaster
1
这个回答解决了你的问题吗?如何使用Arc在线程之间共享可变对象? - Jmb
1
一个单一的消费者通道只能有一个消费者,因此 res_rx 应该由任务拥有,而不是在 Arc 后面共享。 - Jan Hudec
抱歉误导了,昨天是个艰难的一天。我的意思是:我应该如何以最优化的方式避免这种情况。每个链接都帮助我理解了整个情况,最终我想出了一个很好的解决方案。 - Lex
2个回答

10
use std::sync::Arc;

struct Something {
    size: usize
}

impl Something {
    fn increase(&mut self) {
        self.size = self.size + 1;
    }
}

fn main() {
    let something = Something{size: 1};
    let arc = Arc::new(something);
    arc.increase();
}

提供

error[E0596]: cannot borrow data in an `Arc` as mutable
  --> src/main.rs:16:5
   |
16 |     arc.increase();
   |     ^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<Something>`

error: aborting due to previous error; 1 warning emitted

因为它试图将arc作为可变借用,但这需要为Arc实现DerefMut,但由于Arc不可变,因此没有实现该特性。
使用 Mutex 包装您的对象可以解决该问题。
use std::sync::{Arc, Mutex};

struct Something {
    size: usize
}

impl Something {
    fn increase(&mut self) {
        self.size = self.size + 1;
    }
}

fn main() {
    let something = Something{size: 1};
    let arc = Arc::new(Mutex::new(something));
    arc.lock().unwrap().increase();
}

现在它可以共享并增加。

问题要求如何做到这一点,但在本应该不应该尝试的情况下。 - Jan Hudec
2
最好不要在异步应用程序中使用 std::sync::Mutex,因为它会阻塞整个系统线程,导致程序死锁。如果您使用 tokio,则最好使用 tokio::sync::Mutex - 它不会阻塞系统线程。 - Lex
2
根据此链接,Actix团队建议使用标准库的mutex。 - fuzzylogical
使用tokio::sync::Mutex进行锁定比使用std::sync::Mutex更加耗费资源。如果你不需要在.await之间保持锁定状态,那么应该使用std中的锁,甚至更好的选择是parking_lot::Mutex - Malted_Wheaties

3

Lucas Zanella的回答和Shepmaster的评论帮助我重构和简化代码。我决定在Proxy::new()函数内传递所有权,而不是使用共享引用。代码变得更易读,并且我避免了对可变的tokio::sync::mpsc::Receiver使用共享引用。也许问题太不结构化了,但由于社区的帮助,我采取了新的方法。重构后的代码如下。

use reqwest::{Client, Error, Response};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Sender, Receiver};


pub struct Task {
    pub id: u32,
    pub url:  String,
}
pub enum Message{
    Failure(Task, Error),
    Success(Task, Response),
}
pub struct Proxy{
    id: u32,
    max_rps: u16,
    max_pending: u16,
    in_tx: Sender<Task>,
}


async fn send_msg<T>(tx: &Sender<T>, msg: T){
    match tx.send(msg).await {
        Err(error) => { eprintln!("{}", error) },
        _ => (),
    };
}


async fn start_loop_in(client: Client, mut in_rx: Receiver<Task>, res_tx: Sender<Message>){
    loop {
        if let Some(task) = in_rx.recv().await {
            let client_clone = client.clone();
            let res_tx_clone = res_tx.clone();
            tokio::spawn(async move {
                println!("SENDING: {}", &task.url); // TODO: DELETE DEBUG
                match client_clone.get(&task.url).send().await {
                    Ok(res) => send_msg(&res_tx_clone, Message::Success(task, res)).await,
                    Err(err) => send_msg(&res_tx_clone, Message::Failure(task, err)).await,
                }
            });
        }
    }
}


async fn start_loop_res(mut res_rx: Receiver<Message>, out_tx: Sender<String>){
    loop {
        if let Some(message) = res_rx.recv().await {
            match message {
                Message::Success(task, res) => { 
                    send_msg(
                        &out_tx, 
                        format!("{:#?}", res.text().await.unwrap()) // TODO: change in release!
                    ).await;
                },
                Message::Failure(task, err) => {
                    send_msg(&out_tx, err.to_string()).await;
                },
            }
        }
    }
}


impl Proxy{

    pub fn new(id: u32, parent_tx: Sender<String>, proxy_addr: &str, max_rps: u16, max_pending: u16) -> Result<Self, Error> {
        
        let mut client = Client::builder();
        if proxy_addr != "none" { client = client.proxy(reqwest::Proxy::all(proxy_addr)?) }
        let (res_tx, res_rx) = mpsc::channel::<Message>(max_pending as usize + 1 as usize); // TODO: check size

        let client = client.build()?;
        let (in_tx, in_rx) = mpsc::channel::<Task>(max_pending as usize + 1 as usize);
        let res_tx_clone = res_tx.clone();
        tokio::spawn(async move { start_loop_in(client, in_rx, res_tx_clone).await });

        tokio::spawn(async move { start_loop_res(res_rx, parent_tx).await });
        
        Ok(Proxy{
            id,
            max_rps,
            max_pending,
            in_tx,
        })
    }

    pub fn get_in_tx(&self) -> Sender<Task> {
        self.in_tx.clone()
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接