Files
DataClaw/backend/app/connectors/postgres.py
T

72 lines
2.4 KiB
Python
Raw Normal View History

2026-03-14 15:44:48 +08:00
import os
from typing import Any, Dict, Generator, List, Optional

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
class PostgresConnector:
    """Small SQLAlchemy wrapper for running SQL and reading table schemas.

    The connector owns one engine and one session factory built from a
    database URL. Although named for PostgreSQL, ``get_schema`` also has a
    dedicated path for engines whose dialect is ``sqlite``.
    """

    def __init__(self, db_url: Optional[str] = None):
        """Create the engine and session factory.

        Args:
            db_url: SQLAlchemy database URL. When falsy, the ``POSTGRES_URL``
                environment variable is used, and failing that a placeholder
                local PostgreSQL URL.
        """
        self.db_url = db_url or os.getenv(
            "POSTGRES_URL", "postgresql://user:password@localhost:5432/dbname"
        )
        self.engine = create_engine(self.db_url)
        self.SessionLocal = sessionmaker(
            autocommit=False, autoflush=False, bind=self.engine
        )

    def get_db(self) -> Generator:
        """Yield a new session and close it once the generator is finished."""
        db = self.SessionLocal()
        try:
            yield db
        finally:
            # Runs on normal exhaustion, on error, and on generator close.
            db.close()

    def execute_query(
        self, query: str, params: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """Run a SQL statement and return its rows as plain dictionaries.

        Args:
            query: SQL text to execute.
            params: Optional bound parameters for ``:name`` placeholders in
                ``query``. Prefer this over formatting values into the SQL.

        Returns:
            One dict per row keyed by column name, or an empty list when the
            statement does not produce rows (for example DDL or plain DML).

        Raises:
            Exception: Whatever SQLAlchemy or the driver raises is propagated.
        """
        # NOTE(review): no commit is issued here; whether data-modifying
        # statements persist depends on the engine configuration - confirm
        # before relying on this method for writes.
        statement = text(query)
        with self.engine.connect() as connection:
            if params:
                result = connection.execute(statement, params)
            else:
                result = connection.execute(statement)
            # Iterating a result that returns no rows raises, so bail early.
            if not result.returns_rows:
                return []
            return [dict(row) for row in result.mappings()]

    def get_schema(self) -> Dict[str, List[Dict[str, str]]]:
        """Return ``{table: [{"name": column, "type": data_type}, ...]}``.

        SQLite engines are delegated to ``_get_sqlite_schema``. Other engines
        are queried through ``information_schema.columns`` restricted to the
        ``public`` schema. Any failure is printed and an empty dict returned.
        """
        if self.engine.dialect.name == "sqlite":
            return self._get_sqlite_schema()

        query = """
        SELECT table_name, column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = 'public'
        ORDER BY table_name, ordinal_position;
        """
        try:
            schema: Dict[str, List[Dict[str, str]]] = {}
            for row in self.execute_query(query):
                schema.setdefault(row["table_name"], []).append(
                    {"name": row["column_name"], "type": row["data_type"]}
                )
            return schema
        except Exception as e:
            # Best-effort: callers get an empty schema rather than an error.
            print(f"Error getting schema: {e}")
            return {}

    def _get_sqlite_schema(self) -> Dict[str, List[Dict[str, str]]]:
        """Build the schema mapping for SQLite via the SQLAlchemy inspector.

        Returns the same shape as ``get_schema``; column types are rendered
        with ``str()``. Any failure is printed and an empty dict returned.
        """
        try:
            # Kept as a local import: only this code path needs ``inspect``.
            from sqlalchemy import inspect

            inspector = inspect(self.engine)
            return {
                table_name: [
                    {"name": col["name"], "type": str(col["type"])}
                    for col in inspector.get_columns(table_name)
                ]
                for table_name in inspector.get_table_names()
            }
        except Exception as e:
            print(f"Error getting SQLite schema: {e}")
            return {}

    def test_connection(self) -> bool:
        """Return True if ``SELECT 1`` succeeds, otherwise print and return False."""
        try:
            with self.engine.connect() as connection:
                connection.execute(text("SELECT 1"))
            return True
        except Exception as e:
            print(f"PostgreSQL Connection Error: {e}")
            return False
# Module-level connector instance created at import time. No URL is passed,
# so the constructor falls back to POSTGRES_URL or its built-in default URL.
postgres_connector = PostgresConnector()