如何使用Apache Beam管理背压

4

我有一个非常基本的 Apache Beam 管道,它在 GCP Dataflow 上运行,从 PubSub 读取一些数据,进行转换,并将其写入到 Postgres DB 中。所有这些都是使用 Apache Beam 的标准读取器/写入器组件完成的。问题是,当我的管道开始接收大量数据时,我的 Postgres DB 会因等待 ShareLock 而出现死锁错误。

很明显,这种情况是由于 Postgres DB 溢出引起的。我的管道试图一次写入太多东西,写入速度过快,因此为了避免这种情况,它只需要放慢速度。因此,我们可以使用类似于背压的机制。我尝试查找有关 Apache Beam 背压配置的任何信息,但不幸的是,官方文档似乎对此类问题保持沉默。

我被以下类型的异常所压倒:

java.sql.BatchUpdateException: Batch entry <NUMBER>
<MY_STATEMENT>
 was aborted: ERROR: deadlock detected
  Detail: Process 87768 waits for ShareLock on transaction 1939992; blocked by process 87769.
Process 87769 waits for ShareLock on transaction 1939997; blocked by process 87768.
  Hint: See server log for query details.
  Where: while inserting index tuple (5997152,9) in relation "<MY_TABLE>"  Call getNextException to see other errors in the batch.

我想知道是否有任何反压力工具包或类似的东西,可以帮助我管理问题,而不必编写自己的PostgresIO.Writer

非常感谢。


Beam没有任何库来支持在这种情况下的背压。但是,您可以使用FinishBundle注释将请求批处理为单个操作,以减轻对Postgress的负载。 - Ankur
1个回答

2
假设您使用 JdbcIO 将数据写入Postgres,您可以尝试增加批处理大小(参见withBatchSize(long batchSize)),默认值为1K条记录,这可能不够。此外,在SQL异常情况下,如果您想要进行重试,则需要确保使用适当的重试策略(请参见withRetryStrategy(RetryStrategy retryStrategy))。在这种情况下,将应用FluentBackoff

2
这真的应该被接受作为答案吗?这只是推迟了问题,而没有解决它。 - Matt Welke
1
在我看来,在Apache Beam中没有背压概念的情况下,这个答案已经很好了。 - Iraj Hedayati
没错,Iraj。否则,答案就很明显了——“在Beam中实现反压”,这是一个相当复杂的任务——但它并不能解决最初的问题。 - Alexey Romanenko

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接