2026-03-17 20:40:56 +08:00
import asyncio
2026-03-27 15:10:33 +08:00
import base64
import binascii
2026-03-17 11:38:02 +08:00
from typing import Any , Dict , List , Optional , Literal , Tuple
2026-03-27 15:10:33 +08:00
import mimetypes
from pathlib import Path
2026-03-29 17:12:46 +08:00
from dotenv import load_dotenv
# Load environment variables from the `.env` file located in the parent of the
# directory that contains this module (the project root).
env_path = Path(__file__).resolve().parent.parent.joinpath(".env")
load_dotenv(dotenv_path=env_path)
2026-03-27 15:10:33 +08:00
from fastapi import FastAPI , HTTPException , Query
2026-03-16 23:16:33 +08:00
from fastapi . encoders import jsonable_encoder
2026-03-27 15:10:33 +08:00
from fastapi . responses import FileResponse , RedirectResponse , StreamingResponse
2026-03-14 15:44:48 +08:00
from fastapi . middleware . cors import CORSMiddleware
2026-03-19 17:48:52 +08:00
from fastapi . staticfiles import StaticFiles
2026-03-14 15:44:48 +08:00
from pydantic import BaseModel
2026-03-14 22:00:36 +08:00
import json
2026-03-17 11:38:02 +08:00
import re
2026-03-19 17:48:52 +08:00
import os
2026-03-15 18:25:38 +08:00
from datetime import datetime
2026-03-14 15:44:48 +08:00
2026-03-29 19:34:58 +08:00
from app . api import upload , llm , skills , users , datasources , projects , semantic , mcp , subagents , knowledge , embedding_models , web_search
2026-03-14 15:44:48 +08:00
from app . connectors . postgres import postgres_connector
from app . connectors . clickhouse import clickhouse_connector
2026-03-27 15:10:33 +08:00
from app . core . artifacts import extract_artifacts
2026-03-27 15:59:23 +08:00
from app . core . data_root import ensure_data_layout , get_data_root , get_reports_root
2026-03-27 15:10:33 +08:00
from app . core . files import ensure_artifact_access , resolve_artifact_target
2026-03-14 15:44:48 +08:00
from app . core . nanobot import nanobot_service
2026-03-14 23:15:41 +08:00
from app . core . session_alias_store import session_alias_store
2026-03-29 00:20:53 +08:00
from app . context import (
current_session_id ,
current_progress_callback ,
current_viz_data ,
current_data_source ,
current_file_url ,
current_knowledge_base_id ,
)
from app . services . knowledge_index import knowledge_index_service
2026-03-15 19:36:02 +08:00
from app . database import engine , Base
# Import all models to ensure they are registered
2026-03-29 17:12:46 +08:00
from app . models . user import User , EmailVerification
2026-03-16 16:12:35 +08:00
from app . models . project import Project
2026-03-15 19:36:02 +08:00
from app . models . datasource import DataSource
2026-03-28 01:01:13 +08:00
from app . models . subagent import Subagent
2026-03-14 15:44:48 +08:00
app = FastAPI()

# NOTE(review): "*" already admits every origin, so the explicit localhost entries look
# redundant; combined with allow_credentials=True this likely lets any site make
# credentialed requests — confirm against Starlette's CORSMiddleware behaviour and
# tighten the list before exposing the API publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "http://localhost:5174", "*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Create any missing tables for the models registered on Base.
Base.metadata.create_all(bind=engine)

# Prepare the DATA_ROOT layout, then serve the reports directory as static files.
try:
    ensure_data_layout()
except Exception as exc:
    raise RuntimeError(f"DATA_ROOT 初始化失败: {exc}") from exc
reports_dir = get_reports_root()
app.mount("/reports", StaticFiles(directory=str(reports_dir)), name="reports")

# Every feature router is mounted under /api/v1; registration order is preserved.
for _api_module in (
    upload,
    llm,
    skills,
    users,
    projects,
    datasources,
    semantic,
    mcp,
    subagents,
    knowledge,
    embedding_models,
    web_search,
):
    app.include_router(_api_module.router, prefix="/api/v1")
del _api_module
2026-03-14 15:44:48 +08:00
2026-03-17 16:43:55 +08:00
# Chunk size for streamed deltas.
# NOTE(review): not referenced in the visible code — confirm its units and usage.
STREAM_DELTA_CHUNK_SIZE = 48

