563 lines
22 KiB
Python
563 lines
22 KiB
Python
import json
|
|
import os
|
|
import shutil
|
|
import zipfile
|
|
import tarfile
|
|
import re
|
|
import yaml
|
|
from pathlib import Path
|
|
from typing import List, Optional, Dict, Any
|
|
from datetime import datetime
|
|
from fastapi import APIRouter, HTTPException, UploadFile, File, Form
|
|
from pydantic import BaseModel, Field
|
|
|
|
from app.core.data_root import get_data_root, get_workspace_root
|
|
from nanobot.agent.skills import BUILTIN_SKILLS_DIR as NANOBOT_BUILTIN_SKILLS_DIR
|
|
|
|
router = APIRouter()
|
|
|
|
DATA_FILE = str(get_data_root() / "skills.json")
|
|
SKILL_HUB_DIR = str(get_workspace_root() / "skills")
|
|
BACKEND_BUILTIN_SKILLS_DIR = str(Path(__file__).resolve().parents[1] / "skills_builtin")
|
|
|
|
SOURCE_LOCAL_IMPORT = "local_import"
|
|
SOURCE_SYSTEM_BUILTIN = "system_builtin"
|
|
SOURCE_BACKEND_GENERATED = "backend_generated"
|
|
SOURCE_UPLOADED_FILE = "uploaded_file"
|
|
|
|
STATUS_SAFE = "safe"
|
|
STATUS_LOW_RISK = "low_risk"
|
|
|
|
_SOURCE_ALIASES = {
|
|
SOURCE_LOCAL_IMPORT: SOURCE_LOCAL_IMPORT,
|
|
"本地导入": SOURCE_LOCAL_IMPORT,
|
|
"Local Import": SOURCE_LOCAL_IMPORT,
|
|
SOURCE_SYSTEM_BUILTIN: SOURCE_SYSTEM_BUILTIN,
|
|
"系统内置": SOURCE_SYSTEM_BUILTIN,
|
|
"System Built-in": SOURCE_SYSTEM_BUILTIN,
|
|
SOURCE_BACKEND_GENERATED: SOURCE_BACKEND_GENERATED,
|
|
"后台生成": SOURCE_BACKEND_GENERATED,
|
|
"Backend Generated": SOURCE_BACKEND_GENERATED,
|
|
SOURCE_UPLOADED_FILE: SOURCE_UPLOADED_FILE,
|
|
"文件上传": SOURCE_UPLOADED_FILE,
|
|
"File Upload": SOURCE_UPLOADED_FILE,
|
|
}
|
|
|
|
_STATUS_ALIASES = {
|
|
STATUS_SAFE: STATUS_SAFE,
|
|
"安全": STATUS_SAFE,
|
|
"Safe": STATUS_SAFE,
|
|
STATUS_LOW_RISK: STATUS_LOW_RISK,
|
|
"低风险": STATUS_LOW_RISK,
|
|
"Low Risk": STATUS_LOW_RISK,
|
|
}
|
|
|
|
|
|
def _normalize_source(value: Optional[str]) -> str:
|
|
if not value:
|
|
return SOURCE_LOCAL_IMPORT
|
|
return _SOURCE_ALIASES.get(value, value)
|
|
|
|
|
|
def _normalize_status(value: Optional[str]) -> str:
|
|
if not value:
|
|
return STATUS_SAFE
|
|
return _STATUS_ALIASES.get(value, value)
|
|
|
|
def _ensure_skill_hub_dir() -> None:
|
|
os.makedirs(SKILL_HUB_DIR, exist_ok=True)
|
|
|
|
class Skill(BaseModel):
|
|
id: str = Field(..., description="Unique identifier for the skill")
|
|
name: str = Field(..., description="Name of the skill")
|
|
description: Optional[str] = Field(None, description="Description of what the skill does")
|
|
content: str = Field(..., description="The content/prompt/logic of the skill")
|
|
type: str = Field("python", description="Type of the skill (python, sql, api)")
|
|
project_id: Optional[int] = Field(None, description="The ID of the project this skill belongs to")
|
|
source: str = Field(SOURCE_LOCAL_IMPORT, description="Stable source key of the skill")
|
|
installation_time: str = Field(default_factory=lambda: datetime.now().strftime("%Y年%m月%d日"), description="Time when the skill was installed")
|
|
status: str = Field(STATUS_SAFE, description="Stable security status key")
|
|
file_path: Optional[str] = Field(None, description="Path to the skill folder in skill-hub")
|
|
is_builtin: bool = Field(False, description="Whether this is a system builtin skill")
|
|
|
|
class SkillCreate(BaseModel):
|
|
id: str
|
|
name: str
|
|
description: Optional[str] = None
|
|
content: str
|
|
type: str = "python"
|
|
project_id: Optional[int] = None
|
|
source: str = SOURCE_LOCAL_IMPORT
|
|
installation_time: Optional[str] = None
|
|
status: str = STATUS_SAFE
|
|
file_path: Optional[str] = None
|
|
|
|
class SkillUpdate(BaseModel):
|
|
name: Optional[str] = None
|
|
description: Optional[str] = None
|
|
content: Optional[str] = None
|
|
type: Optional[str] = None
|
|
project_id: Optional[int] = None
|
|
source: Optional[str] = None
|
|
installation_time: Optional[str] = None
|
|
status: Optional[str] = None
|
|
file_path: Optional[str] = None
|
|
|
|
def _parse_skill_md(file_path: str) -> Dict[str, Any]:
|
|
"""Parse SKILL.md for metadata and content according to agentskills.io standard."""
|
|
if not os.path.exists(file_path):
|
|
return {}
|
|
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
except Exception as e:
|
|
print(f"Error reading {file_path}: {e}")
|
|
return {}
|
|
|
|
# Split YAML frontmatter and Markdown body
|
|
# Support both --- and +++ for frontmatter
|
|
metadata = {}
|
|
body = content
|
|
|
|
if content.startswith('---'):
|
|
parts = content.split('---', 2)
|
|
if len(parts) >= 3:
|
|
try:
|
|
metadata = yaml.safe_load(parts[1]) or {}
|
|
body = parts[2].strip()
|
|
except Exception as e:
|
|
print(f"Error parsing YAML frontmatter: {e}")
|
|
|
|
# Extract name and description, fallback to some defaults
|
|
name = metadata.get("name")
|
|
description = metadata.get("description")
|
|
|
|
# If name not in metadata, try to find the first H1 in markdown body
|
|
if not name:
|
|
for line in body.split('\n'):
|
|
if line.startswith('# '):
|
|
name = line[2:].strip()
|
|
break
|
|
|
|
return {
|
|
"name": name,
|
|
"description": description,
|
|
"content": body,
|
|
"metadata": metadata
|
|
}
|
|
|
|
def _load_data() -> List[Dict[str, Any]]:
|
|
if not os.path.exists(DATA_FILE):
|
|
return []
|
|
try:
|
|
with open(DATA_FILE, "r") as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, FileNotFoundError):
|
|
return []
|
|
|
|
def _save_data(data: List[Dict[str, Any]]):
|
|
os.makedirs(os.path.dirname(DATA_FILE), exist_ok=True)
|
|
with open(DATA_FILE, "w") as f:
|
|
json.dump(data, f, indent=2, ensure_ascii=False)
|
|
|
|
def _dedupe_skills(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
deduped: Dict[str, Dict[str, Any]] = {}
|
|
for item in data:
|
|
skill_id = str(item.get("id") or "").strip()
|
|
project_id = item.get("project_id")
|
|
if not skill_id:
|
|
continue
|
|
|
|
# Use a composite key of (id, project_id) for deduplication
|
|
# so that different projects can theoretically have the same skill_id
|
|
dedupe_key = f"{skill_id}_{project_id}"
|
|
|
|
existing = deduped.get(dedupe_key)
|
|
if existing is None:
|
|
deduped[dedupe_key] = item
|
|
continue
|
|
|
|
# If they somehow have the exact same dedupe_key, we just keep the later one
|
|
deduped[dedupe_key] = item
|
|
|
|
return list(deduped.values())
|
|
|
|
def _safe_skill_dir_name(value: str) -> str:
|
|
safe = re.sub(r'[^a-zA-Z0-9_\-]', '_', value or "").lower()
|
|
return safe or "skill"
|
|
|
|
def _write_skill_markdown(skill_dir: str, skill_name: str, description: Optional[str], content: str) -> str:
|
|
os.makedirs(skill_dir, exist_ok=True)
|
|
skill_md_path = os.path.join(skill_dir, "SKILL.md")
|
|
final_description = description or "No description provided"
|
|
body = content or ""
|
|
markdown = (
|
|
f"---\n"
|
|
f"name: {skill_name}\n"
|
|
f"description: {final_description}\n"
|
|
f"---\n\n"
|
|
f"{body}\n"
|
|
)
|
|
with open(skill_md_path, "w", encoding="utf-8") as f:
|
|
f.write(markdown)
|
|
return skill_md_path
|
|
|
|
def _scan_builtin_skills(data: List[Dict[str, Any]], registered_paths: set, source_dir: str, source_name: str):
|
|
if not os.path.exists(source_dir):
|
|
return
|
|
for item in os.listdir(source_dir):
|
|
skill_dir = os.path.abspath(os.path.join(source_dir, item))
|
|
if os.path.isdir(skill_dir):
|
|
skill_md_path = os.path.join(skill_dir, "SKILL.md")
|
|
if os.path.exists(skill_md_path):
|
|
metadata_res = _parse_skill_md(skill_md_path)
|
|
skill_name = metadata_res.get("name") or item
|
|
|
|
existing = None
|
|
for d in data:
|
|
if (d.get("id") == item and d.get("is_builtin")) or d.get("file_path") == skill_dir:
|
|
existing = d
|
|
break
|
|
|
|
if existing:
|
|
existing["name"] = skill_name
|
|
existing["description"] = metadata_res.get("description") or "No description provided"
|
|
existing["content"] = metadata_res.get("content") or ""
|
|
existing["file_path"] = skill_dir
|
|
existing["is_builtin"] = True
|
|
existing["source"] = source_name
|
|
existing["status"] = STATUS_SAFE
|
|
registered_paths.add(skill_dir)
|
|
else:
|
|
new_skill = {
|
|
"id": item,
|
|
"name": skill_name,
|
|
"description": metadata_res.get("description") or "No description provided",
|
|
"content": metadata_res.get("content") or "",
|
|
"type": "agentskill",
|
|
"project_id": None,
|
|
"source": source_name,
|
|
"installation_time": datetime.now().strftime("%Y年%m月%d日"),
|
|
"status": STATUS_SAFE,
|
|
"file_path": skill_dir,
|
|
"is_builtin": True
|
|
}
|
|
data.append(new_skill)
|
|
registered_paths.add(skill_dir)
|
|
|
|
def load_skills(project_id: Optional[int] = None) -> List[Dict[str, Any]]:
|
|
_ensure_skill_hub_dir()
|
|
data = _load_data()
|
|
|
|
registered_paths = set()
|
|
|
|
# Sync registered skills with their SKILL.md if available
|
|
for item in data:
|
|
item["source"] = _normalize_source(item.get("source"))
|
|
item["status"] = _normalize_status(item.get("status"))
|
|
if item.get("id") in ("nl2sql", "visualization") or item.get("is_builtin"):
|
|
item["is_builtin"] = True
|
|
else:
|
|
item.setdefault("is_builtin", False)
|
|
|
|
if item.get("file_path"):
|
|
abs_path = os.path.abspath(item["file_path"])
|
|
registered_paths.add(abs_path)
|
|
skill_md_path = os.path.join(abs_path, "SKILL.md")
|
|
if os.path.exists(skill_md_path):
|
|
metadata_res = _parse_skill_md(skill_md_path)
|
|
if metadata_res.get("name"):
|
|
item["name"] = metadata_res["name"]
|
|
if metadata_res.get("description"):
|
|
item["description"] = metadata_res["description"]
|
|
if metadata_res.get("content"):
|
|
item["content"] = metadata_res["content"]
|
|
|
|
# Scan builtin skills
|
|
_scan_builtin_skills(data, registered_paths, NANOBOT_BUILTIN_SKILLS_DIR, SOURCE_SYSTEM_BUILTIN)
|
|
_scan_builtin_skills(data, registered_paths, BACKEND_BUILTIN_SKILLS_DIR, SOURCE_SYSTEM_BUILTIN)
|
|
|
|
# Scan for unregistered skills in SKILL_HUB_DIR (1-level deep to match nanobot's behavior)
|
|
if os.path.exists(SKILL_HUB_DIR):
|
|
for item in os.listdir(SKILL_HUB_DIR):
|
|
skill_dir = os.path.abspath(os.path.join(SKILL_HUB_DIR, item))
|
|
if os.path.isdir(skill_dir):
|
|
skill_md_path = os.path.join(skill_dir, "SKILL.md")
|
|
if os.path.exists(skill_md_path) and skill_dir not in registered_paths:
|
|
metadata_res = _parse_skill_md(skill_md_path)
|
|
skill_name = metadata_res.get("name") or item
|
|
|
|
# Try to deduce project_id from directory prefix (e.g., p123_skillname)
|
|
deduced_project_id = None
|
|
match = re.match(r'^p(\d+)_', item)
|
|
if match:
|
|
deduced_project_id = int(match.group(1))
|
|
|
|
new_skill = {
|
|
"id": item,
|
|
"name": skill_name,
|
|
"description": metadata_res.get("description") or "No description provided",
|
|
"content": metadata_res.get("content") or "",
|
|
"type": "agentskill",
|
|
"project_id": deduced_project_id,
|
|
"source": SOURCE_BACKEND_GENERATED,
|
|
"installation_time": datetime.now().strftime("%Y年%m月%d日"),
|
|
"status": STATUS_SAFE,
|
|
"file_path": skill_dir,
|
|
"is_builtin": item in ("nl2sql", "visualization")
|
|
}
|
|
data.append(new_skill)
|
|
registered_paths.add(skill_dir)
|
|
|
|
deduped = _dedupe_skills(data)
|
|
if project_id is not None:
|
|
return [item for item in deduped if item.get("project_id") == project_id or item.get("project_id") is None]
|
|
return deduped
|
|
|
|
@router.get("/skills", response_model=List[Skill])
|
|
def list_skills(project_id: Optional[int] = None):
|
|
data = load_skills(project_id)
|
|
return [Skill(**item) for item in data]
|
|
|
|
@router.get("/skills/{skill_id}", response_model=Skill)
|
|
def get_skill(skill_id: str, project_id: Optional[int] = None):
|
|
data = load_skills()
|
|
for item in data:
|
|
if item["id"] == skill_id:
|
|
if project_id is not None and item.get("project_id") != project_id:
|
|
continue
|
|
return Skill(**item)
|
|
raise HTTPException(status_code=404, detail="Skill not found")
|
|
|
|
@router.post("/skills/upload")
|
|
async def upload_skill(
|
|
file: UploadFile = File(...),
|
|
project_id: Optional[int] = Form(None)
|
|
):
|
|
"""Upload a skill file (SKILL.md) or a packaged skill (zip/tar.gz)."""
|
|
filename = file.filename
|
|
print(f"Uploading skill: {filename}, project_id: {project_id}")
|
|
_ensure_skill_hub_dir()
|
|
|
|
# Create a unique temp directory
|
|
temp_dir_name = f"temp_{datetime.now().timestamp()}_{os.urandom(4).hex()}"
|
|
temp_dir = os.path.join(SKILL_HUB_DIR, temp_dir_name)
|
|
os.makedirs(temp_dir, exist_ok=True)
|
|
|
|
try:
|
|
file_path = os.path.join(temp_dir, filename)
|
|
with open(file_path, "wb") as buffer:
|
|
shutil.copyfileobj(file.file, buffer)
|
|
|
|
skill_source_dir = None
|
|
|
|
# Handle different file types
|
|
if filename.endswith(".zip"):
|
|
try:
|
|
with zipfile.ZipFile(file_path, 'r') as zip_ref:
|
|
zip_ref.extractall(temp_dir)
|
|
os.remove(file_path)
|
|
# Find the directory containing SKILL.md
|
|
for root, dirs, files in os.walk(temp_dir):
|
|
if "SKILL.md" in files:
|
|
skill_source_dir = root
|
|
break
|
|
except Exception as e:
|
|
print(f"Zip extraction failed: {e}")
|
|
raise HTTPException(status_code=400, detail=f"Failed to extract zip: {str(e)}")
|
|
|
|
elif filename.endswith((".tar.gz", ".tgz")):
|
|
try:
|
|
with tarfile.open(file_path, 'r:gz') as tar_ref:
|
|
tar_ref.extractall(temp_dir)
|
|
os.remove(file_path)
|
|
for root, dirs, files in os.walk(temp_dir):
|
|
if "SKILL.md" in files:
|
|
skill_source_dir = root
|
|
break
|
|
except Exception as e:
|
|
print(f"Tarball extraction failed: {e}")
|
|
raise HTTPException(status_code=400, detail=f"Failed to extract tarball: {str(e)}")
|
|
|
|
elif filename == "SKILL.md":
|
|
skill_source_dir = temp_dir
|
|
else:
|
|
print(f"Unsupported file type: {filename}")
|
|
raise HTTPException(status_code=400, detail="Only SKILL.md or packaged skills (zip/tar.gz) are supported")
|
|
|
|
if not skill_source_dir or not os.path.exists(os.path.join(skill_source_dir, "SKILL.md")):
|
|
print(f"SKILL.md not found in {filename}")
|
|
raise HTTPException(status_code=400, detail="SKILL.md not found in the uploaded file")
|
|
|
|
# Parse metadata
|
|
skill_md_path = os.path.join(skill_source_dir, "SKILL.md")
|
|
metadata_res = _parse_skill_md(skill_md_path)
|
|
|
|
# Use metadata name, or fallback to folder name or filename
|
|
skill_name = metadata_res.get("name")
|
|
if not skill_name:
|
|
if filename == "SKILL.md":
|
|
skill_name = "unnamed_skill"
|
|
else:
|
|
# Use filename without extension
|
|
skill_name = os.path.splitext(filename)[0]
|
|
|
|
# Create a safe directory name for the skill
|
|
safe_name = _safe_skill_dir_name(skill_name)
|
|
final_skill_id = f"{safe_name}_{datetime.now().strftime('%Y%m%d%H%M%S')}"
|
|
|
|
if project_id is not None:
|
|
# Prefix the folder name with p{project_id}_ to distinguish projects in storage
|
|
# without breaking nanobot's 1-level-deep skill loader
|
|
final_skill_dir = os.path.join(SKILL_HUB_DIR, f"p{project_id}_{final_skill_id}")
|
|
final_skill_id = f"p{project_id}_{final_skill_id}"
|
|
else:
|
|
final_skill_dir = os.path.join(SKILL_HUB_DIR, final_skill_id)
|
|
|
|
print(f"Finalizing skill: {skill_name} -> {final_skill_dir}")
|
|
|
|
# Move the skill content to final destination
|
|
os.makedirs(final_skill_dir, exist_ok=True)
|
|
for item in os.listdir(skill_source_dir):
|
|
s = os.path.join(skill_source_dir, item)
|
|
d = os.path.join(final_skill_dir, item)
|
|
if os.path.isdir(s):
|
|
shutil.copytree(s, d, dirs_exist_ok=True)
|
|
else:
|
|
shutil.copy2(s, d)
|
|
|
|
# Register in skills.json
|
|
data = load_skills()
|
|
new_skill = {
|
|
"id": final_skill_id,
|
|
"name": skill_name,
|
|
"description": metadata_res.get("description") or "No description provided",
|
|
"content": metadata_res.get("content") or "",
|
|
"type": "agentskill",
|
|
"project_id": project_id,
|
|
"source": SOURCE_UPLOADED_FILE,
|
|
"installation_time": datetime.now().strftime("%Y年%m月%d日"),
|
|
"status": STATUS_SAFE,
|
|
"file_path": final_skill_dir
|
|
}
|
|
|
|
data.append(new_skill)
|
|
_save_data(data)
|
|
print(f"Skill registered successfully: {final_skill_id}")
|
|
|
|
return new_skill
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
import traceback
|
|
traceback.print_exc()
|
|
raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")
|
|
finally:
|
|
# Cleanup temp directory
|
|
if os.path.exists(temp_dir):
|
|
shutil.rmtree(temp_dir)
|
|
|
|
@router.post("/skills", response_model=Skill)
|
|
def create_skill(skill: SkillCreate):
|
|
_ensure_skill_hub_dir()
|
|
data = load_skills()
|
|
if any(item["id"] == skill.id and item.get("project_id") == skill.project_id for item in data):
|
|
raise HTTPException(status_code=400, detail="Skill with this ID already exists in this project")
|
|
|
|
new_skill_dict = skill.dict()
|
|
new_skill_dict["source"] = _normalize_source(new_skill_dict.get("source"))
|
|
new_skill_dict["status"] = _normalize_status(new_skill_dict.get("status"))
|
|
if not new_skill_dict.get("installation_time"):
|
|
new_skill_dict["installation_time"] = datetime.now().strftime("%Y年%m月%d日")
|
|
if not new_skill_dict.get("file_path"):
|
|
project_id = new_skill_dict.get("project_id")
|
|
base_dir_name = _safe_skill_dir_name(new_skill_dict["id"])
|
|
if project_id is not None:
|
|
# Add prefix for project storage distinction
|
|
if not base_dir_name.startswith(f"p{project_id}_"):
|
|
base_dir_name = f"p{project_id}_{base_dir_name}"
|
|
skill_dir = os.path.join(SKILL_HUB_DIR, base_dir_name)
|
|
else:
|
|
skill_dir = os.path.join(SKILL_HUB_DIR, base_dir_name)
|
|
|
|
_write_skill_markdown(
|
|
skill_dir=skill_dir,
|
|
skill_name=new_skill_dict["name"],
|
|
description=new_skill_dict.get("description"),
|
|
content=new_skill_dict.get("content", ""),
|
|
)
|
|
new_skill_dict["file_path"] = skill_dir
|
|
new_skill_dict["id"] = base_dir_name
|
|
|
|
data.append(new_skill_dict)
|
|
_save_data(data)
|
|
return Skill(**new_skill_dict)
|
|
|
|
@router.put("/skills/{skill_id}", response_model=Skill)
|
|
def update_skill(skill_id: str, skill: SkillUpdate, project_id: Optional[int] = None):
|
|
data = load_skills()
|
|
for i, item in enumerate(data):
|
|
if item["id"] == skill_id:
|
|
if project_id is not None and item.get("project_id") != project_id:
|
|
continue
|
|
updated_item = item.copy()
|
|
update_data = skill.dict(exclude_unset=True)
|
|
if "source" in update_data:
|
|
update_data["source"] = _normalize_source(update_data.get("source"))
|
|
if "status" in update_data:
|
|
update_data["status"] = _normalize_status(update_data.get("status"))
|
|
updated_item.update(update_data)
|
|
if updated_item.get("file_path"):
|
|
_write_skill_markdown(
|
|
skill_dir=updated_item["file_path"],
|
|
skill_name=updated_item.get("name") or item.get("name") or "skill",
|
|
description=updated_item.get("description"),
|
|
content=updated_item.get("content", ""),
|
|
)
|
|
data[i] = updated_item
|
|
_save_data(data)
|
|
return Skill(**updated_item)
|
|
raise HTTPException(status_code=404, detail="Skill not found")
|
|
|
|
@router.delete("/skills/{skill_id}")
|
|
def delete_skill(skill_id: str, project_id: Optional[int] = None):
|
|
data = load_skills()
|
|
initial_len = len(data)
|
|
|
|
# If project_id is provided, we only delete if it matches
|
|
new_data = []
|
|
found = False
|
|
skill_to_delete = None
|
|
|
|
for item in data:
|
|
if item["id"] == skill_id:
|
|
if item.get("is_builtin"):
|
|
raise HTTPException(status_code=400, detail="Builtin skills cannot be deleted")
|
|
if project_id is not None and item.get("project_id") not in (project_id, None):
|
|
new_data.append(item)
|
|
continue
|
|
found = True
|
|
skill_to_delete = item
|
|
else:
|
|
new_data.append(item)
|
|
|
|
if not found:
|
|
raise HTTPException(status_code=404, detail="Skill not found")
|
|
|
|
# Clean up file_path if it exists
|
|
if skill_to_delete and skill_to_delete.get("file_path"):
|
|
file_path = skill_to_delete["file_path"]
|
|
if os.path.exists(file_path):
|
|
try:
|
|
if os.path.isdir(file_path):
|
|
shutil.rmtree(file_path)
|
|
else:
|
|
os.remove(file_path)
|
|
except Exception as e:
|
|
print(f"Error deleting skill files at {file_path}: {e}")
|
|
|
|
_save_data(new_data)
|
|
return {"message": "Skill deleted successfully"}
|