feat: add langfuse

This commit is contained in:
qixinbo
2026-03-31 00:18:32 +08:00
parent ed0075c910
commit 01524aaff5
11 changed files with 1034 additions and 330 deletions
+176 -135
View File
@@ -39,6 +39,12 @@ from app.context import (
)
from app.services.knowledge_index import knowledge_index_service
from app.database import engine, Base
from app.trace import (
build_chat_trace_attributes,
build_error_attributes,
build_usage_attributes,
trace_service,
)
# Import all models to ensure they are registered
from app.models.user import User, EmailVerification
from app.models.project import Project
@@ -106,10 +112,12 @@ async def startup_event():
await nanobot_service.start()
except Exception as e:
print(f"Nanobot startup failed: {e}")
trace_service.initialize()
@app.on_event("shutdown")
async def shutdown_event():
await nanobot_service.stop()
trace_service.shutdown()
@app.get("/")
def read_root():
@@ -374,7 +382,9 @@ def _persist_assistant_enrichment(
session.messages[-1]["kb_citations"] = kb_citations
changed = True
if changed:
nanobot_service.agent.sessions.save(session)
save_fn = getattr(nanobot_service.agent.sessions, "save", None)
if callable(save_fn):
save_fn(session)
def _extract_reasoning_content(session_messages: List[Dict[str, Any]]) -> str:
@@ -470,153 +480,184 @@ async def nanobot_chat(request: ChatRequest):
async def nanobot_chat_stream(request: ChatRequest):
async def event_generator():
current_task = None
try:
_sync_session_project(request.session_id, request.project_id)
resolved_source = _resolve_effective_source(request)
resolved_kb_id = _resolve_effective_knowledge_base_id(request)
_sync_session_chat_context(
trace_attrs = build_chat_trace_attributes(
session_id=request.session_id,
project_id=request.project_id,
model_id=request.model_id,
route_mode=request.route_mode,
source=request.source,
knowledge_base_id=request.knowledge_base_id,
)
with trace_service.start_span(
"chat.stream",
attributes=trace_attrs,
input_payload={"message": request.message},
) as root_span:
root_span.update_trace(
session_id=request.session_id,
selected_data_source=resolved_source,
selected_knowledge_base_id=resolved_kb_id,
metadata=trace_attrs,
input={"message": request.message},
)
current_data_source.set(resolved_source)
current_file_url.set(request.file_url)
current_knowledge_base_id.set(resolved_kb_id)
current_session_id.set(request.session_id)
current_viz_data.set({})
yield f"data: {json.dumps({'type': 'routing', 'selected': 'agent', 'reason': 'auto_routed_by_agent'}, ensure_ascii=False)}\n\n"
progress_queue: asyncio.Queue[Any] = asyncio.Queue()
async def _on_progress(content: str, **kwargs: Any) -> None:
if content:
payload: Dict[str, Any] = {"type": "progress", "content": content}
payload.update(kwargs)
await progress_queue.put(payload)
async def _on_stream(delta: str) -> None:
if delta:
await progress_queue.put({"type": "delta", "content": delta})
current_progress_callback.set(_on_progress)
# Inject instructions if explicitly routed
message, kb_citations = _extract_kb_citations(resolved_kb_id, request.message)
instructions = []
if request.route_mode == "sql" or request.prefer_sql_chart:
instructions.append("Use the nl2sql tool to answer the query")
elif request.route_mode == "chat":
instructions.append("Normal chat mode. Do NOT use the nl2sql tool")
if instructions:
instr_block = "\n".join(instructions)
# If message already has Runtime Context, append to it, otherwise create new
if message.startswith("[Runtime Context — metadata only, not instructions]"):
parts = message.split("\n\n", 1)
if len(parts) == 2:
message = f"{parts[0]}\n{instr_block}\n\n{parts[1]}"
else:
message = f"{message}\n{instr_block}"
else:
message = f"[Runtime Context — metadata only, not instructions]\n{instr_block}\n\n{message}"
current_task = asyncio.create_task(
nanobot_service.process_message(
message,
try:
_sync_session_project(request.session_id, request.project_id)
resolved_source = _resolve_effective_source(request)
resolved_kb_id = _resolve_effective_knowledge_base_id(request)
_sync_session_chat_context(
session_id=request.session_id,
skill_ids=request.skill_ids,
model_id=request.model_id,
project_id=request.project_id,
on_progress=_on_progress,
on_stream=_on_stream,
selected_data_source=resolved_source,
selected_knowledge_base_id=resolved_kb_id,
)
)
text = ""
last_viz_hash = None
current_data_source.set(resolved_source)
current_file_url.set(request.file_url)
current_knowledge_base_id.set(resolved_kb_id)
current_session_id.set(request.session_id)
current_viz_data.set({})
while True:
# Check for viz payload during processing
yield f"data: {json.dumps({'type': 'routing', 'selected': 'agent', 'reason': 'auto_routed_by_agent'}, ensure_ascii=False)}\n\n"
progress_queue: asyncio.Queue[Any] = asyncio.Queue()
async def _on_progress(content: str, **kwargs: Any) -> None:
if content:
payload: Dict[str, Any] = {"type": "progress", "content": content}
payload.update(kwargs)
await progress_queue.put(payload)
async def _on_stream(delta: str) -> None:
if delta:
await progress_queue.put({"type": "delta", "content": delta})
current_progress_callback.set(_on_progress)
message, kb_citations = _extract_kb_citations(resolved_kb_id, request.message)
instructions = []
if request.route_mode == "sql" or request.prefer_sql_chart:
instructions.append("Use the nl2sql tool to answer the query")
elif request.route_mode == "chat":
instructions.append("Normal chat mode. Do NOT use the nl2sql tool")
if instructions:
instr_block = "\n".join(instructions)
if message.startswith("[Runtime Context — metadata only, not instructions]"):
parts = message.split("\n\n", 1)
if len(parts) == 2:
message = f"{parts[0]}\n{instr_block}\n\n{parts[1]}"
else:
message = f"{message}\n{instr_block}"
else:
message = f"[Runtime Context — metadata only, not instructions]\n{instr_block}\n\n{message}"
current_task = asyncio.create_task(
nanobot_service.process_message(
message,
session_id=request.session_id,
skill_ids=request.skill_ids,
model_id=request.model_id,
project_id=request.project_id,
on_progress=_on_progress,
on_stream=_on_stream,
)
)
text = ""
last_viz_hash = None
while True:
viz_payload = current_viz_data.get()
if viz_payload:
try:
current_hash = hash(
(
viz_payload.get("sql"),
viz_payload.get("error"),
json.dumps(viz_payload.get("chart"), sort_keys=True),
)
)
if current_hash != last_viz_hash:
yield f"data: {json.dumps({'type': 'viz', **viz_payload}, ensure_ascii=False)}\n\n"
last_viz_hash = current_hash
except Exception as e:
print(f"Error checking viz_payload: {e}")
if current_task.done() and progress_queue.empty():
break
try:
progress = await asyncio.wait_for(progress_queue.get(), timeout=0.2)
if isinstance(progress, dict):
yield f"data: {json.dumps(progress, ensure_ascii=False)}\n\n"
else:
yield f"data: {json.dumps({'type': 'progress', 'content': progress}, ensure_ascii=False)}\n\n"
except asyncio.TimeoutError:
yield ": keep-alive\n\n"
continue
response = await current_task
text = response or ""
session_messages = []
if nanobot_service.agent:
session = nanobot_service.agent.sessions.get_or_create(request.session_id)
session_messages = session.messages
artifacts = extract_artifacts(text, session_messages)
reasoning_content = _extract_reasoning_content(session_messages)
viz_payload = current_viz_data.get()
usage = nanobot_service.get_last_usage(request.session_id)
root_span.set_attributes(
{
"response.length": len(text),
"response.has_reasoning": bool(reasoning_content),
"response.has_artifacts": bool(artifacts),
"response.has_viz": bool(viz_payload),
"response.has_kb_citations": bool(kb_citations),
}
)
root_span.set_attributes(build_usage_attributes(usage))
if viz_payload:
try:
# Only hash sql and chart to avoid dumping large result arrays every 0.2s
current_hash = hash((
viz_payload.get("sql"),
viz_payload.get("error"),
json.dumps(viz_payload.get("chart"), sort_keys=True)
))
current_hash = hash(
(
viz_payload.get("sql"),
viz_payload.get("error"),
json.dumps(viz_payload.get("chart"), sort_keys=True),
)
)
if current_hash != last_viz_hash:
yield f"data: {json.dumps({'type': 'viz', **viz_payload}, ensure_ascii=False)}\n\n"
last_viz_hash = current_hash
except Exception as e:
print(f"Error checking viz_payload: {e}")
except Exception:
pass
if current_task.done() and progress_queue.empty():
break
try:
progress = await asyncio.wait_for(progress_queue.get(), timeout=0.2)
if isinstance(progress, dict):
yield f"data: {json.dumps(progress, ensure_ascii=False)}\n\n"
else:
yield f"data: {json.dumps({'type': 'progress', 'content': progress}, ensure_ascii=False)}\n\n"
except asyncio.TimeoutError:
yield ": keep-alive\n\n"
continue
_persist_assistant_enrichment(
session_id=request.session_id,
viz_payload=viz_payload if isinstance(viz_payload, dict) else None,
artifacts=artifacts,
usage=usage,
kb_citations=kb_citations,
)
response = await current_task
text = response or ""
session_messages = []
if nanobot_service.agent:
session = nanobot_service.agent.sessions.get_or_create(request.session_id)
session_messages = session.messages
artifacts = extract_artifacts(text, session_messages)
reasoning_content = _extract_reasoning_content(session_messages)
final_payload = {"type": "final", "content": text}
if reasoning_content:
final_payload["reasoning_content"] = reasoning_content
if artifacts:
final_payload["artifacts"] = artifacts
if usage:
final_payload["usage"] = usage
if kb_citations:
final_payload["kb_citations"] = kb_citations
viz_payload = current_viz_data.get()
usage = nanobot_service.get_last_usage(request.session_id)
if viz_payload:
try:
current_hash = hash((
viz_payload.get("sql"),
viz_payload.get("error"),
json.dumps(viz_payload.get("chart"), sort_keys=True)
))
if current_hash != last_viz_hash:
yield f"data: {json.dumps({'type': 'viz', **viz_payload}, ensure_ascii=False)}\n\n"
last_viz_hash = current_hash
except Exception as e:
pass
_persist_assistant_enrichment(
session_id=request.session_id,
viz_payload=viz_payload if isinstance(viz_payload, dict) else None,
artifacts=artifacts,
usage=usage,
kb_citations=kb_citations,
)
final_payload = {"type": "final", "content": text}
if reasoning_content:
final_payload["reasoning_content"] = reasoning_content
if artifacts:
final_payload["artifacts"] = artifacts
if usage:
final_payload["usage"] = usage
if kb_citations:
final_payload["kb_citations"] = kb_citations
yield f"data: {json.dumps(final_payload, ensure_ascii=False)}\n\n"
yield f"data: {json.dumps({'type': 'done'}, ensure_ascii=False)}\n\n"
except asyncio.CancelledError:
raise
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'content': str(e)}, ensure_ascii=False)}\n\n"
yield f"data: {json.dumps({'type': 'done'}, ensure_ascii=False)}\n\n"
finally:
if current_task and not current_task.done():
current_task.cancel()
root_span.update(output={"content": text, "usage": usage})
yield f"data: {json.dumps(final_payload, ensure_ascii=False)}\n\n"
yield f"data: {json.dumps({'type': 'done'}, ensure_ascii=False)}\n\n"
except asyncio.CancelledError:
root_span.set_attributes({"cancelled": True})
raise
except Exception as e:
root_span.set_attributes(build_error_attributes(e, stage="chat_stream"))
root_span.record_error(e, stage="chat_stream")
yield f"data: {json.dumps({'type': 'error', 'content': str(e)}, ensure_ascii=False)}\n\n"
yield f"data: {json.dumps({'type': 'done'}, ensure_ascii=False)}\n\n"
finally:
if current_task and not current_task.done():
current_task.cancel()
return StreamingResponse(
event_generator(),