This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Tokio RPC Server
Relevant source files
- Cargo.lock
- extensions/muxio-rpc-service-endpoint/Cargo.toml
- extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs
- extensions/muxio-tokio-rpc-server/Cargo.toml
Purpose and Scope
The muxio-tokio-rpc-server crate provides a concrete, production-ready WebSocket RPC server implementation built on the Tokio async runtime and the Axum web framework. This crate bridges the transport-agnostic RpcServiceEndpointInterface (documented in 4.3) with real-world network transport over WebSocket connections.
For client-side connection logic, see 5.2 Tokio RPC Client. For cross-platform WASM client support, see 5.3 WASM RPC Client. For details on connection state tracking and disconnection handling, see 5.4 Transport State Management.
Sources: extensions/muxio-tokio-rpc-server/Cargo.toml:1-23
System Architecture
The Tokio RPC Server sits at the intersection of three major subsystems: the Axum HTTP/WebSocket framework, the muxio RPC endpoint abstraction layer, and the Tokio async runtime. It translates between the generic RpcServiceEndpointInterface and the specific requirements of WebSocket binary message transport.
Diagram: Tokio RPC Server Component Architecture
graph TB
subgraph "Application Layer"
APP["Application Code"]
HANDLERS["RPC Method Handlers\n(user-defined async functions)"]
end
subgraph "muxio-tokio-rpc-server"
SERVER["TokioRpcServer"]
WS_HANDLER["WebSocket Handler\n(Axum route)"]
ENDPOINT_IMPL["RpcServiceEndpointInterface\nImplementation"]
CONN_MGR["Connection Manager\n(per-connection state)"]
end
subgraph "RPC Abstraction Layer"
ENDPOINT_TRAIT["RpcServiceEndpointInterface\n(trait)"]
DISPATCHER["RpcDispatcher"]
HANDLERS_LOCK["Handler Registry"]
end
subgraph "Network Transport"
AXUM["Axum Web Framework"]
WS["tokio-tungstenite\nWebSocket Protocol"]
TOKIO["Tokio Async Runtime"]
end
APP --> SERVER
APP --> HANDLERS
HANDLERS --> SERVER
SERVER --> WS_HANDLER
SERVER --> ENDPOINT_IMPL
WS_HANDLER --> AXUM
AXUM --> WS
ENDPOINT_IMPL -.implements.-> ENDPOINT_TRAIT
ENDPOINT_IMPL --> DISPATCHER
ENDPOINT_IMPL --> HANDLERS_LOCK
WS_HANDLER --> CONN_MGR
CONN_MGR --> ENDPOINT_IMPL
WS --> TOKIO
ENDPOINT_IMPL --> TOKIO
The server operates in three layers:
- Application Layer : User code registers RPC handlers and starts the server
- Server Implementation : Manages WebSocket connections and implements the endpoint interface
- Transport Layer : Axum provides HTTP routing, tokio-tungstenite handles WebSocket framing, Tokio executes async tasks
Sources: extensions/muxio-tokio-rpc-server/Cargo.toml:12-22 Cargo.lock:918-932
Core Components
Server Structure
The TokioRpcServer serves as the main entry point and coordinates all subsystems:
| Component | Purpose | Key Responsibilities |
|---|---|---|
| TokioRpcServer | Main server instance | Server lifecycle, handler registration, Axum router configuration |
| WebSocket Handler | Axum route handler | Connection upgrade, per-connection spawning, binary message loop |
| Endpoint Implementation | RpcServiceEndpointInterface impl | Handler lookup, request dispatching, response encoding |
| Handler Registry | Shared state | Thread-safe storage of registered RPC method handlers |
Sources: extensions/muxio-tokio-rpc-server/Cargo.toml:12-17
Dependency Integration
Diagram: Key Dependencies and Their Relationships
Sources: extensions/muxio-tokio-rpc-server/Cargo.toml:12-22 Cargo.lock:918-932
Connection Lifecycle
Each WebSocket connection follows a well-defined lifecycle managed by the server:
Diagram: WebSocket Connection Lifecycle
sequenceDiagram
participant Client
participant AxumRouter as "Axum Router"
participant WSHandler as "WebSocket Handler"
participant Upgrade as "WebSocket Upgrade"
participant ConnTask as "Connection Task\n(spawned)"
participant Dispatcher as "RpcDispatcher"
participant Endpoint as "Endpoint Interface"
Client->>AxumRouter: HTTP GET /ws
AxumRouter->>WSHandler: Route match
WSHandler->>Upgrade: Upgrade to WebSocket
Upgrade->>Client: 101 Switching Protocols
Upgrade->>WSHandler: WebSocket stream
WSHandler->>ConnTask: tokio::spawn
ConnTask->>Dispatcher: Create new instance
loop Message Loop
Client->>ConnTask: Binary WebSocket Frame
ConnTask->>Dispatcher: read_bytes(frame)
Dispatcher->>Endpoint: Decode + identify requests
Endpoint->>Endpoint: Execute handlers
Endpoint->>Dispatcher: Encode responses
Dispatcher->>ConnTask: Emit response bytes
ConnTask->>Client: Binary WebSocket Frame
end
alt Client Disconnect
Client->>ConnTask: Close frame
ConnTask->>Dispatcher: Cleanup
ConnTask->>ConnTask: Task exits
end
alt Server Shutdown
WSHandler->>ConnTask: Shutdown signal
ConnTask->>Client: Close frame
ConnTask->>Dispatcher: Cleanup
ConnTask->>ConnTask: Task exits
end
Connection States
| State | Description | Transitions |
|---|---|---|
| Upgrade | HTTP connection being upgraded to WebSocket | → Connected |
| Connected | Active WebSocket connection processing messages | → Disconnecting, → Error |
| Disconnecting | Graceful shutdown in progress, flushing pending responses | → Disconnected |
| Disconnected | Connection closed, resources cleaned up | Terminal state |
| Error | Abnormal termination due to protocol or transport error | → Disconnected |
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:66-138
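These states are descriptive rather than an exported type. A minimal sketch of how they could be modeled in application code (the enum below is illustrative, not part of the crate's public API):

```rust
/// Illustrative only: models the conceptual connection states from the table
/// above; muxio-tokio-rpc-server does not export this enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionState {
    /// HTTP connection being upgraded to WebSocket.
    Upgrade,
    /// Active WebSocket connection processing messages.
    Connected,
    /// Graceful shutdown in progress, flushing pending responses.
    Disconnecting,
    /// Connection closed, resources cleaned up (terminal).
    Disconnected,
    /// Abnormal termination due to a protocol or transport error.
    Error,
}
```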
RPC Endpoint Implementation
The server implements RpcServiceEndpointInterface<C> where C is the per-connection context type. This implementation provides the bridge between WebSocket binary frames and the RPC protocol layer.
Handler Registration
The register_prebuffered method allows applications to register RPC method handlers at runtime:
Diagram: Handler Registration Flow
graph LR
APP["Application Code"]
SERVER["TokioRpcServer"]
ENDPOINT["Endpoint Implementation"]
LOCK["Handler Registry\n(Arc<Mutex<HashMap>>)"]
APP -->|register_prebuffered method_id, handler| SERVER
SERVER -->|delegate| ENDPOINT
ENDPOINT -->|lock handlers| LOCK
LOCK -->|insert| LOCK
LOCK -->|check duplicates| LOCK
Key characteristics:
- Thread-safe : Uses Arc and async-aware locking (tokio::sync::RwLock or similar)
- Type-safe : Handlers accept Vec<u8> input and return Result<Vec<u8>, Box<dyn Error>>
- Duplicate detection : Returns RpcServiceEndpointError::Handler if the method ID is already registered
- Runtime registration : Handlers can be added after server start (though typically done during initialization)
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:35-64 extensions/muxio-rpc-service-endpoint/Cargo.toml:21-22
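As a rough illustration of the registration flow above, a hedged sketch follows. The method ID constant, the exact handler signature, and whether register_prebuffered is awaited are assumptions made for illustration; only the method name and the Vec<u8>-in/Vec<u8>-out handler shape come from this page.

```rust
use std::error::Error;

// Hypothetical method ID; real projects typically derive IDs from a shared
// service-definition crate rather than hard-coding them.
const ECHO_METHOD_ID: u64 = 1;

// Sketch, assuming `server` is a TokioRpcServer and that register_prebuffered
// is async and returns a Result (duplicate IDs yield
// RpcServiceEndpointError::Handler).
async fn register_handlers(server: &TokioRpcServer) -> Result<(), Box<dyn Error>> {
    server
        .register_prebuffered(ECHO_METHOD_ID, |_ctx, request_bytes: Vec<u8>| async move {
            // Echo the request back; a real handler would decode the payload,
            // run application logic, and encode a typed response.
            Ok::<Vec<u8>, Box<dyn Error + Send + Sync>>(request_bytes)
        })
        .await?;
    Ok(())
}
```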
Request Processing Pipeline
The read_bytes method implements the three-stage request processing pipeline:
Diagram: Request Processing Pipeline in read_bytes
flowchart TD
START["read_bytes(dispatcher, context, bytes, on_emit)"]
subgraph Stage1["Stage 1: Decode & Identify (Sync)"]
DECODE["dispatcher.read_bytes(bytes)"]
CHECK["Check which requests finalized"]
EXTRACT["Extract finalized requests\nfrom dispatcher"]
end
subgraph Stage2["Stage 2: Execute Handlers (Async)"]
LOOKUP["Look up handler by method_id"]
SPAWN["Spawn handler futures"]
AWAIT["join_all(futures)"]
end
subgraph Stage3["Stage 3: Encode & Emit (Sync)"]
ENCODE["dispatcher.respond(response)"]
CHUNK["Chunk large payloads"]
EMIT["on_emit(bytes)"]
end
START --> DECODE
DECODE --> CHECK
CHECK --> EXTRACT
EXTRACT -->|for each request| LOOKUP
LOOKUP --> SPAWN
SPAWN --> AWAIT
AWAIT -->|for each response| ENCODE
ENCODE --> CHUNK
CHUNK --> EMIT
EMIT --> END["Return Ok()"]
Stage details:
1. Decode & Identify extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:78-97
   - Synchronously processes raw bytes from the WebSocket
   - Updates RpcDispatcher internal state
   - Collects fully-received requests ready for processing
   - No blocking I/O or async operations
2. Execute Handlers extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:99-125
   - Spawns async handler futures for each request
   - Handlers execute concurrently via join_all
   - Each handler receives cloned context and request bytes
   - Returns a vector of responses in the same order as the requests
3. Encode & Emit extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:127-136
   - Synchronously encodes responses into the RPC protocol format
   - Chunks large payloads (respects DEFAULT_SERVICE_MAX_CHUNK_SIZE)
   - Emits bytes via the provided callback (typically writes to the WebSocket)
   - Updates dispatcher state for correlation tracking
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:66-138
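The concurrency in Stage 2 is the key detail: handlers for all requests decoded from one batch of bytes run together under join_all. A self-contained conceptual model of Stages 2 and 3 (not the crate's actual code; handler storage, types, and the encoding step are simplified) looks like this:

```rust
use futures::future::join_all;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;

// Simplified handler type: request bytes in, response bytes out.
type Handler =
    Box<dyn Fn(Vec<u8>) -> Pin<Box<dyn Future<Output = Vec<u8>> + Send>> + Send + Sync>;

/// Conceptual model: finalized requests from Stage 1 are looked up by method
/// ID, executed concurrently, and their responses handed to the emit callback
/// (protocol encoding and chunking are elided).
async fn process_batch(
    handlers: &HashMap<u64, Handler>,
    finalized_requests: Vec<(u64, Vec<u8>)>, // (method_id, request_bytes)
    mut on_emit: impl FnMut(Vec<u8>),
) {
    // Stage 2: create one future per request and await them all together.
    let futures = finalized_requests
        .into_iter()
        .filter_map(|(method_id, bytes)| handlers.get(&method_id).map(|h| h(bytes)));
    let responses = join_all(futures).await;

    // Stage 3: emit each response in request order.
    for response in responses {
        on_emit(response);
    }
}
```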
WebSocket Message Loop
Each spawned connection task runs a continuous message loop that bridges WebSocket frames to RPC processing:
Diagram: WebSocket Message Loop Detail
sequenceDiagram
participant WS as "WebSocket Stream"
participant Task as "Connection Task"
participant Dispatcher as "RpcDispatcher"
participant Endpoint as "Endpoint::read_bytes"
participant Emit as "on_emit Closure"
loop Until Disconnection
WS->>Task: next() -> Some(Message::Binary)
Task->>Task: Extract bytes from message
Task->>Endpoint: read_bytes(dispatcher, ctx, bytes, emit)
Note over Endpoint: Stage 1: Decode
Endpoint->>Dispatcher: dispatcher.read_bytes(bytes)
Dispatcher-->>Endpoint: Vec<request_id>
Note over Endpoint: Stage 2: Execute
Endpoint->>Endpoint: Spawn handler futures
Endpoint->>Endpoint: join_all(futures).await
Note over Endpoint: Stage 3: Encode
Endpoint->>Dispatcher: dispatcher.respond(response)
Dispatcher->>Emit: on_emit(response_bytes)
Emit->>Task: Collect bytes to send
Endpoint-->>Task: Ok(())
Task->>WS: send(Message::Binary(response_bytes))
end
alt WebSocket Close
WS->>Task: next() -> Some(Message::Close)
Task->>Task: Break loop
end
alt WebSocket Error
WS->>Task: next() -> Some(Message::Error)
Task->>Task: Log error, break loop
end
Task->>Dispatcher: Drop (cleanup)
Task->>Task: Task exits
Key implementation aspects:
| Aspect | Implementation Detail |
|---|---|
| Message Type Handling | Only Message::Binary frames are processed; text/ping/pong frames are handled separately or ignored |
| Backpressure | Async send() naturally applies backpressure when the client is slow to receive |
| Error Handling | WebSocket errors logged via tracing, connection terminated gracefully |
| Emit Callback | Closure captures WebSocket sink, queues bytes for batched sending |
| Dispatcher Lifetime | One RpcDispatcher per connection, dropped on task exit |
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:66-138
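A simplified version of that loop using Axum's WebSocket types is sketched below. The dispatch path (RpcDispatcher plus the endpoint's read_bytes) is stood in for by a placeholder handle_frame function, since this sketch does not reproduce the exact trait signature.

```rust
use axum::extract::ws::{Message, WebSocket};

// Placeholder for the real dispatch path: one inbound binary frame in, the
// encoded response bytes (if any) out.
async fn handle_frame(frame: Vec<u8>) -> Option<Vec<u8>> {
    Some(frame) // echo, for illustration only
}

// Sketch of the spawned connection task: only Binary frames are dispatched,
// a Close frame or transport error ends the loop, and dropping the socket
// (and the per-connection dispatcher) performs cleanup.
async fn connection_task(mut socket: WebSocket) {
    while let Some(msg) = socket.recv().await {
        match msg {
            Ok(Message::Binary(bytes)) => {
                if let Some(response) = handle_frame(bytes.to_vec()).await {
                    if socket.send(Message::Binary(response.into())).await.is_err() {
                        break; // client went away; stop this task
                    }
                }
            }
            Ok(Message::Close(_)) | Err(_) => break, // graceful close or error
            _ => {} // Text/Ping/Pong frames are ignored in this sketch
        }
    }
}
```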
Integration with Axum
The server integrates with Axum's routing and middleware system:
Diagram: Axum Integration Pattern
Integration points:
- Router Construction : Server provides a method to convert into an axum::Router
- WebSocket Route : Typically mounted at /ws or a configurable path
- Handler Function : Accepts the WebSocketUpgrade extractor from Axum
- Upgrade Callback : Receives the upgraded socket, spawns a per-connection task
- Middleware Compatibility : Works with standard Tower middleware (CORS, auth, logging)
Sources: extensions/muxio-tokio-rpc-server/Cargo.toml:12 Cargo.lock:80-114
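A hedged sketch of this pattern with plain Axum primitives is shown below. It reuses the connection_task sketch from the previous section; the /ws path and the standalone router construction are illustrative, and the real crate may instead expose its own router-conversion method.

```rust
use axum::{extract::WebSocketUpgrade, response::IntoResponse, routing::get, Router};

// Axum handler: performs the WebSocket upgrade and hands the resulting socket
// to the per-connection task (connection_task is the sketch above).
// Requires axum's "ws" feature.
async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
    ws.on_upgrade(connection_task)
}

#[tokio::main]
async fn main() {
    // Mount the WebSocket route; standard Tower middleware (CORS, auth,
    // tracing) can be layered onto this router as usual.
    let app = Router::new().route("/ws", get(ws_handler));

    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}
```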
Context and State Management
The server supports per-connection context via the generic C type parameter in RpcServiceEndpointInterface<C>:
Context Type Requirements
Diagram: Context Type Requirements
classDiagram
class Context {
<<trait bounds>>
+Send
+Sync
+Clone
+'static
}
class ConnectionId {
+u64 id
+SocketAddr peer_addr
+Instant connected_at
}
class AppState {
+Arc~SharedData~ shared
+Metrics metrics
+AuthManager auth
}
Context <|-- ConnectionId : implements
Context <|-- AppState : implements
note for Context "Any type implementing these bounds\ncan be used as connection context"
Common context patterns:
| Pattern | Use Case | Example Fields |
|---|---|---|
| Connection Metadata | Track connection identity and timing | connection_id, peer_addr, connected_at |
| Authentication State | Store authenticated user information | user_id, session_token, permissions |
| Application State | Share server-wide resources | Arc<Database>, Arc<ConfigManager> |
| Request Context | Per-request metadata | trace_id, request_start, client_version |
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:9-13
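For instance, a connection-metadata context that satisfies these bounds could look like the following (the struct and its fields mirror the table above but are illustrative, not types exported by the crate):

```rust
use std::{net::SocketAddr, sync::Arc, time::Instant};

// Illustrative context type: Clone is cheap because shared state is
// Arc-wrapped, and every field is Send + Sync + 'static, so the struct
// satisfies the documented bounds for the C type parameter.
#[derive(Clone)]
struct ConnectionContext {
    connection_id: u64,
    peer_addr: SocketAddr,
    connected_at: Instant,
    shared: Arc<SharedState>,
}

struct SharedState {
    // e.g. database handles, configuration, metrics registries
    service_name: String,
}
```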
Handler Execution Model
Handlers execute asynchronously with Tokio runtime integration:
Diagram: Handler Execution Flow
Execution characteristics:
| Characteristic | Behavior |
|---|---|
| Concurrency | Multiple handlers execute concurrently via join_all for requests in same batch |
| Isolation | Each handler receives cloned context, preventing shared mutable state issues |
| Cancellation | If connection drops, futures are dropped (handlers should handle cancellation gracefully) |
| Error Handling | Handler errors converted to RPC error responses, connection remains active |
| Backpressure | WebSocket send backpressure naturally limits handler spawn rate |
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:35-64 extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:99-125
Error Handling
The server handles errors at multiple layers:
Error Types and Propagation
| Error Source | Error Type | Handling Strategy |
|---|---|---|
| WebSocket Protocol | tungstenite::Error | Log via tracing, close connection gracefully |
| RPC Dispatcher | muxio::RpcError | Convert to RPC error response, send to client |
| Handler Execution | Box<dyn Error + Send + Sync> | Encode as RPC error response, log details |
| Endpoint Registration | RpcServiceEndpointError | Return to caller during setup, prevent server start |
| Serialization | bitcode::Error | Treat as invalid request, send error response |
Error Response Flow
Diagram: Error Response Flow
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:32-34 extensions/muxio-tokio-rpc-server/Cargo.toml:22
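At the handler level, the strategy in the table amounts to returning a boxed error, which the endpoint encodes as an RPC error response while the connection stays open. A hedged sketch (the payload layout and handler name are assumptions chosen for illustration):

```rust
use std::error::Error;

// Hypothetical prebuffered handler body. Returning Err does not tear down the
// WebSocket connection; the endpoint turns it into an RPC error response.
async fn divide_handler(
    request_bytes: Vec<u8>,
) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
    // Assumed payload layout: two little-endian u32 values (8 bytes total).
    if request_bytes.len() != 8 {
        return Err("malformed request payload".into());
    }
    let a = u32::from_le_bytes(request_bytes[0..4].try_into()?);
    let b = u32::from_le_bytes(request_bytes[4..8].try_into()?);
    if b == 0 {
        return Err("division by zero".into()); // sent back as an RPC error
    }
    Ok((a / b).to_le_bytes().to_vec())
}
```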
Performance Considerations
The Tokio RPC Server implementation includes several optimizations:
Message Batching
Diagram: Request Batching for Efficiency
Optimization Strategies
| Strategy | Implementation | Benefit |
|---|---|---|
| Zero-copy when possible | Uses bytes::Bytes for reference-counted buffers | Reduces memory allocations for large payloads |
| Concurrent handler execution | join_all spawns all handlers in batch | Maximizes CPU utilization for independent requests |
| Chunked responses | Large responses split per DEFAULT_SERVICE_MAX_CHUNK_SIZE | Prevents memory spikes, enables streaming-like behavior |
| Connection pooling | Each connection has dedicated task, no contention | Scales to thousands of concurrent connections |
| Async I/O | All I/O operations are async via Tokio | Efficient use of OS threads, high concurrency |
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:99-125 extensions/muxio-tokio-rpc-server/Cargo.toml:13-14
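The chunking strategy is easy to picture with plain slices; a sketch follows (the 64 KiB constant is a stand-in for illustration, not the actual value of DEFAULT_SERVICE_MAX_CHUNK_SIZE):

```rust
// Stand-in limit; the real value comes from DEFAULT_SERVICE_MAX_CHUNK_SIZE in
// the RPC service layer.
const MAX_CHUNK_SIZE: usize = 64 * 1024;

/// Split a large encoded response into frame-sized chunks before emitting,
/// so a single big payload never has to be sent as one oversized frame.
fn emit_chunked(payload: &[u8], mut on_emit: impl FnMut(&[u8])) {
    for chunk in payload.chunks(MAX_CHUNK_SIZE) {
        on_emit(chunk);
    }
}
```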
Logging and Observability
The server integrates with the tracing ecosystem for structured logging:
Instrumentation Points
Diagram: Tracing Instrumentation Hierarchy
Key logging capabilities:
- Connection lifecycle events with peer address
- Request/response message IDs for correlation
- Handler execution timing and errors
- WebSocket protocol errors
- Dispatcher state changes
Sources: extensions/muxio-tokio-rpc-server/Cargo.toml:22 extensions/muxio-rpc-service-endpoint/Cargo.toml:18
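To see these events, the application installs a tracing subscriber before starting the server. A minimal setup sketch (the filter string and the env-filter feature of tracing-subscriber are assumptions about the consuming application, not requirements of this crate):

```rust
// Requires the tracing-subscriber crate with its "env-filter" feature.
fn init_tracing() {
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                // Example default: info everywhere, debug for the server crate.
                .unwrap_or_else(|_| {
                    tracing_subscriber::EnvFilter::new("info,muxio_tokio_rpc_server=debug")
                }),
        )
        .init();
}
```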
Example: Building a Server
Typical server construction and lifecycle:
Diagram: Server Initialization Sequence
Sources: extensions/muxio-tokio-rpc-server/Cargo.toml:1-23 extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:35-64
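Pulling the pieces together, a hedged end-to-end sketch follows. Only TokioRpcServer and register_prebuffered are names taken from this page; the constructor, the into_router conversion method, the handler signature, and the bind address are assumptions standing in for the crate's actual builder API.

```rust
use std::error::Error;

const ECHO_METHOD_ID: u64 = 1; // hypothetical method ID

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // 1. Construct the server (constructor name assumed).
    let server = TokioRpcServer::new();

    // 2. Register RPC handlers, typically during initialization.
    server
        .register_prebuffered(ECHO_METHOD_ID, |_ctx, bytes: Vec<u8>| async move {
            Ok::<Vec<u8>, Box<dyn Error + Send + Sync>>(bytes) // echo handler
        })
        .await?;

    // 3. Convert into an axum::Router (`into_router` is an assumed name for
    //    the documented router-conversion capability) and serve it on Tokio.
    let app = server.into_router();
    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await?;
    axum::serve(listener, app).await?;
    Ok(())
}
```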