# Lower-case file suffixes (with leading dot) that the preview endpoint treats as text.
PREVIEWABLE_TEXT_EXTENSIONS = {
    f".{ext}" for ext in ("txt", "md", "json", "csv", "tsv", "yaml", "yml", "xml", "log")
}
2026-03-17 16:43:55 +08:00
2026-03-14 15:44:48 +08:00
def _check_data_root() -> None:
    """Create DATA_ROOT if needed and require read/write/execute access to it."""
    root = get_data_root()
    root.mkdir(parents=True, exist_ok=True)
    if os.access(root, os.R_OK | os.W_OK | os.X_OK):
        return
    raise RuntimeError(f"DATA_ROOT 权限不足: {root}")


@app.on_event("startup")
async def startup_event():
    """Validate DATA_ROOT (fatal on failure), then start the nanobot service (best effort)."""
    try:
        _check_data_root()
    except Exception as exc:
        raise RuntimeError(f"DATA_ROOT 初始化失败: {exc}") from exc

    # The service start is awaited here; a failure is printed but does not abort startup.
    try:
        await nanobot_service.start()
    except Exception as exc:
        print(f"Nanobot startup failed: {exc}")
@app.on_event("shutdown")
async def shutdown_event():
    """Stop the nanobot service when the application shuts down."""
    service = nanobot_service
    await service.stop()
@app.get("/")
def read_root():
    """Return a fixed greeting that identifies this backend."""
    greeting = {"Hello": "DataClaw Backend"}
    return greeting
@app.get("/connect/postgres")
def test_postgres():
    """Probe PostgreSQL connectivity; respond 500 when the connector reports failure."""
    if not postgres_connector.test_connection():
        raise HTTPException(status_code=500, detail="Failed to connect to PostgreSQL")
    return {"status": "success", "message": "Connected to PostgreSQL"}
@app.get("/connect/clickhouse")
def test_clickhouse():
    """Probe ClickHouse connectivity; respond 500 when the connector reports failure."""
    if not clickhouse_connector.test_connection():
        raise HTTPException(status_code=500, detail="Failed to connect to ClickHouse")
    return {"status": "success", "message": "Connected to ClickHouse"}
@app.get("/nanobot/status")
def nanobot_status():
    """Report whether the nanobot agent exists and, if so, which model it uses."""
    agent = nanobot_service.agent
    if not agent:
        return {"status": "stopped"}
    return {"status": "running", "model": agent.model}
2026-03-27 15:10:33 +08:00
def _guess_mime_type ( path : os . PathLike [ str ] | str ) - > str :
mime_type , _ = mimetypes . guess_type ( str ( path ) )
return mime_type or " application/octet-stream "
def _resolve_checked_target(target: str) -> os.PathLike[str]:
    """Map an artifact *target* to a file path that passed the artifact access check.

    Raises:
        HTTPException: 404 when the target cannot be resolved or the file is missing,
            403 when access to the resolved path is refused.
    """
    candidate = resolve_artifact_target(target)
    if candidate is None:
        raise HTTPException(status_code=404, detail="目标文件不存在")
    try:
        checked = ensure_artifact_access(candidate, require_file=True)
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="目标文件不存在")
    except PermissionError:
        raise HTTPException(status_code=403, detail="非法路径访问")
    return checked
def _is_previewable ( path : os . PathLike [ str ] , mime_type : str ) - > bool :
suffix = os . path . splitext ( str ( path ) ) [ 1 ] . lower ( )
if suffix in { " .html " , " .htm " , " .pdf " , " .pptx " } :
return True
if suffix in PREVIEWABLE_TEXT_EXTENSIONS :
return True
return mime_type . startswith ( " image/ " ) or mime_type . startswith ( " text/ " )
def _encode_web_root ( path : Path ) - > str :
return base64 . urlsafe_b64encode ( str ( path ) . encode ( " utf-8 " ) ) . decode ( " utf-8 " ) . rstrip ( " = " )
def _decode_web_root ( token : str ) - > Path :
padding = " = " * ( - len ( token ) % 4 )
try :
decoded = base64 . urlsafe_b64decode ( ( token + padding ) . encode ( " utf-8 " ) ) . decode ( " utf-8 " )
except ( binascii . Error , UnicodeDecodeError ) :
raise HTTPException ( status_code = 400 , detail = " 非法预览目录标识 " )
return Path ( decoded )
@app.get("/nanobot/artifacts/download")
def download_artifact(target: str = Query(...)):
    """Send the checked artifact file as a generic binary attachment."""
    file_path = str(_resolve_checked_target(target))
    return FileResponse(
        path=file_path,
        media_type="application/octet-stream",
        filename=os.path.basename(file_path),
    )
