This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Tokio RPC Client

Purpose and Scope

This document describes the muxio-tokio-rpc-client crate, which provides a Tokio-based native RPC client implementation for connecting to muxio WebSocket servers. The client uses tokio-tungstenite for WebSocket transport and implements the platform-agnostic RpcServiceCallerInterface trait to enable type-safe RPC calls.

For server-side implementation details, see Tokio RPC Server. For the browser-based WASM client alternative, see WASM RPC Client. For general connection lifecycle concepts applicable to both platforms, see Connection Lifecycle and State Management.

Sources : extensions/muxio-tokio-rpc-client/Cargo.toml:1-31 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:1-336


Architecture Overview

The RpcClient is built on Tokio’s asynchronous runtime and manages a persistent WebSocket connection to a muxio server. It encapsulates bidirectional RPC functionality through two primary abstractions:

  • Client-side calling : Via the RpcServiceCallerInterface trait, enabling outbound RPC invocations
  • Server-to-client calling : Via an embedded RpcServiceEndpoint, enabling the server to invoke methods on the client

The client spawns three background Tokio tasks to manage the WebSocket lifecycle: a heartbeat task for periodic pings, a receive loop for processing incoming WebSocket frames, and a send loop for transmitting outbound messages.

High-Level Component Structure

```mermaid
graph TB
    subgraph "RpcClient Structure"
        CLIENT["RpcClient"]
        DISPATCHER["Arc<TokioMutex<RpcDispatcher>>\nRequest correlation & routing"]
        ENDPOINT["Arc<RpcServiceEndpoint<()>>\nServer-to-client RPC handler"]
        TX["mpsc::UnboundedSender<WsMessage>\nInternal message queue"]
        STATE_HANDLER["Arc<StdMutex<Option<Box<dyn Fn>>>>\nState change callback"]
        IS_CONNECTED["Arc<AtomicBool>\nConnection state flag"]
        TASK_HANDLES["Vec<JoinHandle<()>>\nBackground task handles"]
    end

    subgraph "Background Tasks"
        HEARTBEAT["Heartbeat Task\nSends Ping every 1s"]
        RECV["Receive Loop Task\nProcesses WS frames"]
        SEND["Send Loop Task\nTransmits WS messages"]
    end

    subgraph "WebSocket Layer"
        WS_STREAM["tokio_tungstenite::WebSocketStream"]
        WS_SENDER["SplitSink\nWrite half"]
        WS_RECEIVER["SplitStream\nRead half"]
    end

    CLIENT -->|owns| DISPATCHER
    CLIENT -->|owns| ENDPOINT
    CLIENT -->|owns| TX
    CLIENT -->|owns| STATE_HANDLER
    CLIENT -->|owns| IS_CONNECTED
    CLIENT -->|owns| TASK_HANDLES

    TASK_HANDLES -->|contains| HEARTBEAT
    TASK_HANDLES -->|contains| RECV
    TASK_HANDLES -->|contains| SEND

    HEARTBEAT -->|sends Ping via| TX
    SEND -->|reads from| TX
    SEND -->|writes to| WS_SENDER

    RECV -->|reads from| WS_RECEIVER
    RECV -->|processes via| DISPATCHER
    RECV -->|processes via| ENDPOINT

    WS_SENDER -->|part of| WS_STREAM
    WS_RECEIVER -->|part of| WS_STREAM
```

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:25-32 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:131-267


Core Components

RpcClient Structure

The RpcClient struct holds all state and resources necessary for maintaining an active WebSocket connection and processing RPC operations.

| Field | Type | Purpose |
| --- | --- | --- |
| dispatcher | Arc<TokioMutex<RpcDispatcher<'static>>> | Manages request ID allocation, response correlation, and stream multiplexing |
| endpoint | Arc<RpcServiceEndpoint<()>> | Handles server-to-client RPC method dispatch |
| tx | mpsc::UnboundedSender<WsMessage> | Internal channel for queuing outbound WebSocket messages |
| state_change_handler | RpcTransportStateChangeHandler | User-registered callback invoked on connection state changes |
| is_connected | Arc<AtomicBool> | Atomic flag tracking current connection status |
| task_handles | Vec<JoinHandle<()>> | Handles to the three background Tokio tasks |

Type Alias : RpcTransportStateChangeHandler is defined as Arc<StdMutex<Option<Box<dyn Fn(RpcTransportState) + Send + Sync>>>>, allowing thread-safe storage and invocation of the state change callback.

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:22-32

Debug and Drop Implementations

The RpcClient implements Debug to display connection status and Drop to ensure proper cleanup.

The Drop implementation ensures that when the last Arc<RpcClient> reference is dropped, all background tasks are aborted and the state change handler is notified of disconnection.
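The Debug and Drop behavior described here can be sketched with the standard library alone. This is an illustration under stated assumptions, not the crate's code: `ClientSketch`, `TransportState`, and `StateHandler` are hypothetical stand-ins, and the sketch has no sockets or tasks, so the task-abort step is only marked with a comment.

```rust
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the crate's transport state enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransportState {
    Connected,
    Disconnected,
}

/// Same shape as the handler alias described above, using the stand-in enum.
type StateHandler = Arc<Mutex<Option<Box<dyn Fn(TransportState) + Send + Sync>>>>;

/// Hypothetical, simplified client: no WebSocket, no background tasks.
pub struct ClientSketch {
    is_connected: Arc<AtomicBool>,
    state_change_handler: StateHandler,
}

impl ClientSketch {
    pub fn new(handler: Box<dyn Fn(TransportState) + Send + Sync>) -> Self {
        ClientSketch {
            is_connected: Arc::new(AtomicBool::new(true)),
            state_change_handler: Arc::new(Mutex::new(Some(handler))),
        }
    }
}

impl fmt::Debug for ClientSketch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the connection flag is shown; the boxed handler is not Debug.
        f.debug_struct("ClientSketch")
            .field("connected", &self.is_connected.load(Ordering::Relaxed))
            .finish()
    }
}

impl Drop for ClientSketch {
    fn drop(&mut self) {
        // A real client would abort its background task handles here first.
        // `swap` returns the previous value, so the handler fires at most once.
        if self.is_connected.swap(false, Ordering::SeqCst) {
            if let Ok(guard) = self.state_change_handler.lock() {
                if let Some(handler) = guard.as_ref() {
                    handler(TransportState::Disconnected);
                }
            }
        }
    }
}
```

Gating the notification on the value returned by `swap` keeps the callback from firing twice if a shutdown already ran before the drop.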

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:34-52


Lifecycle Management

Client Creation

The RpcClient::new method establishes a WebSocket connection and initializes all client components. It uses Arc::new_cyclic to enable background tasks to hold weak references to the client, preventing circular reference cycles.

Connection Establishment Flow

```mermaid
sequenceDiagram
    participant App as "Application Code"
    participant New as "RpcClient::new"
    participant WS as "tokio_tungstenite"
    participant Cyclic as "Arc::new_cyclic"
    participant Tasks as "Background Tasks"

    App->>New: RpcClient::new(host, port)
    New->>New: Build websocket_url
    New->>WS: connect_async(url)
    WS-->>New: WebSocketStream + Response
    New->>New: ws_stream.split()
    New->>New: mpsc::unbounded_channel()

    New->>Cyclic: "Arc::new_cyclic(|weak_client|)"
    Cyclic->>Tasks: Spawn heartbeat task
    Cyclic->>Tasks: Spawn receive loop task
    Cyclic->>Tasks: Spawn send loop task
    Cyclic-->>New: Arc<RpcClient>

    New-->>App: Ok(Arc<RpcClient>)
```

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:110-271

Key Initialization Steps

  1. WebSocket URL Construction : Parses the host to determine if it’s an IP address or hostname, constructing the appropriate ws:// URL (rpc_client.rs:112-115)
  2. Connection Establishment : Calls connect_async from tokio-tungstenite (rpc_client.rs:118-121)
  3. Stream Splitting : Splits the WebSocket stream into separate read and write halves (rpc_client.rs:127)
  4. Channel Creation : Creates an unbounded MPSC channel for internal message passing (rpc_client.rs:128)
  5. Cyclic Arc Initialization : Uses Arc::new_cyclic to allow tasks to hold weak references (rpc_client.rs:131)
  6. Component Initialization : Creates RpcDispatcher, RpcServiceEndpoint, and state tracking (rpc_client.rs:132-136)
  7. Task Spawning : Spawns the three background tasks (rpc_client.rs:139-257)
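The cyclic initialization in step 5 is the least familiar part of this list, so here is a minimal sketch of it. `Arc::new_cyclic` is the real standard library API; `ClientSketch`, `new_client`, and `worker_tick` are hypothetical names, and the weak reference is stored in the struct only so the example needs no spawned tasks.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};

/// Hypothetical, simplified client used only to show the ownership pattern.
pub struct ClientSketch {
    pub is_connected: AtomicBool,
    /// In a real client the Weak would be moved into spawned tasks instead.
    pub weak_self: Weak<ClientSketch>,
}

/// Builds the client inside `Arc::new_cyclic`, which hands the closure a
/// `Weak` pointing at the Arc that is still under construction.
pub fn new_client() -> Arc<ClientSketch> {
    Arc::new_cyclic(|weak_client| ClientSketch {
        is_connected: AtomicBool::new(true),
        weak_self: weak_client.clone(),
    })
}

/// One iteration of a hypothetical background loop. It returns false once
/// the client has been dropped, because the upgrade then fails.
pub fn worker_tick(weak_client: &Weak<ClientSketch>) -> bool {
    match weak_client.upgrade() {
        Some(client) => client.is_connected.load(Ordering::Acquire),
        None => false,
    }
}
```

Because workers hold only `Weak` references, the strong count stays at one per application-held `Arc`, and dropping the last of those frees the client even while workers still exist.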

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:110-271

Shutdown Mechanisms

The client provides both synchronous and asynchronous shutdown paths to handle different termination scenarios.

Shutdown Flow Comparison

| Aspect | shutdown_sync() | shutdown_async() |
| --- | --- | --- |
| Context | Called from Drop implementation | Called from background tasks on error |
| Locking | Uses StdMutex::lock() (blocking) | Uses TokioMutex::lock().await (async) |
| Dispatcher | Does not acquire dispatcher lock | Acquires dispatcher lock to fail pending requests |
| Pending Requests | Left in dispatcher (client is dropping) | Explicitly failed with ReadAfterCancel error |
| State Handler | Invokes with Disconnected if connected | Invokes with Disconnected if connected |

Synchronous Shutdown (rpc_client.rs:56-77):

  • Used when the client is being dropped
  • Checks is_connected and swaps to false atomically
  • Invokes the state change handler if connected
  • Does not fail pending requests (client is being destroyed)

Asynchronous Shutdown (rpc_client.rs:79-108):

  • Used when background tasks detect connection errors
  • Swaps is_connected to false atomically
  • Acquires the dispatcher lock asynchronously
  • Calls fail_all_pending_requests with FrameDecodeError::ReadAfterCancel
  • Invokes the state change handler if connected
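The difference between the two paths can be sketched with synchronous, std-only code. Everything here is a hypothetical stand-in: `ShutdownSketch`, `PendingCallback`, and `shutdown_with_cleanup` are invented names, the pending map replaces the dispatcher, a counter replaces the state change handler, and the error is a plain string rather than FrameDecodeError::ReadAfterCancel. The second method mirrors shutdown_async() but is an ordinary function.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for a pending-request callback.
pub type PendingCallback = Box<dyn FnOnce(Result<Vec<u8>, String>) + Send>;

pub struct ShutdownSketch {
    pub is_connected: AtomicBool,
    pub pending: Mutex<HashMap<u32, PendingCallback>>,
    /// Counts how often the "Disconnected" notification would have fired.
    pub disconnect_notifications: Mutex<u32>,
}

impl ShutdownSketch {
    pub fn new() -> Self {
        ShutdownSketch {
            is_connected: AtomicBool::new(true),
            pending: Mutex::new(HashMap::new()),
            disconnect_notifications: Mutex::new(0),
        }
    }

    /// Drop-path analogue: flips the flag and notifies, but leaves the
    /// pending requests alone because the client is going away anyway.
    pub fn shutdown_sync(&self) {
        if self.is_connected.swap(false, Ordering::SeqCst) {
            *self.disconnect_notifications.lock().unwrap() += 1;
        }
    }

    /// Error-path analogue: flips the flag, fails every pending request,
    /// then notifies if the client was connected before the swap.
    pub fn shutdown_with_cleanup(&self) {
        let was_connected = self.is_connected.swap(false, Ordering::SeqCst);
        // Drain under the lock, then run callbacks after it is released.
        let drained: Vec<PendingCallback> =
            self.pending.lock().unwrap().drain().map(|(_, cb)| cb).collect();
        for callback in drained {
            callback(Err("ReadAfterCancel".to_string()));
        }
        if was_connected {
            *self.disconnect_notifications.lock().unwrap() += 1;
        }
    }
}
```

Calling either method a second time is harmless in this sketch: the swap returns false, so no further notification is counted.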

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:56-108


Background Tasks

The client spawns three independent Tokio tasks during initialization, each serving a specific role in the connection lifecycle.

Task Architecture

```mermaid
graph LR
    subgraph "Task Lifecycle"
        SPAWN["Arc::new_cyclic\ncreates Weak references"]
        HEARTBEAT_TASK["Heartbeat Task"]
        RECV_TASK["Receive Loop Task"]
        SEND_TASK["Send Loop Task"]
        SPAWN --> HEARTBEAT_TASK
        SPAWN --> RECV_TASK
        SPAWN --> SEND_TASK
    end

    subgraph "Shared State"
        APP_TX["mpsc::UnboundedSender\nMessage queue"]
        WS_SENDER["WebSocket write half"]
        WS_RECEIVER["WebSocket read half"]
        WEAK_CLIENT["Weak<RpcClient>\nUpgradable reference"]
    end

    HEARTBEAT_TASK -->|send Ping| APP_TX
    SEND_TASK -->|recv| APP_TX
    SEND_TASK -->|send msg| WS_SENDER
    RECV_TASK -->|next| WS_RECEIVER
    RECV_TASK -->|upgrade| WEAK_CLIENT
    SEND_TASK -->|upgrade| WEAK_CLIENT
```

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:131-267

Heartbeat Task

The heartbeat task generates periodic ping messages to maintain connection liveness and detect silent disconnections.

Implementation : rpc_client.rs:139-154

Behavior :

  • Ticks every 1 second using tokio::time::interval
  • Sends WsMessage::Ping to the internal message queue
  • Exits when the channel is closed (client dropped)
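The exit condition is the interesting part of this loop: the task needs no explicit stop signal because a failed send means the receiving side is gone. A rough std-only analogue, using a thread and `std::sync::mpsc` in place of a Tokio task and `tokio::time::interval`, might look like this. `Frame` and `spawn_heartbeat` are hypothetical names, and the interval is a parameter so the sketch is not tied to one second.

```rust
use std::sync::mpsc::Sender;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for the WebSocket message type.
#[derive(Debug, PartialEq, Eq)]
pub enum Frame {
    Ping(Vec<u8>),
}

/// Spawns a thread that queues a Ping, sleeps for `interval`, and repeats.
/// The thread returns the number of pings it queued once the receiver has
/// been dropped, since `send` then reports an error.
pub fn spawn_heartbeat(tx: Sender<Frame>, interval: Duration) -> JoinHandle<u64> {
    thread::spawn(move || {
        let mut sent = 0u64;
        while tx.send(Frame::Ping(Vec::new())).is_ok() {
            sent += 1;
            thread::sleep(interval);
        }
        sent
    })
}
```

A caller would pass the sending half of the internal queue and a one-second interval to approximate the behavior listed above.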

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:139-154

Receive Loop Task

The receive loop processes incoming WebSocket frames and routes them through the RPC dispatcher and endpoint.

Implementation : rpc_client.rs:157-222

Frame Processing :

| Message Type | Action |
| --- | --- |
| Binary | Lock dispatcher, call endpoint.read_bytes() to process RPC frames |
| Ping | Send Pong response via internal channel |
| Pong | Log and ignore (response to our heartbeat pings) |
| Text | Log and ignore (protocol uses binary frames only) |
| Close | Break loop (connection closed) |
| Error | Spawn shutdown_async() task and break loop |
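The table maps naturally onto a single `match`. The sketch below is a std-only illustration, not the crate's receive loop: `Frame`, `Next`, and `handle_incoming` are hypothetical names, a byte buffer stands in for the dispatcher and endpoint, and echoing the ping payload in the Pong is an assumption based on normal WebSocket behavior.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for the WebSocket message type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Frame {
    Binary(Vec<u8>),
    Ping(Vec<u8>),
    Pong(Vec<u8>),
    Text(String),
    Close,
}

/// What the surrounding loop should do after one item has been handled.
#[derive(Debug, PartialEq, Eq)]
pub enum Next {
    Continue,
    Stop,
    StopAndShutdown,
}

/// Handles one item from the read half. `rpc_bytes` stands in for the
/// endpoint that would decode RPC frames; `tx` is the outbound queue.
pub fn handle_incoming(
    item: Result<Frame, String>,
    tx: &Sender<Frame>,
    rpc_bytes: &mut Vec<u8>,
) -> Next {
    match item {
        Ok(Frame::Binary(bytes)) => {
            rpc_bytes.extend_from_slice(&bytes);
            Next::Continue
        }
        Ok(Frame::Ping(payload)) => {
            // Queue the reply instead of writing to the socket directly.
            let _ = tx.send(Frame::Pong(payload));
            Next::Continue
        }
        // Pong and Text frames carry no RPC data and are ignored.
        Ok(Frame::Pong(_)) | Ok(Frame::Text(_)) => Next::Continue,
        Ok(Frame::Close) => Next::Stop,
        // A transport error ends the loop and asks for a full shutdown.
        Err(_) => Next::StopAndShutdown,
    }
}
```

Routing the Pong through the queue keeps the write half owned by one place, which matches the split between the receive loop and the send loop.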

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:157-222

Send Loop Task

The send loop drains the internal message queue and transmits messages to the WebSocket.

Implementation : rpc_client.rs:224-257

Behavior :

  • Receives messages from the MPSC channel
  • Checks is_connected before sending (prevents sending after disconnect signal)
  • Uses Ordering::Acquire for memory synchronization with the shutdown path
  • Spawns shutdown_async() on send errors
  • Exits when the channel is closed or disconnected
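A synchronous sketch of the same control flow follows. It is an assumption-laden stand-in: `run_send_loop` is a hypothetical name, the `write` closure replaces the WebSocket write half, and storing false into the flag replaces spawning shutdown_async().

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;

/// Drains `rx` and hands each message to `write` until the queue closes,
/// the flag reads false, or a write fails. Returns how many messages were
/// written successfully.
pub fn run_send_loop<F>(rx: Receiver<Vec<u8>>, is_connected: &AtomicBool, mut write: F) -> usize
where
    F: FnMut(&[u8]) -> Result<(), String>,
{
    let mut written = 0;
    // `recv` returns Err once every sender is gone, which ends the loop.
    while let Ok(message) = rx.recv() {
        // Acquire pairs with the SeqCst swap performed by a shutdown path.
        if !is_connected.load(Ordering::Acquire) {
            break;
        }
        if write(&message).is_err() {
            // A real client would hand off to its async shutdown path here.
            is_connected.store(false, Ordering::SeqCst);
            break;
        }
        written += 1;
    }
    written
}
```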

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:224-257


Connection State Management

is_connected Flag

The is_connected field is an Arc<AtomicBool> that tracks the current connection status. It uses atomic operations to ensure thread-safe updates across multiple tasks.

Memory Ordering :

  • Ordering::SeqCst for swapping in shutdown paths (strongest guarantee)
  • Ordering::Acquire in send loop for reading (synchronizes with shutdown writes)
  • Ordering::Relaxed for general reads (no synchronization needed)
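The reason for using an atomic swap in the shutdown paths is that `swap` returns the previous value, so exactly one caller observes the transition from true to false. The following std-only sketch races several threads on the same flag to show this; `race_shutdowns` is a hypothetical helper written for this illustration.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Starts `threads` threads that all try to perform the shutdown swap and
/// returns how many of them saw the flag go from true to false.
pub fn race_shutdowns(threads: usize) -> usize {
    let is_connected = Arc::new(AtomicBool::new(true));
    let notifications = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let is_connected = Arc::clone(&is_connected);
            let notifications = Arc::clone(&notifications);
            thread::spawn(move || {
                // Only the thread that reads `true` here would notify.
                if is_connected.swap(false, Ordering::SeqCst) {
                    notifications.fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    notifications.load(Ordering::SeqCst)
}
```

With any positive number of threads, the function returns 1, which is why a handler gated on the swap result is not invoked twice when two shutdown paths overlap.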

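State Transitions : The flag reads true while the connection is up and is swapped to false by shutdown_sync() or shutdown_async(), whichever runs first.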

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:30 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:61 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:85

State Change Handlers

Applications can register a state change handler to be notified of connection and disconnection events.

Handler Registration : rpc_client.rs:315-334

Invocation Points :

  1. Connected : Immediately after setting the handler if already connected
  2. Disconnected : In shutdown_sync() or shutdown_async() when connection ends
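The first invocation point, firing immediately on registration, can be sketched as below. This is a simplified synchronous model under assumptions: `TransportState`, `StateChangeHandler`, and `HandlerSlot` are hypothetical stand-ins, and the method here is an ordinary function, which may not match the signature of the crate's own method.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the crate's transport state enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransportState {
    Connected,
    Disconnected,
}

/// Same shape as the handler alias described earlier in this document.
pub type StateChangeHandler = Arc<Mutex<Option<Box<dyn Fn(TransportState) + Send + Sync>>>>;

/// Hypothetical holder for the two fields involved in registration.
pub struct HandlerSlot {
    pub is_connected: Arc<AtomicBool>,
    pub handler: StateChangeHandler,
}

impl HandlerSlot {
    pub fn new(connected: bool) -> Self {
        HandlerSlot {
            is_connected: Arc::new(AtomicBool::new(connected)),
            handler: Arc::new(Mutex::new(None)),
        }
    }

    /// Stores the handler, then reports `Connected` right away if the
    /// connection is already up, so late registration misses nothing.
    pub fn set_state_change_handler(
        &self,
        handler: impl Fn(TransportState) + Send + Sync + 'static,
    ) {
        let mut slot = self.handler.lock().unwrap();
        *slot = Some(Box::new(handler));
        if self.is_connected.load(Ordering::Relaxed) {
            if let Some(stored) = slot.as_ref() {
                stored(TransportState::Connected);
            }
        }
    }
}
```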

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:315-334


RpcServiceCallerInterface Implementation

The RpcClient implements the platform-agnostic RpcServiceCallerInterface trait, enabling it to be used with shared RPC service definitions.

Trait Methods Implementation

```mermaid
graph TB
    subgraph "RpcServiceCallerInterface Trait"
        GET_DISPATCHER["get_dispatcher()\nReturns Arc&lt;TokioMutex&lt;RpcDispatcher&gt;&gt;"]
        IS_CONNECTED["is_connected()\nReturns bool"]
        GET_EMIT_FN["get_emit_fn()\nReturns Arc&lt;dyn Fn(Vec&lt;u8&gt;)&gt;"]
        SET_STATE_HANDLER["set_state_change_handler()\nRegisters callback"]
    end

    subgraph "RpcClient Implementation"
        IMPL_GET_DISP["Clone dispatcher Arc"]
        IMPL_IS_CONN["Load is_connected atomic"]
        IMPL_EMIT["Closure sending to tx channel"]
        IMPL_SET_STATE["Store handler, invoke if connected"]
    end

    GET_DISPATCHER --> IMPL_GET_DISP
    IS_CONNECTED --> IMPL_IS_CONN
    GET_EMIT_FN --> IMPL_EMIT
    SET_STATE_HANDLER --> IMPL_SET_STATE
```

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:278-335

get_dispatcher

Returns a clone of the Arc<TokioMutex<RpcDispatcher>>, allowing RPC call implementations to access the dispatcher for request correlation and stream management.

Implementation : rpc_client.rs:280-282

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:280-282

is_connected

Returns the current connection status by loading the atomic boolean with relaxed ordering.

Implementation : rpc_client.rs:284-286

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:284-286

get_emit_fn

Returns a closure that captures the internal message channel and is_connected flag. This closure is used by the RPC dispatcher to emit binary frames for transmission.

Implementation : rpc_client.rs:289-313

Behavior :

  • Checks is_connected before sending (prevents writes after disconnect)
  • Converts Vec<u8> to WsMessage::Binary
  • Sends to the internal MPSC channel (non-blocking unbounded send)
  • Ignores send errors (channel closed means client is dropping)
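The four behaviors above fit in a short closure. The sketch below uses `std::sync::mpsc` instead of Tokio's unbounded channel, and `Frame` and `make_emit_fn` are hypothetical names. The return type is `Arc<dyn Fn(Vec<u8>) + Send>` as an assumption about the bounds, and the relaxed load is likewise an assumption rather than a statement about the crate's code.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Arc;

/// Hypothetical stand-in for the WebSocket message type.
#[derive(Debug, PartialEq, Eq)]
pub enum Frame {
    Binary(Vec<u8>),
}

/// Builds an emit closure that owns a sender and a handle to the flag.
pub fn make_emit_fn(
    tx: Sender<Frame>,
    is_connected: Arc<AtomicBool>,
) -> Arc<dyn Fn(Vec<u8>) + Send> {
    Arc::new(move |bytes: Vec<u8>| {
        // Drop the frame silently once the client is marked disconnected.
        if !is_connected.load(Ordering::Relaxed) {
            return;
        }
        // A send error only means the receiver is gone, so it is ignored.
        let _ = tx.send(Frame::Binary(bytes));
    })
}
```

Since the closure never blocks and never panics on a closed queue, it can be called from dispatcher code without any knowledge of the transport's state.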

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:289-313


WebSocket Transport Integration

tokio-tungstenite

The client uses tokio-tungstenite for WebSocket protocol implementation over Tokio’s async I/O.

Dependencies : tokio-tungstenite is declared in Cargo.toml:16.

Connection Flow :

  1. URL Construction : Builds ws://host:port/ws URL string
  2. Async Connect : Calls connect_async(&websocket_url) which returns (WebSocketStream, Response)
  3. Stream Split : Calls ws_stream.split() to obtain (SplitSink, SplitStream)
  4. Task Distribution : Distributes the sink to send loop, stream to receive loop
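Step 1 matters mostly for IPv6, where a literal address must be wrapped in brackets before a port can be appended. A minimal sketch of one way to build the URL follows; `websocket_url` is a hypothetical helper, and only the `ws://host:port/ws` shape comes from the text above.

```rust
use std::net::{IpAddr, SocketAddr};

/// Builds a `ws://.../ws` URL from a host string and a port.
/// IP literals go through `SocketAddr`, whose Display output adds the
/// brackets that IPv6 addresses need; anything else is used as a hostname.
pub fn websocket_url(host: &str, port: u16) -> String {
    match host.parse::<IpAddr>() {
        Ok(ip) => format!("ws://{}/ws", SocketAddr::new(ip, port)),
        Err(_) => format!("ws://{}:{}/ws", host, port),
    }
}
```

For example, `websocket_url("::1", 8080)` yields `ws://[::1]:8080/ws`, while `websocket_url("localhost", 8080)` yields `ws://localhost:8080/ws`.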

Sources : extensions/muxio-tokio-rpc-client/Cargo.toml:16 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:19 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:118-127

Binary Frame Processing

All RPC communication uses WebSocket binary frames. The receive loop processes binary frames by passing them to the endpoint for decoding.

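Binary Frame Flow : For each Binary message, the receive loop locks the dispatcher and calls endpoint.read_bytes() on the payload to process the RPC frames.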

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:163-178

Ping/Pong Handling

The client automatically responds to server pings and sends periodic pings via the heartbeat task.

Ping/Pong Matrix :

| Direction | Message Type | Sender | Handler |
| --- | --- | --- | --- |
| Client → Server | Ping | Heartbeat task (1s interval) | Server responds with Pong |
| Server → Client | Ping | Server | Receive loop sends Pong |
| Server → Client | Pong | Server (response to our Ping) | Receive loop logs and ignores |

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:139-154 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:179-182


Error Handling and Cleanup

Error Sources

The client handles errors from multiple sources:

| Error Source | Handling Strategy | Cleanup Action |
| --- | --- | --- |
| connect_async failure | Return io::Error from new() | No cleanup needed (not created) |
| WebSocket receive error | Spawn shutdown_async(), break receive loop | Fail pending requests, notify handler |
| WebSocket send error | Spawn shutdown_async(), break send loop | Fail pending requests, notify handler |
| MPSC channel closed | Break task loop | Task exits naturally |
| Client drop | Abort tasks, call shutdown_sync() | Notify handler, abort background tasks |

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:118-121 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:186-198 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:239-253

Pending Request Cleanup

When the connection is lost, all pending RPC requests in the dispatcher must be failed to prevent callers from waiting indefinitely.

Cleanup Flow : rpc_client.rs:100-103

Error Propagation :

  1. Dispatcher lock is acquired (prevents new requests)
  2. fail_all_pending_requests is called with ReadAfterCancel error
  3. All pending request callbacks receive RpcServiceError::Transport
  4. Waiting futures resolve with error
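Seen from the caller's side, the point of these steps is that a blocked caller wakes up with an error instead of waiting forever. The sketch below models that with one std channel per pending request. `PendingTable`, `CallError`, and `CallResult` are hypothetical stand-ins, and a blocking `recv` plays the role of an awaited future.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the transport variant of the RPC error type.
#[derive(Debug, PartialEq, Eq)]
pub enum CallError {
    Transport(String),
}

pub type CallResult = Result<Vec<u8>, CallError>;

/// Hypothetical table of in-flight requests, keyed by request id.
#[derive(Default)]
pub struct PendingTable {
    next_id: u32,
    waiting: HashMap<u32, Sender<CallResult>>,
}

impl PendingTable {
    /// Registers a request and returns the receiver its caller waits on.
    pub fn register(&mut self) -> (u32, Receiver<CallResult>) {
        let (tx, rx) = channel();
        let id = self.next_id;
        self.next_id += 1;
        self.waiting.insert(id, tx);
        (id, rx)
    }

    /// Fails every in-flight request with a transport error and returns
    /// how many callers were still listening.
    pub fn fail_all(&mut self, reason: &str) -> usize {
        let mut notified = 0;
        for (_, tx) in self.waiting.drain() {
            if tx.send(Err(CallError::Transport(reason.to_string()))).is_ok() {
                notified += 1;
            }
        }
        notified
    }
}
```

Holding the table behind a lock while `fail_all` runs is what stops new requests from being registered in the middle of the cleanup.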

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:100-103

Task Abort on Drop

The Drop implementation ensures clean shutdown even if the client is dropped while background tasks are running.

Drop Sequence (rpc_client.rs:42-52):

  1. Iterate over task_handles and call abort() on each
  2. Call shutdown_sync() to notify state handlers
  3. Background tasks receive abort signal and terminate

Abort Safety : Aborting tasks is safe because:

  • The receive loop holds a weak reference (won’t prevent drop)
  • The send loop checks is_connected before sending
  • The heartbeat task only sends pings (no critical state)
  • All critical state is owned by the RpcClient itself

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:42-52


Usage Examples

Basic Connection and Call

The basic usage pattern consists of three steps:

  • Create a client with RpcClient::new(host, port)
  • Make a prebuffered RPC call via the trait method
  • Let the client go out of scope; it is cleaned up automatically when dropped

Sources : Example pattern from integration tests

State Change Handler Registration

The handler is invoked:

  • Immediately with Connected if already connected
  • With Disconnected when the connection is lost or client is dropped

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:315-334

Server-to-Client RPC

The client can also handle incoming RPC calls from the server by registering handlers on its embedded endpoint.

This demonstrates bidirectional RPC:

  • The client can call server methods via RpcServiceCallerInterface
  • The server can call client methods via the registered endpoint handlers

Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:273-275


Testing

The crate includes comprehensive integration tests covering connection lifecycle, error handling, and state management.

Test Coverage

| Test | File | Purpose |
| --- | --- | --- |
| test_client_errors_on_connection_failure | tests/transport_state_tests.rs:16-31 | Verifies connection errors are returned properly |
| test_transport_state_change_handler | tests/transport_state_tests.rs:34-165 | Validates state handler invocations |
| test_pending_requests_fail_on_disconnect | tests/transport_state_tests.rs:167-292 | Ensures pending requests fail on disconnect |

Sources : extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs:1-293

Mock Implementations

The muxio-rpc-service-caller crate tests include mock client implementations demonstrating the trait contract:

MockRpcClient Structure (dynamic_channel_tests.rs:19-88):

  • Implements RpcServiceCallerInterface for testing
  • Uses Arc<Mutex<Option<DynamicSender>>> to provide response senders
  • Demonstrates dynamic channel handling (bounded/unbounded)
  • Uses Arc<AtomicBool> for connection state simulation

Sources : extensions/muxio-rpc-service-caller/tests/dynamic_channel_tests.rs:15-88