# DataClaw/backend/main.py — FastAPI application for the DataClaw backend.
import asyncio
import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from app.api import upload, llm, skills, users
from app.connectors.postgres import postgres_connector
from app.connectors.clickhouse import clickhouse_connector
from app.core.nanobot import nanobot_service
from app.core.session_alias_store import session_alias_store
from app.agent.nl2sql import process_nl2sql, NL2SQLRequest, NL2SQLResponse
app = FastAPI()

# NOTE(review): "*" already matches every origin, so the two explicit localhost
# entries are redundant. Combined with allow_credentials=True, Starlette's
# CORSMiddleware appears to echo the caller's Origin back on credentialed
# requests, effectively trusting any site — confirm this is intended outside
# local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "http://localhost:5174", "*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount every feature router under the versioned API prefix, in this order.
for _module in (upload, llm, skills, users):
    app.include_router(_module.router, prefix="/api/v1")
del _module
@app.on_event("startup")
async def startup_event():
    """Start the nanobot service when the application boots.

    The start is awaited, so application startup waits for
    ``nanobot_service.start()`` to finish. A failure is logged with its
    traceback instead of being raised, so the rest of the application can
    still start. Several endpoints check ``nanobot_service.agent`` before
    using the agent.
    """
    try:
        await nanobot_service.start()
    except Exception:
        # Deliberately best-effort, but keep the full traceback in the logs
        # rather than a one-line print to stdout.
        logging.getLogger(__name__).exception("Nanobot startup failed")
@app.on_event("shutdown")
async def shutdown_event():
    """Stop the nanobot service when the application shuts down."""
    service = nanobot_service
    await service.stop()
@app.get("/")
def read_root():
    """Return a fixed greeting that identifies this backend."""
    greeting = {"Hello": "DataClaw Backend"}
    return greeting
@app.get("/connect/postgres")
def test_postgres():
    """Probe the PostgreSQL connector.

    Returns a success payload when ``postgres_connector.test_connection()``
    is truthy; otherwise responds with HTTP 500.
    """
    if not postgres_connector.test_connection():
        raise HTTPException(status_code=500, detail="Failed to connect to PostgreSQL")
    return {"status": "success", "message": "Connected to PostgreSQL"}
@app.get("/connect/clickhouse")
def test_clickhouse():
    """Probe the ClickHouse connector.

    Returns a success payload when ``clickhouse_connector.test_connection()``
    is truthy; otherwise responds with HTTP 500.
    """
    if not clickhouse_connector.test_connection():
        raise HTTPException(status_code=500, detail="Failed to connect to ClickHouse")
    return {"status": "success", "message": "Connected to ClickHouse"}
@app.get("/nanobot/status")
def nanobot_status():
    """Report whether the nanobot agent exists and, if so, which model it uses."""
    agent = nanobot_service.agent
    if not agent:
        return {"status": "stopped"}
    return {"status": "running", "model": agent.model}
class ChatRequest(BaseModel):
    """Request body shared by ``/nanobot/chat`` and ``/nanobot/chat/stream``."""

    # The user's message text.
    message: str
    # Conversation key; every request that omits it shares this one session.
    session_id: str = "api:default"
    # Optional skill identifiers forwarded to the agent.
    skill_ids: Optional[List[str]] = None
    # Optional model override forwarded to the agent.
    model_id: Optional[str] = None
    # Data source name handed to NL2SQL when prefer_sql_chart is set.
    source: str = "postgres"
    # True routes the message through NL2SQL + chart generation instead of the agent.
    prefer_sql_chart: bool = False
    # Optional file URL handed to NL2SQL alongside source.
    file_url: Optional[str] = None
class SessionAliasUpdateRequest(BaseModel):
    """Body for ``PUT /nanobot/sessions/{session_id}`` (alias metadata).

    Unset fields arrive as None and are passed to the alias store as-is;
    presumably the store treats None as "leave unchanged" — confirm there.
    """

    # New display title; forwarded to the alias store as ``alias``.
    title: Optional[str] = None
    # Pinned flag.
    pinned: Optional[bool] = None
    # Archived flag.
    archived: Optional[bool] = None
class SessionFileContextUpdateRequest(BaseModel):
    """Body for ``PUT /nanobot/sessions/{session_id}/context-file``."""

    # JSON object stored under the session metadata key "active_data_file";
    # None removes that key instead.
    active_data_file: Optional[Dict[str, Any]] = None
