feat: integrate nanobot
This commit is contained in:
+124
-6
@@ -2,7 +2,7 @@ import asyncio
|
||||
import sys
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
from typing import List, Callable, Awaitable
|
||||
|
||||
# Add project root to sys.path to allow importing nanobot
|
||||
# Assuming backend/app/core/nanobot.py -> backend/app/core -> backend/app -> backend -> root
|
||||
@@ -40,12 +40,24 @@ class NanobotIntegration:
|
||||
self.config: Config | None = None
|
||||
|
||||
def initialize(self):
|
||||
# Set workspace path to backend/data/workspace
|
||||
workspace_path = Path(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "data", "workspace"))
|
||||
workspace_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Override config workspace path via environment variable (since config is loaded from env)
|
||||
os.environ["NANOBOT_AGENTS__DEFAULTS__WORKSPACE"] = str(workspace_path)
|
||||
|
||||
self.config = load_config()
|
||||
# No need to set self.config.workspace_path as it's a property that reads from agents.defaults.workspace
|
||||
|
||||
self.bus = MessageBus()
|
||||
provider = self._make_provider(self.config)
|
||||
|
||||
cron_store_path = get_cron_dir() / "jobs.json"
|
||||
self.cron = CronService(cron_store_path)
|
||||
cron_store_path = workspace_path / "cron"
|
||||
cron_store_path.mkdir(parents=True, exist_ok=True)
|
||||
cron_store_file = cron_store_path / "jobs.json"
|
||||
|
||||
self.cron = CronService(cron_store_file)
|
||||
|
||||
session_manager = SessionManager(self.config.workspace_path)
|
||||
|
||||
@@ -75,6 +87,24 @@ class NanobotIntegration:
|
||||
provider_name = config.get_provider_name(model)
|
||||
p = config.get_provider(model)
|
||||
|
||||
# Check if model is using an ID from our database configuration
|
||||
# This requires accessing the database or a cache of LLM configs
|
||||
# Since we are inside NanobotIntegration, we can try to load from the JSON file directly for simplicity
|
||||
# or rely on the caller to have injected the right config if they used environment variables.
|
||||
# But here we need to support dynamic loading based on the `model` string if it matches a stored config ID.
|
||||
|
||||
# However, typically the `model` passed here comes from `config.agents.defaults.model`.
|
||||
# If we want to support dynamic switching per request, we should look at `agent.process_direct` arguments.
|
||||
# The `AgentLoop` initializes with a provider, but `LiteLLMProvider` might be able to handle dynamic models if we pass them.
|
||||
# BUT `LiteLLMProvider` is initialized with a specific `default_model`.
|
||||
|
||||
# To support per-request model changes, we need to ensure the `provider` object or the `agent` can accept a model override.
|
||||
# `AgentLoop` methods like `process_direct` don't typically take a `model` argument to override the provider's default.
|
||||
# We might need to reinstantiate the provider or use a "DynamicProvider" that delegates based on context.
|
||||
|
||||
# For now, let's assume standard initialization.
|
||||
# If the user provides a `model_id` in `process_message`, we will handle it there by creating a temporary provider/agent or updating the current one.
|
||||
|
||||
if provider_name == "openai_codex" or model.startswith("openai-codex/"):
|
||||
return OpenAICodexProvider(default_model=model)
|
||||
|
||||
@@ -120,11 +150,98 @@ class NanobotIntegration:
|
||||
if self.cron:
|
||||
self.cron.stop()
|
||||
|
||||
async def process_message(self, message: str, session_id: str = "api:default", skill_ids: List[str] | None = None):
|
||||
async def process_message(
|
||||
self,
|
||||
message: str,
|
||||
session_id: str = "api:default",
|
||||
skill_ids: List[str] | None = None,
|
||||
model_id: str | None = None,
|
||||
on_progress: Callable[[str], Awaitable[None]] | None = None,
|
||||
):
|
||||
if not self.agent:
|
||||
self.initialize()
|
||||
await self.start()
|
||||
|
||||
# Handle dynamic model switching
|
||||
# If model_id is provided, we need to fetch its config and create a temporary provider
|
||||
# or update the current agent's provider context for this request.
|
||||
# AgentLoop is stateful and tied to a single provider. We would like to avoid recreating the whole agent for every request,
|
||||
# but changing the provider/model is a significant change.
|
||||
#
|
||||
# A simpler approach for this "stateless API" usage pattern:
|
||||
# We can instantiate a lightweight version of the agent or provider just for this request if the model differs.
|
||||
# OR, since we are using `process_direct`, we can check if `AgentLoop` supports overriding the model.
|
||||
# Looking at `nanobot/agent/loop.py` (assumed), it uses `self.provider.completion(...)`.
|
||||
|
||||
# Strategy:
|
||||
# 1. Load the model config from our JSON file using `model_id`.
|
||||
# 2. Construct a temporary provider instance for this model.
|
||||
# 3. Inject this provider into the agent for this request OR (cleaner) instantiate a temporary agent.
|
||||
# Instantiating a whole AgentLoop might be heavy due to MCP/Cron etc.
|
||||
# BUT `process_direct` is relatively isolated.
|
||||
#
|
||||
# Let's try to fetch the config first.
|
||||
current_provider = self.agent.provider
|
||||
temp_provider = None
|
||||
|
||||
if model_id:
|
||||
from app.api.llm import _load_data
|
||||
llm_configs = _load_data()
|
||||
target_config = next((item for item in llm_configs if item["id"] == model_id), None)
|
||||
|
||||
if target_config:
|
||||
# Map our DB config to Nanobot Provider
|
||||
# We reuse LiteLLMProvider for most cases as it is generic
|
||||
|
||||
# Construct kwargs for LiteLLMProvider
|
||||
provider_name = target_config["provider"]
|
||||
model_name = target_config["model"]
|
||||
|
||||
# Handle special case where provider might need to be part of model name for LiteLLM if not standard
|
||||
# But LiteLLMProvider handles `provider_name` arg.
|
||||
|
||||
temp_provider = LiteLLMProvider(
|
||||
api_key=target_config.get("api_key"),
|
||||
api_base=target_config.get("api_base"),
|
||||
default_model=model_name,
|
||||
extra_headers=target_config.get("extra_headers"),
|
||||
provider_name=provider_name
|
||||
)
|
||||
|
||||
# If we created a temp provider, we need to use it.
|
||||
# Since AgentLoop binds the provider, we might need to swap it temporarily or create a new AgentLoop.
|
||||
# Swapping is risky for concurrency.
|
||||
# Creating a new AgentLoop is safer but heavier.
|
||||
#
|
||||
# Optimization: If we are just doing a single turn chat (process_direct), maybe we can just use the provider directly?
|
||||
# But we want the Agent's reasoning loop (ReAct / tools).
|
||||
#
|
||||
# Let's try creating a temporary AgentLoop sharing the same components (bus, tools) but different provider.
|
||||
|
||||
agent_to_use = self.agent
|
||||
if temp_provider:
|
||||
# Shallow copy or new instance
|
||||
# We need to pass all dependencies.
|
||||
agent_to_use = AgentLoop(
|
||||
bus=self.bus,
|
||||
provider=temp_provider,
|
||||
workspace=self.config.workspace_path,
|
||||
model=temp_provider.default_model,
|
||||
temperature=self.config.agents.defaults.temperature,
|
||||
max_tokens=self.config.agents.defaults.max_tokens,
|
||||
max_iterations=self.config.agents.defaults.max_tool_iterations,
|
||||
memory_window=self.config.agents.defaults.memory_window,
|
||||
reasoning_effort=self.config.agents.defaults.reasoning_effort,
|
||||
brave_api_key=self.config.tools.web.search.api_key or None,
|
||||
web_proxy=self.config.tools.web.proxy or None,
|
||||
exec_config=self.config.tools.exec,
|
||||
cron_service=self.cron,
|
||||
restrict_to_workspace=self.config.tools.restrict_to_workspace,
|
||||
session_manager=self.agent.sessions,
|
||||
mcp_servers=self.config.tools.mcp_servers,
|
||||
channels_config=self.config.channels,
|
||||
)
|
||||
|
||||
full_message = message
|
||||
if skill_ids:
|
||||
skills = load_skills()
|
||||
@@ -138,11 +255,12 @@ class NanobotIntegration:
|
||||
# Append user message after skills
|
||||
full_message = f"{skill_context}\n\n{message}"
|
||||
|
||||
response = await self.agent.process_direct(
|
||||
response = await agent_to_use.process_direct(
|
||||
full_message,
|
||||
session_key=session_id,
|
||||
channel="api",
|
||||
chat_id=session_id
|
||||
chat_id=session_id,
|
||||
on_progress=on_progress,
|
||||
)
|
||||
return response
|
||||
|
||||
|
||||
Regular → Executable
Executable
+4
@@ -0,0 +1,4 @@
|
||||
{
|
||||
"version": 1,
|
||||
"jobs": []
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
{"_type": "metadata", "key": "api:default", "created_at": "2026-03-14T21:43:22.940821", "updated_at": "2026-03-14T22:00:05.471408", "metadata": {}, "last_consolidated": 0}
|
||||
{"role": "user", "content": "你是谁", "timestamp": "2026-03-14T21:43:23.163485"}
|
||||
{"role": "user", "content": "hi", "timestamp": "2026-03-14T21:49:14.751350"}
|
||||
{"role": "user", "content": "你好", "timestamp": "2026-03-14T21:53:53.701083"}
|
||||
{"role": "assistant", "content": "你好!我是 nanobot 🐈,一个 AI 助手。\n\n我可以帮助你:\n- 读取和编辑文件\n- 执行 shell 命令\n- 搜索网络信息\n- 发送消息\n- 安排定时任务\n- 管理记忆和技能\n\n我运行在 macOS arm64 上,工作目录在 `/Users/qixinbo/Projects/DataClaw/backend/data/workspace`。\n\n有什么我可以帮助你的吗?", "reasoning_content": "用户用中文问了两个问题:\n1. \"你是谁\" - Who are you\n2. \"hi\" - Hello\n3. \"你好\" - Hello\n\n我应该用中文回答,介绍我自己。", "timestamp": "2026-03-14T21:53:53.701092"}
|
||||
{"role": "user", "content": "你好", "timestamp": "2026-03-14T21:55:23.333254"}
|
||||
{"role": "assistant", "content": "你好!有什么我可以帮助你的吗?😊\n\n我可以帮你处理文件、执行命令、搜索信息、安排任务等等。", "reasoning_content": "用户再次发送了\"你好\",这是一个简单的问候。我应该友好地回应,并询问他们需要什么帮助。", "timestamp": "2026-03-14T21:55:23.333267"}
|
||||
{"role": "user", "content": "你是谁", "timestamp": "2026-03-14T21:55:32.488828"}
|
||||
{"role": "assistant", "content": "我是 nanobot 🐈,一个 AI 助手。\n\n我可以帮助你:\n- 读取和编辑文件\n- 执行 shell 命令\n- 搜索网络信息\n- 发送消息\n- 安排定时任务\n- 管理记忆和技能\n\n当前时间是 2026-03-14 21:55 (CST),星期六。", "reasoning_content": "用户再次问\"你是谁\",这是一个简单的问题。我应该简洁地回答我的身份。", "timestamp": "2026-03-14T21:55:32.488844"}
|
||||
{"role": "user", "content": "你好", "timestamp": "2026-03-14T21:58:52.496557"}
|
||||
{"role": "assistant", "content": "你好!有什么我可以帮助你的吗?😊\n\n我可以帮你处理文件、执行命令、搜索信息、安排任务等等。", "reasoning_content": "用户只是说\"你好\",这是一个简单的问候。我应该友好地回应,并询问他们需要什么帮助。", "timestamp": "2026-03-14T21:58:52.496564"}
|
||||
{"role": "user", "content": "你好", "timestamp": "2026-03-14T21:59:55.201372"}
|
||||
{"role": "assistant", "content": "你好!有什么我可以帮助你的吗?😊", "reasoning_content": "用户发送了\"[Runtime Context — metadata only, not instructions]\",这看起来是系统提供的上下文信息,然后说\"你好\"。我应该简单地回应问候。", "timestamp": "2026-03-14T21:59:55.201394"}
|
||||
{"role": "user", "content": "你是谁", "timestamp": "2026-03-14T22:00:05.471381"}
|
||||
{"role": "assistant", "content": "我是 nanobot 🐈,一个 AI 助手。", "reasoning_content": "用户再次问\"你是谁\"。我已经回答过这个问题了,但用户可能是在测试或者想要一个简洁的回答。我应该简洁地回答。", "timestamp": "2026-03-14T22:00:05.471404"}
|
||||
+47
-1
@@ -1,8 +1,10 @@
|
||||
from typing import List, Optional
|
||||
from fastapi import FastAPI, HTTPException, Body
|
||||
from fastapi.responses import StreamingResponse
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from pydantic import BaseModel
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
from app.api import upload, llm, skills, users
|
||||
from app.connectors.postgres import postgres_connector
|
||||
@@ -69,15 +71,59 @@ def nanobot_status():
|
||||
class ChatRequest(BaseModel):
    """Request body accepted by the nanobot chat endpoints."""

    # Text of the user's message.
    message: str
    # Optional skill identifiers forwarded to the nanobot service.
    skill_ids: Optional[List[str]] = None
    # Optional model identifier forwarded to the nanobot service as `model_id`.
    model_id: Optional[str] = None
|
||||
|
||||
@app.post("/nanobot/chat")
async def nanobot_chat(request: ChatRequest):
    """Run one non-streaming nanobot chat turn.

    Forwards the message, the optional skill ids and the optional model id
    to ``nanobot_service.process_message`` and wraps the reply as
    ``{"response": ...}``.

    Raises:
        HTTPException: status 500 with the stringified error when the
            service call fails.
    """
    try:
        # Call the service exactly once, with the model override included.
        response = await nanobot_service.process_message(
            request.message,
            skill_ids=request.skill_ids,
            model_id=request.model_id,
        )
        return {"response": response}
    except Exception as e:
        # Keep the original exception as the cause for server-side tracebacks.
        raise HTTPException(status_code=500, detail=str(e)) from e
|
||||
|
||||
@app.post("/nanobot/chat/stream")
async def nanobot_chat_stream(request: ChatRequest):
    """Stream a nanobot chat turn as Server-Sent Events.

    Each event is one ``data: <json>`` frame. The JSON object has a ``type``
    of ``delta`` (progress text), ``final`` (the complete response),
    ``error`` (stringified exception) or ``done`` (always the last frame).
    """

    async def event_generator():
        # The producer (run_chat) and the consumer (this generator) talk
        # through an in-memory queue of event dicts.
        queue: asyncio.Queue[dict] = asyncio.Queue()

        async def on_progress(content: str):
            await queue.put({"type": "delta", "content": content})

        async def run_chat():
            try:
                response = await nanobot_service.process_message(
                    request.message,
                    skill_ids=request.skill_ids,
                    model_id=request.model_id,
                    on_progress=on_progress,
                )
                await queue.put({"type": "final", "content": response})
            except Exception as e:
                # Report the failure to the client as an event.
                await queue.put({"type": "error", "content": str(e)})
            finally:
                # Always emit the terminator so the consumer loop can exit.
                await queue.put({"type": "done"})

        task = asyncio.create_task(run_chat())
        try:
            while True:
                event = await queue.get()
                yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
                if event.get("type") == "done":
                    break
        finally:
            # The consumer is finished (normally or because the stream was
            # closed early): stop the producer and wait for it to unwind.
            if not task.done():
                task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                # Expected when the producer was cancelled above.
                pass

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            # Ask nginx-style reverse proxies not to buffer the stream.
            "X-Accel-Buffering": "no",
        },
    )
