This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
WebSocket RPC Application Example
Purpose and Scope
This document provides a complete walkthrough of the example-muxio-ws-rpc-app demonstration application, showing how to build a functional WebSocket-based RPC system using muxio. The example covers server initialization, handler registration, client connection, and executing RPC calls with shared type-safe service definitions.
For information about defining custom RPC services, see Defining a Simple RPC Service. For details about cross-platform deployment strategies, see Cross-Platform Deployment. For the underlying transport mechanisms, see Tokio RPC Server and Tokio RPC Client.
Application Architecture
The example application demonstrates a complete client-server interaction cycle using three main components:
| Component | Crate | Role |
|---|---|---|
| Service Definitions | example-muxio-rpc-service-definition | Shared RPC method contracts (Add, Mult, Echo) |
| Server | example-muxio-ws-rpc-app (server code) | Tokio-based WebSocket server with handler registration |
| Client | example-muxio-ws-rpc-app (client code) | Tokio-based client making concurrent RPC calls |
Diagram: Example Application Component Structure
Shared Service Definitions
The example uses three RPC methods defined in example-muxio-rpc-service-definition:
| Method | Purpose | Request Type | Response Type |
|---|---|---|---|
| Add | Sum a vector of floats | Vec<f64> | f64 |
| Mult | Multiply a vector of floats | Vec<f64> | f64 |
| Echo | Echo back binary data | Vec<u8> | Vec<u8> |
Each method implements the RpcMethodPrebuffered trait, providing:
- METHOD_ID: Compile-time generated method identifier
- call(): Client-side invocation function
- encode_request() / decode_request(): Request serialization
- encode_response() / decode_response(): Response serialization
Diagram: Service Definition Structure
graph LR
subgraph "RpcMethodPrebuffered Trait"
TRAIT["const METHOD_ID: u64\nencode_request\ndecode_request\nencode_response\ndecode_response\ncall"]
end
subgraph "Implementations"
ADD_IMPL["Add struct\nMETHOD_ID = xxhash('Add')\nRequest: Vec<f64>\nResponse: f64"]
MULT_IMPL["Mult struct\nMETHOD_ID = xxhash('Mult')\nRequest: Vec<f64>\nResponse: f64"]
ECHO_IMPL["Echo struct\nMETHOD_ID = xxhash('Echo')\nRequest: Vec<u8>\nResponse: Vec<u8>"]
end
TRAIT -.implemented by.-> ADD_IMPL
TRAIT -.implemented by.-> MULT_IMPL
TRAIT -.implemented by.-> ECHO_IMPL
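The shape of such a definition can be sketched with the standard library alone. This is a minimal illustration, not the actual muxio trait: `PrebufferedMethodSketch`, `fnv1a_64`, and the little-endian byte layout are assumptions made for this example, FNV-1a is swapped in for the xxhash-based ID shown in the diagram, and the client-side `call()` function is omitted.

```rust
/// Hypothetical stand-in for the trait described above (not the muxio definition).
pub trait PrebufferedMethodSketch {
    const METHOD_ID: u64;
    type Input;
    type Output;

    fn encode_request(input: Self::Input) -> Vec<u8>;
    fn decode_request(bytes: &[u8]) -> Result<Self::Input, String>;
    fn encode_response(output: Self::Output) -> Vec<u8>;
    fn decode_response(bytes: &[u8]) -> Result<Self::Output, String>;
}

/// FNV-1a evaluated at compile time; a std-only substitute for the xxhash-based ID.
pub const fn fnv1a_64(name: &str) -> u64 {
    let bytes = name.as_bytes();
    let mut hash: u64 = 0xcbf29ce484222325;
    let mut i = 0;
    while i < bytes.len() {
        hash ^= bytes[i] as u64;
        hash = hash.wrapping_mul(0x100000001b3);
        i += 1;
    }
    hash
}

/// Assumed wire format: each f64 as 8 little-endian bytes.
fn f64s_to_bytes(values: &[f64]) -> Vec<u8> {
    let mut out = Vec::with_capacity(values.len() * 8);
    for value in values {
        out.extend_from_slice(&value.to_le_bytes());
    }
    out
}

fn bytes_to_f64s(bytes: &[u8]) -> Result<Vec<f64>, String> {
    if bytes.len() % 8 != 0 {
        return Err(format!("expected a multiple of 8 bytes, got {}", bytes.len()));
    }
    let mut values = Vec::with_capacity(bytes.len() / 8);
    for chunk in bytes.chunks_exact(8) {
        let mut buf = [0u8; 8];
        buf.copy_from_slice(chunk);
        values.push(f64::from_le_bytes(buf));
    }
    Ok(values)
}

pub struct Add;

impl PrebufferedMethodSketch for Add {
    // Derived from the method name once, at compile time.
    const METHOD_ID: u64 = fnv1a_64("Add");
    type Input = Vec<f64>;
    type Output = f64;

    fn encode_request(input: Vec<f64>) -> Vec<u8> {
        f64s_to_bytes(&input)
    }

    fn decode_request(bytes: &[u8]) -> Result<Vec<f64>, String> {
        bytes_to_f64s(bytes)
    }

    fn encode_response(output: f64) -> Vec<u8> {
        output.to_le_bytes().to_vec()
    }

    fn decode_response(bytes: &[u8]) -> Result<f64, String> {
        let values = bytes_to_f64s(bytes)?;
        if values.len() == 1 {
            Ok(values[0])
        } else {
            Err(format!("expected exactly one f64, got {}", values.len()))
        }
    }
}
```

Because both sides depend on the same definition, the request and response types and the method ID cannot drift apart between client and server.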
Server Setup and Initialization
The server initialization occurs in a dedicated block within main():
Diagram: Server Initialization Flow
sequenceDiagram
participant MAIN as main function
participant LISTENER as TcpListener
participant SERVER as RpcServer
participant ENDPOINT as endpoint
participant TASK as tokio::spawn
MAIN->>LISTENER: TcpListener::bind("127.0.0.1:0")
Note over MAIN: Binds to random available port
MAIN->>MAIN: tcp_listener_to_host_port
Note over MAIN: Extracts host and port
MAIN->>SERVER: RpcServer::new(None)
MAIN->>SERVER: Arc::new(server)
Note over SERVER: Wrapped in Arc for sharing
MAIN->>ENDPOINT: server.endpoint()
Note over ENDPOINT: Get handler registration interface
MAIN->>ENDPOINT: register_prebuffered(Add::METHOD_ID, handler)
MAIN->>ENDPOINT: register_prebuffered(Mult::METHOD_ID, handler)
MAIN->>ENDPOINT: register_prebuffered(Echo::METHOD_ID, handler)
Note over ENDPOINT: Handlers registered concurrently with join!
MAIN->>TASK: tokio::spawn(server.serve_with_listener)
Note over TASK: Server runs in background task
TcpListener Binding
The server binds to a random available port using port 0:
Key Code Entities :
- TcpListener::bind(): Tokio’s async TCP listener
- tcp_listener_to_host_port(): Utility to extract host/port from listener
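The port-0 technique can be illustrated with the blocking `std::net::TcpListener`, which behaves the same way as Tokio's listener in this respect. The helper name `bind_ephemeral` is hypothetical; it only approximates what a utility like `tcp_listener_to_host_port` presumably returns.

```rust
use std::net::TcpListener;

/// Binds to an OS-assigned port on loopback and reports where it landed.
/// `bind_ephemeral` is an illustrative name, not part of muxio.
pub fn bind_ephemeral() -> std::io::Result<(TcpListener, String, u16)> {
    // Port 0 asks the operating system to pick any free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    // The actual port is only known after binding.
    let addr = listener.local_addr()?;
    Ok((listener, addr.ip().to_string(), addr.port()))
}
```

Binding to port 0 avoids collisions when several instances of the example (or parallel tests) run on the same machine, at the cost of having to read the assigned port back before a client can connect.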
RpcServer Creation
The RpcServer is created and wrapped in Arc for shared ownership:
Key Code Entities :
- RpcServer::new(None): Creates server with default configuration
- server.endpoint(): Returns RpcServiceEndpointInterface for handler registration
- Arc: Enables shared ownership across async tasks
Handler Registration
Handlers are registered using the register_prebuffered method, which accepts a method ID and an async closure:
Diagram: Handler Registration Pattern
graph TB
ENDPOINT["endpoint\n(RpcServiceEndpointInterface)"]
subgraph "Handler Closure Signature"
CLOSURE["async move closure\n/request_bytes: Vec<u8>, _ctx/ -> Result"]
end
subgraph "Handler Implementation Steps"
DECODE["1. Decode request bytes\nMethod::decode_request(&request_bytes)"]
PROCESS["2. Process business logic\n(sum, product, echo)"]
ENCODE["3. Encode response\nMethod::encode_response(result)"]
end
ENDPOINT -->|register_prebuffered METHOD_ID, closure| CLOSURE
CLOSURE --> DECODE
DECODE --> PROCESS
PROCESS --> ENCODE
ENCODE -->|Ok response_bytes| ENDPOINT
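The registration pattern boils down to a map from method ID to a bytes-in, bytes-out closure. The sketch below is a synchronous, std-only simplification: `EndpointSketch` and `dispatch` are hypothetical names, the real handlers are async closures that also receive a context argument, and rejecting duplicate IDs is an assumption made here rather than documented behavior.

```rust
use std::collections::HashMap;

type HandlerResult = Result<Vec<u8>, String>;
type Handler = Box<dyn Fn(Vec<u8>) -> HandlerResult + Send + Sync>;

/// Hypothetical, synchronous stand-in for the endpoint's handler table.
#[derive(Default)]
pub struct EndpointSketch {
    handlers: HashMap<u64, Handler>,
}

impl EndpointSketch {
    /// Stores a handler under its method ID.
    /// Assumption: registering the same ID twice is treated as an error.
    pub fn register_prebuffered<F>(&mut self, method_id: u64, handler: F) -> Result<(), String>
    where
        F: Fn(Vec<u8>) -> HandlerResult + Send + Sync + 'static,
    {
        if self.handlers.contains_key(&method_id) {
            return Err(format!("handler already registered for method {}", method_id));
        }
        self.handlers.insert(method_id, Box::new(handler));
        Ok(())
    }

    /// Looks up the handler for an incoming request and runs it on the raw bytes.
    pub fn dispatch(&self, method_id: u64, request_bytes: Vec<u8>) -> HandlerResult {
        match self.handlers.get(&method_id) {
            Some(handler) => handler(request_bytes),
            None => Err(format!("no handler for method {}", method_id)),
        }
    }
}
```

An echo handler is then a one-liner, for example `endpoint.register_prebuffered(1, |bytes| Ok(bytes))`, where the ID `1` is a placeholder for a value such as `Echo::METHOD_ID`.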
Add Handler Example
The Add handler sums a vector of floats:
Key Operations :
- Add::decode_request(): Deserializes request bytes into Vec<f64>
- iter().sum(): Computes sum using standard iterator methods
- Add::encode_response(): Serializes f64 result back to bytes
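The three operations can be traced in a small std-only sketch. The helpers `encode_f64s` and `decode_f64s` and the little-endian layout are assumptions standing in for `Add::decode_request()` and `Add::encode_response()`; the handler is written as a plain function rather than the async closure the example registers.

```rust
/// Assumed wire format for this sketch: each f64 as 8 little-endian bytes.
pub fn encode_f64s(values: &[f64]) -> Vec<u8> {
    let mut out = Vec::with_capacity(values.len() * 8);
    for value in values {
        out.extend_from_slice(&value.to_le_bytes());
    }
    out
}

pub fn decode_f64s(bytes: &[u8]) -> Result<Vec<f64>, String> {
    if bytes.len() % 8 != 0 {
        return Err(format!("expected a multiple of 8 bytes, got {}", bytes.len()));
    }
    let mut values = Vec::with_capacity(bytes.len() / 8);
    for chunk in bytes.chunks_exact(8) {
        let mut buf = [0u8; 8];
        buf.copy_from_slice(chunk);
        values.push(f64::from_le_bytes(buf));
    }
    Ok(values)
}

/// Decode, compute, encode: the same three steps the Add handler performs.
pub fn add_handler(request_bytes: &[u8]) -> Result<Vec<u8>, String> {
    // 1. Decode the request bytes into numbers.
    let numbers = decode_f64s(request_bytes)?;
    // 2. Business logic: sum with the standard iterator method.
    let sum: f64 = numbers.iter().sum();
    // 3. Encode the single f64 result back to bytes.
    Ok(encode_f64s(&[sum]))
}
```

Malformed input surfaces as an `Err` from the decode step instead of a panic, so a bad request cannot take down the handler.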
Concurrent Handler Registration
All handlers are registered concurrently using tokio::join!:
Because join! returns only once every registration future has completed, all handlers are in place before the server begins accepting connections.
Server Task Spawning
The server is spawned into a background task:
Key Code Entities :
- tokio::spawn(): Spawns async task on Tokio runtime
- Arc::clone(&server): Clones Arc reference for task ownership
- serve_with_listener(): Accepts connections and dispatches to handlers
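The ownership pattern is the same whether the background work runs on a Tokio task or an OS thread. The sketch below uses `std::thread::spawn` as a stand-in for `tokio::spawn`; `ServerSketch`, `serve`, and `spawn_server` are hypothetical names, and the "serving" is reduced to incrementing a counter.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical server that only counts the connections it has "served".
pub struct ServerSketch {
    served: AtomicUsize,
}

impl ServerSketch {
    pub fn new() -> Self {
        ServerSketch { served: AtomicUsize::new(0) }
    }

    /// Stand-in for an accept loop: handles a fixed number of connections.
    pub fn serve(&self, connections: usize) {
        for _ in 0..connections {
            self.served.fetch_add(1, Ordering::SeqCst);
        }
    }

    pub fn served(&self) -> usize {
        self.served.load(Ordering::SeqCst)
    }
}

/// Clones the Arc so the background thread owns its own reference,
/// while the caller keeps the original handle.
pub fn spawn_server(server: &Arc<ServerSketch>, connections: usize) -> thread::JoinHandle<()> {
    let task_server = Arc::clone(server);
    thread::spawn(move || task_server.serve(connections))
}
```

Cloning the `Arc` copies only the pointer and bumps a reference count, so the caller and the background task observe the same server state.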
Client Connection and Configuration
The client establishes a WebSocket connection to the server:
Diagram: Client Initialization Flow
sequenceDiagram
participant MAIN as main function
participant CLIENT as RpcClient
participant HANDLER as state_change_handler
participant SERVER as RpcServer
MAIN->>MAIN: tokio::time::sleep(200ms)
Note over MAIN: Wait for server startup
MAIN->>CLIENT: RpcClient::new(host, port)
CLIENT->>SERVER: WebSocket connection
SERVER-->>CLIENT: Connection established
MAIN->>CLIENT: set_state_change_handler(callback)
Note over HANDLER: Callback invoked on state changes
MAIN->>CLIENT: Method::call(&client, params)
Note over MAIN: Ready to make RPC calls
RpcClient Creation
Key Code Entities :
- RpcClient::new(): Creates client and initiates WebSocket connection
  - Parameters: Server host (String) and port (u16)
  - Returns: Result<RpcClient> on successful connection
State Change Handling
The client supports optional state change callbacks:
Key Code Entities :
- set_state_change_handler(): Registers callback for transport state changes
- RpcTransportState: Enum representing connection states (Connected, Disconnected, etc.)
- Callback signature: Fn(RpcTransportState)
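How an optional callback of this shape might be stored and invoked can be sketched as follows. `ClientSketch`, `TransportStateSketch`, and `notify` are hypothetical names for illustration; the real client's internals and the full set of `RpcTransportState` variants are not shown in this document.

```rust
use std::sync::Mutex;

/// Hypothetical two-variant stand-in for the transport state enum.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TransportStateSketch {
    Connected,
    Disconnected,
}

type StateHandler = Box<dyn Fn(TransportStateSketch) + Send + Sync>;

/// Hypothetical client that holds at most one state change callback.
#[derive(Default)]
pub struct ClientSketch {
    on_state_change: Mutex<Option<StateHandler>>,
}

impl ClientSketch {
    /// Registers (or replaces) the callback.
    pub fn set_state_change_handler<F>(&self, handler: F)
    where
        F: Fn(TransportStateSketch) + Send + Sync + 'static,
    {
        *self.on_state_change.lock().unwrap() = Some(Box::new(handler));
    }

    /// Called by the transport layer in this sketch; a no-op when no handler is set.
    /// The lock is held while the callback runs, so the callback must not
    /// register a new handler on the same client.
    pub fn notify(&self, state: TransportStateSketch) {
        let guard = self.on_state_change.lock().unwrap();
        if let Some(handler) = &*guard {
            handler(state);
        }
    }
}
```

Keeping the handler in an `Option` is what makes the callback optional: a client that never registers one simply ignores state transitions.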
Making RPC Calls
RPC calls are made using the static call() method on each service definition:
Diagram: RPC Call Execution Pattern
graph LR
CLIENT["client code"]
CALL_METHOD["Method::call\n(client, params)"]
CALLER_IF["RpcServiceCallerInterface\ncall_prebuffered"]
DISPATCHER["RpcDispatcher\nassign request_id\ntrack pending"]
SESSION["RpcSession\nallocate stream_id\nencode frames"]
WS["WebSocket transport"]
CLIENT --> CALL_METHOD
CALL_METHOD --> CALLER_IF
CALLER_IF --> DISPATCHER
DISPATCHER --> SESSION
SESSION --> WS
WS -.response frames.-> SESSION
SESSION -.decoded response.-> DISPATCHER
DISPATCHER -.correlated result.-> CALLER_IF
CALLER_IF -.deserialized.-> CALL_METHOD
CALL_METHOD -.return value.-> CLIENT
Concurrent Call Execution
Multiple calls can be executed concurrently using tokio::join!:
Key Features :
- All six calls execute concurrently over the same WebSocket connection
- Each call gets a unique request ID for response correlation
- Stream multiplexing allows interleaved responses
- join! waits for all responses before proceeding
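Request-ID correlation is what lets responses arrive in any order on a shared connection. The following std-only sketch shows the bookkeeping in isolation; `PendingCalls` and its methods are hypothetical and much simpler than the dispatcher and session layers named in the diagram.

```rust
use std::collections::HashMap;

/// Hypothetical table of in-flight calls, keyed by request ID.
pub struct PendingCalls {
    next_request_id: u32,
    pending: HashMap<u32, String>,
}

impl PendingCalls {
    pub fn new() -> Self {
        PendingCalls { next_request_id: 0, pending: HashMap::new() }
    }

    /// Assigns a fresh ID to an outgoing call and remembers which call it was.
    pub fn begin(&mut self, label: &str) -> u32 {
        let id = self.next_request_id;
        self.next_request_id = self.next_request_id.wrapping_add(1);
        self.pending.insert(id, label.to_string());
        id
    }

    /// Matches an incoming response to its call, regardless of arrival order.
    /// Returns None for unknown or already-completed IDs.
    pub fn complete(&mut self, request_id: u32, payload: Vec<u8>) -> Option<(String, Vec<u8>)> {
        self.pending.remove(&request_id).map(|label| (label, payload))
    }

    pub fn outstanding(&self) -> usize {
        self.pending.len()
    }
}
```

In this model an Echo response that arrives before an earlier Add response is still routed to the right caller, because matching depends on the ID and not on arrival order.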
Call Syntax and Type Safety
The call() method is generic and type-safe:
| Call | Input Type | Return Type | Implementation |
|---|---|---|---|
| Add::call(&client, vec![1.0, 2.0, 3.0]) | Vec<f64> | Result<f64> | Sums inputs |
| Mult::call(&client, vec![8.0, 3.0, 7.0]) | Vec<f64> | Result<f64> | Multiplies inputs |
| Echo::call(&client, b"test".into()) | Vec<u8> | Result<Vec<u8>> | Echoes input |
Type Safety Guarantees :
- Compile-time verification of parameter types
- Compile-time verification of return types
- Mismatched types result in compilation errors, not runtime errors
Result Validation
The example validates all results with assertions:
Complete Execution Flow
Diagram: End-to-End Request/Response Flow
Running the Example
The example is located in the example-muxio-ws-rpc-app crate and can be executed with:
Expected Output :
[INFO] Transport state changed to: Connected
[INFO] All assertions passed
The example demonstrates:
- Server starts on random port
- Handlers registered for Add, Mult, Echo
- Client connects via WebSocket
- Six concurrent RPC calls execute successfully
- All responses validated with assertions
- Automatic cleanup on completion
Key Takeaways
| Concept | Implementation |
|---|---|
| Shared Definitions | Service methods defined once in example-muxio-rpc-service-definition, used by both client and server |
| Type Safety | Compile-time verification of request/response types via RpcMethodPrebuffered trait |
| Concurrency | Multiple RPC calls multiplex over single WebSocket connection |
| Async Handlers | Server handlers are async closures, enabling non-blocking execution |
| State Management | Optional state change callbacks for connection monitoring |
| Zero Boilerplate | Method calls use simple Method::call(client, params) syntax |
This example provides a foundation for building production WebSocket RPC applications. For streaming RPC patterns, see Streaming RPC Calls. For WASM client integration, see WASM RPC Client.