2 回答
TA贡献1804条经验 获得超8个赞
我设法添加了一个 http 服务器作为协程。首先我尝试使用 aiohttp,但最终我找到了 Quart(与 Flask 相同,但它使用 Asyncio)。在 Quart 上运行 http 服务器的示例代码:
import quart
from quart import request
import json
import time
app = quart.Quart(__name__)
def resp(code, data):
return quart.Response(
status=code,
mimetype="application/json",
response=to_json(data)
)
def to_json(data):
return json.dumps(data) + "\n"
@app.route('/')
def root():
return quart.redirect('/api/status2')
@app.errorhandler(400)
def page_not_found(e):
return resp(400, {})
@app.errorhandler(404)
def page_not_found(e):
return resp(400, {})
@app.errorhandler(405)
def page_not_found(e):
return resp(405, {})
@app.route('/api/status2', methods=['GET'])
def get_status():
timestamp = request.args.get("timestamp")
delay = request.args.get("delay")
if timestamp:
return resp(200, {"time": time.time()})
elif delay:
return resp(200, {"test is:": '1'})
else:
return resp(200, {"", "ask me about time"})
if __name__ == "__main__":
app.run(debug=True, host='0.0.0.0', port=5000)
为了将此代码添加为协同程序,我使用await asyncio.gather()并使用了 app.run_task 而不是 app.run。像这样更改了我的问题中的代码:
async def launcher_main():
while True:
ans_inc, ans_out = await get_some_data()
asyncio.ensure_future(process(ans_inc, ans_out))
async def main():
await asyncio.gather(launcher_main(),
restapi_quart.app.run_task(debug=True, host='0.0.0.0', port=5000))
剩下的最后一个问题是使“formatdf”类对象的可用参数到我的 http 服务器。我已经实现了Tests.restapi_quart.app.config["formatdf"] = formatdf向 process(...) 函数添加行。从 quart 调用它:
elif button:
return resp(200, {"ok": app.config["formatdf"].attr})
TA贡献1845条经验 获得超8个赞
我只需要为我的应用程序解决这个问题。这是在现有异步应用程序中运行 aiohttp 服务器的方法。
https://docs.aiohttp.org/en/stable/web_lowlevel.html
import asyncio
from aiohttp import web
async def web_server():
print(f'Configuring Web Server')
app = web.Application()
app.add_routes([web.get('/hello', web_hello)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner)
print(f'Starting Web Server')
await site.start()
print(f'Web Server Started')
#run forever and ever
await asyncio.sleep(100*3600)
async def web_hello(request):
return web.Response(text="Hello World")
async def main():
tasks = []
#Web Server
tasks.append(asyncio.create_task(web_server()))
results = await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
添加回答
举报
