为了账号安全,请及时绑定邮箱和手机立即绑定

Celery 一项任务启动许多小任务的最佳实践

Celery 一项任务启动许多小任务的最佳实践

收到一只叮咚 2022-11-24 14:52:55

我对芹菜有新手的经验。我写过很多任务,既有预定的也有延迟的,但仅此而已。


我遇到了一个问题,我想创建一个任务来启动 1000 个较小的作业,以减轻队列长度和可能需要数小时才能完成的作业可能出现的任何问题。


当前应用程序依赖于来自外部 API 的信息。可以这么说,用户将他们的帐户与我集成的另一项服务相关联,我想每天根据他们的外部帐户的变化来更新用户的信息。


我有这样的预定工作


@app.task() 

def refresh_accounts():

    for account in Account.objects.all():

        response = retrieve_account_info(account_id=account.id)

        account.data = response.data 

        account.save() 

--


我想要的是这样的


@app.task()

def kickoff_refresh():

    for account in Account.objects.all()

        refresh_account.delay(account_id=account.id)


@app.task() 

def refresh_account(account_id=None):

    account = Account.objects.get(id=account_id)

    response = retrieve_account_info(account_id=account.id)

    account.data = response.data 

    account.save()

我想到的一种方法是kickoff_refresh在refresh_account不同的队列中。@app.task(queue=q1), @app.task(queue=q2)... 但是,我不知道是否有更好的方法。在同一队列的任务中调用任务在 celery 中似乎是不好的做法 - https://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks任务kickoff_refresh将是每隔几个小时运行一次周期性任务。


我很想听听什么对其他人有用。谢谢


查看完整描述

1 回答

?
幕布斯7119047

TA贡献1492条经验 获得超8个赞

from celery import group



@app.task()

def kickoff_refresh(account_id=None):

    job = group(refresh_account.s(account_id=account.id) for account in Account.objects.all())()


@app.task()

def refresh_account(account_id=None):

    account = Account.objects.get(id=account_id)

    response = retrieve_account_info(account_id=account.id)

    account.data = response.data 

    account.save()


查看完整回答
反对 回复 6天前

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信