如何读取基于Tokio的Hyper请求的整个主体?

19

我想使用当前 Hyper 的主支(master branch)编写一个服务器,该服务器会保存由 POST 请求传递的消息,并将此消息发送给每个传入的 GET 请求。

我有以下代码,大部分是从 Hyper 的示例目录中复制来的:

extern crate futures;
extern crate hyper;
extern crate pretty_env_logger;

use futures::future::FutureResult;

use hyper::{Get, Post, StatusCode};
use hyper::header::{ContentLength};
use hyper::server::{Http, Service, Request, Response};
use futures::Stream;

struct Echo {
    data: Vec<u8>,
}

impl Echo {
    fn new() -> Self {
        Echo {
            data: "text".into(),
        }
    }
}

impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = FutureResult<Response, hyper::Error>;

    fn call(&self, req: Self::Request) -> Self::Future {
        let resp = match (req.method(), req.path()) {
            (&Get, "/") | (&Get, "/echo") => {
                Response::new()
                    .with_header(ContentLength(self.data.len() as u64))
                    .with_body(self.data.clone())
            },
            (&Post, "/") => {
                //self.data.clear(); // argh. &self is not mutable :(
                // even if it was mutable... how to put the entire body into it?
                //req.body().fold(...) ?
                let mut res = Response::new();
                if let Some(len) = req.headers().get::<ContentLength>() {
                    res.headers_mut().set(ContentLength(0));
                }
                res.with_body(req.body())
            },
            _ => {
                Response::new()
                    .with_status(StatusCode::NotFound)
            }
        };
        futures::future::ok(resp)
    }

}


fn main() {
    pretty_env_logger::init().unwrap();
    let addr = "127.0.0.1:12346".parse().unwrap();

    let server = Http::new().bind(&addr, || Ok(Echo::new())).unwrap();
    println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
    server.run().unwrap();
}

如何将看起来像是一系列数据块的req.body()(似乎是一个Stream)转换为Vec<u8>?我猜想我必须返回一个消费该Stream并将其转换为单个Vec<u8>Future,也许可以使用fold()。但我不知道该如何做。


您始终可以从修订列表中回滚任何您不同意的编辑或执行进一步的编辑。 - Shepmaster
然而,答案是一样的,你需要线程安全的内部可变性,例如 MutexAtomic*RwLock - Shepmaster
请注意,您需要每个问题只提出一个问题 - Shepmaster
1
好的,那我就为第一个问题开一个新的提问。谢谢。 - JayStrictor
1
好的。在这种情况下,我建议您创建一个[MCVE]并使用发布版本的Hyper。链接到我上面建议的问题,并展示为什么它对您的情况无效也会有很大帮助! - Shepmaster
显示剩余5条评论
3个回答

38

Hyper 0.13提供了一个body::to_bytes函数用于此目的。

use hyper::body;
use hyper::{Body, Response};

pub async fn read_response_body(res: Response<Body>) -> Result<String, hyper::Error> {
    let bytes = body::to_bytes(res.into_body()).await?;
    Ok(String::from_utf8(bytes.to_vec()).expect("response was not valid utf-8"))
}

2
响应实现了HttpBody,因此无需显式调用into_body()。而且,Bytes实现了AsRef<[u8]>,因此可以避免使用Vec进行分配。 - Denis Otkidach

15
我将简化问题,只返回总字节数而不是回显整个流。

Futures 0.3

Hyper 0.13 + TryStreamExt::try_fold

如果您只想要所有数据作为一个巨大的块,请参阅euclio的答案有关hyper :: body :: to_bytes 的详细信息。

访问流允许更精细的控制:

use futures::TryStreamExt; // 0.3.7
use hyper::{server::Server, service, Body, Method, Request, Response}; // 0.13.9
use std::convert::Infallible;
use tokio; // 0.2.22

#[tokio::main]
async fn main() {
    let addr = "127.0.0.1:12346".parse().expect("Unable to parse address");

    let server = Server::bind(&addr).serve(service::make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service::service_fn(echo))
    }));

    println!("Listening on http://{}.", server.local_addr());

    if let Err(e) = server.await {
        eprintln!("Error: {}", e);
    }
}

async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    let (parts, body) = req.into_parts();
    match (parts.method, parts.uri.path()) {
        (Method::POST, "/") => {
            let entire_body = body
                .try_fold(Vec::new(), |mut data, chunk| async move {
                    data.extend_from_slice(&chunk);
                    Ok(data)
                })
                .await;

            entire_body.map(|body| {
                let body = Body::from(format!("Read {} bytes", body.len()));
                Response::new(body)
            })
        }
        _ => {
            let body = Body::from("Can only POST to /");
            Ok(Response::new(body))
        }
    }
}

很遗憾,当前Bytes的实现与TryStreamExt::try_concat不再兼容,所以我们必须切换回折叠(fold)。

Futures 0.1

hyper 0.12 + Stream::concat2

自Futures 0.1.14版本开始,你可以使用Stream::concat2将所有数据粘合在一起:

fn concat2(self) -> Concat2<Self>
where
    Self: Sized,
    Self::Item: Extend<<Self::Item as IntoIterator>::Item> + IntoIterator + Default, 

