Files

178 lines
5.7 KiB
Python
Raw Permalink Normal View History

2026-03-14 15:44:48 +08:00
"""Base channel interface for chat platforms."""
2026-03-28 01:01:13 +08:00
from __future__ import annotations
2026-03-14 15:44:48 +08:00
from abc import ABC, abstractmethod
2026-03-28 01:01:13 +08:00
from pathlib import Path
2026-03-14 15:44:48 +08:00
from typing import Any
from loguru import logger
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.bus.queue import MessageBus
class BaseChannel(ABC):
    """
    Abstract base class for chat channel implementations.

    Each channel (Telegram, Discord, etc.) should implement this interface
    to integrate with the nanobot message bus. Subclasses must implement
    ``start``, ``stop`` and ``send``; ``login`` and ``send_delta`` are
    optional hooks with no-op defaults.
    """

    # Identifier stamped on every inbound message and used in log lines.
    name: str = "base"
    # Human-readable channel label.
    display_name: str = "Base"
    # API key for audio transcription; an empty value disables transcription.
    transcription_api_key: str = ""

    def __init__(self, config: Any, bus: MessageBus):
        """
        Initialize the channel.

        Args:
            config: Channel-specific configuration. Either a mapping or an
                object exposing settings as attributes.
            bus: The message bus for communication.
        """
        self.config = config
        self.bus = bus
        self._running = False

    async def transcribe_audio(self, file_path: str | Path) -> str:
        """Transcribe an audio file via Groq Whisper.

        Args:
            file_path: Location of the audio file to transcribe.

        Returns:
            The transcribed text, or an empty string when no API key is
            configured or the transcription attempt fails.
        """
        if not self.transcription_api_key:
            return ""
        try:
            # Imported lazily so the provider is only loaded when needed.
            from nanobot.providers.transcription import GroqTranscriptionProvider

            provider = GroqTranscriptionProvider(api_key=self.transcription_api_key)
            return await provider.transcribe(file_path)
        except Exception as e:
            # Deliberate best-effort: a failed transcription is logged and
            # reported as "no text" instead of propagating.
            logger.warning("{}: audio transcription failed: {}", self.name, e)
            return ""

    async def login(self, force: bool = False) -> bool:
        """
        Perform channel-specific interactive login (e.g. QR code scan).

        Override in subclasses that support interactive login. The base
        implementation does nothing and reports success.

        Args:
            force: If True, ignore existing credentials and force
                re-authentication.

        Returns:
            True if already authenticated or login succeeds.
        """
        return True

    @abstractmethod
    async def start(self) -> None:
        """
        Start the channel and begin listening for messages.

        This should be a long-running async task that:
        1. Connects to the chat platform
        2. Listens for incoming messages
        3. Forwards messages to the bus via _handle_message()
        """

    @abstractmethod
    async def stop(self) -> None:
        """Stop the channel and clean up resources."""

    @abstractmethod
    async def send(self, msg: OutboundMessage) -> None:
        """
        Send a message through this channel.

        Implementations should raise on delivery failure so the channel
        manager can apply any retry policy in one place.

        Args:
            msg: The message to send.
        """

    async def send_delta(
        self,
        chat_id: str,
        delta: str,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Deliver a streaming text chunk.

        Override in subclasses to enable streaming. Implementations should
        raise on delivery failure so the channel manager can retry.

        Streaming contract: ``_stream_delta`` is a chunk, ``_stream_end`` ends
        the current segment, and stateful implementations must key buffers by
        ``_stream_id`` rather than only by ``chat_id``.

        Args:
            chat_id: The chat/channel identifier the chunk belongs to.
            delta: The text chunk to deliver.
            metadata: Optional streaming metadata (see the contract above).
        """

    @property
    def supports_streaming(self) -> bool:
        """True when config enables streaming AND this subclass implements send_delta."""
        cfg = self.config
        if isinstance(cfg, dict):
            streaming = cfg.get("streaming", False)
        else:
            streaming = getattr(cfg, "streaming", False)
        # A subclass that has not overridden send_delta still resolves to the
        # base implementation, which means it cannot actually stream.
        return bool(streaming) and type(self).send_delta is not BaseChannel.send_delta

    def is_allowed(self, sender_id: str) -> bool:
        """Check if *sender_id* is permitted.

        Empty or missing allow-list → deny all; ``"*"`` in the list → allow all.

        The allow-list is read from the ``allow_from`` setting. Both dict and
        attribute-style configs are supported, mirroring ``supports_streaming``.
        For dict configs the ``allowFrom`` spelling (the one the access-denied
        log message refers to) is accepted as a fallback.

        Args:
            sender_id: The sender's identifier; compared as a string.

        Returns:
            True if the sender may use this channel, False otherwise.
        """
        cfg = self.config
        if isinstance(cfg, dict):
            # getattr() on a dict never finds config keys, which would
            # silently deny every sender.
            allow_list = cfg.get("allow_from", cfg.get("allowFrom", []))
        else:
            allow_list = getattr(cfg, "allow_from", [])

        if isinstance(allow_list, str):
            # A bare string would turn the membership test below into a
            # substring match ("123" in "12345"); treat it as a single entry.
            allow_list = [allow_list] if allow_list else []

        if not allow_list:
            logger.warning("{}: allow_from is empty — all access denied", self.name)
            return False
        if "*" in allow_list:
            return True
        return str(sender_id) in allow_list

    async def _handle_message(
        self,
        sender_id: str,
        chat_id: str,
        content: str,
        media: list[str] | None = None,
        metadata: dict[str, Any] | None = None,
        session_key: str | None = None,
    ) -> None:
        """
        Handle an incoming message from the chat platform.

        This method checks permissions and forwards to the bus. Messages from
        senders rejected by ``is_allowed`` are logged and dropped.

        Args:
            sender_id: The sender's identifier.
            chat_id: The chat/channel identifier.
            content: Message text content.
            media: Optional list of media URLs.
            metadata: Optional channel-specific metadata.
            session_key: Optional session key override (e.g. thread-scoped sessions).
        """
        if not self.is_allowed(sender_id):
            logger.warning(
                "Access denied for sender {} on channel {}. "
                "Add them to allowFrom list in config to grant access.",
                sender_id, self.name,
            )
            return

        meta = metadata or {}
        if self.supports_streaming:
            # Build a new dict so the caller's metadata is never mutated.
            meta = {**meta, "_wants_stream": True}

        msg = InboundMessage(
            channel=self.name,
            sender_id=str(sender_id),
            chat_id=str(chat_id),
            content=content,
            media=media or [],
            metadata=meta,
            session_key_override=session_key,
        )
        await self.bus.publish_inbound(msg)

    @classmethod
    def default_config(cls) -> dict[str, Any]:
        """Return default config for onboard. Override in plugins to auto-populate config.json."""
        return {"enabled": False}

    @property
    def is_running(self) -> bool:
        """Check if the channel is running."""
        return self._running