"""Utility functions for nanobot.""" import base64 import json import re import time from datetime import datetime from pathlib import Path from typing import Any import tiktoken def strip_think(text: str) -> str: """Remove blocks and any unclosed trailing tag.""" text = re.sub(r"[\s\S]*?", "", text) text = re.sub(r"[\s\S]*$", "", text) return text.strip() def detect_image_mime(data: bytes) -> str | None: """Detect image MIME type from magic bytes, ignoring file extension.""" if data[:8] == b"\x89PNG\r\n\x1a\n": return "image/png" if data[:3] == b"\xff\xd8\xff": return "image/jpeg" if data[:6] in (b"GIF87a", b"GIF89a"): return "image/gif" if data[:4] == b"RIFF" and data[8:12] == b"WEBP": return "image/webp" return None def build_image_content_blocks(raw: bytes, mime: str, path: str, label: str) -> list[dict[str, Any]]: """Build native image blocks plus a short text label.""" b64 = base64.b64encode(raw).decode() return [ { "type": "image_url", "image_url": {"url": f"data:{mime};base64,{b64}"}, "_meta": {"path": path}, }, {"type": "text", "text": label}, ] def ensure_dir(path: Path) -> Path: """Ensure directory exists, return it.""" path.mkdir(parents=True, exist_ok=True) return path def timestamp() -> str: """Current ISO timestamp.""" return datetime.now().isoformat() def current_time_str(timezone: str | None = None) -> str: """Human-readable current time with weekday and UTC offset. When *timezone* is a valid IANA name (e.g. ``"Asia/Shanghai"``), the time is converted to that zone. Otherwise falls back to the host local time. """ from zoneinfo import ZoneInfo try: tz = ZoneInfo(timezone) if timezone else None except (KeyError, Exception): tz = None now = datetime.now(tz=tz) if tz else datetime.now().astimezone() offset = now.strftime("%z") offset_fmt = f"{offset[:3]}:{offset[3:]}" if len(offset) == 5 else offset tz_name = timezone or (time.strftime("%Z") or "UTC") return f"{now.strftime('%Y-%m-%d %H:%M (%A)')} ({tz_name}, UTC{offset_fmt})" _UNSAFE_CHARS = re.compile(r'[<>:"/\\|?*]') def safe_filename(name: str) -> str: """Replace unsafe path characters with underscores.""" return _UNSAFE_CHARS.sub("_", name).strip() def split_message(content: str, max_len: int = 2000) -> list[str]: """ Split content into chunks within max_len, preferring line breaks. Args: content: The text content to split. max_len: Maximum length per chunk (default 2000 for Discord compatibility). Returns: List of message chunks, each within max_len. """ if not content: return [] if len(content) <= max_len: return [content] chunks: list[str] = [] while content: if len(content) <= max_len: chunks.append(content) break cut = content[:max_len] # Try to break at newline first, then space, then hard break pos = cut.rfind('\n') if pos <= 0: pos = cut.rfind(' ') if pos <= 0: pos = max_len chunks.append(content[:pos]) content = content[pos:].lstrip() return chunks def build_assistant_message( content: str | None, tool_calls: list[dict[str, Any]] | None = None, reasoning_content: str | None = None, thinking_blocks: list[dict] | None = None, ) -> dict[str, Any]: """Build a provider-safe assistant message with optional reasoning fields.""" msg: dict[str, Any] = {"role": "assistant", "content": content} if tool_calls: msg["tool_calls"] = tool_calls if reasoning_content is not None: msg["reasoning_content"] = reasoning_content if thinking_blocks: msg["thinking_blocks"] = thinking_blocks return msg def estimate_prompt_tokens( messages: list[dict[str, Any]], tools: list[dict[str, Any]] | None = None, ) -> int: """Estimate prompt tokens with tiktoken. Counts all fields that providers send to the LLM: content, tool_calls, reasoning_content, tool_call_id, name, plus per-message framing overhead. """ try: enc = tiktoken.get_encoding("cl100k_base") parts: list[str] = [] for msg in messages: content = msg.get("content") if isinstance(content, str): parts.append(content) elif isinstance(content, list): for part in content: if isinstance(part, dict) and part.get("type") == "text": txt = part.get("text", "") if txt: parts.append(txt) tc = msg.get("tool_calls") if tc: parts.append(json.dumps(tc, ensure_ascii=False)) rc = msg.get("reasoning_content") if isinstance(rc, str) and rc: parts.append(rc) for key in ("name", "tool_call_id"): value = msg.get(key) if isinstance(value, str) and value: parts.append(value) if tools: parts.append(json.dumps(tools, ensure_ascii=False)) per_message_overhead = len(messages) * 4 return len(enc.encode("\n".join(parts))) + per_message_overhead except Exception: return 0 def estimate_message_tokens(message: dict[str, Any]) -> int: """Estimate prompt tokens contributed by one persisted message.""" content = message.get("content") parts: list[str] = [] if isinstance(content, str): parts.append(content) elif isinstance(content, list): for part in content: if isinstance(part, dict) and part.get("type") == "text": text = part.get("text", "") if text: parts.append(text) else: parts.append(json.dumps(part, ensure_ascii=False)) elif content is not None: parts.append(json.dumps(content, ensure_ascii=False)) for key in ("name", "tool_call_id"): value = message.get(key) if isinstance(value, str) and value: parts.append(value) if message.get("tool_calls"): parts.append(json.dumps(message["tool_calls"], ensure_ascii=False)) rc = message.get("reasoning_content") if isinstance(rc, str) and rc: parts.append(rc) payload = "\n".join(parts) if not payload: return 4 try: enc = tiktoken.get_encoding("cl100k_base") return max(4, len(enc.encode(payload)) + 4) except Exception: return max(4, len(payload) // 4 + 4) def estimate_prompt_tokens_chain( provider: Any, model: str | None, messages: list[dict[str, Any]], tools: list[dict[str, Any]] | None = None, ) -> tuple[int, str]: """Estimate prompt tokens via provider counter first, then tiktoken fallback.""" provider_counter = getattr(provider, "estimate_prompt_tokens", None) if callable(provider_counter): try: tokens, source = provider_counter(messages, tools, model) if isinstance(tokens, (int, float)) and tokens > 0: return int(tokens), str(source or "provider_counter") except Exception: pass estimated = estimate_prompt_tokens(messages, tools) if estimated > 0: return int(estimated), "tiktoken" return 0, "none" def build_status_content( *, version: str, model: str, start_time: float, last_usage: dict[str, int], context_window_tokens: int, session_msg_count: int, context_tokens_estimate: int, ) -> str: """Build a human-readable runtime status snapshot.""" uptime_s = int(time.time() - start_time) uptime = ( f"{uptime_s // 3600}h {(uptime_s % 3600) // 60}m" if uptime_s >= 3600 else f"{uptime_s // 60}m {uptime_s % 60}s" ) last_in = last_usage.get("prompt_tokens", 0) last_out = last_usage.get("completion_tokens", 0) ctx_total = max(context_window_tokens, 0) ctx_pct = int((context_tokens_estimate / ctx_total) * 100) if ctx_total > 0 else 0 ctx_used_str = f"{context_tokens_estimate // 1000}k" if context_tokens_estimate >= 1000 else str(context_tokens_estimate) ctx_total_str = f"{ctx_total // 1024}k" if ctx_total > 0 else "n/a" return "\n".join([ f"\U0001f408 nanobot v{version}", f"\U0001f9e0 Model: {model}", f"\U0001f4ca Tokens: {last_in} in / {last_out} out", f"\U0001f4da Context: {ctx_used_str}/{ctx_total_str} ({ctx_pct}%)", f"\U0001f4ac Session: {session_msg_count} messages", f"\u23f1 Uptime: {uptime}", ]) def sync_workspace_templates(workspace: Path, silent: bool = False) -> list[str]: """Sync bundled templates to workspace. Only creates missing files.""" from importlib.resources import files as pkg_files try: tpl = pkg_files("nanobot") / "templates" except Exception: return [] if not tpl.is_dir(): return [] added: list[str] = [] def _write(src, dest: Path): if dest.exists(): return dest.parent.mkdir(parents=True, exist_ok=True) dest.write_text(src.read_text(encoding="utf-8") if src else "", encoding="utf-8") added.append(str(dest.relative_to(workspace))) for item in tpl.iterdir(): if item.name.endswith(".md") and not item.name.startswith("."): _write(item, workspace / item.name) _write(tpl / "memory" / "MEMORY.md", workspace / "memory" / "MEMORY.md") _write(None, workspace / "memory" / "HISTORY.md") (workspace / "skills").mkdir(exist_ok=True) if added and not silent: from rich.console import Console for name in added: Console().print(f" [dim]Created {name}[/dim]") return added