@app.get("/nanobot/artifacts/preview")
def preview_artifact(target: str = Query(...)):
    """Serve an artifact inline, redirecting HTML pages to the web-preview route.

    HTML/HTM files are redirected (307) to ``/nanobot/artifacts/web/{root}/{name}`` so
    that relative assets next to the page resolve against its directory; other
    previewable files are returned inline with their guessed MIME type.

    Raises:
        HTTPException: 415 when the file type is not previewable (plus the 404/403
            raised by ``_resolve_checked_target``).
    """
    from urllib.parse import quote

    resolved = _resolve_checked_target(target)
    mime_type = _guess_mime_type(resolved)
    if not _is_previewable(resolved, mime_type):
        raise HTTPException(status_code=415, detail="当前文件类型不支持预览,请使用下载")

    suffix = os.path.splitext(str(resolved))[1].lower()
    if suffix in {".html", ".htm"}:
        root_token = _encode_web_root(Path(resolved).parent)
        # Percent-encode the file name: a raw '#', '?' or '%' would otherwise cut the
        # redirect path short or corrupt it. The token is URL-safe base64 already.
        entry = quote(Path(resolved).name)
        return RedirectResponse(url=f"/nanobot/artifacts/web/{root_token}/{entry}", status_code=307)

    return FileResponse(
        path=str(resolved),
        media_type=mime_type,
        filename=os.path.basename(str(resolved)),
        content_disposition_type="inline",
    )
@app.get("/nanobot/artifacts/web/{root_token}/{resource_path:path}")
def preview_web_artifact_resource(root_token: str, resource_path: str):
    """Serve a file located under a previewed HTML page's directory, inline.

    ``root_token`` is the ``_encode_web_root`` token of the page's directory and
    ``resource_path`` is the page itself or an asset referenced relative to it.

    Raises:
        HTTPException: 400 for a malformed token, 404 when the directory or resource is
            missing, 403 when either path fails the artifact access check or the
            resource lies outside the preview directory.
    """
    root_dir = _decode_web_root(root_token)
    try:
        safe_root = ensure_artifact_access(root_dir, require_file=False)
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Web 预览目录不存在") from None
    except PermissionError:
        raise HTTPException(status_code=403, detail="非法路径访问") from None

    candidate = os.path.join(str(safe_root), resource_path)
    try:
        resolved = ensure_artifact_access(Path(candidate), require_file=True)
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Web 资源不存在") from None
    except PermissionError:
        raise HTTPException(status_code=403, detail="非法路径访问") from None

    # Compare canonical paths: a purely lexical is_relative_to() check would accept
    # "<root>/../elsewhere" or a symlink escaping the root if ensure_artifact_access
    # hands back an unresolved path.
    if not Path(resolved).resolve().is_relative_to(Path(safe_root).resolve()):
        raise HTTPException(status_code=403, detail="非法路径访问")

    return FileResponse(
        path=str(resolved),
        media_type=_guess_mime_type(resolved),
        filename=os.path.basename(str(resolved)),
        content_disposition_type="inline",
    )
2026-03-14 15:44:48 +08:00
class ChatRequest(BaseModel):
    """Request body shared by the ``/nanobot/chat`` and ``/nanobot/chat/stream`` endpoints."""

    # The user's message for this turn.
    message: str
    # Session (conversation) key; every request that omits it shares "api:default".
    session_id: str = "api:default"
    # Project to record on the session alias when provided.
    project_id: int | None = None
    # Skills the agent is told to prioritise; also forwarded to the nanobot service.
    skill_ids: List[str] | None = None
    # Optional model override forwarded to the nanobot service.
    model_id: str | None = None
    # Requested data source; an explicit "ds:..." or "upload" selection stored on the
    # session takes precedence over it.
    source: str = "postgres"
    # When true, the nl2sql instruction is injected exactly as for route_mode="sql".
    prefer_sql_chart: bool = False
    # File URL bound to the current_file_url context variable for the turn.
    file_url: str | None = None
    # "sql" forces the nl2sql tool, "chat" forbids it, "auto" leaves it to the agent.
    route_mode: Literal["auto", "chat", "sql"] = "auto"
    # Knowledge base to retrieve context from; falls back to the session's selection.
    knowledge_base_id: str | None = None
2026-03-14 15:44:48 +08:00
2026-03-14 23:15:41 +08:00
2026-03-17 11:38:02 +08:00
def _session_context_for_routing(session_id: str) -> Dict[str, Any]:
    """Return the stored metadata of *session_id*, or ``{}`` when nanobot is not running.

    NOTE: ``get_or_create`` is used, so looking up an unknown id may create a session.
    """
    agent = nanobot_service.agent
    if agent:
        metadata = agent.sessions.get_or_create(session_id).metadata
        return metadata or {}
    return {}
