

WebSocket RPC Application


This page provides a detailed walkthrough of the example-muxio-ws-rpc-app demonstration, which showcases a complete WebSocket-based RPC application using the Muxio framework. This example illustrates how to create a server with registered RPC handlers, connect a client, perform concurrent RPC calls, and gracefully shut down.

Scope : This page focuses specifically on the example application structure and implementation. For information about creating custom service definitions, see Creating Service Definitions. For details about the Tokio server implementation, see Tokio RPC Server. For details about the Tokio client implementation, see Tokio RPC Client.

Overview

The WebSocket RPC application demonstrates the complete lifecycle of a Muxio-based RPC service:

  1. Server Setup : Binds to a random TCP port and registers RPC method handlers
  2. Client Connection : Connects to the server via WebSocket
  3. Concurrent RPC Execution : Makes multiple simultaneous RPC calls over a single connection
  4. State Management : Monitors connection state changes via callbacks
  5. Request/Response Verification : Validates that all responses match expected values

Sources : README.md:64-161

Application Structure

The example application is organized into two main workspace crates:

| Crate | Purpose | Key Components |
| --- | --- | --- |
| example-muxio-ws-rpc-app | Application executable | Server setup, client execution, main loop |
| example-muxio-rpc-service-definition | Shared service contract | Add, Mult, Echo method definitions |

The shared service definition crate ensures compile-time type safety between client and server by providing a single source of truth for RPC method signatures, parameter types, and return types.

Sources : README.md:67-73

graph LR
    subgraph "Shared Contract"
        SERVICE_DEF["example-muxio-rpc-service-definition\nAdd, Mult, Echo"]
    end

    subgraph "Server Side"
        SERVER_IMPL["Server Implementation\nDecode → Compute → Encode"]
    end

    subgraph "Client Side"
        CLIENT_IMPL["Client Implementation\nEncode → Send → Decode"]
    end

    SERVICE_DEF -->|Defines API| SERVER_IMPL
    SERVICE_DEF -->|Defines API| CLIENT_IMPL
    CLIENT_IMPL -->|WebSocket Binary Frames| SERVER_IMPL

Service Definitions

The example uses three prebuffered RPC methods, all defined in the example-muxio-rpc-service-definition crate:

Method Inventory

| Method | Input Type | Output Type | Operation |
| --- | --- | --- | --- |
| Add | Vec&lt;f64&gt; | f64 | Sum of all input values |
| Mult | Vec&lt;f64&gt; | f64 | Product of all input values |
| Echo | Vec&lt;u8&gt; | Vec&lt;u8&gt; | Returns input unchanged |

Each method implements the RpcMethodPrebuffered trait, which provides:

  • Compile-time METHOD_ID : Generated by hashing the method name with xxhash-rust
  • encode_request/decode_request : Serialization logic for parameters using bitcode
  • encode_response/decode_response : Serialization logic for return values using bitcode
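
To make the contract concrete, here is a minimal, self-contained sketch of the prebuffered-method pattern. It is illustrative only: the real RpcMethodPrebuffered trait lives in the Muxio service crates, derives METHOD_ID by hashing the method name with xxhash-rust, and serializes with bitcode, so its exact signatures differ.

```rust
// Illustrative stand-in for the shared-contract pattern; NOT the real
// RpcMethodPrebuffered trait. Little-endian byte packing stands in for
// bitcode, and a fixed constant stands in for the xxhash-derived id.
trait PrebufferedMethod {
    const METHOD_ID: u64;
    type Input;
    type Output;

    fn encode_request(input: &Self::Input) -> Vec<u8>;
    fn decode_request(bytes: &[u8]) -> Self::Input;
    fn encode_response(output: &Self::Output) -> Vec<u8>;
    fn decode_response(bytes: &[u8]) -> Self::Output;
}

struct Add;

impl PrebufferedMethod for Add {
    const METHOD_ID: u64 = 0xADD; // placeholder; muxio hashes "Add"
    type Input = Vec<f64>;
    type Output = f64;

    fn encode_request(input: &Vec<f64>) -> Vec<u8> {
        input.iter().flat_map(|v| v.to_le_bytes()).collect()
    }
    fn decode_request(bytes: &[u8]) -> Vec<f64> {
        bytes
            .chunks_exact(8)
            .map(|c| f64::from_le_bytes(c.try_into().unwrap()))
            .collect()
    }
    fn encode_response(output: &f64) -> Vec<u8> {
        output.to_le_bytes().to_vec()
    }
    fn decode_response(bytes: &[u8]) -> f64 {
        f64::from_le_bytes(bytes.try_into().unwrap())
    }
}
```

Because both the server handlers and the client call sites compile against this one definition, a change to a parameter type breaks the build on both sides rather than failing at runtime.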

Sources : README.md:70-73

Server Implementation

The server setup involves creating an RpcServer instance, registering method handlers, and spawning the server task.

sequenceDiagram
    participant Main as "main()"
    participant Listener as "TcpListener"
    participant Server as "RpcServer"
    participant Endpoint as "endpoint()"
    participant Task as "Server Task"
    
    Main->>Listener: TcpListener::bind("127.0.0.1:0")
    Listener-->>Main: Random port assigned
    
    Main->>Server: RpcServer::new(None)
    Main->>Server: Arc::new(server)
    
    Main->>Endpoint: server.endpoint()
    Main->>Endpoint: register_prebuffered(Add::METHOD_ID, handler)
    Main->>Endpoint: register_prebuffered(Mult::METHOD_ID, handler)
    Main->>Endpoint: register_prebuffered(Echo::METHOD_ID, handler)
    
    Main->>Task: tokio::spawn(server.serve_with_listener)
    Task->>Server: Begin accepting connections

Server Setup Sequence

Sources : README.md:86-128

Handler Registration

Handlers are registered using the register_prebuffered method from RpcServiceEndpointInterface. Each handler receives:

  • request_bytes: Vec<u8> - The serialized request parameters
  • _ctx - Request context (unused in this example)

The handler pattern follows these steps (see the sketch after this list):

  1. Decode Request : README.md:102 - Add::decode_request(&request_bytes)?
  2. Compute Result : README.md:103 - let sum = request_params.iter().sum()
  3. Encode Response : README.md:104 - Add::encode_response(sum)?
  4. Return Result : README.md:105 - Ok(response_bytes)
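
A condensed sketch of the Add registration following these four steps. Module paths, the error type, and whether registration itself must be awaited are elided here; the closure shape matches the signature described under Key Implementation Details below.

```rust
// Sketch of the Add handler registration (README.md:101-117); imports
// and the endpoint's exact return type are abridged.
let endpoint = server.endpoint();

endpoint.register_prebuffered(Add::METHOD_ID, |request_bytes: Vec<u8>, _ctx| async move {
    // 1. Decode request: recover the typed parameters from the bytes.
    let request_params = Add::decode_request(&request_bytes)?;
    // 2. Compute result: the handler body is ordinary Rust.
    let sum: f64 = request_params.iter().sum();
    // 3. Encode response: serialize the typed result back into bytes.
    let response_bytes = Add::encode_response(sum)?;
    // 4. Return result: the dispatcher routes these bytes to the caller.
    Ok(response_bytes)
});
```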

Sources : README.md:101-117

Server Configuration Details

| Configuration | Value | Code Reference |
| --- | --- | --- |
| Bind address | "127.0.0.1:0" | README.md:87 |
| Port selection | Random available port | README.md:87 |
| Server options | None (defaults) | README.md:94 |
| Arc wrapping | Arc::new(RpcServer::new(None)) | README.md:94 |

The server is wrapped in Arc to enable sharing across multiple tasks. The endpoint handle is obtained via server.endpoint() (README.md:97) and used for handler registration.
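
Putting those values together, the setup reduces to a few lines. This is a condensed sketch rather than the README's verbatim code; cloning the Arc into the spawned task is one plausible way to share the server handle.

```rust
// Bind to a random free port (README.md:87), create the server with
// default options, and share it via Arc (README.md:94).
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let server = Arc::new(RpcServer::new(None));

// Run the accept loop as an independent task (README.md:121-127).
tokio::spawn({
    let server = server.clone();
    async move { server.serve_with_listener(listener).await }
});
```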

Sources : README.md:86-94

Client Implementation

The client connects to the server, sets up state monitoring, and performs concurrent RPC calls.

Client Connection Flow

sequenceDiagram
    participant Main as "main()"
    participant Sleep as "tokio::time::sleep"
    participant Client as "RpcClient"
    participant Handler as "State Change Handler"
    participant Methods as "RPC Methods"

    Main->>Sleep: sleep(200ms)
    Note over Sleep: Wait for server startup

    Main->>Client: RpcClient::new(host, port)
    Client-->>Main: Connected client

    Main->>Client: set_state_change_handler(callback)
    Client->>Handler: Register callback

    Main->>Methods: Add::call(&client, params)
    Main->>Methods: Mult::call(&client, params)
    Main->>Methods: Echo::call(&client, params)

    Note over Methods: All calls execute concurrently

    Methods-->>Main: Results returned via join!

Sources : README.md:130-160
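
Condensed from the flow above; the host and port variables are assumed to come from the listener's bound address, since the server bound to port 0.

```rust
// Give the spawned server a moment to start accepting (README.md:133),
// then open the WebSocket connection (README.md:136).
tokio::time::sleep(Duration::from_millis(200)).await;
let rpc_client = RpcClient::new(&host, port).await.unwrap();
```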

State Change Monitoring

The client sets a state change handler at README.md:138-141 to monitor connection lifecycle events; a minimal sketch of such a callback:
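
```rust
// Sketch: log each transport state transition. The exact closure body,
// and whether the setter is async, follow the actual client API.
rpc_client.set_state_change_handler(|state: RpcTransportState| {
    tracing::info!("[Client] transport state changed: {:?}", state);
});
```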

The handler receives RpcTransportState enum values indicating connection status. See Transport State Management for details on available states.

Sources : README.md:138-141

Concurrent RPC Execution

The example demonstrates concurrent request handling using Tokio's join! macro at README.md:144-151. Six RPC calls execute simultaneously over a single WebSocket connection:

| Call | Method | Parameters | Expected Result |
| --- | --- | --- | --- |
| res1 | Add::call | [1.0, 2.0, 3.0] | 6.0 |
| res2 | Add::call | [8.0, 3.0, 7.0] | 18.0 |
| res3 | Mult::call | [8.0, 3.0, 7.0] | 168.0 |
| res4 | Mult::call | [1.5, 2.5, 8.5] | 31.875 |
| res5 | Echo::call | b"testing 1 2 3" | b"testing 1 2 3" |
| res6 | Echo::call | b"testing 4 5 6" | b"testing 4 5 6" |

All requests are multiplexed over the single WebSocket connection, with the RpcDispatcher handling request correlation and response routing.
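
A sketch of the concurrent invocation and verification, reconstructed from the table above:

```rust
// All six futures are polled concurrently by join!; the dispatcher
// multiplexes them over one WebSocket connection (README.md:144-151).
let (res1, res2, res3, res4, res5, res6) = tokio::join!(
    Add::call(&*rpc_client, vec![1.0, 2.0, 3.0]),
    Add::call(&*rpc_client, vec![8.0, 3.0, 7.0]),
    Mult::call(&*rpc_client, vec![8.0, 3.0, 7.0]),
    Mult::call(&*rpc_client, vec![1.5, 2.5, 8.5]),
    Echo::call(&*rpc_client, b"testing 1 2 3".to_vec()),
    Echo::call(&*rpc_client, b"testing 4 5 6".to_vec()),
);

// Verification (README.md:153-158): each response matches the table.
assert_eq!(res1.unwrap(), 6.0);
assert_eq!(res2.unwrap(), 18.0);
assert_eq!(res3.unwrap(), 168.0);
assert_eq!(res4.unwrap(), 31.875);
assert_eq!(res5.unwrap(), b"testing 1 2 3".to_vec());
assert_eq!(res6.unwrap(), b"testing 4 5 6".to_vec());
```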

Sources : README.md:144-158

Request/Response Flow

This diagram shows the complete path of a single RPC call through the system layers:

sequenceDiagram
    participant App as "Application\nmain()"
    participant AddCall as "Add::call()"
    participant Client as "RpcClient"
    participant Dispatcher as "RpcDispatcher"
    participant WS as "WebSocket\nConnection"
    participant ServerDisp as "Server RpcDispatcher"
    participant Endpoint as "Endpoint"
    participant Handler as "Add Handler"

    App->>AddCall: Add::call(&client, [1.0, 2.0, 3.0])
    AddCall->>AddCall: Add::encode_request(params)
    AddCall->>Client: call_prebuffered(METHOD_ID, request_bytes)
    Client->>Dispatcher: dispatch_request(METHOD_ID, bytes)

    Note over Dispatcher: Assign unique request_id\nStore pending request

    Dispatcher->>WS: Binary frames with METHOD_ID

    WS->>ServerDisp: Receive binary frames
    ServerDisp->>Endpoint: Route by METHOD_ID
    Endpoint->>Handler: Invoke registered handler

    Handler->>Handler: Add::decode_request(bytes)
    Handler->>Handler: let sum = params.iter().sum()
    Handler->>Handler: Add::encode_response(sum)

    Handler-->>Endpoint: response_bytes
    Endpoint-->>ServerDisp: response_bytes
    ServerDisp->>WS: Binary frames with request_id

    WS->>Dispatcher: Receive response frames

    Note over Dispatcher: Match by request_id\nResolve pending future

    Dispatcher-->>Client: response_bytes
    Client-->>AddCall: response_bytes
    AddCall->>AddCall: Add::decode_response(bytes)
    AddCall-->>App: Result<f64>

Sources : README.md:69-161
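
The correlation step in the middle of the diagram (assign a request_id, store the pending request, match the response) is what makes multiplexing safe. The following self-contained sketch shows the general technique with a map of oneshot channels; it is not muxio's actual RpcDispatcher code.

```rust
use std::collections::HashMap;
use tokio::sync::oneshot;

/// Generic request-correlation sketch (illustrative, not muxio's code).
#[derive(Default)]
struct Correlator {
    next_id: u64,
    pending: HashMap<u64, oneshot::Sender<Vec<u8>>>,
}

impl Correlator {
    /// Called when sending: allocate an id and a future for the reply.
    fn register(&mut self) -> (u64, oneshot::Receiver<Vec<u8>>) {
        let (tx, rx) = oneshot::channel();
        self.next_id += 1;
        let id = self.next_id;
        self.pending.insert(id, tx);
        (id, rx)
    }

    /// Called when a response frame arrives: wake the matching caller.
    fn resolve(&mut self, id: u64, response_bytes: Vec<u8>) {
        if let Some(tx) = self.pending.remove(&id) {
            let _ = tx.send(response_bytes);
        }
    }
}
```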

Running the Example

The example application's Cargo.toml pulls in the Tokio runtime, the Muxio server and client crates, and the shared example-muxio-rpc-service-definition crate.

Execution Flow

  1. Initialize : README.md:84 - tracing_subscriber::fmt().with_env_filter("info").init()
  2. Bind Port : README.md:87 - TcpListener::bind("127.0.0.1:0").await.unwrap()
  3. Create Server : README.md:94 - Arc::new(RpcServer::new(None))
  4. Register Handlers : README.md:100-118 - Register Add, Mult, Echo handlers
  5. Spawn Server : README.md:121-127 - tokio::spawn(server.serve_with_listener(listener))
  6. Wait for Startup : README.md:133 - tokio::time::sleep(Duration::from_millis(200))
  7. Connect Client : README.md:136 - RpcClient::new(&host, port).await.unwrap()
  8. Set Handler : README.md:138-141 - set_state_change_handler(callback)
  9. Make Calls : README.md:144-151 - Concurrent RPC invocations via join!
  10. Verify Results : README.md:153-158 - Assert expected values

Sources : README.md:82-161

Key Implementation Details

Server-Side Handler Closure Signature

Each handler registered via register_prebuffered has the signature:

|request_bytes: Vec<u8>, _ctx| async move { ... }

The handler must:

  • Accept request_bytes: Vec<u8> and context
  • Return Result<Vec<u8>, RpcServiceError>
  • Be async and move to capture necessary data

Sources : README.md:101-117

Client-Side Method Invocation

Each method provides a static call function with this pattern:

Method::call(&*rpc_client, params)

  • Takes a reference to any type implementing RpcServiceCallerInterface
  • Accepts typed parameters (e.g., Vec<f64>)
  • Returns Result<T, RpcServiceError> with the typed response

The &* dereference at README.md:145-150 converts the rpc_client handle into the plain reference form expected by the RpcServiceCallerInterface trait bound on each call function.

Sources : README.md:144-151

Connection Lifecycle

The application demonstrates automatic connection management:

  1. Server Spawn : Server task runs independently in background
  2. Client Connect : Client establishes WebSocket connection
  3. State Tracking : Callback logs all state changes
  4. Request Processing : Multiple concurrent requests handled
  5. Implicit Cleanup : Server and client dropped when main() exits

No explicit shutdown code is needed; Tokio handles task cancellation when the runtime stops.

Sources : README.md:82-160

For additional examples and tutorials:

  • Simple Calculator Service : #9.2 - Step-by-step tutorial building from scratch
  • Cross-Platform Deployment : #10.1 - Deploying to native and WASM targets
  • JavaScript/WASM Integration : #10.4 - Using WASM clients with JavaScript

Sources : README.md:64-161
