为了账号安全,请及时绑定邮箱和手机立即绑定

在 Airflow 中的组件之间传输数据

在 Airflow 中的组件之间传输数据

潇湘沐 2021-12-29 20:19:25
我对 Airflow 很陌生,并且已经阅读了大部分文档。从文档中,我了解到 DAG 中组件之间的小数据可以使用 XCom 类共享。DAG 中发布数据的组件必须推送,订阅数据的组件必须拉取。但是,我对推和拉的语法部分不是很清楚。我指的是关于文档的XCom 部分并开发了一个代码模板。假设我有以下代码,它只有两个组件,一个 pusher 和一个 puller。pusher 发布 puller 必须消耗的当前时间并写入日志文件。from datetime import datetimefrom airflow import DAGfrom airflow.operators.python_operator import PythonOperatorlog_file_location = '/usr/local/airflow/logs/time_log.log'default_args = {'owner':'apache'}dag = DAG('pushpull', default_args = default_args)def push_function():    #push this data on the DAG as key-value pair    return(datetime.now()) #current timedef pull_function():    with open(log_file_location, 'a') as logfile:        current_time = '' #pull data from the pusher as key - value pair        logfile.writelines('current time = '+current_time)    logfile.close()with dag:    t1 = PythonOperator(        task_id = 'pusher',         python_callable = push_function)    t2 = PythonOperator(        task_id = 'puller',         python_callable = pull_function)    t2.set_upstream(t1)我需要 Airflow 大师在两种语法上的帮助:如何从推送功能连同键推送数据如何获得 pull 函数使用 key 拉取数据。
查看完整描述

1 回答

?
天涯尽头无女友

TA贡献1831条经验 获得超9个赞

使用密钥推送到 Xcom 的示例:


def push_function(**context):

    msg='the_message'

    print("message to push: '%s'" % msg)

    task_instance = context['task_instance']

    task_instance.xcom_push(key="the_message", value=msg)

使用密钥拉到 Xcom 的示例:


def pull_function(**kwargs):

    ti = kwargs['ti']

    msg = ti.xcom_pull(task_ids='push_task',key='the_message')

    print("received message: '%s'" % msg)

示例 DAY:


from datetime import datetime, timedelta

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator


DAG = DAG(

  dag_id='simple_xcom',

  start_date=datetime(2017, 10, 26),

  schedule_interval=timedelta(1)

)


def push_function(**context):

    msg='the_message'

    print("message to push: '%s'" % msg)

    task_instance = context['task_instance']

    task_instance.xcom_push(key="the_message", value=msg)


push_task = PythonOperator(

    task_id='push_task', 

    python_callable=push_function,

    provide_context=True,

    dag=DAG)


def pull_function(**kwargs):

    ti = kwargs['ti']

    msg = ti.xcom_pull(task_ids='push_task',key='the_message')

    print("received message: '%s'" % msg)


pull_task = PythonOperator(

    task_id='pull_task', 

    python_callable=pull_function,

    provide_context=True,

    dag=DAG)


push_task >> pull_task


查看完整回答
反对 回复 2021-12-29
  • 1 回答
  • 0 关注
  • 411 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号