Rusoto使用sigv4进行流式上传

8

我在以流式方式向S3上传文件时遇到了问题:

// rust version 1.42.0
// OS macos
// [dependencies]
// rusoto_core = "0.43.0"
// rusoto_s3 = "0.43.0"
// log = "0.4"
// pretty_env_logger = "0.4.0"
// tokio = "0.2.14"
// tokio-util = { version = "0.3.1", features = ["codec"] }
// futures = "0.3.4"
// bytes = "0.5.4"

#![allow(dead_code)]
#[allow(unused_imports)]

use log::{debug,info,warn,error};
use bytes::Bytes;
use tokio_util::codec;
use futures::stream::{Stream, TryStreamExt};
use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client};
use tokio::io::{AsyncRead, Result};

/// Entry point: sets up logging, then uploads the fixed test file `/tmp/test.bin`.
#[tokio::main]
async fn main() {
    pretty_env_logger::init();
    let pathbuf = std::path::PathBuf::from("/tmp/test.bin");
    copy_to_s3(&pathbuf).await;
}

async fn copy_to_s3(pathbuf: &std::path::PathBuf) {
    debug!("copy_to_s3: {:?}", pathbuf);

    let tokio_file = tokio::fs::File::open(pathbuf.as_path()).await;

    let filename = pathbuf.file_name().unwrap().to_str().unwrap();
    debug!("filename: {:?}", filename);
    let s3_client = S3Client::new(Region::EuWest2);
    let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("..."),
        key: filename.to_owned(),
        server_side_encryption: Some("AES256".to_string()),

        body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file.unwrap()))),
        ..Default::default()
    }).await;

    match result {
        Ok(success) => { 
            debug!("Success: {:?}", success);
        },
        Err(error) => {
            error!("Failure: {:?}", error);
        }
    }

    debug!("DONE: copy_to_s3: {:?}", pathbuf);
}

/// Adapts an async reader into a stream of `Bytes` chunks.
///
/// The reader is wrapped in a `FramedRead` using `BytesCodec`, and every
/// successfully decoded buffer is frozen into `Bytes`; errors pass through
/// unchanged.
fn into_bytes_stream<R>(r: R) -> impl Stream<Item=Result<Bytes>>
where
    R: AsyncRead,
{
    let framed = codec::FramedRead::new(r, codec::BytesCodec::new());
    framed.map_ok(|chunk| chunk.freeze())
}

我使用dd if=/dev/zero of=/tmp/test.bin bs=4k count=500生成一个二进制文件。
尽管我还没有完全理解 Rust 的 futures(异步)相关内容,但我只是想把某个文件上传到S3,并且尽可能少地占用内存。
运行时,我得到了以下调试日志输出;其中可能敏感的信息已用省略号(...)替换。
$ RUST_LOG=debug cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.36s
     Running `target/debug/uploader`
 DEBUG uploader > copy_to_s3: "/tmp/test.bin"
 DEBUG uploader > filename: "test.bin"
 DEBUG rusoto_core::request > Full request:
 method: PUT
 final_uri: https://s3.eu-west-2.amazonaws.com/.../test.bin
Headers:

 DEBUG rusoto_core::request > authorization:"AWS4-HMAC-SHA256 Credential=.../20200408/eu-west-2/s3/aws4_request, SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date;x-amz-security-token;x-amz-server-side-encryption, Signature=..."
 DEBUG rusoto_core::request > content-type:"application/octet-stream"
 DEBUG rusoto_core::request > host:"s3.eu-west-2.amazonaws.com"
 DEBUG rusoto_core::request > x-amz-content-sha256:"UNSIGNED-PAYLOAD"
 DEBUG rusoto_core::request > x-amz-date:"20200408T173930Z"
 DEBUG rusoto_core::request > x-amz-security-token:"..."
 DEBUG rusoto_core::request > x-amz-server-side-encryption:"AES256"
 DEBUG rusoto_core::request > user-agent:"rusoto/0.43.0 rust/1.42.0 macos"
 DEBUG hyper::client::connect::dns > resolving host="s3.eu-west-2.amazonaws.com"
 DEBUG hyper::client::connect::http > connecting to 52.95.148.48:443
 DEBUG hyper::client::connect::http > connected to 52.95.148.48:443
 DEBUG hyper::proto::h1::io         > flushed 1070 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 147600 bytes
 DEBUG hyper::proto::h1::io         > flushed 418200 bytes
 DEBUG hyper::proto::h1::io         > flushed 418200 bytes
 DEBUG hyper::proto::h1::io         > flushed 418200 bytes
 DEBUG hyper::proto::h1::io         > flushed 418200 bytes
 DEBUG hyper::proto::h1::io         > flushed 16405 bytes
 DEBUG hyper::proto::h1::io         > read 291 bytes
 DEBUG hyper::proto::h1::io         > parsed 7 headers
 DEBUG hyper::proto::h1::conn       > incoming body is chunked encoding
 DEBUG hyper::proto::h1::io         > read 345 bytes
 DEBUG hyper::proto::h1::decode     > incoming chunked header: 0x14D (333 bytes)
 DEBUG hyper::proto::h1::conn       > incoming body completed
 DEBUG rusoto_core::proto::xml::error > Ignoring unknown XML element "Header" in error response.
 DEBUG rusoto_core::proto::xml::error > Ignoring unknown XML element "RequestId" in error response.
 DEBUG rusoto_core::proto::xml::error > Ignoring unknown XML element "HostId" in error response.
 ERROR uploader                       > Failure: Unknown(BufferedHttpResponse {status: 501, body: "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<Error><Code>NotImplemented</Code><Message>A header you provided implies functionality that is not implemented</Message><Header>Transfer-Encoding</Header><RequestId>3F1A03D67D81CCAB</RequestId><HostId>...=</HostId></Error>", headers: {"x-amz-request-id": "3F1A03D67D81CCAB", "x-amz-id-2": "...", "content-type": "application/xml", "transfer-encoding": "chunked", "date": "Wed, 08 Apr 2020 17:39:30 GMT", "connection": "close", "server": "AmazonS3"} })
 DEBUG uploader                       > DONE: copy_to_s3: "/tmp/test.bin"

我认为这告诉我它不是一个sigv4签名的上传,但我不确定。

在大部分情况下,调试输出看起来像是成功地按块发送文件,但后来出现了错误...

假设我的判断没错,即它发送的是sigv2而不是sigv4,我该如何让它发送sigv4标头呢?如果不是这个原因,那我遗漏了什么?


1
很难回答你的问题,因为它没有包含一个 [MRE]。我们无法确定代码中存在什么 crates(及其版本)、types、traits、fields等等。如果可能的话,您可以在Rust Playground上尝试复制您的错误,否则您可以在一个全新的Cargo项目中进行尝试,然后将附加信息编辑入您的问题中以便于我们帮助您。在此处提问时,您可以使用 Rust-specific MRE tips来简化您的原始代码。谢谢! - Shepmaster
1
好的,我会很快回复。 - pms1969
2
完成。添加了MRE。添加了更完整的输出。 - pms1969
1个回答

2

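应该指定Content-Length。如果不指定长度,请求体会以 Transfer-Encoding: chunked 的方式发送,而S3不支持这种方式,因此返回了上面日志中的 501 NotImplemented(错误响应里的 <Header>Transfer-Encoding</Header> 指出了这一点)。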

更改的部分

// Same `put_object` call as in the question, with `content_length` added.
let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("..."),
        key: filename.to_owned(),
        server_side_encryption: Some("AES256".to_string()),
        // Size of the test file in bytes. It was created with
        // `dd if=/dev/zero of=/tmp/test.bin bs=4k count=500`,
        // i.e. 500 blocks * 4096 bytes = 2_048_000 bytes.
        // The value is hard-coded for this one file.
        content_length: Some(2_048_000),
        body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file.unwrap()))),
        ..Default::default()
    }).await;

修复后示例的完整代码

// rust version 1.42.0
// OS macos
// [dependencies]
// rusoto_core = "0.43.0"
// rusoto_s3 = "0.43.0"
// log = "0.4"
// pretty_env_logger = "0.4.0"
// tokio = "0.2.14"
// tokio-util = { version = "0.3.1", features = ["codec"] }
// futures = "0.3.4"
// bytes = "0.5.4"

#![allow(dead_code)]
#[allow(unused_imports)]

use log::{debug,info,warn,error};
use bytes::Bytes;
use tokio_util::codec;
use futures::stream::{Stream, TryStreamExt};
use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client};
use tokio::io::{AsyncRead, Result};

/// Entry point: sets up logging, then uploads the fixed test file `/tmp/test.bin`.
#[tokio::main]
async fn main() {
    pretty_env_logger::init();
    let pathbuf = std::path::PathBuf::from("/tmp/test.bin");
    copy_to_s3(&pathbuf).await;
}

async fn copy_to_s3(pathbuf: &std::path::PathBuf) {
    debug!("copy_to_s3: {:?}", pathbuf);

    let tokio_file = tokio::fs::File::open(pathbuf.as_path()).await;

    let filename = pathbuf.file_name().unwrap().to_str().unwrap();
    debug!("filename: {:?}", filename);
    let s3_client = S3Client::new(Region::EuWest2);
    let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("..."),
        key: filename.to_owned(),
        server_side_encryption: Some("AES256".to_string()),
        // Based on dd if=/dev/zero of=/tmp/test.bin bs=4k count=500
        content_length: Some(2_048_000),
        body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file.unwrap()))),
        ..Default::default()
    }).await;

    match result {
        Ok(success) => { 
            debug!("Success: {:?}", success);
        },
        Err(error) => {
            error!("Failure: {:?}", error);
        }
    }

    debug!("DONE: copy_to_s3: {:?}", pathbuf);
}

/// Adapts an async reader into a stream of `Bytes` chunks.
///
/// The reader is wrapped in a `FramedRead` using `BytesCodec`, and every
/// successfully decoded buffer is frozen into `Bytes`; errors pass through
/// unchanged.
fn into_bytes_stream<R>(r: R) -> impl Stream<Item=Result<Bytes>>
where
    R: AsyncRead,
{
    let framed = codec::FramedRead::new(r, codec::BytesCodec::new());
    framed.map_ok(|chunk| chunk.freeze())
}

代码的功能正常,但是你需要事先知道文件的长度。

非常感谢。非常感激。您能解释一下发生了什么以及为什么需要文件长度吗?我猜内容长度是sigv4头的一部分吗? - pms1969

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接