This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Examples and Tutorials

This section provides practical, hands-on examples demonstrating how to build applications using the muxio framework. The tutorials progress from understanding the complete example application to creating your own service definitions and implementations.

For information about the underlying architecture and design principles, see Core Concepts. For detailed documentation on specific platform implementations, see Platform Implementations.


Available Examples

The muxio repository includes a complete working example demonstrating the end-to-end process of building an RPC application:

| Example Crate | Purpose | Location |
| --- | --- | --- |
| example-muxio-rpc-service-definition | Shared service contract definitions | examples/example-muxio-rpc-service-definition/ |
| example-muxio-ws-rpc-app | Complete WebSocket RPC application | examples/example-muxio-ws-rpc-app/ |

The example demonstrates:

  • Defining shared service contracts using RpcMethodPrebuffered
  • Setting up a Tokio-based RPC server with WebSocket transport
  • Creating an RPC client and making concurrent calls
  • Proper error handling and state management
  • Connection lifecycle management

Sources : Cargo.lock:426-449 README.md:64-162


Example Application Architecture

The following diagram shows how the example application components relate to each other and to the muxio framework:

Sources : README.md:70-82 Cargo.lock:426-449

graph TB
    subgraph "Shared Service Definition"
        DEF["example-muxio-rpc-service-definition"]
        ADD["Add::METHOD_ID\nRpcMethodPrebuffered"]
        MULT["Mult::METHOD_ID\nRpcMethodPrebuffered"]
        ECHO["Echo::METHOD_ID\nRpcMethodPrebuffered"]
        DEF --> ADD
        DEF --> MULT
        DEF --> ECHO
    end

    subgraph "Server Side (example-muxio-ws-rpc-app)"
        SERVER["RpcServer::new()"]
        ENDPOINT["server.endpoint()"]
        HANDLERS["endpoint.register_prebuffered()"]
        LISTENER["TcpListener::bind()"]
        SERVER --> ENDPOINT
        ENDPOINT --> HANDLERS
        SERVER --> LISTENER
    end

    subgraph "Client Side (example-muxio-ws-rpc-app)"
        CLIENT["RpcClient::new()"]
        CALLS["Add::call()\nMult::call()\nEcho::call()"]
        STATE["set_state_change_handler()"]
        CLIENT --> CALLS
        CLIENT --> STATE
    end

    subgraph "Muxio Framework"
        CALLER_IF["RpcServiceCallerInterface"]
        ENDPOINT_IF["RpcServiceEndpointInterface"]
        DISPATCHER["RpcDispatcher"]
        CALLER_IF --> DISPATCHER
        ENDPOINT_IF --> DISPATCHER
    end

    DEF -.->|imported by| HANDLERS
    DEF -.->|imported by| CALLS

    HANDLERS -->|implements| ENDPOINT_IF
    CALLS -->|uses| CALLER_IF

    CLIENT <-.->|WebSocket frames| SERVER

Project Structure for an Example Application

When building a muxio application, the typical project structure separates concerns into distinct crates:

graph LR
    subgraph "Workspace Root"
        CARGO["Cargo.toml\n[workspace]"]
    end

    subgraph "Service Definition Crate"
        DEF_CARGO["service-definition/Cargo.toml"]
        DEF_LIB["service-definition/src/lib.rs"]
        DEF_PREBUF["RpcMethodPrebuffered impls"]
        DEF_CARGO --> DEF_LIB
        DEF_LIB --> DEF_PREBUF
    end

    subgraph "Application Crate"
        APP_CARGO["app/Cargo.toml"]
        APP_MAIN["app/src/main.rs"]
        SERVER_CODE["Server setup + handlers"]
        CLIENT_CODE["Client setup + calls"]
        APP_CARGO --> APP_MAIN
        APP_MAIN --> SERVER_CODE
        APP_MAIN --> CLIENT_CODE
    end

    CARGO -->|members| DEF_CARGO
    CARGO -->|members| APP_CARGO

    APP_CARGO -.->|depends on| DEF_CARGO

    DEF_CARGO -.->|muxio-rpc-service| MUXIO_RPC["muxio-rpc-service"]
    APP_CARGO -.->|muxio-tokio-rpc-server| TOKIO_SERVER["muxio-tokio-rpc-server"]
    APP_CARGO -.->|muxio-tokio-rpc-client| TOKIO_CLIENT["muxio-tokio-rpc-client"]

This structure ensures:

  • Service definitions are shared between client and server (compile-time type safety)
  • Application code depends on the definition crate
  • Changes to the service contract require recompilation of both sides

Sources : Cargo.lock:426-449 README.md:70-74


Understanding the Request-Response Flow

The following sequence diagram traces a complete RPC call through the example application:

Sources : README.md:92-162

sequenceDiagram
    participant Main as "main() function"
    participant Client as "RpcClient"
    participant AddTrait as "Add::call()"
    participant Caller as "RpcServiceCallerInterface"
    participant WS as "WebSocket Transport"
    participant Server as "RpcServer"
    participant Endpoint as "RpcServiceEndpointInterface"
    participant Handler as "register_prebuffered handler"
    
    Note over Main,Handler: Server Setup Phase
    Main->>Server: RpcServer::new()
    Main->>Server: server.endpoint()
    Server->>Endpoint: returns endpoint handle
    Main->>Endpoint: endpoint.register_prebuffered(Add::METHOD_ID, handler)
    Endpoint->>Handler: stores async handler
    Main->>Server: server.serve_with_listener(listener)
    
    Note over Main,Handler: Client Connection Phase
    Main->>Client: RpcClient::new(host, port)
    Client->>WS: WebSocket connection established
    
    Note over Main,Handler: RPC Call Phase
    Main->>AddTrait: Add::call(&client, vec![1.0, 2.0, 3.0])
    AddTrait->>AddTrait: Add::encode_request(params)
    AddTrait->>Caller: client.call_prebuffered(METHOD_ID, encoded_bytes)
    Caller->>WS: Binary frames with stream_id, method_id
    
    WS->>Server: Receive frames
    Server->>Endpoint: Dispatch to registered handler
    Endpoint->>Handler: invoke with request_bytes, ctx
    Handler->>Handler: Add::decode_request(&request_bytes)
    Handler->>Handler: let sum = request_params.iter().sum()
    Handler->>Handler: Add::encode_response(sum)
    Handler->>Endpoint: Ok(response_bytes)
    Endpoint->>WS: Binary frames with result
    
    WS->>Caller: Receive frames
    Caller->>AddTrait: response_bytes
    AddTrait->>AddTrait: Add::decode_response(&response_bytes)
    AddTrait->>Main: Ok(6.0)

Detailed Code Flow Through the Example

The following table maps natural language steps to specific code locations:

| Step | Description | Code Location |
| --- | --- | --- |
| 1. Bind server socket | Create TCP listener on random port | README.md:88 |
| 2. Create server instance | Instantiate RpcServer wrapped in Arc | README.md:95 |
| 3. Get endpoint handle | Retrieve endpoint for handler registration | README.md:98 |
| 4. Register Add handler | Register async handler for Add::METHOD_ID | README.md:102-107 |
| 5. Register Mult handler | Register async handler for Mult::METHOD_ID | README.md:108-113 |
| 6. Register Echo handler | Register async handler for Echo::METHOD_ID | README.md:114-118 |
| 7. Spawn server task | Start server with serve_with_listener() | README.md:126 |
| 8. Create client | Connect to server via RpcClient::new() | README.md:137 |
| 9. Set state handler | Register callback for connection state changes | README.md:139-142 |
| 10. Make concurrent calls | Use join! macro to await multiple RPC calls | README.md:145-152 |
| 11. Verify results | Assert response values match expected results | README.md:154-159 |

Sources : README.md:83-161


Service Definition Pattern

The service definition crate defines the contract between client and server. Each RPC method is a unit struct that implements the RpcMethodPrebuffered trait:

graph TB
    subgraph "Service Definition Structure"
        TRAIT["RpcMethodPrebuffered trait"]
        ADD_STRUCT["pub struct Add"]
        ADD_IMPL["impl RpcMethodPrebuffered for Add"]
        ADD_METHOD_ID["const METHOD_ID: u64"]
        ADD_REQUEST["type Request = Vec<f64>"]
        ADD_RESPONSE["type Response = f64"]
        MULT_STRUCT["pub struct Mult"]
        MULT_IMPL["impl RpcMethodPrebuffered for Mult"]
        ECHO_STRUCT["pub struct Echo"]
        ECHO_IMPL["impl RpcMethodPrebuffered for Echo"]
        TRAIT -.->|implemented by| ADD_IMPL
        TRAIT -.->|implemented by| MULT_IMPL
        TRAIT -.->|implemented by| ECHO_IMPL

        ADD_STRUCT --> ADD_IMPL
        ADD_IMPL --> ADD_METHOD_ID
        ADD_IMPL --> ADD_REQUEST
        ADD_IMPL --> ADD_RESPONSE

        MULT_STRUCT --> MULT_IMPL
        ECHO_STRUCT --> ECHO_IMPL
    end

    subgraph "Generated by Trait"
        ENCODE_REQ["encode_request()"]
        DECODE_REQ["decode_request()"]
        ENCODE_RESP["encode_response()"]
        DECODE_RESP["decode_response()"]
        CALL_METHOD["call()\nasync fn"]
        TRAIT -.->|provides| ENCODE_REQ
        TRAIT -.->|provides| DECODE_REQ
        TRAIT -.->|provides| ENCODE_RESP
        TRAIT -.->|provides| DECODE_RESP
        TRAIT -.->|provides| CALL_METHOD
    end

    subgraph "Serialization"
        BITCODE["bitcode::encode/decode"]
        ENCODE_REQ --> BITCODE
        DECODE_REQ --> BITCODE
        ENCODE_RESP --> BITCODE
        DECODE_RESP --> BITCODE
    end

    subgraph "Method ID Generation"
        XXHASH["xxhash-rust"]
        METHOD_NAME["Method name string"]
        ADD_METHOD_ID -.->|hash of| METHOD_NAME
        METHOD_NAME --> XXHASH
    end

The RpcMethodPrebuffered trait provides default implementations for encoding/decoding using bitcode serialization and a call() method that works with any RpcServiceCallerInterface implementation.
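The shape of such a contract can be sketched with the standard library alone. This is a simplified model, not the muxio API: `PrebufferedMethod` is a hypothetical stand-in for RpcMethodPrebuffered, the byte layout is hand-rolled little-endian rather than bitcode, the ID is an FNV-1a hash rather than xxhash, and the codec functions are written out per method instead of being supplied by trait defaults.

```rust
/// Compile-time FNV-1a hash of a method name.
/// Assumption: stands in for the xxhash-based ID generation; values differ.
pub const fn method_id(name: &str) -> u64 {
    let bytes = name.as_bytes();
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
    let mut i = 0;
    while i < bytes.len() {
        hash ^= bytes[i] as u64;
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
        i += 1;
    }
    hash
}

/// Hypothetical stand-in for `RpcMethodPrebuffered`.
pub trait PrebufferedMethod {
    const METHOD_ID: u64;
    type Request;
    type Response;

    fn encode_request(request: Self::Request) -> Result<Vec<u8>, String>;
    fn decode_request(bytes: &[u8]) -> Result<Self::Request, String>;
    fn encode_response(response: Self::Response) -> Result<Vec<u8>, String>;
    fn decode_response(bytes: &[u8]) -> Result<Self::Response, String>;
}

/// Decode a run of little-endian f64 values, rejecting truncated input.
fn decode_f64s(bytes: &[u8]) -> Result<Vec<f64>, String> {
    if bytes.len() % 8 != 0 {
        return Err(format!("expected a multiple of 8 bytes, got {}", bytes.len()));
    }
    let mut values = Vec::with_capacity(bytes.len() / 8);
    for chunk in bytes.chunks_exact(8) {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(chunk);
        values.push(f64::from_le_bytes(raw));
    }
    Ok(values)
}

/// Unit struct naming the method; it carries no data of its own.
pub struct Add;

impl PrebufferedMethod for Add {
    const METHOD_ID: u64 = method_id("Add");
    type Request = Vec<f64>;
    type Response = f64;

    fn encode_request(request: Vec<f64>) -> Result<Vec<u8>, String> {
        Ok(request.iter().flat_map(|v| v.to_le_bytes()).collect())
    }

    fn decode_request(bytes: &[u8]) -> Result<Vec<f64>, String> {
        decode_f64s(bytes)
    }

    fn encode_response(response: f64) -> Result<Vec<u8>, String> {
        Ok(response.to_le_bytes().to_vec())
    }

    fn decode_response(bytes: &[u8]) -> Result<f64, String> {
        let values = decode_f64s(bytes)?;
        if values.len() == 1 {
            Ok(values[0])
        } else {
            Err(format!("expected exactly one f64, got {}", values.len()))
        }
    }
}
```

Because both sides would compile against the same `Add` type, a change to `Request` or `Response` surfaces as a compile error in client and server alike.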

Sources : README.md:71-74 Cargo.lock:426-431


Handler Registration Pattern

Server-side handlers are registered on the endpoint using the register_prebuffered() method. Each handler is an async closure that:

  1. Receives raw request bytes and a context object
  2. Decodes the request using the service definition’s decode_request() method
  3. Performs the business logic
  4. Encodes the response using encode_response()
  5. Returns Result<Vec<u8>, RpcServiceError>
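The five steps above can be modeled with a small in-process registry. This is a sketch under stated assumptions, not muxio's implementation: `Endpoint`, `Context`, and `ADD_METHOD_ID` are hypothetical, handlers here are synchronous closures rather than async ones, and `String` stands in for RpcServiceError.

```rust
use std::collections::HashMap;

/// Stand-in for `Result<Vec<u8>, RpcServiceError>`.
type HandlerResult = Result<Vec<u8>, String>;
type Handler = Box<dyn Fn(Vec<u8>, &Context) -> HandlerResult + Send + Sync>;

/// Hypothetical per-connection context passed to every handler.
pub struct Context {
    pub connection_id: u64,
}

/// Hypothetical endpoint: a map from method ID to handler.
#[derive(Default)]
pub struct Endpoint {
    handlers: HashMap<u64, Handler>,
}

impl Endpoint {
    /// Store a handler under a method ID, refusing duplicates.
    pub fn register_prebuffered<F>(&mut self, method_id: u64, handler: F) -> Result<(), String>
    where
        F: Fn(Vec<u8>, &Context) -> HandlerResult + Send + Sync + 'static,
    {
        if self.handlers.contains_key(&method_id) {
            return Err(format!("handler already registered for method {}", method_id));
        }
        self.handlers.insert(method_id, Box::new(handler));
        Ok(())
    }

    /// Look up the handler for a method ID and invoke it.
    pub fn dispatch(&self, method_id: u64, request_bytes: Vec<u8>, ctx: &Context) -> HandlerResult {
        match self.handlers.get(&method_id) {
            Some(handler) => handler(request_bytes, ctx),
            None => Err(format!("no handler registered for method {}", method_id)),
        }
    }
}

/// Hypothetical ID; a real definition crate would export `Add::METHOD_ID`.
pub const ADD_METHOD_ID: u64 = 1;

fn decode_f64s(bytes: &[u8]) -> Result<Vec<f64>, String> {
    if bytes.len() % 8 != 0 {
        return Err(format!("expected a multiple of 8 bytes, got {}", bytes.len()));
    }
    let mut values = Vec::new();
    for chunk in bytes.chunks_exact(8) {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(chunk);
        values.push(f64::from_le_bytes(raw));
    }
    Ok(values)
}

pub fn build_endpoint() -> Endpoint {
    let mut endpoint = Endpoint::default();
    endpoint
        .register_prebuffered(ADD_METHOD_ID, |request_bytes, _ctx| {
            let params = decode_f64s(&request_bytes)?; // step 2: decode
            let sum: f64 = params.iter().sum(); // step 3: business logic
            Ok(sum.to_le_bytes().to_vec()) // steps 4 and 5: encode and return
        })
        .expect("first registration cannot collide");
    endpoint
}
```

Keying handlers by method ID keeps dispatch a single map lookup, and an unknown ID becomes an ordinary error value rather than a panic.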

Sources : README.md:101-119

graph LR
    subgraph "Handler Registration Flow"
        ENDPOINT["endpoint.register_prebuffered()"]
        METHOD_ID["Add::METHOD_ID"]
        CLOSURE["async closure"]
        ENDPOINT -->|key| METHOD_ID
        ENDPOINT -->|value| CLOSURE
    end

    subgraph "Handler Execution Flow"
        REQ_BYTES["request_bytes: Vec<u8>"]
        CTX["_ctx: Arc<RpcContext>"]
        DECODE["Add::decode_request(&request_bytes)"]
        LOGIC["Business logic: iter().sum()"]
        ENCODE["Add::encode_response(sum)"]
        RESULT["Ok(response_bytes)"]
        REQ_BYTES --> DECODE
        CTX -.->|available but unused| LOGIC
        DECODE --> LOGIC
        LOGIC --> ENCODE
        ENCODE --> RESULT
    end

    CLOSURE -.->|contains| DECODE

graph TB
    subgraph "Client Call Flow"
        APP_CODE["Application code"]
        CALL_METHOD["Add::call(&client, vec![1.0, 2.0, 3.0])"]
        APP_CODE --> CALL_METHOD
    end

    subgraph "Inside call()\nimplementation"
        ENCODE["encode_request(params)"]
        CALL_PREBUF["client.call_prebuffered(METHOD_ID, bytes)"]
        AWAIT["await response"]
        DECODE["decode_response(&response_bytes)"]
        RETURN["Ok(Response)"]
        CALL_METHOD --> ENCODE
        ENCODE --> CALL_PREBUF
        CALL_PREBUF --> AWAIT
        AWAIT --> DECODE
        DECODE --> RETURN
    end

    subgraph "Client Implementation"
        CLIENT["RpcClient\n(implements RpcServiceCallerInterface)"]
        DISPATCHER["RpcDispatcher"]
        SESSION["RpcSession"]
        CALL_PREBUF -.->|delegates to| CLIENT
        CLIENT --> DISPATCHER
        DISPATCHER --> SESSION
    end

    RETURN --> APP_CODE

Client Call Pattern

Client-side calls use the service definition’s call() method, which is automatically provided by the RpcMethodPrebuffered trait.

The call() method handles all serialization, transport, and deserialization automatically. Application code works with typed Rust values (for Add, a Vec<f64> request and an f64 response) and never touches raw bytes.
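The encode, send, decode sequence inside call() can be illustrated with a synchronous model. Everything named here is an assumption for illustration: `Caller` is a hypothetical stand-in for RpcServiceCallerInterface, `LoopbackCaller` answers in-process instead of using a WebSocket, and the real call() is an async fn.

```rust
/// Hypothetical stand-in for `RpcServiceCallerInterface`.
pub trait Caller {
    fn call_prebuffered(&self, method_id: u64, request: Vec<u8>) -> Result<Vec<u8>, String>;
}

/// Hypothetical ID; a real definition crate would export `Add::METHOD_ID`.
pub const ADD_METHOD_ID: u64 = 1;

pub struct Add;

impl Add {
    /// Typed entry point: callers pass and receive Rust values, not bytes.
    pub fn call<C: Caller>(client: &C, params: Vec<f64>) -> Result<f64, String> {
        // Encode the typed request into bytes.
        let request: Vec<u8> = params.iter().flat_map(|v| v.to_le_bytes()).collect();
        // Hand the bytes to whatever transport the caller wraps.
        let response = client.call_prebuffered(ADD_METHOD_ID, request)?;
        // Decode the response bytes back into the typed result.
        if response.len() != 8 {
            return Err(format!("expected 8 response bytes, got {}", response.len()));
        }
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&response);
        Ok(f64::from_le_bytes(raw))
    }
}

/// In-process "transport" that plays the server's role for the Add method.
pub struct LoopbackCaller;

impl Caller for LoopbackCaller {
    fn call_prebuffered(&self, method_id: u64, request: Vec<u8>) -> Result<Vec<u8>, String> {
        if method_id != ADD_METHOD_ID {
            return Err(format!("unknown method {}", method_id));
        }
        let mut sum = 0.0f64;
        for chunk in request.chunks_exact(8) {
            let mut raw = [0u8; 8];
            raw.copy_from_slice(chunk);
            sum += f64::from_le_bytes(raw);
        }
        Ok(sum.to_le_bytes().to_vec())
    }
}
```

Calling `Add::call(&LoopbackCaller, vec![1.0, 2.0, 3.0])` yields `Ok(6.0)`. Because `call` is generic over `Caller`, the same call site would work against any transport that implements the trait.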

Sources : README.md:145-152


graph TB
    subgraph "Concurrent Calls with join!"
        JOIN["join!()\nmacro"]
        CALL1["Add::call(&client, vec![1.0, 2.0, 3.0])"]
        CALL2["Add::call(&client, vec![8.0, 3.0, 7.0])"]
        CALL3["Mult::call(&client, vec![8.0, 3.0, 7.0])"]
        CALL4["Mult::call(&client, vec![1.5, 2.5, 8.5])"]
        CALL5["Echo::call(&client, b\"testing 1 2 3\")"]
        CALL6["Echo::call(&client, b\"testing 4 5 6\")"]
        JOIN --> CALL1
        JOIN --> CALL2
        JOIN --> CALL3
        JOIN --> CALL4
        JOIN --> CALL5
        JOIN --> CALL6
    end

    subgraph "Multiplexing Layer"
        SESSION["RpcSession"]
        STREAM1["Stream ID: 1"]
        STREAM2["Stream ID: 2"]
        STREAM3["Stream ID: 3"]
        STREAM4["Stream ID: 4"]
        STREAM5["Stream ID: 5"]
        STREAM6["Stream ID: 6"]
        SESSION --> STREAM1
        SESSION --> STREAM2
        SESSION --> STREAM3
        SESSION --> STREAM4
        SESSION --> STREAM5
        SESSION --> STREAM6
    end

    subgraph "Single WebSocket Connection"
        WS["Binary frames interleaved\nover single connection"]
    end

    CALL1 -.->|assigned| STREAM1
    CALL2 -.->|assigned| STREAM2
    CALL3 -.->|assigned| STREAM3
    CALL4 -.->|assigned| STREAM4
    CALL5 -.->|assigned| STREAM5
    CALL6 -.->|assigned| STREAM6

    SESSION --> WS

    subgraph "Result Tuple"
        RESULTS["(res1, res2, res3, res4, res5, res6)"]
    end

    JOIN --> RESULTS

Concurrent Request Handling

The example demonstrates concurrent request handling using Tokio’s join! macro.

Each concurrent call is assigned a unique stream ID, allowing frames to be interleaved over the single WebSocket connection. The join! macro waits for all responses before proceeding.
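To make the interleaving concrete, the sketch below models only the multiplexing idea, with no async runtime and no real framing format. `Frame`, `interleave`, and `reassemble` are hypothetical names, stream IDs are assumed to count up from 1, and round-robin ordering is an illustrative choice rather than muxio's scheduling.

```rust
use std::collections::BTreeMap;

/// Hypothetical wire frame: a chunk of payload tagged with its stream ID.
#[derive(Debug, Clone, PartialEq)]
pub struct Frame {
    pub stream_id: u32,
    pub payload: Vec<u8>,
}

/// Split each message into chunks and emit them round-robin, as if several
/// in-flight calls were sharing one connection. Stream IDs start at 1.
pub fn interleave(messages: &[Vec<u8>], chunk_size: usize) -> Vec<Frame> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let mut queues: Vec<_> = messages
        .iter()
        .enumerate()
        .map(|(index, message)| (index as u32 + 1, message.chunks(chunk_size)))
        .collect();
    let mut wire = Vec::new();
    loop {
        let mut progressed = false;
        for (stream_id, chunks) in queues.iter_mut() {
            if let Some(chunk) = chunks.next() {
                wire.push(Frame { stream_id: *stream_id, payload: chunk.to_vec() });
                progressed = true;
            }
        }
        if !progressed {
            return wire; // every stream has been fully emitted
        }
    }
}

/// Receiver side: group frames by stream ID and concatenate their payloads.
pub fn reassemble(wire: &[Frame]) -> BTreeMap<u32, Vec<u8>> {
    let mut streams = BTreeMap::new();
    for frame in wire {
        streams
            .entry(frame.stream_id)
            .or_insert_with(Vec::new)
            .extend_from_slice(&frame.payload);
    }
    streams
}
```

Interleaving two 13-byte messages with a chunk size of 4 puts frames for streams 1 and 2 on the wire alternately, and `reassemble` still recovers each message intact because every frame carries its stream ID.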

Sources : README.md:145-152


State Change Handling

The client supports registering a state change handler to track connection lifecycle:

| State | Description | Typical Response |
| --- | --- | --- |
| Connecting | Initial connection attempt | Log connection start |
| Connected | WebSocket established | Enable UI, start heartbeat |
| Disconnected | Connection lost | Disable UI, attempt reconnect |
| Error | Connection error occurred | Log error, notify user |

The handler is registered using set_state_change_handler().

This callback-driven pattern enables reactive behavior without blocking the main application logic.
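The callback pattern itself can be sketched without any networking. In this model `TransportState` and `Client` are hypothetical types whose variants mirror the table above, and `transition` is an illustrative hook for driving state changes by hand; the real client's types and method signatures may differ.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical state enum mirroring the states listed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransportState {
    Connecting,
    Connected,
    Disconnected,
    Error,
}

/// Hypothetical client that only tracks an optional state callback.
pub struct Client {
    handler: Option<Box<dyn Fn(TransportState) + Send + Sync>>,
}

impl Client {
    pub fn new() -> Self {
        Client { handler: None }
    }

    /// Install the callback, replacing any previous one.
    pub fn set_state_change_handler<F>(&mut self, handler: F)
    where
        F: Fn(TransportState) + Send + Sync + 'static,
    {
        self.handler = Some(Box::new(handler));
    }

    /// Illustrative hook: notify the callback, if any, of a new state.
    pub fn transition(&self, state: TransportState) {
        if let Some(handler) = &self.handler {
            handler(state);
        }
    }
}

/// Drive a connect/disconnect cycle and return the states the callback saw.
pub fn demo() -> Vec<TransportState> {
    let seen = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen);

    let mut client = Client::new();
    client.set_state_change_handler(move |state| {
        sink.lock().unwrap().push(state);
    });

    client.transition(TransportState::Connecting);
    client.transition(TransportState::Connected);
    client.transition(TransportState::Disconnected);

    let observed = seen.lock().unwrap().clone();
    observed
}
```

The callback captures an `Arc<Mutex<...>>` so that state observed inside it can be read elsewhere, which is the usual way to bridge a `Send + Sync` closure back to application state.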

Sources : README.md:139-142


graph LR
    subgraph "Handler Error Flow"
        HANDLER["Handler function"]
        DECODE_ERR["decode_request()\nfails"]
        LOGIC_ERR["Business logic fails"]
        ENCODE_ERR["encode_response()\nfails"]
        HANDLER --> DECODE_ERR
        HANDLER --> LOGIC_ERR
        HANDLER --> ENCODE_ERR
    end

    subgraph "Error Propagation"
        RPC_ERR["Err(RpcServiceError)"]
        ENDPOINT["RpcServiceEndpointInterface"]
        DISPATCHER["RpcDispatcher"]
        TRANSPORT["Binary error frame"]
        DECODE_ERR --> RPC_ERR
        LOGIC_ERR --> RPC_ERR
        ENCODE_ERR --> RPC_ERR

        RPC_ERR --> ENDPOINT
        ENDPOINT --> DISPATCHER
        DISPATCHER --> TRANSPORT
    end

    subgraph "Client Side"
        CLIENT_CALL["call()\nmethod"]
        RESULT["Err(RpcServiceError)"]
        APP["Application code"]
        TRANSPORT --> CLIENT_CALL
        CLIENT_CALL --> RESULT
        RESULT --> APP
    end

Error Handling in Handlers

Handlers return Result<Vec<u8>, RpcServiceError>. When a handler returns Err, the framework propagates the error to the client, where call() returns it as Err(RpcServiceError).

Common error scenarios:

  • Malformed request bytes (deserialization failure)
  • Business logic errors (e.g., division by zero, validation failure)
  • Resource errors (e.g., database unavailable)

All errors are serialized and transmitted back to the client as part of the RPC protocol.
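A small model shows how a handler error can travel as data and reappear on the client as an `Err`. The names and the one-byte status tag are assumptions for illustration only: `ServiceError` stands in for RpcServiceError, and `to_frame`/`from_frame` are a made-up encoding, not the muxio wire format.

```rust
/// Hypothetical stand-in for `RpcServiceError`.
#[derive(Debug, Clone, PartialEq)]
pub enum ServiceError {
    Decode(String),
    Logic(String),
}

/// Decode exactly two little-endian f64 values.
fn decode_pair(bytes: &[u8]) -> Result<(f64, f64), ServiceError> {
    if bytes.len() != 16 {
        return Err(ServiceError::Decode(format!(
            "expected 16 bytes, got {}",
            bytes.len()
        )));
    }
    let mut a = [0u8; 8];
    let mut b = [0u8; 8];
    a.copy_from_slice(&bytes[..8]);
    b.copy_from_slice(&bytes[8..]);
    Ok((f64::from_le_bytes(a), f64::from_le_bytes(b)))
}

/// Handler with two failure points: malformed input and division by zero.
pub fn divide_handler(request_bytes: &[u8]) -> Result<Vec<u8>, ServiceError> {
    let (numerator, denominator) = decode_pair(request_bytes)?;
    if denominator == 0.0 {
        return Err(ServiceError::Logic("division by zero".to_string()));
    }
    Ok((numerator / denominator).to_le_bytes().to_vec())
}

/// Server side: tag the outcome with a status byte (0 = ok, 1 = error).
pub fn to_frame(result: &Result<Vec<u8>, ServiceError>) -> Vec<u8> {
    match result {
        Ok(bytes) => {
            let mut frame = vec![0u8];
            frame.extend_from_slice(bytes);
            frame
        }
        Err(error) => {
            let mut frame = vec![1u8];
            frame.extend_from_slice(format!("{:?}", error).as_bytes());
            frame
        }
    }
}

/// Client side: turn the status byte back into Ok or Err.
pub fn from_frame(frame: &[u8]) -> Result<Vec<u8>, String> {
    match frame.split_first() {
        Some((&0, body)) => Ok(body.to_vec()),
        Some((_, body)) => Err(String::from_utf8_lossy(body).into_owned()),
        None => Err("empty frame".to_string()),
    }
}
```

The `?` operator keeps the handler body linear: any failing step short-circuits to `Err`, and the transport layer, not the handler, decides how that error is represented on the wire.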

Sources : README.md:102-118


Running the Example

To run the complete example application:

  1. Clone the repository
  2. Navigate to the workspace root
  3. Run the example application crate, example-muxio-ws-rpc-app, with Cargo

Expected output includes:

  • Server binding to random port
  • Handler registration confirmations
  • Client connection establishment
  • State change callback invocations
  • Successful assertion of all RPC results

The example demonstrates a complete lifecycle:

  • Server starts and binds to a port
  • Handlers are registered for three methods
  • Client connects via WebSocket
  • Six concurrent RPC calls are made
  • All responses are verified
  • The application exits cleanly

Sources : README.md:64-162


Best Practices from the Example

The example application demonstrates several important patterns:

| Pattern | Implementation | Benefit |
| --- | --- | --- |
| Arc-wrapped server | Arc::new(RpcServer::new(None)) | Safe sharing across async tasks |
| Random port binding | TcpListener::bind("127.0.0.1:0") | Avoids port conflicts in testing |
| Concurrent registration | join!() for handler registration | Parallel setup reduces startup time |
| Async handler closures | async move { ... } | Enables async business logic |
| Destructured joins | let (res1, res2, ...) = join!(...) | Clear result assignment |
| State change callbacks | set_state_change_handler() | Reactive connection management |

These patterns can be adapted for production applications with appropriate error handling, logging, and configuration management.
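Two of these patterns, random port binding and Arc-based sharing, can be tried with the standard library alone. The sketch assumes OS threads in place of Tokio tasks and uses a hypothetical `Server` struct with a request counter; it does not touch the muxio server type.

```rust
use std::net::TcpListener;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Bind to port 0 so the OS picks a free port, then report which one it chose.
pub fn bind_random_port() -> std::io::Result<(TcpListener, u16)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let port = listener.local_addr()?.port();
    Ok((listener, port))
}

/// Hypothetical server state shared by several workers.
pub struct Server {
    pub requests_served: AtomicUsize,
}

/// Share one server across `workers` threads via Arc and count their updates.
/// Assumption: threads stand in for the async tasks used in the example.
pub fn share_across_workers(workers: usize) -> usize {
    let server = Arc::new(Server {
        requests_served: AtomicUsize::new(0),
    });

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            // Each worker gets its own reference-counted handle.
            let server = Arc::clone(&server);
            thread::spawn(move || {
                server.requests_served.fetch_add(1, Ordering::SeqCst);
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker panicked");
    }
    server.requests_served.load(Ordering::SeqCst)
}
```

Reading the port back from `local_addr()` is what lets a test start a server and a client in the same process without hard-coding a port number.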

Sources : README.md:83-161