2026-03-18 21:58:11 +08:00
def _resolve_effective_source(request: ChatRequest) -> str:
    """Choose the (lower-cased, stripped) data source for this chat turn.

    A session-level selection wins when it names an explicit datasource (``ds:...``)
    or ``upload``; otherwise the request's ``source`` is used.
    """
    stored = _session_context_for_routing(request.session_id).get("selected_data_source")
    stored_source = (stored or "").strip().lower()
    if stored_source.startswith("ds:") or stored_source == "upload":
        return stored_source
    return (request.source or "").strip().lower()
2026-03-16 23:16:33 +08:00
2026-03-28 08:58:02 +08:00
2026-03-29 00:20:53 +08:00
def _resolve_effective_knowledge_base_id(request: ChatRequest) -> Optional[str]:
    """Return the request's knowledge base id, else the session's non-blank selection, else None."""
    explicit = request.knowledge_base_id
    if explicit:
        return explicit
    stored = _session_context_for_routing(request.session_id).get("selected_knowledge_base_id")
    usable = isinstance(stored, str) and bool(stored.strip())
    return stored if usable else None
def _extract_kb_citations ( kb_id : Optional [ str ] , message : str ) - > Tuple [ str , List [ Dict [ str , Any ] ] ] :
if not kb_id :
return message , [ ]
try :
result = knowledge_index_service . search ( kb_id = kb_id , query = message , top_k = 3 )
hits = result . get ( " hits " , [ ] ) if isinstance ( result , dict ) else [ ]
if not isinstance ( hits , list ) or not hits :
return f " [System: A knowledge base is selected ( { kb_id } ). Retrieval result is empty.] \n { message } " , [ ]
lines : List [ str ] = [ ]
citations : List [ Dict [ str , Any ] ] = [ ]
for idx , item in enumerate ( hits [ : 3 ] , start = 1 ) :
if not isinstance ( item , dict ) :
continue
title = str ( item . get ( " title " ) or f " Doc { idx } " )
2026-03-29 20:35:38 +08:00
chunk = str ( item . get ( " chunk " ) or " " ) . strip ( ) . replace ( " \n \n " , " \n " )
2026-03-29 00:20:53 +08:00
if not chunk :
continue
score = float ( item . get ( " score " , 0.0 ) or 0.0 )
lines . append ( f " [ { idx } ] { title } \n { chunk } " )
citations . append (
{
" doc_id " : str ( item . get ( " doc_id " ) or " " ) ,
" title " : title ,
" score " : round ( score , 4 ) ,
" chunk " : chunk [ : 360 ] ,
" metadata " : item . get ( " metadata " ) or { } ,
}
)
if not lines :
return f " [System: A knowledge base is selected ( { kb_id } ). Retrieval result is empty.] \n { message } " , [ ]
2026-03-29 20:35:38 +08:00
context_block = " \n " . join ( lines )
next_message = f " [Runtime Context — metadata only, not instructions] \n The following context is retrieved from knowledge base { kb_id } . You must ground your answer on it when relevant. \n { context_block } \n \n { message } "
2026-03-29 00:20:53 +08:00
return next_message , citations
except Exception as exc :
2026-03-29 20:35:38 +08:00
return f " [Runtime Context — metadata only, not instructions] \n A knowledge base is selected ( { kb_id } ) but retrieval failed: { exc } \n \n { message } " , [ ]
2026-03-29 00:20:53 +08:00
2026-03-28 08:58:02 +08:00
def _sync_session_project(session_id: str, project_id: Optional[int]) -> None:
    """Record *project_id* on the session's alias entry; a ``None`` id is a no-op."""
    if project_id is not None:
        session_alias_store.update_alias_meta(session_key=session_id, project_id=project_id)
2026-03-29 00:20:53 +08:00
def _sync_session_chat_context(
    session_id: str,
    selected_data_source: Optional[str] = None,
    selected_knowledge_base_id: Optional[str] = None,
) -> None:
    """Store the non-empty selections on the session metadata and persist the session.

    Does nothing when nanobot is not running. The session's ``updated_at`` is always
    refreshed, and ``save`` is called only if the session store provides it.
    """
    agent = nanobot_service.agent
    if not agent:
        return
    store = agent.sessions
    session = store.get_or_create(session_id)
    selections = {
        "selected_data_source": selected_data_source,
        "selected_knowledge_base_id": selected_knowledge_base_id,
    }
    for key, value in selections.items():
        if value:
            session.metadata[key] = value
    session.updated_at = datetime.now()
    save = getattr(store, "save", None)
    if callable(save):
        save(session)
