This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
WebSocket RPC Application
Relevant source files
This page provides a detailed walkthrough of the example-muxio-ws-rpc-app demonstration, which showcases a complete WebSocket-based RPC application using the Muxio framework. This example illustrates how to create a server with registered RPC handlers, connect a client, perform concurrent RPC calls, and gracefully shut down.
Scope : This page focuses specifically on the example application structure and implementation. For information about creating custom service definitions, see Creating Service Definitions. For details about the Tokio server implementation, see Tokio RPC Server. For details about the Tokio client implementation, see Tokio RPC Client.
Overview
The WebSocket RPC application demonstrates the complete lifecycle of a Muxio-based RPC service:
- Server Setup : Binds to a random TCP port and registers RPC method handlers
- Client Connection : Connects to the server via WebSocket
- Concurrent RPC Execution : Makes multiple simultaneous RPC calls over a single connection
- State Management : Monitors connection state changes via callbacks
- Request/Response Verification : Validates that all responses match expected values
Sources : README.md:64-161
Application Structure
The example application is organized into two main workspace crates:
| Crate | Purpose | Key Components |
|---|---|---|
example-muxio-ws-rpc-app | Application executable | Server setup, client execution, main loop |
example-muxio-rpc-service-definition | Shared service contract | Add, Mult, Echo method definitions |
The shared service definition crate ensures compile-time type safety between client and server by providing a single source of truth for RPC method signatures, parameter types, and return types.
Sources : README.md:67-73
graph LR
subgraph "Shared Contract"
SERVICE_DEF["example-muxio-rpc-service-definition\nAdd, Mult, Echo"]
end
subgraph "Server Side"
SERVER_IMPL["Server Implementation\nDecode → Compute → Encode"]
end
subgraph "Client Side"
CLIENT_IMPL["Client Implementation\nEncode → Send → Decode"]
end
SERVICE_DEF -->|Defines API| SERVER_IMPL
SERVICE_DEF -->|Defines API| CLIENT_IMPL
CLIENT_IMPL -->|WebSocket Binary Frames| SERVER_IMPL
Service Definitions
The example uses three prebuffered RPC methods, all defined in the example-muxio-rpc-service-definition crate:
Method Inventory
| Method | Input Type | Output Type | Operation |
|---|---|---|---|
Add | Vec<f64> | f64 | Sum of all input values |
Mult | Vec<f64> | f64 | Product of all input values |
Echo | Vec<u8> | Vec<u8> | Returns input unchanged |
Each method implements the RpcMethodPrebuffered trait, which provides:
- Compile-time METHOD_ID : Generated by hashing the method name with
xxhash-rust - encode_request/decode_request : Serialization logic for parameters using
bitcode - encode_response/decode_response : Serialization logic for return values using
bitcode
Sources : README.md:70-73
Server Implementation
The server setup involves creating an RpcServer instance, registering method handlers, and spawning the server task.
sequenceDiagram
participant Main as "main()"
participant Listener as "TcpListener"
participant Server as "RpcServer"
participant Endpoint as "endpoint()"
participant Task as "Server Task"
Main->>Listener: TcpListener::bind("127.0.0.1:0")
Listener-->>Main: Random port assigned
Main->>Server: RpcServer::new(None)
Main->>Server: Arc::new(server)
Main->>Endpoint: server.endpoint()
Main->>Endpoint: register_prebuffered(Add::METHOD_ID, handler)
Main->>Endpoint: register_prebuffered(Mult::METHOD_ID, handler)
Main->>Endpoint: register_prebuffered(Echo::METHOD_ID, handler)
Main->>Task: tokio::spawn(server.serve_with_listener)
Task->>Server: Begin accepting connections
Server Setup Sequence
Sources : README.md:86-128
Handler Registration
Handlers are registered using the register_prebuffered method from RpcServiceEndpointInterface. Each handler receives:
request_bytes: Vec<u8>- The serialized request parameters_ctx- Request context (unused in this example)
The handler pattern follows these steps:
- Decode Request : README.md102 -
Add::decode_request(&request_bytes)? - Compute Result : README.md103 -
let sum = request_params.iter().sum() - Encode Response : README.md104 -
Add::encode_response(sum)? - Return Result : README.md105 -
Ok(response_bytes)
Sources : README.md:101-117
Server Configuration Details
| Configuration | Value | Code Reference |
|---|---|---|
| Bind address | "127.0.0.1:0" | README.md87 |
| Port selection | Random available port | README.md87 |
| Server options | None (defaults) | README.md94 |
| Arc wrapping | Arc::new(RpcServer::new(None)) | README.md94 |
The server is wrapped in Arc to enable sharing across multiple tasks. The endpoint handle is obtained via README.md97 server.endpoint() and used for handler registration.
Sources : README.md:86-94
sequenceDiagram
participant Main as "main()"
participant Sleep as "tokio::time::sleep"
participant Client as "RpcClient"
participant Handler as "State Change Handler"
participant Methods as "RPC Methods"
Main->>Sleep: sleep(200ms)
Note over Sleep: Wait for server startup
Main->>Client: RpcClient::new(host, port)
Client-->>Main: Connected client
Main->>Client: set_state_change_handler(callback)
Client->>Handler: Register callback
Main->>Methods: Add::call(&client, params)
Main->>Methods: Mult::call(&client, params)
Main->>Methods: Echo::call(&client, params)
Note over Methods: All calls execute concurrently
Methods-->>Main: Results returned via join!
Client Implementation
The client connects to the server, sets up state monitoring, and performs concurrent RPC calls.
Client Connection Flow
Sources : README.md:130-160
State Change Monitoring
The client sets a state change handler at README.md:138-141 to monitor connection lifecycle events:
The handler receives RpcTransportState enum values indicating connection status. See Transport State Management for details on available states.
Sources : README.md:138-141
Concurrent RPC Execution
The example demonstrates concurrent request handling using Tokio's join! macro at README.md:144-151 Six RPC calls execute simultaneously over a single WebSocket connection:
| Call | Method | Parameters | Expected Result |
|---|---|---|---|
| res1 | Add::call | [1.0, 2.0, 3.0] | 6.0 |
| res2 | Add::call | [8.0, 3.0, 7.0] | 18.0 |
| res3 | Mult::call | [8.0, 3.0, 7.0] | 168.0 |
| res4 | Mult::call | [1.5, 2.5, 8.5] | 31.875 |
| res5 | Echo::call | b"testing 1 2 3" | b"testing 1 2 3" |
| res6 | Echo::call | b"testing 4 5 6" | b"testing 4 5 6" |
All requests are multiplexed over the single WebSocket connection, with the RpcDispatcher handling request correlation and response routing.
Sources : README.md:144-158
sequenceDiagram
participant App as "Application\nmain()"
participant AddCall as "Add::call()"
participant Client as "RpcClient"
participant Dispatcher as "RpcDispatcher"
participant WS as "WebSocket\nConnection"
participant ServerDisp as "Server RpcDispatcher"
participant Endpoint as "Endpoint"
participant Handler as "Add Handler"
App->>AddCall: Add::call(&client, [1.0, 2.0, 3.0])
AddCall->>AddCall: Add::encode_request(params)
AddCall->>Client: call_prebuffered(METHOD_ID, request_bytes)
Client->>Dispatcher: dispatch_request(METHOD_ID, bytes)
Note over Dispatcher: Assign unique request_id\nStore pending request
Dispatcher->>WS: Binary frames with METHOD_ID
WS->>ServerDisp: Receive binary frames
ServerDisp->>Endpoint: Route by METHOD_ID
Endpoint->>Handler: Invoke registered handler
Handler->>Handler: Add::decode_request(bytes)
Handler->>Handler: let sum = params.iter().sum()
Handler->>Handler: Add::encode_response(sum)
Handler-->>Endpoint: response_bytes
Endpoint-->>ServerDisp: response_bytes
ServerDisp->>WS: Binary frames with request_id
WS->>Dispatcher: Receive response frames
Note over Dispatcher: Match by request_id\nResolve pending future
Dispatcher-->>Client: response_bytes
Client-->>AddCall: response_bytes
AddCall->>AddCall: Add::decode_response(bytes)
AddCall-->>App: Result<f64>
Request/Response Flow
This diagram shows the complete path of a single RPC call through the system layers:
Sources : README.md:69-161
Running the Example
The example application requires the following dependencies in Cargo.toml:
Execution Flow
- Initialize : README.md84 -
tracing_subscriber::fmt().with_env_filter("info").init() - Bind Port : README.md87 -
TcpListener::bind("127.0.0.1:0").await.unwrap() - Create Server : README.md94 -
Arc::new(RpcServer::new(None)) - Register Handlers : README.md:100-118 - Register
Add,Mult,Echohandlers - Spawn Server : README.md:121-127 -
tokio::spawn(server.serve_with_listener(listener)) - Wait for Startup : README.md133 -
tokio::time::sleep(Duration::from_millis(200)) - Connect Client : README.md136 -
RpcClient::new(&host, port).await.unwrap() - Set Handler : README.md:138-141 -
set_state_change_handler(callback) - Make Calls : README.md:144-151 - Concurrent RPC invocations via
join! - Verify Results : README.md:153-158 - Assert expected values
Sources : README.md:82-161
Key Implementation Details
Server-Side Handler Closure Signature
Each handler registered via register_prebuffered has the signature:
|request_bytes: Vec<u8>, _ctx| async move { ... }
The handler must:
- Accept
request_bytes: Vec<u8>and context - Return
Result<Vec<u8>, RpcServiceError> - Be
asyncandmoveto capture necessary data
Sources : README.md:101-117
Client-Side Method Invocation
Each method provides a static call function with this pattern:
Method::call(&*rpc_client, params)
- Takes a reference to any type implementing
RpcServiceCallerInterface - Accepts typed parameters (e.g.,
Vec<f64>) - Returns
Result<T, RpcServiceError>with the typed response
The &* dereference at README.md:145-150 is required because rpc_client is of type RpcClient, and the trait bound requires &dyn RpcServiceCallerInterface.
Sources : README.md:144-151
Connection Lifecycle
The application demonstrates automatic connection management:
- Server Spawn : Server task runs independently in background
- Client Connect : Client establishes WebSocket connection
- State Tracking : Callback logs all state changes
- Request Processing : Multiple concurrent requests handled
- Implicit Cleanup : Server and client dropped when
main()exits
No explicit shutdown code is needed; Tokio handles task cancellation when the runtime stops.
Sources : README.md:82-160
Related Examples
For additional examples and tutorials:
- Simple Calculator Service : #9.2 - Step-by-step tutorial building from scratch
- Cross-Platform Deployment : #10.1 - Deploying to native and WASM targets
- JavaScript/WASM Integration : #10.4 - Using WASM clients with JavaScript
Sources : README.md:64-161
Dismiss
Refresh this wiki
Enter email to refresh