为了账号安全,请及时绑定邮箱和手机立即绑定

带有a的气流Python运算符。返回类型

带有a的气流Python运算符。返回类型

蝴蝶刀刀 2021-11-09 16:31:36
我的 DAG 中有一个 python 运算符。python 可调用函数返回一个 bool 值。但是,当我运行 DAG 时,出现以下错误。类型错误:“bool”对象不可调用我修改了函数以不返回任何内容,但又一次出现以下错误错误 - 'NoneType' 对象不可调用下面是我的狗def check_poke(threshold,sleep_interval):flag=snowflake_poke(1000,10).poke()#print(flag)return flagdependency = PythonOperator(task_id='poke_check',#python_callable=check_poke(129600,600),provide_context=True,python_callable=check_poke(129600,600),dag=dag)end = BatchEndOperator(queue=QUEUE,dag=dag)start.set_downstream(dependency)dependency.set_downstream(end)无法弄清楚我错过了什么。有人可以帮我解决这个问题吗……对气流相当陌生。我在 dag 中编辑了 python 运算符,如下所示dependency = PythonOperator(task_id='poke_check',provide_context=True,python_callable=check_poke(129600,600),dag=dag)但是现在,我得到了一个不同的错误。Traceback (most recent call last):File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1245, in run    result = task_copy.execute(context=context)File "/usr/local/lib/python2.7/dist-packages/airflow/operators/python_operator.py", line 66, in execute    return_value = self.python_callable(*self.op_args, **self.op_kwargs)TypeError: () takes no arguments (25 given)[2019-02-15 05:30:25,375] {models.py:1298} INFO - Marking task as UP_FOR_RETRY[2019-02-15 05:30:25,393] {models.py:1327} ERROR - () takes no arguments (25 given)
查看完整描述

3 回答

?
慕标5832272

TA贡献1966条经验 获得超4个赞

参数名称给出了它。您正在传递调用的结果而不是可调用的。

python_callable=check_poke(129600,600)

第二个错误指出 callable 是用 25 个参数调用的。所以是lambda:行不通的。以下方法可行,但忽略 25 个参数确实值得怀疑。

python_callable=lambda *args, **kwargs: check_poke(129600,600)


查看完整回答
反对 回复 2021-11-09
?
达令说

TA贡献1821条经验 获得超6个赞

代码需要一个可调用的,而不是结果(正如已经指出的那样)。

您可以使用functools.Partial来填写参数:


from functools import partial


def check_poke(threshold,sleep_interval):

   flag=snowflake_poke(1000,10).poke()

   return flag


func = partial(check_poke, 129600, 600)

dependency = PythonOperator(

    task_id='poke_check',

    provide_context=True,

    python_callable=func,

    dag=dag)


查看完整回答
反对 回复 2021-11-09
?
白猪掌柜的

TA贡献1893条经验 获得超10个赞

同意@Dan D.的问题;但令人困惑的是为什么他的解决方案不起作用(它在python shell 中肯定有效)


看看这是否会给您带来任何运气(它只是@Dan D.解决方案的详细变体)


from typing import Callable


# your original check_poke function

def check_poke(arg_1: int, arg_2: int) -> bool:

    # do something

    # somehow returns a bool

    return arg_1 < arg_2


# a function that returns a callable, that in turn invokes check_poke

# with the supplied params

def check_poke_wrapper_creator(arg_1: int, arg_2: int) -> Callable[[], bool]:

    def check_poke_wrapper() -> bool:

        return check_poke(arg_1=arg_1, arg_2=arg_2)


    return check_poke_wrapper


..


# usage

python_callable=check_poke_wrapper_creator(129600, 600)


查看完整回答
反对 回复 2021-11-09
  • 3 回答
  • 0 关注
  • 149 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信