2026-03-14 23:15:41 +08:00
class SessionAliasUpdateRequest(BaseModel):
    """Body of ``PUT /nanobot/sessions/{session_id}``; every field is optional."""

    # Display title; passed to the alias store as ``alias``.
    title: str | None = None
    pinned: bool | None = None
    archived: bool | None = None
    # Project id to record for the session.
    project_id: int | None = None
2026-03-14 23:15:41 +08:00
2026-03-15 17:57:09 +08:00
2026-03-15 20:55:42 +08:00
class BatchDeleteRequest(BaseModel):
    """Body of ``POST /nanobot/sessions/batch-delete``."""

    # Keys of the sessions to delete.
    session_ids: list[str]
2026-03-15 18:25:38 +08:00
class SessionFileContextUpdateRequest(BaseModel):
    """Body of ``PUT /nanobot/sessions/{session_id}/context-file``.

    The endpoint only applies fields that are explicitly present in the request.
    """

    # Descriptor of the active data file; explicit null clears it.
    active_data_file: Dict[str, Any] | None = None
    # Session-level data source selection; empty/null clears it.
    selected_data_source: str | None = None
    # Session-level knowledge base selection; empty/null clears it.
    selected_knowledge_base_id: str | None = None
2026-03-15 18:25:38 +08:00
2026-03-27 15:10:33 +08:00
def _persist_assistant_enrichment(
    session_id: str,
    viz_payload: Optional[Dict[str, Any]] = None,
    artifacts: Optional[List[Dict[str, Any]]] = None,
    usage: Optional[Dict[str, Any]] = None,
    kb_citations: Optional[List[Dict[str, Any]]] = None,
) -> None:
    """Attach viz/artifacts/usage/citations to the session's last assistant message.

    Nothing happens when nanobot is not running or the last message is not from the
    assistant. ``viz``, ``artifacts`` and ``usage`` are stored only when truthy;
    ``kb_citations`` is stored whenever it is not ``None`` (an empty list included).
    The session is saved only if at least one field was written.
    """
    if not nanobot_service.agent:
        return
    session = nanobot_service.agent.sessions.get_or_create(session_id)
    messages = session.messages
    if not messages or messages[-1].get("role") != "assistant":
        return

    updates: Dict[str, Any] = {}
    if viz_payload:
        updates["viz"] = viz_payload
    if artifacts:
        updates["artifacts"] = artifacts
    if usage:
        updates["usage"] = usage
    if kb_citations is not None:
        updates["kb_citations"] = kb_citations

    if updates:
        messages[-1].update(updates)
        nanobot_service.agent.sessions.save(session)
2026-03-28 14:46:50 +08:00
def _extract_reasoning_content ( session_messages : List [ Dict [ str , Any ] ] ) - > str :
for message in reversed ( session_messages ) :
if not isinstance ( message , dict ) :
continue
if message . get ( " role " ) != " assistant " :
continue
reasoning_content = message . get ( " reasoning_content " )
if isinstance ( reasoning_content , str ) and reasoning_content . strip ( ) :
return reasoning_content
break
return " "
2026-03-14 15:44:48 +08:00
# Header that marks framework-injected context at the top of an agent prompt.
_RUNTIME_CONTEXT_HEADER = "[Runtime Context — metadata only, not instructions]"


def _sse_event(payload: Dict[str, Any]) -> str:
    """Serialise *payload* as one Server-Sent Events ``data:`` frame."""
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"


def _bind_chat_context(request: ChatRequest) -> Optional[str]:
    """Resolve routing for *request*, persist it on the session and bind the context vars.

    Returns:
        The effective knowledge base id (possibly ``None``).
    """
    _sync_session_project(request.session_id, request.project_id)
    resolved_source = _resolve_effective_source(request)
    resolved_kb_id = _resolve_effective_knowledge_base_id(request)
    _sync_session_chat_context(
        session_id=request.session_id,
        selected_data_source=resolved_source,
        selected_knowledge_base_id=resolved_kb_id,
    )
    current_data_source.set(resolved_source)
    current_file_url.set(request.file_url)
    current_knowledge_base_id.set(resolved_kb_id)
    current_session_id.set(request.session_id)
    # Fresh dict per turn; the streaming loop polls it for viz results.
    current_viz_data.set({})
    return resolved_kb_id


