49 lines
1.5 KiB
Python
49 lines
1.5 KiB
Python
import duckdb
|
|
import pandas as pd
|
|
from typing import List, Dict, Any, Optional
|
|
import os
|
|
|
|
class DuckDBConnector:
    """Thin convenience wrapper around a DuckDB database.

    Every public method opens its own connection to ``db_path`` and closes
    it before returning; no connection is kept on the instance.

    NOTE(review): with the default ``":memory:"`` path, opening a new
    connection per call presumably yields a separate, empty database each
    time, so state created by one call would not be visible to the next.
    Confirm against the DuckDB documentation before relying on the default.
    """

    def __init__(self, db_path: str = ":memory:"):
        """Remember the database location.

        Args:
            db_path: Path handed to ``duckdb.connect``. Defaults to
                ``":memory:"``.
        """
        self.db_path = db_path

    @staticmethod
    def _quote_identifier(identifier: str) -> str:
        """Return *identifier* as a double-quoted SQL identifier.

        Embedded double quotes are doubled so the name cannot terminate the
        quoted identifier early.
        """
        return '"' + identifier.replace('"', '""') + '"'

    def execute_query(self, query: str) -> List[Dict[str, Any]]:
        """Run *query* and return its result rows.

        Args:
            query: SQL text passed to the connection unchanged.

        Returns:
            One dict per result row, keyed by column name.

        Raises:
            Whatever ``duckdb`` raises while connecting or executing; the
            connection is closed before the exception propagates.
        """
        conn = duckdb.connect(self.db_path)
        try:
            frame = conn.execute(query).df()
            return frame.to_dict(orient="records")
        finally:
            conn.close()

    def get_schema(self) -> Dict[str, Any]:
        """Describe every table reported by ``SHOW TABLES``.

        Returns:
            A mapping from table name to a dict with the keys ``"columns"``
            (a list of ``{"name": ..., "type": ...}`` dicts),
            ``"primary_keys"`` and ``"foreign_keys"`` (both always empty
            lists).
        """
        conn = duckdb.connect(self.db_path)
        try:
            schema = {}
            tables = conn.execute("SHOW TABLES").fetchall()
            for (table_name,) in tables:
                # Quote the name so tables with spaces, quotes or reserved
                # words can be described and cannot alter the statement.
                quoted_name = self._quote_identifier(table_name)
                columns_info = conn.execute(f"DESCRIBE {quoted_name}").fetchall()
                schema[table_name] = {
                    # The first two fields of each DESCRIBE row are used as
                    # the column name and column type.
                    "columns": [
                        {"name": col[0], "type": col[1]} for col in columns_info
                    ],
                    "primary_keys": [],  # DuckDB describe doesn't easily show PKs in this format
                    "foreign_keys": [],
                }
            return schema
        finally:
            conn.close()

    def test_connection(self) -> bool:
        """Check that the database can be opened and queried.

        Returns:
            ``True`` when ``SELECT 1`` succeeds. On any exception the error
            is printed and ``False`` is returned.
        """
        try:
            conn = duckdb.connect(self.db_path)
            try:
                conn.execute("SELECT 1")
            finally:
                # Close even when the probe query fails so the connection
                # is not leaked.
                conn.close()
            return True
        except Exception as e:
            print(f"DuckDB Connection Error: {e}")
            return False
|