"@aws-sdk/client-s3": "^3.17.0"
"@aws-sdk/lib-storage": "^3.34.0"
"JSONStream": "^1.3.5",
尝试 #1:似乎我没有正确使用 JSONStream.stringify():
import { MongoClient } from 'mongodb';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
// S3 client handed to the Upload below; the region is read from env.AWS_REGION.
// NOTE(review): `env` is not imported in this snippet — confirm it is in scope.
const s3Client = new S3Client({ region: env.AWS_REGION });
/**
 * Attempt #1: opens a stream over at most 5 documents of a MongoDB collection
 * and passes that stream to an @aws-sdk/lib-storage Upload as the object Body.
 *
 * @param {string} connectionString - passed to MongoClient.connect (presumably a MongoDB URI — confirm).
 * @param {string} collectionName - name handed to db.collection().
 * @returns {Promise<void>} resolves with no value once upload.done() has resolved.
 * @throws {string} the caught error's `name` property, after the error has been logged.
 */
export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
let client;
try {
client = await MongoClient.connect(connectionString);
const db = client.db();
// NOTE(review): the filter passed to find() is the string '{}', not an empty object — confirm this is intended.
const readStream = db.collection(collectionName).find('{}').limit(5).stream();
// The stream returned by pipe() is not stored, so the JSONStream output is never used below.
// NOTE(review): JSONStream is not imported in this snippet — confirm it is in scope.
readStream.pipe(JSONStream.stringify());
const upload = new Upload({
client: s3Client,
params: {
Bucket: 'test-bucket',
Key: 'extracted-data/benda_mongo.json',
// Body is the raw cursor stream, not the stringified one.
// NOTE(review): the error quoted for this attempt ("Received type object") suggests the
// cursor stream yields objects rather than strings/Buffers — confirm.
Body: readStream,
},
});
await upload.done();
}
catch (err) {
// NOTE(review): `log` is not defined in this snippet — confirm it is in scope.
log.error(err);
// Only the error's name (a string) is thrown; the original message and stack are not propagated.
throw err.name;
}
finally {
if (client) {
// The result of close() is not awaited.
client.close();
}
}
};
错误 #1:
TypeError [ERR_INVALID_ARG_TYPE]: The first argument must be one of type string, Buffer, ArrayBuffer, Array, or Array-like Object. Received type object at Function.from (buffer.js:305:9) at getDataReadable (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18) at processTicksAndRejections (internal/process/task_queues.js:94:5) at Object.getChunkStream (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20) at Upload.__doConcurrentUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22) at async Promise.all (index 0) at Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:196:5) at Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)
尝试 #2:使用变量 jsonStream:
// Attempt #2 (fragment): the stream returned by pipe() is kept in jsonStream
// and used as the upload Body instead of the raw cursor stream.
// NOTE(review): the filter passed to find() is the string '{}', not an empty object — confirm this is intended.
const readStream = db.collection(collectionName).find('{}').limit(5).stream();
const jsonStream = readStream.pipe(JSONStream.stringify());
const upload = new Upload({
client: s3Client,
params: {
Bucket: 'test-bucket',
Key: 'extracted-data/benda_mongo.json',
// Body is whatever JSONStream.stringify() returned (pipe() returns its destination argument).
Body: jsonStream,
},
});
错误 #2:
ReferenceError: ReadableStream is not defined at Object.getChunk (/.../node_modules/@aws-sdk/lib-storage/src/chunker.ts:22:30) at Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:187:24) at Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:37)
尝试 #3:使用 stream.PassThrough:
// Attempt #3 (fragment): pipe the stringified documents into the value returned by uploadStreamFile().
client = await MongoClient.connect(connectionString);
const db = client.db();
// NOTE(review): the filter passed to find() is the string '{}', not an empty object — confirm this is intended.
const readStream = db.collection(collectionName).find('{}').limit(5).stream();
// NOTE(review): uploadStreamFile is declared `async`, so this call evaluates to a Promise and
// that Promise is what the second pipe() receives as its destination — likely the source of the
// "dest.on is not a function" error quoted for this attempt; confirm.
readStream.pipe(JSONStream.stringify()).pipe(uploadStreamFile('benda_mongo.json'));
...
const stream = require('stream');
/**
 * Attempt #3 helper: creates a PassThrough stream, starts an @aws-sdk/lib-storage
 * Upload with that stream as Body, and waits for the upload to finish.
 *
 * @param {string} fileName - used only in the log message; the S3 Key is hard-coded.
 * @returns {Promise<*>} resolves with the result of upload.done(), or with undefined if anything throws.
 */