def _build_sql_chart_text(nl2sql_result: NL2SQLResponse) -> str:
chart = nl2sql_result.chart
can_visualize = bool(chart and chart.can_visualize and chart.chart_spec)
text = (
f"已为你生成 SQL 并查询到 {len(nl2sql_result.result)} 行数据。"
f"{'可视化面板已同步更新图表。' if can_visualize else '本次结果不适合图表展示。'}"
)
if chart and chart.reasoning:
return f"{text}\n\n可视化说明:{chart.reasoning}"
return text
def _build_sql_chart_viz(nl2sql_result: NL2SQLResponse) -> dict:
    """Build the ``viz`` payload (sql, result, chart, error) for an NL2SQL result.

    The same dict is sent to clients and stored on the persisted assistant
    message, so changing its keys affects both.
    """
    chart_obj = nl2sql_result.chart
    payload = dict(
        sql=nl2sql_result.sql,
        result=nl2sql_result.result,
        chart=None if not chart_obj else chart_obj.model_dump(),
        error=nl2sql_result.error,
    )
    return payload
def _persist_session_turn(
    session_id: str,
    user_message: str,
    assistant_message: str,
    assistant_extra: Optional[dict] = None,
) -> None:
    """Append one user/assistant exchange to a nanobot session and save it.

    Does nothing when the nanobot agent is not available. Entries of
    ``assistant_extra`` are passed as keyword arguments to ``add_message``
    for the assistant message (e.g. ``viz=...``).
    """
    agent = nanobot_service.agent
    if agent:
        manager = agent.sessions
        session = manager.get_or_create(session_id)
        extra = assistant_extra if assistant_extra else {}
        session.add_message("user", user_message)
        session.add_message("assistant", assistant_message, **extra)
        manager.save(session)
@app.post("/nanobot/chat")
async def nanobot_chat(request: ChatRequest):
    """Answer a chat message in a single, non-streaming response.

    With ``prefer_sql_chart`` set, the NL2SQL pipeline answers the message:
    the reply text and ``viz`` payload are persisted to the session and
    returned together. Otherwise the message is sent to the nanobot agent and
    its reply is returned as ``response``.

    Raises:
        HTTPException: re-raised unchanged if one is raised downstream;
            any other failure becomes a 500 whose detail is the error text.
    """
    try:
        if request.prefer_sql_chart:
            nl2sql_result = await process_nl2sql(
                NL2SQLRequest(query=request.message, source=request.source, file_url=request.file_url)
            )
            text = _build_sql_chart_text(nl2sql_result)
            viz_payload = _build_sql_chart_viz(nl2sql_result)
            _persist_session_turn(request.session_id, request.message, text, {"viz": viz_payload})
            return {
                "response": text,
                "viz": viz_payload,
            }

        response = await nanobot_service.process_message(
            request.message,
            session_id=request.session_id,
            skill_ids=request.skill_ids,
            model_id=request.model_id,
        )
        return {"response": response}
    except HTTPException:
        # Already carries its own status code; don't rewrap it as a generic 500.
        raise
    except Exception as e:
        # Chain the cause so the original traceback survives in server logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/nanobot/chat/stream")
async def nanobot_chat_stream(request: ChatRequest):
    """Answer a chat message as a Server-Sent Events stream.

    Each event is an SSE ``data:`` frame carrying a JSON object whose
    ``type`` is one of ``viz``, ``delta``, ``final``, ``error`` or ``done``.
    Normal completion and handled errors both end with a ``done`` frame.

    * ``prefer_sql_chart``: NL2SQL runs first, one ``viz`` frame carries the
      chart payload, the turn is persisted, then ``final`` and ``done``.
    * otherwise: the agent's whole reply is awaited, then replayed one
      character per ``delta`` frame with an 8 ms pause after each, followed
      by ``final`` and ``done``.
    """

    def sse(event: dict) -> str:
        # Format one SSE frame; ensure_ascii=False keeps non-ASCII text as-is.
        return f"data: {json.dumps(event, ensure_ascii=False)}\n\n"

    async def event_generator():
        try:
            if not request.prefer_sql_chart:
                answer = await nanobot_service.process_message(
                    request.message,
                    session_id=request.session_id,
                    skill_ids=request.skill_ids,
                    model_id=request.model_id,
                )
                reply = answer or ""
                # The reply arrives all at once; drip it out character by
                # character (presumably for a typing effect on the client).
                for piece in reply:
                    yield sse({"type": "delta", "content": piece})
                    await asyncio.sleep(0.008)
                yield sse({"type": "final", "content": reply})
                yield sse({"type": "done"})
                return

            nl2sql_result = await process_nl2sql(
                NL2SQLRequest(query=request.message, source=request.source, file_url=request.file_url)
            )
            stored_viz = _build_sql_chart_viz(nl2sql_result)
            yield sse({"type": "viz", **stored_viz})
            reply_text = _build_sql_chart_text(nl2sql_result)
            _persist_session_turn(request.session_id, request.message, reply_text, {"viz": stored_viz})
            yield sse({"type": "final", "content": reply_text})
            yield sse({"type": "done"})
        except Exception as e:
            yield sse({"type": "error", "content": str(e)})
            yield sse({"type": "done"})

    # Headers disable caching and proxy buffering so frames reach the client promptly.
    stream_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers=stream_headers,
    )