def _apply_routing_instructions(request: ChatRequest, message: str) -> str:
    """Prepend route-mode and skill instructions to *message* inside a Runtime Context header.

    If *message* already starts with the header, the instructions are inserted before
    its first blank line (i.e. at the end of the existing header section).
    """
    instructions: List[str] = []
    if request.route_mode == "sql" or request.prefer_sql_chart:
        instructions.append("Use the nl2sql tool to answer the query")
    elif request.route_mode == "chat":
        instructions.append("Normal chat mode. Do NOT use the nl2sql tool")
    if request.skill_ids:
        skill_list = ", ".join(request.skill_ids)
        instructions.append(
            f"You must prioritize using the following skills/tools to answer the user's request: {skill_list}"
        )
    if not instructions:
        return message

    instr_block = "\n".join(instructions)
    if message.startswith(_RUNTIME_CONTEXT_HEADER):
        head, separator, rest = message.partition("\n\n")
        if separator:
            return f"{head}\n{instr_block}\n\n{rest}"
        return f"{message}\n{instr_block}"
    return f"{_RUNTIME_CONTEXT_HEADER}\n{instr_block}\n\n{message}"


def _latest_session_messages(session_id: str) -> List[Dict[str, Any]]:
    """Return the session's message list, or ``[]`` when nanobot is not running."""
    if not nanobot_service.agent:
        return []
    return nanobot_service.agent.sessions.get_or_create(session_id).messages


def _viz_event(viz_payload: Any, last_hash: Any) -> Tuple[Optional[str], Any]:
    """Build a ``viz`` SSE frame when *viz_payload* changed since *last_hash*.

    Only ``sql``, ``error`` and ``chart`` are fingerprinted, to avoid serialising
    large result arrays on every poll.

    Returns:
        ``(frame, fingerprint)``; ``frame`` is ``None`` when the payload is empty or
        unchanged, in which case *last_hash* is returned unchanged.
    """
    if not viz_payload:
        return None, last_hash
    fingerprint = hash(
        (
            viz_payload.get("sql"),
            viz_payload.get("error"),
            json.dumps(viz_payload.get("chart"), sort_keys=True),
        )
    )
    if fingerprint == last_hash:
        return None, last_hash
    return _sse_event({"type": "viz", **viz_payload}), fingerprint


