perf: 优化数据库连接池和HTTP客户端复用 1.连接池50+30支持150-200并发 2.HTTP客户端全局复用减少开销 3.前端UI优化和默认章节调整

This commit is contained in:
xiamuceer
2025-11-22 18:23:30 +08:00
parent 45bdcabca5
commit 244aa4db4f
6 changed files with 230 additions and 79 deletions
+10 -5
View File
@@ -43,14 +43,19 @@ class Settings(BaseSettings):
# 数据库配置 - PostgreSQL
database_url: str = DATABASE_URL
# PostgreSQL连接池配置(优化后支持80-150并发用户)
database_pool_size: int = 30 # 核心连接池大小(从20提升到30
database_max_overflow: int = 20 # 最大溢出连接数(从10提升到20
database_pool_timeout: int = 60 # 连接池超时秒数(从30提升到60
database_pool_recycle: int = 1800 # 连接回收时间秒数(从3600降低到180030分钟)
# PostgreSQL连接池配置(优化后支持150-200并发用户)
database_pool_size: int = 50 # 核心连接池大小(优化:从30提升到50
database_max_overflow: int = 30 # 最大溢出连接数(优化:从20提升到30
database_pool_timeout: int = 90 # 连接池超时秒数(优化:从60提升到90
database_pool_recycle: int = 1800 # 连接回收时间秒数(30分钟,防止长时间连接失效
database_pool_pre_ping: bool = True # 连接前ping检测,确保连接有效
database_pool_use_lifo: bool = True # 使用LIFO策略提高连接复用率
# 连接池高级配置
database_echo_pool: bool = False # 是否记录连接池日志(调试用)
database_pool_reset_on_return: str = "rollback" # 连接归还时的重置策略:rollback/commit/none
database_max_identifier_length: int = 128 # PostgreSQL标识符最大长度
# 会话监控配置
database_session_max_active: int = 50 # 活跃会话警告阈值(从100降低到50)
database_session_leak_threshold: int = 100 # 会话泄漏严重告警阈值
+57 -11
View File
@@ -72,24 +72,39 @@ async def get_engine(user_id: str):
engine = create_async_engine(
settings.database_url,
echo=False, # 生产环境关闭SQL日志
echo=settings.database_echo_pool, # 根据配置决定是否输出连接池日志
echo_pool=settings.database_echo_pool, # 连接池操作日志
future=True,
pool_size=settings.database_pool_size, # 核心连接数:30
max_overflow=settings.database_max_overflow, # 溢出连接数:20
pool_timeout=settings.database_pool_timeout, # 连接超时:60秒
pool_size=settings.database_pool_size, # 核心连接数:50(优化后)
max_overflow=settings.database_max_overflow, # 溢出连接数:30(优化后)
pool_timeout=settings.database_pool_timeout, # 连接超时:90秒(优化后)
pool_pre_ping=settings.database_pool_pre_ping, # 连接前检测
pool_recycle=settings.database_pool_recycle, # 连接回收:1800秒
pool_use_lifo=settings.database_pool_use_lifo, # LIFO策略提高复用
pool_reset_on_return=settings.database_pool_reset_on_return, # 连接归还时重置
max_identifier_length=settings.database_max_identifier_length, # 标识符最大长度
connect_args=connect_args
)
_engine_cache[cache_key] = engine
# 计算总连接数和预估并发能力
total_connections = settings.database_pool_size + settings.database_max_overflow
estimated_concurrent_users = total_connections * 2 # 每个用户平均0.5个连接
logger.info(
f"✅ PostgreSQL引擎已创建(优化配置)\n"
f" ├─ 连接池: {settings.database_pool_size} 核心 + {settings.database_max_overflow} 溢出 = {settings.database_pool_size + settings.database_max_overflow} 总连接\n"
f" ├─ 超时: {settings.database_pool_timeout}\n"
f" ├─ 回收: {settings.database_pool_recycle}\n"
f" ├─ 策略: LIFO(提高复用率)\n"
f" 预估并发: 80-150用户"
f" \n"
f" ├─ 连接池配置:\n"
f" │ ├─ 核心连接: {settings.database_pool_size}\n"
f" │ ├─ 溢出连接: {settings.database_max_overflow}\n"
f" │ └─ 总连接数: {total_connections}\n"
f" 超时配置:\n"
f" │ ├─ 获取超时: {settings.database_pool_timeout}\n"
f" │ └─ 连接回收: {settings.database_pool_recycle}秒 ({settings.database_pool_recycle//60}分钟)\n"
f" ├─ 优化策略:\n"
f" │ ├─ 复用策略: LIFO(后进先出)\n"
f" │ ├─ 健康检查: Pre-ping enabled\n"
f" │ └─ 归还重置: {settings.database_pool_reset_on_return}\n"
f" └─ 预估并发: {estimated_concurrent_users}-{estimated_concurrent_users + 50}用户"
)
return _engine_cache[cache_key]
@@ -340,6 +355,24 @@ async def get_database_stats():
"""
from app.config import settings
# 获取连接池详细状态
pool_stats = {}
cache_key = "shared_postgres"
if cache_key in _engine_cache:
engine = _engine_cache[cache_key]
try:
pool = engine.pool
pool_stats = {
"size": pool.size(), # 当前连接池大小
"checked_in": pool.checkedin(), # 可用连接数
"checked_out": pool.checkedout(), # 正在使用的连接数
"overflow": pool.overflow(), # 溢出连接数
"usage_percent": (pool.checkedout() / (settings.database_pool_size + settings.database_max_overflow)) * 100,
}
except Exception as e:
logger.warning(f"获取连接池状态失败: {e}")
pool_stats = {"error": str(e)}
stats = {
"session_stats": {
"created": _session_stats["created"],
@@ -349,6 +382,7 @@ async def get_database_stats():
"generator_exits": _session_stats["generator_exits"],
"last_check": _session_stats["last_check"],
},
"pool_stats": pool_stats, # 新增:连接池实时状态
"engine_cache": {
"total_engines": len(_engine_cache),
"engine_keys": list(_engine_cache.keys()),
@@ -359,6 +393,7 @@ async def get_database_stats():
"max_overflow": settings.database_max_overflow,
"total_connections": settings.database_pool_size + settings.database_max_overflow,
"pool_timeout": settings.database_pool_timeout,
"pool_recycle": settings.database_pool_recycle,
"session_max_active_threshold": settings.database_session_max_active,
"session_leak_threshold": settings.database_session_leak_threshold,
},
@@ -385,9 +420,20 @@ async def get_database_stats():
stats["health"]["status"] = "error"
stats["health"]["errors"].append(f"活跃会话数异常: {_session_stats['active']}")
# Connection-pool utilisation check.
# The stricter threshold must be tested first: with `> 90` checked before
# `> 95`, the critical branch could never be reached.
if pool_stats and "usage_percent" in pool_stats:
    usage = pool_stats["usage_percent"]
    if usage > 95:
        # Pool is almost exhausted: report as an error-level condition.
        stats["health"]["status"] = "critical"
        stats["health"]["errors"].append(f"连接池几乎耗尽: {usage:.1f}%")
    elif usage > 90:
        # Only raise the status to "warning" when nothing more severe has
        # been recorded already, so an earlier "error" is not downgraded
        # (same guard as the error-rate check further down).
        if stats["health"]["status"] == "healthy":
            stats["health"]["status"] = "warning"
        stats["health"]["warnings"].append(f"连接池使用率过高: {usage:.1f}%")
error_rate = (_session_stats["errors"] / max(_session_stats["created"], 1)) * 100
if error_rate > 5:
stats["health"]["status"] = "warning"
if stats["health"]["status"] == "healthy":
stats["health"]["status"] = "warning"
stats["health"]["warnings"].append(f"会话错误率过高: {error_rate:.2f}%")
stats["health"]["error_rate"] = f"{error_rate:.2f}%"
+7
View File
@@ -49,7 +49,14 @@ async def lifespan(app: FastAPI):
# 清理MCP插件
await mcp_registry.cleanup_all()
# 清理HTTP客户端池
from app.services.ai_service import cleanup_http_clients
await cleanup_http_clients()
# 关闭数据库连接
await close_db()
logger.info("应用已关闭")
+108 -30
View File
@@ -6,9 +6,107 @@ from app.config import settings as app_settings
from app.logger import get_logger
import httpx
import json
import hashlib
logger = get_logger(__name__)
# Module-level pool of HTTP clients, reused per configuration.
# Keys are the strings produced by _get_client_key(); values are the shared
# httpx.AsyncClient instances handed out by _get_or_create_http_client().
_http_client_pool: Dict[str, httpx.AsyncClient] = {}
# NOTE(review): this flag is not read or written anywhere in the visible code — confirm it is still needed.
_client_pool_lock = False # simple lock flag
def _get_client_key(provider: str, base_url: Optional[str], api_key: str) -> str:
    """Build the pool key that identifies one reusable HTTP client.

    The key has the form ``"{provider}_{base_url or 'default'}_{fingerprint}"``
    where ``fingerprint`` is a short hex digest of the API key, so the raw
    secret never appears in the pool dictionary or in log lines.

    Args:
        provider: Provider name (e.g. ``"openai"``).
        base_url: API base URL; ``None`` or an empty string maps to ``"default"``.
        api_key: API key, used only to tell different credentials apart.

    Returns:
        The pool key string.
    """
    # SHA-256 rather than MD5: MD5 may be disabled on FIPS-restricted builds,
    # and a 32-bit MD5 prefix collides easily once many keys are in use.
    # 16 hex characters (64 bits) keep the key short while making accidental
    # collisions between different API keys practically impossible.
    key_fingerprint = hashlib.sha256(api_key.encode("utf-8")).hexdigest()[:16]
    url_part = base_url or "default"
    return f"{provider}_{url_part}_{key_fingerprint}"
def _get_or_create_http_client(
    provider: str,
    base_url: Optional[str],
    api_key: str
) -> httpx.AsyncClient:
    """Return the pooled HTTP client for this configuration, creating it if needed.

    The three arguments are only used to derive the pool key; they are not
    applied to the client itself (every client gets the same timeout, limits
    and User-Agent header).

    Args:
        provider: Provider name.
        base_url: API base URL, or ``None``.
        api_key: API key.

    Returns:
        An open ``httpx.AsyncClient`` stored in the module-level pool.
    """
    client_key = _get_client_key(provider, base_url, api_key)

    pooled = _http_client_pool.get(client_key)
    if pooled is not None:
        if not pooled.is_closed:
            # Fast path: a live client already exists for this key.
            logger.debug(f"♻️ 复用HTTP客户端: {client_key}")
            return pooled
        # The pooled client was closed elsewhere; drop it and fall through
        # to build a replacement.
        logger.warning(f"⚠️ HTTP客户端已关闭,重新创建: {client_key}")
        del _http_client_pool[client_key]

    connection_limits = httpx.Limits(
        max_keepalive_connections=50,  # upper bound on idle keep-alive connections
        max_connections=100,  # upper bound on total connections
        keepalive_expiry=30.0,  # seconds an idle connection is kept alive
    )
    request_timeout = httpx.Timeout(
        connect=60.0,  # connect timeout
        read=180.0,  # read timeout
        write=60.0,  # write timeout
        pool=60.0,  # wait time for a free connection from the pool
    )
    default_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }
    fresh_client = httpx.AsyncClient(
        timeout=request_timeout,
        limits=connection_limits,
        headers=default_headers,
    )

    # Register the new client so later callers with the same key reuse it.
    _http_client_pool[client_key] = fresh_client
    logger.info(f"✅ 创建新HTTP客户端并加入池: {client_key} (池大小: {len(_http_client_pool)})")
    return fresh_client
async def cleanup_http_clients():
    """Close every pooled HTTP client and empty the pool (called on application shutdown).

    A failure while closing one client is logged and does not stop the
    remaining clients from being closed.
    """
    logger.info(f"🧹 开始清理HTTP客户端池 (共 {len(_http_client_pool)} 个客户端)")

    # Walk a snapshot so the pool dictionary is not iterated while awaiting.
    for pool_key, pooled_client in tuple(_http_client_pool.items()):
        try:
            if pooled_client.is_closed:
                continue
            await pooled_client.aclose()
            logger.debug(f"✅ 关闭HTTP客户端: {pool_key}")
        except Exception as exc:
            logger.error(f"❌ 关闭HTTP客户端失败 {pool_key}: {exc}")

    _http_client_pool.clear()
    logger.info("✅ HTTP客户端池清理完成")
class AIService:
"""AI服务统一接口 - 支持从用户设置或全局配置初始化"""
@@ -39,30 +137,20 @@ class AIService:
self.default_temperature = default_temperature or app_settings.default_temperature
self.default_max_tokens = default_max_tokens or app_settings.default_max_tokens
# 初始化OpenAI客户端
# 初始化OpenAI客户端(使用HTTP客户端池)
openai_key = api_key if api_provider == "openai" else app_settings.openai_api_key
if openai_key:
try:
limits = httpx.Limits(
max_keepalive_connections=50,
max_connections=100,
keepalive_expiry=30.0
)
base_url = api_base_url if api_provider == "openai" else app_settings.openai_base_url
http_client = httpx.AsyncClient(
timeout=httpx.Timeout(connect=60.0, read=180.0, write=60.0, pool=60.0),
limits=limits,
headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
)
# 从池中获取或创建HTTP客户端(复用连接)
http_client = _get_or_create_http_client("openai", base_url, openai_key)
client_kwargs = {
"api_key": openai_key,
"http_client": http_client
}
base_url = api_base_url if api_provider == "openai" else app_settings.openai_base_url
if base_url:
client_kwargs["base_url"] = base_url
@@ -70,7 +158,7 @@ class AIService:
self.openai_http_client = http_client
self.openai_api_key = openai_key
self.openai_base_url = base_url
logger.info("✅ OpenAI客户端初始化成功")
logger.info("✅ OpenAI客户端初始化成功(复用HTTP连接)")
except Exception as e:
logger.error(f"OpenAI客户端初始化失败: {e}")
self.openai_client = None
@@ -86,35 +174,25 @@ class AIService:
if self.api_provider == "openai":
logger.warning("⚠️ OpenAI API key未配置,但被设置为当前AI提供商")
# 初始化Anthropic客户端
# 初始化Anthropic客户端(使用HTTP客户端池)
anthropic_key = api_key if api_provider == "anthropic" else app_settings.anthropic_api_key
if anthropic_key:
try:
limits = httpx.Limits(
max_keepalive_connections=50,
max_connections=100,
keepalive_expiry=30.0
)
base_url = api_base_url if api_provider == "anthropic" else app_settings.anthropic_base_url
http_client = httpx.AsyncClient(
timeout=httpx.Timeout(connect=60.0, read=180.0, write=60.0, pool=60.0),
limits=limits,
headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
)
# 从池中获取或创建HTTP客户端(复用连接)
http_client = _get_or_create_http_client("anthropic", base_url, anthropic_key)
client_kwargs = {
"api_key": anthropic_key,
"http_client": http_client
}
base_url = api_base_url if api_provider == "anthropic" else app_settings.anthropic_base_url
if base_url:
client_kwargs["base_url"] = base_url
self.anthropic_client = AsyncAnthropic(**client_kwargs)
logger.info("✅ Anthropic客户端初始化成功")
logger.info("✅ Anthropic客户端初始化成功(复用HTTP连接)")
except Exception as e:
logger.error(f"Anthropic客户端初始化失败: {e}")
self.anthropic_client = None