Files
2026-05-13 16:43:53 +08:00

51 lines
1.8 KiB
Python

import os
from typing import Optional

from clickhouse_driver import Client
class ClickHouseConnector:
    """Small convenience wrapper around ``clickhouse_driver.Client``.

    Each connection setting is resolved independently, in this order:

    1. the constructor argument, when it is truthy;
    2. the matching ``CLICKHOUSE_*`` environment variable;
    3. a built-in default (``localhost``, ``9000``, ``default``, ``""``,
       ``default``).
    """

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
        database: Optional[str] = None,
    ):
        """Resolve the connection settings and build the driver client.

        Args:
            host: Server host; falls back to ``CLICKHOUSE_HOST``.
            port: Native-protocol port; falls back to ``CLICKHOUSE_PORT``.
            user: User name; falls back to ``CLICKHOUSE_USER``.
            password: Password; falls back to ``CLICKHOUSE_PASSWORD``.
            database: Database name; falls back to ``CLICKHOUSE_DB``.

        Raises:
            ValueError: If ``CLICKHOUSE_PORT`` is consulted and is not an
                integer.
        """
        settings = self._resolve_settings(host, port, user, password, database)
        self.host = settings["host"]
        self.port = settings["port"]
        self.user = settings["user"]
        self.password = settings["password"]
        self.database = settings["database"]
        self.client = Client(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=self.database,
        )

    @staticmethod
    def _resolve_settings(
        host: Optional[str] = None,
        port: Optional[int] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
        database: Optional[str] = None,
    ) -> dict:
        """Return the effective connection settings as a dict.

        The parameters default to ``None`` (not to the literal fallbacks) so
        that an omitted argument is falsy and the environment variable is
        actually consulted; a truthy literal default would short-circuit the
        ``or`` and make the environment variable unreachable.
        """
        return {
            "host": host or os.getenv("CLICKHOUSE_HOST", "localhost"),
            "port": port or int(os.getenv("CLICKHOUSE_PORT", "9000")),
            "user": user or os.getenv("CLICKHOUSE_USER", "default"),
            "password": password or os.getenv("CLICKHOUSE_PASSWORD", ""),
            "database": database or os.getenv("CLICKHOUSE_DB", "default"),
        }

    def execute_query(self, query: str):
        """Run ``query`` and return the driver's result with column types.

        The query is passed to ``Client.execute`` with
        ``with_column_types=True``; per the clickhouse-driver documentation
        this yields a ``(rows, [(column_name, column_type), ...])`` tuple.

        Raises:
            Exception: Whatever the driver raised; the error is printed and
                then re-raised unchanged.
        """
        try:
            return self.client.execute(query, with_column_types=True)
        except Exception as e:
            print(f"ClickHouse Query Error: {e}")
            # Bare raise re-raises the active exception as-is.
            raise

    def get_schema(self):
        """Return the current database's columns grouped by table.

        Returns:
            A dict mapping each table name to a list of
            ``{"name": ..., "type": ...}`` dicts, in the order the rows were
            returned. On any error the problem is printed and an empty dict
            is returned (best-effort: callers never see an exception).
        """
        query = "SELECT table, name, type FROM system.columns WHERE database = currentDatabase()"
        try:
            results = self.client.execute(query)
            schema = {}
            for table, column_name, column_type in results:
                schema.setdefault(table, []).append(
                    {"name": column_name, "type": column_type}
                )
            return schema
        except Exception as e:
            print(f"Error getting schema: {e}")
            return {}

    def test_connection(self) -> bool:
        """Return ``True`` if a trivial ``SELECT 1`` succeeds, else ``False``.

        Failures are printed rather than raised.
        """
        try:
            self.client.execute("SELECT 1")
            return True
        except Exception as e:
            print(f"ClickHouse Connection Error: {e}")
            return False
# Module-level shared connector, created when this module is imported.
# It is constructed with no arguments, so every setting comes from
# ClickHouseConnector's own fallbacks.
clickhouse_connector = ClickHouseConnector()