This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Tokio RPC Client
Relevant source files
- extensions/muxio-rpc-service-caller/src/lib.rs
- extensions/muxio-rpc-service-caller/tests/dynamic_channel_tests.rs
- extensions/muxio-tokio-rpc-client/Cargo.toml
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs
- extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs
Purpose and Scope
This document describes the muxio-tokio-rpc-client crate, which provides a Tokio-based native RPC client implementation for connecting to muxio WebSocket servers. The client uses tokio-tungstenite for WebSocket transport and implements the platform-agnostic RpcServiceCallerInterface trait to enable type-safe RPC calls.
For server-side implementation details, see Tokio RPC Server. For the browser-based WASM client alternative, see WASM RPC Client. For general connection lifecycle concepts applicable to both platforms, see Connection Lifecycle and State Management.
Sources : extensions/muxio-tokio-rpc-client/Cargo.toml:1-31 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:1-336
Architecture Overview
The RpcClient is built on Tokio’s asynchronous runtime and manages a persistent WebSocket connection to a muxio server. It encapsulates bidirectional RPC functionality through two primary abstractions:
- Client-side calling : Via the RpcServiceCallerInterface trait, enabling outbound RPC invocations
- Server-to-client calling : Via an embedded RpcServiceEndpoint, enabling the server to invoke methods on the client
graph TB
subgraph "RpcClient Structure"
CLIENT["RpcClient"]
DISPATCHER["Arc<TokioMutex<RpcDispatcher>>\nRequest correlation & routing"]
ENDPOINT["Arc<RpcServiceEndpoint<()>>\nServer-to-client RPC handler"]
TX["mpsc::UnboundedSender<WsMessage>\nInternal message queue"]
STATE_HANDLER["Arc<StdMutex<Option<Box<dyn Fn>>>>\nState change callback"]
IS_CONNECTED["Arc<AtomicBool>\nConnection state flag"]
TASK_HANDLES["Vec<JoinHandle<()>>\nBackground task handles"]
end
subgraph "Background Tasks"
HEARTBEAT["Heartbeat Task\nSends Ping every 1s"]
RECV["Receive Loop Task\nProcesses WS frames"]
SEND["Send Loop Task\nTransmits WS messages"]
end
subgraph "WebSocket Layer"
WS_STREAM["tokio_tungstenite::WebSocketStream"]
WS_SENDER["SplitSink\nWrite half"]
WS_RECEIVER["SplitStream\nRead half"]
end
CLIENT -->|owns| DISPATCHER
CLIENT -->|owns| ENDPOINT
CLIENT -->|owns| TX
CLIENT -->|owns| STATE_HANDLER
CLIENT -->|owns| IS_CONNECTED
CLIENT -->|owns| TASK_HANDLES
TASK_HANDLES -->|contains| HEARTBEAT
TASK_HANDLES -->|contains| RECV
TASK_HANDLES -->|contains| SEND
HEARTBEAT -->|sends Ping via| TX
SEND -->|reads from| TX
SEND -->|writes to| WS_SENDER
RECV -->|reads from| WS_RECEIVER
RECV -->|processes via| DISPATCHER
RECV -->|processes via| ENDPOINT
WS_SENDER -->|part of| WS_STREAM
WS_RECEIVER -->|part of| WS_STREAM
The client spawns three background Tokio tasks to manage the WebSocket lifecycle: a heartbeat task for periodic pings, a receive loop for processing incoming WebSocket frames, and a send loop for transmitting outbound messages.
High-Level Component Structure
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:25-32 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:131-267
Core Components
RpcClient Structure
The RpcClient struct holds all state and resources necessary for maintaining an active WebSocket connection and processing RPC operations.
| Field | Type | Purpose |
|---|---|---|
| dispatcher | Arc<TokioMutex<RpcDispatcher<'static>>> | Manages request ID allocation, response correlation, and stream multiplexing |
| endpoint | Arc<RpcServiceEndpoint<()>> | Handles server-to-client RPC method dispatch |
| tx | mpsc::UnboundedSender<WsMessage> | Internal channel for queuing outbound WebSocket messages |
| state_change_handler | RpcTransportStateChangeHandler | User-registered callback invoked on connection state changes |
| is_connected | Arc<AtomicBool> | Atomic flag tracking current connection status |
| task_handles | Vec<JoinHandle<()>> | Handles to the three background Tokio tasks |
Type Alias : RpcTransportStateChangeHandler is defined as Arc<StdMutex<Option<Box<dyn Fn(RpcTransportState) + Send + Sync>>>>, allowing thread-safe storage and invocation of the state change callback.
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:22-32
Debug and Drop Implementations
The RpcClient implements Debug to display connection status and Drop to ensure proper cleanup.
The Drop implementation ensures that when the last Arc<RpcClient> reference is dropped, all background tasks are aborted and the state change handler is notified of disconnection.
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:34-52
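The Debug and Drop behavior described above can be illustrated with a minimal std-only sketch. ClientSketch and its events log are hypothetical stand-ins, not the crate's types; the real client aborts Tokio JoinHandles where this sketch only records the step.

```rust
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for RpcClient; only the fields needed for the sketch.
struct ClientSketch {
    is_connected: Arc<AtomicBool>,
    // Records what Drop did so the behavior can be observed afterwards.
    events: Arc<Mutex<Vec<&'static str>>>,
}

impl fmt::Debug for ClientSketch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the connection status is shown, mirroring the description above.
        f.debug_struct("ClientSketch")
            .field("is_connected", &self.is_connected.load(Ordering::Relaxed))
            .finish()
    }
}

impl Drop for ClientSketch {
    fn drop(&mut self) {
        // The real client would abort its background tasks here.
        self.events.lock().unwrap().push("abort_tasks");
        // Notify only if the client was still marked as connected.
        if self.is_connected.swap(false, Ordering::SeqCst) {
            self.events.lock().unwrap().push("notify_disconnected");
        }
    }
}
```

Formatting a connected ClientSketch with {:?} yields ClientSketch { is_connected: true }, and dropping it records the abort step followed by the disconnect notification.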
Lifecycle Management
Client Creation
The RpcClient::new method establishes a WebSocket connection and initializes all client components. It uses Arc::new_cyclic so that background tasks can hold weak references to the client. Because the tasks never hold a strong reference, they cannot form a reference cycle that would keep the client alive after the application drops it.
sequenceDiagram
participant App as "Application Code"
participant New as "RpcClient::new"
participant WS as "tokio_tungstenite"
participant Cyclic as "Arc::new_cyclic"
participant Tasks as "Background Tasks"
App->>New: RpcClient::new(host, port)
New->>New: Build websocket_url
New->>WS: connect_async(url)
WS-->>New: WebSocketStream + Response
New->>New: ws_stream.split()
New->>New: mpsc::unbounded_channel()
New->>Cyclic: "Arc::new_cyclic(|weak_client|)"
Cyclic->>Tasks: Spawn heartbeat task
Cyclic->>Tasks: Spawn receive loop task
Cyclic->>Tasks: Spawn send loop task
Cyclic-->>New: Arc<RpcClient>
New-->>App: Ok(Arc<RpcClient>)
Connection Establishment Flow
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:110-271
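As a rough illustration of the Arc::new_cyclic pattern, the following std-only sketch gives a task-like value a Weak back-reference to its owner. ClientSketch, TaskSketch, and new_client are hypothetical names; the real constructor spawns Tokio tasks rather than storing plain structs.

```rust
use std::sync::{Arc, Weak};

// Hypothetical stand-in for a background task that must not keep the client alive.
struct TaskSketch {
    client: Weak<ClientSketch>,
}

// Hypothetical stand-in for RpcClient.
struct ClientSketch {
    name: String,
    tasks: Vec<TaskSketch>,
}

impl TaskSketch {
    // Returns the client's name only while the client still exists.
    fn client_name(&self) -> Option<String> {
        self.client.upgrade().map(|c| c.name.clone())
    }
}

fn new_client(name: &str) -> Arc<ClientSketch> {
    // The closure receives a Weak pointing at the allocation being built.
    // It cannot be upgraded until new_cyclic returns.
    Arc::new_cyclic(|weak: &Weak<ClientSketch>| ClientSketch {
        name: name.to_string(),
        tasks: vec![TaskSketch { client: weak.clone() }],
    })
}
```

The strong count stays at 1 even though the task refers back to the client, so dropping the only Arc frees the client and later upgrade() calls return None.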
Key Initialization Steps
- WebSocket URL Construction : Parses the host to determine if it’s an IP address or hostname, constructing the appropriate ws:// URL (rpc_client.rs:112-115)
- Connection Establishment : Calls connect_async from tokio-tungstenite (rpc_client.rs:118-121)
- Stream Splitting : Splits the WebSocket stream into separate read and write halves (rpc_client.rs:127)
- Channel Creation : Creates an unbounded MPSC channel for internal message passing (rpc_client.rs:128)
- Cyclic Arc Initialization : Uses Arc::new_cyclic to allow tasks to hold weak references (rpc_client.rs:131)
- Component Initialization : Creates RpcDispatcher, RpcServiceEndpoint, and state tracking (rpc_client.rs:132-136)
- Task Spawning : Spawns the three background tasks (rpc_client.rs:139-257)
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:110-271
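The URL construction step could look roughly like the sketch below. The function name build_ws_url is hypothetical, the /ws path is taken from the connection flow described later in this page, and bracketing IPv6 literals is an assumption about why the host is parsed at all, not something confirmed by the source.

```rust
use std::net::IpAddr;

// Hypothetical helper: builds a ws:// URL from a host string and port.
fn build_ws_url(host: &str, port: u16) -> String {
    match host.parse::<IpAddr>() {
        // Assumption: IPv6 literals need brackets so the port stays unambiguous.
        Ok(IpAddr::V6(v6)) => format!("ws://[{}]:{}/ws", v6, port),
        Ok(IpAddr::V4(v4)) => format!("ws://{}:{}/ws", v4, port),
        // Anything that is not an IP literal is treated as a hostname.
        Err(_) => format!("ws://{}:{}/ws", host, port),
    }
}
```

Under these assumptions, build_ws_url("127.0.0.1", 8080) gives ws://127.0.0.1:8080/ws and build_ws_url("::1", 9000) gives ws://[::1]:9000/ws.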
Shutdown Mechanisms
The client provides both synchronous and asynchronous shutdown paths to handle different termination scenarios.
Shutdown Flow Comparison
| Aspect | shutdown_sync() | shutdown_async() |
|---|---|---|
| Context | Called from Drop implementation | Called from background tasks on error |
| Locking | Uses StdMutex::lock() (blocking) | Uses TokioMutex::lock().await (async) |
| Dispatcher | Does not acquire dispatcher lock | Acquires dispatcher lock to fail pending requests |
| Pending Requests | Left in dispatcher (client is dropping) | Explicitly failed with ReadAfterCancel error |
| State Handler | Invokes with Disconnected if connected | Invokes with Disconnected if connected |
Synchronous Shutdown (rpc_client.rs:56-77):
- Used when the client is being dropped
- Checks is_connected and swaps it to false atomically
- Invokes the state change handler if connected
- Does not fail pending requests (client is being destroyed)
Asynchronous Shutdown (rpc_client.rs:79-108):
- Used when background tasks detect connection errors
- Swaps is_connected to false atomically
- Acquires the dispatcher lock asynchronously
- Calls fail_all_pending_requests with FrameDecodeError::ReadAfterCancel
- Invokes the state change handler if connected
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:56-108
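Both shutdown paths rely on an atomic swap so that the Disconnected notification fires at most once, even if several tasks detect the failure. A minimal std-only sketch of that guard follows; ShutdownSketch, TransportState, and Handler are hypothetical names standing in for the crate's types.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

// Hypothetical stand-in for RpcTransportState.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum TransportState {
    Connected,
    Disconnected,
}

type Handler = Box<dyn Fn(TransportState) + Send + Sync>;

struct ShutdownSketch {
    is_connected: AtomicBool,
    handler: Mutex<Option<Handler>>,
}

impl ShutdownSketch {
    // Returns true only for the call that actually performed the transition.
    fn shutdown(&self) -> bool {
        // swap returns the previous value, so exactly one caller observes `true`.
        if !self.is_connected.swap(false, Ordering::SeqCst) {
            return false;
        }
        if let Some(handler) = self.handler.lock().unwrap().as_ref() {
            handler(TransportState::Disconnected);
        }
        true
    }
}
```

Calling shutdown twice invokes the handler once; the second call sees false from the swap and returns early.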
graph LR
subgraph "Task Lifecycle"
SPAWN["Arc::new_cyclic\ncreates Weak references"]
HEARTBEAT_TASK["Heartbeat Task"]
RECV_TASK["Receive Loop Task"]
SEND_TASK["Send Loop Task"]
SPAWN --> HEARTBEAT_TASK
SPAWN --> RECV_TASK
SPAWN --> SEND_TASK
end
subgraph "Shared State"
APP_TX["mpsc::UnboundedSender\nMessage queue"]
WS_SENDER["WebSocket write half"]
WS_RECEIVER["WebSocket read half"]
WEAK_CLIENT["Weak<RpcClient>\nUpgradable reference"]
end
HEARTBEAT_TASK -->|send Ping| APP_TX
SEND_TASK -->|recv| APP_TX
SEND_TASK -->|send msg| WS_SENDER
RECV_TASK -->|next| WS_RECEIVER
RECV_TASK -->|upgrade| WEAK_CLIENT
SEND_TASK -->|upgrade| WEAK_CLIENT
Background Tasks
The client spawns three independent Tokio tasks during initialization, each serving a specific role in the connection lifecycle.
Task Architecture
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:131-267
Heartbeat Task
The heartbeat task generates periodic ping messages to maintain connection liveness and detect silent disconnections.
Implementation (rpc_client.rs:139-154):
Behavior :
- Ticks every 1 second using tokio::time::interval
- Sends WsMessage::Ping to the internal message queue
- Exits when the channel is closed (client dropped)
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:139-154
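The shape of the heartbeat loop can be approximated with a std-thread analog. This is not the Tokio implementation: it uses thread::sleep and std::sync::mpsc in place of tokio::time::interval and the unbounded Tokio channel, and Outbound and run_heartbeat are hypothetical names.

```rust
use std::sync::mpsc::Sender;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for the WsMessage variants relevant here.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum Outbound {
    Ping,
    Binary(Vec<u8>),
}

// Queues a Ping every `period` until the receiving side is gone.
// Returns the number of pings that were queued successfully.
fn run_heartbeat(tx: Sender<Outbound>, period: Duration) -> usize {
    let mut sent = 0;
    loop {
        thread::sleep(period);
        // send fails once the receiver has been dropped, which ends the loop.
        if tx.send(Outbound::Ping).is_err() {
            return sent;
        }
        sent += 1;
    }
}
```

The key property carried over from the description is the exit condition: the loop has no explicit stop signal and terminates only because sending into a closed channel fails.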
Receive Loop Task
The receive loop processes incoming WebSocket frames and routes them through the RPC dispatcher and endpoint.
Implementation (rpc_client.rs:157-222):
Frame Processing :
| Message Type | Action |
|---|---|
| Binary | Lock dispatcher, call endpoint.read_bytes() to process RPC frames |
| Ping | Send Pong response via internal channel |
| Pong | Log and ignore (response to our heartbeat pings) |
| Text | Log and ignore (protocol uses binary frames only) |
| Close | Break loop (connection closed) |
| Error | Spawn shutdown_async() task and break loop |
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:157-222
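The frame-processing table maps naturally onto a single match expression. The sketch below models only the routing decision; Frame, Action, and route are hypothetical names, and echoing the Ping payload in the Pong follows the WebSocket protocol convention rather than anything stated about this client.

```rust
// Hypothetical stand-in for the incoming WebSocket message type.
#[derive(Debug, PartialEq)]
enum Frame {
    Binary(Vec<u8>),
    Ping(Vec<u8>),
    Pong(Vec<u8>),
    Text(String),
    Close,
}

// What the receive loop should do next for a given frame.
#[derive(Debug, PartialEq)]
enum Action {
    ProcessRpc(Vec<u8>),
    ReplyPong(Vec<u8>),
    Ignore,
    Stop,
    ShutdownAndStop,
}

fn route(incoming: Result<Frame, String>) -> Action {
    match incoming {
        // Binary frames carry RPC data and go to the dispatcher/endpoint.
        Ok(Frame::Binary(bytes)) => Action::ProcessRpc(bytes),
        // Assumption: the Pong echoes the Ping payload.
        Ok(Frame::Ping(payload)) => Action::ReplyPong(payload),
        // Pongs answer our own heartbeat; text frames are not part of the protocol.
        Ok(Frame::Pong(_)) | Ok(Frame::Text(_)) => Action::Ignore,
        Ok(Frame::Close) => Action::Stop,
        // Transport errors trigger the asynchronous shutdown path.
        Err(_) => Action::ShutdownAndStop,
    }
}
```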
Send Loop Task
The send loop drains the internal message queue and transmits messages to the WebSocket.
Implementation (rpc_client.rs:224-257):
Behavior :
- Receives messages from the MPSC channel
- Checks is_connected before sending (prevents sending after disconnect signal)
- Uses Ordering::Acquire for memory synchronization with the shutdown path
- Spawns shutdown_async() on send errors
- Exits when the channel is closed or disconnected
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:224-257
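A synchronous analog of the send loop is sketched below, assuming a std mpsc receiver and a caller-supplied write closure in place of the WebSocket sink. run_send_loop is a hypothetical name, and where the real client spawns shutdown_async() on a write error, this sketch only clears the flag.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;

// Drains `rx`, handing each message to `write`. Returns how many were written.
fn run_send_loop<F>(rx: Receiver<Vec<u8>>, is_connected: &AtomicBool, mut write: F) -> usize
where
    F: FnMut(Vec<u8>) -> Result<(), String>,
{
    let mut written = 0;
    // recv returns Err once every sender is dropped, which ends the loop.
    while let Ok(msg) = rx.recv() {
        // Acquire pairs with the store/swap performed by the shutdown path.
        if !is_connected.load(Ordering::Acquire) {
            break;
        }
        if write(msg).is_err() {
            // Stand-in for spawning shutdown_async() in the real client.
            is_connected.store(false, Ordering::SeqCst);
            break;
        }
        written += 1;
    }
    written
}
```

The loop has three exits, matching the behavior list: the channel closes, the flag is already false, or a write fails.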
Connection State Management
is_connected Flag
The is_connected field is an Arc<AtomicBool> that tracks the current connection status. It uses atomic operations to ensure thread-safe updates across multiple tasks.
Memory Ordering :
- Ordering::SeqCst for swapping in shutdown paths (strongest guarantee)
- Ordering::Acquire in send loop for reading (synchronizes with shutdown writes)
- Ordering::Relaxed for general reads (no synchronization needed)
State Transitions :
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:30 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:61 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:85
State Change Handlers
Applications can register a state change handler to be notified of connection and disconnection events.
Handler Registration (rpc_client.rs:315-334):
Invocation Points :
- Connected : Immediately after setting the handler if already connected
- Disconnected : In shutdown_sync() or shutdown_async() when connection ends
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:315-334
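The registration behavior, storing the callback and firing it immediately when the client is already connected, can be sketched with std types alone. HandlerSlot and TransportState are hypothetical stand-ins; the real method is part of the RpcServiceCallerInterface implementation and its exact signature is not reproduced here.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

// Hypothetical stand-in for RpcTransportState.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum TransportState {
    Connected,
    Disconnected,
}

type Handler = Box<dyn Fn(TransportState) + Send + Sync>;

struct HandlerSlot {
    is_connected: AtomicBool,
    handler: Mutex<Option<Handler>>,
}

impl HandlerSlot {
    fn set_state_change_handler(&self, handler: impl Fn(TransportState) + Send + Sync + 'static) {
        let mut slot = self.handler.lock().unwrap();
        *slot = Some(Box::new(handler));
        // A handler registered after connecting would otherwise miss the event.
        if self.is_connected.load(Ordering::Relaxed) {
            if let Some(h) = slot.as_ref() {
                // Note: the lock is held here, so the handler must not re-register.
                h(TransportState::Connected);
            }
        }
    }
}
```

Registering on a slot whose flag is false stores the handler without calling it; the first invocation then comes from whichever shutdown path runs later.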
graph TB
subgraph "RpcServiceCallerInterface Trait"
GET_DISPATCHER["get_dispatcher()\nReturns Arc<TokioMutex<RpcDispatcher>>"]
IS_CONNECTED["is_connected()\nReturns bool"]
GET_EMIT_FN["get_emit_fn()\nReturns Arc<dyn Fn(Vec<u8>)>"]
SET_STATE_HANDLER["set_state_change_handler()\nRegisters callback"]
end
subgraph "RpcClient Implementation"
IMPL_GET_DISP["Clone dispatcher Arc"]
IMPL_IS_CONN["Load is_connected atomic"]
IMPL_EMIT["Closure sending to tx channel"]
IMPL_SET_STATE["Store handler, invoke if connected"]
end
GET_DISPATCHER --> IMPL_GET_DISP
IS_CONNECTED --> IMPL_IS_CONN
GET_EMIT_FN --> IMPL_EMIT
SET_STATE_HANDLER --> IMPL_SET_STATE
RpcServiceCallerInterface Implementation
The RpcClient implements the platform-agnostic RpcServiceCallerInterface trait, enabling it to be used with shared RPC service definitions.
Trait Methods Implementation
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:278-335
get_dispatcher
Returns a clone of the Arc<TokioMutex<RpcDispatcher>>, allowing RPC call implementations to access the dispatcher for request correlation and stream management.
Implementation (rpc_client.rs:280-282):
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:280-282
is_connected
Returns the current connection status by loading the atomic boolean with relaxed ordering.
Implementation (rpc_client.rs:284-286):
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:284-286
get_emit_fn
Returns a closure that captures the internal message channel and is_connected flag. This closure is used by the RPC dispatcher to emit binary frames for transmission.
Implementation (rpc_client.rs:289-313):
Behavior :
- Checks is_connected before sending (prevents writes after disconnect)
- Converts Vec<u8> to WsMessage::Binary
- Sends to the internal MPSC channel (non-blocking unbounded send)
- Ignores send errors (channel closed means client is dropping)
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:289-313
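A std-only sketch of such an emit closure is shown below. make_emit_fn and Outbound are hypothetical names, std::sync::mpsc stands in for the Tokio unbounded channel, and the Relaxed load is an assumption since the ordering used in this closure is not stated above.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Arc;

// Hypothetical stand-in for WsMessage::Binary.
#[derive(Debug, PartialEq)]
enum Outbound {
    Binary(Vec<u8>),
}

// Builds a closure that queues bytes for the send loop while connected.
fn make_emit_fn(tx: Sender<Outbound>, is_connected: Arc<AtomicBool>) -> Arc<dyn Fn(Vec<u8>)> {
    Arc::new(move |bytes: Vec<u8>| {
        // Assumption: a relaxed read is sufficient for this early-out check.
        if !is_connected.load(Ordering::Relaxed) {
            return;
        }
        // A send error means the receiver is gone; there is nothing left to do.
        let _ = tx.send(Outbound::Binary(bytes));
    })
}
```

Because the closure owns clones of the channel sender and the flag, the dispatcher can call it without holding any reference to the client itself.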
WebSocket Transport Integration
tokio-tungstenite
The client uses tokio-tungstenite for WebSocket protocol implementation over Tokio’s async I/O.
Dependencies (Cargo.toml:16):
Connection Flow :
- URL Construction : Builds ws://host:port/ws URL string
- Async Connect : Calls connect_async(&websocket_url) which returns (WebSocketStream, Response)
- Stream Split : Calls ws_stream.split() to obtain (SplitSink, SplitStream)
- Task Distribution : Distributes the sink to send loop, stream to receive loop
Sources : extensions/muxio-tokio-rpc-client/Cargo.toml:16 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:19 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:118-127
Binary Frame Processing
All RPC communication uses WebSocket binary frames. The receive loop processes binary frames by passing them to the endpoint for decoding.
Binary Frame Flow :
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:163-178
Ping/Pong Handling
The client automatically responds to server pings and sends periodic pings via the heartbeat task.
Ping/Pong Matrix :
| Direction | Message Type | Sender | Handler |
|---|---|---|---|
| Client → Server | Ping | Heartbeat task (1s interval) | Server responds with Pong |
| Server → Client | Ping | Server | Receive loop sends Pong |
| Server → Client | Pong | Server (response to our Ping) | Receive loop logs and ignores |
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:139-154 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:179-182
Error Handling and Cleanup
Error Sources
The client handles errors from multiple sources:
| Error Source | Handling Strategy | Cleanup Action |
|---|---|---|
| connect_async failure | Return io::Error from new() | No cleanup needed (not created) |
| WebSocket receive error | Spawn shutdown_async(), break receive loop | Fail pending requests, notify handler |
| WebSocket send error | Spawn shutdown_async(), break send loop | Fail pending requests, notify handler |
| MPSC channel closed | Break task loop | Task exits naturally |
| Client drop | Abort tasks, call shutdown_sync() | Notify handler, abort background tasks |
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:118-121 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:186-198 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:239-253
Pending Request Cleanup
When the connection is lost, all pending RPC requests in the dispatcher must be failed to prevent callers from waiting indefinitely.
Cleanup Flow (rpc_client.rs:100-103):
Error Propagation :
- Dispatcher lock is acquired (prevents new requests)
- fail_all_pending_requests is called with ReadAfterCancel error
- All pending request callbacks receive RpcServiceError::Transport
- Waiting futures resolve with error
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:100-103
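The idea behind failing all pending requests can be shown with a small map of one-shot callbacks. This is a simplified model, not the dispatcher's data structure: PendingSketch, CallError, and fail_all are hypothetical names, and the error is carried as a plain string.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for RpcServiceError::Transport.
#[derive(Debug, Clone, PartialEq)]
enum CallError {
    Transport(String),
}

type Callback = Box<dyn FnOnce(Result<Vec<u8>, CallError>) + Send>;

// Hypothetical stand-in for the dispatcher's table of in-flight requests.
#[derive(Default)]
struct PendingSketch {
    pending: HashMap<u32, Callback>,
}

impl PendingSketch {
    fn register(&mut self, id: u32, cb: Callback) {
        self.pending.insert(id, cb);
    }

    // Completes every in-flight request with the same error; returns the count.
    fn fail_all(&mut self, reason: &str) -> usize {
        // Drain first so the map is already empty while callbacks run.
        let drained: Vec<Callback> = self.pending.drain().map(|(_, cb)| cb).collect();
        let count = drained.len();
        for cb in drained {
            cb(Err(CallError::Transport(reason.to_string())));
        }
        count
    }
}
```

Each callback is consumed exactly once, so a caller awaiting a response is woken with an error instead of waiting indefinitely, and a second fail_all call finds nothing left to fail.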
Task Abort on Drop
The Drop implementation ensures clean shutdown even if the client is dropped while background tasks are running.
Drop Sequence (rpc_client.rs:42-52):
- Iterate over task_handles and call abort() on each
- Call shutdown_sync() to notify state handlers
- Background tasks receive abort signal and terminate
Abort Safety : Aborting tasks is safe because:
- The receive loop holds a weak reference (won’t prevent drop)
- The send loop checks is_connected before sending
- The heartbeat task only sends pings (no critical state)
- All critical state is owned by the RpcClient itself
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:42-52
Usage Examples
Basic Connection and Call
A basic connection and call involves:
- Creating a client with RpcClient::new(host, port)
- Making a prebuffered RPC call via the trait method
- Letting the client clean up automatically when it is dropped
Sources : Example pattern from integration tests
State Change Handler Registration
The handler is invoked:
- Immediately with Connected if already connected
- With Disconnected when the connection is lost or client is dropped
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:315-334
Server-to-Client RPC
The client can also handle incoming RPC calls from the server by registering handlers on its embedded endpoint.
Together with outbound calls, this gives bidirectional RPC:
- The client can call server methods via RpcServiceCallerInterface
- The server can call client methods via the registered endpoint handlers
Sources : extensions/muxio-tokio-rpc-client/src/rpc_client.rs:273-275
Testing
The crate includes comprehensive integration tests covering connection lifecycle, error handling, and state management.
Test Coverage
| Test | File | Purpose |
|---|---|---|
| test_client_errors_on_connection_failure | tests/transport_state_tests.rs:16-31 | Verifies connection errors are returned properly |
| test_transport_state_change_handler | tests/transport_state_tests.rs:34-165 | Validates state handler invocations |
| test_pending_requests_fail_on_disconnect | tests/transport_state_tests.rs:167-292 | Ensures pending requests fail on disconnect |
Sources : extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs:1-293
Mock Implementations
The muxio-rpc-service-caller crate tests include mock client implementations demonstrating the trait contract:
MockRpcClient Structure (dynamic_channel_tests.rs:19-88):
- Implements RpcServiceCallerInterface for testing
- Uses Arc<Mutex<Option<DynamicSender>>> to provide response senders
- Demonstrates dynamic channel handling (bounded/unbounded)
- Uses Arc<AtomicBool> for connection state simulation
Sources : extensions/muxio-rpc-service-caller/tests/dynamic_channel_tests.rs:15-88