我正在尝试使用NiFi在两个具有相似表结构的数据库之间传输数据。以下是数据结构示例:
User: {varchar name, integer id}.
没有“最大值列”,因此无法确定是否有新数据。因此,每次我都会创建完整表内容的“快照”。问题在于,在目标数据库中不清楚应该插入还是更新特定记录。
我创建了两个处理器分支:插入和更新。只有对于新记录才使用插入,而只有对于现有记录才使用更新。但是(!) PutSQL处理器与一堆流文件一起使用。例如,批处理大小为100,处理器每天工作一次。假设昨天有98条记录。它们将被插入。今天有200条记录(昨天的98条和102条新记录)。在此流程中,如果NiFi尝试更新前100条记录并将其插入,则这两个动作都会失败:前98条记录应该被更新,而后2条应该被插入。
如何解决这个问题?我知道可以使用批处理大小为1,但速度太慢。