|
||||
|
||||
@app.post("/api/v1/agent/nl2sql", response_model=NL2SQLResponse)
async def run_nl2sql(request: NL2SQLRequest):
    """Pass the NL2SQL request to ``process_nl2sql`` and return its result."""
    result = await process_nl2sql(request)
    return result
|
||||
|
||||
@@ -28,7 +28,7 @@ export function ChatInterface() {
|
||||
{ id: '1', role: 'assistant', content: 'Hello! I am DataClaw. How can I help you analyze your data today?' }
|
||||
]);
|
||||
const [input, setInput] = useState("");
|
||||
const selectedSkill = "sql-generator";
|
||||
const [selectedCapability, setSelectedCapability] = useState<string>("智能问答");
|
||||
const selectedDataSource = "postgres-main";
|
||||
const [isLoading, setIsLoading] = useState(false);
|
||||
const scrollRef = useRef<HTMLDivElement>(null);
|
||||
@@ -84,13 +84,83 @@ export function ChatInterface() {
|
||||
setVizError(null);
|
||||
|
||||
try {
|
||||
if (selectedSkill === 'sql-generator' || selectedSkill === 'chart-creator') {
|
||||
// Use NL2SQL agent
|
||||
if (selectedCapability === "智能问答") {
|
||||
const assistantId = (Date.now() + 1).toString();
|
||||
setMessages(prev => [...prev, {
|
||||
id: assistantId,
|
||||
role: "assistant",
|
||||
content: ""
|
||||
}]);
|
||||
|
||||
const token = localStorage.getItem("token");
|
||||
const response = await fetch("/nanobot/chat/stream", {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
...(token ? { Authorization: `Bearer ${token}` } : {}),
|
||||
},
|
||||
body: JSON.stringify({
|
||||
message: newMessage.content,
|
||||
model_id: selectedModelId,
|
||||
}),
|
||||
});
|
||||
|
||||
if (!response.ok || !response.body) {
|
||||
const err = await response.json().catch(() => ({}));
|
||||
throw new Error(err.detail || "流式响应失败");
|
||||
}
|
||||
|
||||
const reader = response.body.getReader();
|
||||
const decoder = new TextDecoder("utf-8");
|
||||
let buffer = "";
|
||||
let streamedText = "";
|
||||
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
const events = buffer.split("\n\n");
|
||||
buffer = events.pop() || "";
|
||||
|
||||
for (const eventBlock of events) {
|
||||
const line = eventBlock
|
||||
.split("\n")
|
||||
.find((item) => item.startsWith("data:"));
|
||||
if (!line) continue;
|
||||
const payloadText = line.slice(5).trim();
|
||||
if (!payloadText) continue;
|
||||
const payload = JSON.parse(payloadText) as { type: string; content?: string };
|
||||
|
||||
if (payload.type === "delta" && payload.content) {
|
||||
streamedText = streamedText ? `${streamedText}\n${payload.content}` : payload.content;
|
||||
setMessages((prev) =>
|
||||
prev.map((msg) =>
|
||||
msg.id === assistantId ? { ...msg, content: streamedText } : msg
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
if (payload.type === "final" && payload.content) {
|
||||
streamedText = payload.content;
|
||||
setMessages((prev) =>
|
||||
prev.map((msg) =>
|
||||
msg.id === assistantId ? { ...msg, content: payload.content || "" } : msg
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
if (payload.type === "error") {
|
||||
throw new Error(payload.content || "流式响应错误");
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Fallback to existing NL2SQL or other skills (e.g. for "表格问答" or "深度问数")
|
||||
const source = selectedDataSource.split('-')[0]; // postgres-main -> postgres
|
||||
const response = await api.post<{sql?: string, result?: unknown, error?: string}>('/api/v1/agent/nl2sql', {
|
||||
query: newMessage.content,
|
||||
source: source,
|
||||
model_id: selectedModelId // Pass selected model ID if backend supports it
|
||||
model_id: selectedModelId
|
||||
});
|
||||
|
||||
if (response.error) {
|
||||
@@ -110,20 +180,6 @@ export function ChatInterface() {
|
||||
}]);
|
||||
setVisualization(rows, sql);
|
||||
}
|
||||
|
||||
} else {
|
||||
// General Chat
|
||||
const response = await api.post<{response: string}>('/nanobot/chat', {
|
||||
message: newMessage.content,
|
||||
skill_ids: [selectedSkill],
|
||||
model_id: selectedModelId
|
||||
});
|
||||
|
||||
setMessages(prev => [...prev, {
|
||||
id: (Date.now() + 1).toString(),
|
||||
role: 'assistant',
|
||||
content: response.response
|
||||
}]);
|
||||
}
|
||||
} catch (error: any) {
|
||||
setMessages(prev => [...prev, {
|
||||
@@ -218,7 +274,12 @@ export function ChatInterface() {
|
||||
{capabilities.map((cap) => (
|
||||
<button
|
||||
key={cap.label}
|
||||
className={`flex items-center gap-1.5 px-3 py-1.5 rounded-full text-xs font-medium transition-colors ${cap.bg} ${cap.color} hover:opacity-80`}
|
||||
onClick={() => setSelectedCapability(cap.label)}
|
||||
className={`flex items-center gap-1.5 px-3 py-1.5 rounded-full text-xs font-medium transition-colors ${
|
||||
selectedCapability === cap.label
|
||||
? `${cap.bg} ${cap.color} ring-1 ring-${cap.color.split('-')[1]}-200 shadow-sm`
|
||||
: 'bg-zinc-50 text-zinc-500 hover:bg-zinc-100'
|
||||
}`}
|
||||
>
|
||||
<cap.icon className="h-3.5 w-3.5" />
|
||||
{cap.label}
|
||||
|
||||
Reference in New Issue
Block a user