fix: interrupt error
This commit is contained in:
+12
-6
@@ -246,6 +246,7 @@ async def nanobot_chat(request: ChatRequest):
|
|||||||
@app.post("/nanobot/chat/stream")
|
@app.post("/nanobot/chat/stream")
|
||||||
async def nanobot_chat_stream(request: ChatRequest):
|
async def nanobot_chat_stream(request: ChatRequest):
|
||||||
async def event_generator():
|
async def event_generator():
|
||||||
|
current_task = None
|
||||||
try:
|
try:
|
||||||
use_nl2sql, route_reason, resolved_source = _should_use_nl2sql(request)
|
use_nl2sql, route_reason, resolved_source = _should_use_nl2sql(request)
|
||||||
yield f"data: {json.dumps({'type': 'routing', 'selected': 'sql' if use_nl2sql else 'chat', 'reason': route_reason}, ensure_ascii=False)}\n\n"
|
yield f"data: {json.dumps({'type': 'routing', 'selected': 'sql' if use_nl2sql else 'chat', 'reason': route_reason}, ensure_ascii=False)}\n\n"
|
||||||
@@ -257,7 +258,7 @@ async def nanobot_chat_stream(request: ChatRequest):
|
|||||||
if content:
|
if content:
|
||||||
await sql_progress_queue.put(content)
|
await sql_progress_queue.put(content)
|
||||||
|
|
||||||
sql_task = asyncio.create_task(
|
current_task = asyncio.create_task(
|
||||||
process_nl2sql(
|
process_nl2sql(
|
||||||
NL2SQLRequest(
|
NL2SQLRequest(
|
||||||
query=request.message,
|
query=request.message,
|
||||||
@@ -269,14 +270,14 @@ async def nanobot_chat_stream(request: ChatRequest):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
while True:
|
while True:
|
||||||
if sql_task.done() and sql_progress_queue.empty():
|
if current_task.done() and sql_progress_queue.empty():
|
||||||
break
|
break
|
||||||
try:
|
try:
|
||||||
progress = await asyncio.wait_for(sql_progress_queue.get(), timeout=0.2)
|
progress = await asyncio.wait_for(sql_progress_queue.get(), timeout=0.2)
|
||||||
yield f"data: {json.dumps({'type': 'progress', 'content': progress}, ensure_ascii=False)}\n\n"
|
yield f"data: {json.dumps({'type': 'progress', 'content': progress}, ensure_ascii=False)}\n\n"
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
continue
|
continue
|
||||||
nl2sql_result = await sql_task
|
nl2sql_result = await current_task
|
||||||
if nl2sql_result.error:
|
if nl2sql_result.error:
|
||||||
yield f"data: {json.dumps({'type': 'progress', 'content': f'出错:{nl2sql_result.error},正在整理结果'}, ensure_ascii=False)}\n\n"
|
yield f"data: {json.dumps({'type': 'progress', 'content': f'出错:{nl2sql_result.error},正在整理结果'}, ensure_ascii=False)}\n\n"
|
||||||
else:
|
else:
|
||||||
@@ -298,7 +299,7 @@ async def nanobot_chat_stream(request: ChatRequest):
|
|||||||
if content:
|
if content:
|
||||||
await progress_queue.put(content)
|
await progress_queue.put(content)
|
||||||
|
|
||||||
task = asyncio.create_task(
|
current_task = asyncio.create_task(
|
||||||
nanobot_service.process_message(
|
nanobot_service.process_message(
|
||||||
request.message,
|
request.message,
|
||||||
session_id=request.session_id,
|
session_id=request.session_id,
|
||||||
@@ -310,23 +311,28 @@ async def nanobot_chat_stream(request: ChatRequest):
|
|||||||
yield f"data: {json.dumps({'type': 'progress', 'content': '已发送给模型,正在分析问题'}, ensure_ascii=False)}\n\n"
|
yield f"data: {json.dumps({'type': 'progress', 'content': '已发送给模型,正在分析问题'}, ensure_ascii=False)}\n\n"
|
||||||
text = ""
|
text = ""
|
||||||
while True:
|
while True:
|
||||||
if task.done() and progress_queue.empty():
|
if current_task.done() and progress_queue.empty():
|
||||||
break
|
break
|
||||||
try:
|
try:
|
||||||
progress = await asyncio.wait_for(progress_queue.get(), timeout=0.2)
|
progress = await asyncio.wait_for(progress_queue.get(), timeout=0.2)
|
||||||
yield f"data: {json.dumps({'type': 'progress', 'content': progress}, ensure_ascii=False)}\n\n"
|
yield f"data: {json.dumps({'type': 'progress', 'content': progress}, ensure_ascii=False)}\n\n"
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
continue
|
continue
|
||||||
response = await task
|
response = await current_task
|
||||||
text = response or ""
|
text = response or ""
|
||||||
for idx in range(0, len(text), STREAM_DELTA_CHUNK_SIZE):
|
for idx in range(0, len(text), STREAM_DELTA_CHUNK_SIZE):
|
||||||
chunk = text[idx: idx + STREAM_DELTA_CHUNK_SIZE]
|
chunk = text[idx: idx + STREAM_DELTA_CHUNK_SIZE]
|
||||||
yield f"data: {json.dumps({'type': 'delta', 'content': chunk}, ensure_ascii=False)}\n\n"
|
yield f"data: {json.dumps({'type': 'delta', 'content': chunk}, ensure_ascii=False)}\n\n"
|
||||||
yield f"data: {json.dumps({'type': 'final', 'content': text}, ensure_ascii=False)}\n\n"
|
yield f"data: {json.dumps({'type': 'final', 'content': text}, ensure_ascii=False)}\n\n"
|
||||||
yield f"data: {json.dumps({'type': 'done'}, ensure_ascii=False)}\n\n"
|
yield f"data: {json.dumps({'type': 'done'}, ensure_ascii=False)}\n\n"
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
yield f"data: {json.dumps({'type': 'error', 'content': str(e)}, ensure_ascii=False)}\n\n"
|
yield f"data: {json.dumps({'type': 'error', 'content': str(e)}, ensure_ascii=False)}\n\n"
|
||||||
yield f"data: {json.dumps({'type': 'done'}, ensure_ascii=False)}\n\n"
|
yield f"data: {json.dumps({'type': 'done'}, ensure_ascii=False)}\n\n"
|
||||||
|
finally:
|
||||||
|
if current_task and not current_task.done():
|
||||||
|
current_task.cancel()
|
||||||
|
|
||||||
return StreamingResponse(
|
return StreamingResponse(
|
||||||
event_generator(),
|
event_generator(),
|
||||||
|
|||||||
Reference in New Issue
Block a user