我对芹菜有新手的经验。我写过很多任务,既有预定的也有延迟的,但仅此而已。我遇到了一个问题,我想创建一个任务来启动 1000 个较小的作业,以减轻队列长度和可能需要数小时才能完成的作业可能出现的任何问题。当前应用程序依赖于来自外部 API 的信息。可以这么说,用户将他们的帐户与我集成的另一项服务相关联,我想每天根据他们的外部帐户的变化来更新用户的信息。我有这样的预定工作@app.task() def refresh_accounts(): for account in Account.objects.all(): response = retrieve_account_info(account_id=account.id) account.data = response.data account.save() --我想要的是这样的@app.task()def kickoff_refresh(): for account in Account.objects.all() refresh_account.delay(account_id=account.id)@app.task() def refresh_account(account_id=None): account = Account.objects.get(id=account_id) response = retrieve_account_info(account_id=account.id) account.data = response.data account.save()我想到的一种方法是kickoff_refresh在refresh_account不同的队列中。@app.task(queue=q1), @app.task(queue=q2)... 但是,我不知道是否有更好的方法。在同一队列的任务中调用任务在 celery 中似乎是不好的做法 - https://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks任务kickoff_refresh将是每隔几个小时运行一次周期性任务。我很想听听什么对其他人有用。谢谢
1 回答
幕布斯7119047
TA贡献1794条经验 获得超8个赞
from celery import group
@app.task()
def kickoff_refresh(account_id=None):
job = group(refresh_account.s(account_id=account.id) for account in Account.objects.all())()
@app.task()
def refresh_account(account_id=None):
account = Account.objects.get(id=account_id)
response = retrieve_account_info(account_id=account.id)
account.data = response.data
account.save()
添加回答
举报
0/150
提交
取消