

简单介绍一下各个工具的作用:
- asyncio(异步I/O库):作用: 提供异步编程基础设施,包括事件循环、协程、任务等。特点: 单线程并发,适用于 I/O 密集型任务
- FastAPI(Web框架):作用: 提供 REST API 构建功能,原生支持异步视图函数。特点: 自动 API 文档生成,类型提示支持
- Uvicorn(ASGI服务器):作用: 运行 FastAPI 应用,处理 HTTP 请求。特点: 异步、快速,生产环境常用
- ThreadPoolExecutor(线程池):作用: 管理线程池,执行阻塞任务。特点: 适用于 CPU 密集型或阻塞 I/O 任务
工作流程如下:
Uvicorn 启动并监听端口,接收 HTTP 请求
请求被传递给 FastAPI 应用进行路由处理
FastAPI 在 AsyncIO 事件循环中执行异步视图函数
当遇到阻塞操作时,使用 ThreadPoolExecutor 在线程中执行
结果通过回调机制返回给事件循环bash创建fastapi和线程池:
# 创建web
app=FastAPI()
# 创建线程池
threadpool=ThreadPoolExecutor(max_workers=200)bash处理对ver1的get请求:
# 第一个版本
@app.get('/ver1')
async def ver1(request: Request):
# 获取参数
msg=request.query_params.get('msg')
# 在主线程中获取async io event loop
loop=asyncio.get_event_loop()
# 准备计算任务
task={
'msg': msg,
'event': asyncio.Event(),
'result': None,
}
# 计算函数
def handle_task():
# 在工作线程中
print('task received:',task['msg'])
task['result']=task['msg'].lower()
time.sleep(2) # 模拟线程阻塞
def async_callback():
print('task ends notified:',task['result'],asyncio.get_event_loop())
# 这将在主线程中执行
task['event'].set()
# 安全地将回调调度到主线程的事件循环
loop.call_soon_threadsafe(async_callback)
# 提交并等待结果
threadpool.submit(handle_task)
await task['event'].wait()
return Response(task['result'])bash其中获取当前asyncio事件循环loop=asyncio.get_event_loop()的作用是允许工作线程安全地与主线程的事件循环交互,确保回调函数在正确的线程(事件循环所在的线程)执行。
工作流程:
- 当用户启动应用后,Uvicorn服务器开始监听localhost的8000端口。程序初始化时创建了一个包含200个工作线程的线程池,用于处理耗时的阻塞任务。当接收到HTTP GET请求访问/ver1或/ver2端点时,FastAPI在主线程(事件循环线程)中处理请求。
- 对于/ver1端点,程序首先从请求中提取参数,获取当前的asyncio事件循环引用,并创建一个包含消息内容、异步事件和结果占位符的任务对象。然后定义一个在工作线程中执行的handle_task函数,该函数会进行消息处理(如转换为小写)并模拟2秒的阻塞操作。通过threadpool.submit()将任务提交到线程池后,主线程执行await task[‘event’].wait()进入非阻塞等待状态,此时事件循环可以继续处理其他请求。工作线程执行完阻塞操作后,通过之前获取的事件循环引用,使用loop.call_soon_threadsafe()将回调函数安全地调度回主线程执行,在回调中触发事件解除主线程等待,最终返回处理结果。 tips:在代码中,task 信息从主线程传递到工作线程的方式是通过闭包机制实现的。当 handle_task 函数被定义时,它会捕获外部作用域中的 task 变量引用,形成闭包。当 threadpool.submit(handle_task) 执行时,传递的是函数对象及其闭包环境。
- 对于/ver2端点,采用了更简洁的方式,通过loop.run_in_executor()直接将任务提交到线程池并await其结果,底层同样实现了异步等待和线程间通信,但代码更加简洁。
# 第二个版本
@app.get('/ver2')
async def ver2(request: Request):
# 获取参数
msg=request.query_params.get('msg')
# 获取async io event loop
loop=asyncio.get_event_loop()
# 准备计算任务
task={
'msg': msg,
}
# 计算函数
def handle_task():
print('task received:',task['msg'])
result=task['msg'].lower()
time.sleep(2) # 模拟线程阻塞
return result
# 提交并等待结果
result=await loop.run_in_executor(threadpool,handle_task)
return Response(result)bash