Stream JSON data from MongoDB to S3 using @aws-sdk/lib-storage and JSONStream.stringify()

I'm trying to stream JSON from MongoDB to S3 using the new version of @aws-sdk/lib-storage:
"@aws-sdk/client-s3": "^3.17.0"
"@aws-sdk/lib-storage": "^3.34.0"
"JSONStream": "^1.3.5",

Attempt #1: it seems I'm not using JSONStream.stringify() correctly:

import JSONStream from 'JSONStream';
import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { env } from '../../../env';

const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    readStream.pipe(JSONStream.stringify());
 
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: readStream,
      },
    });
    
    await upload.done(); 
  }
  catch (err) {
    log.error(err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }

};

Error #1:

TypeError [ERR_INVALID_ARG_TYPE]: The first argument must be one of type string, Buffer, ArrayBuffer, Array, or Array-like Object. Received type object
    at Function.from (buffer.js:305:9)
    at getDataReadable (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18)
    at processTicksAndRejections (internal/process/task_queues.js:94:5)
    at Object.getChunkStream (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20)
    at Upload.__doConcurrentUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22)
    at async Promise.all (index 0)
    at Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:196:5)
    at Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)

Attempt #2, using a jsonStream variable:

    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    const jsonStream = readStream.pipe(JSONStream.stringify());
 
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: jsonStream,
      },
    });

Error #2:

ReferenceError: ReadableStream is not defined
    at Object.getChunk (/.../node_modules/@aws-sdk/lib-storage/src/chunker.ts:22:30)
    at Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:187:24)
    at Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:37)

Attempt #3: using stream.PassThrough:

    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find('{}').limit(5).stream();
    readStream.pipe(JSONStream.stringify()).pipe(uploadStreamFile('benda_mongo.json'));

...

const stream = require('stream');
export const uploadStreamFile = async (fileName) => {
  try {

    const pass = new stream.PassThrough();
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: pass,
      },
    });
    const res = await upload.done();
    
    log.info('finished uploading file', fileName);
    return res;
  }
  catch (err) {
    return;
  }
};

Error #3:

dest.on is not a function
    at Stream.pipe (internal/streams/legacy.js:30:8)

Attempt #4: using mongodb .stream({ transform: doc => JSON.stringify... }) instead of JSONStream:

import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { env } from '../../../env';

const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName)
      .find('{}')
      .limit(5)
      .stream({ transform: doc => JSON.stringify(doc) + '\n' });
  
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: readStream,
      },
    });
  
    await upload.done(); 
  }
  catch (err) {
    log.error('waaaaa', err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};

Error #4:

TypeError [ERR_INVALID_ARG_TYPE]: The first argument must be one of type string, Buffer, ArrayBuffer, Array, or Array-like Object. Received type object
    at Function.from (buffer.js:305:9)
    at getDataReadable (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18)
    at processTicksAndRejections (internal/process/task_queues.js:94:5)
    at Object.getChunkStream (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20)
    at Upload.__doConcurrentUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22)
    at async Promise.all (index 0)
    at Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:196:5)
    at Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)

Attempt #5: using stream.PassThrough() and returning pass to pipe:

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName)
      .find('{}')
      .limit(5)
      .stream({ transform: doc => JSON.stringify(doc) + '\n' });
    readStream.pipe(uploadStreamFile());
  }
  catch (err) {
    log.error('waaaaa', err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};


const stream = require('stream');

export const uploadStreamFile = async () => {
  try {
    const pass = new stream.PassThrough();
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: pass,
      },
    });
    await upload.done();
    return pass;
  }
  catch (err) {
    log.error('pawoooooo', err);
    return;
  }
};

Error #5:

TypeError: dest.on is not a function
    at Cursor.pipe (_stream_readable.js:680:8)


Thanks, but I get the same error. I did manage to pipe it into fs.createWriteStream and write to a file, and later I can use fs.createReadStream and pipe it into my 'uploadStreamFile', and that works. But I don't like this solution, since it makes my server write MongoDB's response to a temporary file instead of streaming it directly to S3. - Oron Bendavid
Thanks, the full stack trace is: 'dest.on is not a function at Stream.pipe (internal/streams/legacy.js:30:8)' - Oron Bendavid
I have updated the question with all the relevant error messages. - Oron Bendavid
I have removed the comments since they are already included in the answer. I hope that any of the proposed alternatives works for you. - jccampanero
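For reference, the temporary-file workaround described in the first comment above might look roughly like the following sketch. It is only an illustration: the temp path is an assumption, readStream and s3Client come from the surrounding code, and it would run inside an async function:

import fs from 'fs';
import JSONStream from 'JSONStream';

// 1) Drain the stringified cursor into a temporary file first...
const writeStream = fs.createWriteStream('/tmp/benda_mongo.json');
readStream.pipe(JSONStream.stringify()).pipe(writeStream);
await new Promise((resolve, reject) =>
  writeStream.on('finish', resolve).on('error', reject));

// 2) ...then hand a plain fs read stream to Upload, which it accepts
const upload = new Upload({
  client: s3Client,
  params: {
    Bucket: 'test-bucket',
    Key: 'extracted-data/benda_mongo.json',
    Body: fs.createReadStream('/tmp/benda_mongo.json'),
  },
});
await upload.done();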
2 Answers

Looking at your error stack traces, the problem is probably that the MongoDB driver provides a cursor in object mode, whereas the Body parameter of Upload requires a traditional stream, suitable in this case for being processed by Buffer.
Taking your original code as a reference, you could try providing a Transform stream to deal with both requirements.
For instance, consider the following code:
import { Transform } from 'stream';
import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { env } from '../../../env';

const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const readStream = db.collection(collectionName).find({}).limit(5).stream();
    // We are creating here a Transform to adapt both sides
    const toJSONTransform = new Transform({
      writableObjectMode: true,
      transform(chunk, encoding, callback) {
        this.push(JSON.stringify(chunk) + '\n');
        callback();  
      }  
    });

    readStream.pipe(toJSONTransform);
 
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: toJSONTransform,
      },
    });
    
    await upload.done(); 
  }
  catch (err) {
    log.error(err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }

};

In the code above, the writable side of toJSONTransform is defined in object mode; in contrast, the readable side is suitable for being read by the S3 Upload method... at least, I hope so.
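As a self-contained illustration of that adaptation (no MongoDB or S3 required; the sample documents are invented), Readable.from produces an object-mode stream much like the driver's cursor, and the same Transform turns it into plain newline-delimited JSON bytes:

import { Readable, Transform } from 'stream';

const toJSONTransform = new Transform({
  writableObjectMode: true, // the writable side accepts JavaScript objects
  transform(chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n'); // the readable side emits plain bytes
    callback();
  }
});

Readable.from([{ _id: 1, name: 'a' }, { _id: 2, name: 'b' }])
  .pipe(toJSONTransform)
  .on('data', line => process.stdout.write(line));
// prints:
// {"_id":1,"name":"a"}
// {"_id":2,"name":"b"}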
Regarding the second error you reported, the one related to dest.on: I initially thought, and mentioned to you as a possibility, that the error was caused by the fact that in uploadStreamFile you were returning a Promise, not a stream, and that you were passing that Promise to a pipe method that requires a stream; basically, that you returned the wrong variable. But I hadn't realized that you were trying to pass the PassThrough stream as a parameter to the Upload method: please be aware that this stream contains no information, because you never feed it anything; the content of the readable stream obtained from the MongoDB query is never passed to the callback nor to the Upload itself.
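For completeness, here is a sketch of how the PassThrough variant could be wired so that data actually flows into it before Upload consumes it (bucket, key and import paths are reused from the question; untested against a live cluster):

import stream from 'stream';
import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { env } from '../../../env';

const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;
  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const pass = new stream.PassThrough();

    // The cursor transform already yields newline-delimited JSON strings,
    // so the PassThrough carries plain bytes that Upload can chunk.
    db.collection(collectionName)
      .find({})
      .limit(5)
      .stream({ transform: doc => JSON.stringify(doc) + '\n' })
      .pipe(pass); // feed the PassThrough *before* awaiting the upload

    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: 'extracted-data/benda_mongo.json',
        Body: pass,
      },
    });
    await upload.done();
  }
  finally {
    if (client) {
      client.close();
    }
  }
};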

Thanks, I tried that as well. Running the code with the syntax 'Body: readStream.pipe(JSONStream.stringify())' returns the following error: 'ReferenceError: ReadableStream is not defined'. - Oron Bendavid
I am sorry to hear that. Could you please provide more information about the error stack trace in the question? I think it could be relevant. - jccampanero
Thanks, I have updated the question with all the relevant error information. - Oron Bendavid
Thank you very much @OronBen-David. I updated the answer with further alternatives. Regarding 'ReferenceError: ReadableStream is not defined', I think it is related to the fact that in our use case the stream obtained with JSONStream.stringify can only be written to, not read from at the same time: I think I misunderstood the library documentation as saying it can be both writable and readable, like a transform, but I am afraid not both at once. The error makes sense. - jccampanero
That's great @OronBen-David. I am really happy to hear that it is working. Thank you very much for sharing the solution based on stream.PassThrough. - jccampanero


I found an additional solution using stream.PassThrough, together with JSONStream, which streams the array of objects instead of one document after the other:

import stream from 'stream';
import JSONStream from 'JSONStream';
import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { env } from '../../../env';

const s3Client = new S3Client({ region: env.AWS_REGION });

export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
  let client;

  try {
    client = await MongoClient.connect(connectionString);
    const db = client.db();
    const passThroughStream = new stream.PassThrough();
    const readStream = db.collection(collectionName)
      .find({})
      .stream();

    readStream.on('end', () => passThroughStream.end());

    readStream.pipe(JSONStream.stringify()).pipe(passThroughStream);
    await uploadStreamFile('benda_mongo.json', passThroughStream);
  }
  catch (err) {
    log.error(err);
    throw err.name;
  }
  finally {
    if (client) {
      client.close();
    }
  }
};


export const uploadStreamFile = async (fileName, stream) => {
  try {
    log.info('start uploading file', fileName);
    const upload = new Upload({
      client: s3Client,
      params: {
        Bucket: 'test-bucket',
        Key: fileName,
        Body: stream,
      },
    });

    const res = await upload.done();
    log.info('finished uploading file', fileName);
    return res;
  }
  catch (err) {
    log.error(err);
    return;
  }
};
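A hypothetical invocation of the two functions above (the connection string and collection name are placeholders):

await uploadMongoStreamToS3('mongodb://localhost:27017/mydb', 'my_collection');
// streams the whole collection as a JSON array to s3://test-bucket/benda_mongo.json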
