This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Examples and Tutorials
This section provides practical, hands-on examples demonstrating how to build applications using the muxio framework. The tutorials progress from understanding the complete example application to creating your own service definitions and implementations.
For information about the underlying architecture and design principles, see Core Concepts. For detailed documentation on specific platform implementations, see Platform Implementations.
Available Examples
The muxio repository includes a complete working example demonstrating the end-to-end process of building an RPC application:
| Example Crate | Purpose | Location |
|---|---|---|
| example-muxio-rpc-service-definition | Shared service contract definitions | examples/example-muxio-rpc-service-definition/ |
| example-muxio-ws-rpc-app | Complete WebSocket RPC application | examples/example-muxio-ws-rpc-app/ |
The example demonstrates:
- Defining shared service contracts using RpcMethodPrebuffered
- Setting up a Tokio-based RPC server with WebSocket transport
- Creating an RPC client and making concurrent calls
- Proper error handling and state management
- Connection lifecycle management
Sources: Cargo.lock:426-449 README.md:64-162
Example Application Architecture
The following diagram shows how the example application components relate to each other and to the muxio framework:
Sources: README.md:70-82 Cargo.lock:426-449
```mermaid
graph TB
subgraph "Shared Service Definition"
DEF["example-muxio-rpc-service-definition"]
ADD["Add::METHOD_ID\nRpcMethodPrebuffered"]
MULT["Mult::METHOD_ID\nRpcMethodPrebuffered"]
ECHO["Echo::METHOD_ID\nRpcMethodPrebuffered"]
DEF --> ADD
DEF --> MULT
DEF --> ECHO
end
subgraph "Server Side (example-muxio-ws-rpc-app)"
SERVER["RpcServer::new()"]
ENDPOINT["server.endpoint()"]
HANDLERS["endpoint.register_prebuffered()"]
LISTENER["TcpListener::bind()"]
SERVER --> ENDPOINT
ENDPOINT --> HANDLERS
SERVER --> LISTENER
end
subgraph "Client Side (example-muxio-ws-rpc-app)"
CLIENT["RpcClient::new()"]
CALLS["Add::call()\nMult::call()\nEcho::call()"]
STATE["set_state_change_handler()"]
CLIENT --> CALLS
CLIENT --> STATE
end
subgraph "Muxio Framework"
CALLER_IF["RpcServiceCallerInterface"]
ENDPOINT_IF["RpcServiceEndpointInterface"]
DISPATCHER["RpcDispatcher"]
CALLER_IF --> DISPATCHER
ENDPOINT_IF --> DISPATCHER
end
DEF -.->|imported by| HANDLERS
DEF -.->|imported by| CALLS
HANDLERS -->|implements| ENDPOINT_IF
CALLS -->|uses| CALLER_IF
CLIENT <-.->|WebSocket frames| SERVER
```
Project Structure for an Example Application
When building a muxio application, the typical project structure separates concerns into distinct crates:
```mermaid
graph LR
subgraph "Workspace Root"
CARGO["Cargo.toml\n[workspace]"]
end
subgraph "Service Definition Crate"
DEF_CARGO["service-definition/Cargo.toml"]
DEF_LIB["service-definition/src/lib.rs"]
DEF_PREBUF["RpcMethodPrebuffered impls"]
DEF_CARGO --> DEF_LIB
DEF_LIB --> DEF_PREBUF
end
subgraph "Application Crate"
APP_CARGO["app/Cargo.toml"]
APP_MAIN["app/src/main.rs"]
SERVER_CODE["Server setup + handlers"]
CLIENT_CODE["Client setup + calls"]
APP_CARGO --> APP_MAIN
APP_MAIN --> SERVER_CODE
APP_MAIN --> CLIENT_CODE
end
CARGO -->|members| DEF_CARGO
CARGO -->|members| APP_CARGO
APP_CARGO -.->|depends on| DEF_CARGO
DEF_CARGO -.->|muxio-rpc-service| MUXIO_RPC["muxio-rpc-service"]
APP_CARGO -.->|muxio-tokio-rpc-server| TOKIO_SERVER["muxio-tokio-rpc-server"]
APP_CARGO -.->|muxio-tokio-rpc-client| TOKIO_CLIENT["muxio-tokio-rpc-client"]
```
This structure ensures:
- Service definitions are shared between client and server (compile-time type safety)
- Application code depends on the definition crate
- Changes to the service contract require recompilation of both sides
Sources: Cargo.lock:426-449 README.md:70-74
Understanding the Request-Response Flow
The following sequence diagram traces a complete RPC call through the example application:
Sources: README.md:92-162
```mermaid
sequenceDiagram
participant Main as "main() function"
participant Client as "RpcClient"
participant AddTrait as "Add::call()"
participant Caller as "RpcServiceCallerInterface"
participant WS as "WebSocket Transport"
participant Server as "RpcServer"
participant Endpoint as "RpcServiceEndpointInterface"
participant Handler as "register_prebuffered handler"
Note over Main,Handler: Server Setup Phase
Main->>Server: RpcServer::new()
Main->>Server: server.endpoint()
Server->>Endpoint: returns endpoint handle
Main->>Endpoint: endpoint.register_prebuffered(Add::METHOD_ID, handler)
Endpoint->>Handler: stores async handler
Main->>Server: server.serve_with_listener(listener)
Note over Main,Handler: Client Connection Phase
Main->>Client: RpcClient::new(host, port)
Client->>WS: WebSocket connection established
Note over Main,Handler: RPC Call Phase
Main->>AddTrait: Add::call(&client, vec![1.0, 2.0, 3.0])
AddTrait->>AddTrait: Add::encode_request(params)
AddTrait->>Caller: client.call_prebuffered(METHOD_ID, encoded_bytes)
Caller->>WS: Binary frames with stream_id, method_id
WS->>Server: Receive frames
Server->>Endpoint: Dispatch to registered handler
Endpoint->>Handler: invoke with request_bytes, ctx
Handler->>Handler: Add::decode_request(&request_bytes)
Handler->>Handler: let sum = request_params.iter().sum()
Handler->>Handler: Add::encode_response(sum)
Handler->>Endpoint: Ok(response_bytes)
Endpoint->>WS: Binary frames with result
WS->>Caller: Receive frames
Caller->>AddTrait: response_bytes
AddTrait->>AddTrait: Add::decode_response(&response_bytes)
AddTrait->>Main: Ok(6.0)
```
Detailed Code Flow Through the Example
The following table maps natural language steps to specific code locations:
| Step | Description | Code Location |
|---|---|---|
| 1. Bind server socket | Create TCP listener on random port | README.md:88 |
| 2. Create server instance | Instantiate RpcServer wrapped in Arc | README.md:95 |
| 3. Get endpoint handle | Retrieve endpoint for handler registration | README.md:98 |
| 4. Register Add handler | Register async handler for Add::METHOD_ID | README.md:102-107 |
| 5. Register Mult handler | Register async handler for Mult::METHOD_ID | README.md:108-113 |
| 6. Register Echo handler | Register async handler for Echo::METHOD_ID | README.md:114-118 |
| 7. Spawn server task | Start server with serve_with_listener() | README.md:126 |
| 8. Create client | Connect to server via RpcClient::new() | README.md:137 |
| 9. Set state handler | Register callback for connection state changes | README.md:139-142 |
| 10. Make concurrent calls | Use join! macro to await multiple RPC calls | README.md:145-152 |
| 11. Verify results | Assert response values match expected results | README.md:154-159 |
Sources: README.md:83-161
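Condensed into code, those steps look roughly like the sketch below, assembled from the README excerpts cited in the table. Import paths and exact signatures (for example, whether serve_with_listener consumes the Arc, or how RpcClient::new reports errors) are assumptions, not the definitive API:

```rust
use std::sync::Arc;
use tokio::net::TcpListener;
// Assumed import paths for the items named in the diagrams above.
use muxio_tokio_rpc_client::RpcClient;
use muxio_tokio_rpc_server::RpcServer;

async fn run_example() -> Result<(), Box<dyn std::error::Error>> {
    // Step 1: bind to a random port to avoid conflicts.
    let listener = TcpListener::bind("127.0.0.1:0").await?;
    let port = listener.local_addr()?.port();

    // Step 2: create the server, Arc-wrapped for sharing across tasks.
    let server = Arc::new(RpcServer::new(None));

    // Step 3: obtain the endpoint handle used in steps 4-6 to register
    // the Add, Mult, and Echo handlers (see "Handler Registration Pattern").
    let _endpoint = server.endpoint();

    // Step 7: spawn the server loop as a background task.
    let server_task = {
        let server = server.clone();
        tokio::spawn(async move { server.serve_with_listener(listener).await })
    };

    // Step 8: connect a client to the bound port.
    let _client = RpcClient::new("127.0.0.1", port).await?;

    server_task.abort(); // shut down the sketch's background task
    Ok(())
}
```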
Service Definition Pattern
The service definition crate defines the contract between client and server. Each RPC method is implemented as a unit struct that implements the RpcMethodPrebuffered trait:
```mermaid
graph TB
subgraph "Service Definition Structure"
TRAIT["RpcMethodPrebuffered trait"]
ADD_STRUCT["pub struct Add"]
ADD_IMPL["impl RpcMethodPrebuffered for Add"]
ADD_METHOD_ID["const METHOD_ID: u64"]
ADD_REQUEST["type Request = Vec<f64>"]
ADD_RESPONSE["type Response = f64"]
MULT_STRUCT["pub struct Mult"]
MULT_IMPL["impl RpcMethodPrebuffered for Mult"]
ECHO_STRUCT["pub struct Echo"]
ECHO_IMPL["impl RpcMethodPrebuffered for Echo"]
TRAIT -.->|implemented by| ADD_IMPL
TRAIT -.->|implemented by| MULT_IMPL
TRAIT -.->|implemented by| ECHO_IMPL
ADD_STRUCT --> ADD_IMPL
ADD_IMPL --> ADD_METHOD_ID
ADD_IMPL --> ADD_REQUEST
ADD_IMPL --> ADD_RESPONSE
MULT_STRUCT --> MULT_IMPL
ECHO_STRUCT --> ECHO_IMPL
end
subgraph "Generated by Trait"
ENCODE_REQ["encode_request()"]
DECODE_REQ["decode_request()"]
ENCODE_RESP["encode_response()"]
DECODE_RESP["decode_response()"]
CALL_METHOD["call()
async fn"]
TRAIT -.->|provides| ENCODE_REQ
TRAIT -.->|provides| DECODE_REQ
TRAIT -.->|provides| ENCODE_RESP
TRAIT -.->|provides| DECODE_RESP
TRAIT -.->|provides| CALL_METHOD
end
subgraph "Serialization"
BITCODE["bitcode::encode/decode"]
ENCODE_REQ --> BITCODE
DECODE_REQ --> BITCODE
ENCODE_RESP --> BITCODE
DECODE_RESP --> BITCODE
end
subgraph "Method ID Generation"
XXHASH["xxhash-rust"]
METHOD_NAME["Method name string"]
ADD_METHOD_ID -.->|hash of| METHOD_NAME
METHOD_NAME --> XXHASH
end
```
The RpcMethodPrebuffered trait provides default implementations for encoding/decoding using bitcode serialization and a call() method that works with any RpcServiceCallerInterface implementation.
Sources: README.md:71-74 Cargo.lock:426-431
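As a concrete illustration, here is a minimal, self-contained sketch of this pattern. The trait declaration below is a stand-in for muxio-rpc-service's RpcMethodPrebuffered (whose exact item signatures may differ), and the hand-rolled byte conversion stands in for the bitcode serialization the real trait provides by default:

```rust
// Stand-in trait mirroring the items shown in the diagram; the real
// muxio-rpc-service trait also supplies default bitcode-backed
// implementations and an async call() method.
pub trait RpcMethodPrebuffered {
    const METHOD_ID: u64;
    type Request;
    type Response;

    fn encode_request(req: &Self::Request) -> Vec<u8>;
    fn decode_request(bytes: &[u8]) -> Option<Self::Request>;
    fn encode_response(resp: &Self::Response) -> Vec<u8>;
    fn decode_response(bytes: &[u8]) -> Option<Self::Response>;
}

pub struct Add;

impl RpcMethodPrebuffered for Add {
    // In muxio the ID is derived by hashing the method name with xxhash;
    // a fixed placeholder keeps this sketch dependency-free.
    const METHOD_ID: u64 = 0x0000_0000_0000_0001;
    type Request = Vec<f64>;
    type Response = f64;

    // The real implementations use bitcode; plain little-endian bytes
    // keep the sketch self-contained.
    fn encode_request(req: &Self::Request) -> Vec<u8> {
        req.iter().flat_map(|v| v.to_le_bytes()).collect()
    }
    fn decode_request(bytes: &[u8]) -> Option<Self::Request> {
        (bytes.len() % 8 == 0).then(|| {
            bytes
                .chunks_exact(8)
                .map(|c| f64::from_le_bytes(c.try_into().unwrap()))
                .collect()
        })
    }
    fn encode_response(resp: &Self::Response) -> Vec<u8> {
        resp.to_le_bytes().to_vec()
    }
    fn decode_response(bytes: &[u8]) -> Option<Self::Response> {
        Some(f64::from_le_bytes(bytes.try_into().ok()?))
    }
}
```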
Handler Registration Pattern
Server-side handlers are registered on the endpoint using the register_prebuffered() method. Each handler is an async closure that:
- Receives raw request bytes and a context object
- Decodes the request using the service definition’s decode_request() method
- Performs the business logic
- Encodes the response using encode_response()
- Returns Result<Vec<u8>, RpcServiceError>
A code sketch follows the diagram below.
Sources: README.md:101-119
```mermaid
graph LR
subgraph "Handler Registration Flow"
ENDPOINT["endpoint.register_prebuffered()"]
METHOD_ID["Add::METHOD_ID"]
CLOSURE["async closure"]
ENDPOINT -->|key| METHOD_ID
ENDPOINT -->|value| CLOSURE
end
subgraph "Handler Execution Flow"
REQ_BYTES["request_bytes: Vec<u8>"]
CTX["_ctx: Arc<RpcContext>"]
DECODE["Add::decode_request(&request_bytes)"]
LOGIC["Business logic: iter().sum()"]
ENCODE["Add::encode_response(sum)"]
RESULT["Ok(response_bytes)"]
REQ_BYTES --> DECODE
CTX -.->|available but unused| LOGIC
DECODE --> LOGIC
LOGIC --> ENCODE
ENCODE --> RESULT
end
CLOSURE -.->|contains| DECODE
```
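The registration call itself looks roughly like this sketch, following the shape of the README excerpt cited above; the context parameter, error conversions, and whether registration must be awaited are assumptions:

```rust
// Sketch of registering the Add handler on the endpoint; follows the
// shape cited from the README, with assumed context and error types.
endpoint.register_prebuffered(Add::METHOD_ID, |request_bytes, _ctx| async move {
    // 1. Decode the typed request from raw bytes.
    let request_params = Add::decode_request(&request_bytes)?;
    // 2. Business logic.
    let sum: f64 = request_params.iter().sum();
    // 3. Encode the typed response back into bytes.
    let response_bytes = Add::encode_response(sum)?;
    Ok(response_bytes)
});
```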
Client Call Pattern
Client-side calls use the service definition’s call() method, which is automatically provided by the RpcMethodPrebuffered trait:
```mermaid
graph TB
subgraph "Client Call Flow"
APP_CODE["Application code"]
CALL_METHOD["Add::call(&client, vec![1.0, 2.0, 3.0])"]
APP_CODE --> CALL_METHOD
end
subgraph "Inside call() implementation"
ENCODE["encode_request(params)"]
CALL_PREBUF["client.call_prebuffered(METHOD_ID, bytes)"]
AWAIT["await response"]
DECODE["decode_response(&response_bytes)"]
RETURN["Ok(Response)"]
CALL_METHOD --> ENCODE
ENCODE --> CALL_PREBUF
CALL_PREBUF --> AWAIT
AWAIT --> DECODE
DECODE --> RETURN
end
subgraph "Client Implementation"
CLIENT["RpcClient\n(implements RpcServiceCallerInterface)"]
DISPATCHER["RpcDispatcher"]
SESSION["RpcSession"]
CALL_PREBUF -.->|delegates to| CLIENT
CLIENT --> DISPATCHER
DISPATCHER --> SESSION
end
RETURN --> APP_CODE
```
The call() method handles all serialization, transport, and deserialization automatically. Application code works with typed Rust structs, never touching raw bytes.
Sources: README.md:145-152
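At the call site this reduces to a single awaited expression, as in the following sketch (values mirror the diagram; the surrounding error handling is assumed):

```rust
// Add::call encodes the request, awaits the response frames, and decodes
// the typed result; 1.0 + 2.0 + 3.0 yields Ok(6.0) as in the diagram.
let sum = Add::call(&client, vec![1.0, 2.0, 3.0]).await?;
assert_eq!(sum, 6.0);
```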
Concurrent Request Handling
The example demonstrates concurrent request handling using Tokio’s join! macro:
```mermaid
graph TB
subgraph "Concurrent Calls with join!"
JOIN["join!() macro"]
CALL1["Add::call(&client, vec![1.0, 2.0, 3.0])"]
CALL2["Add::call(&client, vec![8.0, 3.0, 7.0])"]
CALL3["Mult::call(&client, vec![8.0, 3.0, 7.0])"]
CALL4["Mult::call(&client, vec![1.5, 2.5, 8.5])"]
CALL5["Echo::call(&client, b\"testing 1 2 3\")"]
CALL6["Echo::call(&client, b\"testing 4 5 6\")"]
JOIN --> CALL1
JOIN --> CALL2
JOIN --> CALL3
JOIN --> CALL4
JOIN --> CALL5
JOIN --> CALL6
end
subgraph "Multiplexing Layer"
SESSION["RpcSession"]
STREAM1["Stream ID: 1"]
STREAM2["Stream ID: 2"]
STREAM3["Stream ID: 3"]
STREAM4["Stream ID: 4"]
STREAM5["Stream ID: 5"]
STREAM6["Stream ID: 6"]
SESSION --> STREAM1
SESSION --> STREAM2
SESSION --> STREAM3
SESSION --> STREAM4
SESSION --> STREAM5
SESSION --> STREAM6
end
subgraph "Single WebSocket Connection"
WS["Binary frames interleaved\nover single connection"]
end
CALL1 -.->|assigned| STREAM1
CALL2 -.->|assigned| STREAM2
CALL3 -.->|assigned| STREAM3
CALL4 -.->|assigned| STREAM4
CALL5 -.->|assigned| STREAM5
CALL6 -.->|assigned| STREAM6
SESSION --> WS
subgraph "Result Tuple"
RESULTS["(res1, res2, res3, res4, res5, res6)"]
end
JOIN --> RESULTS
```
Each concurrent call is assigned a unique stream ID, allowing frames to be interleaved over the single WebSocket connection. The join! macro waits for all responses before proceeding.
Sources: README.md:145-152
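Restated as code, the concurrent call site looks roughly like this sketch; argument values mirror the diagram, while the exact request types (such as the byte-vector form passed to Echo) are assumptions:

```rust
use tokio::join;

// All six calls are in flight at once; the multiplexing layer assigns
// each one its own stream ID over the single WebSocket connection.
let (res1, res2, res3, res4, res5, res6) = join!(
    Add::call(&client, vec![1.0, 2.0, 3.0]),
    Add::call(&client, vec![8.0, 3.0, 7.0]),
    Mult::call(&client, vec![8.0, 3.0, 7.0]),
    Mult::call(&client, vec![1.5, 2.5, 8.5]),
    Echo::call(&client, b"testing 1 2 3".to_vec()),
    Echo::call(&client, b"testing 4 5 6".to_vec()),
);
assert_eq!(res1.unwrap(), 6.0); // 1.0 + 2.0 + 3.0
```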
State Change Handling
The client supports registering a state change handler to track connection lifecycle:
| State | Description | Typical Response |
|---|---|---|
| Connecting | Initial connection attempt | Log connection start |
| Connected | WebSocket established | Enable UI, start heartbeat |
| Disconnected | Connection lost | Disable UI, attempt reconnect |
| Error | Connection error occurred | Log error, notify user |
The handler is registered using set_state_change_handler(), as sketched below.
This callback-driven pattern enables reactive behavior without blocking the main application logic.
Sources: README.md:139-142
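A sketch of registering the handler: set_state_change_handler comes from the README excerpt cited above, while the RpcTransportState name and the match arms are assumptions derived from the state table:

```rust
// The enum name and whether registration is awaited are assumptions;
// the callback-driven shape follows the README.
client.set_state_change_handler(move |state| {
    match state {
        RpcTransportState::Connected => println!("connected: enable UI"),
        RpcTransportState::Disconnected => println!("disconnected: schedule reconnect"),
        other => println!("state changed: {other:?}"),
    }
}).await;
```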
Error Handling in Handlers
Handlers return Result<Vec<u8>, RpcServiceError>. The framework automatically propagates errors to the client:
```mermaid
graph LR
subgraph "Handler Error Flow"
HANDLER["Handler function"]
DECODE_ERR["decode_request() fails"]
LOGIC_ERR["Business logic fails"]
ENCODE_ERR["encode_response() fails"]
HANDLER --> DECODE_ERR
HANDLER --> LOGIC_ERR
HANDLER --> ENCODE_ERR
end
subgraph "Error Propagation"
RPC_ERR["Err(RpcServiceError)"]
ENDPOINT["RpcServiceEndpointInterface"]
DISPATCHER["RpcDispatcher"]
TRANSPORT["Binary error frame"]
DECODE_ERR --> RPC_ERR
LOGIC_ERR --> RPC_ERR
ENCODE_ERR --> RPC_ERR
RPC_ERR --> ENDPOINT
ENDPOINT --> DISPATCHER
DISPATCHER --> TRANSPORT
end
subgraph "Client Side"
CLIENT_CALL["call() method"]
RESULT["Err(RpcServiceError)"]
APP["Application code"]
TRANSPORT --> CLIENT_CALL
CLIENT_CALL --> RESULT
RESULT --> APP
end
```
Common error scenarios:
- Malformed request bytes (deserialization failure)
- Business logic errors (e.g., division by zero, validation failure)
- Resource errors (e.g., database unavailable)
All errors are serialized and transmitted back to the client as part of the RPC protocol.
Sources: README.md:102-118
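For illustration, here is a hypothetical Div handler that surfaces a business-logic error. The Div method and the RpcServiceError construction shown are placeholders, not part of the example application:

```rust
// Hypothetical handler: Div and the error construction are placeholders
// used only to illustrate how an Err is propagated back to the caller.
endpoint.register_prebuffered(Div::METHOD_ID, |request_bytes, _ctx| async move {
    let (numerator, denominator) = Div::decode_request(&request_bytes)?;
    if denominator == 0.0 {
        // The framework serializes this error and returns it to the client,
        // where it surfaces as Err(RpcServiceError) from Div::call().
        return Err("division by zero".into()); // assumed Into<RpcServiceError>
    }
    Ok(Div::encode_response(numerator / denominator)?)
});
```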
Running the Example
To run the complete example application:
- Clone the repository
- Navigate to the workspace root
- Execute the example, e.g. cargo run -p example-muxio-ws-rpc-app (assuming the binary package name matches its directory)
Expected output includes:
- Server binding to random port
- Handler registration confirmations
- Client connection establishment
- State change callback invocations
- Successful assertion of all RPC results
The example demonstrates a complete lifecycle:
- Server starts and binds to a port
- Handlers are registered for three methods
- Client connects via WebSocket
- Six concurrent RPC calls are made
- All responses are verified
- The application exits cleanly
Sources: README.md:64-162
Best Practices from the Example
The example application demonstrates several important patterns:
| Pattern | Implementation | Benefit |
|---|---|---|
| Arc-wrapped server | Arc::new(RpcServer::new(None)) | Safe sharing across async tasks |
| Random port binding | TcpListener::bind("127.0.0.1:0") | Avoids port conflicts in testing |
| Concurrent registration | join!() for handler registration | Parallel setup reduces startup time |
| Async handler closures | async move { ... } | Enables async business logic |
| Destructured joins | let (res1, res2, ...) = join!(...) | Clear result assignment |
| State change callbacks | set_state_change_handler() | Reactive connection management |
These patterns can be adapted for production applications with appropriate error handling, logging, and configuration management.
Sources: README.md:83-161