This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Tokio RPC Client

Purpose and Scope

This page documents the muxio-tokio-rpc-client crate, which provides a production-ready WebSocket client implementation for native Rust applications using the Tokio async runtime. This client connects to servers running muxio-tokio-rpc-server and enables type-safe RPC communication through shared service definitions.

Architecture Overview

The RpcClient is a fully-featured WebSocket client that manages connection lifecycle, message routing, and RPC request/response correlation. It implements the RpcServiceCallerInterface trait, providing the same API as the WASM client while using native Tokio primitives.

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:25-32 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:131-267

graph TB
    subgraph "Client Application"
        APP["Application Code"]
    end

    subgraph "RpcClient Structure"
        CLIENT["RpcClient"]
        DISPATCHER["Arc<TokioMutex<RpcDispatcher>>"]
        ENDPOINT["Arc<RpcServiceEndpoint<()>>"]
        TX["mpsc::UnboundedSender<WsMessage>"]
        STATE_HANDLER["RpcTransportStateChangeHandler"]
        IS_CONNECTED["Arc<AtomicBool>"]
    end

    subgraph "Background Tasks"
        HEARTBEAT["Heartbeat Task\n(JoinHandle)"]
        RECV_LOOP["Receive Loop\n(JoinHandle)"]
        SEND_LOOP["Send Loop\n(JoinHandle)"]
    end

    subgraph "WebSocket Connection"
        WS_SENDER["ws_sender\n(SplitSink)"]
        WS_RECEIVER["ws_receiver\n(SplitStream)"]
        WS_STREAM["tokio_tungstenite::WebSocketStream"]
    end

    APP --> CLIENT
    CLIENT --> DISPATCHER
    CLIENT --> ENDPOINT
    CLIENT --> TX
    CLIENT --> STATE_HANDLER
    CLIENT --> IS_CONNECTED

    CLIENT --> HEARTBEAT
    CLIENT --> RECV_LOOP
    CLIENT --> SEND_LOOP

    HEARTBEAT --> TX
    TX --> SEND_LOOP
    SEND_LOOP --> WS_SENDER

    WS_RECEIVER --> RECV_LOOP
    RECV_LOOP --> DISPATCHER
    RECV_LOOP --> ENDPOINT

    WS_SENDER --> WS_STREAM
    WS_RECEIVER --> WS_STREAM

Core Components

RpcClient Structure

The RpcClient struct is the primary type exposed by this crate. It owns all connection resources and background tasks.

| Field | Type | Purpose |
| --- | --- | --- |
| dispatcher | Arc<TokioMutex<RpcDispatcher<'static>>> | Manages RPC request correlation and frame encoding/decoding |
| endpoint | Arc<RpcServiceEndpoint<()>> | Handles server-to-client RPC calls (bidirectional support) |
| tx | mpsc::UnboundedSender<WsMessage> | Channel for sending messages to the WebSocket |
| state_change_handler | RpcTransportStateChangeHandler | Optional callback for connection state changes |
| is_connected | Arc<AtomicBool> | Atomic flag tracking connection status |
| task_handles | Vec<JoinHandle<()>> | Handles for background tasks (aborted on drop) |

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:25-32

RpcServiceCallerInterface Implementation

The client implements the RpcServiceCallerInterface trait, which defines the contract for making RPC calls.

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:278-335

graph LR
    subgraph "Trait Methods"
        GET_DISPATCHER["get_dispatcher()"]
        IS_CONNECTED["is_connected()"]
        GET_EMIT["get_emit_fn()"]
        SET_HANDLER["set_state_change_handler()"]
    end

    subgraph "RpcClient Implementation"
        RETURN_DISP["Returns Arc<TokioMutex<RpcDispatcher>>"]
        CHECK_FLAG["Checks Arc<AtomicBool>"]
        CREATE_CLOSURE["Creates closure wrapping mpsc::Sender"]
        STORE_CB["Stores callback in Arc<StdMutex>"]
    end

    GET_DISPATCHER --> RETURN_DISP
    IS_CONNECTED --> CHECK_FLAG
    GET_EMIT --> CREATE_CLOSURE
    SET_HANDLER --> STORE_CB
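
The "closure wrapping mpsc::Sender" idea behind get_emit_fn() can be illustrated with a minimal sketch. It is not the crate's code: make_emit_fn is a hypothetical name, std::sync::mpsc stands in for Tokio's unbounded channel, and raw byte vectors stand in for WsMessage.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical helper: returns a closure that queues outgoing bytes.
/// Callers never touch the socket; they only push into the channel
/// that a separate send loop drains.
pub fn make_emit_fn(tx: Sender<Vec<u8>>) -> impl Fn(Vec<u8>) + Send {
    move |chunk: Vec<u8>| {
        // A send error means the draining side is gone; the chunk is dropped.
        let _ = tx.send(chunk);
    }
}
```

Because the closure owns only a channel sender, it can be handed to the dispatcher without sharing the WebSocket sink itself.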

Connection Lifecycle

Connection Establishment

The RpcClient::new() function establishes a WebSocket connection and spawns background tasks using Arc::new_cyclic() to enable weak references.

sequenceDiagram
    participant App as "Application"
    participant New as "RpcClient::new()"
    participant WS as "tokio_tungstenite"
    participant Tasks as "Background Tasks"

    App->>New: new(host, port)
    New->>New: construct websocket_url
    New->>WS: connect_async(url)
    WS-->>New: WebSocketStream + Response
    New->>New: split stream into sender/receiver
    New->>New: create mpsc channel
    New->>New: Arc::new_cyclic(closure)
    New->>Tasks: spawn heartbeat task
    New->>Tasks: spawn receive loop
    New->>Tasks: spawn send loop
    New-->>App: Arc<RpcClient>

Connection URL Construction:

  • If host parses as IpAddr: ws://{ip}:{port}/ws
  • Otherwise: ws://{host}:{port}/ws
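
The URL rule can be sketched as a small helper. This is an illustration under assumptions, not the crate's code: build_ws_url is a hypothetical name, and formatting parsed IPs through SocketAddr (which wraps IPv6 literals in brackets) is an assumption about how the {ip}:{port} form would be rendered.

```rust
use std::net::{IpAddr, SocketAddr};

/// Hypothetical helper: builds the WebSocket URL from a host and port.
pub fn build_ws_url(host: &str, port: u16) -> String {
    match host.parse::<IpAddr>() {
        // SocketAddr's Display puts brackets around IPv6 addresses.
        Ok(ip) => format!("ws://{}/ws", SocketAddr::new(ip, port)),
        // Anything that is not an IP literal is treated as a hostname.
        Err(_) => format!("ws://{}:{}/ws", host, port),
    }
}
```

For example, build_ws_url("localhost", 3000) yields ws://localhost:3000/ws.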

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:110-271

Background Tasks

The client spawns three concurrent tasks that run for the lifetime of the connection:

1. Heartbeat Task

Sends periodic WebSocket ping frames to keep the connection alive and detect disconnections.

  • Interval: 1 second
  • Payload: Empty ping message
  • Exit condition: Channel send failure

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:139-154
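
The heartbeat behaviour described above (fixed interval, empty ping, exit on send failure) can be sketched with the standard library alone. This is a simplified stand-in, not the crate's task: it uses a blocking thread-style loop and std::sync::mpsc instead of a Tokio task and channel, and Msg and run_heartbeat are hypothetical names.

```rust
use std::sync::mpsc::Sender;
use std::thread;
use std::time::Duration;

/// Stand-in for the WebSocket message type; only the variant used here.
#[derive(Debug, PartialEq)]
pub enum Msg {
    Ping(Vec<u8>),
}

/// Queues an empty ping every `interval` until the channel is closed.
/// Returns the number of pings that were queued successfully.
pub fn run_heartbeat(tx: Sender<Msg>, interval: Duration) -> usize {
    let mut sent = 0;
    loop {
        thread::sleep(interval);
        // A send error means the draining side is gone, so the loop ends.
        if tx.send(Msg::Ping(Vec::new())).is_err() {
            return sent;
        }
        sent += 1;
    }
}
```

With a one-second interval this matches the cadence listed above; the test-friendly interval parameter is an addition for illustration.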

2. Receive Loop

Processes incoming WebSocket messages and routes them to the appropriate handlers.

graph TB
    START["ws_receiver.next().await"]
    MATCH["Match message type"]
    BINARY["WsMessage::Binary"]
    PING["WsMessage::Ping"]
    OTHER["Other messages"]
    ERROR["Err(e)"]
    LOCK_DISP["Lock dispatcher"]
    READ_BYTES["endpoint.read_bytes()"]
    SEND_PONG["Send pong response"]
    LOG["Log message"]
    SHUTDOWN["Call shutdown_async()"]

    START --> MATCH
    MATCH --> BINARY
    MATCH --> PING
    MATCH --> OTHER
    MATCH --> ERROR

    BINARY --> LOCK_DISP
    LOCK_DISP --> READ_BYTES

    PING --> SEND_PONG
    OTHER --> LOG
    ERROR --> SHUTDOWN

Message Handling:

  • Binary: Decoded by RpcDispatcher and processed by RpcServiceEndpoint
  • Ping: Automatically responds with Pong
  • Pong: Logged (heartbeat responses)
  • Error: Triggers shutdown_async()

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:156-222
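
The routing rules above can be expressed as a single match. The sketch below is illustrative only: Msg, Action, and route are hypothetical names, Msg stands in for the WebSocket message type, and the real loop performs the actions directly instead of returning them.

```rust
/// Stand-in for the WebSocket message type.
#[derive(Debug)]
pub enum Msg {
    Binary(Vec<u8>),
    Ping(Vec<u8>),
    Pong(Vec<u8>),
    Text(String),
}

/// What the receive loop should do with one item from the stream.
#[derive(Debug, PartialEq)]
pub enum Action {
    Dispatch(Vec<u8>),  // hand the bytes to the dispatcher/endpoint
    ReplyPong(Vec<u8>), // answer a ping, echoing its payload
    Log,                // pongs and anything else are only logged
    Shutdown,           // error or end of stream
}

/// Maps one `next()` result from the stream to an action.
pub fn route(item: Option<Result<Msg, String>>) -> Action {
    match item {
        Some(Ok(Msg::Binary(bytes))) => Action::Dispatch(bytes),
        Some(Ok(Msg::Ping(payload))) => Action::ReplyPong(payload),
        Some(Ok(_)) => Action::Log,
        // Both a transport error and a finished stream end the connection.
        Some(Err(_)) | None => Action::Shutdown,
    }
}
```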

3. Send Loop

Drains the internal MPSC channel and transmits messages over the WebSocket.

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:224-257

Connection Shutdown

Shutdown can occur synchronously (on Drop) or asynchronously (on connection errors).

Synchronous Shutdown (shutdown_sync)

Called from the Drop implementation. It uses an atomic swap on is_connected so that the shutdown logic runs at most once.

Process:

  1. Swap is_connected flag to false
  2. If previously true, invoke state change handler with Disconnected
  3. Abort all background tasks

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:55-77 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:42-52
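
The swap-based guard in steps 1 and 2 can be shown in isolation. This is a minimal sketch, not the crate's code: ConnectionState is a hypothetical type, and a counter stands in for invoking the state change handler with Disconnected.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

/// Hypothetical stand-in for the client's connection flag and handler.
pub struct ConnectionState {
    is_connected: AtomicBool,
    disconnect_notifications: AtomicUsize,
}

impl ConnectionState {
    pub fn new_connected() -> Self {
        Self {
            is_connected: AtomicBool::new(true),
            disconnect_notifications: AtomicUsize::new(0),
        }
    }

    /// Returns true only for the call that performed the transition.
    pub fn shutdown(&self) -> bool {
        // swap returns the previous value, so only one caller sees `true`.
        let was_connected = self.is_connected.swap(false, Ordering::SeqCst);
        if was_connected {
            // Stand-in for calling the handler with Disconnected.
            self.disconnect_notifications.fetch_add(1, Ordering::SeqCst);
        }
        was_connected
    }

    pub fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Relaxed)
    }

    pub fn notifications(&self) -> usize {
        self.disconnect_notifications.load(Ordering::SeqCst)
    }
}
```

Calling shutdown() a second time, or concurrently from another path, is a no-op, which is why the notification fires once even if several shutdown paths race.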

Asynchronous Shutdown (shutdown_async)

Called from background tasks on connection errors.

Process:

  1. Swap is_connected flag to false
  2. If previously true, invoke state change handler with Disconnected
  3. Acquire dispatcher lock
  4. Call fail_all_pending_requests() with FrameDecodeError::ReadAfterCancel

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:79-108


Message Flow

Client-to-Server RPC Call

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:289-313

Server-to-Client RPC Call

The client includes an RpcServiceEndpoint to handle bidirectional RPC (server calling client).

sequenceDiagram
    participant WS as "WebSocket"
    participant RecvLoop as "Receive Loop Task"
    participant Dispatcher as "RpcDispatcher"
    participant Endpoint as "RpcServiceEndpoint"
    participant Handler as "Registered Handler"

    WS->>RecvLoop: Binary message
    RecvLoop->>Dispatcher: Lock dispatcher
    RecvLoop->>Endpoint: read_bytes(dispatcher, (), bytes, on_emit)
    Endpoint->>Dispatcher: Decode frames
    Dispatcher->>Endpoint: Route by METHOD_ID
    Endpoint->>Handler: Invoke handler
    Handler-->>Endpoint: Response bytes
    Endpoint->>RecvLoop: on_emit(response_chunk)
    RecvLoop->>WS: Send response

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:164-177 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:273-275


State Management

Connection State Tracking

The client uses Arc<AtomicBool> for lock-free state checking and Arc<StdMutex<Option<Box<dyn Fn>>>> for the state change callback.

| State | is_connected Value | Trigger |
| --- | --- | --- |
| Connected | true | Successful connect_async() |
| Disconnected | false | WebSocket error, explicit shutdown, or drop |

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:30 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:284-286

State Change Handler

Applications can register a callback to be notified of connection state changes.

Handler Registration:

graph LR
    SET["set_state_change_handler(handler)"]
    LOCK["Lock state_change_handler mutex"]
    STORE["Store Box<dyn Fn> in Option"]
    CHECK["Check is_connected flag"]
    CALL_INIT["Call handler(Connected)"]

    SET --> LOCK
    LOCK --> STORE
    STORE --> CHECK
    CHECK -->|true| CALL_INIT

Important: If the client is already connected when the handler is set, the handler is invoked immediately with RpcTransportState::Connected.

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:315-334
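
The registration behaviour (lock, store, check the flag, call with Connected) can be sketched with standard-library types. This is an illustration under assumptions: StateNotifier, TransportState, and mark_disconnected are hypothetical names, and the real method is part of an async trait, not a plain synchronous function.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TransportState {
    Connected,
    Disconnected,
}

type Handler = Box<dyn Fn(TransportState) + Send + Sync>;

/// Hypothetical stand-in for the client's flag plus callback slot.
pub struct StateNotifier {
    is_connected: AtomicBool,
    handler: Mutex<Option<Handler>>,
}

impl StateNotifier {
    pub fn new(connected: bool) -> Self {
        Self { is_connected: AtomicBool::new(connected), handler: Mutex::new(None) }
    }

    /// Stores the handler, then replays Connected if already connected.
    pub fn set_state_change_handler(
        &self,
        handler: impl Fn(TransportState) + Send + Sync + 'static,
    ) {
        let mut slot = self.handler.lock().unwrap();
        *slot = Some(Box::new(handler));
        if self.is_connected.load(Ordering::Relaxed) {
            if let Some(h) = slot.as_ref() {
                h(TransportState::Connected);
            }
        }
    }

    /// Notifies Disconnected once, on the first transition to false.
    pub fn mark_disconnected(&self) {
        if self.is_connected.swap(false, Ordering::SeqCst) {
            if let Some(h) = self.handler.lock().unwrap().as_ref() {
                h(TransportState::Disconnected);
            }
        }
    }
}
```

In this sketch the handler runs while the mutex is held, so a handler that re-registers itself would deadlock; a production design may release the lock first.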


Error Handling and Disconnection

Pending Request Cancellation

When the connection is lost, all pending RPC requests are failed with FrameDecodeError::ReadAfterCancel.

Cancellation Flow:

graph TB
    ERROR["Connection Error Detected"]
    SHUTDOWN["shutdown_async()\ncalled"]
    LOCK["Acquire dispatcher lock"]
    FAIL["dispatcher.fail_all_pending_requests()"]
    NOTIFY["Waiting futures receive error"]

    ERROR --> SHUTDOWN
    SHUTDOWN --> LOCK
    LOCK --> FAIL
    FAIL --> NOTIFY

Error Propagation:

  • ReadAfterCancel → RpcServiceError::TransportError → Application receives Err

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:100-103
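
The idea of failing every in-flight request at once can be sketched with a map of waiters. This is a simplified model, not the dispatcher's implementation: PendingRequests, CallError, register, and fail_all are hypothetical names, and std::sync::mpsc channels stand in for whatever the dispatcher uses to wake callers.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

#[derive(Debug, Clone, PartialEq)]
pub enum CallError {
    Cancelled,
}

type CallResult = Result<Vec<u8>, CallError>;

/// Hypothetical registry of requests that are still waiting for a reply.
#[derive(Default)]
pub struct PendingRequests {
    next_id: u32,
    waiters: HashMap<u32, Sender<CallResult>>,
}

impl PendingRequests {
    /// Registers a new request and returns its id and the caller's receiver.
    pub fn register(&mut self) -> (u32, Receiver<CallResult>) {
        let (tx, rx) = channel();
        let id = self.next_id;
        self.next_id += 1;
        self.waiters.insert(id, tx);
        (id, rx)
    }

    /// Fails every waiter with `error` and returns how many were failed.
    pub fn fail_all(&mut self, error: CallError) -> usize {
        let mut failed = 0;
        for (_, tx) in self.waiters.drain() {
            // The caller may have given up already; that is not an error here.
            let _ = tx.send(Err(error.clone()));
            failed += 1;
        }
        failed
    }
}
```

Draining the map means a second call finds nothing left to fail, so no waiter is notified twice.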

Send Failure Handling

If the send loop cannot transmit a message, it triggers shutdown.

Failure Scenarios:

  1. WebSocket send returns error → shutdown_async()
  2. Channel closed (receiver dropped) → Loop exits
  3. is_connected flag is false → Message dropped, loop exits

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:231-253
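
The three exit paths can be modelled in a few lines. The sketch below is an assumption-laden simplification: run_send_loop and SendLoopExit are hypothetical names, std::sync::mpsc replaces the Tokio channel, and a closure replaces the WebSocket sink. In this model the channel counts as closed once every sender has been dropped.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;

/// Why the loop stopped; mirrors the three scenarios listed above.
#[derive(Debug, PartialEq)]
pub enum SendLoopExit {
    SinkError,     // the write failed; the caller should run shutdown
    ChannelClosed, // nothing can be queued any more
    Disconnected,  // flag was false; the dequeued message is dropped
}

/// Drains `rx` into `sink` until one of the exit conditions is hit.
pub fn run_send_loop<F>(
    rx: Receiver<Vec<u8>>,
    is_connected: &AtomicBool,
    mut sink: F,
) -> SendLoopExit
where
    F: FnMut(Vec<u8>) -> Result<(), String>,
{
    loop {
        let msg = match rx.recv() {
            Ok(msg) => msg,
            Err(_) => return SendLoopExit::ChannelClosed,
        };
        if !is_connected.load(Ordering::Relaxed) {
            return SendLoopExit::Disconnected;
        }
        if sink(msg).is_err() {
            return SendLoopExit::SinkError;
        }
    }
}
```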

Receive Failure Handling

If the receive loop encounters an error, it triggers shutdown and exits.

Failure Scenarios:

  1. WebSocket receive returns error → shutdown_async(), break loop
  2. Stream ends (None from next()) → shutdown_async(), exit loop

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:186-199 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:205-220


Usage Patterns

Basic Client Creation

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:110-271

Making RPC Calls

Because the client implements RpcServiceCallerInterface, it can be used with any RpcMethodPrebuffered implementation.

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:278-335

Monitoring Connection State

Register a handler to track the connection lifecycle.

Handler Invocation:

  • Called immediately with Connected if client is already connected
  • Called with Disconnected on any connection loss
  • Called from shutdown paths (both sync and async)

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:315-334 extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs:36-165

Bidirectional RPC (Server-to-Client Calls)

The client can handle RPC calls initiated by the server.

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:273-275


Implementation Details

Weak Reference Pattern

The client uses Arc::new_cyclic to allow background tasks to hold Weak<RpcClient> references. This prevents reference cycles while enabling tasks to access the client.

Benefits:

  • Tasks can access client methods without preventing cleanup
  • Client can be dropped while tasks are running
  • Tasks gracefully exit when client is dropped

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:131 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:157 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:225
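
The weak-reference pattern can be demonstrated with Arc::new_cyclic and a plain thread. This is a sketch under stated assumptions, not the crate's code: Client and spawn_client are hypothetical names, an OS thread stands in for a Tokio task, and the start gate and exit flag exist only to make the behaviour observable.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Weak};
use std::thread;
use std::time::Duration;

pub struct Client {
    pub ticks: AtomicUsize,
}

/// Builds a client whose worker holds only a Weak reference.
/// Returns the client and a flag the worker sets when it exits.
pub fn spawn_client() -> (Arc<Client>, Arc<AtomicBool>) {
    let exited = Arc::new(AtomicBool::new(false));
    let exited_in_worker = Arc::clone(&exited);
    // Gate: upgrade() fails until new_cyclic has finished constructing.
    let (start_tx, start_rx) = mpsc::channel::<()>();

    let client = Arc::new_cyclic(|weak: &Weak<Client>| {
        let weak = weak.clone();
        thread::spawn(move || {
            let _ = start_rx.recv();
            loop {
                match weak.upgrade() {
                    // The temporary strong reference is dropped right away.
                    Some(client) => {
                        client.ticks.fetch_add(1, Ordering::Relaxed);
                    }
                    // The last strong reference is gone: stop working.
                    None => break,
                }
                thread::sleep(Duration::from_millis(1));
            }
            exited_in_worker.store(true, Ordering::SeqCst);
        });
        Client { ticks: AtomicUsize::new(0) }
    });
    let _ = start_tx.send(());
    (client, exited)
}
```

Because the worker never keeps a strong reference across iterations, dropping the returned Arc is enough to make the next upgrade() fail and end the loop.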

Lock-Free is_connected Check

The is_connected flag uses AtomicBool with Ordering::Relaxed for reads and Ordering::SeqCst for writes, enabling fast connection status checks without mutexes.

Memory Ordering:

  • Read: Relaxed - No synchronization needed; the flag is only a hint
  • Write: SeqCst - Strong ordering for state transitions
  • Swap: SeqCst - Ensure single shutdown execution

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:30 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:61 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:85 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:284-286

Debug Implementation

The Debug trait is manually implemented to avoid exposing closures and function pointers, showing only the connection state.

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:34-40


Testing

Connection Failure Tests

Validates error handling when connecting to non-existent servers.

Test: test_client_errors_on_connection_failure

  • Attempts connection to unused port
  • Asserts io::ErrorKind::ConnectionRefused

Sources: extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs:17-31

State Change Handler Tests

Validates state change callbacks are invoked correctly during connection lifecycle.

Test: test_transport_state_change_handler

  • Spawns minimal WebSocket server
  • Registers state change handler
  • Verifies Connected callback
  • Server closes connection
  • Verifies Disconnected callback

Sources: extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs:36-165

Pending Request Cancellation Tests

Validates that in-flight RPC requests fail when connection is lost.

Test: test_pending_requests_fail_on_disconnect

  • Spawns server that accepts but doesn't respond
  • Initiates RPC call (becomes pending)
  • Server closes connection
  • Asserts RPC call fails with cancellation error

Sources: extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs:169-292

Dynamic Channel Tests

Validates streaming RPC functionality with bounded and unbounded channels.

Tests:

  • test_dynamic_channel_bounded
  • test_dynamic_channel_unbounded

Sources: extensions/muxio-rpc-service-caller/tests/dynamic_channel_tests.rs:101-167


Dependencies

| Crate | Purpose |
| --- | --- |
| tokio | Async runtime with "full" features |
| tokio-tungstenite | WebSocket protocol implementation |
| futures-util | Stream/sink utilities for WebSocket splitting |
| async-trait | Trait async method support |
| muxio | Core RPC dispatcher and framing |
| muxio-rpc-service | Service trait definitions |
| muxio-rpc-service-caller | Caller interface trait |
| muxio-rpc-service-endpoint | Server-to-client RPC handling |

Sources: extensions/muxio-tokio-rpc-client/Cargo.toml:11-22