export const uploadStreamFile = async(fileName) => {
try{
// `pass` stays local: it is neither returned nor written to in this function, so nothing
// visible here feeds data into the upload Body.
const pass = new stream.PassThrough();
const upload = new Upload({
client: s3Client,
params: {
Bucket: 'test-bucket',
Key: 'extracted-data/benda_mongo.json',
Body: pass,
},
});
const res = await upload.done();
log.info('finished uploading file', fileName);
return res;
}
catch(err){
// The error is swallowed: nothing is logged and the caller receives undefined.
return;
}
};
错误 #3:
'dest.on is not a function at Stream.pipe (internal/streams/legacy.js:30:8)'
尝试 #4:使用 mongodb.stream({transform: doc => JSON.stringify...}) 替代 JSONStream:
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import { env } from '../../../env';
// S3 client handed to the Upload below; the region is read from the imported env object.
const s3Client = new S3Client({ region: env.AWS_REGION });
/**
 * Attempt #4: like the JSONStream attempts, but serialisation is requested through
 * the `transform` option of the cursor's stream() call instead of a piped JSONStream.
 *
 * @param {string} connectionString - passed to MongoClient.connect (presumably a MongoDB URI — confirm).
 * @param {string} collectionName - name handed to db.collection().
 * @returns {Promise<void>} resolves with no value once upload.done() has resolved.
 * @throws {string} the caught error's `name` property, after the error has been logged.
 */
export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
let client;
try {
// NOTE(review): MongoClient is not imported in this snippet — confirm it is in scope.
client = await MongoClient.connect(connectionString);
const db = client.db();
// NOTE(review): the filter passed to find() is the string '{}', not an empty object — confirm this is intended.
const readStream = db.collection(collectionName)
.find('{}')
.limit(5)
// The transform callback turns each document into its JSON text followed by a newline.
.stream({ transform: doc => JSON.stringify(doc) + '\n' });
const upload = new Upload({
client: s3Client,
params: {
Bucket: 'test-bucket',
Key: 'extracted-data/benda_mongo.json',
// NOTE(review): the error quoted for this attempt still reports "Received type object" from
// Buffer.from, so the chunks reaching lib-storage may not be strings/Buffers — confirm.
Body: readStream,
},
});
await upload.done();
}
catch (err) {
// NOTE(review): `log` is not defined in this snippet — confirm it is in scope.
log.error('waaaaa', err);
// Only the error's name (a string) is thrown; the original message and stack are not propagated.
throw err.name;
}
finally {
if (client) {
// The result of close() is not awaited.
client.close();
}
}
};
错误 #4:
TypeError [ERR_INVALID_ARG_TYPE]: The first argument must be one of type string, Buffer, ArrayBuffer, Array, or Array-like Object. Received type object at Function.from (buffer.js:305:9) at getDataReadable (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getDataReadable.ts:6:18) at processTicksAndRejections (internal/process/task_queues.js:94:5) at Object.getChunkStream (/.../node_modules/@aws-sdk/lib-storage/src/chunks/getChunkStream.ts:17:20) at Upload.__doConcurrentUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:121:22) at async Promise.all (index 0) at Upload.__doMultipartUpload (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:196:5) at Upload.done (/.../node_modules/@aws-sdk/lib-storage/src/Upload.ts:88:12)
尝试 #5:使用 stream.PassThrough(),并将 pass 返回给 pipe:
/**
 * Attempt #5: pipes the transformed cursor stream into the value returned by uploadStreamFile().
 *
 * @param {string} connectionString - passed to MongoClient.connect (presumably a MongoDB URI — confirm).
 * @param {string} collectionName - name handed to db.collection().
 * @returns {Promise<void>} resolves with no value; nothing after connect() is awaited in the try block.
 * @throws {string} the caught error's `name` property, after the error has been logged.
 */
export const uploadMongoStreamToS3 = async (connectionString, collectionName) => {
let client;
try {
client = await MongoClient.connect(connectionString);
const db = client.db();
// NOTE(review): the filter passed to find() is the string '{}', not an empty object — confirm this is intended.
const readStream = db.collection(collectionName).find('{}').limit(5).stream({ transform: doc => JSON.stringify(doc) + '\n' });
// NOTE(review): uploadStreamFile is declared `async`, so pipe() receives a Promise rather than a
// stream — likely the source of the "dest.on is not a function" error quoted for this attempt; confirm.
// The upload is not awaited here, so the finally block runs as soon as pipe() returns or throws.
readStream.pipe(uploadStreamFile());
}
catch (err) {
// NOTE(review): `log` is not defined in this snippet — confirm it is in scope.
log.error('waaaaa', err);
// Only the error's name (a string) is thrown; the original message and stack are not propagated.
throw err.name;
}
finally {
if (client) {
// The result of close() is not awaited.
client.close();
}
}
};
const stream = require('stream');
/**
 * Attempt #5 helper: creates a PassThrough stream, starts an @aws-sdk/lib-storage
 * Upload with that stream as Body, awaits the upload, and then returns the stream.
 *
 * @returns {Promise<stream.PassThrough|undefined>} resolves with the PassThrough after
 *   upload.done() resolves, or with undefined if anything throws (the error is logged).
 */
export const uploadStreamFile = async() => {
try{
const pass = new stream.PassThrough();
const upload = new Upload({
client: s3Client,
params: {
Bucket: 'test-bucket',
Key: 'extracted-data/benda_mongo.json',
Body: pass,
},
});
// `pass` is handed back only after this await; nothing in this function writes to or ends
// `pass` beforehand.
// NOTE(review): the upload may therefore never finish — confirm.
await upload.done();
return pass;
}
catch(err){
// The error is logged but not rethrown; the caller receives undefined.
log.error('pawoooooo', err);
return;
}
};
错误 #5:
TypeError: dest.on is not a function at Cursor.pipe (_stream_readable.js:680:8)