feat: add more granular logging for event loop (#8318)
This commit is contained in:
@@ -1444,13 +1444,27 @@ async def bounded_gather(coros: list[Coroutine], max_concurrency: int = 10) -> l
|
||||
|
||||
semaphore = asyncio.Semaphore(max_concurrency)
|
||||
|
||||
async def bounded_coro(index: int, coro: Coroutine):
|
||||
async def bounded_coro(index: int, coro: Coroutine, coro_name: str):
|
||||
async with semaphore:
|
||||
# Set task name for diagnostics
|
||||
task = asyncio.current_task()
|
||||
if task:
|
||||
task.set_name(f"bounded[{coro_name}]")
|
||||
result = await coro
|
||||
return (index, result)
|
||||
|
||||
# Wrap all coroutines with semaphore control
|
||||
tasks = [bounded_coro(i, coro) for i, coro in enumerate(coros)]
|
||||
# Wrap all coroutines with semaphore control, extracting location for diagnostics
|
||||
tasks = []
|
||||
for i, coro in enumerate(coros):
|
||||
coro_code = getattr(coro, "cr_code", None)
|
||||
if coro_code:
|
||||
filename = coro_code.co_filename
|
||||
idx = filename.find("letta/")
|
||||
filename = filename[idx + 6 :] if idx != -1 else filename.split("/")[-1]
|
||||
coro_name = f"{filename}:{coro_code.co_firstlineno}:{coro_code.co_name}"
|
||||
else:
|
||||
coro_name = "unknown"
|
||||
tasks.append(bounded_coro(i, coro, coro_name))
|
||||
indexed_results = await asyncio.gather(*tasks)
|
||||
|
||||
# Sort by original index to preserve order
|
||||
|
||||
Reference in New Issue
Block a user