为了账号安全,请及时绑定邮箱和手机立即绑定

如何使用 Luigi 动态检查输出

如何使用 Luigi 动态检查输出

动漫人物 2021-11-09 16:33:10
我意识到我可能需要使用动态需求来完成以下任务,但是我一直无法理解这在实践中会是什么样子。目标是使用 Luigi 生成数据并将其添加到数据库中,而无需提前知道将生成什么数据。以使用 mongodb 为例:import luigifrom uuid import uuid4from luigi.contrib import mongodbimport pymongo# Make up IDs, though in practice the IDs may be generated from an APIclass MakeID(luigi.Task):    def run(self):        with self.output().open('w') as f:            f.write(','.join([str(uuid4()) for e in range(10)]))    # Write the data to file    def output(self):        return luigi.LocalTarget('data.csv')class ToDataBase(luigi.Task):    def requires(self):        return MakeID()    def run(self):        with self.input().open('r') as f:            ids = f.read().split(',')        # Add some fake data to simulate generating new data         count_data = {key: value for value, key in enumerate(ids)}        # Add data to the database        self.output().write(count_data)    def output(self):        # Attempt to read non-existent file to get the IDs to check if task is complete        with self.input().open('r') as f:            valid_ids = f.read().split(',')        client = pymongo.MongoClient('localhost',                                     27017,                                     ssl=False)        return mongodb.MongoRangeTarget(client,                                        'myDB',                                        'myData',                                        valid_ids,                                        'myField')if __name__ == '__main__':    luigi.run()目标是获取数据,对其进行修改,然后将其添加到数据库中。上面的代码在运行时失败,因为output方法在方法ToDataBase之前运行,require所以虽然函数可以访问输入,但输入尚不存在。无论如何,我仍然需要检查以确保数据已添加到数据库中。这个github 问题与我正在寻找的很接近,尽管正如我所提到的,我在实践中无法弄清楚这个用例的动态需求。
查看完整描述

1 回答

?
幕布斯7119047

TA贡献1794条经验 获得超8个赞

解决方案是创建第三个任务(在示例中Dynamic),它产生等待动态输入的任务,并使依赖项成为参数而不是requires方法。


class ToDatabase(luigi.Task):

    fp = luigi.Parameter()


    def output(self):

        with open(self.fp, 'r') as f:

            valid_ids = [str(e) for e in f.read().split(',')]

        client = pymongo.MongoClient('localhost', 27017, ssl=False)

        return mongodb.MongoRangeTarget(client, 'myDB', 'myData',

                                        valid_ids, 'myField')


    def run(self):

        with open(self.fp, 'r') as f:

            valid_ids = [str(e) for e in f.read().split(',')]

        self.output().write({k: 5 for k in valid_ids})



class Dynamic(luigi.Task):

    def output(self):

        return self.input()


    def requires(self):

        return MakeIDs()


    def run(self):

        yield(AddToDatabase(fp=self.input().path))


查看完整回答
反对 回复 2021-11-09
  • 1 回答
  • 0 关注
  • 149 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信