为了账号安全,请及时绑定邮箱和手机立即绑定

从气流中的 BigQueryOperator 获取结果

从气流中的 BigQueryOperator 获取结果

德玛西亚99 2021-09-02 19:21:30
我试图从BigQueryOperator使用气流中获取结果,但我找不到办法做到这一点。我尝试调用成员中的next()方法bq_cursor(在 1.10 中可用),但它返回None. 这就是我尝试这样做的方式import datetimeimport loggingfrom airflow import modelsfrom airflow.contrib.operators import bigquery_operatorfrom airflow.operators import python_operatoryesterday = datetime.datetime.combine(    datetime.datetime.today() - datetime.timedelta(1),    datetime.datetime.min.time())def MyChequer(**kwargs):    big_query_count = bigquery_operator.BigQueryOperator(        task_id='my_bq_query',        sql='select count(*) from mydataset.mytable'    )    big_query_count.execute(context=kwargs)    logging.info(big_query_count)    logging.info(big_query_count.__dict__)    logging.info(big_query_count.bq_cursor.next())default_dag_args = {    'start_date': yesterday,    'email_on_failure': False,    'email_on_retry': False,    'project_id': 'myproject'}with models.DAG(        'bigquery_results_execution',        # Continue to run DAG once per day        schedule_interval=datetime.timedelta(days=1),        default_args=default_dag_args) as dag:    myoperator = python_operator.PythonOperator(        task_id='threshold_operator',        provide_context=True,        python_callable=MyChequer    )    # Define DAG    myoperator查看bigquery_hook.py和bigquery_operator.py似乎是获取结果的唯一可用方法。
查看完整描述

3 回答

?
梦里花落0921

TA贡献1772条经验 获得超5个赞

每当我需要从 BigQuery 查询中获取数据并将其用于某事时,我都会使用 BigQuery 钩子创建自己的运算符。我通常将其称为 BigQueryToXOperator,我们有很多用于将 BigQuery 数据发送到其他内部系统的运算符。


例如,我有一个 BigQueryToPubSub 运算符,您可能会发现它可以用作示例,说明如何查询 BigQuery,然后逐行处理结果,并将它们发送到 Google PubSub。考虑以下通用示例代码,了解如何自行执行此操作:


class BigQueryToXOperator(BaseOperator):

    template_fields = ['sql']

    ui_color = '#000000'


    @apply_defaults

    def __init__(

            self,

            sql,

            keys,

            bigquery_conn_id='bigquery_default',

            delegate_to=None,

            *args,

            **kwargs):

        super(BigQueryToXOperator, self).__init__(*args, **kwargs)

        self.sql = sql

        self.keys = keys # A list of keys for the columns in the result set of sql

        self.bigquery_conn_id = bigquery_conn_id

        self.delegate_to = delegate_to



    def execute(self, context):

        """

        Run query and handle results row by row.

        """

        cursor = self._query_bigquery()

        for row in cursor.fetchall():

            # Zip keys and row together because the cursor returns a list of list (not list of dicts)

            row_dict = dumps(dict(zip(self.keys,row))).encode('utf-8')


            # Do what you want with the row...

            handle_row(row_dict)



    def _query_bigquery(self):

        """

        Queries BigQuery and returns a cursor to the results.

        """

        bq = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,

                          use_legacy_sql=False)

        conn = bq.get_conn()

        cursor = conn.cursor()

        cursor.execute(self.sql)

        return cursor



查看完整回答
反对 回复 2021-09-02
  • 3 回答
  • 0 关注
  • 144 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信