From 2b0ddfe75c8a048f0d53d7cfe0d526f5975bc524 Mon Sep 17 00:00:00 2001 From: qixinbo Date: Wed, 18 Mar 2026 16:48:09 +0800 Subject: [PATCH] fix: interrupt error --- backend/main.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/backend/main.py b/backend/main.py index 78a3cfc..d838734 100644 --- a/backend/main.py +++ b/backend/main.py @@ -246,6 +246,7 @@ async def nanobot_chat(request: ChatRequest): @app.post("/nanobot/chat/stream") async def nanobot_chat_stream(request: ChatRequest): async def event_generator(): + current_task = None try: use_nl2sql, route_reason, resolved_source = _should_use_nl2sql(request) yield f"data: {json.dumps({'type': 'routing', 'selected': 'sql' if use_nl2sql else 'chat', 'reason': route_reason}, ensure_ascii=False)}\n\n" @@ -257,7 +258,7 @@ async def nanobot_chat_stream(request: ChatRequest): if content: await sql_progress_queue.put(content) - sql_task = asyncio.create_task( + current_task = asyncio.create_task( process_nl2sql( NL2SQLRequest( query=request.message, @@ -269,14 +270,14 @@ async def nanobot_chat_stream(request: ChatRequest): ) ) while True: - if sql_task.done() and sql_progress_queue.empty(): + if current_task.done() and sql_progress_queue.empty(): break try: progress = await asyncio.wait_for(sql_progress_queue.get(), timeout=0.2) yield f"data: {json.dumps({'type': 'progress', 'content': progress}, ensure_ascii=False)}\n\n" except asyncio.TimeoutError: continue - nl2sql_result = await sql_task + nl2sql_result = await current_task if nl2sql_result.error: yield f"data: {json.dumps({'type': 'progress', 'content': f'出错:{nl2sql_result.error},正在整理结果'}, ensure_ascii=False)}\n\n" else: @@ -298,7 +299,7 @@ async def nanobot_chat_stream(request: ChatRequest): if content: await progress_queue.put(content) - task = asyncio.create_task( + current_task = asyncio.create_task( nanobot_service.process_message( request.message, session_id=request.session_id, @@ -310,23 +311,28 @@ async def nanobot_chat_stream(request: ChatRequest): yield f"data: {json.dumps({'type': 'progress', 'content': '已发送给模型,正在分析问题'}, ensure_ascii=False)}\n\n" text = "" while True: - if task.done() and progress_queue.empty(): + if current_task.done() and progress_queue.empty(): break try: progress = await asyncio.wait_for(progress_queue.get(), timeout=0.2) yield f"data: {json.dumps({'type': 'progress', 'content': progress}, ensure_ascii=False)}\n\n" except asyncio.TimeoutError: continue - response = await task + response = await current_task text = response or "" for idx in range(0, len(text), STREAM_DELTA_CHUNK_SIZE): chunk = text[idx: idx + STREAM_DELTA_CHUNK_SIZE] yield f"data: {json.dumps({'type': 'delta', 'content': chunk}, ensure_ascii=False)}\n\n" yield f"data: {json.dumps({'type': 'final', 'content': text}, ensure_ascii=False)}\n\n" yield f"data: {json.dumps({'type': 'done'}, ensure_ascii=False)}\n\n" + except asyncio.CancelledError: + raise except Exception as e: yield f"data: {json.dumps({'type': 'error', 'content': str(e)}, ensure_ascii=False)}\n\n" yield f"data: {json.dumps({'type': 'done'}, ensure_ascii=False)}\n\n" + finally: + if current_task and not current_task.done(): + current_task.cancel() return StreamingResponse( event_generator(),