Luigi工作流中的MySQL目标

4

我的TaskB需要依赖TaskA,当TaskA完成后,它会将结果写入MySQL表中,然后TaskB需要将这个输出作为它的输入。

我无法在Luigi中找到如何实现这个功能的方法。有人能给我提供一个示例或在此处给我一个快速示例吗?

1个回答

11
现有的luigi中的MySqlTarget使用一个单独的标记表来指示任务何时完成。这是我会采用的大致方法...但是你的问题非常抽象,所以实际情况可能更加复杂。
import luigi
from datetime import datetime
from luigi.contrib.mysqldb import MySqlTarget


class TaskA(luigi.Task):
    rundate = luigi.DateParameter(default=datetime.now().date())
    target_table = "table_to_update"
    host = "localhost:3306"
    db = "db_to_use"
    user = "user_to_use"
    pw = "pw_to_use"

    def get_target(self):
        return MySqlTarget(host=self.host, database=self.db, user=self.user, password=self.pw, table=self.target_table,
                           update_id=str(self.rundate))

    def requires(self):
        return []

    def output(self):
        return self.get_target()

    def run(self):
        #update table
        self.get_target().touch()


class TaskB(luigi.Task):
    def requires(self):
        return [TaskA()]

    def run(self):
        # reading from target_table

谢谢你的帮助,Matt。这真的很有用。我有一个问题?这是否意味着 MySqlTarget 使用 update_id 跟踪正在更新的行,update_id 是行的主要 ID。如果我的主要 ID 是自动递增的,那我该怎么办? - Rijo Simon
哦,这很棘手。我认为你需要使用另一个唯一值来替代自增ID作为update_id。它实际上运行的是"""INSERT INTO {marker_table} (update_id, target_table) VALUES (%s, %s) ON DUPLICATE KEY UPDATE update_id = VALUES(update_id) """.format(marker_table=self.marker_table), (self.update_id, self.table) - MattMcKnight
所以解决方案似乎是有一个更新表来记录工作流程的更新吗?但我不想为每个任务维护一个表(而且我有很多任务)。所以从您对正在发生的情况的SQL解释中,看起来我应该这样做:return MySqlTarget(host = self.host,database = self.db,user = self.user,password = self.pw,table = self.target_table,update_id = str(self.rundate),update_task_type = TASK_TYPE_A) - Rijo Simon
是的,这就是MySQLTarget的默认行为,它会创建"marker_table"。该表中的一列是"target_table",可以推测这是您的作业更新的表。 - MattMcKnight

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接