WebSocket RPC Application Example


Purpose and Scope

This document provides a complete walkthrough of the example-muxio-ws-rpc-app demonstration application, showing how to build a functional WebSocket-based RPC system using muxio. The example covers server initialization, handler registration, client connection, and executing RPC calls with shared type-safe service definitions.

For information about defining custom RPC services, see Defining a Simple RPC Service. For details about cross-platform deployment strategies, see Cross-Platform Deployment. For the underlying transport mechanisms, see Tokio RPC Server and Tokio RPC Client.


Application Architecture

The example application demonstrates a complete client-server interaction cycle using three main components:

| Component | Crate | Role |
| --- | --- | --- |
| Service Definitions | example-muxio-rpc-service-definition | Shared RPC method contracts (Add, Mult, Echo) |
| Server | example-muxio-ws-rpc-app (server code) | Tokio-based WebSocket server with handler registration |
| Client | example-muxio-ws-rpc-app (client code) | Tokio-based client making concurrent RPC calls |

Diagram: Example Application Component Structure



Shared Service Definitions

The example uses three RPC methods defined in example-muxio-rpc-service-definition:

| Method | Purpose | Request Type | Response Type |
| --- | --- | --- | --- |
| Add | Sum a vector of floats | Vec<f64> | f64 |
| Mult | Multiply a vector of floats | Vec<f64> | f64 |
| Echo | Echo back binary data | Vec<u8> | Vec<u8> |

Each method implements the RpcMethodPrebuffered trait, providing:

  • METHOD_ID: Compile-time generated method identifier
  • call(): Client-side invocation function
  • encode_request() / decode_request(): Request serialization
  • encode_response() / decode_response(): Response serialization
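
The trait itself is not reproduced in this walkthrough; the following is a condensed sketch of the surface described above (the associated types, error type, and exact signatures are assumptions, not the crate's actual definition):

```rust
use std::io;

// Hedged sketch of the prebuffered method contract; the real trait in
// muxio's service-definition crates may differ in naming and error types.
pub trait RpcMethodPrebuffered {
    /// Compile-time method identifier (e.g. an xxhash of the method name).
    const METHOD_ID: u64;

    /// Illustrative associated types for the request and response payloads.
    type Request;
    type Response;

    fn encode_request(req: Self::Request) -> io::Result<Vec<u8>>;
    fn decode_request(bytes: &[u8]) -> io::Result<Self::Request>;
    fn encode_response(resp: Self::Response) -> io::Result<Vec<u8>>;
    fn decode_response(bytes: &[u8]) -> io::Result<Self::Response>;
}
```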

Diagram: Service Definition Structure

graph LR
    subgraph "RpcMethodPrebuffered Trait"
        TRAIT["const METHOD_ID: u64\nencode_request\ndecode_request\nencode_response\ndecode_response\ncall"]
    end

    subgraph "Implementations"
        ADD_IMPL["Add struct\nMETHOD_ID = xxhash('Add')\nRequest: Vec&lt;f64&gt;\nResponse: f64"]
        MULT_IMPL["Mult struct\nMETHOD_ID = xxhash('Mult')\nRequest: Vec&lt;f64&gt;\nResponse: f64"]
        ECHO_IMPL["Echo struct\nMETHOD_ID = xxhash('Echo')\nRequest: Vec&lt;u8&gt;\nResponse: Vec&lt;u8&gt;"]
    end

    TRAIT -.implemented by.-> ADD_IMPL
    TRAIT -.implemented by.-> MULT_IMPL
    TRAIT -.implemented by.-> ECHO_IMPL



Server Setup and Initialization

The server initialization occurs in a dedicated block within main():

Diagram: Server Initialization Flow

sequenceDiagram
    participant MAIN as main function
    participant LISTENER as TcpListener
    participant SERVER as RpcServer
    participant ENDPOINT as endpoint
    participant TASK as tokio::spawn
    
    MAIN->>LISTENER: TcpListener::bind("127.0.0.1:0")
    Note over MAIN: Binds to random available port
    
    MAIN->>MAIN: tcp_listener_to_host_port
    Note over MAIN: Extracts host and port
    
    MAIN->>SERVER: RpcServer::new(None)
    MAIN->>SERVER: Arc::new(server)
    Note over SERVER: Wrapped in Arc for sharing
    
    MAIN->>ENDPOINT: server.endpoint()
    Note over ENDPOINT: Get handler registration interface
    
    MAIN->>ENDPOINT: register_prebuffered(Add::METHOD_ID, handler)
    MAIN->>ENDPOINT: register_prebuffered(Mult::METHOD_ID, handler)
    MAIN->>ENDPOINT: register_prebuffered(Echo::METHOD_ID, handler)
    Note over ENDPOINT: Handlers registered concurrently with join!
    
    MAIN->>TASK: tokio::spawn(server.serve_with_listener)
    Note over TASK: Server runs in background task


TcpListener Binding

The server binds to a random available port using port 0:
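
A minimal sketch of this step, assuming it runs inside an async main and that tcp_listener_to_host_port is the helper named below:

```rust
// Inside an async context (e.g. #[tokio::main] async fn main()).
// Binding to port 0 asks the OS for any free port.
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;

// Helper from the example crate; it yields the host string and the
// OS-assigned port so the client can connect later.
let (host, port) = tcp_listener_to_host_port(&listener)?;
```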

Key Code Entities :

  • TcpListener::bind(): Tokio’s async TCP listener
  • tcp_listener_to_host_port(): Utility to extract host/port from listener


RpcServer Creation

The RpcServer is created and wrapped in Arc for shared ownership:
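
A minimal sketch of this step, using the entities listed below (anything beyond the None default argument is an assumption):

```rust
use std::sync::Arc;

// Create the server with default configuration and wrap it in Arc so it can
// be shared across async tasks.
let server = Arc::new(RpcServer::new(None));

// The endpoint exposes handler registration (RpcServiceEndpointInterface).
let endpoint = server.endpoint();
```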

Key Code Entities :

  • RpcServer::new(None): Creates server with default configuration
  • server.endpoint(): Returns RpcServiceEndpointInterface for handler registration
  • Arc: Enables shared ownership across async tasks



Handler Registration

Handlers are registered using the register_prebuffered method, which accepts a method ID and an async closure:

Diagram: Handler Registration Pattern

graph TB
    ENDPOINT["endpoint\n(RpcServiceEndpointInterface)"]

    subgraph "Handler Closure Signature"
        CLOSURE["async move closure\n|request_bytes: Vec&lt;u8&gt;, _ctx| -&gt; Result"]
    end

    subgraph "Handler Implementation Steps"
        DECODE["1. Decode request bytes\nMethod::decode_request(&request_bytes)"]
        PROCESS["2. Process business logic\n(sum, product, echo)"]
        ENCODE["3. Encode response\nMethod::encode_response(result)"]
    end

    ENDPOINT -->|"register_prebuffered(METHOD_ID, closure)"| CLOSURE
    CLOSURE --> DECODE
    DECODE --> PROCESS
    PROCESS --> ENCODE
    ENCODE -->|"Ok(response_bytes)"| ENDPOINT

Add Handler Example

The Add handler sums a vector of floats:
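
A hedged sketch of the handler, combining registration with the three steps listed below (the closure signature follows the registration diagram above; error handling and the context parameter are simplified):

```rust
// Register the Add handler on the endpoint obtained from server.endpoint().
endpoint
    .register_prebuffered(Add::METHOD_ID, |request_bytes: Vec<u8>, _ctx| async move {
        // 1. Decode the request bytes into Vec<f64>.
        let params = Add::decode_request(&request_bytes)?;
        // 2. Business logic: sum the inputs.
        let sum: f64 = params.iter().sum();
        // 3. Encode the f64 result back into response bytes.
        Ok(Add::encode_response(sum)?)
    })
    .await?;
```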

Key Operations :

  1. Add::decode_request(): Deserializes request bytes into Vec<f64>
  2. iter().sum(): Computes sum using standard iterator methods
  3. Add::encode_response(): Serializes f64 result back to bytes


Concurrent Handler Registration

All handlers are registered concurrently using tokio::join!:
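
A sketch of that pattern, with add_handler, mult_handler, and echo_handler standing in for closures like the one shown above:

```rust
// Await all three registrations concurrently and fail fast if any errors.
let (add_reg, mult_reg, echo_reg) = tokio::join!(
    endpoint.register_prebuffered(Add::METHOD_ID, add_handler),
    endpoint.register_prebuffered(Mult::METHOD_ID, mult_handler),
    endpoint.register_prebuffered(Echo::METHOD_ID, echo_handler),
);
add_reg?;
mult_reg?;
echo_reg?;
```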

This ensures all handlers are registered before the server begins accepting connections.


Server Task Spawning

The server is spawned into a background task:
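
A sketch of the spawn, assuming serve_with_listener consumes the TcpListener bound earlier:

```rust
// Run the accept loop on a background task while main() continues as the client.
tokio::spawn({
    let server = Arc::clone(&server);
    async move { server.serve_with_listener(listener).await }
});
```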

Key Code Entities :

  • tokio::spawn(): Spawns async task on Tokio runtime
  • Arc::clone(&server): Clones Arc reference for task ownership
  • serve_with_listener(): Accepts connections and dispatches to handlers



Client Connection and Configuration

The client establishes a WebSocket connection to the server:

Diagram: Client Initialization Flow

sequenceDiagram
    participant MAIN as main function
    participant CLIENT as RpcClient
    participant HANDLER as state_change_handler
    participant SERVER as RpcServer

    MAIN->>MAIN: tokio::time::sleep(200ms)
    Note over MAIN: Wait for server startup

    MAIN->>CLIENT: RpcClient::new(host, port)
    CLIENT->>SERVER: WebSocket connection
    SERVER-->>CLIENT: Connection established

    MAIN->>CLIENT: set_state_change_handler(callback)
    Note over HANDLER: Callback invoked on state changes

    MAIN->>CLIENT: Method::call(&client, params)
    Note over MAIN: Ready to make RPC calls

RpcClient Creation

Key Code Entities :

  • RpcClient::new(): Creates client and initiates WebSocket connection
  • Parameters: Server host (String) and port (u16)
  • Returns: Result<RpcClient> on successful connection
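
Putting these together, a minimal connection sketch (whether the constructor is awaited, and the exact parameter handling, follow the description above rather than the crate's exact API):

```rust
// Give the server a moment to start accepting connections, then connect.
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let client = RpcClient::new(host, port).await?;
```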


State Change Handling

The client supports optional state change callbacks:
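
A sketch of the callback registration; the logging mirrors the expected output shown later, and the exact setter signature may differ:

```rust
// Observe connection lifecycle changes; the callback receives an RpcTransportState.
client.set_state_change_handler(|state: RpcTransportState| {
    println!("Transport state changed to: {state:?}");
});
```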

Key Code Entities :

  • set_state_change_handler(): Registers callback for transport state changes
  • RpcTransportState: Enum representing connection states (Connected, Disconnected, etc.)
  • Callback signature: Fn(RpcTransportState)



Making RPC Calls

RPC calls are made using the static call() method on each service definition:

Diagram: RPC Call Execution Pattern

graph LR
    CLIENT["client code"]
    CALL_METHOD["Method::call\n(client, params)"]
    CALLER_IF["RpcServiceCallerInterface\ncall_prebuffered"]
    DISPATCHER["RpcDispatcher\nassign request_id\ntrack pending"]
    SESSION["RpcSession\nallocate stream_id\nencode frames"]
    WS["WebSocket transport"]

    CLIENT --> CALL_METHOD
    CALL_METHOD --> CALLER_IF
    CALLER_IF --> DISPATCHER
    DISPATCHER --> SESSION
    SESSION --> WS

    WS -.response frames.-> SESSION
    SESSION -.decoded response.-> DISPATCHER
    DISPATCHER -.correlated result.-> CALLER_IF
    CALLER_IF -.deserialized.-> CALL_METHOD
    CALL_METHOD -.return value.-> CLIENT

Concurrent Call Execution

Multiple calls can be executed concurrently using tokio::join!:
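
A sketch of the concurrent call pattern; the first call of each pair uses the arguments from the table in the next section, while the second set of argument values is purely illustrative:

```rust
// Six prebuffered calls multiplexed over the same WebSocket connection.
let (add_a, add_b, mult_a, mult_b, echo_a, echo_b) = tokio::join!(
    Add::call(&client, vec![1.0, 2.0, 3.0]),
    Add::call(&client, vec![4.0, 5.0]),
    Mult::call(&client, vec![8.0, 3.0, 7.0]),
    Mult::call(&client, vec![2.0, 2.0]),
    Echo::call(&client, b"test".to_vec()),
    Echo::call(&client, b"muxio".to_vec()),
);
```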

Key Features :

  • All six calls execute concurrently over the same WebSocket connection
  • Each call gets a unique request ID for response correlation
  • Stream multiplexing allows interleaved responses
  • join! waits for all responses before proceeding


Call Syntax and Type Safety

The call() method is generic and type-safe:

| Call | Input Type | Return Type | Implementation |
| --- | --- | --- | --- |
| Add::call(&client, vec![1.0, 2.0, 3.0]) | Vec<f64> | Result<f64> | Sums inputs |
| Mult::call(&client, vec![8.0, 3.0, 7.0]) | Vec<f64> | Result<f64> | Multiplies inputs |
| Echo::call(&client, b"test".into()) | Vec<u8> | Result<Vec<u8>> | Echoes input |

Type Safety Guarantees :

  • Compile-time verification of parameter types
  • Compile-time verification of return types
  • Mismatched types result in compilation errors, not runtime errors


Result Validation

The example validates all results with assertions:
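
Continuing the earlier sketch, validation might look like the following (binding names carried over from the concurrent-call example above):

```rust
// Expected values follow directly from the handler logic.
assert_eq!(add_a?, 1.0 + 2.0 + 3.0);   // 6.0
assert_eq!(mult_a?, 8.0 * 3.0 * 7.0);  // 168.0
assert_eq!(echo_a?, b"test".to_vec());
```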



Complete Execution Flow

Diagram: End-to-End Request/Response Flow



Running the Example

The example is located in the example-muxio-ws-rpc-app crate and is run with Cargo like any other binary crate in the workspace.

Expected Output :

[INFO] Transport state changed to: Connected
[INFO] All assertions passed

The example demonstrates:

  1. Server starts on random port
  2. Handlers registered for Add, Mult, Echo
  3. Client connects via WebSocket
  4. Six concurrent RPC calls execute successfully
  5. All responses validated with assertions
  6. Automatic cleanup on completion



Key Takeaways

| Concept | Implementation |
| --- | --- |
| Shared Definitions | Service methods defined once in example-muxio-rpc-service-definition, used by both client and server |
| Type Safety | Compile-time verification of request/response types via the RpcMethodPrebuffered trait |
| Concurrency | Multiple RPC calls multiplex over a single WebSocket connection |
| Async Handlers | Server handlers are async closures, enabling non-blocking execution |
| State Management | Optional state change callbacks for connection monitoring |
| Zero Boilerplate | Method calls use the simple Method::call(client, params) syntax |

This example provides a foundation for building production WebSocket RPC applications. For streaming RPC patterns, see Streaming RPC Calls. For WASM client integration, see WASM RPC Client.