@app.get("/nanobot/sessions")
def get_sessions():
    """List sessions together with their alias metadata.

    With the agent running, its session list is passed through
    ``session_alias_store.sync_and_list``; otherwise the alias store's cached
    listing is returned.
    """
    agent = nanobot_service.agent
    if agent:
        return session_alias_store.sync_and_list(agent.sessions.list_sessions())
    return session_alias_store.list_cached_sessions()
@app.get("/nanobot/sessions/{session_id}")
def get_session(session_id: str):
    """Return one session's key, timestamps, metadata, alias and messages.

    The session is fetched with ``get_or_create``, so an unknown id is
    created rather than rejected with 404. Responds 400 when the agent is
    not running.
    """
    agent = nanobot_service.agent
    if not agent:
        raise HTTPException(status_code=400, detail="Nanobot not running")
    session = agent.sessions.get_or_create(session_id)
    alias = session_alias_store.get_alias(session_id)
    return dict(
        key=session.key,
        created_at=session.created_at,
        updated_at=session.updated_at,
        metadata=session.metadata,
        alias=alias,
        messages=session.messages,
    )
@app.post("/nanobot/sessions/{session_id}/ensure")
def ensure_session(session_id: str):
    """Get or create a session, save it, and return its summary (no messages).

    Responds 400 when the agent is not running.
    """
    agent = nanobot_service.agent
    if not agent:
        raise HTTPException(status_code=400, detail="Nanobot not running")
    manager = agent.sessions
    session = manager.get_or_create(session_id)
    manager.save(session)
    alias = session_alias_store.get_alias(session_id)
    return dict(
        key=session.key,
        created_at=session.created_at,
        updated_at=session.updated_at,
        metadata=session.metadata,
        alias=alias,
    )
@app.delete("/nanobot/sessions/{session_id}")
def delete_session(session_id: str):
    """Delete a session from the agent cache, from disk, and from the alias store.

    Responds 400 when the agent is not running, and 404 if ``get_or_create``
    yields a falsy session. That 404 branch looks unreachable: get_or_create
    always returns an object — NOTE(review): confirm Session defines no falsy
    __bool__/__len__. A missing session file is not an error.
    """
    agent = nanobot_service.agent
    if not agent:
        raise HTTPException(status_code=400, detail="Nanobot not running")
    sessions = agent.sessions
    # Loading the session first is kept on purpose: it may resolve/migrate the
    # on-disk location before we compute the path (TODO confirm in SessionManager).
    session = sessions.get_or_create(session_id)
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")
    sessions.invalidate(session_id)
    # NOTE(review): relies on the session manager's private path helper.
    path = sessions._get_session_path(session_id)
    # missing_ok replaces exists()+unlink(), which raced with concurrent deletes.
    path.unlink(missing_ok=True)
    session_alias_store.delete_session(session_id)
    return {"status": "success"}
@app.put("/nanobot/sessions/{session_id}")
def update_session(session_id: str, payload: SessionAliasUpdateRequest):
    """Update a session's alias metadata (title, pinned, archived).

    Only the alias store is touched, so this works without the nanobot agent.
    The mapping returned by the store is merged into the response after
    ``status``; a ``status`` key from the store would override it.
    """
    meta = session_alias_store.update_alias_meta(
        session_key=session_id,
        alias=payload.title,
        pinned=payload.pinned,
        archived=payload.archived,
    )
    body = {"status": "success"}
    body.update(meta)
    return body
@app.put("/nanobot/sessions/{session_id}/context-file")
def update_session_context_file(session_id: str, payload: SessionFileContextUpdateRequest):
    """Set or clear the session's ``active_data_file`` metadata entry.

    A null ``active_data_file`` removes the key; any other value replaces it.
    ``updated_at`` is set to the current local time and the session is saved.
    Responds 400 when the agent is not running.
    """
    agent = nanobot_service.agent
    if not agent:
        raise HTTPException(status_code=400, detail="Nanobot not running")
    session = agent.sessions.get_or_create(session_id)
    new_file = payload.active_data_file
    if new_file is not None:
        session.metadata["active_data_file"] = new_file
    else:
        session.metadata.pop("active_data_file", None)
    session.updated_at = datetime.now()
    agent.sessions.save(session)
    return {"status": "success", "metadata": session.metadata}
@app.post("/api/v1/agent/nl2sql", response_model=NL2SQLResponse)
async def run_nl2sql(request: NL2SQLRequest):
    """Run NL2SQL directly and, when a session id is given, record the turn.

    The stored assistant message uses the same reply text and ``viz`` payload
    as the chat endpoints' SQL/chart path.
    """
    result = await process_nl2sql(request)
    if not request.session_id:
        return result
    _persist_session_turn(
        request.session_id,
        request.query,
        _build_sql_chart_text(result),
        {"viz": _build_sql_chart_viz(result)},
    )
    return result