diff --git a/api/services/workflow_collaboration_service.py b/api/services/workflow_collaboration_service.py index 8999008d70..fce9fe95a5 100644 --- a/api/services/workflow_collaboration_service.py +++ b/api/services/workflow_collaboration_service.py @@ -120,6 +120,30 @@ class WorkflowCollaborationService: return {"msg": "skill_file_active_updated"}, 200 + if event_type == "sync_request": + leader_sid = self._repository.get_current_leader(workflow_id) + if leader_sid and ( + self.is_session_active(workflow_id, leader_sid) + and self._repository.is_graph_active(workflow_id, leader_sid) + ): + target_sid = leader_sid + else: + if leader_sid: + self._repository.delete_leader(workflow_id) + target_sid = self._select_active_graph_leader(workflow_id, preferred_sid=sid) + if target_sid: + self._repository.set_leader(workflow_id, target_sid) + self.broadcast_leader_change(workflow_id, target_sid) + if not target_sid: + return {"msg": "no_active_leader"}, 200 + + self._socketio.emit( + "collaboration_update", + {"type": event_type, "userId": user_id, "data": event_data, "timestamp": timestamp}, + room=target_sid, + ) + return {"msg": "sync_request_forwarded"}, 200 + self._socketio.emit( "collaboration_update", {"type": event_type, "userId": user_id, "data": event_data, "timestamp": timestamp}, @@ -301,6 +325,18 @@ class WorkflowCollaborationService: return preferred_sid return session_sids[0] + def _select_active_graph_leader(self, workflow_id: str, preferred_sid: str | None = None) -> str | None: + session_sids = [ + session["sid"] + for session in self._repository.list_sessions(workflow_id) + if session.get("graph_active") and self.is_session_active(workflow_id, session["sid"]) + ] + if not session_sids: + return None + if preferred_sid and preferred_sid in session_sids: + return preferred_sid + return session_sids[0] + def _select_skill_leader(self, workflow_id: str, file_id: str, preferred_sid: str | None = None) -> str | None: session_sids = [ sid