自动重新连接Kafka-Connect中失败的任务

6
我在使用Kafka-connect的mongo-source插件。 我检查了源任务的状态,它正在运行并监听一个Mongo集合。
我手动停止了mongod服务,并等待了约1分钟,然后重新启动了它。
我检查了源任务,希望它自己解决问题,但经过30分钟,仍然没有任何进展。
只有在重新启动连接器之后,它才重新开始工作。
由于mongo-source没有设置超时重试和退避的选项,所以我搜索了一种适用于简单场景的配置:使用Kafka-connect配置在X时间后重新启动失败的任务。但是我找不到任何相关配置... :/ 我可以使用一个简单的脚本来处理,但我想知道Kafka-connect是否有管理失败任务的功能,或者在mongo-source中是否有这样的功能... 我不希望在仅经过一分钟后就失败... :/
1个回答

18

除了使用REST API查找失败任务并提交重新启动请求 - 然后定期运行此操作,没有其他方式。例如:

curl -s "http://localhost:8083/connectors?expand=status" | \
  jq -c -M 'map({name: .status.name } +  {tasks: .status.tasks}) | .[] | {task: ((.tasks[]) + {name: .name})}  | select(.task.state=="FAILED") | {name: .task.name, task_id: .task.id|tostring} | ("/connectors/"+ .name + "/tasks/" + .task_id + "/restart")' | \
  xargs -I{connector_and_task} curl -v -X POST "http://localhost:8083"\{connector_and_task\}

来源:https://rmoff.net/2019/06/06/automatically-restarting-failed-kafka-connect-tasks/


2
很好。应该是一个内置功能。 - Stephen
这是一个惊人的一行代码,@Robin。 - Alex Woolford

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接