
This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Tokio RPC Server


Purpose and Scope

The muxio-tokio-rpc-server crate provides a concrete, production-ready WebSocket RPC server implementation built on the Tokio async runtime and the Axum web framework. This crate bridges the transport-agnostic RpcServiceEndpointInterface (documented in 4.3) with real-world network transport over WebSocket connections.

For client-side connection logic, see 5.2 Tokio RPC Client. For cross-platform WASM client support, see 5.3 WASM RPC Client. For details on connection state tracking and disconnection handling, see 5.4 Transport State Management.

Sources: extensions/muxio-tokio-rpc-server/Cargo.toml:1-23


System Architecture

The Tokio RPC Server sits at the intersection of three major subsystems: the Axum HTTP/WebSocket framework, the muxio RPC endpoint abstraction layer, and the Tokio async runtime. It translates between the generic RpcServiceEndpointInterface and the specific requirements of WebSocket binary message transport.

Diagram: Tokio RPC Server Component Architecture

graph TB
    subgraph "Application Layer"
        APP["Application Code"]
        HANDLERS["RPC Method Handlers\n(user-defined async functions)"]
    end

    subgraph "muxio-tokio-rpc-server"
        SERVER["TokioRpcServer"]
        WS_HANDLER["WebSocket Handler\n(Axum route)"]
        ENDPOINT_IMPL["RpcServiceEndpointInterface\nImplementation"]
        CONN_MGR["Connection Manager\n(per-connection state)"]
    end

    subgraph "RPC Abstraction Layer"
        ENDPOINT_TRAIT["RpcServiceEndpointInterface\n(trait)"]
        DISPATCHER["RpcDispatcher"]
        HANDLERS_LOCK["Handler Registry"]
    end

    subgraph "Network Transport"
        AXUM["Axum Web Framework"]
        WS["tokio-tungstenite\nWebSocket Protocol"]
        TOKIO["Tokio Async Runtime"]
    end

    APP --> SERVER
    APP --> HANDLERS
    HANDLERS --> SERVER

    SERVER --> WS_HANDLER
    SERVER --> ENDPOINT_IMPL

    WS_HANDLER --> AXUM
    AXUM --> WS

    ENDPOINT_IMPL -.implements.-> ENDPOINT_TRAIT
    ENDPOINT_IMPL --> DISPATCHER
    ENDPOINT_IMPL --> HANDLERS_LOCK

    WS_HANDLER --> CONN_MGR
    CONN_MGR --> ENDPOINT_IMPL

    WS --> TOKIO
    ENDPOINT_IMPL --> TOKIO

The server operates in three layers:

  1. Application Layer: User code registers RPC handlers and starts the server
  2. Server Implementation: Manages WebSocket connections and implements the endpoint interface
  3. Transport Layer: Axum provides HTTP routing, tokio-tungstenite handles WebSocket framing, and Tokio executes async tasks

Sources: extensions/muxio-tokio-rpc-server/Cargo.toml:12-22 Cargo.lock:918-932


Core Components

Server Structure

The TokioRpcServer serves as the main entry point and coordinates all subsystems:

| Component | Purpose | Key Responsibilities |
|---|---|---|
| TokioRpcServer | Main server instance | Server lifecycle, handler registration, Axum router configuration |
| WebSocket Handler | Axum route handler | Connection upgrade, per-connection spawning, binary message loop |
| Endpoint Implementation | RpcServiceEndpointInterface impl | Handler lookup, request dispatching, response encoding |
| Handler Registry | Shared state | Thread-safe storage of registered RPC method handlers |

Sources: extensions/muxio-tokio-rpc-server/Cargo.toml:12-17

Dependency Integration

Diagram: Key Dependencies and Their Relationships

Sources: extensions/muxio-tokio-rpc-server/Cargo.toml:12-22 Cargo.lock:918-932


Connection Lifecycle

Each WebSocket connection follows a well-defined lifecycle managed by the server:

Diagram: WebSocket Connection Lifecycle

sequenceDiagram
    participant Client
    participant AxumRouter as "Axum Router"
    participant WSHandler as "WebSocket Handler"
    participant Upgrade as "WebSocket Upgrade"
    participant ConnTask as "Connection Task\n(spawned)"
    participant Dispatcher as "RpcDispatcher"
    participant Endpoint as "Endpoint Interface"

    Client->>AxumRouter: HTTP GET /ws
    AxumRouter->>WSHandler: Route match
    WSHandler->>Upgrade: Upgrade to WebSocket
    Upgrade->>Client: 101 Switching Protocols
    Upgrade->>WSHandler: WebSocket stream

    WSHandler->>ConnTask: tokio::spawn
    ConnTask->>Dispatcher: Create new instance

    loop Message Loop
        Client->>ConnTask: Binary WebSocket Frame
        ConnTask->>Dispatcher: read_bytes(frame)
        Dispatcher->>Endpoint: Decode + identify requests
        Endpoint->>Endpoint: Execute handlers
        Endpoint->>Dispatcher: Encode responses
        Dispatcher->>ConnTask: Emit response bytes
        ConnTask->>Client: Binary WebSocket Frame
    end

    alt Client Disconnect
        Client->>ConnTask: Close frame
        ConnTask->>Dispatcher: Cleanup
        ConnTask->>ConnTask: Task exits
    end

    alt Server Shutdown
        WSHandler->>ConnTask: Shutdown signal
        ConnTask->>Client: Close frame
        ConnTask->>Dispatcher: Cleanup
        ConnTask->>ConnTask: Task exits
    end

Connection States

| State | Description | Transitions |
|---|---|---|
| Upgrade | HTTP connection being upgraded to WebSocket | → Connected |
| Connected | Active WebSocket connection processing messages | → Disconnecting, → Error |
| Disconnecting | Graceful shutdown in progress, flushing pending responses | → Disconnected |
| Disconnected | Connection closed, resources cleaned up | Terminal state |
| Error | Abnormal termination due to protocol or transport error | → Disconnected |

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:66-138
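
The table can be read as a small state machine. The following Rust sketch models those states and allowed transitions; it is illustrative only, since the cited sources do not show whether the server represents these states as an explicit type:

```rust
/// Hypothetical per-connection state machine mirroring the table above; the
/// actual server may track this only implicitly through task lifetime.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionState {
    /// HTTP connection being upgraded to WebSocket.
    Upgrade,
    /// Active WebSocket connection processing messages.
    Connected,
    /// Graceful shutdown in progress, flushing pending responses.
    Disconnecting,
    /// Terminal state: connection closed, resources cleaned up.
    Disconnected,
    /// Abnormal termination due to a protocol or transport error.
    Error,
}

impl ConnectionState {
    /// Allowed transitions per the table; anything else is rejected.
    fn can_transition_to(self, next: ConnectionState) -> bool {
        use ConnectionState::*;
        matches!(
            (self, next),
            (Upgrade, Connected)
                | (Connected, Disconnecting)
                | (Connected, Error)
                | (Disconnecting, Disconnected)
                | (Error, Disconnected)
        )
    }
}

fn main() {
    assert!(ConnectionState::Connected.can_transition_to(ConnectionState::Disconnecting));
    assert!(!ConnectionState::Disconnected.can_transition_to(ConnectionState::Connected));
}
```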


RPC Endpoint Implementation

The server implements RpcServiceEndpointInterface<C> where C is the per-connection context type. This implementation provides the bridge between WebSocket binary frames and the RPC protocol layer.

Handler Registration

The register_prebuffered method allows applications to register RPC method handlers at runtime:

Diagram: Handler Registration Flow

graph LR
    APP["Application Code"]
    SERVER["TokioRpcServer"]
    ENDPOINT["Endpoint Implementation"]
    LOCK["Handler Registry\n(Arc&lt;Mutex&lt;HashMap&gt;&gt;)"]

    APP -->|"register_prebuffered(method_id, handler)"| SERVER
    SERVER -->|delegate| ENDPOINT
    ENDPOINT -->|lock handlers| LOCK
    LOCK -->|check duplicates| LOCK
    LOCK -->|insert| LOCK

Key characteristics:

  • Thread-safe: Uses Arc and async-aware locking (tokio::sync::RwLock or similar)
  • Type-safe: Handlers accept Vec<u8> input and return Result<Vec<u8>, Box<dyn Error>>
  • Duplicate detection: Returns RpcServiceEndpointError::Handler if the method ID is already registered
  • Runtime registration: Handlers can be added after server start (though typically done during initialization)

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:35-64 extensions/muxio-rpc-service-endpoint/Cargo.toml:21-22
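
The cited sources do not show the registry's concrete types, so the sketch below only illustrates the pattern the bullets describe: an Arc-wrapped tokio::sync::RwLock around a HashMap keyed by method ID, with duplicate detection at registration time. The HandlerRegistry, Handler, and RegistryError names are stand-ins, not the crate's actual items.

```rust
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};

use tokio::sync::RwLock;

type HandlerResult = Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>>;
type Handler =
    Arc<dyn Fn(Vec<u8>) -> Pin<Box<dyn Future<Output = HandlerResult> + Send>> + Send + Sync>;

#[derive(Debug)]
enum RegistryError {
    DuplicateMethod(u64),
}

#[derive(Default, Clone)]
struct HandlerRegistry {
    handlers: Arc<RwLock<HashMap<u64, Handler>>>,
}

impl HandlerRegistry {
    /// Register a prebuffered handler, rejecting duplicate method IDs.
    async fn register_prebuffered(
        &self,
        method_id: u64,
        handler: Handler,
    ) -> Result<(), RegistryError> {
        let mut handlers = self.handlers.write().await;
        if handlers.contains_key(&method_id) {
            return Err(RegistryError::DuplicateMethod(method_id));
        }
        handlers.insert(method_id, handler);
        Ok(())
    }
}

/// Example handler: echoes the request bytes back as the response.
fn echo_handler(bytes: Vec<u8>) -> Pin<Box<dyn Future<Output = HandlerResult> + Send>> {
    Box::pin(async move {
        let response: HandlerResult = Ok(bytes);
        response
    })
}

#[tokio::main]
async fn main() {
    let registry = HandlerRegistry::default();

    registry
        .register_prebuffered(1, Arc::new(echo_handler))
        .await
        .unwrap();

    // A second registration under the same method ID is rejected.
    assert!(registry
        .register_prebuffered(1, Arc::new(echo_handler))
        .await
        .is_err());
}
```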

Request Processing Pipeline

The read_bytes method implements the three-stage request processing pipeline:

Diagram: Request Processing Pipeline in read_bytes

flowchart TD
    START["read_bytes(dispatcher, context, bytes, on_emit)"]

    subgraph Stage1["Stage 1: Decode & Identify (Sync)"]
        DECODE["dispatcher.read_bytes(bytes)"]
        CHECK["Check which requests finalized"]
        EXTRACT["Extract finalized requests\nfrom dispatcher"]
    end

    subgraph Stage2["Stage 2: Execute Handlers (Async)"]
        LOOKUP["Look up handler by method_id"]
        SPAWN["Spawn handler futures"]
        AWAIT["join_all(futures)"]
    end

    subgraph Stage3["Stage 3: Encode & Emit (Sync)"]
        ENCODE["dispatcher.respond(response)"]
        CHUNK["Chunk large payloads"]
        EMIT["on_emit(bytes)"]
    end

    START --> DECODE
    DECODE --> CHECK
    CHECK --> EXTRACT

    EXTRACT -->|for each request| LOOKUP
    LOOKUP --> SPAWN
    SPAWN --> AWAIT

    AWAIT -->|for each response| ENCODE
    ENCODE --> CHUNK
    CHUNK --> EMIT

    EMIT --> END["Return Ok()"]

Stage details:

  1. Decode & Identify extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:78-97

    • Synchronously processes raw bytes from WebSocket
    • Updates RpcDispatcher internal state
    • Collects fully-received requests ready for processing
    • No blocking I/O or async operations
  2. Execute Handlers extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:99-125

    • Spawns async handler futures for each request
    • Handlers execute concurrently via join_all
    • Each handler receives cloned context and request bytes
    • Returns vector of responses in same order as requests
  3. Encode & Emit extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:127-136

    • Synchronously encodes responses into RPC protocol format
    • Chunks large payloads (respects DEFAULT_SERVICE_MAX_CHUNK_SIZE)
    • Emits bytes via provided callback (typically writes to WebSocket)
    • Updates dispatcher state for correlation tracking

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:66-138
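
Below is a sketch of Stage 2 in isolation, assuming handlers shaped like the registry sketch above (boxed async closures keyed by method ID). Stages 1 and 3 are omitted because they go through the crate's RpcDispatcher API:

```rust
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};

use futures::future::join_all;

type HandlerResult = Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>>;
type Handler =
    Arc<dyn Fn(Vec<u8>) -> Pin<Box<dyn Future<Output = HandlerResult> + Send>> + Send + Sync>;

/// Stage 2 in isolation: run the handler for every finalized request
/// concurrently and collect the responses in the original request order.
async fn execute_batch(
    handlers: &HashMap<u64, Handler>,
    requests: Vec<(u64, Vec<u8>)>, // (method_id, request bytes) produced by Stage 1
) -> Vec<HandlerResult> {
    let futures: Vec<_> = requests
        .into_iter()
        .map(|(method_id, bytes)| {
            let handler = handlers.get(&method_id).cloned();
            async move {
                match handler {
                    Some(handler) => (*handler)(bytes).await,
                    None => Err(format!("no handler registered for method_id {method_id}").into()),
                }
            }
        })
        .collect();

    // join_all drives all handler futures concurrently; results come back in
    // request order, ready for Stage 3 to encode and emit.
    join_all(futures).await
}
```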


WebSocket Message Loop

Each spawned connection task runs a continuous message loop that bridges WebSocket frames to RPC processing:

Diagram: WebSocket Message Loop Detail

sequenceDiagram
    participant WS as "WebSocket Stream"
    participant Task as "Connection Task"
    participant Dispatcher as "RpcDispatcher"
    participant Endpoint as "Endpoint::read_bytes"
    participant Emit as "on_emit Closure"

    loop Until Disconnection
        WS->>Task: next() -> Some(Message::Binary)
        Task->>Task: Extract bytes from message

        Task->>Endpoint: read_bytes(dispatcher, ctx, bytes, emit)

        Note over Endpoint: Stage 1: Decode
        Endpoint->>Dispatcher: dispatcher.read_bytes(bytes)
        Dispatcher-->>Endpoint: Vec&lt;request_id&gt;

        Note over Endpoint: Stage 2: Execute
        Endpoint->>Endpoint: Spawn handler futures
        Endpoint->>Endpoint: join_all(futures).await

        Note over Endpoint: Stage 3: Encode
        Endpoint->>Dispatcher: dispatcher.respond(response)
        Dispatcher->>Emit: on_emit(response_bytes)
        Emit->>Task: Collect bytes to send

        Endpoint-->>Task: Ok(())

        Task->>WS: send(Message::Binary(response_bytes))
    end

    alt WebSocket Close
        WS->>Task: next() -> Some(Message::Close)
        Task->>Task: Break loop
    end

    alt WebSocket Error
        WS->>Task: next() -> Some(Message::Error)
        Task->>Task: Log error, break loop
    end

    Task->>Dispatcher: Drop (cleanup)
    Task->>Task: Task exits

Key implementation aspects:

| Aspect | Implementation Detail |
|---|---|
| Message Type Handling | Only Message::Binary frames are processed; text/ping/pong frames are handled separately or ignored |
| Backpressure | The async send() naturally applies backpressure when the client is slow to receive |
| Error Handling | WebSocket errors are logged via tracing and the connection is terminated gracefully |
| Emit Callback | A closure captures the WebSocket sink and queues bytes for batched sending |
| Dispatcher Lifetime | One RpcDispatcher per connection, dropped on task exit |

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:66-138
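
A minimal sketch of such a per-connection loop using axum's WebSocket type follows. The RPC step is reduced to a hypothetical handle_frame placeholder where the real server would drive the dispatcher and endpoint interface:

```rust
use axum::extract::ws::{Message, WebSocket};

/// Per-connection message loop: binary frames in, binary frames out.
async fn connection_task(mut socket: WebSocket) {
    while let Some(msg) = socket.recv().await {
        match msg {
            Ok(Message::Binary(frame)) => {
                // Placeholder for: dispatcher decode -> handler execution ->
                // response encoding (the three-stage pipeline above).
                let response_bytes = handle_frame(frame.to_vec()).await;

                // The async send applies natural backpressure when the client
                // is slow to receive.
                if socket.send(Message::Binary(response_bytes.into())).await.is_err() {
                    break;
                }
            }
            // Graceful client disconnect.
            Ok(Message::Close(_)) => break,
            // Text, ping, and pong frames are ignored in this sketch.
            Ok(_) => {}
            Err(err) => {
                tracing::warn!(error = %err, "websocket error; closing connection");
                break;
            }
        }
    }
    // Dropping the socket (and, in the real server, the per-connection
    // RpcDispatcher) performs cleanup when the task exits.
}

/// Hypothetical stand-in for the RPC pipeline; echoes the request bytes.
async fn handle_frame(request: Vec<u8>) -> Vec<u8> {
    request
}
```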


Integration with Axum

The server integrates with Axum's routing and middleware system:

Diagram: Axum Integration Pattern

Integration points:

  1. Router Construction: Server provides a method to convert into axum::Router
  2. WebSocket Route: Typically mounted at /ws or a configurable path
  3. Handler Function: Accepts the WebSocketUpgrade extractor from Axum
  4. Upgrade Callback: Receives the upgraded socket, spawns a per-connection task
  5. Middleware Compatibility: Works with standard Tower middleware (CORS, auth, logging)

Sources: extensions/muxio-tokio-rpc-server/Cargo.toml:12 Cargo.lock:80-114
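
Integration points 3 and 4 can be sketched as follows; mounting the route and layering middleware are shown in the complete example at the end of this page. The loop body is a stand-in for the message loop sketched in the previous section:

```rust
use axum::{
    extract::ws::{Message, WebSocket, WebSocketUpgrade},
    response::IntoResponse,
};

/// The route handler takes Axum's WebSocketUpgrade extractor; the callback
/// passed to on_upgrade runs as the per-connection task once the
/// 101 Switching Protocols handshake completes.
async fn ws_handler(upgrade: WebSocketUpgrade) -> impl IntoResponse {
    upgrade.on_upgrade(handle_socket)
}

async fn handle_socket(mut socket: WebSocket) {
    // Stand-in for the binary message loop from the previous section.
    while let Some(Ok(msg)) = socket.recv().await {
        if matches!(msg, Message::Close(_)) {
            break;
        }
    }
}
```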


Context and State Management

The server supports per-connection context via the generic C type parameter in RpcServiceEndpointInterface<C>:

Context Type Requirements

Diagram: Context Type Requirements

classDiagram
    class Context {
        <<trait bounds>>
        +Send
        +Sync
        +Clone
        +'static
    }

    class ConnectionId {
        +u64 id
        +SocketAddr peer_addr
        +Instant connected_at
    }

    class AppState {
        +Arc~SharedData~ shared
        +Metrics metrics
        +AuthManager auth
    }

    Context <|-- ConnectionId : implements
    Context <|-- AppState : implements

    note for Context "Any type implementing these bounds\ncan be used as connection context"

Common context patterns:

| Pattern | Use Case | Example Fields |
|---|---|---|
| Connection Metadata | Track connection identity and timing | connection_id, peer_addr, connected_at |
| Authentication State | Store authenticated user information | user_id, session_token, permissions |
| Application State | Share server-wide resources | Arc<Database>, Arc<ConfigManager> |
| Request Context | Per-request metadata | trace_id, request_start, client_version |

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:9-13
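
A minimal sketch of a context type satisfying these bounds, with field names borrowed from the patterns in the table; the type itself is illustrative, not part of the crate:

```rust
use std::{net::SocketAddr, sync::Arc, time::Instant};

/// Stand-in for server-wide resources (e.g. Arc<Database>, Arc<ConfigManager>).
struct AppShared {
    config: String,
}

/// Illustrative context: connection metadata plus shared application state.
/// Every field is Send + Sync and Clone is derived, so the Send + Sync +
/// Clone + 'static bounds are satisfied automatically (Arc clones are cheap).
#[derive(Clone)]
struct ConnectionContext {
    connection_id: u64,
    peer_addr: SocketAddr,
    connected_at: Instant,
    shared: Arc<AppShared>,
}

fn assert_context_bounds<C: Send + Sync + Clone + 'static>(_ctx: &C) {}

fn main() {
    let ctx = ConnectionContext {
        connection_id: 1,
        peer_addr: "127.0.0.1:9000".parse().unwrap(),
        connected_at: Instant::now(),
        shared: Arc::new(AppShared { config: "example".to_string() }),
    };
    // Compiles only if ConnectionContext satisfies the context bounds.
    assert_context_bounds(&ctx);
}
```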


Handler Execution Model

Handlers execute asynchronously with Tokio runtime integration:

Diagram: Handler Execution Flow

Execution characteristics:

| Characteristic | Behavior |
|---|---|
| Concurrency | Multiple handlers execute concurrently via join_all for requests in the same batch |
| Isolation | Each handler receives a cloned context, preventing shared mutable state issues |
| Cancellation | If the connection drops, futures are dropped (handlers should handle cancellation gracefully) |
| Error Handling | Handler errors are converted to RPC error responses; the connection remains active |
| Backpressure | WebSocket send backpressure naturally limits the handler spawn rate |

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:35-64 extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:99-125
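
The isolation and error-conversion rows can be illustrated with the sketch below: each call receives its own clone of the context, and a handler failure becomes an error payload for that single request rather than terminating the connection. The encode_error helper is a stand-in, not a crate API:

```rust
use std::sync::Arc;

type HandlerError = Box<dyn std::error::Error + Send + Sync>;

/// Illustrative per-connection context; each handler call gets its own clone.
#[derive(Clone)]
struct Ctx {
    peer: Arc<String>,
}

async fn flaky_handler(_ctx: Ctx, request: Vec<u8>) -> Result<Vec<u8>, HandlerError> {
    if request.is_empty() {
        return Err("empty request body".into());
    }
    Ok(request)
}

/// A failed handler yields an error payload for that single request; the
/// connection and other in-flight requests are unaffected.
async fn run_one(ctx: &Ctx, request: Vec<u8>) -> Vec<u8> {
    match flaky_handler(ctx.clone(), request).await {
        Ok(bytes) => bytes,
        Err(err) => encode_error(&err),
    }
}

fn encode_error(err: &HandlerError) -> Vec<u8> {
    // Hypothetical encoding; the real server encodes errors through the RPC
    // protocol layer before emitting them to the client.
    err.to_string().into_bytes()
}

#[tokio::main]
async fn main() {
    let ctx = Ctx { peer: Arc::new("127.0.0.1:9000".to_string()) };

    let err_payload = run_one(&ctx, Vec::new()).await;
    let ok_payload = run_one(&ctx, b"ping".to_vec()).await;

    assert_eq!(err_payload, b"empty request body".to_vec());
    assert_eq!(ok_payload, b"ping".to_vec());
}
```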


Error Handling

The server handles errors at multiple layers:

Error Types and Propagation

| Error Source | Error Type | Handling Strategy |
|---|---|---|
| WebSocket Protocol | tungstenite::Error | Log via tracing, close connection gracefully |
| RPC Dispatcher | muxio::RpcError | Convert to RPC error response, send to client |
| Handler Execution | Box<dyn Error + Send + Sync> | Encode as RPC error response, log details |
| Endpoint Registration | RpcServiceEndpointError | Return to caller during setup, prevent server start |
| Serialization | bitcode::Error | Treat as invalid request, send error response |

Error Response Flow

Diagram: Error Response Flow

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:32-34 extensions/muxio-tokio-rpc-server/Cargo.toml:22
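
As a concrete illustration of the serialization row, the sketch below decodes a request with bitcode and maps a decode failure to an error result for that one request. The AddRequest type and the string error type are illustrative; the real server surfaces such failures through its RPC error response path:

```rust
use bitcode::{Decode, Encode};

#[derive(Encode, Decode)]
struct AddRequest {
    a: f64,
    b: f64,
}

/// A decode failure is treated as an invalid request for this one call;
/// the connection itself stays open.
fn handle_add(request_bytes: &[u8]) -> Result<Vec<u8>, String> {
    let req: AddRequest =
        bitcode::decode(request_bytes).map_err(|e| format!("invalid request payload: {e}"))?;
    Ok(bitcode::encode(&(req.a + req.b)))
}

fn main() {
    // A well-formed request round-trips through decode -> handler -> encode.
    let ok = handle_add(&bitcode::encode(&AddRequest { a: 2.0, b: 3.0 }));
    assert!(ok.is_ok());

    // Garbage bytes are rejected as an invalid request, not a transport error.
    let bad = handle_add(&[0xFF, 0x00, 0x13]);
    assert!(bad.is_err());
}
```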


Performance Considerations

The Tokio RPC Server implementation includes several optimizations:

Message Batching

Diagram: Request Batching for Efficiency

Optimization Strategies

| Strategy | Implementation | Benefit |
|---|---|---|
| Zero-copy when possible | Uses bytes::Bytes for reference-counted buffers | Reduces memory allocations for large payloads |
| Concurrent handler execution | join_all spawns all handlers in batch | Maximizes CPU utilization for independent requests |
| Chunked responses | Large responses split per DEFAULT_SERVICE_MAX_CHUNK_SIZE | Prevents memory spikes, enables streaming-like behavior |
| Connection pooling | Each connection has dedicated task, no contention | Scales to thousands of concurrent connections |
| Async I/O | All I/O operations are async via Tokio | Efficient use of OS threads, high concurrency |

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:99-125 extensions/muxio-tokio-rpc-server/Cargo.toml:13-14
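
A self-contained sketch of the chunking strategy: a large response payload is split into bounded frames before emission. MAX_CHUNK_SIZE stands in for the crate's DEFAULT_SERVICE_MAX_CHUNK_SIZE, and the real framing layer additionally prepends per-frame headers:

```rust
/// Stand-in for the crate's DEFAULT_SERVICE_MAX_CHUNK_SIZE constant.
const MAX_CHUNK_SIZE: usize = 64 * 1024;

/// Split a response payload into bounded frames before emission.
fn emit_chunked(payload: &[u8], mut on_emit: impl FnMut(&[u8])) {
    for chunk in payload.chunks(MAX_CHUNK_SIZE) {
        on_emit(chunk);
    }
}

fn main() {
    let payload = vec![0u8; 200_000];
    let mut frames = 0;
    emit_chunked(&payload, |_chunk| frames += 1);
    // 200,000 bytes -> three full 64 KiB chunks plus a final partial chunk.
    assert_eq!(frames, 4);
}
```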


Logging and Observability

The server integrates with the tracing ecosystem for structured logging:

Instrumentation Points

Diagram: Tracing Instrumentation Hierarchy

Key logging capabilities:

  • Connection lifecycle events with peer address
  • Request/response message IDs for correlation
  • Handler execution timing and errors
  • WebSocket protocol errors
  • Dispatcher state changes

Sources: extensions/muxio-tokio-rpc-server/Cargo.toml:22 extensions/muxio-rpc-service-endpoint/Cargo.toml:18
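
A minimal sketch of this wiring with tracing and tracing-subscriber: a per-connection span carries the connection ID and peer address so that request, handler, and error logs can be correlated. The span and field names are illustrative, not the server's actual ones:

```rust
use std::net::SocketAddr;

use tracing::{info, info_span, Instrument};

async fn serve_connection(connection_id: u64, peer_addr: SocketAddr) {
    // One span per connection correlates every log line with this peer.
    let span = info_span!("connection", connection_id, %peer_addr);
    async {
        info!("connection established");
        // ... the WebSocket message loop would run here ...
        info!("connection closed");
    }
    .instrument(span)
    .await;
}

#[tokio::main]
async fn main() {
    // Typical subscriber installation at process start; handler and protocol
    // errors logged deeper in the stack all flow through it.
    tracing_subscriber::fmt::init();

    serve_connection(1, "127.0.0.1:9000".parse().unwrap()).await;
}
```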


Example: Building a Server

Typical server construction and lifecycle:

Diagram: Server Initialization Sequence

Sources: extensions/muxio-tokio-rpc-server/Cargo.toml:1-23 extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:35-64
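
The cited sources do not show TokioRpcServer's constructor or serve methods, so the sketch below assembles the same lifecycle (register handlers, build the router, bind, and serve) from generic pieces rather than the crate's actual API. Handlers are simplified to synchronous functions carried as Axum state; the real server uses the async prebuffered handlers and dispatcher described earlier:

```rust
use std::{collections::HashMap, sync::Arc};

use axum::{
    extract::{
        ws::{Message, WebSocket, WebSocketUpgrade},
        State,
    },
    response::IntoResponse,
    routing::get,
    Router,
};

// Simplified, synchronous handlers keyed by method ID.
type Handler = fn(Vec<u8>) -> Vec<u8>;
type Handlers = Arc<HashMap<u64, Handler>>;

async fn ws_handler(
    State(handlers): State<Handlers>,
    upgrade: WebSocketUpgrade,
) -> impl IntoResponse {
    upgrade.on_upgrade(move |mut socket: WebSocket| async move {
        while let Some(Ok(msg)) = socket.recv().await {
            match msg {
                Message::Binary(frame) => {
                    // Illustrative dispatch on a fixed method ID; real frames
                    // carry method IDs via the muxio framing protocol.
                    let response = match handlers.get(&1).copied() {
                        Some(handler) => handler(frame.to_vec()),
                        None => Vec::new(),
                    };
                    if socket.send(Message::Binary(response.into())).await.is_err() {
                        break;
                    }
                }
                Message::Close(_) => break,
                _ => {}
            }
        }
    })
}

#[tokio::main]
async fn main() {
    // 1. Register handlers before accepting traffic.
    let mut handlers: HashMap<u64, Handler> = HashMap::new();
    handlers.insert(1, |request| request); // echo handler

    // 2. Build the router, carrying the registry as shared state, and mount
    //    the WebSocket route. Tower middleware could be layered here.
    let app = Router::new()
        .route("/ws", get(ws_handler))
        .with_state(Arc::new(handlers));

    // 3. Bind a listener and serve on the Tokio runtime.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}
```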