@app.post("/nanobot/chat")
async def nanobot_chat(request: ChatRequest):
    """Run one non-streaming agent turn and return the reply with its enrichment.

    The reply carries ``response``, ``viz`` and ``routing``, plus ``artifacts``,
    ``usage`` and ``kb_citations`` when non-empty; the enrichment is also stored on
    the session's last assistant message.

    Raises:
        HTTPException: 500 wrapping any error raised while processing the turn.
    """
    try:
        resolved_kb_id = _bind_chat_context(request)
        message, kb_citations = _extract_kb_citations(resolved_kb_id, request.message)
        message = _apply_routing_instructions(request, message)

        response = await nanobot_service.process_message(
            message,
            session_id=request.session_id,
            skill_ids=request.skill_ids,
            model_id=request.model_id,
            project_id=request.project_id,
        )
        text = response or ""
        artifacts = extract_artifacts(text, _latest_session_messages(request.session_id))
        viz_payload = current_viz_data.get()
        usage = nanobot_service.get_last_usage(request.session_id)
        _persist_assistant_enrichment(
            session_id=request.session_id,
            viz_payload=viz_payload if isinstance(viz_payload, dict) else None,
            artifacts=artifacts,
            usage=usage,
            kb_citations=kb_citations,
        )

        payload: Dict[str, Any] = {
            "response": text,
            "viz": viz_payload,
            "routing": {"selected": "agent", "reason": "auto_routed_by_agent"},
        }
        if artifacts:
            payload["artifacts"] = artifacts
        if usage:
            payload["usage"] = usage
        if kb_citations:
            payload["kb_citations"] = kb_citations
        return payload
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/nanobot/chat/stream")
async def nanobot_chat_stream(request: ChatRequest):
    """Run one agent turn and stream it to the client as Server-Sent Events.

    Frames emitted, in order: ``routing``; then ``progress``/``delta``/``viz`` frames
    (``: keep-alive`` comments while idle); then ``final`` and ``done``. On failure an
    ``error`` frame followed by ``done`` is sent instead. The agent task is cancelled
    if the stream ends before it finishes.
    """

    async def event_generator():
        current_task = None
        try:
            resolved_kb_id = _bind_chat_context(request)
            yield _sse_event({"type": "routing", "selected": "agent", "reason": "auto_routed_by_agent"})
            progress_queue: asyncio.Queue[Any] = asyncio.Queue()

            async def _on_progress(content: str, **kwargs: Any) -> None:
                if content:
                    payload: Dict[str, Any] = {"type": "progress", "content": content}
                    payload.update(kwargs)
                    await progress_queue.put(payload)

            async def _on_stream(delta: str) -> None:
                if delta:
                    await progress_queue.put({"type": "delta", "content": delta})

            current_progress_callback.set(_on_progress)
            message, kb_citations = _extract_kb_citations(resolved_kb_id, request.message)
            message = _apply_routing_instructions(request, message)

            current_task = asyncio.create_task(
                nanobot_service.process_message(
                    message,
                    session_id=request.session_id,
                    skill_ids=request.skill_ids,
                    model_id=request.model_id,
                    project_id=request.project_id,
                    on_progress=_on_progress,
                    on_stream=_on_stream,
                )
            )

            last_viz_hash = None
            while True:
                # Surface new viz results while the agent is still running.
                viz_payload = current_viz_data.get()
                try:
                    viz_frame, last_viz_hash = _viz_event(viz_payload, last_viz_hash)
                except Exception as e:
                    viz_frame = None
                    print(f"Error checking viz_payload: {e}")
                if viz_frame:
                    yield viz_frame

                if current_task.done() and progress_queue.empty():
                    break
                try:
                    progress = await asyncio.wait_for(progress_queue.get(), timeout=0.2)
                except asyncio.TimeoutError:
                    yield ": keep-alive\n\n"
                    continue
                if isinstance(progress, dict):
                    yield _sse_event(progress)
                else:
                    yield _sse_event({"type": "progress", "content": progress})

            response = await current_task
            text = response or ""
            session_messages = _latest_session_messages(request.session_id)
            artifacts = extract_artifacts(text, session_messages)
            reasoning_content = _extract_reasoning_content(session_messages)
            viz_payload = current_viz_data.get()
            usage = nanobot_service.get_last_usage(request.session_id)
            try:
                viz_frame, _ = _viz_event(viz_payload, last_viz_hash)
            except Exception as e:
                viz_frame = None
                print(f"Error checking viz_payload: {e}")
            if viz_frame:
                yield viz_frame

            _persist_assistant_enrichment(
                session_id=request.session_id,
                viz_payload=viz_payload if isinstance(viz_payload, dict) else None,
                artifacts=artifacts,
                usage=usage,
                kb_citations=kb_citations,
            )

            final_payload: Dict[str, Any] = {"type": "final", "content": text}
            if reasoning_content:
                final_payload["reasoning_content"] = reasoning_content
            if artifacts:
                final_payload["artifacts"] = artifacts
            if usage:
                final_payload["usage"] = usage
            if kb_citations:
                final_payload["kb_citations"] = kb_citations
            yield _sse_event(final_payload)
            yield _sse_event({"type": "done"})
        except asyncio.CancelledError:
            raise
        except Exception as e:
            yield _sse_event({"type": "error", "content": str(e)})
            yield _sse_event({"type": "done"})
        finally:
            # Client went away (or we failed) before the agent finished: stop it.
            if current_task and not current_task.done():
                current_task.cancel()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
2026-03-14 22:25:01 +08:00
@app.get("/nanobot/sessions")
def get_sessions(project_id: Optional[int] = None):
    """List sessions, optionally filtered by project.

    With nanobot running, the live session list is synced into the alias store first;
    otherwise the alias store's cached list is returned.
    """
    agent = nanobot_service.agent
    if agent:
        live_sessions = agent.sessions.list_sessions()
        return session_alias_store.sync_and_list(live_sessions, project_id=project_id)
    return session_alias_store.list_cached_sessions(project_id=project_id)
2026-03-14 22:25:01 +08:00
@app.get("/nanobot/sessions/{session_id}")
def get_session(session_id: str):
    """Return a session's metadata, alias entry and full message list.

    NOTE: uses ``get_or_create``, so an unknown id may create an empty session.

    Raises:
        HTTPException: 400 when nanobot is not running.
    """
    if not nanobot_service.agent:
        raise HTTPException(status_code=400, detail="Nanobot not running")
    session = nanobot_service.agent.sessions.get_or_create(session_id)
    return {
        "key": session.key,
        "created_at": session.created_at,
        "updated_at": session.updated_at,
        "metadata": session.metadata,
        "alias": session_alias_store.get_alias(session_id),
        "messages": session.messages,
    }
2026-03-22 16:48:41 +08:00
class EnsureSessionRequest(BaseModel):
    """Optional body of ``POST /nanobot/sessions/{session_id}/ensure``."""

    # Project to record on the session alias immediately after creation.
    project_id: int | None = None
