This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Transport State Management
Relevant source files
- extensions/muxio-rpc-service-caller/src/lib.rs
- extensions/muxio-rpc-service-caller/tests/dynamic_channel_tests.rs
- extensions/muxio-tokio-rpc-client/src/lib.rs
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs
- extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs
- extensions/muxio-wasm-rpc-client/src/lib.rs
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs
- extensions/muxio-wasm-rpc-client/src/static_lib/static_client.rs
Purpose and Scope
This document explains how transport implementations track connection state, handle disconnection events, and notify application code through state change callbacks. All concrete transport implementations (RpcClient for Tokio and RpcWasmClient for WASM) provide consistent state management interfaces defined by the RpcServiceCallerInterface trait.
For information about the overall RPC caller interface and making RPC calls, see Service Caller Interface. For implementation details specific to each transport, see Tokio RPC Client and WASM RPC Client.
Transport State Types
The system defines connection state through the RpcTransportState enum, which represents the binary connection status of a transport.
Sources:
stateDiagram-v2
[*] --> Disconnected : Initial State
Disconnected --> Connected : Connection Established
Connected --> Disconnected : Connection Lost/Closed
Disconnected --> [*] : Client Dropped
- extensions/muxio-rpc-service-caller/src/transport_state.rs
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:22-23
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:13-14
RpcTransportState Variants
| Variant | Description | Usage |
|---|---|---|
| Connected | Transport has an active connection | Allows RPC calls to proceed |
| Disconnected | Transport connection is closed or failed | Blocks new RPC calls, cancels pending requests |
The state is exposed through the is_connected() method on RpcServiceCallerInterface, which returns a boolean indicating current connection status.
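As an illustration only, the relationship between the two-variant enum and the boolean accessor could be sketched as follows. `SketchClient` and its `state()` helper are hypothetical stand-ins, and the derives on the enum are assumptions rather than the crate's actual definition.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Binary connection status with the two variants described in the table.
/// The derives are an assumption made for this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcTransportState {
    Connected,
    Disconnected,
}

/// Hypothetical stand-in for a transport client (not the real RpcClient).
pub struct SketchClient {
    is_connected: Arc<AtomicBool>,
}

impl SketchClient {
    pub fn new(connected: bool) -> Self {
        Self {
            is_connected: Arc::new(AtomicBool::new(connected)),
        }
    }

    /// Boolean view of the connection state.
    pub fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Acquire)
    }

    /// Hypothetical helper that maps the flag onto the enum.
    pub fn state(&self) -> RpcTransportState {
        if self.is_connected() {
            RpcTransportState::Connected
        } else {
            RpcTransportState::Disconnected
        }
    }
}
```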
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:284-286
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:136-139
State Tracking Mechanism
Both transport implementations use Arc<AtomicBool> for thread-safe state tracking. This allows state to be checked and modified atomically across multiple concurrent tasks without requiring locks.
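A rough sketch of this kind of lock-free sharing, with standard threads standing in for the async tasks, might look like the following. `observe_from_threads` is a hypothetical helper written for this example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Spawns `readers` threads that each read the shared flag without taking
/// a lock, and returns the values they observed.
pub fn observe_from_threads(flag: Arc<AtomicBool>, readers: usize) -> Vec<bool> {
    let handles: Vec<_> = (0..readers)
        .map(|_| {
            // Each thread gets its own handle to the same atomic.
            let flag = Arc::clone(&flag);
            thread::spawn(move || flag.load(Ordering::Acquire))
        })
        .collect();

    handles
        .into_iter()
        .map(|h| h.join().expect("reader thread panicked"))
        .collect()
}
```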
Sources:
graph TB
subgraph "RpcClient (Tokio)"
RpcClient["RpcClient"]
TokioAtomicBool["is_connected: Arc<AtomicBool>"]
RpcClient --> TokioAtomicBool
end
subgraph "RpcWasmClient (WASM)"
RpcWasmClient["RpcWasmClient"]
WasmAtomicBool["is_connected: Arc<AtomicBool>"]
RpcWasmClient --> WasmAtomicBool
end
subgraph "RpcServiceCallerInterface"
IsConnected["is_connected() -> bool"]
GetEmitFn["get_emit_fn()
checks state"]
end
TokioAtomicBool -.reads.-> IsConnected
WasmAtomicBool -.reads.-> IsConnected
TokioAtomicBool -.reads.-> GetEmitFn
WasmAtomicBool -.reads.-> GetEmitFn
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:30
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:23
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:284-286
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:136-139
Atomic Ordering Semantics
The implementations use specific memory ordering for different operations:
| Operation | Ordering | Rationale |
|---|---|---|
| Initial state check | Relaxed | Non-critical reads for logging |
| State transition (swap) | SeqCst | Strong guarantee for state transitions |
| Send loop state check | Acquire | Synchronizes with state changes |
| State store | SeqCst | Ensures visibility to all threads |
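The four rows of the table can be sketched with plain `std` atomics as shown below. The function names are hypothetical and chosen for this example; only the orderings are taken from the table.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Relaxed read: acceptable when the value is only used for logging.
pub fn log_state(flag: &AtomicBool) -> &'static str {
    if flag.load(Ordering::Relaxed) {
        "connected"
    } else {
        "disconnected"
    }
}

/// SeqCst swap: returns the previous value, so only the caller that
/// actually performed the connected -> disconnected transition sees `true`.
pub fn transition_to_disconnected(flag: &AtomicBool) -> bool {
    flag.swap(false, Ordering::SeqCst)
}

/// Acquire load: pairs with the stronger writes above and below.
pub fn may_send(flag: &AtomicBool) -> bool {
    flag.load(Ordering::Acquire)
}

/// SeqCst store: makes the new state visible to all threads.
pub fn mark_connected(flag: &AtomicBool) {
    flag.store(true, Ordering::SeqCst);
}
```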
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:61
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:85
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:231
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:39
State Change Callbacks
Transport implementations allow application code to register callbacks that execute when connection state changes. This enables reactive patterns where applications can update UI, retry connections, or clean up resources.
Sources:
sequenceDiagram
participant App as "Application Code"
participant Client as "RpcClient/RpcWasmClient"
participant Handler as "State Change Handler"
participant Transport as "Underlying Transport"
App->>Client: set_state_change_handler(callback)
Client->>Client: Store handler in Arc<Mutex<Option<Box<...>>>>
alt Already Connected
Client->>Handler: callback(RpcTransportState::Connected)
Handler->>App: Initial state notification
end
Transport->>Client: Connection closed/error
Client->>Client: is_connected.swap(false)
Client->>Handler: callback(RpcTransportState::Disconnected)
Handler->>App: Disconnection notification
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:315-334
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:168-180
Handler Registration
The set_state_change_handler() method is defined by RpcServiceCallerInterface and implemented by both transport types:
async fn set_state_change_handler(
&self,
handler: impl Fn(RpcTransportState) + Send + Sync + 'static,
)
The handler is stored as Arc<StdMutex<Option<Box<dyn Fn(RpcTransportState) + Send + Sync>>>>, allowing it to be:
- Shared across multiple tasks via Arc
- Safely mutated when setting/clearing via Mutex
- Called from any thread via Send + Sync bounds
- Dynamically replaced via Option
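To make the role of each layer concrete, here is a minimal sketch of such a handler slot using `std::sync::Mutex`. `HandlerSlot` and its methods are hypothetical names for this example, and the enum is redefined locally so the block stands alone.

```rust
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcTransportState {
    Connected,
    Disconnected,
}

type StateChangeHandler = Box<dyn Fn(RpcTransportState) + Send + Sync>;

/// Hypothetical wrapper around the storage type described above.
#[derive(Clone, Default)]
pub struct HandlerSlot {
    inner: Arc<Mutex<Option<StateChangeHandler>>>,
}

impl HandlerSlot {
    /// Installs or replaces the handler (the `Option` makes this possible).
    pub fn set(&self, handler: impl Fn(RpcTransportState) + Send + Sync + 'static) {
        let boxed: StateChangeHandler = Box::new(handler);
        *self.inner.lock().expect("handler mutex poisoned") = Some(boxed);
    }

    /// Removes the handler, if any.
    pub fn clear(&self) {
        *self.inner.lock().expect("handler mutex poisoned") = None;
    }

    /// Invokes the handler while holding the lock.
    /// Returns `true` if a handler was installed and called.
    pub fn notify(&self, state: RpcTransportState) -> bool {
        let guard = self.inner.lock().expect("handler mutex poisoned");
        match guard.as_ref() {
            Some(handler) => {
                handler(state);
                true
            }
            None => false,
        }
    }
}
```

Because the sketch calls the handler while the lock is held, a handler that tried to call `set` or `clear` on the same slot would deadlock. Whether the real clients share that constraint is not stated here.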
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:22-23
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:13-14
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:315-334
Initial State Notification
Both implementations immediately call the handler with RpcTransportState::Connected if already connected when the handler is registered. This ensures application code receives the current state without waiting for a transition.
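The registration behavior could be sketched as below. This is a synchronous simplification of the `async` method, and `SketchClient` is a hypothetical stand-in for the real clients.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcTransportState {
    Connected,
    Disconnected,
}

type StateChangeHandler = Box<dyn Fn(RpcTransportState) + Send + Sync>;

/// Hypothetical stand-in for a transport client.
pub struct SketchClient {
    is_connected: Arc<AtomicBool>,
    state_change_handler: Arc<Mutex<Option<StateChangeHandler>>>,
}

impl SketchClient {
    pub fn new(connected: bool) -> Self {
        Self {
            is_connected: Arc::new(AtomicBool::new(connected)),
            state_change_handler: Arc::new(Mutex::new(None)),
        }
    }

    /// Stores the handler, then reports `Connected` right away if the
    /// transport is already up. Nothing is reported when disconnected.
    pub fn set_state_change_handler(
        &self,
        handler: impl Fn(RpcTransportState) + Send + Sync + 'static,
    ) {
        let boxed: StateChangeHandler = Box::new(handler);
        let mut guard = self
            .state_change_handler
            .lock()
            .expect("handler mutex poisoned");
        *guard = Some(boxed);

        if self.is_connected.load(Ordering::Acquire) {
            if let Some(h) = guard.as_ref() {
                h(RpcTransportState::Connected);
            }
        }
    }
}
```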
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:324-333
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:175-179
graph TB
subgraph "RpcClient State Lifecycle"
Constructor["RpcClient::new()"]
CreateState["Create is_connected: AtomicBool(true)"]
SpawnTasks["Spawn receive/send/heartbeat tasks"]
ReceiveLoop["Receive Loop Task"]
SendLoop["Send Loop Task"]
Heartbeat["Heartbeat Task"]
ErrorDetect["Error Detection"]
ShutdownAsync["shutdown_async()"]
ShutdownSync["shutdown_sync() (Drop)"]
FailPending["fail_all_pending_requests()"]
CallHandler["Call state_change_handler(Disconnected)"]
Constructor --> CreateState
CreateState --> SpawnTasks
SpawnTasks --> ReceiveLoop
SpawnTasks --> SendLoop
SpawnTasks --> Heartbeat
ReceiveLoop --> ErrorDetect
SendLoop --> ErrorDetect
ErrorDetect --> ShutdownAsync
ShutdownAsync --> CallHandler
ShutdownAsync --> FailPending
ShutdownSync --> CallHandler
end
Tokio Client State Management
The RpcClient manages connection state across multiple concurrent tasks: receive loop, send loop, and heartbeat task.
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:110-271
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:79-108
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:42-52
Connection Establishment
When RpcClient::new() successfully connects:
- WebSocket connection is established at extensions/muxio-tokio-rpc-client/src/rpc_client.rs:118-125
- is_connected is initialized to true at extensions/muxio-tokio-rpc-client/src/rpc_client.rs:134
- Background tasks are spawned (receive, send, heartbeat) at extensions/muxio-tokio-rpc-client/src/rpc_client.rs:137-257
- If a state change handler is later set, it immediately receives a Connected notification
Disconnection Detection
The client detects disconnection through multiple mechanisms:
| Detection Point | Trigger | Handler Location |
|---|---|---|
| WebSocket receive error | ws_receiver.next() returns error | extensions/muxio-tokio-rpc-client/src/rpc_client.rs:186-198 |
| WebSocket stream end | ws_receiver.next() returns None | extensions/muxio-tokio-rpc-client/src/rpc_client.rs:206-220 |
| Send failure | ws_sender.send() returns error | extensions/muxio-tokio-rpc-client/src/rpc_client.rs:239-252 |
| Client drop | Drop::drop() is called | extensions/muxio-tokio-rpc-client/src/rpc_client.rs:42-52 |
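The detection paths that run in background tasks call shutdown_async() to ensure clean disconnection handling. Dropping the client goes through shutdown_sync() instead, because Drop cannot await.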
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:156-221
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:224-257
Shutdown Implementation
The client implements two shutdown paths:
Asynchronous Shutdown (shutdown_async):
- Used by background tasks when detecting errors
- Acquires dispatcher lock to prevent concurrent RPC calls
- Fails all pending requests with FrameDecodeError::ReadAfterCancel
- Calls state change handler with Disconnected
- Uses swap(false, Ordering::SeqCst) to ensure exactly-once semantics
Synchronous Shutdown (shutdown_sync):
- Used by Drop implementation
- Cannot await, so doesn't acquire dispatcher lock
- Calls state change handler with Disconnected
- Relies on task abortion to prevent further operations
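The synchronous path can be approximated with `std` alone. The sketch below assumes the sync path also guards itself with `swap`, and uses a hypothetical `SketchClient` with a `connected_with` constructor. Task abortion and the dispatcher are left out.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcTransportState {
    Connected,
    Disconnected,
}

type StateChangeHandler = Box<dyn Fn(RpcTransportState) + Send + Sync>;

/// Hypothetical stand-in for the Tokio client.
pub struct SketchClient {
    is_connected: Arc<AtomicBool>,
    state_change_handler: Arc<Mutex<Option<StateChangeHandler>>>,
}

impl SketchClient {
    /// Hypothetical constructor: starts connected with a handler installed.
    pub fn connected_with(
        handler: impl Fn(RpcTransportState) + Send + Sync + 'static,
    ) -> Self {
        let boxed: StateChangeHandler = Box::new(handler);
        Self {
            is_connected: Arc::new(AtomicBool::new(true)),
            state_change_handler: Arc::new(Mutex::new(Some(boxed))),
        }
    }

    /// Synchronous shutdown: nothing is awaited, so no async lock is taken.
    pub fn shutdown_sync(&self) {
        // Only the first caller observes `true` and notifies the handler.
        if self.is_connected.swap(false, Ordering::SeqCst) {
            if let Ok(guard) = self.state_change_handler.lock() {
                if let Some(h) = guard.as_ref() {
                    h(RpcTransportState::Disconnected);
                }
            }
        }
    }
}

impl Drop for SketchClient {
    fn drop(&mut self) {
        self.shutdown_sync();
    }
}
```

In this sketch, an explicit shutdown followed by a drop notifies the handler only once, because the second `swap` returns `false`.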
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:79-108
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:55-77
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:42-52
Send Loop State Guard
The send loop checks connection state before attempting each send operation, which prevents writing to a closed connection. The check also avoids unnecessary error logging and ensures clean shutdown when disconnection has been detected by another task.
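A simplified version of such a guard is sketched below, with a `std::sync::mpsc` channel standing in for the async send queue and a closure standing in for the WebSocket sink. `run_send_loop` and its parameters are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;

/// Drains queued outbound frames and stops as soon as the shared flag
/// reports disconnection. Returns the number of frames handed to `send`.
pub fn run_send_loop(
    outbound: Receiver<Vec<u8>>,
    is_connected: &AtomicBool,
    mut send: impl FnMut(Vec<u8>),
) -> usize {
    let mut sent = 0;
    for frame in outbound {
        // Guard: do not write once another task has marked us disconnected.
        if !is_connected.load(Ordering::Acquire) {
            break;
        }
        send(frame);
        sent += 1;
    }
    sent
}
```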
graph TB
subgraph "WASM Client State Management"
JSInit["JavaScript: new WebSocket()"]
WasmNew["RpcWasmClient::new()"]
InitState["Initialize is_connected: false"]
JSOnOpen["JavaScript: onopen event"]
HandleConnect["handle_connect()"]
SetConnected["is_connected.store(true)"]
NotifyConnect["Call handler(Connected)"]
JSOnMessage["JavaScript: onmessage event"]
ReadBytes["read_bytes(data)"]
ProcessRPC["Process RPC messages"]
JSOnCloseError["JavaScript: onclose/onerror"]
HandleDisconnect["handle_disconnect()"]
SwapFalse["is_connected.swap(false)"]
NotifyDisconnect["Call handler(Disconnected)"]
FailPending["fail_all_pending_requests()"]
JSInit --> WasmNew
WasmNew --> InitState
JSOnOpen --> HandleConnect
HandleConnect --> SetConnected
SetConnected --> NotifyConnect
JSOnMessage --> ReadBytes
ReadBytes --> ProcessRPC
JSOnCloseError --> HandleDisconnect
HandleDisconnect --> SwapFalse
SwapFalse --> NotifyDisconnect
SwapFalse --> FailPending
end
WASM Client State Management
The RpcWasmClient provides explicit methods for JavaScript code to manage connection state since WASM cannot directly observe WebSocket events.
Sources:
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:26-35
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:37-44
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:123-134
JavaScript Integration Points
The WASM client expects JavaScript glue code to call specific methods at WebSocket lifecycle events:
| WebSocket Event | WASM Method | Purpose |
|---|---|---|
| onopen | handle_connect() | Set connected state, notify handler |
| onmessage | read_bytes(data) | Process incoming RPC messages |
| onclose | handle_disconnect() | Set disconnected state, cancel requests |
| onerror | handle_disconnect() | Same as onclose |
Sources:
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:37-44
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:46-121
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:123-134
Initial Disconnected State
Unlike RpcClient, which starts connected after new() succeeds, RpcWasmClient::new() initializes with is_connected: false because:
- Constructor cannot know if JavaScript has established a WebSocket connection
- Connection state is entirely controlled by JavaScript
- Prevents race conditions if RPC calls occur before handle_connect()
Disconnection Handling
When handle_disconnect() is called:
- Checks if already disconnected using swap(false, Ordering::SeqCst) to ensure exactly-once handling
- If transitioning from connected to disconnected:
  - Calls state change handler with Disconnected state
  - Acquires dispatcher lock
  - Fails all pending requests with FrameDecodeError::ReadAfterCancel
This ensures that any RPC calls in progress are properly terminated and their futures resolve with errors rather than hanging indefinitely.
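The connect and disconnect steps can be sketched synchronously as follows. `SketchWasmClient` is a hypothetical stand-in: a plain list of request ids replaces the dispatcher, and `track_request` exists only for this example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcTransportState {
    Connected,
    Disconnected,
}

type StateChangeHandler = Box<dyn Fn(RpcTransportState) + Send + Sync>;

/// Hypothetical, synchronous stand-in for the WASM client.
pub struct SketchWasmClient {
    is_connected: AtomicBool,
    handler: Mutex<Option<StateChangeHandler>>,
    /// Assumption: ids of in-flight requests, standing in for the dispatcher.
    pending: Mutex<Vec<u32>>,
}

impl SketchWasmClient {
    /// Starts disconnected; glue code reports when the socket opens.
    pub fn new(handler: impl Fn(RpcTransportState) + Send + Sync + 'static) -> Self {
        let boxed: StateChangeHandler = Box::new(handler);
        Self {
            is_connected: AtomicBool::new(false),
            handler: Mutex::new(Some(boxed)),
            pending: Mutex::new(Vec::new()),
        }
    }

    pub fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Acquire)
    }

    pub fn track_request(&self, id: u32) {
        self.pending.lock().expect("pending mutex poisoned").push(id);
    }

    fn notify(&self, state: RpcTransportState) {
        let guard = self.handler.lock().expect("handler mutex poisoned");
        if let Some(handler) = guard.as_ref() {
            handler(state);
        }
    }

    /// Intended to be called from the socket's onopen event.
    pub fn handle_connect(&self) {
        self.is_connected.store(true, Ordering::SeqCst);
        self.notify(RpcTransportState::Connected);
    }

    /// Intended to be called from onclose/onerror.
    /// Returns the ids of the requests that were failed.
    pub fn handle_disconnect(&self) -> Vec<u32> {
        // A previous value of `false` means someone already handled this.
        if !self.is_connected.swap(false, Ordering::SeqCst) {
            return Vec::new();
        }
        self.notify(RpcTransportState::Disconnected);
        std::mem::take(&mut *self.pending.lock().expect("pending mutex poisoned"))
    }
}
```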
sequenceDiagram
participant App as "Application Code"
participant Client as "Transport Client"
participant Dispatcher as "RpcDispatcher"
participant Request as "Pending Request Future"
App->>Client: RpcMethod::call()
Client->>Dispatcher: register_request()
Dispatcher->>Request: Create future (pending)
Note over Client: Connection error detected
Client->>Client: is_connected.swap(false)
Client->>Dispatcher: fail_all_pending_requests(error)
Dispatcher->>Request: Resolve with error
Request-->>App: Err(RpcServiceError::...)
Pending Request Cancellation
When a transport disconnects, all pending RPC requests must be cancelled to prevent application code from waiting indefinitely. Both implementations use the dispatcher's fail_all_pending_requests() method.
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:100-103
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:130-133
Cancellation Error Type
Pending requests are failed with FrameDecodeError::ReadAfterCancel, which propagates through the error handling chain as RpcServiceError::Transport. This error type specifically indicates that the request was cancelled due to connection closure rather than a protocol error or application error.
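One way to picture the cancellation path is a dispatcher that keeps a reply channel per in-flight request and resolves all of them with the cancellation error. Everything below is a sketch under assumptions: `SketchDispatcher`, the channel-based replies, the single-variant `FrameDecodeError`, and the payload shape of `RpcServiceError::Transport` are hypothetical and do not reproduce the real types.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical single-variant stand-in for the real error enum.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FrameDecodeError {
    ReadAfterCancel,
}

/// Hypothetical shape: the real variant's payload may differ.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RpcServiceError {
    Transport(FrameDecodeError),
}

type Reply = Result<Vec<u8>, RpcServiceError>;

/// Hypothetical dispatcher that tracks one reply channel per request.
#[derive(Default)]
pub struct SketchDispatcher {
    next_id: u32,
    pending: HashMap<u32, Sender<Reply>>,
}

impl SketchDispatcher {
    /// Registers a request and returns the receiver its caller waits on.
    pub fn register_request(&mut self) -> Receiver<Reply> {
        let (tx, rx) = channel();
        self.pending.insert(self.next_id, tx);
        self.next_id += 1;
        rx
    }

    /// Resolves every pending request with the given error and returns
    /// how many requests were failed.
    pub fn fail_all_pending_requests(&mut self, error: FrameDecodeError) -> usize {
        let mut failed = 0;
        for (_, tx) in self.pending.drain() {
            // A send error only means the caller stopped waiting.
            let _ = tx.send(Err(RpcServiceError::Transport(error.clone())));
            failed += 1;
        }
        failed
    }
}
```

Draining the map means a second call finds nothing left to fail, so callers never receive two results for one request.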
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:102
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:131
Race Condition Prevention
The implementations use swap() to atomically transition state and check the previous value. Only the caller that observes a previous value of true proceeds with cleanup, so fail_all_pending_requests() is called exactly once even if multiple tasks detect disconnection simultaneously.
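The exactly-once property can be demonstrated with a small experiment in which several threads race to perform the transition. `race_to_disconnect` is a hypothetical helper, and a counter stands in for the real cleanup work.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Simulates `detectors` tasks noticing a disconnect at the same time and
/// returns how many of them ended up running the cleanup step.
pub fn race_to_disconnect(detectors: usize) -> usize {
    let is_connected = Arc::new(AtomicBool::new(true));
    let cleanups = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..detectors)
        .map(|_| {
            let is_connected = Arc::clone(&is_connected);
            let cleanups = Arc::clone(&cleanups);
            thread::spawn(move || {
                // swap returns the previous value; exactly one thread sees `true`.
                if is_connected.swap(false, Ordering::SeqCst) {
                    cleanups.fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("detector thread panicked");
    }
    cleanups.load(Ordering::SeqCst)
}
```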
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:61
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:85
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:125
Testing State Management
The codebase includes tests that cover the following state management behavior:
Connection Failure Test
test_client_errors_on_connection_failure verifies that attempting to connect to a non-existent server produces an appropriate error rather than hanging or panicking.
State Change Handler Test
test_transport_state_change_handler validates the complete state change callback lifecycle:
- Handler registered after connection established
- Receives immediate Connected notification
- Server closes connection
- Handler receives Disconnected notification
- States collected in correct order: [Connected, Disconnected]
Pending Request Cancellation Test
test_pending_requests_fail_on_disconnect ensures that in-flight RPC calls are properly cancelled when connection closes:
- Client connects successfully
- RPC call spawned in background task (becomes pending)
- Server closes connection (triggered by test signal)
- Client detects disconnection
- Pending RPC call resolves with cancellation error
The test depends on timing: the request must already be pending in the dispatcher when the disconnection occurs.
Mock Client Implementation
The test suite includes MockRpcClient implementations that provide minimal state tracking for testing higher-level components without requiring actual network connections.
Thread Safety Considerations
State management uses lock-free atomic operations where possible to minimize contention:
| Component | Synchronization Primitive | Access Pattern |
|---|---|---|
| is_connected | Arc<AtomicBool> | High-frequency reads, rare writes |
| state_change_handler | Arc<StdMutex<Option<...>>> | Rare reads (on state change), rare writes (handler registration) |
| dispatcher | Arc<TokioMutex<...>> | Moderate frequency for RPC operations |
Using atomics for the frequent connection checks keeps that path lock-free, while mutexes are confined to the infrequent events of handler registration and invocation.
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:25-32
- extensions/muxio-wasm-rpc-client/src/rpc_wasm_client.rs:17-24
Emit Function State Check
The get_emit_fn() implementation checks connection state before sending data, so nothing is written to a closed connection. This guard is essential because the emit function may be called from any task after disconnection has been detected by another task.
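A guarded emit closure might be built along these lines. `sketch_emit_fn`, its parameters, and its return type are assumptions for illustration, and a `std::sync::mpsc` sender stands in for the real outbound queue.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Arc;

/// Builds an emit closure that forwards bytes to the outbound queue only
/// while the shared flag reports an active connection.
pub fn sketch_emit_fn(
    is_connected: Arc<AtomicBool>,
    outbound: Sender<Vec<u8>>,
) -> Box<dyn Fn(Vec<u8>) + Send> {
    Box::new(move |chunk: Vec<u8>| {
        // Guard: silently drop data once disconnection has been observed.
        if !is_connected.load(Ordering::Acquire) {
            return;
        }
        // A send error means the receiving side is gone; nothing to do.
        let _ = outbound.send(chunk);
    })
}
```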