为了账号安全,请及时绑定邮箱和手机立即绑定

如何在另一个达格气流中创建达格斯

如何在另一个达格气流中创建达格斯

牧羊人nacy 2022-09-13 15:10:19
我正在尝试拥有一个主匕首,它将根据我的需要创建更多的匕首。我在气流.cfg dags_folder中有以下python文件。此代码在数据库中创建主 dag。此主 dag 应读取文本文件,并应为文本文件中的每一行创建 dag。但是,在主 dag 中创建的 dag 不会添加到数据库中。创建它的正确方法是什么?版本详细信息:蟒蛇版本:3.7阿帕奇气流版本:1.10.8import datetime as dtfrom airflow import DAGfrom airflow.operators.bash_operator import BashOperatorfrom airflow.operators.python_operator import PythonOperatorroot_dir = "/home/user/TestSpace/airflow_check/res"print("\n\n ===> \n Dag generator")default_args = {    'owner': 'airflow',    'start_date': dt.datetime(2020, 3, 22, 00, 00, 00),    'concurrency': 1,    'retries': 0}def greet(_name):    message = "Greetings {} at UTC: {} Local: {}\n".format(_name, dt.datetime.utcnow(), dt.datetime.now())    f = open("{}/greetings.txt".format(root_dir), "a+")    print("\n\n =====> {}\n\n".format(message))    f.write(message)    f.close()def create_dag(dag_name):    with DAG(dag_name, default_args=default_args,             schedule_interval='*/2 * * * *',             catchup=False             ) as i_dag:        i_opr_greet = PythonOperator(task_id='greet', python_callable=greet,                                     op_args=["{}_{}".format("greet", dag_name)])        i_echo_op = BashOperator(task_id='echo', bash_command='echo `date`')        i_opr_greet >> i_echo_op    return i_dagdef create_all_dags():    all_lines = []    f = open("{}/../dag_names.txt".format(root_dir), "r")    for x in f:        all_lines.append(str(x))    f.close()    for line in all_lines:        print("Dag creation for {}".format(line))        globals()[line] = create_dag(line)with DAG('master_dag', default_args=default_args,         schedule_interval='*/1 * * * *',         catchup=False         ) as dag:    echo_op = BashOperator(task_id='echo', bash_command='echo `date`')    create_op = PythonOperator(task_id='create_dag', python_callable=create_all_dags)    echo_op >> create_op
查看完整描述

2 回答

?
慕少森

TA贡献2019条经验 获得超9个赞

您有 2 个选项:

  1. 使用子操作程序示例 DAG。如果您的计划间隔可以相同,请使用它。

  2. 编写蟒蛇 DAG 文件:从主 DAG 开始,在包含 DAG 的AIRFLOW_HOME中创建 Python 文件。为此,您可以使用 Jinja2 模板引擎。


查看完整回答
反对 回复 2022-09-13
  • 2 回答
  • 0 关注
  • 115 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号