This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Tokio RPC Server
Purpose and Scope
The muxio-tokio-rpc-server crate provides a production-ready WebSocket RPC server implementation for native Rust environments using the Tokio async runtime. This server integrates the muxio RPC framework with Axum’s HTTP/WebSocket capabilities and tokio-tungstenite’s WebSocket protocol handling.
This document covers the server-side implementation for native applications. For client-side Tokio implementations, see Tokio RPC Client. For browser-based clients, see WASM RPC Client. For general information about service definitions and endpoint interfaces, see Service Endpoint Interface.
Sources:
- extensions/muxio-tokio-rpc-server/Cargo.toml:1-23
- High-level architecture diagrams (Diagram 1)
Architecture Overview
The Tokio RPC Server sits at the intersection of three major subsystems: Axum’s HTTP framework, the muxio core multiplexing layer, and the RPC service framework. It provides a complete server implementation that accepts WebSocket connections, manages multiple concurrent clients, and dispatches RPC requests to registered handlers.
graph TB
subgraph "Application Layer"
APP["Application Code\nService Handler Implementations"]
end
subgraph "Server Interface Layer - muxio-tokio-rpc-server"
RPC_SERVER["RpcServer\nMain server struct\nHandler registration\nConnection management"]
WS_HANDLER["WebSocket Handler\nhandle_websocket_connection\nPer-connection async task"]
end
subgraph "RPC Framework Integration"
ENDPOINT["RpcServiceEndpointInterface\nDispatcher to handlers\nMethod routing"]
CALLER["RpcServiceCallerInterface\nServer-to-client calls\nBidirectional RPC"]
DISPATCHER["RpcDispatcher\nRequest correlation\nResponse routing"]
end
subgraph "Transport Layer"
AXUM["Axum Framework\nHTTP routing\nWebSocket upgrade"]
TUNGSTENITE["tokio-tungstenite\nWebSocket protocol\nFrame handling"]
end
subgraph "Core Multiplexing"
SESSION["RpcSession\nStream multiplexing\nFrame mux/demux"]
end
APP --> RPC_SERVER
RPC_SERVER --> WS_HANDLER
RPC_SERVER --> ENDPOINT
WS_HANDLER --> CALLER
WS_HANDLER --> ENDPOINT
WS_HANDLER --> DISPATCHER
WS_HANDLER --> AXUM
AXUM --> TUNGSTENITE
DISPATCHER --> SESSION
ENDPOINT --> DISPATCHER
CALLER --> DISPATCHER
TUNGSTENITE --> SESSION
style RPC_SERVER fill:#e1f5ff
style WS_HANDLER fill:#e1f5ff
Server Component Stack
Description: The server is built in layers. The RpcServer struct provides the high-level interface for configuring the server and registering handlers. When a WebSocket connection arrives via Axum, a dedicated handle_websocket_connection task is spawned. This task instantiates both an RpcServiceEndpointInterface (for handling incoming client requests) and an RpcServiceCallerInterface (for making server-initiated calls to the client), both sharing a single RpcDispatcher and RpcSession for the connection.
Sources:
- High-level architecture diagrams (Diagrams 1, 2, 3)
- extensions/muxio-tokio-rpc-server/Cargo.toml:11-22
Key Components
RpcServer Structure
The main server structure manages the overall server lifecycle, route configuration, and handler registry. It integrates with Axum’s routing system to expose WebSocket endpoints.
| Component | Type | Purpose |
|---|---|---|
| RpcServer | Main struct | Server configuration, handler registration, Axum router integration |
| Handler registry | Internal storage | Maintains registered RPC service handlers |
| Axum router | axum::Router | HTTP routing and WebSocket upgrade handling |
| Connection tracking | State management | Tracks active connections and connection state |
Key Responsibilities:
- Registering service handlers via RpcServiceEndpointInterface
- Creating Axum routes for WebSocket endpoints
- Spawning per-connection tasks
- Managing server lifecycle (start, stop, graceful shutdown)
Sources:
- extensions/muxio-tokio-rpc-server/Cargo.toml:12-16
- High-level architecture diagrams (Diagram 1)
WebSocket Connection Handler
Each WebSocket connection is handled by a dedicated async task that manages the complete lifecycle of that connection.
Description: The connection handler orchestrates all communication for a single client. It creates a shared RpcDispatcher, instantiates both endpoint and caller interfaces, and manages the read/write loops for WebSocket frames. This enables full bidirectional RPC - clients can call server methods, and the server can call client methods over the same connection.
sequenceDiagram
participant Axum as "Axum Router"
participant Handler as "handle_websocket_connection"
participant Endpoint as "RpcServiceEndpointInterface"
participant Caller as "RpcServiceCallerInterface"
participant Dispatcher as "RpcDispatcher"
participant WS as "WebSocket\n(tokio-tungstenite)"
Axum->>Handler: WebSocket upgrade
Handler->>Handler: Create RpcDispatcher
Handler->>Endpoint: Instantiate with dispatcher
Handler->>Caller: Instantiate with dispatcher
Handler->>Handler: Spawn read loop
Handler->>Handler: Spawn write loop
Note over Handler,WS: Connection Active
WS->>Handler: Binary frame received
Handler->>Dispatcher: Feed bytes
Dispatcher->>Endpoint: Route request
Endpoint->>Handler: Handler result
Handler->>Dispatcher: Send response
Dispatcher->>WS: Binary frames
Note over Handler,WS: Bidirectional RPC Support
Handler->>Caller: call(request)
Caller->>Dispatcher: Outgoing request
Dispatcher->>WS: Binary frames
WS->>Handler: Response frames
Handler->>Dispatcher: Feed bytes
Dispatcher->>Caller: Deliver response
Note over Handler,WS: Connection Closing
WS->>Handler: Close frame
Handler->>Endpoint: Cleanup
Handler->>Caller: Cleanup
Handler->>Handler: Task exit
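The sequence above can be approximated by the following sketch. It uses Axum's WebSocket types, which are part of the server's real dependency set; the `Dispatcher` struct and its `read_bytes` method are hypothetical stand-ins for muxio's `RpcDispatcher`, shown only to illustrate how the read loop feeds bytes into shared per-connection state.

```rust
use std::sync::Arc;

use axum::extract::ws::{Message, WebSocket};
use futures::{SinkExt, StreamExt};
use tokio::sync::Mutex;

// Hypothetical stand-in for muxio's RpcDispatcher; `read_bytes` is
// illustrative and not the crate's actual method name.
struct Dispatcher;

impl Dispatcher {
    fn read_bytes(&mut self, _bytes: &[u8]) {
        // Demultiplex frames and route complete requests/responses here.
    }
}

async fn handle_websocket_connection(socket: WebSocket) {
    // Shared between the endpoint (client-to-server) and caller
    // (server-to-client) halves of the connection.
    let dispatcher = Arc::new(Mutex::new(Dispatcher));

    // Split the socket so read and write loops can run independently.
    let (mut ws_tx, mut ws_rx) = socket.split();

    // Read loop: feed every binary frame into the shared dispatcher.
    let read_loop = {
        let dispatcher = Arc::clone(&dispatcher);
        tokio::spawn(async move {
            while let Some(Ok(msg)) = ws_rx.next().await {
                match msg {
                    Message::Binary(bytes) => dispatcher.lock().await.read_bytes(&bytes),
                    Message::Close(_) => break,
                    _ => {} // ping/pong/text frames are not RPC traffic
                }
            }
        })
    };

    // A corresponding write loop (omitted here) would forward outbound
    // frames produced by the dispatcher to `ws_tx`.
    let _ = read_loop.await;
    let _ = ws_tx.send(Message::Close(None)).await;
    // Dropping the dispatcher completes any pending requests with errors.
}
```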
Sources:
- High-level architecture diagrams (Diagram 3)
- extensions/muxio-tokio-rpc-server/Cargo.toml:14-22
WebSocket Integration with Axum
The server leverages Axum’s built-in WebSocket support to handle the HTTP upgrade handshake and ongoing WebSocket communication.
graph LR
subgraph "Axum Router Configuration"
ROUTER["axum::Router"]
ROUTE["WebSocket Route\n/ws or custom path"]
UPGRADE["WebSocket Upgrade Handler\naxum::extract::ws"]
end
subgraph "Handler Function"
WS_FN["handle_websocket_connection\nAsync task per connection"]
EXTRACT["Extract WebSocket\nfrom HTTP upgrade"]
end
subgraph "Connection State"
DISPATCHER_STATE["Arc<Mutex<RpcDispatcher>>"]
ENDPOINT_STATE["Arc<RpcServiceEndpointInterface>"]
HANDLER_REGISTRY["Handler Registry\nRegistered service handlers"]
end
ROUTER --> ROUTE
ROUTE --> UPGRADE
UPGRADE --> WS_FN
WS_FN --> EXTRACT
WS_FN --> DISPATCHER_STATE
WS_FN --> ENDPOINT_STATE
WS_FN --> HANDLER_REGISTRY
style ROUTER fill:#fff4e1
style WS_FN fill:#e1f5ff
Route Configuration
Description: The server configures an Axum route (typically at /ws) that handles WebSocket upgrades. When a client connects, Axum invokes the handler function with the upgraded WebSocket. The handler then creates the necessary RPC infrastructure (dispatcher, endpoint, caller) and enters the connection loop.
Integration Points:
- axum::extract::ws::WebSocketUpgrade - HTTP to WebSocket upgrade
- axum::extract::ws::WebSocket - Bidirectional WebSocket stream
- axum::routing::get() - Route registration
- tokio::spawn() - Connection task spawning
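As a rough sketch of these integration points (not the crate's actual code), an Axum route can perform the upgrade and hand the socket to a per-connection task like the one outlined in the previous section:

```rust
use axum::{
    extract::ws::{WebSocket, WebSocketUpgrade},
    response::IntoResponse,
    routing::get,
    Router,
};

// Perform the HTTP-to-WebSocket upgrade, then hand the socket off to a
// dedicated per-connection task.
async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
    ws.on_upgrade(|socket: WebSocket| async move {
        handle_websocket_connection(socket).await;
    })
}

// Placeholder for the per-connection handler sketched earlier.
async fn handle_websocket_connection(_socket: WebSocket) {}

// Register the WebSocket route (typically at /ws).
fn build_router() -> Router {
    Router::new().route("/ws", get(ws_handler))
}
```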
Sources:
- Cargo.lock:80-114 (Axum dependencies)
- Cargo.lock:1446-1455 (tokio-tungstenite dependencies)
Handler Registration and Dispatch
Service handlers are registered with the server before it starts, allowing the endpoint to route incoming requests to the appropriate handler implementations.
Handler Registration Flow
| Step | Action | Code Entity |
|---|---|---|
| 1 | Create RpcServer instance | RpcServer::new() or ::builder() |
| 2 | Register service handlers | register_handler<T>() method |
| 3 | Build Axum router | Internal router configuration |
| 4 | Start server | serve() or run() method |
Request Dispatch Mechanism
Description: When a complete RPC request is received and decoded, the RpcServiceEndpointInterface looks up the registered handler by method ID. The handler is invoked asynchronously, and its result is serialized and sent back through the same connection. Error handling is built into each layer, with errors propagated back to the client as RpcServiceError responses.
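The following is a simplified illustration of this lookup-and-invoke pattern. The registry type and its signatures are hypothetical (the error type is a plain `String` instead of `RpcServiceError`); the real dispatch logic lives in `RpcServiceEndpointInterface`.

```rust
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};

// Illustrative handler registry keyed by method ID.
type HandlerFn = Arc<
    dyn Fn(Vec<u8>) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, String>> + Send>>
        + Send
        + Sync,
>;

#[derive(Default)]
struct HandlerRegistry {
    handlers: HashMap<u64, HandlerFn>,
}

impl HandlerRegistry {
    fn register(&mut self, method_id: u64, handler: HandlerFn) {
        self.handlers.insert(method_id, handler);
    }

    // Look up the handler by method ID and invoke it asynchronously;
    // a missing handler becomes an error sent back to the client.
    async fn dispatch(&self, method_id: u64, payload: Vec<u8>) -> Result<Vec<u8>, String> {
        match self.handlers.get(&method_id) {
            Some(handler) => handler(payload).await,
            None => Err(format!("no handler registered for method {method_id}")),
        }
    }
}

// Example registration of an async echo handler for a hypothetical method ID.
fn register_echo(registry: &mut HandlerRegistry) {
    let handler: HandlerFn = Arc::new(|payload| Box::pin(async move { Ok(payload) }));
    registry.register(0x01, handler);
}
```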
Sources:
- extensions/muxio-tokio-rpc-server/Cargo.toml:18-20 (RPC service dependencies)
- High-level architecture diagrams (Diagram 3)
graph TB
subgraph "Per-Connection State"
SHARED_DISPATCHER["Arc<TokioMutex<RpcDispatcher>>\nShared between endpoint & caller"]
end
subgraph "Server-to-Client Direction"
CALLER["RpcServiceCallerInterface\nServer-initiated calls"]
CALLER_LOGIC["Call Logic\nawait response from client"]
end
subgraph "Client-to-Server Direction"
ENDPOINT["RpcServiceEndpointInterface\nClient-initiated calls"]
HANDLER["Registered Handlers\nServer-side implementations"]
end
subgraph "Transport Sharing"
READ_LOOP["Read Loop\nReceive WebSocket frames\nFeed to dispatcher"]
WRITE_LOOP["Write Loop\nSend WebSocket frames\nFrom dispatcher"]
WEBSOCKET["WebSocket Connection\nSingle TCP connection"]
end
CALLER --> SHARED_DISPATCHER
ENDPOINT --> SHARED_DISPATCHER
SHARED_DISPATCHER --> READ_LOOP
SHARED_DISPATCHER --> WRITE_LOOP
READ_LOOP --> WEBSOCKET
WRITE_LOOP --> WEBSOCKET
CALLER_LOGIC --> CALLER
HANDLER --> ENDPOINT
style SHARED_DISPATCHER fill:#e1f5ff
style WEBSOCKET fill:#fff4e1
Bidirectional RPC Support
A key feature of the Tokio RPC Server is support for server-initiated RPC calls to connected clients. This enables push notifications, server-side events, and bidirectional request/response patterns.
Bidirectional Communication Architecture
Description: Both the RpcServiceEndpointInterface and RpcServiceCallerInterface share the same RpcDispatcher and underlying RpcSession. This allows both directions to multiplex their requests over the same WebSocket connection. The dispatcher ensures that request IDs are unique and that responses are routed to the correct awaiting caller, whether that’s a client waiting for a server response or a server waiting for a client response.
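For illustration, a server-initiated call could be written along these lines. `ClientCaller` and `CallError` are hypothetical stand-ins showing only the shape of `RpcServiceCallerInterface`-style usage, not its real signatures.

```rust
use async_trait::async_trait;

// Hypothetical error type for a failed server-to-client call.
#[derive(Debug)]
struct CallError(String);

// Hypothetical stand-in for the caller half of a connection.
#[async_trait]
trait ClientCaller {
    /// Send a request to the connected client and await its response.
    async fn call(&self, method_id: u64, payload: Vec<u8>) -> Result<Vec<u8>, CallError>;
}

// A server-initiated request travels over the same multiplexed WebSocket
// connection used for client-to-server calls; the shared dispatcher
// correlates the client's response by request ID.
async fn push_update(caller: &dyn ClientCaller) -> Result<(), CallError> {
    let reply = caller.call(1, b"server event".to_vec()).await?;
    tracing::debug!(reply_len = reply.len(), "client acknowledged");
    Ok(())
}
```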
Use Cases:
- Server pushing real-time updates to clients
- Server requesting information from clients
- Peer-to-peer style communication patterns
- Event-driven architectures where both sides can initiate actions
Sources:
- extensions/muxio-tokio-rpc-server/Cargo.toml:18-20
- High-level architecture diagrams (Diagram 3)
Connection Lifecycle Management
The server manages the complete lifecycle of each WebSocket connection, from initial upgrade through active communication to graceful or abrupt termination.
Connection States
| State | Description | Transitions |
|---|---|---|
| Connecting | WebSocket upgrade in progress | → Connected |
| Connected | Active RPC communication | → Disconnecting, Error |
| Disconnecting | Graceful shutdown initiated | → Disconnected |
| Disconnected | Connection closed cleanly | Terminal state |
| Error | Connection error occurred | → Disconnected |
Lifecycle State Machine
Description: The connection lifecycle is managed by the per-connection task. State transitions are tracked internally, and state change callbacks (if registered) are invoked to notify application code of connection events. Resources are properly cleaned up in all terminal states.
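A minimal sketch of how such state tracking and callbacks could look; the crate's actual state type and callback signature are not shown in this document, so the names below are illustrative only.

```rust
// Illustrative connection states mirroring the table above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionState {
    Connecting,
    Connected,
    Disconnecting,
    Disconnected,
    Error,
}

// Optional application callback invoked on every transition.
type StateChangeHandler = Box<dyn Fn(ConnectionState) + Send + Sync>;

struct ConnectionTracker {
    state: ConnectionState,
    on_change: Option<StateChangeHandler>,
}

impl ConnectionTracker {
    fn state(&self) -> ConnectionState {
        self.state
    }

    // Record the new state and notify the application, if a handler is set.
    fn transition(&mut self, next: ConnectionState) {
        self.state = next;
        if let Some(cb) = &self.on_change {
            cb(next);
        }
    }
}
```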
Resource Cleanup:
- WebSocket stream closed
- Per-stream decoders removed from RpcSession
- Pending requests in RpcDispatcher completed with error
- Connection removed from server’s tracking state
- Task exit and memory deallocation
Sources:
- High-level architecture diagrams (Diagram 6)
- extensions/muxio-tokio-rpc-server/Cargo.toml:15 (Tokio dependency)
graph LR
subgraph "Inbound Path"
WS_MSG["WebSocket Binary Message\ntokio_tungstenite::Message"]
BYTES["Bytes Extract\nVec<u8> or Bytes"]
FEED["dispatcher.feed_bytes()\nProcess frame data"]
DECODE["RpcSession Decode\nReconstruct streams"]
EVENT["RpcDispatcher Events\nComplete requests/responses"]
end
subgraph "Outbound Path"
RESPONSE["Response Data\nFrom handlers or caller"]
ENCODE["RpcSession Encode\nFrame into chunks"]
WRITE_QUEUE["Write Queue\nPending frames"]
WS_SEND["WebSocket Send\nBinary message"]
end
WS_MSG --> BYTES
BYTES --> FEED
FEED --> DECODE
DECODE --> EVENT
RESPONSE --> ENCODE
ENCODE --> WRITE_QUEUE
WRITE_QUEUE --> WS_SEND
style FEED fill:#e1f5ff
style ENCODE fill:#e1f5ff
Binary Frame Processing
The server handles the low-level details of converting between WebSocket binary messages and muxio’s framing protocol.
Frame Processing Pipeline
Description: WebSocket messages arrive as binary frames. The server extracts the byte payload and feeds it to the RpcDispatcher, which uses the RpcSession to demultiplex and decode the frames. In the outbound direction, responses are encoded by RpcSession into chunked frames and queued for WebSocket transmission.
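A sketch of the outbound half of this pipeline, assuming the frames have already been encoded and chunked by the session layer. `DEFAULT_MAX_CHUNK_SIZE` is defined by muxio, but the value used here is purely illustrative, as is the helper function itself.

```rust
use axum::extract::ws::{Message, WebSocket};
use futures::{stream::SplitSink, SinkExt};

// Illustrative value only; muxio defines its own DEFAULT_MAX_CHUNK_SIZE.
const DEFAULT_MAX_CHUNK_SIZE: usize = 64 * 1024;

// Write each encoded frame to the socket as a binary WebSocket message.
// Only binary frames are used; text frames are never emitted.
async fn send_frames(
    ws_tx: &mut SplitSink<WebSocket, Message>,
    encoded: Vec<Vec<u8>>, // frames produced by the session's encoder
) -> Result<(), axum::Error> {
    for frame in encoded {
        debug_assert!(frame.len() <= DEFAULT_MAX_CHUNK_SIZE);
        ws_tx.send(Message::Binary(frame.into())).await?;
    }
    Ok(())
}
```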
Frame Format Details:
- Binary WebSocket frames only (text frames rejected)
- Frames may be chunked at the WebSocket layer (transparent to muxio)
- muxio’s internal chunking is based on DEFAULT_MAX_CHUNK_SIZE
- Stream multiplexing allows multiple concurrent operations
Sources:
- Cargo.lock:1446-1455 (tokio-tungstenite)
- High-level architecture diagrams (Diagram 5)
Server Configuration and Builder Pattern
The server typically provides a builder pattern for configuration before starting.
Configuration Options
| Option | Purpose | Default |
|---|---|---|
| Bind address | TCP address and port | 127.0.0.1:3000 (typical) |
| WebSocket path | Route path for WebSocket upgrade | /ws (typical) |
| Connection limits | Max concurrent connections | Unlimited (configurable) |
| Timeout settings | Connection and request timeouts | Platform defaults |
| Handler registry | Registered RPC service handlers | Empty (must register) |
| State change handlers | Lifecycle event callbacks | None (optional) |
Typical Server Setup Pattern
Description: The server is configured using a builder pattern. Handlers are registered before the server starts. Once build() is called, the server configures its Axum router with the WebSocket route. The run() or serve() method starts the server’s event loop.
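As a minimal sketch of the surrounding Tokio/Axum startup (axum 0.7+ style): the RpcServer builder presumably wraps steps like these, additionally installing the WebSocket route and the handler registry before serving.

```rust
use axum::{routing::get, Router};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // In the real crate, the WebSocket route and registered RPC handlers
    // are configured on the router before this point.
    let app = Router::new().route("/health", get(|| async { "ok" }));

    // Bind the TCP listener and run the server's event loop until shutdown.
    let listener = TcpListener::bind("127.0.0.1:3000").await?;
    axum::serve(listener, app).await?;
    Ok(())
}
```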
Sources:
- extensions/muxio-tokio-rpc-server/Cargo.toml:12 (Axum dependency)
- extensions/muxio-tokio-rpc-server/Cargo.toml:15 (Tokio dependency)
Integration with Tokio Async Runtime
The server is fully integrated with the Tokio async runtime, using async/await throughout.
Async Task Structure
| Task Type | Purpose | Lifetime |
|---|---|---|
| Server listener | Accept incoming connections | Until server shutdown |
| Per-connection task | Handle one WebSocket connection | Until connection closes |
| Read loop | Process incoming WebSocket frames | Per-connection lifetime |
| Write loop | Send outgoing WebSocket frames | Per-connection lifetime |
| Handler execution | Execute registered RPC handlers | Per-request duration |
Async Patterns:
- async fn for all I/O operations
- tokio::spawn() for concurrent task creation
- tokio::select! for graceful shutdown coordination (see the sketch below)
- TokioMutex for shared state protection
- futures::stream::StreamExt for frame processing
Concurrency Model:
- One Tokio task per connection
- Handlers executed concurrently on Tokio thread pool
- Multiplexed streams allow concurrent RPC calls per connection
- Backpressure handled at WebSocket layer
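A sketch of the tokio::select! shutdown coordination referenced above: each per-connection task races its work against a server-wide shutdown signal. The signal type and the loop body are illustrative; real code would poll the WebSocket instead of sleeping.

```rust
use std::time::Duration;
use tokio::sync::watch;

async fn connection_task(mut shutdown: watch::Receiver<bool>) {
    loop {
        tokio::select! {
            // Placeholder for "next WebSocket frame" in this sketch.
            _ = tokio::time::sleep(Duration::from_secs(1)) => {
                // Process the next frame here.
            }
            // Server-wide shutdown requested: exit and let cleanup run.
            _ = shutdown.changed() => break,
        }
    }
}
```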
Sources:
- Cargo.lock:1417-1432 (Tokio dependency)
- extensions/muxio-tokio-rpc-server/Cargo.toml:15 (Tokio dependency)
- extensions/muxio-tokio-rpc-server/Cargo.toml:21 (async-trait)
graph TB
subgraph "Error Sources"
WS_ERR["WebSocket Errors\nConnection failures\nProtocol violations"]
DECODE_ERR["Decode Errors\nInvalid frames\nMalformed data"]
HANDLER_ERR["Handler Errors\nBusiness logic failures\nRpcServiceError"]
DISPATCHER_ERR["Dispatcher Errors\nUnknown request ID\nMutex poisoning"]
end
subgraph "Error Handling"
LOG["tracing::error!\nStructured logging"]
CLIENT_RESP["Error Response\nRpcServiceError to client"]
DISCONNECT["Connection Close\nUnrecoverable errors"]
STATE_CALLBACK["State Change Handler\nNotify application"]
end
WS_ERR --> LOG
WS_ERR --> DISCONNECT
DECODE_ERR --> LOG
DECODE_ERR --> CLIENT_RESP
HANDLER_ERR --> LOG
HANDLER_ERR --> CLIENT_RESP
DISPATCHER_ERR --> LOG
DISPATCHER_ERR --> DISCONNECT
DISCONNECT --> STATE_CALLBACK
style LOG fill:#fff4e1
style DISCONNECT fill:#ffe1e1
Error Handling and Observability
The server provides comprehensive error handling and integrates with the tracing ecosystem for observability.
Error Propagation
Description: Errors are categorized by severity and handled appropriately. Handler errors are serialized and sent to the client as RpcServiceError responses. Transport and protocol errors are logged and result in connection termination. All errors are emitted as structured tracing events for monitoring and debugging.
Tracing Integration:
- #[tracing::instrument] on key functions (example below)
- Span context for each connection
- Structured fields for method IDs, request IDs, error codes
- Error, warn, info, debug, and trace level events
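For illustration, an instrumented handler might look like this; the function and field names are examples, not the crate's actual instrumentation points.

```rust
// The span records the function arguments as structured fields; the
// payload is skipped to avoid logging raw bytes.
#[tracing::instrument(skip(payload))]
async fn handle_request(method_id: u64, request_id: u32, payload: Vec<u8>) {
    tracing::debug!(len = payload.len(), "decoded request payload");

    if payload.is_empty() {
        tracing::warn!(method_id, "received empty payload");
    }
}
```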
Sources:
- extensions/muxio-tokio-rpc-server/Cargo.toml:22 (tracing dependency)
- Cargo.lock:1503-1562 (tracing ecosystem)
Performance Considerations
The Tokio RPC Server is designed for high-performance scenarios with multiple concurrent connections and high request throughput.
Performance Characteristics
| Metric | Behavior | Tuning |
|---|---|---|
| Connections | O(n) tasks, minimal per-connection overhead | Tokio thread pool size |
| Throughput | Stream multiplexing enables pipelining | DEFAULT_MAX_CHUNK_SIZE |
| Latency | Low - single async task hop per request | Handler execution time |
| Memory | Incremental - per-stream decoders only | Connection limits |
Optimization Strategies:
- Stream multiplexing reduces head-of-line blocking
- Binary protocol minimizes serialization overhead
- Zero-copy where possible (using bytes::Bytes; sketched below)
- Efficient buffer management in RpcSession
- Concurrent handler execution on Tokio thread pool
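To illustrate the zero-copy point: bytes::Bytes makes sharing a received payload cheap, because cloning only bumps a reference count rather than copying the buffer. The helper function here is purely illustrative.

```rust
use bytes::Bytes;

// Hand the same payload to two consumers without copying the bytes.
fn share_payload(raw: Vec<u8>) -> (Bytes, Bytes) {
    let payload = Bytes::from(raw);    // takes ownership, no copy
    let for_decoder = payload.clone(); // refcount increment, not a memcpy
    (payload, for_decoder)
}
```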
Sources:
- extensions/muxio-tokio-rpc-server/Cargo.toml:13 (bytes dependency)
- High-level architecture diagrams (Diagram 5)
- Cargo.lock:209-212 (bytes crate)