为了账号安全,请及时绑定邮箱和手机立即绑定

气流 - 创建 dag 和任务动态地为一个对象创建管道

气流 - 创建 dag 和任务动态地为一个对象创建管道

慕码人2483693 2024-01-15 17:09:42
在气流中,我想将一些表从 pg 导出到 BQ。task1: get the max id from BQtask2: export the data from PG (id>maxid)task3: GCS to BQ stagetask4: BQ stage to BQ main但有一个小挑战,日程间隔不同。所以我创建了一个 JSON 文件来告诉同步间隔。因此,如果是 2 分钟,那么它将使用 DAG upsert_2mins,否则将使用 10 分钟间隔 ( upsert_10mins) 。我使用这个语法来动态生成它。JSON 配置文件:{    "tbl1": ["update_timestamp", "2mins", "stg"],    "tbl2": ["update_timestamp", "2mins", "stg"]}它实际上创建了 dag,但问题是来自 Web UI,我能够看到最后一个表的任务。但它必须显示 2 个表的任务。
查看完整描述

1 回答

?
慕森卡

TA贡献1806条经验 获得超8个赞

您的代码正在创建 2 个 dags,每个表一个,但用第二个覆盖第一个。


我的建议是将 JSON 文件的格式更改为:


{

    "2mins": [

                "tbl1": ["update_timestamp", "stg"],

                "tbl2": ["update_timestamp", "stg"]

             ],

    "10mins": [

                "tbl3": ["update_timestamp", "stg"],

                "tbl4": ["update_timestamp", "stg"]

             ]

}

让您的代码迭代计划并为每个表创建所需的任务(您将需要两个循环):


# looping on the schedules to create two dags

for schedule, tables in config.items():


cron_time = '*/10 * * * *'


if schedule== '2mins':

    cron_time = '*/20 * * * *'


dag_id = 'upsert_every_{}'.format(schedule)


dag = DAG(

    dag_id ,

    default_args=default_args,

    description='Incremental load - Every 10mins',

    schedule_interval=cron_time,

    catchup=False,

    max_active_runs=1,

    doc_md = docs

)


# Looping over the tables to create the tasks for 

# each table in the current schedule

for table_name, table_config in tables.items():

    max_ts = PythonOperator(

        task_id="get_maxts_{}".format(table_name),

        python_callable=get_max_ts,

        op_kwargs={'tablename':table_name, 'dag': dag},

        provide_context=True,

        dag=dag

    )


    export_gcs = PythonOperator(

        task_id='export_gcs_{}'.format(table_name),

        python_callable=pgexport,

        op_kwargs={'tablename':table_name, 'dag': dag},

        provide_context=True,

        dag=dag

    )


    stg_load = PythonOperator(

        task_id='stg_load_{}'.format(table_name),

        python_callable=stg_bqimport,

        op_kwargs={'tablename':table_name, 'dag': dag},

        provide_context=True,

        dag=dag

    )    


    merge = PythonOperator(

        task_id='merge_{}'.format(table_name),

        python_callable=prd_merge,

        op_kwargs={'tablename':table_name, 'dag': dag},

        provide_context=True,

        dag=dag

    )

    

    # Tasks for the same table will be chained

    max_ts >> export_gcs >> stg_load >> merge


# DAG is created among the global objects

globals()[dag_id] = dag


查看完整回答
反对 回复 2024-01-15
  • 1 回答
  • 0 关注
  • 28 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信