向S3对象追加数据

144
假设我有一台机器,我希望能够将数据写入存储在S3存储桶中的某个日志文件。
所以,机器需要具备向该存储桶写入数据的权限,但是我不希望它有覆盖或删除该存储桶中任何文件(包括我要写入的文件)的能力。
基本上,我只想让我的机器能够将数据追加到该日志文件中,而不覆盖或下载它。
有没有办法配置我的S3使其按照我想要的方式工作?也许有一些IAM策略可以附加到它上面,以便按照我的意愿工作?

您无法在S3中修改对象。您能否只是附加一个新的日志文件?这将是更好的模型,并支持多个同时客户端。 - jarmod
@jarmod 是的,我考虑过这个问题,但问题在于如果攻击者成功访问了我的服务器,他将有能力在文件被发送到S3存储桶之前删除存储在本地的文件(假设这发生在一天结束时)。 - Theodore
您可能还想查看CloudWatch日志。让它管理收集和存储日志的复杂性,提供搜索功能、保留策略,并允许您根据可自定义的指标生成警报。 - jarmod
1
你也可以看看Google BigQuery。你可以使用它来解决你的问题。 - Daniel777
11个回答

195

很遗憾,你不能。

S3没有"append"操作。* 一旦对象被上传,就无法在原地修改它;你唯一的选择是上传一个新对象来替换它,这不符合你的需求。

*:是的,我知道这篇帖子已经几年了。但它仍然准确无误。


我可以了解一下,是否可以通过使用多部分上传来实现这个功能? - Anjali
4
多部分上传功能可让您将数据传输到S3而无需下载原始对象,但它不允许直接覆盖原始对象。请参见例如https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html 您可以删除旧对象/重命名新对象。然而,这不是问题所询问的。 - MikeGM
3
我认为使用分段上传可能会奏效。你的所有部分都是同一个文件的连续片段。如果这一部分成功上传,最终你可以提交上传以便能够读取该文件。因此,只要你不需要读取文件的内容,就可以使用同一个分段上传进行追加。 - cerebrotecnologico
2
@cerebrotecnologico 我仍然认为它不符合 OP 的要求。据我所知,没有办法限制 S3 用户执行追加到对象的分段上传--如果他们可以执行分段上传,他们可以上传任何内容。 - user149341
1
可以提供“追加接口”,就像s3fs所做的那样,但只能通过“无上传复制+部分上传+重写原始文件”的方式实现,正如@duskwuff-inactive所提到的。 - Kache

32

如接受的答案所述,您无法使用 Linux 命令行中的 >>。我所知道的最佳解决方案是使用:

AWS Kinesis Firehose

https://aws.amazon.com/kinesis/firehose/

他们的代码示例看起来很复杂,但你可以非常简单地执行 PUT(或批量PUT)操作到你的应用程序中的 Kinesis Firehose 提交流(使用 AWS SDK),并且在 AWS Kinesis Firehose 控制台中配置 Kinesis Firehose 提交流将你的流数据发送到你选择的 AWS S3 存储桶。

输入图像说明

这仍然不像在 Linux 命令行中那样便捷,因为一旦你在 S3 上创建了一个文件,你需要再次下载、追加和上传新文件。但你每批行只需执行一次而不是每行都需要执行一次,因此你不必担心由于追加操作的数量而产生巨额费用。也许可以做到,但我无法从控制台上看出如何做到。


11
请注意,执行此操作的时间有一个最大值(文件创建后900秒)或一个最大大小(128mb文件大小) - 这意味着Kinesis firehose将追加到同一个S3文件,直到达到其中任何一个限制: https://docs.aws.amazon.com/firehose/latest/dev/create-configure.html - Yaron Budowski
2
您能否在Firehose上使用单个S3文件作为输出?将多个文件合并到一个S3存储桶中似乎有些混乱。 - Jón Trausti Arason
1
很遗憾,没有更好的解决方案。我也希望有更好的解决办法。 - Sridhar Sarnobat
2
是的,这很不幸。如果我手动下载并将记录附加到单个S3对象中,我最担心的是竞态条件。我一直在考虑将记录添加到SQS中,然后使用SNS + Lambda的某些逻辑来轮询SQS,然后将新条目写入S3对象。 - Jón Trausti Arason

15

S3上的对象不支持追加。在这种情况下,您有两个解决方案:

  1. 将所有S3数据复制到一个新对象中,追加新内容并写回到S3。
function writeToS3(input) {
    var content;
    var getParams = {
        Bucket: 'myBucket', 
        Key: "myKey"
    };

    s3.getObject(getParams, function(err, data) {
        if (err) console.log(err, err.stack);
        else {
            content = new Buffer(data.Body).toString("utf8");
            content = content + '\n' + new Date() + '\t' + input;
            var putParams = {
                Body: content,
                Bucket: 'myBucket', 
                Key: "myKey",
                ACL: "public-read"
             };

            s3.putObject(putParams, function(err, data) {
                if (err) console.log(err, err.stack); // an error occurred
                else     {
                    console.log(data);           // successful response
                }
             });
        }
    });  
}
  1. 第二个选择是使用Kinesis Firehose。这相当简单。您需要创建Firehose交付流并将目标链接到S3存储桶即可。
function writeToS3(input) {
    var content = "\n" + new Date() + "\t" + input;
    var params = {
      DeliveryStreamName: 'myDeliveryStream', /* required */
      Record: { /* required */
        Data: new Buffer(content) || 'STRING_VALUE' /* Strings will be Base-64 encoded on your behalf */ /* required */
      }
    };

    firehose.putRecord(params, function(err, data) {
      if (err) console.log(err, err.stack); // an error occurred
      else     console.log(data);           // successful response
    }); 
}

