我的TaskB需要依赖TaskA,当TaskA完成后,它会将结果写入MySQL表中,然后TaskB需要将这个输出作为它的输入。
我无法在Luigi中找到如何实现这个功能的方法。有人能给我提供一个示例或在此处给我一个快速示例吗?
import luigi
from datetime import datetime
from luigi.contrib.mysqldb import MySqlTarget
class TaskA(luigi.Task):
rundate = luigi.DateParameter(default=datetime.now().date())
target_table = "table_to_update"
host = "localhost:3306"
db = "db_to_use"
user = "user_to_use"
pw = "pw_to_use"
def get_target(self):
return MySqlTarget(host=self.host, database=self.db, user=self.user, password=self.pw, table=self.target_table,
update_id=str(self.rundate))
def requires(self):
return []
def output(self):
return self.get_target()
def run(self):
#update table
self.get_target().touch()
class TaskB(luigi.Task):
def requires(self):
return [TaskA()]
def run(self):
# reading from target_table
"""INSERT INTO {marker_table} (update_id, target_table) VALUES (%s, %s) ON DUPLICATE KEY UPDATE update_id = VALUES(update_id) """.format(marker_table=self.marker_table), (self.update_id, self.table)
- MattMcKnightreturn MySqlTarget(host = self.host,database = self.db,user = self.user,password = self.pw,table = self.target_table,update_id = str(self.rundate),update_task_type = TASK_TYPE_A)
。 - Rijo Simon