如何在actix-web的中间件中修改请求数据?

4
有没有推荐的方法来修改接收到的actix-web请求?我正在寻找一种方法来向请求对象添加数据,并使其可用于下游中间件和处理程序进行处理。 中间件文档说:
“Actix-web的中间件系统允许我们向请求/响应处理中添加附加行为。中间件可以钩入传入请求流程,使我们能够修改请求以及停止请求处理以提前返回响应。”
该页面没有关于如何修改请求的示例。
让我们看一下下面的代码(从上面的文档中获取),那么怎么样才能将数据添加到请求中呢?
use std::pin::Pin;
use std::task::{Context, Poll};

use actix_service::{Service, Transform};
use actix_web::{dev::ServiceRequest, dev::ServiceResponse, Error};
use futures::future::{ok, Ready};
use futures::Future;

// There are two steps in middleware processing.
// 1. Middleware initialization, middleware factory gets called with
//    next service in chain as parameter.
// 2. Middleware's call method gets called with normal request.
pub struct SayHi;

// Middleware factory is `Transform` trait from actix-service crate
// `S` - type of the next service
// `B` - type of response's body
impl<S, B> Transform<S> for SayHi
where
    S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
    S::Future: 'static,
    B: 'static,
{
    type Request = ServiceRequest;
    type Response = ServiceResponse<B>;
    type Error = Error;
    type InitError = ();
    type Transform = SayHiMiddleware<S>;
    type Future = Ready<Result<Self::Transform, Self::InitError>>;

    fn new_transform(&self, service: S) -> Self::Future {
        ok(SayHiMiddleware { service })
    }
}

pub struct SayHiMiddleware<S> {
    service: S,
}

impl<S, B> Service for SayHiMiddleware<S>
where
    S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
    S::Future: 'static,
    B: 'static,
{
    type Request = ServiceRequest;
    type Response = ServiceResponse<B>;
    type Error = Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    fn call(&mut self, req: ServiceRequest) -> Self::Future {
        println!("Hi from start. You requested: {}", req.path());

        let fut = self.service.call(req);

        Box::pin(async move {
            let res = fut.await?;

            println!("Hi from response");
            Ok(res)
        })
    }
}
2个回答

4

我也在查找如何做到这一点,但令人失望的是,他们在文档中提到了它,但没有显示任何示例。

我发现你可以按照以下方式编辑标题或扩展名,否则我不知道如何进行编辑。

头文件

  • 从服务请求中获取头文件,let headers = req.headers_mut()
  • 获取HeaderMap然后您可以从那里插入、删除、清除等操作
  • 插入示例:headers.insert(HeaderName::from_lowercase(b"content-length").unwrap(), HeaderValue::from_static("hello"));

扩展名

  • 获取请求的扩展名,let extensions = req.extensions_mut()
  • 添加扩展:extensions.insert("foo".to_string());
  • 一行代码:req.extensions_mut().insert("foo".to_string());
  • 稍后获取该扩

0

从中间件修改请求和相关响应是完全可行的。这里有一个简短的例子(这适用于Actix v4):

use x_contrib::{actix_http, actix_web, body, futures, log, HttpMessage, HttpResponseBuilder};
use std::cell::RefCell;

use log::Level;
use std::pin::Pin;

use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
use actix_web::Error;
use futures::future::{ok, Ready};
use futures::task::{Context, Poll};
use std::future::Future;
use std::rc::Rc;
use std::str;

use crate::middleware::utility::{ApiMiddlewareUtility, MiddlewareUtility};
use crate::request;
use x_common::prelude::*;
use x_common::settings;
use x_contrib::actix_http::h1::Payload;
use x_contrib::body::{EitherBody, MessageBody};
use x_contrib::futures::future::err;
use x_contrib::futures::StreamExt;
use x_contrib::web::{Buf, BytesMut};

pub struct LogEvents;

impl<S: 'static, B> Transform<S, ServiceRequest> for LogEvents
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
    S::Future: 'static,
    B: MessageBody + 'static,
{
    type Response = ServiceResponse<EitherBody<B>>;
    type Error = Error;
    type Transform = LogEventsMiddleware<S>;
    type InitError = ();
    type Future = Ready<Result<Self::Transform, Self::InitError>>;

    fn new_transform(&self, service: S) -> Self::Future {
        ok(LogEventsMiddleware {
            service: Rc::new(RefCell::new(service)),
        })
    }
}

