为了账号安全,请及时绑定邮箱和手机立即绑定

带有 flask 或 aiohttp 服务器的 asyncio 应用程序

带有 flask 或 aiohttp 服务器的 asyncio 应用程序

ITMISS 2022-12-20 11:00:31
我改进了去年的应用程序。一开始有两个不同的 python 应用程序——第一个用于统计数据,第二个——带有 GET 请求的网络服务器 gunicorn+flask。(这两种服务都在 centos 中)Statistics 进行计数并将所有内容存储在 Postgres 中。Web 服务器连接到该 Postgres 数据库并响应 GET 请求。在重写的版本中,我使用 pandas 框架进行了所有统计,现在我想将这两个应用程序合并为一个。我使用 asyncio 来获取数据和计数统计。一切正常,现在我要添加 Web 服务器以响应 GET。部分代码:import asynciofrom contextlib import closingimport db_cl, tks_dbfrom formdf_cl import FormatDFgetinfofromtks = tks_db.TKS() # Class object to connect to third party databaseformatdf = FormatDF() # counting class object, that stores some datadbb = db_cl.MyDatabase('mydb.ini') # Class object to connect to my databaseasync def get_some_data():    # getting information from third party database every 5 seconds.    await asyncio.sleep(5)    ans_inc, ans_out = getinfofromtks.getdf()    return ans_inc, ans_out # two huge dataframes in pandasasync def process(ans_inc, ans_out):    # counting data on CPU    await asyncio.sleep(0)    formatdf.leftjoin(ans_inc, ans_out)    # storing statistics in my Database    dbb.query_database('INSERT INTO statistic (timestamp, outgoing, incoming, stats) values (%s, %s,%s, %s)',                       formatdf.make_count())    dbb.commit_query()async def main():    while True:        ans_inc, ans_out = await get_some_data()  # blocking, get data from third party database        asyncio.ensure_future(process(ans_inc, ans_out))  # computingif __name__ == "__main__":    with closing(asyncio.get_event_loop()) as event_loop:        event_loop.run_until_complete(main())现在我希望将 http 服务器添加为线程应用程序(使用 flask 或 aiohttp),它将使用类对象“formatdf”中的参数响应 GET 请求。包含这些功能的最佳方式是什么?
查看完整描述

2 回答

?
胡说叔叔

TA贡献1804条经验 获得超8个赞

我设法添加了一个 http 服务器作为协程。首先我尝试使用 aiohttp,但最终我找到了 Quart(与 Flask 相同,但它使用 Asyncio)。在 Quart 上运行 http 服务器的示例代码:


import quart

from quart import request

import json

import time


app = quart.Quart(__name__)


def resp(code, data):

    return quart.Response(

        status=code,

        mimetype="application/json",

        response=to_json(data)

    )


def to_json(data):

    return json.dumps(data) + "\n"


@app.route('/')

def root():

    return quart.redirect('/api/status2')



@app.errorhandler(400)

def page_not_found(e):

    return resp(400, {})



@app.errorhandler(404)

def page_not_found(e):

    return resp(400, {})



@app.errorhandler(405)

def page_not_found(e):

    return resp(405, {})



@app.route('/api/status2', methods=['GET'])

def get_status():

    timestamp = request.args.get("timestamp")

    delay = request.args.get("delay")

    if timestamp:

        return resp(200, {"time": time.time()})

    elif delay:

        return resp(200, {"test is:": '1'})

    else:

        return resp(200, {"", "ask me about time"})



if __name__ == "__main__":

    app.run(debug=True, host='0.0.0.0', port=5000)

为了将此代码添加为协同程序,我使用await asyncio.gather()并使用了 app.run_task 而不是 app.run。像这样更改了我的问题中的代码:


async def launcher_main():

    while True:

        ans_inc, ans_out = await get_some_data()

        asyncio.ensure_future(process(ans_inc, ans_out))



async def main():

    await asyncio.gather(launcher_main(),

                         restapi_quart.app.run_task(debug=True, host='0.0.0.0', port=5000))

剩下的最后一个问题是使“formatdf”类对象的可用参数到我的 http 服务器。我已经实现了Tests.restapi_quart.app.config["formatdf"] = formatdf向 process(...) 函数添加行。从 quart 调用它:


elif button:

    return resp(200, {"ok": app.config["formatdf"].attr})


查看完整回答
反对 回复 2022-12-20
?
精慕HU

TA贡献1845条经验 获得超8个赞

我只需要为我的应用程序解决这个问题。这是在现有异步应用程序中运行 aiohttp 服务器的方法。


https://docs.aiohttp.org/en/stable/web_lowlevel.html


import asyncio

from aiohttp import web


async def web_server():

    print(f'Configuring Web Server')

    app = web.Application()

    app.add_routes([web.get('/hello', web_hello)])

    runner = web.AppRunner(app)

    await runner.setup()

    site = web.TCPSite(runner)    

    print(f'Starting Web Server')

    await site.start()

    print(f'Web Server Started')


    #run forever and ever

    await asyncio.sleep(100*3600)


async def web_hello(request):

    return web.Response(text="Hello World")


async def main():

    tasks = []


    #Web Server

    tasks.append(asyncio.create_task(web_server()))


    results = await asyncio.gather(*tasks)


if __name__ == '__main__':

    asyncio.run(main())


查看完整回答
反对 回复 2022-12-20
  • 2 回答
  • 0 关注
  • 219 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号