GitHub

This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Tokio RPC Server

Purpose and Scope

The muxio-tokio-rpc-server crate provides a production-ready WebSocket RPC server implementation for native Rust environments using the Tokio async runtime. This server integrates the muxio RPC framework with Axum’s HTTP/WebSocket capabilities and tokio-tungstenite’s WebSocket protocol handling.

This document covers the server-side implementation for native applications. For client-side Tokio implementations, see Tokio RPC Client. For browser-based clients, see WASM RPC Client. For general information about service definitions and endpoint interfaces, see Service Endpoint Interface.

Architecture Overview

The Tokio RPC Server sits at the intersection of three major subsystems: Axum’s HTTP framework, the muxio core multiplexing layer, and the RPC service framework. It provides a complete server implementation that accepts WebSocket connections, manages multiple concurrent clients, and dispatches RPC requests to registered handlers.

graph TB
    subgraph "Application Layer"
        APP["Application Code\nService Handler Implementations"]
    end

    subgraph "Server Interface Layer - muxio-tokio-rpc-server"
        RPC_SERVER["RpcServer\nMain server struct\nHandler registration\nConnection management"]
        WS_HANDLER["WebSocket Handler\nhandle_websocket_connection\nPer-connection async task"]
    end

    subgraph "RPC Framework Integration"
        ENDPOINT["RpcServiceEndpointInterface\nDispatcher to handlers\nMethod routing"]
        CALLER["RpcServiceCallerInterface\nServer-to-client calls\nBidirectional RPC"]
        DISPATCHER["RpcDispatcher\nRequest correlation\nResponse routing"]
    end

    subgraph "Transport Layer"
        AXUM["Axum Framework\nHTTP routing\nWebSocket upgrade"]
        TUNGSTENITE["tokio-tungstenite\nWebSocket protocol\nFrame handling"]
    end

    subgraph "Core Multiplexing"
        SESSION["RpcSession\nStream multiplexing\nFrame mux/demux"]
    end

    APP --> RPC_SERVER
    RPC_SERVER --> WS_HANDLER
    RPC_SERVER --> ENDPOINT
    WS_HANDLER --> CALLER
    WS_HANDLER --> ENDPOINT
    WS_HANDLER --> DISPATCHER
    WS_HANDLER --> AXUM
    AXUM --> TUNGSTENITE
    DISPATCHER --> SESSION
    ENDPOINT --> DISPATCHER
    CALLER --> DISPATCHER
    TUNGSTENITE --> SESSION

    style RPC_SERVER fill:#e1f5ff
    style WS_HANDLER fill:#e1f5ff

Server Component Stack

Description: The server is built in layers. The RpcServer struct provides the high-level interface for configuring the server and registering handlers. When a WebSocket connection arrives via Axum, a dedicated handle_websocket_connection task is spawned. This task instantiates both an RpcServiceEndpointInterface (for handling incoming client requests) and an RpcServiceCallerInterface (for making server-initiated calls to the client), both sharing a single RpcDispatcher and RpcSession for the connection.

Key Components

RpcServer Structure

The main server structure manages the overall server lifecycle, route configuration, and handler registry. It integrates with Axum’s routing system to expose WebSocket endpoints.

| Component | Type | Purpose |
| --- | --- | --- |
| RpcServer | Main struct | Server configuration, handler registration, Axum router integration |
| Handler registry | Internal storage | Maintains registered RPC service handlers |
| Axum router | axum::Router | HTTP routing and WebSocket upgrade handling |
| Connection tracking | State management | Tracks active connections and connection state |

Key Responsibilities:

  • Registering service handlers via RpcServiceEndpointInterface
  • Creating Axum routes for WebSocket endpoints
  • Spawning per-connection tasks
  • Managing server lifecycle (start, stop, graceful shutdown)
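The component table above suggests a layout along the following lines. This is an illustrative sketch only; the field names and types are assumptions, not the crate's actual definition.

```rust
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};
use tokio::sync::Mutex;

// Illustrative sketch; the real RpcServer in muxio-tokio-rpc-server may name
// and organize these fields differently.
type BoxedHandler = Arc<
    dyn Fn(Vec<u8>) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, String>> + Send>>
        + Send
        + Sync,
>;

struct RpcServerSketch {
    /// Handler registry: method ID -> type-erased async handler.
    handlers: Arc<Mutex<HashMap<u64, BoxedHandler>>>,
    /// Axum router with the WebSocket route installed.
    router: axum::Router,
    /// Join handles for per-connection tasks (connection tracking).
    connections: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
}
```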

WebSocket Connection Handler

Each WebSocket connection is handled by a dedicated async task that manages the complete lifecycle of that connection.

Description: The connection handler orchestrates all communication for a single client. It creates a shared RpcDispatcher, instantiates both endpoint and caller interfaces, and manages the read/write loops for WebSocket frames. This enables full bidirectional RPC: clients can call server methods, and the server can call client methods over the same connection.

sequenceDiagram
    participant Axum as "Axum Router"
    participant Handler as "handle_websocket_connection"
    participant Endpoint as "RpcServiceEndpointInterface"
    participant Caller as "RpcServiceCallerInterface"
    participant Dispatcher as "RpcDispatcher"
    participant WS as "WebSocket\n(tokio-tungstenite)"
    
    Axum->>Handler: WebSocket upgrade
    Handler->>Handler: Create RpcDispatcher
    Handler->>Endpoint: Instantiate with dispatcher
    Handler->>Caller: Instantiate with dispatcher
    Handler->>Handler: Spawn read loop
    Handler->>Handler: Spawn write loop
    
    Note over Handler,WS: Connection Active
    
    WS->>Handler: Binary frame received
    Handler->>Dispatcher: Feed bytes
    Dispatcher->>Endpoint: Route request
    Endpoint->>Handler: Handler result
    Handler->>Dispatcher: Send response
    Dispatcher->>WS: Binary frames
    
    Note over Handler,WS: Bidirectional RPC Support
    
    Handler->>Caller: call(request)
    Caller->>Dispatcher: Outgoing request
    Dispatcher->>WS: Binary frames
    WS->>Handler: Response frames
    Handler->>Dispatcher: Feed bytes
    Dispatcher->>Caller: Deliver response
    
    Note over Handler,WS: Connection Closing
    
    WS->>Handler: Close frame
    Handler->>Endpoint: Cleanup
    Handler->>Caller: Cleanup
    Handler->>Handler: Task exit
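A simplified sketch of this per-connection task, using Axum's WebSocket types. The function process_inbound is a placeholder for the muxio-specific step of feeding bytes into the shared RpcDispatcher and collecting whatever frames it wants to send back.

```rust
use axum::extract::ws::{Message, WebSocket};
use futures::{SinkExt, StreamExt};

// Placeholder for the muxio-specific wiring: feed inbound bytes to the
// dispatcher/endpoint and return any outbound frames they produced.
fn process_inbound(_bytes: &[u8]) -> Vec<Vec<u8>> {
    Vec::new()
}

async fn handle_websocket_connection(socket: WebSocket) {
    let (mut ws_tx, mut ws_rx) = socket.split();

    // Outbound frames (responses and server-initiated requests) are funneled
    // through a channel so a single writer task owns the WebSocket sink.
    let (out_tx, mut out_rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();

    let write_loop = tokio::spawn(async move {
        while let Some(frame) = out_rx.recv().await {
            if ws_tx.send(Message::Binary(frame.into())).await.is_err() {
                break; // peer disconnected
            }
        }
    });

    // Read loop: only binary frames are meaningful to the RPC layer.
    while let Some(Ok(msg)) = ws_rx.next().await {
        match msg {
            Message::Binary(bytes) => {
                for frame in process_inbound(&bytes) {
                    let _ = out_tx.send(frame);
                }
            }
            Message::Close(_) => break,
            _ => {} // Ping/Pong handled by the WebSocket layer; text frames are not part of the protocol
        }
    }

    drop(out_tx); // close the channel so the write loop exits
    let _ = write_loop.await;
}
```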

WebSocket Integration with Axum

The server leverages Axum’s built-in WebSocket support to handle the HTTP upgrade handshake and ongoing WebSocket communication.

graph LR
    subgraph "Axum Router Configuration"
        ROUTER["axum::Router"]
        ROUTE["WebSocket Route\n/ws or custom path"]
        UPGRADE["WebSocket Upgrade Handler\naxum::extract::ws"]
    end

    subgraph "Handler Function"
        WS_FN["handle_websocket_connection\nAsync task per connection"]
        EXTRACT["Extract WebSocket\nfrom HTTP upgrade"]
    end

    subgraph "Connection State"
        DISPATCHER_STATE["Arc<Mutex<RpcDispatcher>>"]
        ENDPOINT_STATE["Arc<RpcServiceEndpointInterface>"]
        HANDLER_REGISTRY["Handler Registry\nRegistered service handlers"]
    end

    ROUTER --> ROUTE
    ROUTE --> UPGRADE
    UPGRADE --> WS_FN
    WS_FN --> EXTRACT
    WS_FN --> DISPATCHER_STATE
    WS_FN --> ENDPOINT_STATE
    WS_FN --> HANDLER_REGISTRY

    style ROUTER fill:#fff4e1
    style WS_FN fill:#e1f5ff

Route Configuration

Description: The server configures an Axum route (typically at /ws) that handles WebSocket upgrades. When a client connects, Axum invokes the handler function with the upgraded WebSocket. The handler then creates the necessary RPC infrastructure (dispatcher, endpoint, caller) and enters the connection loop.

Integration Points:

  • axum::extract::ws::WebSocketUpgrade - HTTP to WebSocket upgrade
  • axum::extract::ws::WebSocket - Bidirectional WebSocket stream
  • axum::routing::get() - Route registration
  • tokio::spawn() - Connection task spawning
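These integration points can be wired together roughly as follows. The Axum items used here (WebSocketUpgrade, on_upgrade, routing::get) are standard Axum APIs; the connection handler body is a placeholder for the muxio-specific logic sketched in the previous section.

```rust
use axum::{
    extract::ws::{WebSocket, WebSocketUpgrade},
    response::IntoResponse,
    routing::get,
    Router,
};

// Route configuration: a GET route at /ws that performs the WebSocket upgrade.
fn build_router() -> Router {
    Router::new().route("/ws", get(ws_upgrade))
}

async fn ws_upgrade(ws: WebSocketUpgrade) -> impl IntoResponse {
    // on_upgrade completes the handshake and runs the given future as the
    // per-connection task.
    ws.on_upgrade(handle_websocket_connection)
}

async fn handle_websocket_connection(socket: WebSocket) {
    // Placeholder: create the RpcDispatcher, endpoint, and caller here, then
    // run the read/write loops (see "WebSocket Connection Handler").
    let _ = socket;
}
```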

Handler Registration and Dispatch

Service handlers are registered with the server before it starts, allowing the endpoint to route incoming requests to the appropriate handler implementations.

Handler Registration Flow

| Step | Action | Code Entity |
| --- | --- | --- |
| 1 | Create RpcServer instance | RpcServer::new() or ::builder() |
| 2 | Register service handlers | register_handler<T>() method |
| 3 | Build Axum router | Internal router configuration |
| 4 | Start server | serve() or run() method |

Request Dispatch Mechanism

Description: When a complete RPC request is received and decoded, the RpcServiceEndpointInterface looks up the registered handler by method ID. The handler is invoked asynchronously, and its result is serialized and sent back through the same connection. Error handling is built into each layer, with errors propagated back to the client as RpcServiceError responses.
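Conceptually, this dispatch step is a table lookup keyed by method ID. The following is a minimal stand-alone illustration, not the crate's actual types; the real handlers are async and report failures with a structured RpcServiceError.

```rust
use std::collections::HashMap;

// Illustrative handler signature: request payload in, response payload or
// error out. The real framework uses async handlers and RpcServiceError.
type Handler = fn(&[u8]) -> Result<Vec<u8>, String>;

fn dispatch(
    handlers: &HashMap<u64, Handler>,
    method_id: u64,
    request: &[u8],
) -> Result<Vec<u8>, String> {
    match handlers.get(&method_id) {
        Some(handler) => handler(request),
        // Unknown method IDs are reported back to the client as errors rather
        // than being dropped silently.
        None => Err(format!("no handler registered for method id {method_id}")),
    }
}
```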

graph TB
    subgraph "Per-Connection State"
        SHARED_DISPATCHER["Arc<TokioMutex<RpcDispatcher>>\nShared between endpoint & caller"]
    end

    subgraph "Server-to-Client Direction"
        CALLER["RpcServiceCallerInterface\nServer-initiated calls"]
        CALLER_LOGIC["Call Logic\nawait response from client"]
    end

    subgraph "Client-to-Server Direction"
        ENDPOINT["RpcServiceEndpointInterface\nClient-initiated calls"]
        HANDLER["Registered Handlers\nServer-side implementations"]
    end

    subgraph "Transport Sharing"
        READ_LOOP["Read Loop\nReceive WebSocket frames\nFeed to dispatcher"]
        WRITE_LOOP["Write Loop\nSend WebSocket frames\nFrom dispatcher"]
        WEBSOCKET["WebSocket Connection\nSingle TCP connection"]
    end

    CALLER --> SHARED_DISPATCHER
    ENDPOINT --> SHARED_DISPATCHER
    SHARED_DISPATCHER --> READ_LOOP
    SHARED_DISPATCHER --> WRITE_LOOP
    READ_LOOP --> WEBSOCKET
    WRITE_LOOP --> WEBSOCKET
    CALLER_LOGIC --> CALLER
    HANDLER --> ENDPOINT

    style SHARED_DISPATCHER fill:#e1f5ff
    style WEBSOCKET fill:#fff4e1

Bidirectional RPC Support

A key feature of the Tokio RPC Server is support for server-initiated RPC calls to connected clients. This enables push notifications, server-side events, and bidirectional request/response patterns.

Bidirectional Communication Architecture

Description: Both the RpcServiceEndpointInterface and RpcServiceCallerInterface share the same RpcDispatcher and underlying RpcSession. This allows both directions to multiplex their requests over the same WebSocket connection. The dispatcher ensures that request IDs are unique and that responses are routed to the correct awaiting caller, whether that’s a client waiting for a server response or a server waiting for a client response.
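The sharing pattern itself can be illustrated with plain Tokio primitives. These are stand-in types, not the real RpcDispatcher: the point is that both directions clone the same Arc-wrapped state, so request IDs stay unique per connection no matter which side initiates a call.

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

// Stand-in for the per-connection dispatcher state.
struct DispatcherState {
    next_request_id: u32,
}

#[derive(Clone)]
struct SharedDispatcher(Arc<Mutex<DispatcherState>>);

impl SharedDispatcher {
    fn new() -> Self {
        Self(Arc::new(Mutex::new(DispatcherState { next_request_id: 0 })))
    }

    /// Both the endpoint (client-to-server) and the caller (server-to-client)
    /// draw request IDs from the same counter, so responses can be correlated
    /// unambiguously on the shared WebSocket connection.
    async fn next_request_id(&self) -> u32 {
        let mut state = self.0.lock().await;
        let id = state.next_request_id;
        state.next_request_id += 1;
        id
    }
}
```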

Use Cases:

  • Server pushing real-time updates to clients
  • Server requesting information from clients
  • Peer-to-peer style communication patterns
  • Event-driven architectures where both sides can initiate actions

Connection Lifecycle Management

The server manages the complete lifecycle of each WebSocket connection, from initial upgrade through active communication to graceful or abrupt termination.

Connection States

| State | Description | Transitions |
| --- | --- | --- |
| Connecting | WebSocket upgrade in progress | Connected |
| Connected | Active RPC communication | Disconnecting, Error |
| Disconnecting | Graceful shutdown initiated | Disconnected |
| Disconnected | Connection closed cleanly | Terminal state |
| Error | Connection error occurred | Disconnected |

Lifecycle State Machine

Description: The connection lifecycle is managed by the per-connection task. State transitions are tracked internally, and state change callbacks (if registered) are invoked to notify application code of connection events. Resources are properly cleaned up in all terminal states.
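Read as a state machine, the table above maps onto a small enum. This encoding is purely illustrative, not a type exported by the crate.

```rust
// Illustrative only: mirrors the state table above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionState {
    Connecting,
    Connected,
    Disconnecting,
    Disconnected,
    Error,
}

impl ConnectionState {
    /// States reachable from `self`, per the table above.
    fn transitions(self) -> &'static [ConnectionState] {
        use ConnectionState::*;
        match self {
            Connecting => &[Connected],
            Connected => &[Disconnecting, Error],
            Disconnecting => &[Disconnected],
            Disconnected => &[], // terminal state
            Error => &[Disconnected],
        }
    }
}
```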

Resource Cleanup:

  • WebSocket stream closed
  • Per-stream decoders removed from RpcSession
  • Pending requests in RpcDispatcher completed with error
  • Connection removed from server’s tracking state
  • Task exit and memory deallocation

graph LR
    subgraph "Inbound Path"
        WS_MSG["WebSocket Binary Message\ntokio_tungstenite::Message"]
        BYTES["Bytes Extract\nVec<u8> or Bytes"]
        FEED["dispatcher.feed_bytes()\nProcess frame data"]
        DECODE["RpcSession Decode\nReconstruct streams"]
        EVENT["RpcDispatcher Events\nComplete requests/responses"]
    end

    subgraph "Outbound Path"
        RESPONSE["Response Data\nFrom handlers or caller"]
        ENCODE["RpcSession Encode\nFrame into chunks"]
        WRITE_QUEUE["Write Queue\nPending frames"]
        WS_SEND["WebSocket Send\nBinary message"]
    end

    WS_MSG --> BYTES
    BYTES --> FEED
    FEED --> DECODE
    DECODE --> EVENT

    RESPONSE --> ENCODE
    ENCODE --> WRITE_QUEUE
    WRITE_QUEUE --> WS_SEND

    style FEED fill:#e1f5ff
    style ENCODE fill:#e1f5ff

Binary Frame Processing

The server handles the low-level details of converting between WebSocket binary messages and muxio’s framing protocol.

Frame Processing Pipeline

Description: WebSocket messages arrive as binary frames. The server extracts the byte payload and feeds it to the RpcDispatcher, which uses the RpcSession to demultiplex and decode the frames. In the outbound direction, responses are encoded by RpcSession into chunked frames and queued for WebSocket transmission.

Frame Format Details:

  • Binary WebSocket frames only (text frames rejected)
  • Frames may be chunked at the WebSocket layer (transparent to muxio)
  • muxio’s internal chunking is based on DEFAULT_MAX_CHUNK_SIZE
  • Stream multiplexing allows multiple concurrent operations
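The chunking idea can be illustrated in a few lines. The real framing is performed inside RpcSession; the 64 KiB figure below is an assumption standing in for DEFAULT_MAX_CHUNK_SIZE.

```rust
// Assumed chunk size for illustration; the actual value comes from muxio's
// DEFAULT_MAX_CHUNK_SIZE constant.
const MAX_CHUNK_SIZE: usize = 64 * 1024;

/// Split an encoded payload into chunk-sized slices, each of which would be
/// sent as its own binary WebSocket message.
fn chunk_payload(payload: &[u8]) -> impl Iterator<Item = &[u8]> {
    payload.chunks(MAX_CHUNK_SIZE)
}
```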

Server Configuration and Builder Pattern

The server typically provides a builder pattern for configuration before starting.

Configuration Options

| Option | Purpose | Default |
| --- | --- | --- |
| Bind address | TCP address and port | 127.0.0.1:3000 (typical) |
| WebSocket path | Route path for WebSocket upgrade | /ws (typical) |
| Connection limits | Max concurrent connections | Unlimited (configurable) |
| Timeout settings | Connection and request timeouts | Platform defaults |
| Handler registry | Registered RPC service handlers | Empty (must register) |
| State change handlers | Lifecycle event callbacks | None (optional) |

Typical Server Setup Pattern

Description: The server is configured using a builder pattern. Handlers are registered before the server starts. Once build() is called, the server configures its Axum router with the WebSocket route. The run() or serve() method starts the server’s event loop.
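A hedged sketch of this flow. The method names below (builder, bind_address, websocket_path, register_handler, serve) and the MyServiceHandler type follow the pattern described in the table, but they are assumptions for illustration, not the crate's verified API.

```rust
// Hypothetical setup following the registration flow above. All RpcServer
// method names here are illustrative assumptions; consult the crate docs for
// the actual builder API.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let server = RpcServer::builder()
        .bind_address("127.0.0.1:3000") // configuration option: bind address
        .websocket_path("/ws")          // configuration option: WebSocket path
        .build()?;

    // Step 2: register service handlers before the server starts.
    server.register_handler(MyServiceHandler::default());

    // Step 4: run the Axum-backed server until shutdown.
    server.serve().await?;
    Ok(())
}
```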

Integration with Tokio Async Runtime

The server is fully integrated with the Tokio async runtime, using async/await throughout.

Async Task Structure

| Task Type | Purpose | Lifetime |
| --- | --- | --- |
| Server listener | Accept incoming connections | Until server shutdown |
| Per-connection task | Handle one WebSocket connection | Until connection closes |
| Read loop | Process incoming WebSocket frames | Per-connection lifetime |
| Write loop | Send outgoing WebSocket frames | Per-connection lifetime |
| Handler execution | Execute registered RPC handlers | Per-request duration |

Async Patterns:

  • async fn for all I/O operations
  • tokio::spawn() for concurrent task creation
  • tokio::select! for graceful shutdown coordination
  • TokioMutex for shared state protection
  • futures::stream::StreamExt for frame processing

Concurrency Model:

  • One Tokio task per connection
  • Handlers executed concurrently on Tokio thread pool
  • Multiplexed streams allow concurrent RPC calls per connection
  • Backpressure handled at WebSocket layer
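As an example of the shutdown coordination mentioned above, the standard Axum/Tokio pattern looks like this. The build_router function stands in for the application's router construction (see "WebSocket Integration with Axum"); everything else is stock Axum and Tokio API.

```rust
use tokio::net::TcpListener;

// Stand-in for the router construction covered earlier.
fn build_router() -> axum::Router {
    axum::Router::new()
}

async fn run_server() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:3000").await?;
    axum::serve(listener, build_router())
        .with_graceful_shutdown(async {
            // tokio::select! lets the server wait on several shutdown sources
            // at once; here only Ctrl-C is wired up.
            tokio::select! {
                _ = tokio::signal::ctrl_c() => {}
            }
        })
        .await
}
```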

graph TB
    subgraph "Error Sources"
        WS_ERR["WebSocket Errors\nConnection failures\nProtocol violations"]
        DECODE_ERR["Decode Errors\nInvalid frames\nMalformed data"]
        HANDLER_ERR["Handler Errors\nBusiness logic failures\nRpcServiceError"]
        DISPATCHER_ERR["Dispatcher Errors\nUnknown request ID\nMutex poisoning"]
    end

    subgraph "Error Handling"
        LOG["tracing::error!\nStructured logging"]
        CLIENT_RESP["Error Response\nRpcServiceError to client"]
        DISCONNECT["Connection Close\nUnrecoverable errors"]
        STATE_CALLBACK["State Change Handler\nNotify application"]
    end

    WS_ERR --> LOG
    WS_ERR --> DISCONNECT
    DECODE_ERR --> LOG
    DECODE_ERR --> CLIENT_RESP
    HANDLER_ERR --> LOG
    HANDLER_ERR --> CLIENT_RESP
    DISPATCHER_ERR --> LOG
    DISPATCHER_ERR --> DISCONNECT

    DISCONNECT --> STATE_CALLBACK

    style LOG fill:#fff4e1
    style DISCONNECT fill:#ffe1e1

Error Handling and Observability

The server provides comprehensive error handling and integrates with the tracing ecosystem for observability.

Error Propagation

Description: Errors are categorized by severity and handled appropriately. Handler errors are serialized and sent to the client as RpcServiceError responses. Transport and protocol errors are logged and result in connection termination. All errors are emitted as structured tracing events for monitoring and debugging.

Tracing Integration:

  • #[tracing::instrument] on key functions
  • Span context for each connection
  • Structured fields for method IDs, request IDs, error codes
  • Error, warn, info, debug, and trace level events
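For example, a handler wrapped with standard tracing macros might look like the following. The handler body and error type are placeholders; only the tracing usage is the point here.

```rust
use tracing::{error, info, instrument};

// Placeholder business logic standing in for a registered RPC handler.
async fn do_work(payload: Vec<u8>) -> Result<Vec<u8>, String> {
    Ok(payload)
}

// `instrument` opens a span per call with `method_id` recorded as a field;
// the payload is skipped to avoid logging request bodies.
#[instrument(skip(payload))]
async fn handle_request(method_id: u64, payload: Vec<u8>) -> Result<Vec<u8>, String> {
    info!(len = payload.len(), "handling RPC request");
    let result = do_work(payload).await;
    if let Err(ref e) = result {
        // Logged here and also serialized back to the client as an error response.
        error!(error = %e, "handler failed");
    }
    result
}
```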

Performance Considerations

The Tokio RPC Server is designed for high-performance scenarios with multiple concurrent connections and high request throughput.

Performance Characteristics

| Metric | Behavior | Tuning |
| --- | --- | --- |
| Connections | O(n) tasks, minimal per-connection overhead | Tokio thread pool size |
| Throughput | Stream multiplexing enables pipelining | DEFAULT_MAX_CHUNK_SIZE |
| Latency | Low; single async task hop per request | Handler execution time |
| Memory | Incremental; per-stream decoders only | Connection limits |

Optimization Strategies:

  • Stream multiplexing reduces head-of-line blocking
  • Binary protocol minimizes serialization overhead
  • Zero-copy where possible (using bytes::Bytes)
  • Efficient buffer management in RpcSession
  • Concurrent handler execution on Tokio thread pool