pub struct LogEventsMiddleware<S> {
    service: Rc<RefCell<S>>,
}

impl<S: 'static, B> Service<ServiceRequest> for LogEventsMiddleware<S>
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
    S::Future: 'static,
    B: MessageBody + 'static,
{
    type Response = ServiceResponse<EitherBody<B>>;
    type Error = Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;

    actix_web::dev::forward_ready!(service);

    fn call(&self, mut req: ServiceRequest) -> Self::Future {
        let svc = self.service.clone();
        let log_level = settings::get_setting("log_level").unwrap_or("info".to_owned());

        Box::pin(async move {
            match log_level.as_str() {
                "debug" | "trace" | "info" => {
                    let route = req.path().to_owned();

                    /* we only process requests that are json */
                    if !MiddlewareUtility::is_json_request(&req) {
                        let res: ServiceResponse = svc.call(req).await?.map_into_boxed_body();
                        return Ok(res.map_into_right_body());
                    }

                    /* extract and log the request */
                    let mut request_body = BytesMut::new();
                    while let Some(chunk) = req.take_payload().next().await {
                        request_body.extend_from_slice(&chunk?);
                    }

                    match str::from_utf8(&request_body.to_vec().as_slice()) {
                        Ok(str) => {
                            /* identify routes that we will redact the body from,
                            these are items that contain sensitive information we do not want to log
                             */
                            match route.as_str() {
                                "/x/protected_endpoint" => {
                                    tracing::info!({ body = "Redacted" }, "HTTP Request");
                                }
                                _ => {
                                    tracing::info!({body = %str}, "HTTP Request");
                                }
                            }
                        }
                        Err(_) => {}
                    };

                    let (payload_sender, mut orig_payload) = Payload::create(true);
                    orig_payload.unread_data(request_body.freeze());
                    req.set_payload(actix_http::Payload::from(orig_payload));

                    /* extract and log the response */
                    let res: ServiceResponse = svc.call(req).await?.map_into_boxed_body();
                    if !MiddlewareUtility::is_json_response(&res) {
                        return Ok(res.map_into_right_body());
                    }

                    let res_status = res.status().clone();
                    let res_headers = res.headers().clone();
                    let new_request = res.request().clone();
                    let body_bytes = body::to_bytes(res.into_body()).await?;
                    match str::from_utf8(&body_bytes) {
                        Ok(str) => {
                            tracing::info!({body = %str}, "HTTP Response");
                            str
                        }
                        Err(_) => "Unknown",
                    };

                    /* build an identical response */
                    let mut new_response = HttpResponseBuilder::new(res_status);
                    for (header_name, header_value) in res_headers {
                        new_response.insert_header((header_name.as_str(), header_value));
                    }
                    let new_response = new_response.body(body_bytes.to_vec());

                    Ok(ServiceResponse::new(
                        new_request,
                        new_response.map_into_right_body(),
                    ))
                }
                _ => {
                    let res: ServiceResponse = svc.call(req).await?.map_into_boxed_body();
                    Ok(res.map_into_right_body())
                }
            }
        })
    }
}


这个例子与tracing_actix_web集成,它是一个非常好的遥测工具,并且如果请求/响应是json,则将请求/响应记录到jaeger中。
需要注意的一点是,据我所知,一旦读取了请求,就必须重新组装它,响应也是如此。因此,这个例子所做的就是这个。

从哪个板条箱中获取 x_commonx_contrib - Vabka
@Vabka,x_common和x_contrib(我使用的一个中央创建来重新导出依赖项)是我正在使用的内部创建。该示例在另一个系统上不是100%可运行的,需要对配置进行一些修改。但是,所有必要的元素都可以从futures、actix_web等创建中提取。需要进行一些修改才能使其可运行。 - Chris Andrew
好的。刚刚找到了另一种方法。let single_part: Result = Ok(orig_payload); let in_memory_stream = stream::once(single_part); let pinned_stream: Pin>>> = Box::pin(in_memory_stream); let in_memory_payload: Payload = pinned_stream.into(); req.set_payload(in_memory_payload); Ok(svc.call(req).await?) - Vabka
@Vabka,你在v4上测试过吗? - Chris Andrew
@Vabka,啊,是的,这样更好:https://github.com/actix/examples/blob/master/middleware/middleware/src/read_request_body.rs - Chris Andrew
是的,我实际上在我的项目中使用它:https://github.com/vabka/dis/blob/main/src/discord_authorization.rs#L84 - Vabka

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接