Files
DataClaw/backend/app/api/skills.py
T
2026-03-16 17:26:02 +08:00

324 lines
12 KiB
Python

import json
import os
import shutil
import zipfile
import tarfile
import yaml
from typing import List, Optional, Dict, Any
from datetime import datetime
from fastapi import APIRouter, HTTPException, UploadFile, File, Form
from pydantic import BaseModel, Field
router = APIRouter()
BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
DATA_FILE = os.path.join(BASE_DIR, "data", "skills.json")
SKILL_HUB_DIR = os.path.join(BASE_DIR, "data", "skill-hub")
# Ensure skill-hub directory exists
os.makedirs(SKILL_HUB_DIR, exist_ok=True)
class Skill(BaseModel):
id: str = Field(..., description="Unique identifier for the skill")
name: str = Field(..., description="Name of the skill")
description: Optional[str] = Field(None, description="Description of what the skill does")
content: str = Field(..., description="The content/prompt/logic of the skill")
type: str = Field("python", description="Type of the skill (python, sql, api)")
project_id: Optional[int] = Field(None, description="The ID of the project this skill belongs to")
source: str = Field("本地导入", description="Source of the skill (e.g., 本地导入, GitHub 导入)")
installation_time: str = Field(default_factory=lambda: datetime.now().strftime("%Y年%m月%d"), description="Time when the skill was installed")
status: str = Field("安全", description="Security status of the skill (e.g., 安全, 低风险)")
file_path: Optional[str] = Field(None, description="Path to the skill folder in skill-hub")
class SkillCreate(BaseModel):
id: str
name: str
description: Optional[str] = None
content: str
type: str = "python"
project_id: Optional[int] = None
source: str = "本地导入"
installation_time: Optional[str] = None
status: str = "安全"
file_path: Optional[str] = None
class SkillUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
content: Optional[str] = None
type: Optional[str] = None
project_id: Optional[int] = None
source: Optional[str] = None
installation_time: Optional[str] = None
status: Optional[str] = None
file_path: Optional[str] = None
def _parse_skill_md(file_path: str) -> Dict[str, Any]:
"""Parse SKILL.md for metadata and content according to agentskills.io standard."""
if not os.path.exists(file_path):
return {}
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
except Exception as e:
print(f"Error reading {file_path}: {e}")
return {}
# Split YAML frontmatter and Markdown body
# Support both --- and +++ for frontmatter
metadata = {}
body = content
if content.startswith('---'):
parts = content.split('---', 2)
if len(parts) >= 3:
try:
metadata = yaml.safe_load(parts[1]) or {}
body = parts[2].strip()
except Exception as e:
print(f"Error parsing YAML frontmatter: {e}")
# Extract name and description, fallback to some defaults
name = metadata.get("name")
description = metadata.get("description")
# If name not in metadata, try to find the first H1 in markdown body
if not name:
for line in body.split('\n'):
if line.startswith('# '):
name = line[2:].strip()
break
return {
"name": name,
"description": description,
"content": body,
"metadata": metadata
}
def _load_data() -> List[Dict[str, Any]]:
if not os.path.exists(DATA_FILE):
return []
try:
with open(DATA_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
return []
def _save_data(data: List[Dict[str, Any]]):
os.makedirs(os.path.dirname(DATA_FILE), exist_ok=True)
with open(DATA_FILE, "w") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def load_skills(project_id: Optional[int] = None) -> List[Dict[str, Any]]:
data = _load_data()
if project_id is not None:
return [item for item in data if item.get("project_id") == project_id]
return data
@router.get("/skills", response_model=List[Skill])
def list_skills(project_id: Optional[int] = None):
data = load_skills(project_id)
return [Skill(**item) for item in data]
@router.get("/skills/{skill_id}", response_model=Skill)
def get_skill(skill_id: str, project_id: Optional[int] = None):
data = _load_data()
for item in data:
if item["id"] == skill_id:
if project_id is not None and item.get("project_id") != project_id:
continue
return Skill(**item)
raise HTTPException(status_code=404, detail="Skill not found")
@router.post("/skills/upload")
async def upload_skill(
file: UploadFile = File(...),
project_id: Optional[int] = Form(None)
):
"""Upload a skill file (SKILL.md) or a packaged skill (zip/tar.gz)."""
filename = file.filename
print(f"Uploading skill: {filename}, project_id: {project_id}")
# Create a unique temp directory
temp_dir_name = f"temp_{datetime.now().timestamp()}_{os.urandom(4).hex()}"
temp_dir = os.path.join(SKILL_HUB_DIR, temp_dir_name)
os.makedirs(temp_dir, exist_ok=True)
try:
file_path = os.path.join(temp_dir, filename)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
skill_source_dir = None
# Handle different file types
if filename.endswith(".zip"):
try:
with zipfile.ZipFile(file_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
os.remove(file_path)
# Find the directory containing SKILL.md
for root, dirs, files in os.walk(temp_dir):
if "SKILL.md" in files:
skill_source_dir = root
break
except Exception as e:
print(f"Zip extraction failed: {e}")
raise HTTPException(status_code=400, detail=f"Failed to extract zip: {str(e)}")
elif filename.endswith((".tar.gz", ".tgz")):
try:
with tarfile.open(file_path, 'r:gz') as tar_ref:
tar_ref.extractall(temp_dir)
os.remove(file_path)
for root, dirs, files in os.walk(temp_dir):
if "SKILL.md" in files:
skill_source_dir = root
break
except Exception as e:
print(f"Tarball extraction failed: {e}")
raise HTTPException(status_code=400, detail=f"Failed to extract tarball: {str(e)}")
elif filename == "SKILL.md":
skill_source_dir = temp_dir
else:
print(f"Unsupported file type: {filename}")
raise HTTPException(status_code=400, detail="Only SKILL.md or packaged skills (zip/tar.gz) are supported")
if not skill_source_dir or not os.path.exists(os.path.join(skill_source_dir, "SKILL.md")):
print(f"SKILL.md not found in {filename}")
raise HTTPException(status_code=400, detail="SKILL.md not found in the uploaded file")
# Parse metadata
skill_md_path = os.path.join(skill_source_dir, "SKILL.md")
metadata_res = _parse_skill_md(skill_md_path)
# Use metadata name, or fallback to folder name or filename
skill_name = metadata_res.get("name")
if not skill_name:
if filename == "SKILL.md":
skill_name = "unnamed_skill"
else:
# Use filename without extension
skill_name = os.path.splitext(filename)[0]
# Create a safe directory name for the skill
import re
safe_name = re.sub(r'[^a-zA-Z0-9_\-]', '_', skill_name).lower()
if not safe_name: safe_name = "skill"
final_skill_id = f"{safe_name}_{datetime.now().strftime('%Y%m%d%H%M%S')}"
final_skill_dir = os.path.join(SKILL_HUB_DIR, final_skill_id)
print(f"Finalizing skill: {skill_name} -> {final_skill_dir}")
# Move the skill content to final destination
os.makedirs(final_skill_dir, exist_ok=True)
for item in os.listdir(skill_source_dir):
s = os.path.join(skill_source_dir, item)
d = os.path.join(final_skill_dir, item)
if os.path.isdir(s):
shutil.copytree(s, d, dirs_exist_ok=True)
else:
shutil.copy2(s, d)
# Register in skills.json
data = _load_data()
new_skill = {
"id": final_skill_id,
"name": skill_name,
"description": metadata_res.get("description") or "No description provided",
"content": metadata_res.get("content") or "",
"type": "agentskill",
"project_id": project_id,
"source": "文件上传",
"installation_time": datetime.now().strftime("%Y年%m月%d"),
"status": "安全",
"file_path": final_skill_dir
}
data.append(new_skill)
_save_data(data)
print(f"Skill registered successfully: {final_skill_id}")
return new_skill
except HTTPException:
raise
except Exception as e:
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")
finally:
# Cleanup temp directory
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
@router.post("/skills", response_model=Skill)
def create_skill(skill: SkillCreate):
data = _load_data()
if any(item["id"] == skill.id for item in data):
raise HTTPException(status_code=400, detail="Skill with this ID already exists")
new_skill_dict = skill.dict()
if not new_skill_dict.get("installation_time"):
new_skill_dict["installation_time"] = datetime.now().strftime("%Y年%m月%d")
data.append(new_skill_dict)
_save_data(data)
return Skill(**new_skill_dict)
@router.put("/skills/{skill_id}", response_model=Skill)
def update_skill(skill_id: str, skill: SkillUpdate, project_id: Optional[int] = None):
data = _load_data()
for i, item in enumerate(data):
if item["id"] == skill_id:
if project_id is not None and item.get("project_id") != project_id:
continue
updated_item = item.copy()
update_data = skill.dict(exclude_unset=True)
updated_item.update(update_data)
data[i] = updated_item
_save_data(data)
return Skill(**updated_item)
raise HTTPException(status_code=404, detail="Skill not found")
@router.delete("/skills/{skill_id}")
def delete_skill(skill_id: str, project_id: Optional[int] = None):
data = _load_data()
initial_len = len(data)
# If project_id is provided, we only delete if it matches
new_data = []
found = False
skill_to_delete = None
for item in data:
if item["id"] == skill_id:
if project_id is not None and item.get("project_id") != project_id:
new_data.append(item)
continue
found = True
skill_to_delete = item
else:
new_data.append(item)
if not found:
raise HTTPException(status_code=404, detail="Skill not found")
# Clean up file_path if it exists
if skill_to_delete and skill_to_delete.get("file_path"):
file_path = skill_to_delete["file_path"]
if os.path.exists(file_path):
try:
if os.path.isdir(file_path):
shutil.rmtree(file_path)
else:
os.remove(file_path)
except Exception as e:
print(f"Error deleting skill files at {file_path}: {e}")
_save_data(new_data)
return {"message": "Skill deleted successfully"}