feat: knowledge base first OK

This commit is contained in:
qixinbo
2026-03-29 00:20:53 +08:00
parent bd7776d1b7
commit 92e8c40826
17 changed files with 3357 additions and 10 deletions
+110 -4
View File
@@ -16,7 +16,7 @@ import re
import os
from datetime import datetime
from app.api import upload, llm, skills, users, datasources, projects, semantic, mcp, subagents
from app.api import upload, llm, skills, users, datasources, projects, semantic, mcp, subagents, knowledge
from app.connectors.postgres import postgres_connector
from app.connectors.clickhouse import clickhouse_connector
from app.core.artifacts import extract_artifacts
@@ -24,7 +24,15 @@ from app.core.data_root import ensure_data_layout, get_data_root, get_reports_ro
from app.core.files import ensure_artifact_access, resolve_artifact_target
from app.core.nanobot import nanobot_service
from app.core.session_alias_store import session_alias_store
from app.context import current_session_id, current_progress_callback, current_viz_data, current_data_source, current_file_url
from app.context import (
current_session_id,
current_progress_callback,
current_viz_data,
current_data_source,
current_file_url,
current_knowledge_base_id,
)
from app.services.knowledge_index import knowledge_index_service
from app.database import engine, Base
# Import all models to ensure they are registered
from app.models.user import User
@@ -62,6 +70,7 @@ app.include_router(datasources.router, prefix="/api/v1")
app.include_router(semantic.router, prefix="/api/v1")
app.include_router(mcp.router, prefix="/api/v1")
app.include_router(subagents.router, prefix="/api/v1")
app.include_router(knowledge.router, prefix="/api/v1")
STREAM_DELTA_CHUNK_SIZE = 48
PREVIEWABLE_TEXT_EXTENSIONS = {
@@ -221,6 +230,7 @@ class ChatRequest(BaseModel):
prefer_sql_chart: bool = False
file_url: Optional[str] = None
route_mode: Literal["auto", "chat", "sql"] = "auto"
knowledge_base_id: Optional[str] = None
def _session_context_for_routing(session_id: str) -> Dict[str, Any]:
@@ -240,6 +250,53 @@ def _resolve_effective_source(request: ChatRequest) -> str:
return effective_source
def _resolve_effective_knowledge_base_id(request: ChatRequest) -> Optional[str]:
if request.knowledge_base_id:
return request.knowledge_base_id
session_ctx = _session_context_for_routing(request.session_id)
kb_id = session_ctx.get("selected_knowledge_base_id")
if isinstance(kb_id, str) and kb_id.strip():
return kb_id
return None
def _extract_kb_citations(kb_id: Optional[str], message: str) -> Tuple[str, List[Dict[str, Any]]]:
if not kb_id:
return message, []
try:
result = knowledge_index_service.search(kb_id=kb_id, query=message, top_k=3)
hits = result.get("hits", []) if isinstance(result, dict) else []
if not isinstance(hits, list) or not hits:
return f"[System: A knowledge base is selected ({kb_id}). Retrieval result is empty.]\n{message}", []
lines: List[str] = []
citations: List[Dict[str, Any]] = []
for idx, item in enumerate(hits[:3], start=1):
if not isinstance(item, dict):
continue
title = str(item.get("title") or f"Doc {idx}")
chunk = str(item.get("chunk") or "").strip()
if not chunk:
continue
score = float(item.get("score", 0.0) or 0.0)
lines.append(f"[{idx}] {title}\n{chunk}")
citations.append(
{
"doc_id": str(item.get("doc_id") or ""),
"title": title,
"score": round(score, 4),
"chunk": chunk[:360],
"metadata": item.get("metadata") or {},
}
)
if not lines:
return f"[System: A knowledge base is selected ({kb_id}). Retrieval result is empty.]\n{message}", []
context_block = "\n\n".join(lines)
next_message = f"[System: The following context is retrieved from knowledge base {kb_id}. You must ground your answer on it when relevant.]\n{context_block}\n\n{message}"
return next_message, citations
except Exception as exc:
return f"[System: A knowledge base is selected ({kb_id}) but retrieval failed: {exc}]\n{message}", []
def _sync_session_project(session_id: str, project_id: Optional[int]) -> None:
if project_id is None:
return
@@ -248,6 +305,25 @@ def _sync_session_project(session_id: str, project_id: Optional[int]) -> None:
project_id=project_id,
)
def _sync_session_chat_context(
session_id: str,
selected_data_source: Optional[str] = None,
selected_knowledge_base_id: Optional[str] = None,
) -> None:
if not nanobot_service.agent:
return
sessions = nanobot_service.agent.sessions
session = sessions.get_or_create(session_id)
if selected_data_source:
session.metadata["selected_data_source"] = selected_data_source
if selected_knowledge_base_id:
session.metadata["selected_knowledge_base_id"] = selected_knowledge_base_id
session.updated_at = datetime.now()
save_fn = getattr(sessions, "save", None)
if callable(save_fn):
save_fn(session)
class SessionAliasUpdateRequest(BaseModel):
title: Optional[str] = None
pinned: Optional[bool] = None
@@ -262,6 +338,7 @@ class BatchDeleteRequest(BaseModel):
class SessionFileContextUpdateRequest(BaseModel):
active_data_file: Optional[Dict[str, Any]] = None
selected_data_source: Optional[str] = None
selected_knowledge_base_id: Optional[str] = None
def _persist_assistant_enrichment(
@@ -269,6 +346,7 @@ def _persist_assistant_enrichment(
viz_payload: Optional[Dict[str, Any]] = None,
artifacts: Optional[List[Dict[str, Any]]] = None,
usage: Optional[Dict[str, Any]] = None,
kb_citations: Optional[List[Dict[str, Any]]] = None,
) -> None:
if not nanobot_service.agent:
return
@@ -285,6 +363,9 @@ def _persist_assistant_enrichment(
if usage:
session.messages[-1]["usage"] = usage
changed = True
if kb_citations is not None:
session.messages[-1]["kb_citations"] = kb_citations
changed = True
if changed:
nanobot_service.agent.sessions.save(session)
@@ -306,13 +387,20 @@ async def nanobot_chat(request: ChatRequest):
try:
_sync_session_project(request.session_id, request.project_id)
resolved_source = _resolve_effective_source(request)
resolved_kb_id = _resolve_effective_knowledge_base_id(request)
_sync_session_chat_context(
session_id=request.session_id,
selected_data_source=resolved_source,
selected_knowledge_base_id=resolved_kb_id,
)
current_data_source.set(resolved_source)
current_file_url.set(request.file_url)
current_knowledge_base_id.set(resolved_kb_id)
current_session_id.set(request.session_id)
current_viz_data.set({})
# Inject instructions if explicitly routed
message = request.message
message, kb_citations = _extract_kb_citations(resolved_kb_id, request.message)
if request.route_mode == "sql" or request.prefer_sql_chart:
message = f"[System: Use the nl2sql tool to answer the query]\n{message}"
elif request.route_mode == "chat":
@@ -344,6 +432,7 @@ async def nanobot_chat(request: ChatRequest):
viz_payload=viz_payload if isinstance(viz_payload, dict) else None,
artifacts=artifacts,
usage=usage,
kb_citations=kb_citations,
)
payload = {
@@ -355,6 +444,8 @@ async def nanobot_chat(request: ChatRequest):
payload["artifacts"] = artifacts
if usage:
payload["usage"] = usage
if kb_citations:
payload["kb_citations"] = kb_citations
return payload
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@@ -366,8 +457,15 @@ async def nanobot_chat_stream(request: ChatRequest):
try:
_sync_session_project(request.session_id, request.project_id)
resolved_source = _resolve_effective_source(request)
resolved_kb_id = _resolve_effective_knowledge_base_id(request)
_sync_session_chat_context(
session_id=request.session_id,
selected_data_source=resolved_source,
selected_knowledge_base_id=resolved_kb_id,
)
current_data_source.set(resolved_source)
current_file_url.set(request.file_url)
current_knowledge_base_id.set(resolved_kb_id)
current_session_id.set(request.session_id)
current_viz_data.set({})
@@ -388,7 +486,7 @@ async def nanobot_chat_stream(request: ChatRequest):
current_progress_callback.set(_on_progress)
# Inject instructions if explicitly routed
message = request.message
message, kb_citations = _extract_kb_citations(resolved_kb_id, request.message)
if request.route_mode == "sql" or request.prefer_sql_chart:
message = f"[System: Use the nl2sql tool to answer the query]\n{message}"
elif request.route_mode == "chat":
@@ -472,6 +570,7 @@ async def nanobot_chat_stream(request: ChatRequest):
viz_payload=viz_payload if isinstance(viz_payload, dict) else None,
artifacts=artifacts,
usage=usage,
kb_citations=kb_citations,
)
final_payload = {"type": "final", "content": text}
@@ -481,6 +580,8 @@ async def nanobot_chat_stream(request: ChatRequest):
final_payload["artifacts"] = artifacts
if usage:
final_payload["usage"] = usage
if kb_citations:
final_payload["kb_citations"] = kb_citations
yield f"data: {json.dumps(final_payload, ensure_ascii=False)}\n\n"
yield f"data: {json.dumps({'type': 'done'}, ensure_ascii=False)}\n\n"
except asyncio.CancelledError:
@@ -619,6 +720,11 @@ def update_session_context_file(session_id: str, payload: SessionFileContextUpda
session.metadata["selected_data_source"] = payload.selected_data_source
else:
session.metadata.pop("selected_data_source", None)
if "selected_knowledge_base_id" in updated_fields:
if payload.selected_knowledge_base_id:
session.metadata["selected_knowledge_base_id"] = payload.selected_knowledge_base_id
else:
session.metadata.pop("selected_knowledge_base_id", None)
session.updated_at = datetime.now()
nanobot_service.agent.sessions.save(session)
return {"status": "success", "metadata": session.metadata}