From 13fad02db84ce1646595e8f1fb71f7ac28ea5b8b Mon Sep 17 00:00:00 2001 From: ekko <152005280+EKKOLearnAI@users.noreply.github.com> Date: Fri, 15 May 2026 10:31:26 +0800 Subject: [PATCH] [codex] fix bridge state db sync (#740) * fix bridge session db flush cursor * fix bridge state db result sync --- .../hermes/agent-bridge/hermes_bridge.py | 98 ++++++++++++++++--- 1 file changed, 87 insertions(+), 11 deletions(-) diff --git a/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py b/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py index 9a976aa..7cc4d97 100644 --- a/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py +++ b/packages/server/src/services/hermes/agent-bridge/hermes_bridge.py @@ -688,6 +688,8 @@ class AgentPool: if db is None: return False + history_len = len(conversation_history) if conversation_history else 0 + try: if hasattr(db, "create_session"): db.create_session( @@ -701,6 +703,7 @@ class AgentPool: if messages: last = messages[-1] if last.get("role") == "user" and last.get("content") == user_content: + self._align_prepersist_flush_cursor(session, history_len) return False db.append_message( @@ -710,21 +713,86 @@ class AgentPool: ) # AIAgent will build messages as conversation_history + current user. - # Since the current user was pre-persisted above, advance its flush - # cursor so the normal end-of-turn flush only writes assistant/tool - # messages for this turn. - history_len = len(conversation_history) if conversation_history else 0 - try: - session.agent._last_flushed_db_idx = max( - int(getattr(session.agent, "_last_flushed_db_idx", 0) or 0), - history_len + 1, - ) - except Exception: - pass + # Since the current user was pre-persisted above, align the flush + # cursor so the normal end-of-turn flush starts at assistant/tool + # messages generated by this run. + self._align_prepersist_flush_cursor(session, history_len) return True except Exception: return False + def _align_prepersist_flush_cursor(self, session: AgentSession, history_len: int) -> None: + try: + session.agent._last_flushed_db_idx = history_len + 1 + except Exception: + pass + + def _session_db_message_count(self, session_id: str, profile: str | None) -> int | None: + db = self._db.get_for_profile(profile) + if db is None or not hasattr(db, "get_messages"): + return None + try: + return len(db.get_messages(session_id) or []) + except Exception: + return None + + def _sync_result_tail_to_session_db( + self, + session: AgentSession, + result: dict[str, Any], + conversation_history: list[dict[str, Any]] | None, + profile: str | None, + db_count_after_prepersist: int | None, + ) -> None: + db = self._db.get_for_profile(profile) + if db is None or db_count_after_prepersist is None: + return + + after_count = self._session_db_message_count(session.session_id, profile) + if after_count is None or after_count > db_count_after_prepersist: + return + + messages = result.get("messages") + if not isinstance(messages, list): + return + + history_len = len(conversation_history) if conversation_history else 0 + generated = [ + msg for msg in messages[history_len + 1:] + if isinstance(msg, dict) and msg.get("role") in {"assistant", "tool"} + ] + if not generated: + return + + appended = 0 + for msg in generated: + try: + db.append_message( + session_id=session.session_id, + role=str(msg.get("role") or "assistant"), + content=msg.get("content"), + tool_name=msg.get("tool_name"), + tool_calls=msg.get("tool_calls") if isinstance(msg.get("tool_calls"), list) else None, + tool_call_id=msg.get("tool_call_id"), + finish_reason=msg.get("finish_reason"), + reasoning=msg.get("reasoning") if msg.get("role") == "assistant" else None, + reasoning_content=msg.get("reasoning_content") if msg.get("role") == "assistant" else None, + reasoning_details=msg.get("reasoning_details") if msg.get("role") == "assistant" else None, + codex_reasoning_items=msg.get("codex_reasoning_items") if msg.get("role") == "assistant" else None, + codex_message_items=msg.get("codex_message_items") if msg.get("role") == "assistant" else None, + ) + appended += 1 + except Exception: + break + + if appended: + print( + "[hermes_bridge] synced missing result tail to session db " + f"session={session.session_id} appended={appended}", + file=sys.stderr, + flush=True, + ) + def start_chat( self, session_id: str, @@ -776,6 +844,7 @@ class AgentPool: except Exception: previous_approval_callback = None self._prepersist_user_message(session, message, conversation_history, profile) + db_count_after_prepersist = self._session_db_message_count(session.session_id, profile) if force_compress: compress = getattr(session.agent, "_compress_context", None) if callable(compress): @@ -802,6 +871,13 @@ class AgentPool: **kwargs, ) result = _jsonable(result if isinstance(result, dict) else {"value": result}) + self._sync_result_tail_to_session_db( + session, + result, + conversation_history, + profile, + db_count_after_prepersist, + ) with session.lock: if isinstance(result.get("messages"), list): session.history = result["messages"]