Python:asyncio样例
2026/1/29大约 2 分钟
两个异步例子。
闭包版
需要注意的是,run_batch_process的并发仍没有被限制
import asyncio
import time
from tqdm import tqdm as tqdm_sync # 标准同步版
from tqdm.asyncio import tqdm as tqdm_async # 异步适配版
# 外部资源锁:保护 LLM API 不被封(一共只允许 2 个并发)
llm_model_max_async = asyncio.Semaphore(2)
async def func_A(task_id):
"""底层函数:负责具体的业务逻辑"""
async with llm_model_max_async:
# 模拟 API 请求或耗时操作
await asyncio.sleep(1)
return f"结果-{task_id}"
async def run_batch_process():
"""主逻辑函数:内部耦合了限流逻辑"""
# 内部资源锁:保护本地内存不爆(一共只允许 5 个流程同时启动)
local_mem_sem = asyncio.Semaphore(5)
# 在内部定义 limited_A,直接利用闭包特性
async def limited_A(task_id):
async with local_mem_sem:
# 这里可以处理一些耗内存的操作,比如读取大文件
return await func_A(task_id)
# 1. 准备任务列表(此时任务并未开始运行)
tasks = []
for i in range(20):
tasks.append(limited_A(i))
print(f"开始执行 20 个并发任务...")
print(f"控制策略:本地流程最多 5 个,API 请求最多 2 个\n")
# 2. 一次性触发所有任务并观察进度
results = await tqdm_async.gather(*tasks)
print(f"\n任务全部执行完毕,示例结果: {results[:3]}...")
if __name__ == "__main__":
asyncio.run(run_batch_process())大数据量
如何限制“创建并发”?
如下为Gemini 3的方案,未加验证。
如果你希望像“流水线”一样,不仅限制同时跑的任务,还限制排队的任务数量,有以下两种方案:
方案 A:分批处理 (Chunking)
最简单粗暴,一次只丢给 gather 一小撮任务。
Python
async def run_batch_process(all_ids):
chunk_size = 100
for i in range(0, len(all_ids), chunk_size):
batch = all_ids[i : i + chunk_size]
tasks = [limited_A(tid) for tid in batch]
await tqdm.gather(*tasks) # 处理完这一批再处理下一批- 缺点:每一批任务中,最慢的那个会拖累整组,导致 CPU/网络在批次交替时有空闲。
方案 B:使用异步队列 asyncio.Queue (工业级方案)
这是最优雅的。创建一个固定数量的 Worker(工人),让他们从队列里领任务。
Python
async def run_batch_process(all_ids):
queue = asyncio.Queue()
# 1. 把所有任务 ID 扔进队列
for tid in all_ids:
await queue.put(tid)
async def worker():
"""工人:只要队列不空,就一直干活"""
while not queue.empty():
tid = await queue.get()
await limited_A(tid) # 这里依然可以用你的 limited_A
queue.task_done()
# 2. 启动固定数量的工人(比如 5 个工人,对应 5 个并发)
workers = [asyncio.create_task(worker()) for _ in range(5)]
# 3. 等待所有任务完成
await asyncio.gather(*workers)总结
之前的闭包版本在 任务量适中(< 1000) 时是完美的,因为代码简单。但如果你的 question_dir_list 是海量的,你的担心就完全正确了:gather(*task_list) 会瞬间把所有任务压进内存。
如果超过任务量五千,建议把代码改成“生产者-消费者(Queue)”模型
更新日志
2026/1/30 19:51
查看所有更新日志
72fda-于25785-于c0372-于2b937-于e61c2-于
