remove minio
This commit is contained in:
+23
-18
@@ -1,43 +1,48 @@
|
||||
from fastapi import APIRouter, UploadFile, File, HTTPException, BackgroundTasks
|
||||
from app.connectors.minio import minio_connector
|
||||
from fastapi import APIRouter, UploadFile, File, HTTPException
|
||||
import pandas as pd
|
||||
import duckdb
|
||||
import io
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
router = APIRouter()
|
||||
# Local storage for uploaded files: the "data/uploads" folder located two
# levels above the directory that contains this file (parents[0] is that
# directory itself, so parents[2] is its grandparent).
upload_dir = Path(__file__).resolve().parents[2] / "data" / "uploads"
|
||||
upload_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
@router.post("/upload/csv")
|
||||
async def upload_csv(file: UploadFile = File(...), background_tasks: BackgroundTasks = None):
|
||||
if not file.filename.endswith('.csv'):
|
||||
raise HTTPException(status_code=400, detail="Invalid file type. Only CSV allowed.")
|
||||
@router.post("/upload/file")
|
||||
async def upload_file(file: UploadFile = File(...)):
|
||||
allowed_extensions = ('.csv', '.xls', '.xlsx')
|
||||
if not file.filename.lower().endswith(allowed_extensions):
|
||||
raise HTTPException(status_code=400, detail="Invalid file type. Only CSV and Excel files allowed.")
|
||||
|
||||
try:
|
||||
content = await file.read()
|
||||
file_size = len(content)
|
||||
if not content:
|
||||
raise HTTPException(status_code=400, detail="Empty file is not allowed.")
|
||||
file_obj = io.BytesIO(content)
|
||||
|
||||
# Generate a unique filename
|
||||
unique_filename = f"{uuid.uuid4()}-{file.filename}"
|
||||
save_path = upload_dir / unique_filename
|
||||
save_path.write_bytes(content)
|
||||
file_url = f"local://{unique_filename}"
|
||||
|
||||
# Upload to MinIO
|
||||
minio_url = minio_connector.upload_file(unique_filename, file_obj, file_size, content_type="text/csv")
|
||||
|
||||
# Reset file pointer for analysis
|
||||
file_obj.seek(0)
|
||||
|
||||
# Load into DuckDB (in-memory) for quick analysis
|
||||
try:
|
||||
df = pd.read_csv(file_obj)
|
||||
if file.filename.lower().endswith('.csv'):
|
||||
df = pd.read_csv(file_obj)
|
||||
else:
|
||||
df = pd.read_excel(file_obj)
|
||||
|
||||
duckdb_conn = duckdb.connect(database=':memory:')
|
||||
duckdb_conn.register('uploaded_csv', df)
|
||||
summary = duckdb_conn.execute("DESCRIBE uploaded_csv").fetchall()
|
||||
duckdb_conn.register('uploaded_file', df)
|
||||
summary = duckdb_conn.execute("DESCRIBE uploaded_file").fetchall()
|
||||
row_count = len(df)
|
||||
columns = list(df.columns)
|
||||
|
||||
return {
|
||||
"filename": unique_filename,
|
||||
"url": minio_url,
|
||||
"url": file_url,
|
||||
"rows": row_count,
|
||||
"columns": columns,
|
||||
"summary": str(summary)
|
||||
@@ -45,7 +50,7 @@ async def upload_csv(file: UploadFile = File(...), background_tasks: BackgroundT
|
||||
except Exception as e:
|
||||
return {
|
||||
"filename": unique_filename,
|
||||
"url": minio_url,
|
||||
"url": file_url,
|
||||
"analysis_error": str(e)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,51 +0,0 @@
|
||||
from minio import Minio
|
||||
from minio.error import S3Error
|
||||
import os
|
||||
from typing import BinaryIO
|
||||
|
||||
class MinioConnector:
    """Thin wrapper around a MinIO client bound to a single bucket.

    Connection settings are read from environment variables at construction:

    * ``MINIO_ENDPOINT`` (default ``localhost:9000``)
    * ``MINIO_ACCESS_KEY`` / ``MINIO_SECRET_KEY`` (default ``minioadmin``)
    * ``MINIO_SECURE`` -- only the value ``true`` (any letter case) enables
      HTTPS; every other value means plain HTTP
    * ``MINIO_BUCKET`` (default ``dataclaw``)
    """

    def __init__(self):
        """Read the configuration, build the client and check the bucket."""
        self.endpoint = os.getenv("MINIO_ENDPOINT", "localhost:9000")
        self.access_key = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
        self.secret_key = os.getenv("MINIO_SECRET_KEY", "minioadmin")
        self.secure = os.getenv("MINIO_SECURE", "False").lower() == "true"
        self.bucket_name = os.getenv("MINIO_BUCKET", "dataclaw")

        self.client = Minio(
            self.endpoint,
            access_key=self.access_key,
            secret_key=self.secret_key,
            secure=self.secure,
        )
        self._ensure_bucket_exists()

    def _ensure_bucket_exists(self):
        """Create the configured bucket when it does not exist yet.

        Best effort: an ``S3Error`` is printed and swallowed so that
        constructing the connector does not fail on it.
        """
        try:
            if not self.client.bucket_exists(self.bucket_name):
                self.client.make_bucket(self.bucket_name)
        except S3Error as e:
            print(f"MinIO Bucket Error: {e}")

    def upload_file(self, object_name: str, file_data: BinaryIO, length: int, content_type: str = "application/octet-stream") -> str:
        """Store ``file_data`` in the bucket and return a URL for the object.

        Args:
            object_name: Key to store the object under.
            file_data: Binary stream handed to the client's ``put_object``.
            length: Size value passed through to ``put_object``.
            content_type: Content type recorded for the object.

        Returns:
            ``http(s)://<endpoint>/<bucket>/<object_name>`` with the object
            name percent-encoded so the result is a well-formed URL.

        Raises:
            S3Error: Re-raised after being printed when the upload fails.
        """
        # Local import keeps this block self-contained without touching the
        # module's import section.
        from urllib.parse import quote

        try:
            self.client.put_object(
                self.bucket_name,
                object_name,
                file_data,
                length,
                content_type=content_type,
            )
        except S3Error as e:
            print(f"MinIO Upload Error: {e}")
            # Bare raise keeps the original traceback intact.
            raise

        scheme = "https" if self.secure else "http"
        # Object names may contain spaces, '#', '?' and similar characters
        # that would otherwise corrupt the URL; '/' is kept so key prefixes
        # remain path segments.
        encoded_name = quote(object_name, safe="/")
        return f"{scheme}://{self.endpoint}/{self.bucket_name}/{encoded_name}"

    def test_connection(self) -> bool:
        """Return True when listing buckets succeeds, False on any error."""
        try:
            self.client.list_buckets()
            return True
        except Exception as e:
            print(f"MinIO Connection Error: {e}")
            return False
|
||||
|
||||
# Shared module-level connector instance. Constructing it builds the MinIO
# client and runs the bucket existence check, so both happen as a side effect
# of importing this module.
minio_connector = MinioConnector()
|
||||
Reference in New Issue
Block a user