From 653a2782692838db56383cffd44fc68098078a2d Mon Sep 17 00:00:00 2001 From: qixinbo Date: Sat, 14 Mar 2026 22:00:36 +0800 Subject: [PATCH] feat: integrate nanobot --- backend/app/core/nanobot.py | 130 +++++++++++++++++- backend/data/llm_config.json | 0 backend/data/workspace/cron/jobs.json | 4 + .../data/workspace/sessions/api_default.jsonl | 15 ++ backend/main.py | 48 ++++++- frontend/src/components/ChatInterface.tsx | 99 ++++++++++--- 6 files changed, 270 insertions(+), 26 deletions(-) mode change 100644 => 100755 backend/data/llm_config.json create mode 100755 backend/data/workspace/cron/jobs.json create mode 100644 backend/data/workspace/sessions/api_default.jsonl diff --git a/backend/app/core/nanobot.py b/backend/app/core/nanobot.py index d3f1bdd..5b01907 100644 --- a/backend/app/core/nanobot.py +++ b/backend/app/core/nanobot.py @@ -2,7 +2,7 @@ import asyncio import sys import os from pathlib import Path -from typing import List +from typing import List, Callable, Awaitable # Add project root to sys.path to allow importing nanobot # Assuming backend/app/core/nanobot.py -> backend/app/core -> backend/app -> backend -> root @@ -40,12 +40,24 @@ class NanobotIntegration: self.config: Config | None = None def initialize(self): + # Set workspace path to backend/data/workspace + workspace_path = Path(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "data", "workspace")) + workspace_path.mkdir(parents=True, exist_ok=True) + + # Override config workspace path via environment variable (since config is loaded from env) + os.environ["NANOBOT_AGENTS__DEFAULTS__WORKSPACE"] = str(workspace_path) + self.config = load_config() + # No need to set self.config.workspace_path as it's a property that reads from agents.defaults.workspace + self.bus = MessageBus() provider = self._make_provider(self.config) - cron_store_path = get_cron_dir() / "jobs.json" - self.cron = CronService(cron_store_path) + cron_store_path = workspace_path / "cron" + cron_store_path.mkdir(parents=True, exist_ok=True) + cron_store_file = cron_store_path / "jobs.json" + + self.cron = CronService(cron_store_file) session_manager = SessionManager(self.config.workspace_path) @@ -75,6 +87,24 @@ class NanobotIntegration: provider_name = config.get_provider_name(model) p = config.get_provider(model) + # Check if model is using an ID from our database configuration + # This requires accessing the database or a cache of LLM configs + # Since we are inside NanobotIntegration, we can try to load from the JSON file directly for simplicity + # or rely on the caller to have injected the right config if they used environment variables. + # But here we need to support dynamic loading based on the `model` string if it matches a stored config ID. + + # However, typically the `model` passed here comes from `config.agents.defaults.model`. + # If we want to support dynamic switching per request, we should look at `agent.process_direct` arguments. + # The `AgentLoop` initializes with a provider, but `LiteLLMProvider` might be able to handle dynamic models if we pass them. + # BUT `LiteLLMProvider` is initialized with a specific `default_model`. + + # To support per-request model changes, we need to ensure the `provider` object or the `agent` can accept a model override. + # `AgentLoop` methods like `process_direct` don't typically take a `model` argument to override the provider's default. + # We might need to reinstantiate the provider or use a "DynamicProvider" that delegates based on context. + + # For now, let's assume standard initialization. + # If the user provides a `model_id` in `process_message`, we will handle it there by creating a temporary provider/agent or updating the current one. + if provider_name == "openai_codex" or model.startswith("openai-codex/"): return OpenAICodexProvider(default_model=model) @@ -120,11 +150,98 @@ class NanobotIntegration: if self.cron: self.cron.stop() - async def process_message(self, message: str, session_id: str = "api:default", skill_ids: List[str] | None = None): + async def process_message( + self, + message: str, + session_id: str = "api:default", + skill_ids: List[str] | None = None, + model_id: str | None = None, + on_progress: Callable[[str], Awaitable[None]] | None = None, + ): if not self.agent: self.initialize() await self.start() + # Handle dynamic model switching + # If model_id is provided, we need to fetch its config and create a temporary provider + # or update the current agent's provider context for this request. + # Since AgentLoop is stateful and tied to a provider, and we want to avoid recreating the whole agent for every request if possible, + # but changing the provider/model is a significant change. + # + # A simpler approach for this "stateless API" usage pattern: + # We can instantiate a lightweight version of the agent or provider just for this request if the model differs. + # OR, since we are using `process_direct`, we can check if `AgentLoop` supports overriding the model. + # Looking at `nanobot/agent/loop.py` (assumed), it uses `self.provider.completion(...)`. + + # Strategy: + # 1. Load the model config from our JSON file using `model_id`. + # 2. Construct a temporary provider instance for this model. + # 3. Inject this provider into the agent for this request OR (cleaner) instantiate a temporary agent. + # Instantiating a whole AgentLoop might be heavy due to MCP/Cron etc. + # BUT `process_direct` is relatively isolated. + # + # Let's try to fetch the config first. + current_provider = self.agent.provider + temp_provider = None + + if model_id: + from app.api.llm import _load_data + llm_configs = _load_data() + target_config = next((item for item in llm_configs if item["id"] == model_id), None) + + if target_config: + # Map our DB config to Nanobot Provider + # We reuse LiteLLMProvider for most cases as it is generic + + # Construct kwargs for LiteLLMProvider + provider_name = target_config["provider"] + model_name = target_config["model"] + + # Handle special case where provider might need to be part of model name for LiteLLM if not standard + # But LiteLLMProvider handles `provider_name` arg. + + temp_provider = LiteLLMProvider( + api_key=target_config.get("api_key"), + api_base=target_config.get("api_base"), + default_model=model_name, + extra_headers=target_config.get("extra_headers"), + provider_name=provider_name + ) + + # If we created a temp provider, we need to use it. + # Since AgentLoop binds the provider, we might need to swap it temporarily or create a new AgentLoop. + # Swapping is risky for concurrency. + # Creating a new AgentLoop is safer but heavier. + # + # Optimization: If we are just doing a single turn chat (process_direct), maybe we can just use the provider directly? + # But we want the Agent's reasoning loop (ReAct / tools). + # + # Let's try creating a temporary AgentLoop sharing the same components (bus, tools) but different provider. + + agent_to_use = self.agent + if temp_provider: + # Shallow copy or new instance + # We need to pass all dependencies. + agent_to_use = AgentLoop( + bus=self.bus, + provider=temp_provider, + workspace=self.config.workspace_path, + model=temp_provider.default_model, + temperature=self.config.agents.defaults.temperature, + max_tokens=self.config.agents.defaults.max_tokens, + max_iterations=self.config.agents.defaults.max_tool_iterations, + memory_window=self.config.agents.defaults.memory_window, + reasoning_effort=self.config.agents.defaults.reasoning_effort, + brave_api_key=self.config.tools.web.search.api_key or None, + web_proxy=self.config.tools.web.proxy or None, + exec_config=self.config.tools.exec, + cron_service=self.cron, + restrict_to_workspace=self.config.tools.restrict_to_workspace, + session_manager=self.agent.sessions, + mcp_servers=self.config.tools.mcp_servers, + channels_config=self.config.channels, + ) + full_message = message if skill_ids: skills = load_skills() @@ -138,11 +255,12 @@ class NanobotIntegration: # Append user message after skills full_message = f"{skill_context}\n\n{message}" - response = await self.agent.process_direct( + response = await agent_to_use.process_direct( full_message, session_key=session_id, channel="api", - chat_id=session_id + chat_id=session_id, + on_progress=on_progress, ) return response diff --git a/backend/data/llm_config.json b/backend/data/llm_config.json old mode 100644 new mode 100755 diff --git a/backend/data/workspace/cron/jobs.json b/backend/data/workspace/cron/jobs.json new file mode 100755 index 0000000..b8cdc50 --- /dev/null +++ b/backend/data/workspace/cron/jobs.json @@ -0,0 +1,4 @@ +{ + "version": 1, + "jobs": [] +} \ No newline at end of file diff --git a/backend/data/workspace/sessions/api_default.jsonl b/backend/data/workspace/sessions/api_default.jsonl new file mode 100644 index 0000000..fa5976b --- /dev/null +++ b/backend/data/workspace/sessions/api_default.jsonl @@ -0,0 +1,15 @@ +{"_type": "metadata", "key": "api:default", "created_at": "2026-03-14T21:43:22.940821", "updated_at": "2026-03-14T22:00:05.471408", "metadata": {}, "last_consolidated": 0} +{"role": "user", "content": "你是谁", "timestamp": "2026-03-14T21:43:23.163485"} +{"role": "user", "content": "hi", "timestamp": "2026-03-14T21:49:14.751350"} +{"role": "user", "content": "你好", "timestamp": "2026-03-14T21:53:53.701083"} +{"role": "assistant", "content": "你好!我是 nanobot 🐈,一个 AI 助手。\n\n我可以帮助你:\n- 读取和编辑文件\n- 执行 shell 命令\n- 搜索网络信息\n- 发送消息\n- 安排定时任务\n- 管理记忆和技能\n\n我运行在 macOS arm64 上,工作目录在 `/Users/qixinbo/Projects/DataClaw/backend/data/workspace`。\n\n有什么我可以帮助你的吗?", "reasoning_content": "用户用中文问了两个问题:\n1. \"你是谁\" - Who are you\n2. \"hi\" - Hello\n3. \"你好\" - Hello\n\n我应该用中文回答,介绍我自己。", "timestamp": "2026-03-14T21:53:53.701092"} +{"role": "user", "content": "你好", "timestamp": "2026-03-14T21:55:23.333254"} +{"role": "assistant", "content": "你好!有什么我可以帮助你的吗?😊\n\n我可以帮你处理文件、执行命令、搜索信息、安排任务等等。", "reasoning_content": "用户再次发送了\"你好\",这是一个简单的问候。我应该友好地回应,并询问他们需要什么帮助。", "timestamp": "2026-03-14T21:55:23.333267"} +{"role": "user", "content": "你是谁", "timestamp": "2026-03-14T21:55:32.488828"} +{"role": "assistant", "content": "我是 nanobot 🐈,一个 AI 助手。\n\n我可以帮助你:\n- 读取和编辑文件\n- 执行 shell 命令\n- 搜索网络信息\n- 发送消息\n- 安排定时任务\n- 管理记忆和技能\n\n当前时间是 2026-03-14 21:55 (CST),星期六。", "reasoning_content": "用户再次问\"你是谁\",这是一个简单的问题。我应该简洁地回答我的身份。", "timestamp": "2026-03-14T21:55:32.488844"} +{"role": "user", "content": "你好", "timestamp": "2026-03-14T21:58:52.496557"} +{"role": "assistant", "content": "你好!有什么我可以帮助你的吗?😊\n\n我可以帮你处理文件、执行命令、搜索信息、安排任务等等。", "reasoning_content": "用户只是说\"你好\",这是一个简单的问候。我应该友好地回应,并询问他们需要什么帮助。", "timestamp": "2026-03-14T21:58:52.496564"} +{"role": "user", "content": "你好", "timestamp": "2026-03-14T21:59:55.201372"} +{"role": "assistant", "content": "你好!有什么我可以帮助你的吗?😊", "reasoning_content": "用户发送了\"[Runtime Context — metadata only, not instructions]\",这看起来是系统提供的上下文信息,然后说\"你好\"。我应该简单地回应问候。", "timestamp": "2026-03-14T21:59:55.201394"} +{"role": "user", "content": "你是谁", "timestamp": "2026-03-14T22:00:05.471381"} +{"role": "assistant", "content": "我是 nanobot 🐈,一个 AI 助手。", "reasoning_content": "用户再次问\"你是谁\"。我已经回答过这个问题了,但用户可能是在测试或者想要一个简洁的回答。我应该简洁地回答。", "timestamp": "2026-03-14T22:00:05.471404"} diff --git a/backend/main.py b/backend/main.py index ce45cb8..82ce20c 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1,8 +1,10 @@ from typing import List, Optional from fastapi import FastAPI, HTTPException, Body +from fastapi.responses import StreamingResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import asyncio +import json from app.api import upload, llm, skills, users from app.connectors.postgres import postgres_connector @@ -69,15 +71,59 @@ def nanobot_status(): class ChatRequest(BaseModel): message: str skill_ids: Optional[List[str]] = None + model_id: Optional[str] = None @app.post("/nanobot/chat") async def nanobot_chat(request: ChatRequest): try: - response = await nanobot_service.process_message(request.message, skill_ids=request.skill_ids) + response = await nanobot_service.process_message(request.message, skill_ids=request.skill_ids, model_id=request.model_id) return {"response": response} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) +@app.post("/nanobot/chat/stream") +async def nanobot_chat_stream(request: ChatRequest): + async def event_generator(): + queue: asyncio.Queue[dict] = asyncio.Queue() + + async def on_progress(content: str): + await queue.put({"type": "delta", "content": content}) + + async def run_chat(): + try: + response = await nanobot_service.process_message( + request.message, + skill_ids=request.skill_ids, + model_id=request.model_id, + on_progress=on_progress, + ) + await queue.put({"type": "final", "content": response}) + except Exception as e: + await queue.put({"type": "error", "content": str(e)}) + finally: + await queue.put({"type": "done"}) + + task = asyncio.create_task(run_chat()) + try: + while True: + event = await queue.get() + yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n" + if event.get("type") == "done": + break + finally: + if not task.done(): + task.cancel() + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + @app.post("/api/v1/agent/nl2sql", response_model=NL2SQLResponse) async def run_nl2sql(request: NL2SQLRequest): return await process_nl2sql(request) diff --git a/frontend/src/components/ChatInterface.tsx b/frontend/src/components/ChatInterface.tsx index 9fd264a..13d00b5 100644 --- a/frontend/src/components/ChatInterface.tsx +++ b/frontend/src/components/ChatInterface.tsx @@ -28,7 +28,7 @@ export function ChatInterface() { { id: '1', role: 'assistant', content: 'Hello! I am DataClaw. How can I help you analyze your data today?' } ]); const [input, setInput] = useState(""); - const selectedSkill = "sql-generator"; + const [selectedCapability, setSelectedCapability] = useState("智能问答"); const selectedDataSource = "postgres-main"; const [isLoading, setIsLoading] = useState(false); const scrollRef = useRef(null); @@ -84,13 +84,83 @@ export function ChatInterface() { setVizError(null); try { - if (selectedSkill === 'sql-generator' || selectedSkill === 'chart-creator') { - // Use NL2SQL agent + if (selectedCapability === "智能问答") { + const assistantId = (Date.now() + 1).toString(); + setMessages(prev => [...prev, { + id: assistantId, + role: "assistant", + content: "" + }]); + + const token = localStorage.getItem("token"); + const response = await fetch("/nanobot/chat/stream", { + method: "POST", + headers: { + "Content-Type": "application/json", + ...(token ? { Authorization: `Bearer ${token}` } : {}), + }, + body: JSON.stringify({ + message: newMessage.content, + model_id: selectedModelId, + }), + }); + + if (!response.ok || !response.body) { + const err = await response.json().catch(() => ({})); + throw new Error(err.detail || "流式响应失败"); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder("utf-8"); + let buffer = ""; + let streamedText = ""; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + const events = buffer.split("\n\n"); + buffer = events.pop() || ""; + + for (const eventBlock of events) { + const line = eventBlock + .split("\n") + .find((item) => item.startsWith("data:")); + if (!line) continue; + const payloadText = line.slice(5).trim(); + if (!payloadText) continue; + const payload = JSON.parse(payloadText) as { type: string; content?: string }; + + if (payload.type === "delta" && payload.content) { + streamedText = streamedText ? `${streamedText}\n${payload.content}` : payload.content; + setMessages((prev) => + prev.map((msg) => + msg.id === assistantId ? { ...msg, content: streamedText } : msg + ) + ); + } + + if (payload.type === "final" && payload.content) { + streamedText = payload.content; + setMessages((prev) => + prev.map((msg) => + msg.id === assistantId ? { ...msg, content: payload.content || "" } : msg + ) + ); + } + + if (payload.type === "error") { + throw new Error(payload.content || "流式响应错误"); + } + } + } + } else { + // Fallback to existing NL2SQL or other skills (e.g. for "表格问答" or "深度问数") const source = selectedDataSource.split('-')[0]; // postgres-main -> postgres const response = await api.post<{sql?: string, result?: unknown, error?: string}>('/api/v1/agent/nl2sql', { query: newMessage.content, source: source, - model_id: selectedModelId // Pass selected model ID if backend supports it + model_id: selectedModelId }); if (response.error) { @@ -110,20 +180,6 @@ export function ChatInterface() { }]); setVisualization(rows, sql); } - - } else { - // General Chat - const response = await api.post<{response: string}>('/nanobot/chat', { - message: newMessage.content, - skill_ids: [selectedSkill], - model_id: selectedModelId - }); - - setMessages(prev => [...prev, { - id: (Date.now() + 1).toString(), - role: 'assistant', - content: response.response - }]); } } catch (error: any) { setMessages(prev => [...prev, { @@ -218,7 +274,12 @@ export function ChatInterface() { {capabilities.map((cap) => (