How do I update more than 500 documents in Firestore using batched writes?

33

I'm trying to update a timestamp field in a collection with more than 500 documents, using the Firestore admin server timestamp.

const batch = db.batch();
const serverTimestamp = admin.firestore.FieldValue.serverTimestamp();

db
  .collection('My Collection')
  .get()
  .then((docs) => {
    docs.forEach((doc) => {
      batch.set(doc.ref, {
        serverTimestamp,
      }, {
        merge: true,
      });
    });
    return batch.commit();
  })
  .then(() => res.send('All docs updated'))
  .catch(console.error);

This throws the following error:

{ Error: 3 INVALID_ARGUMENT: cannot write more than 500 entities in a single call
    at Object.exports.createStatusError (C:\Users\Growthfile\Desktop\cf-test\functions\node_modules\grpc\src\common.js:87:15)
    at Object.onReceiveStatus (C:\Users\Growthfile\Desktop\cf-test\functions\node_modules\grpc\src\client_interceptors.js:1188:28)
    at InterceptingListener._callNext (C:\Users\Growthfile\Desktop\cf-test\functions\node_modules\grpc\src\client_interceptors.js:564:42)
    at InterceptingListener.onReceiveStatus (C:\Users\Growthfile\Desktop\cf-test\functions\node_modules\grpc\src\client_interceptors.js:614:8)
    at callback (C:\Users\Growthfile\Desktop\cf-test\functions\node_modules\grpc\src\client_interceptors.js:841:24)
  code: 3,
  metadata: Metadata { _internal_repr: {} },
  details: 'cannot write more than 500 entities in a single call' }

Is there a recursive way to create a batch object that updates the documents one batch of 500 at a time, until every document has been updated?

From the documentation I know that a recursive approach is available for delete operations, as described here:

https://firebase.google.com/docs/firestore/manage-data/delete-data#collections

For updates, however, I'm not sure how to end the recursion, since the documents are not being deleted.


1
Why not iterate over all 500 documents, update them, and use the last document's key to build a startAt for a new query? - Borko Kovacev
You can do the limit-and-batch recursively. I ran into the same problem; here is my solution: https://dev59.com/A7roa4cB1Zd3GeqPn56S#61639536 - Stathis Ntonas
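
A minimal sketch of the cursor-based approach Borko suggests above (my own illustration, not from the thread; it assumes the Node.js Admin SDK):

async function updateAllInPages(db, serverTimestamp) {
  let lastDoc = null;
  for (;;) {
    // Page through the collection 500 documents at a time.
    let query = db.collection('My Collection').orderBy('__name__').limit(500);
    if (lastDoc) query = query.startAfter(lastDoc);
    const snapshot = await query.get();
    if (snapshot.empty) break;

    // One batch per page, staying within the 500-write limit.
    const batch = db.batch();
    snapshot.docs.forEach((doc) =>
      batch.set(doc.ref, { serverTimestamp }, { merge: true })
    );
    await batch.commit();

    // Resume after the last document of this page.
    lastDoc = snapshot.docs[snapshot.docs.length - 1];
  }
}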
10 Answers

69

I also ran into the problem of updating more than 500 documents in a Firestore collection, and I'd like to share how I solved it.

I use Cloud Functions to update the collection in Firestore, but the same approach works in client-side code.

The solution counts every operation made against the batch; once the limit is reached, it creates a new batch and pushes it onto batchArray.

After all updates are queued, the code loops over batchArray and commits every batch it contains.

It is important to count every operation made against the batch, set(), update() and delete() alike, because they all count toward the 500-operation limit.

const documentSnapshotArray = await firestore.collection('my-collection').get();

const batchArray = [];
batchArray.push(firestore.batch());
let operationCounter = 0;
let batchIndex = 0;

documentSnapshotArray.forEach(documentSnapshot => {
    const documentData = documentSnapshot.data();

    // update document data here...

    batchArray[batchIndex].update(documentSnapshot.ref, documentData);
    operationCounter++;

    if (operationCounter === 499) {
      // Start a new batch before hitting the 500-operation limit.
      batchArray.push(firestore.batch());
      batchIndex++;
      operationCounter = 0;
    }
});

batchArray.forEach(async batch => await batch.commit()); // note: forEach does not await these commits; see the comments below

return;

11
How do we ensure that all batches execute successfully? Only the operations within a single batch are atomic; if some batches execute and others don't, we end up with inconsistent data. - Adarsh
1
@Mihae Kheel Yes, the loop creates a new batch after it completes 500 operations, but it is important to count every operation, and you also need some form of error handling. - Sebastian Vischer
@saurabh I never tried it with that many documents. Maybe commits have some kind of limit. I like your solution of committing the batch after every 500 operations; to me that is the simpler approach. - Sebastian Vischer
1
@Cedric Not so: batch.commit() returns a Promise, and Promise.all() waits on an array of Promises. Adding async/await in the inner callback like you did makes Promise.all() wait on an array of undefined values. Either way, all the batch.commit() calls will resolve, so either approach may work here, but the original is correct. - jscuba
1
@Cedric, you're right. Adding async/await in the inner callback just adds another layer of Promise. Thanks for raising this; it helped me understand it better. - jscuba
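
To make that comment thread concrete, the Promise.all variant being discussed would look like this (a sketch, using the same batchArray as the answer's code):

// Wait for every commit and surface any failure to the caller.
// Each batch is atomic on its own; there is no atomicity across batches.
await Promise.all(batchArray.map((batch) => batch.commit()));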

28

I like this simple solution:

const users = await db.collection('users').get()

const batches = _.chunk(users.docs, 500).map(userDocs => {
    const batch = db.batch()
    userDocs.forEach(doc => {
        batch.set(doc.ref, { field: 'myNewValue' }, { merge: true })
    })
    return batch.commit()
})

await Promise.all(batches)

Just remember to add import * as _ from "lodash" at the top. Adapted from this answer.


1
"Using TypeScript"... I don't see any TypeScript. - Matt Fletcher
3
This should be part of the official documentation, or at least something similar that doesn't depend on lodash. Works great! :) - Michel K
1
If you want type support, install @types/lodash. @MattFletcher lodash is written in vanilla JS. - tai C
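
Following up on the wish above for a lodash-free version, a small vanilla-JS chunk helper does the same job (my sketch, not from the answer):

// Split an array into chunks of at most `size` elements.
const chunk = (arr, size) =>
  Array.from({ length: Math.ceil(arr.length / size) }, (_, i) =>
    arr.slice(i * size, i * size + size)
  );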

8
You can use the default BulkWriter. It throttles writes according to the 500/50/5 rule (start at a maximum of 500 operations per second, then increase by 50% every 5 minutes).
Example:
let bulkWriter = firestore.bulkWriter();

bulkWriter.create(documentRef, {foo: 'bar'});
bulkWriter.update(documentRef2, {foo: 'bar'});
bulkWriter.delete(documentRef3);
await bulkWriter.close().then(() => {
  console.log('Executed all writes');
});
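
Not part of the original answer, but BulkWriter in the Node.js Admin SDK also exposes an error hook for retrying failed writes; a minimal sketch:

// Retry each failed write up to 3 times before letting it fail.
bulkWriter.onWriteError((error) => {
  console.error('Write failed:', error.documentRef.path);
  return error.failedAttempts < 3;
});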

4

As mentioned above, @Sebastian's answer is good and I upvoted it. But I ran into problems updating more than 25,000 documents in one go. My tweak to the logic is below.

console.log(`Updating documents...`);
let collectionRef = db.collection('cities');
try {
  let batch = db.batch();
  const documentSnapshotArray = await collectionRef.get();
  const records = documentSnapshotArray.docs;
  const index = documentSnapshotArray.size;
  console.log(`TOTAL SIZE=====${index}`);
  for (let i=0; i < index; i++) {
    const docRef = records[i].ref;
    // YOUR UPDATES
    batch.update(docRef, {isDeleted: false});
    if ((i + 1) % 499 === 0) {
      // Commit every 499 writes and start a fresh batch.
      await batch.commit();
      batch = db.batch();
    }
  }
  // Commit the final partial batch, if any writes remain
  if (index % 499 !== 0) {
    await batch.commit();
  }
  console.log('write completed');
} catch (error) {
  console.error(`updateWorkers() errored out : ${error.stack}`);
  reject(error); // assumes this snippet runs inside a Promise executor that provides reject
}

1

Based on all the answers above, I put together the following snippets, which you can drop into a module on both the JavaScript back end and the front end to use Firestore batched writes easily, without worrying about the 500-write limit.

Backend (Node.js)

// The Firebase Admin SDK to access Firestore.
const admin = require("firebase-admin");
admin.initializeApp();

// Firestore does not accept more than 500 writes in a transaction or batch write.
const MAX_TRANSACTION_WRITES = 499;

const isFirestoreDeadlineError = (err) => {
  console.log({ err });
  const errString = err.toString();
  return (
    errString.includes("Error: 13 INTERNAL: Received RST_STREAM") ||
    errString.includes("Error: 4 DEADLINE_EXCEEDED: Deadline exceeded")
  );
};

const db = admin.firestore();

// How many transactions/batchWrites out of 500 so far.
// I wrote the following functions to easily use batchWrites without worrying about the 500 limit.
let writeCounts = 0;
let batchIndex = 0;
let batchArray = [db.batch()];

// Commit all batches created so far.
const makeCommitBatch = async () => {
  console.log("makeCommitBatch");
  await Promise.all(batchArray.map((bch) => bch.commit()));
};

// Commit the batch writes; on a Firestore deadline error, retry every 4 seconds until it succeeds.
const commitBatch = async () => {
  try {
    await makeCommitBatch();
  } catch (err) {
    console.log({ err });
    if (isFirestoreDeadlineError(err)) {
      const theInterval = setInterval(async () => {
        try {
          await makeCommitBatch();
          clearInterval(theInterval);
        } catch (err) {
          console.log({ err });
          if (!isFirestoreDeadlineError(err)) {
            clearInterval(theInterval);
            throw err;
          }
        }
      }, 4000);
    }
  }
};

// If the batch reaches 499 writes, start a new batch and reset the counter.
const checkRestartBatchWriteCounts = () => {
  writeCounts += 1;
  if (writeCounts >= MAX_TRANSACTION_WRITES) {
    batchIndex++;
    batchArray.push(db.batch());
    writeCounts = 0;
  }
};

const batchSet = (docRef, docData) => {
  batchArray[batchIndex].set(docRef, docData);
  checkRestartBatchWriteCounts();
};

const batchUpdate = (docRef, docData) => {
  batchArray[batchIndex].update(docRef, docData);
  checkRestartBatchWriteCounts();
};

const batchDelete = (docRef) => {
  batchArray[batchIndex].delete(docRef);
  checkRestartBatchWriteCounts();
};

module.exports = {
  admin,
  db,
  MAX_TRANSACTION_WRITES,
  checkRestartBatchWriteCounts,
  commitBatch,
  isFirestoreDeadlineError,
  batchSet,
  batchUpdate,
  batchDelete,
};
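
A hypothetical usage sketch for the module above (the require path, collection name and field are my assumptions):

const { db, batchUpdate, commitBatch } = require("./firestoreBatch"); // hypothetical path

async function markAllDeleted() {
  const snapshot = await db.collection("cities").get();
  snapshot.forEach((doc) => batchUpdate(doc.ref, { isDeleted: true }));
  await commitBatch();
}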

Frontend

// Firestore does not accept more than 500 writes in a transaction or batch write.
const MAX_TRANSACTION_WRITES = 499;

const isFirestoreDeadlineError = (err) => {
  return (
    err.message.includes("DEADLINE_EXCEEDED") ||
    err.message.includes("Received RST_STREAM")
  );
};

class Firebase {
  constructor(fireConfig, instanceName) {
    let app = fbApp; // fbApp: the Firebase app namespace, assumed to be imported elsewhere
    if (instanceName) {
      app = app.initializeApp(fireConfig, instanceName);
    } else {
      app.initializeApp(fireConfig);
    }
    this.name = app.name;
    this.db = app.firestore();
    this.firestore = app.firestore;
    // How many transactions/batchWrites out of 500 so far.
    // I wrote the following functions to easily use batchWrites without worrying about the 500 limit.
    this.writeCounts = 0;
    this.batch = this.db.batch();
    this.isCommitting = false;
  }

  async makeCommitBatch() {
    console.log("makeCommitBatch");
    if (!this.isCommitting) {
      this.isCommitting = true;
      await this.batch.commit();
      this.writeCounts = 0;
      this.batch = this.db.batch();
      this.isCommitting = false;
    } else {
      const batchWaitInterval = setInterval(async () => {
        if (!this.isCommitting) {
          this.isCommitting = true;
          await this.batch.commit();
          this.writeCounts = 0;
          this.batch = this.db.batch();
          this.isCommitting = false;
          clearInterval(batchWaitInterval);
        }
      }, 400);
    }
  }

  async commitBatch() {
    try {
      await this.makeCommitBatch();
    } catch (err) {
      console.log({ err });
      if (isFirestoreDeadlineError(err)) {
        const theInterval = setInterval(async () => {
          try {
            await this.makeCommitBatch();
            clearInterval(theInterval);
          } catch (err) {
            console.log({ err });
            if (!isFirestoreDeadlineError(err)) {
              clearInterval(theInterval);
              throw err;
            }
          }
        }, 4000);
      }
    }
  }

  async checkRestartBatchWriteCounts() {
    this.writeCounts += 1;
    if (this.writeCounts >= MAX_TRANSACTION_WRITES) {
      await this.commitBatch();
    }
  }

  async batchSet(docRef, docData) {
    if (!this.isCommitting) {
      this.batch.set(docRef, docData);
      await this.checkRestartBatchWriteCounts();
    } else {
      const batchWaitInterval = setInterval(async () => {
        if (!this.isCommitting) {
          this.batch.set(docRef, docData);
          await this.checkRestartBatchWriteCounts();
          clearInterval(batchWaitInterval);
        }
      }, 400);
    }
  }

  async batchUpdate(docRef, docData) {
    if (!this.isCommitting) {
      this.batch.update(docRef, docData);
      await this.checkRestartBatchWriteCounts();
    } else {
      const batchWaitInterval = setInterval(async () => {
        if (!this.isCommitting) {
          this.batch.update(docRef, docData);
          await this.checkRestartBatchWriteCounts();
          clearInterval(batchWaitInterval);
        }
      }, 400);
    }
  }

  async batchDelete(docRef) {
    if (!this.isCommitting) {
      this.batch.delete(docRef);
      await this.checkRestartBatchWriteCounts();
    } else {
      const batchWaitInterval = setInterval(async () => {
        if (!this.isCommitting) {
          this.batch.delete(docRef);
          await this.checkRestartBatchWriteCounts();
          clearInterval(batchWaitInterval);
        }
      }, 400);
    }
  }
}
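
And a hypothetical usage sketch for the front-end class (fireConfig stands in for your web app's Firebase config; the document path is made up):

const fb = new Firebase(fireConfig);

const ref = fb.db.collection("users").doc("alice"); // hypothetical document
await fb.batchSet(ref, { active: true });
await fb.commitBatch();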

1

The answers above already explain the problem.

I'm sharing the final code I built, which works for me, because I needed something that works in a more decoupled way than most of the solutions proposed above.

import { FireDb } from "@services/firebase"; // = firebase.firestore();

type TDocRef = FirebaseFirestore.DocumentReference;
type TDocData = FirebaseFirestore.DocumentData;

let fireBatches = [FireDb.batch()];
let batchSizes = [0];
let batchIdxToUse = 0;

export default class FirebaseUtil {
  static addBatchOperation(
    operation: "create",
    ref: TDocRef,
    data: TDocData
  ): void;
  static addBatchOperation(
    operation: "update",
    ref: TDocRef,
    data: TDocData,
    precondition?: FirebaseFirestore.Precondition
  ): void;
  static addBatchOperation(
    operation: "set",
    ref: TDocRef,
    data: TDocData,
    setOpts?: FirebaseFirestore.SetOptions
  ): void;
  static addBatchOperation(
    operation: "create" | "update" | "set",
    ref: TDocRef,
    data: TDocData,
    opts?: FirebaseFirestore.Precondition | FirebaseFirestore.SetOptions
  ): void {
    // Lines below make sure we stay below the limit of 500 writes per
    // batch
    if (batchSizes[batchIdxToUse] === 500) {
      fireBatches.push(FireDb.batch());
      batchSizes.push(0);
      batchIdxToUse++;
    }
    batchSizes[batchIdxToUse]++;

    const batchArgs: [TDocRef, TDocData] = [ref, data];
    if (opts) batchArgs.push(opts);

    switch (operation) {
      // Specific case for "set" is required because of some weird TS
      // glitch that doesn't allow me to use the arg "operation" to
      // call the function
      case "set":
        fireBatches[batchIdxToUse].set(...batchArgs);
        break;
      default:
        fireBatches[batchIdxToUse][operation](...batchArgs);
        break;
    }
  }

  public static async runBatchOperations() {
    // The lines below clear the globally available batches so we
    // don't run them twice if we call this function more than once
    const currentBatches = [...fireBatches];
    fireBatches = [FireDb.batch()];
    batchSizes = [0];
    batchIdxToUse = 0;

    await Promise.all(currentBatches.map((batch) => batch.commit()));
  }
}
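
A hypothetical usage sketch for FirebaseUtil above (the snapshot variable and field are my assumptions):

// Queue one update per document, then commit all underlying batches.
snapshot.docs.forEach((doc) =>
  FirebaseUtil.addBatchOperation("update", doc.ref, { isDeleted: false })
);
await FirebaseUtil.runBatchOperations();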


1

I have no references or documentation for this; the code is my own invention, and to me it is clean, simple and easy to read. If anyone likes it, feel free to use it too.

It is best to write automated tests, because the code relies on the private variable _ops, which may change after a package upgrade. In older versions, for example, it was _mutations.

async function commitBatch(batch) {
  const MAX_OPERATIONS_PER_COMMIT = 500;

  // While the batch holds more operations than a single commit allows,
  // peel chunks off into temporary batches and commit them separately.
  while (batch._ops.length > MAX_OPERATIONS_PER_COMMIT) {
    const batchPart = admin.firestore().batch();

    batchPart._ops = batch._ops.splice(0, MAX_OPERATIONS_PER_COMMIT - 1);

    await batchPart.commit();
  }

  await batch.commit();
}

Usage:

const batch = admin.firestore().batch();

batch.delete(someRef);
batch.update(someRef, { someField: 'someValue' }); // update() requires a data argument

...

await commitBatch(batch);

Your answer could be improved with additional supporting information. Please edit to add further details, such as citations or documentation, so that others can confirm that your answer is correct. You can find more information on how to write good answers in the help center. - Vimal Patel

1
As of March 2023, Firestore no longer limits the number of writes that can be passed to a Commit operation or performed in a transaction (source).

1
I like this implementation: https://github.com/qualdesk/firestore-big-batch and here is a blog post about it (not mine): https://www.qualdesk.com/blog/2021/the-solution-to-firestore-batched-write-limit/ It is a drop-in replacement for a Firestore batch. Instead of this:
const batch = db.batch();

...do this:

const batch = new BigBatch({ db });

Here is my version, updated with types that work with the latest firebase-admin and TypeScript. I also added a setGroup option, which guarantees that a group of operations ends up in the same batch.
// Inspired by: https://github.com/qualdesk/firestore-big-batch

import type {
  DocumentReference,
  Firestore,
  SetOptions,
  WriteBatch,
} from 'firebase-admin/firestore';

const MAX_OPERATIONS_PER_FIRESTORE_BATCH = 499;

export class BigBatch {
  private db: Firestore;
  private currentBatch: WriteBatch;
  private batchArray: Array<WriteBatch>;
  private operationCounter: number;

  constructor({ db }: { db: Firestore }) {
    this.db = db;
    this.currentBatch = db.batch();
    this.batchArray = [this.currentBatch];
    this.operationCounter = 0;
  }

  private startNewBatch() {
    this.currentBatch = this.db.batch();
    this.batchArray.push(this.currentBatch);
    this.operationCounter = 0;
  }

  private checkLimit() {
    if (this.operationCounter < MAX_OPERATIONS_PER_FIRESTORE_BATCH)
      return;

    this.startNewBatch();
  }

  private ensureGroupOperation(operations: unknown[]) {
    if (operations.length > MAX_OPERATIONS_PER_FIRESTORE_BATCH)
      throw new Error(
        `Group can only accept ${MAX_OPERATIONS_PER_FIRESTORE_BATCH} operations.`,
      );

    if (
      this.operationCounter + operations.length >
      MAX_OPERATIONS_PER_FIRESTORE_BATCH
    )
      this.startNewBatch();
  }

  /**
   * Add a single set operation to the batch.
   */
  set(
    ref: DocumentReference,
    data: object,
    options: SetOptions = {},
  ) {
    this.currentBatch.set(ref, data, options);
    this.operationCounter++;
    this.checkLimit();
  }

  /**
   * Add a group of set operations to the batch. This method ensures that everything in a group will be included in the same batch.
   * @param group Array of objects with ref, data, and options
   */
  setGroup(
    operations: {
      ref: DocumentReference;
      data: object;
      options?: SetOptions;
    }[],
  ) {
    this.ensureGroupOperation(operations);
    operations.forEach(o =>
      this.currentBatch.set(o.ref, o.data, o.options ?? {}),
    );
    this.operationCounter += operations.length;
    this.checkLimit();
  }

  update(ref: DocumentReference, data: object) {
    this.currentBatch.update(ref, data);
    this.operationCounter++;
    this.checkLimit();
  }

  delete(ref: DocumentReference) {
    this.currentBatch.delete(ref);
    this.operationCounter++;
    this.checkLimit();
  }

  commit() {
    const promises = this.batchArray.map(batch => batch.commit());
    return Promise.all(promises);
  }
}

I like this! However, I'm getting a type error I don't understand. import { getFirestore } from "firebase-admin/firestore"; const fs = getFirestore(); const batch = new BigBatch({fs}); The type error is: "Argument of type '{ fs: FirebaseFirestore.Firestore; }' is not assignable to parameter of type '{ db: Firestore; }'".
Ah, I see. In my case I needed to do const batch = new BigBatch({ db: fs });. Thanks for the BigBatch class!

0

A simple solution: just run it twice. My array is called resultsFinal. I run one batch with a limit of 490, then a second batch up to the array length (resultsFinal.length). It works for me :) How do you verify it? Delete your collection in Firebase; Firebase tells you how many documents it deleted. If that matches your array length, you're good.

async function quickstart(results) {
    // results is passed in as a parameter so its data is available inside quickstart
    const resultsFinal = results;
    // console.log(resultsFinal.length);
    let batch = firestore.batch();
    // Firestore's limit is 500 writes per transaction/batch
    for (let i = 0; i < Math.min(490, resultsFinal.length); i++) {
        const doc = firestore.collection('testMore490').doc();
        const object = resultsFinal[i];
        batch.set(doc, object);
    }
    await batch.commit();
    // Second batch for the remaining documents
    batch = firestore.batch();

    for (let i = 490; i < resultsFinal.length; i++) {
        const objectPartTwo = resultsFinal[i];
        const doc = firestore.collection('testMore490').doc();
        batch.set(doc, objectPartTwo);
    }
    await batch.commit();

}
