[codex] fix bridge state db sync (#740)

* fix bridge session db flush cursor

* fix bridge state db result sync
This commit is contained in:
ekko
2026-05-15 10:31:26 +08:00
committed by GitHub
parent da067a5a78
commit 13fad02db8
@@ -688,6 +688,8 @@ class AgentPool:
if db is None:
return False
history_len = len(conversation_history) if conversation_history else 0
try:
if hasattr(db, "create_session"):
db.create_session(
@@ -701,6 +703,7 @@ class AgentPool:
if messages:
last = messages[-1]
if last.get("role") == "user" and last.get("content") == user_content:
self._align_prepersist_flush_cursor(session, history_len)
return False
db.append_message(
@@ -710,21 +713,86 @@ class AgentPool:
)
# AIAgent will build messages as conversation_history + current user.
# Since the current user was pre-persisted above, advance its flush
# cursor so the normal end-of-turn flush only writes assistant/tool
# messages for this turn.
history_len = len(conversation_history) if conversation_history else 0
try:
session.agent._last_flushed_db_idx = max(
int(getattr(session.agent, "_last_flushed_db_idx", 0) or 0),
history_len + 1,
)
except Exception:
pass
# Since the current user was pre-persisted above, align the flush
# cursor so the normal end-of-turn flush starts at assistant/tool
# messages generated by this run.
self._align_prepersist_flush_cursor(session, history_len)
return True
except Exception:
return False
def _align_prepersist_flush_cursor(self, session: AgentSession, history_len: int) -> None:
try:
session.agent._last_flushed_db_idx = history_len + 1
except Exception:
pass
def _session_db_message_count(self, session_id: str, profile: str | None) -> int | None:
db = self._db.get_for_profile(profile)
if db is None or not hasattr(db, "get_messages"):
return None
try:
return len(db.get_messages(session_id) or [])
except Exception:
return None
def _sync_result_tail_to_session_db(
self,
session: AgentSession,
result: dict[str, Any],
conversation_history: list[dict[str, Any]] | None,
profile: str | None,
db_count_after_prepersist: int | None,
) -> None:
db = self._db.get_for_profile(profile)
if db is None or db_count_after_prepersist is None:
return
after_count = self._session_db_message_count(session.session_id, profile)
if after_count is None or after_count > db_count_after_prepersist:
return
messages = result.get("messages")
if not isinstance(messages, list):
return
history_len = len(conversation_history) if conversation_history else 0
generated = [
msg for msg in messages[history_len + 1:]
if isinstance(msg, dict) and msg.get("role") in {"assistant", "tool"}
]
if not generated:
return
appended = 0
for msg in generated:
try:
db.append_message(
session_id=session.session_id,
role=str(msg.get("role") or "assistant"),
content=msg.get("content"),
tool_name=msg.get("tool_name"),
tool_calls=msg.get("tool_calls") if isinstance(msg.get("tool_calls"), list) else None,
tool_call_id=msg.get("tool_call_id"),
finish_reason=msg.get("finish_reason"),
reasoning=msg.get("reasoning") if msg.get("role") == "assistant" else None,
reasoning_content=msg.get("reasoning_content") if msg.get("role") == "assistant" else None,
reasoning_details=msg.get("reasoning_details") if msg.get("role") == "assistant" else None,
codex_reasoning_items=msg.get("codex_reasoning_items") if msg.get("role") == "assistant" else None,
codex_message_items=msg.get("codex_message_items") if msg.get("role") == "assistant" else None,
)
appended += 1
except Exception:
break
if appended:
print(
"[hermes_bridge] synced missing result tail to session db "
f"session={session.session_id} appended={appended}",
file=sys.stderr,
flush=True,
)
def start_chat(
self,
session_id: str,
@@ -776,6 +844,7 @@ class AgentPool:
except Exception:
previous_approval_callback = None
self._prepersist_user_message(session, message, conversation_history, profile)
db_count_after_prepersist = self._session_db_message_count(session.session_id, profile)
if force_compress:
compress = getattr(session.agent, "_compress_context", None)
if callable(compress):
@@ -802,6 +871,13 @@ class AgentPool:
**kwargs,
)
result = _jsonable(result if isinstance(result, dict) else {"value": result})
self._sync_result_tail_to_session_db(
session,
result,
conversation_history,
profile,
db_count_after_prepersist,
)
with session.lock:
if isinstance(result.get("messages"), list):
session.history = result["messages"]