你能否将单个S3文件用作输出? - Jón Trausti Arason

7
您可以:
  1. 设置分段上传
  2. 调用UploadPartCopy,指定现有的s3对象作为源
  3. 使用要附加的数据调用UploadPart
  4. 关闭分段上传。

有一些限制,例如您的现有对象必须大于5MB(但如果较小,则将其复制到客户端应该足够快速地满足大多数情况)。 这不如直接追加来得好,但至少您不需要在AWS和本地计算机之间来回复制数据。


3

如果有人想要在类似S3的服务中向对象附加数据,阿里云 OSS(对象存储服务)原生支持此功能

OSS提供附加上传(通过AppendObject API),允许您直接将内容附加到对象的末尾。使用此方法上传的对象是可附加对象,而使用其他方法上传的对象是普通对象。附加的数据可以立即读取。


3
我们面临的问题是创建一个数千兆字节大小的s3文件,而不需要将其全部加载到RAM中。下面的方法通过将多个文件附加在彼此末尾来组合它们,因此根据您的需求,这可能是一个可行的解决方案。
我们想出的解决方案是:
1. 将文件分块上传到AWS S3文件夹中 2. 运行AWS Athena来定义基于该S3文件夹的表
CREATE EXTERNAL TABLE IF NOT EXISTS `TrainingDB`.`TrainingTable` (`Data` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ('collection.delim' = '\n')
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://your-bucket-name/TrainingTesting/';


通过运行以下操作,生成该表中所有结果的组合:
UNLOAD (SELECT * FROM "TrainingDB"."TrainingTable") 
TO 's3://your-bucket/TrainingResults/results5' 
WITH ( format = 'TEXTFILE', compression='none' )

这将把所有文件追加到彼此的末尾,并为您提供一个包含您试图追加的所有块的文件。如果您只想组合几个小文件,则这种方法过于复杂,此时仅将原始文件拉下并写入末尾可能会更好(正如其他答案所建议的那样)。

2

正如其他人之前所述,S3对象不可附加。
然而,另一个解决方案是将内容写入CloudWatch日志,然后导出您想要的日志到S3。这也可以防止任何攻击者从您的服务器删除S3存储桶中的内容,因为Lambda不需要任何S3权限。

Original Answer翻译成“最初的回答”


1
这是原问题的一个好解决方案。我们不应该问“我无法让Y解决X,我该如何让Y工作?”而是应该问“我该如何解决X?”我认为这种方式更好地解决了这个问题。 - four43

1

我曾经遇到过类似的问题,需要在长时间运行的过程中将错误写入S3日志文件。因此,我没有本地文件来创建一次性流,而是必须在运行时将错误附加到一个文件中。

所以你可以保持与特定文件的开放连接,并在需要时向该文件写入:

const { S3 } = require('aws-sdk')
const { PassThrough } = require('stream')

// append to open connection
const append = (stream, data ) => new Promise(resolve => {
  stream.write(`${data}\n`, resolve)
})

const openConnectionWithS3 = async () => {
  const s3 = new S3({
    credentials: {
      accessKeyId: process.env.AWS_ACCESS_KEY_ID,
      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
    },
    endpoint: process.env.AWS_S3_ENDPOINT,
    region: process.env.AWS_DEFAULT_REGION,
  })
  const fileName = 'test.log'
  const bucketName = 'my-bucket'
  // create pass through stream. This stream we use to write data to
  // but this stream we also use to pass the same data to aws
  const pass = new PassThrough()

  // dont resolve the promise, but keep it open and await for the result when the long running process is done
  const promise = s3
    .upload({
      Bucket: bucketName,
      Key: fileName,
      // pass the stream as body, aws will handle the stream from now
      Body: pass,
    })
    .promise()

  // write data to our open connection.
  // we can even write it on different places
  for (let i = 0; i < 100000; i++) {
    await append(pass, `foo${i}`)
  }

  // here we resolve the promise and close the connection
  await Promise.all([
    // push null to the stream, the stream now knows after the
    // 1000 foo's it should stop writing
    pass.push(null),
    promise,
  ])
}

openConnectionWithS3()

它将在S3中向文件追加项目,并在完成时解决。

以上方法的问题在于这不是真正的流式传输。您的程序首先将所有内容写入流中,然后才将其上传到S3。因此,内存必须与文件一样大,如果在中途失败,则必须从头开始。我检查了内存,它会不断增长,直到整个流被填满。 - isaac.hazan

1

S3存储桶不允许您追加现有对象,可以使用的方法是,首先使用获取方法从S3存储桶中获取数据,然后在本地添加要追加的新数据,最后将其推回S3存储桶。

由于不可能向现有的S3对象追加内容,因此您需要用一个包含追加数据的新对象替换它。这意味着每次追加新条目时,您都需要上传整个对象(日志文件)。这样做效率不高。

您可以将日志条目发送到SQS队列中,当队列大小达到设定值时,您可以将日志消息合并为一起,并添加为S3存储桶中的一个对象。但这仍然无法满足您追加到单个对象的要求。


0
是的,你可以使用s3fs实现这个功能。
import s3fs

s3 = s3fs.S3FileSystem(anon=False)

# Create a file just like you do on a local system
path_to_your_file = "s3://my-bucket/my-key/my_file.txt

with s3.open('path_to_your_file, 'w') as f:
    f.write(f"This is a new QA file!\n")

# Now append to the file just like you do on a local system.
with s3.open('path_to_your_file, 'a') as f:
    f.write(f"----------------------------------------------------------!\n")

如果你在s3上检查文件,你会看到添加了虚线。 你必须配置s3fs与你的本地(CLI工具)配合使用。
希望对你有所帮助!

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接