2026-03-15 17:05:16 +08:00
@app.post("/nanobot/sessions/{session_id}/ensure")
def ensure_session(session_id: str, request: EnsureSessionRequest = EnsureSessionRequest()):
    """Create (if needed) and persist a session, optionally tagging it with a project.

    NOTE(review): the default body is a single shared model instance; it is only read
    here, never mutated.

    Raises:
        HTTPException: 400 when nanobot is not running.
    """
    agent = nanobot_service.agent
    if not agent:
        raise HTTPException(status_code=400, detail="Nanobot not running")
    store = agent.sessions
    session = store.get_or_create(session_id)
    store.save(session)

    project_id = request.project_id
    if project_id is not None:
        # Record the project on the alias right away so listings can filter by it.
        session_alias_store.update_alias_meta(session_key=session_id, project_id=project_id)

    return {
        "key": session.key,
        "created_at": session.created_at,
        "updated_at": session.updated_at,
        "metadata": session.metadata,
        "alias": session_alias_store.get_alias(session_id),
        "project_id": project_id,
    }
2026-03-14 22:25:01 +08:00
def _purge_session(session_id: str) -> bool:
    """Drop a session from the cache, delete its file and its alias entry.

    Returns False only when ``get_or_create`` yields a falsy session.
    NOTE(review): ``get_or_create`` presumably always returns a session, so the
    False path may be unreachable — confirm.
    """
    store = nanobot_service.agent.sessions
    if not store.get_or_create(session_id):
        return False
    store.invalidate(session_id)
    session_file = store._get_session_path(session_id)
    if session_file.exists():
        session_file.unlink()
    session_alias_store.delete_session(session_id)
    return True


@app.delete("/nanobot/sessions/{session_id}")
def delete_session(session_id: str):
    """Delete one session.

    Raises:
        HTTPException: 400 when nanobot is not running, 404 when no session is found.
    """
    if not nanobot_service.agent:
        raise HTTPException(status_code=400, detail="Nanobot not running")
    if not _purge_session(session_id):
        raise HTTPException(status_code=404, detail="Session not found")
    return {"status": "success"}


@app.post("/nanobot/sessions/batch-delete")
def batch_delete_sessions(request: BatchDeleteRequest):
    """Delete several sessions; per-session failures are printed and skipped.

    Raises:
        HTTPException: 400 when nanobot is not running.
    """
    if not nanobot_service.agent:
        raise HTTPException(status_code=400, detail="Nanobot not running")
    deleted_ids = []
    for session_id in request.session_ids:
        try:
            if _purge_session(session_id):
                deleted_ids.append(session_id)
        except Exception as e:
            print(f"Failed to delete session {session_id}: {e}")
    return {"status": "success", "deleted_count": len(deleted_ids), "deleted_ids": deleted_ids}
2026-03-14 22:25:01 +08:00
@app.put("/nanobot/sessions/{session_id}")
def update_session(session_id: str, payload: SessionAliasUpdateRequest):
    """Update a session's alias metadata (title, pinned, archived, project)."""
    changes = {
        "alias": payload.title,
        "pinned": payload.pinned,
        "archived": payload.archived,
        "project_id": payload.project_id,
    }
    updated = session_alias_store.update_alias_meta(session_key=session_id, **changes)
    return {"status": "success", **updated}
2026-03-14 22:25:01 +08:00
2026-03-15 18:25:38 +08:00
@app.put("/nanobot/sessions/{session_id}/context-file")
def update_session_context_file(session_id: str, payload: SessionFileContextUpdateRequest):
    """Apply the explicitly-sent context fields of *payload* to the session metadata.

    Only fields present in the request body are touched. ``active_data_file`` is
    removed when sent as null; the data-source and knowledge-base selections are
    removed when sent empty or null. The session is then timestamped and saved.

    Raises:
        HTTPException: 400 when nanobot is not running.
    """
    if not nanobot_service.agent:
        raise HTTPException(status_code=400, detail="Nanobot not running")
    store = nanobot_service.agent.sessions
    session = store.get_or_create(session_id)
    metadata = session.metadata
    sent_fields = payload.model_fields_set

    if "active_data_file" in sent_fields:
        if payload.active_data_file is not None:
            metadata["active_data_file"] = payload.active_data_file
        else:
            metadata.pop("active_data_file", None)

    for key in ("selected_data_source", "selected_knowledge_base_id"):
        if key not in sent_fields:
            continue
        value = getattr(payload, key)
        if value:
            metadata[key] = value
        else:
            metadata.pop(key, None)

    session.updated_at = datetime.now()
    store.save(session)
    return {"status": "success", "metadata": session.metadata}