Forward custom coordinator events through ConnectionManager #237
The coordinator WS now subscribes to both video and chat products, allowing chat events (message.new, reactions, typing, etc.) to arrive on the same connection as video call events. No second WS is needed.
- Add "chat" to products in the StreamAPIWS auth payload
- Add watch_channels() in connection_utils (same pattern as watch_call)
- ConnectionManager subscribes to the messaging channel after watch_call
- Expose a coordinator_ws property for event listener registration
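The auth-payload change can be sketched as follows; the field names (`token`, `products`) and the builder function are illustrative assumptions, not the SDK's actual internals:

```python
# Hypothetical sketch of the coordinator WS auth payload with both products.
# Field names and the helper itself are assumptions for illustration only.
def build_auth_payload(token: str) -> dict:
    return {
        "token": token,
        # "chat" added alongside "video" so chat events (message.new,
        # reactions, typing, ...) arrive on the same connection.
        "products": ["video", "chat"],
    }


payload = build_auth_payload("jwt-token")
assert "chat" in payload["products"]
```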
📝 Walkthrough
Added a coordinator WebSocket wildcard subscription to re-emit coordinator call-scoped events through the `ConnectionManager`.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Chat event subscription needs discussion with the team -- JS SDK uses two separate WS connections for chat and video, and mixing them on one connection may affect MAU billing. Keep coordinator_ws property for custom events which already work via watch_call.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@getstream/video/rtc/connection_manager.py`:
- Around line 623-632: The property coordinator_ws should honor "Returns None if
not connected" by returning the websocket client only when it is actually
connected; update coordinator_ws to check _coordinator_ws_client's connection
state (e.g., _coordinator_ws_client.connected or
_coordinator_ws_client.is_connected) and/or ensure the coordinator helper/task
(e.g., _coordinator_task) is still running, and return None otherwise; use
getattr checks to avoid attribute errors so callers never receive a
non-connected client from coordinator_ws.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 66b96399-2289-4c8e-be81-8a56b62ec916
📒 Files selected for processing (1)
getstream/video/rtc/connection_manager.py
Return Optional[StreamAPIWS] and check _connected before returning, so callers get None instead of a disconnected client.
…r_ws
Verifies the full pub/sub cycle: send a custom event via REST
(call.send_call_event), receive it on ConnectionManager.coordinator_ws
through ws.on("custom") listener. Uses asyncio.Event for reliable
timing instead of fixed sleep.
Actionable comments posted: 1
🧹 Nitpick comments (1)
tests/rtc/coordinator/test_custom_events.py (1)
36-69: Group integration tests in a test class for consistency. This test is currently top-level; move it under a test class to match the repository's test-organization rules.
Suggested structure
```diff
-@pytest.mark.asyncio
-@pytest.mark.integration
-@skip_on_rate_limit
-async def test_custom_event_round_trip(async_client: AsyncStream, test_user: str):
-    """Send a custom event via REST and verify it arrives on coordinator_ws."""
-    call = async_client.video.call("default", str(uuid.uuid4()))
-    await call.get_or_create(data=CallRequest(created_by_id=test_user))
-
-    async with await rtc.join(call, test_user) as connection:
-        assert connection.connection_state == ConnectionState.JOINED
-
-        ws = connection.coordinator_ws
-        assert ws is not None
-
-        received_event = None
-        event_received = asyncio.Event()
-
-        @ws.on("custom")
-        def on_custom(event):
-            nonlocal received_event
-            received_event = event
-            event_received.set()
-
-        await call.send_call_event(
-            user_id=test_user,
-            custom={"type": "test_event", "payload": "hello from test"},
-        )
-
-        await asyncio.wait_for(event_received.wait(), timeout=10.0)
-
-        assert received_event is not None
-        custom_data = received_event.get("custom", {})
-        assert custom_data.get("type") == "test_event"
-        assert custom_data.get("payload") == "hello from test"
+class TestCoordinatorCustomEvents:
+    @pytest.mark.asyncio
+    @pytest.mark.integration
+    @skip_on_rate_limit
+    async def test_custom_event_round_trip(self, async_client: AsyncStream, test_user: str):
+        """Send a custom event via REST and verify it arrives on coordinator_ws."""
+        call = async_client.video.call("default", str(uuid.uuid4()))
+        await call.get_or_create(data=CallRequest(created_by_id=test_user))
+
+        async with await rtc.join(call, test_user) as connection:
+            assert connection.connection_state == ConnectionState.JOINED
+
+            ws = connection.coordinator_ws
+            assert ws is not None
+
+            received_event = None
+            event_received = asyncio.Event()
+
+            @ws.on("custom")
+            def on_custom(event):
+                nonlocal received_event
+                received_event = event
+                event_received.set()
+
+            await call.send_call_event(
+                user_id=test_user,
+                custom={"type": "test_event", "payload": "hello from test"},
+            )
+
+            await asyncio.wait_for(event_received.wait(), timeout=10.0)
+
+            assert received_event is not None
+            custom_data = received_event.get("custom", {})
+            assert custom_data.get("type") == "test_event"
+            assert custom_data.get("payload") == "hello from test"
```
As per coding guidelines, "**/test_*.py: Keep tests well organized and use test classes to group similar tests."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/rtc/coordinator/test_custom_events.py` around lines 36 - 69, Move the standalone test_custom_event_round_trip coroutine into a test class (e.g., class TestCustomEvents:) to group integration tests; keep all existing decorators (`@pytest.mark.asyncio`, `@pytest.mark.integration`, `@skip_on_rate_limit`) on the method and retain the signature (async_client: AsyncStream, test_user: str) and body including the call variable, rtc.join usage, ws.on("custom") handler, and assertions; ensure the new method name remains test_custom_event_round_trip and update any imports or fixtures if needed so pytest discovers the test class and method.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/rtc/coordinator/test_custom_events.py`:
- Around line 28-33: The teardown currently swallows all errors in the
try/except around async_client.delete_users (the block calling await
async_client.delete_users(user_ids=[user_id], user="hard", conversations="hard",
messages="hard")), which hides failures; change the except to either log the
exception (e.g., with logger.exception or print) or re-raise/assert so CI fails
— locate the try/except around delete_users in
tests/rtc/coordinator/test_custom_events.py and replace the bare "except
Exception: pass" with an exception handler that records the error details and/or
raises the exception to surface teardown failures.
📒 Files selected for processing (2)
- getstream/video/rtc/connection_manager.py
- tests/rtc/coordinator/test_custom_events.py
🚧 Files skipped from review as they are similar to previous changes (1)
- getstream/video/rtc/connection_manager.py
Sender creates the call and sends the event via REST, receiver joins via rtc.join and verifies the event arrives on coordinator_ws. Confirms cross-participant event delivery.
Subscribers no longer need to reach through `connection.coordinator_ws` to listen for call-scoped events — the raw WS client stays internal.
🧹 Nitpick comments (1)
tests/rtc/coordinator/test_custom_events.py (1)
26-42: Rename the fixture to avoid the `test_` prefix. pytest collects any top-level callable starting with `test_` and will emit a `PytestCollectionWarning` for the `test_users` fixture (it's a coroutine function, not a Test item). Renaming also avoids confusion with the actual test function and aligns with idiomatic fixture naming.
♻️ Proposed rename
```diff
 @pytest_asyncio.fixture()
-async def test_users(async_client: AsyncStream):
+async def stream_users(async_client: AsyncStream):
     user_ids = [f"test-user-{uuid.uuid4()}" for _ in range(2)]
     await async_client.upsert_users(*[UserRequest(id=uid) for uid in user_ids])
     yield user_ids
@@
-async def test_custom_event_round_trip(async_client: AsyncStream, test_users: list):
+async def test_custom_event_round_trip(async_client: AsyncStream, stream_users: list):
     """Send a custom event via REST and verify it arrives on coordinator_ws."""
-    sender, receiver = test_users
+    sender, receiver = stream_users
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/rtc/coordinator/test_custom_events.py` around lines 26 - 42, The fixture function named test_users is being treated as a test by pytest; rename it (e.g., to users or fixture_users) to remove the test_ prefix, update its definition (currently async def test_users(...)) and all usages (e.g., the parameter in test_custom_event_round_trip and any other tests that depend on it) to the new name, keeping the same body (user_ids generation, upsert, yield, and cleanup) and preserving async_client and the cleanup try/except logic.
📒 Files selected for processing (2)
- getstream/video/rtc/connection_manager.py
- tests/rtc/coordinator/test_custom_events.py
🚧 Files skipped from review as they are similar to previous changes (1)
- getstream/video/rtc/connection_manager.py
Re-emitting every coordinator event risks name collisions with SFU events already forwarded through ConnectionManager. Narrow the bridge to the one event the public API exposes.
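The narrowing could be sketched like this; the `Emitter` class is a simplified pub/sub stand-in, not the SDK's actual event classes:

```python
# Minimal pub/sub stand-in; the real SDK's emitter API may differ.
class Emitter:
    def __init__(self):
        self._handlers = {}

    def on(self, event, handler):
        self._handlers.setdefault(event, []).append(handler)

    def emit(self, event, payload):
        for handler in self._handlers.get(event, []):
            handler(payload)


def bridge_custom_events(coordinator_ws: Emitter, manager: Emitter) -> None:
    """Forward only "custom" events instead of a wildcard re-emit, so
    coordinator event names cannot collide with SFU events already
    forwarded through the manager."""
    coordinator_ws.on("custom", lambda event: manager.emit("custom", event))
```

Any other coordinator event (e.g. a hypothetical `call.updated`) passes through the bridge untouched, which is the collision-avoidance point of the comment above.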
Custom events are already re-emitted on the ConnectionManager itself, so the escape hatch to the raw WS client is no longer part of the public surface. Keeps the abstraction sealed.
Why
The coordinator WebSocket receives call-scoped events (e.g. user-defined `custom` events sent via `call.send_call_event(...)`), but the `ConnectionManager` never surfaced them. This blocks use cases like forwarding custom events (turn detection, agent signals, etc.) from the Python SDK to external systems.
Changes
- Re-emit `custom` events from the internal coordinator WS client through `ConnectionManager` so callers can subscribe via the existing event emitter surface.
- Integration test: a custom event sent via REST is received on the `ConnectionManager` by a participant joined via `rtc.join`.
Usage
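A hypothetical usage sketch after this change: callers register for `"custom"` on the `ConnectionManager` itself rather than the raw WS client. The decorator-style `on("custom")` registration is an assumption modeled on the existing emitter surface, and `FakeConnection` stands in for a joined connection:

```python
class FakeConnection:
    """Stand-in for ConnectionManager's event emitter surface."""

    def __init__(self):
        self._handlers = {}

    def on(self, event):
        # Decorator-style registration, assumed from the existing surface
        def register(handler):
            self._handlers.setdefault(event, []).append(handler)
            return handler

        return register

    def _emit(self, event, payload):
        for handler in self._handlers.get(event, []):
            handler(payload)


connection = FakeConnection()
received = []


@connection.on("custom")
def on_custom(event):
    # Payload shape mirrors the integration test: {"custom": {...}}
    received.append(event.get("custom", {}).get("type"))


# Simulate the coordinator re-emitting a custom event on the manager
connection._emit("custom", {"custom": {"type": "agent.turn_end"}})
```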