为了账号安全,请及时绑定邮箱和手机立即绑定

报告长期运行的Celery任务的结果

报告长期运行的Celery任务的结果

紫衣仙女 2019-11-02 13:31:10
问题我已经将一个长期运行的任务划分为多个逻辑子任务,因此我可以在每个子任务完成时报告结果。但是,我正在尝试报告将永远无法完成的任务的结果(而不是不断产生价值),并且正在使用现有的解决方案来做到这一点。背景我正在为我编写的某些Python程序构建Web界面。用户可以通过Web表单提交作业,然后返回查看该作业的进度。假设我有两个函数,每个函数可以通过单独的形式进行访问:med_func:执行大约需要1分钟,结果传递给render(),这会产生其他数据。long_func:返回一个生成器。每次yield大约需要30分钟,应向用户报告。产量很多,我们可以认为此迭代器是无限的(仅在被撤销时终止)。代码,当前实现使用med_func,我报告结果如下:在表单提交时,我将保存AsyncResult到Django会话:    task_result = med_func.apply_async([form], link=render.s())    request.session["task_result"] = task_result结果页面的Django视图访问this AsyncResult。任务完成后,结果将保存到作为上下文传递给Django模板的对象中。def results(request):    """ Serve (possibly incomplete) results of a session's latest run. """    session = request.session    try:  # Load most recent task        task_result = session["task_result"]    except KeyError:  # Already cleared, or doesn't exist        if "results" not in session:            session["status"] = "No job submitted"    else:  # Extract data from Asynchronous Tasks        session["status"] = task_result.status        if task_result.ready():            session["results"] = task_result.get()            render_task = task_result.children[0]            # Decorate with rendering results            session["render_status"] = render_task.status            if render_task.ready():                session["results"].render_output = render_task.get()                del(request.session["task_result"])  # Don't need any more    return render_to_response('results.html', request.session)仅当函数实际终止时,此解决方案才有效。我无法将的逻辑子任务链接在一起long_func,因为存在未知数量的yields(每个循环long_func的循环可能不会产生结果)。题有什么明智的方法可以从极其长时间运行的Celery任务访问生成的对象,以便可以在生成器用尽之前对其进行显示?
查看完整描述

3 回答

?
临摹微笑

TA贡献1982条经验 获得超2个赞

芹菜部分:


def long_func(*args, **kwargs):

    i = 0

    while True:

        yield i

        do_something_here(*args, **kwargs)

        i += 1



@task()

def test_yield_task(task_id=None, **kwargs):

    the_progress = 0        

    for the_progress in long_func(**kwargs):

        cache.set('celery-task-%s' % task_id, the_progress)

Web客户端,开始任务:


r = test_yield_task.apply_async()

request.session['task_id'] = r.task_id

测试最后的屈服值:


   v = cache.get('celery-task-%s' % session.get('task_id'))

   if v:

        do_someting()

如果您不喜欢使用缓存或无法使用缓存,则可以使用db,文件或celery worker和服务器端都可以同时访问的任何其他位置。使用缓存是最简单的解决方案,但是工作人员和服务器必须使用相同的缓存。


查看完整回答
反对 回复 2019-11-02
  • 3 回答
  • 0 关注
  • 1390 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信