use futures::{
    future::{self, Either},
    Future, Stream,
}; // 0.1.25

use hyper::{server::Server, service, Body, Method, Request, Response}; // 0.12.20

use tokio; // 0.1.14

fn main() {
    let addr = "127.0.0.1:12346".parse().expect("Unable to parse address");

    let server = Server::bind(&addr).serve(|| service::service_fn(echo));

    println!("Listening on http://{}.", server.local_addr());

    let server = server.map_err(|e| eprintln!("Error: {}", e));
    tokio::run(server);
}

fn echo(req: Request<Body>) -> impl Future<Item = Response<Body>, Error = hyper::Error> {
    let (parts, body) = req.into_parts();

    match (parts.method, parts.uri.path()) {
        (Method::POST, "/") => {
            let entire_body = body.concat2();
            let resp = entire_body.map(|body| {
                let body = Body::from(format!("Read {} bytes", body.len()));
                Response::new(body)
            });
            Either::A(resp)
        }
        _ => {
            let body = Body::from("Can only POST to /");
            let resp = future::ok(Response::new(body));
            Either::B(resp)
        }
    }
}

您还可以通过使用entire_body.to_vec()Bytes转换为Vec<u8>,然后将其转换为String

另请参见:

hyper 0.11 + Stream::fold

Iterator :: fold类似,Stream :: fold接受一个累加器(称为init)和一个函数,该函数对累加器和流中的项目进行操作。函数的结果必须是具有与原始错误类型相同的另一个future。总结果本身也是一个future。

fn fold<F, T, Fut>(self, init: T, f: F) -> Fold<Self, F, Fut, T>
where
    F: FnMut(T, Self::Item) -> Fut,
    Fut: IntoFuture<Item = T>,
    Self::Error: From<Fut::Error>,
    Self: Sized,

我们可以使用一个 Vec 作为累加器。 BodyStream 实现返回一个 Chunk。这个实现了 Deref<[u8]>,所以我们可以使用它来将每个块的数据附加到 Vec 中。
extern crate futures; // 0.1.23
extern crate hyper;   // 0.11.27

use futures::{Future, Stream};
use hyper::{
    server::{Http, Request, Response, Service}, Post,
};

fn main() {
    let addr = "127.0.0.1:12346".parse().unwrap();

    let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
    println!(
        "Listening on http://{} with 1 thread.",
        server.local_addr().unwrap()
    );
    server.run().unwrap();
}

struct Echo;

impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<futures::Future<Item = Response, Error = Self::Error>>;

    fn call(&self, req: Self::Request) -> Self::Future {
        match (req.method(), req.path()) {
            (&Post, "/") => {
                let f = req.body()
                    .fold(Vec::new(), |mut acc, chunk| {
                        acc.extend_from_slice(&*chunk);
                        futures::future::ok::<_, Self::Error>(acc)
                    })
                    .map(|body| Response::new().with_body(format!("Read {} bytes", body.len())));

                Box::new(f)
            }
            _ => panic!("Nope"),
        }
    }
}

您还可以将Vec<u8> body转换为String

另请参见:

输出

从命令行调用时,我们可以看到结果:

$ curl -X POST --data hello http://127.0.0.1:12346/
Read 5 bytes

警告

所有这些解决方案都允许恶意终端用户POST一个无限大小的文件,这将导致机器内存耗尽。根据预期使用情况,您可能希望在读取的字节数上建立一定的限制,可能在某个断点处写入文件系统。

另请参见:


1
你能解释一下为什么在fold方法中有futures::future:ok(),尽管你已经从type Future = ...中删除了FutureResult吗? - JayStrictor
2
@JayStrictor 因为给 fold 的闭包需要返回一个 future 本身:F: FnMut(T, Self::Item) -> Fut。这使得操作本身需要时间。由于 extend_from_slice 是同步的,我们使用 future::ok 将结果“提升”起来。这与 type Future = FutureResult 相当独立,后者用作处理程序的返回值(我因懒惰而将其装箱)。 - Shepmaster
Stream::fold(...) 可以被替换为 Stream::concat2(),它们的功能相同。由于 Chunk 本身就是 Extend,所以 concat2 的结果将是一个包含整个主体的单个 Chunk - Arnavion
1
@M.Leonhard 你可能希望在读取的字节数上建立某种限制,在某个断点处将其写入文件系统。我的建议是不要写入文件系统,而只是使用内存,但除此之外还要有一个现有的限制。 - Shepmaster
显示剩余3条评论

-1

这个主题上的大多数答案都已经过时或过于复杂了。解决方案非常简单:

/*
    WARNING for beginners!!! This use statement
    is important so we can later use .data() method!!!
*/
use hyper::body::HttpBody;

let my_vector: Vec<u8> = request.into_body().data().await.unwrap().unwrap().to_vec();
let my_string = String::from_utf8(my_vector).unwrap();

你也可以使用body::to_bytes就像@euclio所回答的那样。这两种方法都很简单!不要忘记正确处理unwrap


1
data 被记录为“返回一个将解析为下一个数据块的未来对象,如果有的话。”,因此我相信这个答案是不正确/不完整的。 - Shepmaster

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接