2026-03-15 19:36:02 +08:00
|
|
|
from typing import Dict, Any, Optional
|
|
|
|
|
import json
|
|
|
|
|
import functools
|
|
|
|
|
from app.connectors.postgres import PostgresConnector
|
|
|
|
|
from app.connectors.clickhouse import ClickHouseConnector
|
|
|
|
|
from app.connectors.parquet import ParquetConnector
|
2026-03-16 22:18:23 +08:00
|
|
|
from app.connectors.csv import CSVConnector
|
2026-05-13 16:43:53 +08:00
|
|
|
from app.connectors.duckdb import DuckDBConnector
|
2026-03-15 19:36:02 +08:00
|
|
|
from app.models.datasource import DataSource
|
2026-03-15 20:48:40 +08:00
|
|
|
from app.core.files import resolve_upload_file_path
|
2026-03-15 19:36:02 +08:00
|
|
|
|
|
|
|
|
def _quote_url_credential(value: Any) -> str:
    """Percent-encode a user name or password for embedding in a DB URL.

    Characters such as ``@``, ``:``, ``/``, ``?`` and ``#`` would otherwise be
    parsed as URL delimiters and corrupt the URL. ``None`` is rendered as an
    empty string instead of the literal text ``"None"``.
    """
    from urllib.parse import quote

    if value is None:
        return ""
    # safe="" so "/" is encoded as well; quote (not quote_plus) encodes a
    # space as %20, which decodes correctly with both unquote and unquote_plus.
    return quote(str(value), safe="")


def _build_db_url(scheme: str, config: Dict[str, Any], default_port: int) -> str:
    """Assemble ``scheme://user:password@host:port/database`` from *config*.

    User and password are percent-encoded. ``port`` falls back to
    *default_port* when it is missing or falsy.
    """
    user = _quote_url_credential(config.get("user"))
    password = _quote_url_credential(config.get("password"))
    port = config.get("port") or default_port
    return f"{scheme}://{user}:{password}@{config.get('host')}:{port}/{config.get('database')}"


@functools.lru_cache(maxsize=32)
def _get_cached_connector(ds_type: str, config_json: str):
    """Create a connector for *ds_type* from a JSON-encoded config.

    Results are memoised by ``functools.lru_cache`` (up to 32 entries), so the
    same ``(ds_type, config_json)`` pair returns the same connector instance.
    *config_json* is a string rather than a dict because the cache needs
    hashable arguments.

    NOTE(review): entries evicted from the cache are simply dropped; if a
    connector holds open connections/pools, eviction does not close them —
    confirm whether that matters.

    Args:
        ds_type: Lower-cased data source type: ``postgres``/``postgresql``/
            ``supabase``, ``mysql``, ``sqlite``, ``clickhouse``, ``duckdb``,
            ``parquet`` or ``csv``.
        config_json: JSON object holding the connection settings.

    Returns:
        A connector instance for the requested type.

    Raises:
        ValueError: If *ds_type* is unsupported, or a SQLite source provides
            neither ``connection_string`` nor ``file_path``.
    """
    # A stored config of ``null`` would otherwise fail on ``config.get``.
    config = json.loads(config_json) or {}

    if ds_type in ("postgres", "postgresql", "supabase"):
        db_url = config.get("connection_string")
        if not db_url:
            # Supabase sources default to port 6543, plain Postgres to 5432.
            default_port = 6543 if ds_type == "supabase" else 5432
            db_url = _build_db_url("postgresql", config, default_port)

        # Supabase connections require SSL unless the URL already sets sslmode.
        if ds_type == "supabase" and "?" not in db_url:
            db_url += "?sslmode=require"
        elif ds_type == "supabase" and "sslmode=" not in db_url:
            db_url += "&sslmode=require"

        return PostgresConnector(db_url=db_url)

    elif ds_type == "mysql":
        # NOTE(review): PostgresConnector is reused for MySQL and SQLite; it
        # presumably wraps a generic SQLAlchemy-style URL — confirm.
        db_url = config.get("connection_string")
        if not db_url:
            db_url = _build_db_url("mysql+pymysql", config, 3306)
        elif not db_url.startswith("mysql+pymysql://"):
            # Pin the PyMySQL driver for bare mysql:// URLs.
            db_url = db_url.replace("mysql://", "mysql+pymysql://")
        return PostgresConnector(db_url=db_url)

    elif ds_type == "sqlite":
        # SQLite is addressed by an explicit URL or by an uploaded file path.
        db_url = config.get("connection_string")
        if not db_url and config.get("file_path"):
            file_path = str(resolve_upload_file_path(config.get("file_path")))
            db_url = f"sqlite:///{file_path}"
        if not db_url:
            raise ValueError(
                "SQLite data source requires a 'connection_string' or 'file_path'"
            )
        return PostgresConnector(db_url=db_url)

    elif ds_type == "clickhouse":
        return ClickHouseConnector(
            host=config.get("host"),
            port=config.get("port", 9000),
            user=config.get("user", "default"),
            password=config.get("password", ""),
            database=config.get("database", "default")
        )

    elif ds_type == "duckdb":
        # Without a database/file path, DuckDB runs fully in memory.
        db_path = config.get("database") or config.get("file_path") or ":memory:"
        if db_path != ":memory:":
            db_path = str(resolve_upload_file_path(db_path))
        return DuckDBConnector(db_path=db_path)

    elif ds_type == "parquet":
        file_path = str(resolve_upload_file_path(config.get("file_path")))
        return ParquetConnector(file_path=file_path)

    elif ds_type == "csv":
        file_path = str(resolve_upload_file_path(config.get("file_path")))
        return CSVConnector(file_path=file_path)

    else:
        raise ValueError(f"Unsupported data source type: {ds_type}")
|
|
|
|
|
|
|
|
|
|
def get_connector(datasource: DataSource):
    """Return the connector for a persisted ``DataSource``.

    Delegates to ``get_connector_from_config``, which serialises the config
    with sorted keys (so key order does not affect the cache key) and
    lower-cases the type before looking up the cached connector.
    """
    return get_connector_from_config(datasource.type, datasource.config)
|
|
|
|
|
|
|
|
|
|
def get_connector_from_config(ds_type: str, config: Dict[str, Any]):
    """Return a connector for a raw ``(ds_type, config)`` pair.

    Useful for testing a connection before it is saved to the DB. The config
    is serialised to JSON with sorted keys to form a stable, hashable cache
    key, and the type is lower-cased, so equivalent inputs share one cached
    connector.
    """
    serialized_config = json.dumps(config, sort_keys=True)
    normalized_type = ds_type.lower()
    return _get_cached_connector(normalized_type, serialized_config)
|