This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Transport State Management

Purpose and Scope

This document explains how transport implementations track connection state, handle disconnection events, and notify application code through state change callbacks. Both concrete transport implementations (RpcClient for Tokio and RpcWasmClient for WASM) expose the same state management interface, defined by the RpcServiceCallerInterface trait.

For information about the overall RPC caller interface and making RPC calls, see Service Caller Interface. For implementation details specific to each transport, see Tokio RPC Client and WASM RPC Client.

Transport State Types

The system defines connection state through the RpcTransportState enum, which represents the binary connection status of a transport.

```mermaid
stateDiagram-v2
    [*] --> Disconnected : Initial State
    Disconnected --> Connected : Connection Established
    Connected --> Disconnected : Connection Lost/Closed
    Disconnected --> [*] : Client Dropped
```

RpcTransportState Variants

| Variant | Description | Usage |
|---|---|---|
| Connected | Transport has an active connection | Allows RPC calls to proceed |
| Disconnected | Transport connection is closed or failed | Blocks new RPC calls, cancels pending requests |

The state is exposed through the is_connected() method on RpcServiceCallerInterface, which returns a boolean indicating current connection status.

State Tracking Mechanism

Both transport implementations use Arc<AtomicBool> for thread-safe state tracking. This allows state to be checked and modified atomically across multiple concurrent tasks without requiring locks.
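Here is a minimal std-only sketch of this lock-free sharing pattern. It assumes nothing about muxio beyond the Arc<AtomicBool> field described above. The function shared_flag_demo is hypothetical: several reader threads poll one shared flag while the calling thread clears it, and no mutex is involved at any point.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical demo: `readers` threads watch a shared connection flag and
/// exit once it is cleared. Returns how many readers observed the disconnect.
fn shared_flag_demo(readers: usize) -> usize {
    // One AtomicBool; every Arc clone points at the same value.
    let is_connected = Arc::new(AtomicBool::new(true));

    let handles: Vec<_> = (0..readers)
        .map(|_| {
            let flag = Arc::clone(&is_connected);
            thread::spawn(move || {
                // Poll without locking until another thread flips the flag.
                while flag.load(Ordering::Acquire) {
                    thread::yield_now();
                }
                true // this reader saw the disconnect
            })
        })
        .collect();

    // Simulate a background task detecting a closed connection.
    is_connected.store(false, Ordering::SeqCst);

    handles
        .into_iter()
        .map(|h| h.join().expect("reader thread panicked"))
        .filter(|saw_disconnect| *saw_disconnect)
        .count()
}
```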

```mermaid
graph TB
    subgraph "RpcClient (Tokio)"
        RpcClient["RpcClient"]
        TokioAtomicBool["is_connected: Arc&lt;AtomicBool&gt;"]
        RpcClient --> TokioAtomicBool
    end

    subgraph "RpcWasmClient (WASM)"
        RpcWasmClient["RpcWasmClient"]
        WasmAtomicBool["is_connected: Arc&lt;AtomicBool&gt;"]
        RpcWasmClient --> WasmAtomicBool
    end

    subgraph "RpcServiceCallerInterface"
        IsConnected["is_connected() -> bool"]
        GetEmitFn["get_emit_fn() checks state"]
    end

    TokioAtomicBool -.reads.-> IsConnected
    WasmAtomicBool -.reads.-> IsConnected
    TokioAtomicBool -.reads.-> GetEmitFn
    WasmAtomicBool -.reads.-> GetEmitFn
```

Atomic Ordering Semantics

The implementations use specific memory ordering for different operations:

| Operation | Ordering | Rationale |
|---|---|---|
| Initial state check | Relaxed | Non-critical reads for logging |
| State transition (swap) | SeqCst | Strong guarantee for state transitions |
| Send loop state check | Acquire | Synchronizes with state changes |
| State store | SeqCst | Ensures visibility to all threads |
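The table maps naturally onto one method per row. The ConnectionFlag wrapper below is a hypothetical illustration, not a type from the codebase. It only shows where each ordering would sit.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical wrapper: one method per row of the ordering table.
struct ConnectionFlag {
    is_connected: AtomicBool,
}

impl ConnectionFlag {
    fn new(connected: bool) -> Self {
        Self { is_connected: AtomicBool::new(connected) }
    }

    /// Initial state check: only feeds logging, so Relaxed suffices.
    fn peek_for_logging(&self) -> bool {
        self.is_connected.load(Ordering::Relaxed)
    }

    /// Send loop state check: an Acquire load synchronizes with the
    /// SeqCst (which includes Release) writes below.
    fn may_send(&self) -> bool {
        self.is_connected.load(Ordering::Acquire)
    }

    /// State store: SeqCst places the write in a single total order.
    fn mark_connected(&self) {
        self.is_connected.store(true, Ordering::SeqCst);
    }

    /// State transition: swap returns the previous value, so the caller
    /// learns whether it performed the connected -> disconnected change.
    fn transition_to_disconnected(&self) -> bool {
        self.is_connected.swap(false, Ordering::SeqCst)
    }
}
```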

State Change Callbacks

Transport implementations allow application code to register callbacks that execute when connection state changes. This enables reactive patterns where applications can update UI, retry connections, or clean up resources.

```mermaid
sequenceDiagram
    participant App as "Application Code"
    participant Client as "RpcClient/RpcWasmClient"
    participant Handler as "State Change Handler"
    participant Transport as "Underlying Transport"

    App->>Client: set_state_change_handler(callback)
    Client->>Client: Store handler in Arc&lt;Mutex&lt;Option&lt;Box&lt;...&gt;&gt;&gt;&gt;

    alt Already Connected
        Client->>Handler: callback(RpcTransportState::Connected)
        Handler->>App: Initial state notification
    end

    Transport->>Client: Connection closed/error
    Client->>Client: is_connected.swap(false)
    Client->>Handler: callback(RpcTransportState::Disconnected)
    Handler->>App: Disconnection notification
```

Handler Registration

The set_state_change_handler() method is defined by RpcServiceCallerInterface and implemented by both transport types:

```rust
// Method declared on the RpcServiceCallerInterface trait
async fn set_state_change_handler(
    &self,
    handler: impl Fn(RpcTransportState) + Send + Sync + 'static,
);
```

The handler is stored as Arc<StdMutex<Option<Box<dyn Fn(RpcTransportState) + Send + Sync>>>>, allowing it to be:

  • Shared across multiple tasks via Arc
  • Safely mutated when setting/clearing via Mutex
  • Called from any thread via Send + Sync bounds
  • Dynamically replaced via Option
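Below is a minimal sketch of that storage shape and the two operations performed on it, replacing and invoking. It assumes a local RpcTransportState enum that mirrors the two variants above. The names replace_handler and notify are hypothetical helpers, not functions from the codebase.

```rust
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RpcTransportState {
    Connected,
    Disconnected,
}

// The storage shape described above (StdMutex = std::sync::Mutex).
type StateChangeHandler = Arc<Mutex<Option<Box<dyn Fn(RpcTransportState) + Send + Sync>>>>;

/// Hypothetical helper: installs a new handler, or clears it when `new` is None.
fn replace_handler(
    slot: &StateChangeHandler,
    new: Option<Box<dyn Fn(RpcTransportState) + Send + Sync>>,
) {
    *slot.lock().unwrap() = new;
}

/// Hypothetical helper: calls the handler if one is registered.
/// Returns whether a handler actually ran.
fn notify(slot: &StateChangeHandler, state: RpcTransportState) -> bool {
    match slot.lock().unwrap().as_ref() {
        Some(handler) => {
            handler(state); // runs while the guard is held
            true
        }
        None => false,
    }
}
```

Because notify holds the guard while the handler runs, a handler in this sketch that tries to register a replacement on the same slot would not return. std::sync::Mutex is not reentrant and may deadlock or panic in that case.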

Initial State Notification

Both implementations immediately call the handler with RpcTransportState::Connected if already connected when the handler is registered. This ensures application code receives the current state without waiting for a transition.
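The registration-time check can be sketched synchronously. The real method is async and is declared on RpcServiceCallerInterface. The Client struct here is hypothetical and keeps only the two fields this behavior depends on.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RpcTransportState {
    Connected,
    Disconnected,
}

type Handler = Box<dyn Fn(RpcTransportState) + Send + Sync>;

/// Hypothetical stand-in for a transport client.
struct Client {
    is_connected: Arc<AtomicBool>,
    handler: Arc<Mutex<Option<Handler>>>,
}

impl Client {
    fn new(connected: bool) -> Self {
        Self {
            is_connected: Arc::new(AtomicBool::new(connected)),
            handler: Arc::new(Mutex::new(None)),
        }
    }

    /// Synchronous sketch: store the handler, then report Connected
    /// immediately if the transport is already up.
    fn set_state_change_handler(&self, handler: impl Fn(RpcTransportState) + Send + Sync + 'static) {
        let mut slot = self.handler.lock().unwrap();
        *slot = Some(Box::new(handler));
        if self.is_connected.load(Ordering::Relaxed) {
            if let Some(h) = slot.as_ref() {
                h(RpcTransportState::Connected);
            }
        }
    }
}
```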

```mermaid
graph TB
    subgraph "RpcClient State Lifecycle"
        Constructor["RpcClient::new()"]
        CreateState["Create is_connected: AtomicBool(true)"]
        SpawnTasks["Spawn receive/send/heartbeat tasks"]
        ReceiveLoop["Receive Loop Task"]
        SendLoop["Send Loop Task"]
        Heartbeat["Heartbeat Task"]
        ErrorDetect["Error Detection"]
        ShutdownAsync["shutdown_async()"]
        ShutdownSync["shutdown_sync() (Drop)"]
        FailPending["fail_all_pending_requests()"]
        CallHandler["Call state_change_handler(Disconnected)"]

        Constructor --> CreateState
        CreateState --> SpawnTasks
        SpawnTasks --> ReceiveLoop
        SpawnTasks --> SendLoop
        SpawnTasks --> Heartbeat
        ReceiveLoop --> ErrorDetect
        SendLoop --> ErrorDetect
        ErrorDetect --> ShutdownAsync
        ShutdownAsync --> CallHandler
        ShutdownAsync --> FailPending
        ShutdownSync --> CallHandler
    end
```

Tokio Client State Management

The RpcClient manages connection state across multiple concurrent tasks: receive loop, send loop, and heartbeat task.

Connection Establishment

When RpcClient::new() successfully connects:

  1. WebSocket connection is established at extensions/muxio-tokio-rpc-client/src/rpc_client.rs:118-125
  2. is_connected is initialized to true at extensions/muxio-tokio-rpc-client/src/rpc_client.rs:134
  3. Background tasks are spawned (receive, send, heartbeat) at extensions/muxio-tokio-rpc-client/src/rpc_client.rs:137-257
  4. If a state change handler is later set, it immediately receives Connected notification

Disconnection Detection

The client detects disconnection through multiple mechanisms:

| Detection point | Trigger | Handler location |
|---|---|---|
| WebSocket receive error | ws_receiver.next() yields an error | extensions/muxio-tokio-rpc-client/src/rpc_client.rs:186-198 |
| WebSocket stream end | ws_receiver.next() returns None | extensions/muxio-tokio-rpc-client/src/rpc_client.rs:206-220 |
| Send failure | ws_sender.send() returns an error | extensions/muxio-tokio-rpc-client/src/rpc_client.rs:239-252 |
| Client drop | Drop::drop() is called | extensions/muxio-tokio-rpc-client/src/rpc_client.rs:42-52 |

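The receive-error, stream-end, and send-failure paths call shutdown_async(). The Drop path cannot await, so it calls shutdown_sync() instead. Both shutdown paths notify the state change handler with Disconnected.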

Shutdown Implementation

The client implements two shutdown paths:

Asynchronous Shutdown (shutdown_async):

  • Used by background tasks when detecting errors
  • Acquires dispatcher lock to prevent concurrent RPC calls
  • Fails all pending requests with FrameDecodeError::ReadAfterCancel
  • Calls state change handler with Disconnected
  • Uses swap(false, Ordering::SeqCst) to ensure exactly-once semantics

Synchronous Shutdown (shutdown_sync):

  • Used by Drop implementation
  • Cannot await, so doesn't acquire dispatcher lock
  • Calls state change handler with Disconnected
  • Relies on task abortion to prevent further operations
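The two paths can be contrasted in a std-only sketch, with several assumptions. The dispatcher is reduced to a Mutex<Vec<u32>> of pending request ids. The state change handler is reduced to a notification counter. The synchronous path is assumed to use the same swap guard as the asynchronous one, although the text above states this only for shutdown_async. All names are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical client with just enough state to contrast both shutdown paths.
struct Client {
    is_connected: AtomicBool,
    pending: Mutex<Vec<u32>>,          // stand-in for the dispatcher's pending requests
    notifications: Arc<Mutex<u32>>,    // stand-in for Disconnected handler calls
}

impl Client {
    fn new(pending: Vec<u32>, notifications: Arc<Mutex<u32>>) -> Self {
        Self { is_connected: AtomicBool::new(true), pending: Mutex::new(pending), notifications }
    }

    /// Mirrors shutdown_async (synchronous here because std has no executor):
    /// flip the flag once, notify, then lock the "dispatcher" and fail pending requests.
    /// Returns how many requests were failed.
    fn shutdown_async_sketch(&self) -> usize {
        if !self.is_connected.swap(false, Ordering::SeqCst) {
            return 0; // another path already shut down
        }
        *self.notifications.lock().unwrap() += 1;
        let mut pending = self.pending.lock().unwrap();
        let failed = pending.len();
        pending.clear(); // real code resolves each request with ReadAfterCancel
        failed
    }

    /// Mirrors shutdown_sync: no dispatcher lock, only the flag flip and notification.
    fn shutdown_sync_sketch(&self) {
        if self.is_connected.swap(false, Ordering::SeqCst) {
            *self.notifications.lock().unwrap() += 1;
        }
    }
}

impl Drop for Client {
    fn drop(&mut self) {
        self.shutdown_sync_sketch(); // Drop cannot await
    }
}
```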

Send Loop State Guard

The send loop checks connection state before attempting each send operation, so it does not write to a connection that has already been closed.

This prevents unnecessary error logging and ensures clean shutdown when disconnection has been detected by another task.
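The sketch below models the guard. The outbound queue is a std::sync::mpsc channel, and the WebSocket write is modeled as pushing into a Vec. The name run_send_loop is hypothetical. The Acquire load matches the ordering listed above for the send loop state check.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;

/// Hypothetical send loop: drains queued frames and "writes" each one only
/// while the shared flag reports a live connection. Returns what was written.
fn run_send_loop(rx: mpsc::Receiver<Vec<u8>>, is_connected: Arc<AtomicBool>) -> Vec<Vec<u8>> {
    let mut written = Vec::new();
    for frame in rx {
        // Another task may already have detected the disconnect.
        if !is_connected.load(Ordering::Acquire) {
            break; // stop quietly instead of writing to a closed socket
        }
        written.push(frame); // stand-in for ws_sender.send(frame)
    }
    written
}
```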

```mermaid
graph TB
    subgraph "WASM Client State Management"
        JSInit["JavaScript: new WebSocket()"]
        WasmNew["RpcWasmClient::new()"]
        InitState["Initialize is_connected: false"]
        JSOnOpen["JavaScript: onopen event"]
        HandleConnect["handle_connect()"]
        SetConnected["is_connected.store(true)"]
        NotifyConnect["Call handler(Connected)"]
        JSOnMessage["JavaScript: onmessage event"]
        ReadBytes["read_bytes(data)"]
        ProcessRPC["Process RPC messages"]
        JSOnCloseError["JavaScript: onclose/onerror"]
        HandleDisconnect["handle_disconnect()"]
        SwapFalse["is_connected.swap(false)"]
        NotifyDisconnect["Call handler(Disconnected)"]
        FailPending["fail_all_pending_requests()"]

        JSInit --> WasmNew
        WasmNew --> InitState
        JSOnOpen --> HandleConnect
        HandleConnect --> SetConnected
        SetConnected --> NotifyConnect
        JSOnMessage --> ReadBytes
        ReadBytes --> ProcessRPC
        JSOnCloseError --> HandleDisconnect
        HandleDisconnect --> SwapFalse
        SwapFalse --> NotifyDisconnect
        SwapFalse --> FailPending
    end
```

WASM Client State Management

The RpcWasmClient exposes explicit methods that JavaScript code calls to drive connection state. The WebSocket itself is created and owned by JavaScript glue code, so the client does not observe WebSocket events directly.

JavaScript Integration Points

The WASM client expects JavaScript glue code to call specific methods at WebSocket lifecycle events:

| WebSocket event | WASM method | Purpose |
|---|---|---|
| onopen | handle_connect() | Set connected state, notify handler |
| onmessage | read_bytes(data) | Process incoming RPC messages |
| onclose | handle_disconnect() | Set disconnected state, cancel requests |
| onerror | handle_disconnect() | Same as onclose |
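The table amounts to a single dispatch from browser events to client methods. The WsEvent enum and WasmClientSketch type below are hypothetical, and they run without wasm-bindgen. In the real setup, the JavaScript glue code calls the client's methods directly from each WebSocket callback.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical mirror of the browser events the glue code forwards.
enum WsEvent {
    Open,
    Message(Vec<u8>),
    Close,
    Error,
}

/// Hypothetical stand-in for RpcWasmClient with the three entry points.
struct WasmClientSketch {
    is_connected: AtomicBool,
    bytes_received: usize,
}

impl WasmClientSketch {
    fn new() -> Self {
        // Starts disconnected until JavaScript reports onopen.
        Self { is_connected: AtomicBool::new(false), bytes_received: 0 }
    }

    fn handle_connect(&self) {
        self.is_connected.store(true, Ordering::SeqCst);
    }

    fn read_bytes(&mut self, data: &[u8]) {
        self.bytes_received += data.len(); // stand-in for RPC frame processing
    }

    fn handle_disconnect(&self) {
        let _was_connected = self.is_connected.swap(false, Ordering::SeqCst);
    }

    /// What the glue code does, expressed as one match: onclose and onerror share a path.
    fn on_event(&mut self, event: WsEvent) {
        match event {
            WsEvent::Open => self.handle_connect(),
            WsEvent::Message(data) => self.read_bytes(&data),
            WsEvent::Close | WsEvent::Error => self.handle_disconnect(),
        }
    }

    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Acquire)
    }
}
```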

Initial Disconnected State

Unlike RpcClient, which starts connected after new() succeeds, RpcWasmClient::new() initializes with is_connected: false because:

  • Constructor cannot know if JavaScript has established a WebSocket connection
  • Connection state is entirely controlled by JavaScript
  • Prevents race conditions if RPC calls occur before handle_connect()

Disconnection Handling

When handle_disconnect() is called:

  1. Checks if already disconnected using swap(false, Ordering::SeqCst) to ensure exactly-once handling
  2. If transitioning from connected to disconnected:
    • Calls state change handler with Disconnected state
    • Acquires dispatcher lock
    • Fails all pending requests with FrameDecodeError::ReadAfterCancel

This ensures that any RPC calls in progress are properly terminated and their futures resolve with errors rather than hanging indefinitely.
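The steps above can be sketched with std types only. Pending request futures are modeled as mpsc receivers. ReadAfterCancel is a local unit struct standing in for FrameDecodeError::ReadAfterCancel. Client, new_connected, and register are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Mutex;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RpcTransportState {
    Connected,
    Disconnected,
}

/// Hypothetical stand-in for FrameDecodeError::ReadAfterCancel.
#[derive(Debug, PartialEq)]
struct ReadAfterCancel;

type Reply = Result<Vec<u8>, ReadAfterCancel>;

/// Hypothetical client: flag, optional handler, and pending requests by id.
struct Client {
    is_connected: AtomicBool,
    handler: Mutex<Option<Box<dyn Fn(RpcTransportState) + Send + Sync>>>,
    pending: Mutex<HashMap<u32, mpsc::Sender<Reply>>>, // stand-in for the dispatcher
}

impl Client {
    fn new_connected() -> Self {
        Self {
            is_connected: AtomicBool::new(true),
            handler: Mutex::new(None),
            pending: Mutex::new(HashMap::new()),
        }
    }

    /// Registers a request; the caller waits on the returned receiver.
    fn register(&self, id: u32) -> mpsc::Receiver<Reply> {
        let (tx, rx) = mpsc::channel();
        self.pending.lock().unwrap().insert(id, tx);
        rx
    }

    fn handle_disconnect(&self) {
        // Step 1: only the call that flips true -> false continues.
        if !self.is_connected.swap(false, Ordering::SeqCst) {
            return;
        }
        // Step 2: notify the application.
        if let Some(handler) = self.handler.lock().unwrap().as_ref() {
            handler(RpcTransportState::Disconnected);
        }
        // Steps 3-4: lock the "dispatcher" and fail every pending request.
        for (_id, tx) in self.pending.lock().unwrap().drain() {
            let _ = tx.send(Err(ReadAfterCancel)); // caller may have gone away
        }
    }
}
```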

```mermaid
sequenceDiagram
    participant App as "Application Code"
    participant Client as "Transport Client"
    participant Dispatcher as "RpcDispatcher"
    participant Request as "Pending Request Future"

    App->>Client: RpcMethod::call()
    Client->>Dispatcher: register_request()
    Dispatcher->>Request: Create future (pending)

    Note over Client: Connection error detected

    Client->>Client: is_connected.swap(false)
    Client->>Dispatcher: fail_all_pending_requests(error)
    Dispatcher->>Request: Resolve with error
    Request-->>App: Err(RpcServiceError::...)
```

Pending Request Cancellation

When a transport disconnects, all pending RPC requests must be cancelled to prevent application code from waiting indefinitely. Both implementations use the dispatcher's fail_all_pending_requests() method.

Cancellation Error Type

Pending requests are failed with FrameDecodeError::ReadAfterCancel, which propagates through the error handling chain as RpcServiceError::Transport. This error type specifically indicates that the request was cancelled due to connection closure rather than a protocol error or application error.
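Here is a sketch of how that propagation and classification could look, using trimmed-down local copies of the two error types. Only ReadAfterCancel and the Transport variant come from the text above. The CorruptFrame variant and both helper functions are hypothetical.

```rust
/// Hypothetical, trimmed-down mirror of the decode error type.
#[derive(Debug, Clone, PartialEq)]
enum FrameDecodeError {
    ReadAfterCancel,
    CorruptFrame, // hypothetical placeholder for other decode failures
}

/// Hypothetical, trimmed-down mirror of the service error type.
#[derive(Debug, PartialEq)]
enum RpcServiceError {
    Transport(FrameDecodeError),
}

impl From<FrameDecodeError> for RpcServiceError {
    fn from(e: FrameDecodeError) -> Self {
        RpcServiceError::Transport(e)
    }
}

/// Hypothetical: a pending call's outcome; `?` applies the From conversion.
fn await_response(outcome: Result<Vec<u8>, FrameDecodeError>) -> Result<Vec<u8>, RpcServiceError> {
    let bytes = outcome?;
    Ok(bytes)
}

/// Hypothetical: distinguishes "connection went away" from other failures.
fn is_cancelled_by_disconnect(err: &RpcServiceError) -> bool {
    matches!(err, RpcServiceError::Transport(FrameDecodeError::ReadAfterCancel))
}
```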

Race Condition Prevention

The implementations use swap() to change the state and read the previous value in a single atomic step. Only the caller that observes the previous value true goes on to clean up.

This ensures that fail_all_pending_requests() is called exactly once even if multiple tasks detect disconnection simultaneously.
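The guarantee can be exercised with a deliberately contended sketch. Several threads are released at once by a Barrier, and each attempts the transition. The name race_disconnect is hypothetical, and the counter stands in for fail_all_pending_requests().

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;

/// Hypothetical: `tasks` threads (at least 1) detect the same disconnect at once.
/// Returns how many of them ran the cleanup.
fn race_disconnect(tasks: usize) -> usize {
    let is_connected = Arc::new(AtomicBool::new(true));
    let cleanups = Arc::new(AtomicUsize::new(0));
    let barrier = Arc::new(Barrier::new(tasks));

    let handles: Vec<_> = (0..tasks)
        .map(|_| {
            let is_connected = Arc::clone(&is_connected);
            let cleanups = Arc::clone(&cleanups);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                barrier.wait(); // release all threads together
                // swap returns the old value: exactly one thread can see `true`.
                if is_connected.swap(false, Ordering::SeqCst) {
                    cleanups.fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    cleanups.load(Ordering::SeqCst)
}
```

Replacing the swap with a separate load followed by a store would reintroduce the race. Two threads could both load true before either stores false, and both would run the cleanup.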

Testing State Management

The codebase includes tests covering the following state management behavior:

Connection Failure Test

test_client_errors_on_connection_failure verifies that attempting to connect to a non-existent server produces an appropriate error rather than hanging or panicking.

State Change Handler Test

test_transport_state_change_handler validates the complete state change callback lifecycle:

  1. Handler registered after connection established
  2. Receives immediate Connected notification
  3. Server closes connection
  4. Handler receives Disconnected notification
  5. States collected in correct order: [Connected, Disconnected]
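The five steps can be replayed against an in-memory stand-in. MockTransport and replay_state_change_test are hypothetical and are not the test suite's actual mocks. They reproduce the expected ordering, and the sketch also closes twice to show that the swap guard prevents a duplicate Disconnected event.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RpcTransportState {
    Connected,
    Disconnected,
}

type Handler = Box<dyn Fn(RpcTransportState) + Send + Sync>;

/// Hypothetical in-memory transport used only to replay the test steps.
struct MockTransport {
    is_connected: AtomicBool,
    handler: Mutex<Option<Handler>>,
}

impl MockTransport {
    fn connected() -> Self {
        Self { is_connected: AtomicBool::new(true), handler: Mutex::new(None) }
    }

    fn set_state_change_handler(&self, h: impl Fn(RpcTransportState) + Send + Sync + 'static) {
        let mut slot = self.handler.lock().unwrap();
        *slot = Some(Box::new(h));
        if self.is_connected.load(Ordering::Relaxed) {
            if let Some(handler) = slot.as_ref() {
                handler(RpcTransportState::Connected); // step 2
            }
        }
    }

    /// Step 3: the "server" closes the connection.
    fn server_close(&self) {
        if self.is_connected.swap(false, Ordering::SeqCst) {
            if let Some(handler) = self.handler.lock().unwrap().as_ref() {
                handler(RpcTransportState::Disconnected); // step 4
            }
        }
    }
}

/// Runs steps 1-5 and returns the collected states.
fn replay_state_change_test() -> Vec<RpcTransportState> {
    let transport = MockTransport::connected(); // step 1: already connected
    let states = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&states);
    transport.set_state_change_handler(move |s| sink.lock().unwrap().push(s));
    transport.server_close();
    transport.server_close(); // no second Disconnected event
    // Bind before returning so the lock guard is dropped before `states`.
    let collected = states.lock().unwrap().clone();
    collected
}
```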

Pending Request Cancellation Test

test_pending_requests_fail_on_disconnect ensures that in-flight RPC calls are properly cancelled when connection closes:

  1. Client connects successfully
  2. RPC call spawned in background task (becomes pending)
  3. Server closes connection (triggered by test signal)
  4. Client detects disconnection
  5. Pending RPC call resolves with cancellation error

The test depends on timing. The request must already be registered as pending in the dispatcher when the disconnection occurs. Otherwise, there is no pending request for the cancellation path to fail.

Mock Client Implementation

The test suite includes MockRpcClient implementations that provide minimal state tracking for testing higher-level components without requiring actual network connections.

Thread Safety Considerations

State management uses lock-free atomic operations where possible to minimize contention:

| Component | Synchronization primitive | Access pattern |
|---|---|---|
| is_connected | Arc<AtomicBool> | High-frequency reads, rare writes |
| state_change_handler | Arc<StdMutex<Option<...>>> | Rare reads (on state change), rare writes (handler registration) |
| dispatcher | Arc<TokioMutex<...>> | Moderate frequency for RPC operations |

Using an atomic for the high-frequency is_connected checks keeps that path lock-free. Mutexes guard only the state change handler and the dispatcher, which are accessed less often.

Emit Function State Check

The get_emit_fn() implementation checks connection state before sending data, so it does not write to a closed connection.

This guard is essential because the emit function may be called from any task after disconnection has been detected by another task.
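Here is a sketch of such an emit function. It assumes the function is a shared closure that forwards bytes to the send loop through a channel. EmitFn and make_emit_fn are hypothetical, and the real get_emit_fn() signature may differ.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};

/// Hypothetical emit-function type: takes outbound bytes, returns nothing.
type EmitFn = Arc<dyn Fn(Vec<u8>) + Send + Sync>;

/// Hypothetical builder: the returned closure drops data once the transport
/// is disconnected. `tx` stands in for the channel feeding the send loop.
fn make_emit_fn(is_connected: Arc<AtomicBool>, tx: mpsc::Sender<Vec<u8>>) -> EmitFn {
    let tx = Mutex::new(tx); // Mutex keeps the closure Sync on any std version
    Arc::new(move |bytes: Vec<u8>| {
        // Checked on every call: the closure can outlive the connection.
        if !is_connected.load(Ordering::Acquire) {
            return; // drop the data instead of writing to a closed connection
        }
        // An error here means the send loop has already exited.
        let _ = tx.lock().unwrap().send(bytes);
    })
}
```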
