This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Tokio RPC Client
Relevant source files
- Cargo.lock
- extensions/muxio-rpc-service-caller/src/lib.rs
- extensions/muxio-rpc-service-caller/tests/dynamic_channel_tests.rs
- extensions/muxio-tokio-rpc-client/Cargo.toml
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs
- extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs
Purpose and Scope
This page documents the muxio-tokio-rpc-client crate, which provides a production-ready WebSocket client implementation for native Rust applications using the Tokio async runtime. This client connects to servers running muxio-tokio-rpc-server and enables type-safe RPC communication through shared service definitions.
Related Pages:
- For server-side implementation, see Tokio RPC Server
- For browser-based client implementation, see WASM RPC Client
- For the underlying RPC service caller abstraction, see Service Caller Interface
- For transport state management concepts, see Transport State Management
Architecture Overview
The RpcClient is a fully-featured WebSocket client that manages connection lifecycle, message routing, and RPC request/response correlation. It implements the RpcServiceCallerInterface trait, providing the same API as the WASM client while using native Tokio primitives.
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:25-32 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:131-267
graph TB
subgraph "Client Application"
APP["Application Code"]
end
subgraph "RpcClient Structure"
CLIENT["RpcClient"]
DISPATCHER["Arc<TokioMutex<RpcDispatcher>>"]
ENDPOINT["Arc<RpcServiceEndpoint<()>>"]
TX["mpsc::UnboundedSender<WsMessage>"]
STATE_HANDLER["RpcTransportStateChangeHandler"]
IS_CONNECTED["Arc<AtomicBool>"]
end
subgraph "Background Tasks"
HEARTBEAT["Heartbeat Task\n(JoinHandle)"]
RECV_LOOP["Receive Loop\n(JoinHandle)"]
SEND_LOOP["Send Loop\n(JoinHandle)"]
end
subgraph "WebSocket Connection"
WS_SENDER["ws_sender\n(SplitSink)"]
WS_RECEIVER["ws_receiver\n(SplitStream)"]
WS_STREAM["tokio_tungstenite::WebSocketStream"]
end
APP --> CLIENT
CLIENT --> DISPATCHER
CLIENT --> ENDPOINT
CLIENT --> TX
CLIENT --> STATE_HANDLER
CLIENT --> IS_CONNECTED
CLIENT --> HEARTBEAT
CLIENT --> RECV_LOOP
CLIENT --> SEND_LOOP
HEARTBEAT --> TX
TX --> SEND_LOOP
SEND_LOOP --> WS_SENDER
WS_RECEIVER --> RECV_LOOP
RECV_LOOP --> DISPATCHER
RECV_LOOP --> ENDPOINT
WS_SENDER --> WS_STREAM
WS_RECEIVER --> WS_STREAM
Core Components
RpcClient Structure
The RpcClient struct is the primary type exposed by this crate. It owns all connection resources and background tasks.
| Field | Type | Purpose |
|---|---|---|
dispatcher | Arc<TokioMutex<RpcDispatcher<'static>>> | Manages RPC request correlation and frame encoding/decoding |
endpoint | Arc<RpcServiceEndpoint<()>> | Handles server-to-client RPC calls (bidirectional support) |
tx | mpsc::UnboundedSender<WsMessage> | Channel for sending messages to WebSocket |
state_change_handler | RpcTransportStateChangeHandler | Optional callback for connection state changes |
is_connected | Arc<AtomicBool> | Atomic flag tracking connection status |
task_handles | Vec<JoinHandle<()>> | Handles for background tasks (aborted on drop) |
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:25-32
RpcServiceCallerInterface Implementation
The client implements the RpcServiceCallerInterface trait, which defines the contract for making RPC calls.
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:278-335
graph LR
subgraph "Trait Methods"
GET_DISPATCHER["get_dispatcher()"]
IS_CONNECTED["is_connected()"]
GET_EMIT["get_emit_fn()"]
SET_HANDLER["set_state_change_handler()"]
end
subgraph "RpcClient Implementation"
RETURN_DISP["Returns Arc<TokioMutex<RpcDispatcher>>"]
CHECK_FLAG["Checks Arc<AtomicBool>"]
CREATE_CLOSURE["Creates closure wrapping mpsc::Sender"]
STORE_CB["Stores callback in Arc<StdMutex>"]
end
GET_DISPATCHER --> RETURN_DISP
IS_CONNECTED --> CHECK_FLAG
GET_EMIT --> CREATE_CLOSURE
SET_HANDLER --> STORE_CB
Connection Lifecycle
Connection Establishment
The RpcClient::new() function establishes a WebSocket connection and spawns background tasks using Arc::new_cyclic() to enable weak references.
Connection URL Construction:
sequenceDiagram
participant App as "Application"
participant New as "RpcClient::new()"
participant WS as "tokio_tungstenite"
participant Tasks as "Background Tasks"
App->>New: new(host, port)
New->>New: construct websocket_url
New->>WS: connect_async(url)
WS-->>New: WebSocketStream + Response
New->>New: split stream into sender/receiver
New->>New: create mpsc channel
New->>New: Arc::new_cyclic(closure)
New->>Tasks: spawn heartbeat task
New->>Tasks: spawn receive loop
New->>Tasks: spawn send loop
New-->>App: Arc<RpcClient>
- If
hostparses asIpAddr:ws://{ip}:{port}/ws - Otherwise:
ws://{host}:{port}/ws
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:110-271
Background Tasks
The client spawns three concurrent tasks that run for the lifetime of the connection:
1. Heartbeat Task
Sends periodic WebSocket ping frames to keep the connection alive and detect disconnections.
- Interval: 1 second
- Payload: Empty ping message
- Exit condition: Channel send failure
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:139-154
graph TB
START["ws_receiver.next().await"]
MATCH["Match message type"]
BINARY["WsMessage::Binary"]
PING["WsMessage::Ping"]
OTHER["Other messages"]
ERROR["Err(e)"]
LOCK_DISP["Lock dispatcher"]
READ_BYTES["endpoint.read_bytes()"]
SEND_PONG["Send pong response"]
LOG["Log message"]
SHUTDOWN["Call shutdown_async()"]
START --> MATCH
MATCH --> BINARY
MATCH --> PING
MATCH --> OTHER
MATCH --> ERROR
BINARY --> LOCK_DISP
LOCK_DISP --> READ_BYTES
PING --> SEND_PONG
OTHER --> LOG
ERROR --> SHUTDOWN
2. Receive Loop
Processes incoming WebSocket messages and routes them to the appropriate handlers.
Message Handling:
Binary: Decoded byRpcDispatcherand processed byRpcServiceEndpointPing: Automatically responds withPongPong: Logged (heartbeat responses)- Error: Triggers
shutdown_async()
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:156-222
3. Send Loop
Drains the internal MPSC channel and transmits messages over the WebSocket.
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:224-257
Connection Shutdown
Shutdown can occur synchronously (on Drop) or asynchronously (on connection errors).
Synchronous Shutdown (shutdown_sync)
Called from Drop implementation. Uses swap to ensure single execution.
Process:
- Swap
is_connectedflag tofalse - If previously
true, invoke state change handler withDisconnected - Abort all background tasks
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:55-77 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:42-52
Asynchronous Shutdown (shutdown_async)
Called from background tasks on connection errors.
Process:
- Swap
is_connectedflag tofalse - If previously
true, invoke state change handler withDisconnected - Acquire dispatcher lock
- Call
fail_all_pending_requests()withFrameDecodeError::ReadAfterCancel
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:79-108
Message Flow
Client-to-Server RPC Call
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:289-313
sequenceDiagram
participant WS as "WebSocket"
participant RecvLoop as "Receive Loop Task"
participant Dispatcher as "RpcDispatcher"
participant Endpoint as "RpcServiceEndpoint"
participant Handler as "Registered Handler"
WS->>RecvLoop: Binary message
RecvLoop->>Dispatcher: Lock dispatcher
RecvLoop->>Endpoint: read_bytes(dispatcher, (), bytes, on_emit)
Endpoint->>Dispatcher: Decode frames
Dispatcher->>Endpoint: Route by METHOD_ID
Endpoint->>Handler: Invoke handler
Handler-->>Endpoint: Response bytes
Endpoint->>RecvLoop: on_emit(response_chunk)
RecvLoop->>WS: Send response
Server-to-Client RPC Call
The client includes an RpcServiceEndpoint to handle bidirectional RPC (server calling client).
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:164-177 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:273-275
State Management
Connection State Tracking
The client uses Arc<AtomicBool> for lock-free state checking and Arc<StdMutex<Option<Box<dyn Fn>>>> for the state change callback.
| State | is_connected Value | Trigger |
|---|---|---|
Connected | true | Successful connect_async() |
Disconnected | false | WebSocket error, explicit shutdown, or drop |
State Transition Events:
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs30 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:284-286
graph LR
SET["set_state_change_handler(handler)"]
LOCK["Lock state_change_handler mutex"]
STORE["Store Box<dyn Fn> in Option"]
CHECK["Check is_connected flag"]
CALL_INIT["Call handler(Connected)"]
SET --> LOCK
LOCK --> STORE
STORE --> CHECK
CHECK -->|true| CALL_INIT
State Change Handler
Applications can register a callback to be notified of connection state changes.
Handler Registration:
Important: If the client is already connected when the handler is set, it immediately invokes the handler with RpcTransportState::Connected.
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:315-334
Error Handling and Disconnection
graph TB
ERROR["Connection Error Detected"]
SHUTDOWN["shutdown_async()
called"]
LOCK["Acquire dispatcher lock"]
FAIL["dispatcher.fail_all_pending_requests()"]
NOTIFY["Waiting futures receive error"]
ERROR --> SHUTDOWN
SHUTDOWN --> LOCK
LOCK --> FAIL
FAIL --> NOTIFY
Pending Request Cancellation
When the connection is lost, all pending RPC requests are failed with FrameDecodeError::ReadAfterCancel.
Cancellation Flow:
Error Propagation:
ReadAfterCancel→RpcServiceError::TransportError→ Application receivesErr
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:100-103
Send Failure Handling
If the send loop cannot transmit a message, it triggers shutdown.
Failure Scenarios:
- WebSocket send returns error →
shutdown_async() - Channel closed (receiver dropped) → Loop exits
is_connectedflag isfalse→ Message dropped, loop exits
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:231-253
Receive Failure Handling
If the receive loop encounters an error, it triggers shutdown and exits.
Failure Scenarios:
- WebSocket receive returns error →
shutdown_async(), break loop - Stream ends (
Nonefromnext()) →shutdown_async(), exit loop
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:186-199 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:205-220
Usage Patterns
Basic Client Creation
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:110-271
Making RPC Calls
The client implements RpcServiceCallerInterface, enabling use with any RpcMethodPrebuffered implementation:
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:278-335
Monitoring Connection State
Register a handler to track connection lifecycle:
Handler Invocation:
- Called immediately with
Connectedif client is already connected - Called with
Disconnectedon any connection loss - Called from shutdown paths (both sync and async)
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:315-334 extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs:36-165
Bidirectional RPC (Server-to-Client Calls)
The client can handle RPC calls initiated by the server:
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:273-275
Implementation Details
Weak Reference Pattern
The client uses Arc::new_cyclic to allow background tasks to hold Weak<RpcClient> references. This prevents reference cycles while enabling tasks to access the client.
Benefits:
- Tasks can access client methods without preventing cleanup
- Client can be dropped while tasks are running
- Tasks gracefully exit when client is dropped
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs131 extensions/muxio-tokio-rpc-client/src/rpc_client.rs157 extensions/muxio-tokio-rpc-client/src/rpc_client.rs225
Lock-Free is_connected Check
The is_connected flag uses AtomicBool with Ordering::Relaxed for reads and Ordering::SeqCst for writes, enabling fast connection status checks without mutexes.
Memory Ordering:
- Read:
Relaxed- No synchronization needed, flag is only hint - Write:
SeqCst- Strong ordering for state transitions - Swap:
SeqCst- Ensure single shutdown execution
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs30 extensions/muxio-tokio-rpc-client/src/rpc_client.rs61 extensions/muxio-tokio-rpc-client/src/rpc_client.rs85 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:284-286
Debug Implementation
The Debug trait is manually implemented to avoid exposing closures and function pointers, showing only the connection state.
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:34-40
Testing
Connection Failure Tests
Validates error handling when connecting to non-existent servers.
Test: test_client_errors_on_connection_failure
- Attempts connection to unused port
- Asserts
io::ErrorKind::ConnectionRefused
Sources: extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs:17-31
State Change Handler Tests
Validates state change callbacks are invoked correctly during connection lifecycle.
Test: test_transport_state_change_handler
- Spawns minimal WebSocket server
- Registers state change handler
- Verifies
Connectedcallback - Server closes connection
- Verifies
Disconnectedcallback
Sources: extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs:36-165
Pending Request Cancellation Tests
Validates that in-flight RPC requests fail when connection is lost.
Test: test_pending_requests_fail_on_disconnect
- Spawns server that accepts but doesn't respond
- Initiates RPC call (becomes pending)
- Server closes connection
- Asserts RPC call fails with cancellation error
Sources: extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs:169-292
Dynamic Channel Tests
Validates streaming RPC functionality with bounded and unbounded channels.
Tests:
test_dynamic_channel_boundedtest_dynamic_channel_unbounded
Sources: extensions/muxio-rpc-service-caller/tests/dynamic_channel_tests.rs:101-167
Dependencies
| Crate | Purpose |
|---|---|
tokio | Async runtime with "full" features |
tokio-tungstenite | WebSocket protocol implementation |
futures-util | Stream/sink utilities for WebSocket splitting |
async-trait | Trait async method support |
muxio | Core RPC dispatcher and framing |
muxio-rpc-service | Service trait definitions |
muxio-rpc-service-caller | Caller interface trait |
muxio-rpc-service-endpoint | Server-to-client RPC handling |
Sources: extensions/muxio-tokio-rpc-client/Cargo.toml:11-22
Dismiss
Refresh this wiki
Enter email to refresh