This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Custom Transport Implementation
Relevant source files
- Cargo.toml
- extensions/muxio-rpc-service-caller/src/lib.rs
- extensions/muxio-rpc-service-caller/tests/dynamic_channel_tests.rs
- extensions/muxio-rpc-service-endpoint/Cargo.toml
- extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs
- extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs
Purpose and Scope
This document explains how to implement custom transports for the muxio RPC system beyond the provided WebSocket implementations. Custom transports enable the use of alternative communication channels such as Unix domain sockets, named pipes, HTTP/2, QUIC, or in-memory channels for testing.
For information about the existing WebSocket transport implementations, see Tokio RPC Server, Tokio RPC Client, and WASM RPC Client. For the underlying protocol details, see Binary Framing Protocol and RPC Dispatcher.
Transport Architecture Overview
Custom transports act as adapters between the transport-agnostic RpcDispatcher and the specific communication mechanism. The transport layer is responsible for moving bytes bidirectionally while the dispatcher handles multiplexing, request correlation, and protocol encoding/decoding.
```mermaid
graph TB
    APP["Application Code"]
    subgraph "Custom Transport Layer"
        CLIENT_IMPL["Custom Client Implementation\n(Your Code)"]
        SERVER_IMPL["Custom Server Implementation\n(Your Code)"]
    end
    subgraph "RPC Interface Layer"
        CALLER["RpcServiceCallerInterface"]
        ENDPOINT["RpcServiceEndpointInterface"]
    end
    subgraph "Core Multiplexing Layer"
        DISPATCHER_C["RpcDispatcher\n(Client-side)"]
        DISPATCHER_S["RpcDispatcher\n(Server-side)"]
    end
    subgraph "Transport Medium"
        CHANNEL["Custom I/O Channel\n(TCP, UDP, Unix Socket, etc.)"]
    end
    APP --> CLIENT_IMPL
    APP --> SERVER_IMPL
    CLIENT_IMPL -.implements.-> CALLER
    SERVER_IMPL -.implements.-> ENDPOINT
    CALLER --> DISPATCHER_C
    ENDPOINT --> DISPATCHER_S
    CLIENT_IMPL --> CHANNEL
    SERVER_IMPL --> CHANNEL
    DISPATCHER_C -.read_bytes/write_bytes.-> CLIENT_IMPL
    DISPATCHER_S -.read_bytes/respond.-> SERVER_IMPL
```
Transport Integration Points
Sources:
- extensions/muxio-rpc-service-caller/src/lib.rs:1-11
- extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:8-138
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:25-276
Client Transport Implementation
Client transports must implement the RpcServiceCallerInterface trait to enable RPC method invocation. This trait provides the contract for establishing connections, sending requests, and receiving responses.
Required Interface Methods
The RpcServiceCallerInterface trait requires the following methods:
| Method | Return Type | Purpose |
|---|---|---|
| get_dispatcher() | Arc<TokioMutex<RpcDispatcher<'static>>> | Provides access to the dispatcher for request management |
| is_connected() | bool | Indicates current connection state |
| get_emit_fn() | Arc<dyn Fn(Vec<u8>) + Send + Sync> | Returns the function used to send encoded bytes over the transport |
| set_state_change_handler() | async fn | Registers a callback for connection state changes |
| call_rpc_streaming() | Result<(RpcStreamEncoder, DynamicReceiver), RpcServiceError> | Initiates streaming RPC calls (optional; has a default implementation) |
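Condensed into a Rust sketch, the contract looks roughly like this. It is a paraphrase of the table above, not the verbatim trait; the exact generics and the default call_rpc_streaming() body live in muxio-rpc-service-caller/src/lib.rs.

```rust
use std::sync::Arc;
use tokio::sync::Mutex as TokioMutex;

// Paraphrased caller contract; see muxio-rpc-service-caller/src/lib.rs
// for the authoritative definition.
trait RpcServiceCallerInterface: Send + Sync {
    // Dispatcher used for request correlation and protocol coding.
    fn get_dispatcher(&self) -> Arc<TokioMutex<RpcDispatcher<'static>>>;

    // Current transport connection state.
    fn is_connected(&self) -> bool;

    // Function that pushes encoded bytes into the transport.
    fn get_emit_fn(&self) -> Arc<dyn Fn(Vec<u8>) + Send + Sync>;

    // Registers a callback for connect/disconnect transitions.
    async fn set_state_change_handler(
        &self,
        handler: Box<dyn Fn(RpcTransportState) + Send + Sync>,
    );

    // call_rpc_streaming() has a default implementation built on the
    // methods above, so most transports do not need to override it.
}
```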
Minimal Client Implementation Pattern
A minimal client transport implementation follows this pattern:
- Dispatcher Management: Create and wrap RpcDispatcher in Arc<TokioMutex<_>>
- Connection State: Track connection state with Arc<AtomicBool>
- Emit Function: Implement a function that sends bytes to the underlying transport
- Receive Loop: Spawn a task that reads bytes and calls dispatcher.read_bytes()
- Send Loop: Spawn a task that writes outgoing bytes to the transport
- State Handler: Support registration of state change callbacks
```mermaid
graph TB
    NEW["CustomClient::new()"]
    subgraph "Initialization"
        CONNECT["Connect to transport"]
        CREATE_DISP["RpcDispatcher::new()"]
        CREATE_CHANNEL["Create send/receive channels"]
    end
    subgraph "Task Spawning"
        SPAWN_RECV["Spawn receive loop:\nReads bytes → dispatcher.read_bytes()"]
        SPAWN_SEND["Spawn send loop:\nWrites bytes to transport"]
    end
    subgraph "Result"
        RETURN["Return Arc<CustomClient>"]
    end
    NEW --> CONNECT
    CONNECT --> CREATE_DISP
    CREATE_DISP --> CREATE_CHANNEL
    CREATE_CHANNEL --> SPAWN_RECV
    SPAWN_RECV --> SPAWN_SEND
    SPAWN_SEND --> RETURN
```
The mock test implementation illustrates this structure; a simplified sketch (names condensed, not the verbatim test code):
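```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use tokio::sync::Mutex as TokioMutex;

// Illustrative mock client: no real I/O, just enough state to satisfy
// the caller interface. Field and type names are condensed from the
// dynamic-channel tests, not copied verbatim.
struct MockRpcClient {
    dispatcher: Arc<TokioMutex<RpcDispatcher<'static>>>,
    is_connected: Arc<AtomicBool>,
}

impl MockRpcClient {
    fn new() -> Self {
        Self {
            dispatcher: Arc::new(TokioMutex::new(RpcDispatcher::new())),
            is_connected: Arc::new(AtomicBool::new(true)),
        }
    }

    fn get_dispatcher(&self) -> Arc<TokioMutex<RpcDispatcher<'static>>> {
        Arc::clone(&self.dispatcher)
    }

    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::SeqCst)
    }

    fn get_emit_fn(&self) -> Arc<dyn Fn(Vec<u8>) + Send + Sync> {
        Arc::new(|_bytes| {}) // no-op: the mock never touches a socket
    }
}
```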
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:110-271
- extensions/muxio-rpc-service-caller/tests/dynamic_channel_tests.rs:19-88
Server Transport Implementation
Server transports must implement the RpcServiceEndpointInterface trait to handle incoming RPC requests. This trait manages handler registration and request processing.
Required Server Interface Methods
The RpcServiceEndpointInterface trait requires:
| Method | Parameters | Purpose |
|---|---|---|
| get_prebuffered_handlers() | - | Returns the handlers lock for accessing registered method handlers |
| register_prebuffered() | method_id: u64, handler: F | Registers a handler function for a specific RPC method |
| read_bytes() | dispatcher: &mut RpcDispatcher, context: C, bytes: &[u8], on_emit: E | Processes incoming bytes and dispatches them to handlers |
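As a hedged sketch, registering a handler for a method ID looks roughly like the following. Whether registration is awaited, how the context is passed, and what the handler returns are assumptions here; the authoritative signature is in endpoint_interface.rs.

```rust
// Hypothetical method ID shared between client and server definitions.
const ECHO_METHOD_ID: u64 = 42;

// Register a prebuffered handler: it receives the fully reassembled
// request bytes plus a connection context, and returns response bytes.
// Signature details are assumptions; see endpoint_interface.rs.
endpoint
    .register_prebuffered(ECHO_METHOD_ID, |request_bytes: Vec<u8>, _ctx| async move {
        // Application logic would decode/encode here; this one echoes.
        Ok(request_bytes)
    })
    .await;
```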
Server Request Processing Flow
The read_bytes method implements a three-stage pipeline for request processing:
```mermaid
sequenceDiagram
    participant Transport as "Custom Transport"
    participant ReadBytes as "read_bytes()"
    participant Dispatcher as "RpcDispatcher"
    participant Handlers as "Handler Registry"
    participant Handler as "User Handler"
    Note over Transport,Handler: Stage 1: Decode & Identify
    Transport->>ReadBytes: bytes from transport
    ReadBytes->>Dispatcher: dispatcher.read_bytes(bytes)
    Dispatcher-->>ReadBytes: Vec<request_id>
    loop For each finalized request
        ReadBytes->>Dispatcher: is_rpc_request_finalized(id)
        Dispatcher-->>ReadBytes: true
        ReadBytes->>Dispatcher: delete_rpc_request(id)
        Dispatcher-->>ReadBytes: RpcRequest
    end
    Note over Transport,Handler: Stage 2: Execute Handlers
    loop For each request
        ReadBytes->>Handlers: get handler for method_id
        Handlers-->>ReadBytes: handler function
        ReadBytes->>Handler: handler(request_bytes, context)
        Handler-->>ReadBytes: response_bytes
    end
    Note over Transport,Handler: Stage 3: Encode & Emit
    loop For each response
        ReadBytes->>Dispatcher: dispatcher.respond(response)
        Dispatcher->>Transport: on_emit(encoded_bytes)
    end
```
Server Implementation Pattern
A server transport implementation typically:
- Accepts incoming connections on the transport medium
- For each connection, creates an RpcDispatcher instance
- Spawns a receive loop that calls endpoint.read_bytes() with incoming bytes
- Provides an on_emit closure that sends response bytes back over the transport
- Manages connection lifecycle and cleanup
```mermaid
graph TB
    ACCEPT["Accept connection"]
    subgraph "Per-Connection Setup"
        CREATE_DISP["Create RpcDispatcher"]
        SPLIT["Split I/O into\nreader and writer"]
    end
    subgraph "Receive Loop"
        READ["Read bytes from transport"]
        PROCESS["endpoint.read_bytes(\ndispatcher,\ncontext,\nbytes,\non_emit)"]
        DECODE["Stage 1: Decode frames"]
        INVOKE["Stage 2: Invoke handlers"]
        EMIT["Stage 3: Emit responses"]
    end
    subgraph "Emit Closure"
        ON_EMIT["on_emit closure:\nsends bytes to writer"]
    end
    ACCEPT --> CREATE_DISP
    CREATE_DISP --> SPLIT
    SPLIT --> READ
    READ --> PROCESS
    PROCESS --> DECODE
    DECODE --> INVOKE
    INVOKE --> EMIT
    EMIT --> ON_EMIT
    ON_EMIT --> READ
```
Connection handler structure, sketched over a generic byte stream (TcpStream, the channel plumbing, and MyEndpoint below are illustrative; only the dispatcher and endpoint calls follow the interface described above):
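```rust
use std::sync::Arc;
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpStream,
    sync::mpsc,
};

// Hedged sketch of a per-connection server loop. `MyEndpoint` stands in
// for your endpoint type implementing RpcServiceEndpointInterface.
async fn handle_connection(stream: TcpStream, endpoint: Arc<MyEndpoint>) {
    let (mut reader, mut writer) = stream.into_split();
    let mut dispatcher = RpcDispatcher::new();

    // Writer task: drains response bytes produced by the emit closure.
    let (tx, mut rx) = mpsc::unbounded_channel::<Vec<u8>>();
    tokio::spawn(async move {
        while let Some(bytes) = rx.recv().await {
            if writer.write_all(&bytes).await.is_err() {
                break; // peer went away; stop writing
            }
        }
    });

    // Receive loop: feed raw bytes to the endpoint, which decodes
    // frames, invokes handlers, and emits responses through `on_emit`.
    let mut buf = vec![0u8; 64 * 1024];
    loop {
        let n = match reader.read(&mut buf).await {
            Ok(0) | Err(_) => break, // EOF or I/O error: tear down
            Ok(n) => n,
        };
        let tx = tx.clone();
        let on_emit = move |bytes: Vec<u8>| {
            let _ = tx.send(bytes); // writer gone == connection closing
        };
        // Context type and exact signature per endpoint_interface.rs;
        // `()` stands in for an application-defined context here.
        if endpoint
            .read_bytes(&mut dispatcher, (), &buf[..n], on_emit)
            .await
            .is_err()
        {
            break;
        }
    }
    // Dropping `tx` closes the channel and ends the writer task.
}
```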
Managing the RPC Dispatcher
The RpcDispatcher is the core component that handles protocol encoding/decoding, request/response correlation, and stream management. Both client and server transports must integrate with the dispatcher correctly.
Dispatcher Lifecycle
```mermaid
stateDiagram-v2
    [*] --> Created : RpcDispatcher::new()
    Created --> Processing : Bytes received
    Processing --> Decoding : read_bytes(bytes)
    Decoding --> Pending : Partial request
    Decoding --> Finalized : Complete request
    Pending --> Processing : More bytes
    Finalized --> HandlerInvoked : Server - invoke handler
    Finalized --> ResponsePending : Client - await response
    HandlerInvoked --> Responding : respond()
    Responding --> Processing : More requests
    ResponsePending --> ResponseReceived : read_bytes(response)
    ResponseReceived --> Processing : More requests
    Processing --> Shutdown : fail_all_pending_requests()
    Shutdown --> [*]
```
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:82-108
- extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:78-136
Key Dispatcher Operations
| Operation | Usage Context | Description |
|---|---|---|
| read_bytes(&mut self, bytes: &[u8]) | Client & Server | Decodes incoming bytes and returns the IDs of affected requests |
| is_rpc_request_finalized(id: u32) | Server | Checks whether the request with the given ID is complete |
| delete_rpc_request(id: u32) | Server | Removes and returns a finalized request |
| respond(response, max_chunk_size, on_emit) | Server | Encodes and emits a response |
| fail_all_pending_requests(error) | Client | Cancels all pending requests on disconnect |
Sources:
- extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:82-92
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:100-103
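Combined, a minimal server-side receive path built directly on these operations might look like the following sketch. Return types are paraphrased from the table (e.g. whether read_bytes returns a Result is not shown there), and handle_request stands in for application logic.

```rust
const MAX_CHUNK_SIZE: usize = 64 * 1024; // illustrative chunk size

// Hedged sketch of the three-stage pipeline using the dispatcher
// operations from the table above.
fn process_incoming(
    dispatcher: &mut RpcDispatcher<'_>,
    bytes: &[u8],
    on_emit: impl Fn(Vec<u8>) + Clone,
) {
    // Stage 1: decode frames; collect IDs of requests touched by them.
    let request_ids = dispatcher.read_bytes(bytes);

    for id in request_ids {
        // Act only on requests whose final frame has arrived.
        if dispatcher.is_rpc_request_finalized(id) {
            if let Some(request) = dispatcher.delete_rpc_request(id) {
                // Stage 2: run the application handler (hypothetical).
                let response = handle_request(request);
                // Stage 3: encode and emit the response in chunks.
                dispatcher.respond(response, MAX_CHUNK_SIZE, on_emit.clone());
            }
        }
    }
}
```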
Implementing the Emit Function
The emit function is the primary mechanism for sending encoded bytes to the underlying transport. It must handle queueing, flow control, and error conditions.
Emit Function Requirements
```mermaid
graph TB
    EMIT_CALL["emit_fn(bytes: Vec<u8>)"]
    subgraph "Checks"
        CHECK_CONN["Check is_connected"]
        CHECK_SIZE["Check byte length"]
    end
    subgraph "Queueing"
        QUEUE["Send to internal channel\n(mpsc, crossbeam, etc.)"]
    end
    subgraph "Error Handling"
        LOG_ERROR["Log failure"]
        DROP["Drop bytes\n(don't panic)"]
    end
    EMIT_CALL --> CHECK_CONN
    CHECK_CONN -->|connected| CHECK_SIZE
    CHECK_CONN -->|disconnected| LOG_ERROR
    CHECK_SIZE --> QUEUE
    QUEUE -->|success| LOG_TRACE["Trace log success"]
    QUEUE -->|failure| LOG_ERROR
    LOG_ERROR --> DROP
    LOG_TRACE --> RETURN["Return"]
    DROP --> RETURN
```
WebSocket Client Emit Implementation
The Tokio WebSocket client implementation demonstrates a proper emit function pattern:
```mermaid
graph LR
    CALLER["RPC Caller invokes emit_fn"]
    subgraph "get_emit_fn() Implementation"
        CLOSURE["Arc<dyn Fn(Vec<u8>)>"]
        CHECK["is_connected check"]
        CONVERT["Vec<u8> → WsMessage::Binary"]
        SEND["tx.send(message)"]
    end
    subgraph "Send Loop Task"
        RECV["app_rx.recv()"]
        TRANSMIT["ws_sender.send(msg)"]
        IO["WebSocket I/O"]
    end
    CALLER --> CLOSURE
    CLOSURE --> CHECK
    CHECK -->|connected| CONVERT
    CHECK -->|disconnected| DROP["Drop & log warning"]
    CONVERT --> SEND
    SEND --> RECV
    RECV --> TRANSMIT
    TRANSMIT --> IO
```
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:289-313
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:224-257
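A condensed sketch of the same pattern, with an mpsc channel standing in for the WebSocket sender (the function and channel names are illustrative, and the tracing crate is one possible logging choice):

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use tokio::sync::mpsc;

// Hedged sketch of a client emit function: check connection state,
// then queue the bytes for the send-loop task.
fn make_emit_fn(
    is_connected: Arc<AtomicBool>,
    tx: mpsc::UnboundedSender<Vec<u8>>,
) -> Arc<dyn Fn(Vec<u8>) + Send + Sync> {
    Arc::new(move |bytes: Vec<u8>| {
        if !is_connected.load(Ordering::SeqCst) {
            // Disconnected: drop the bytes and log, never panic.
            tracing::warn!("emit called while disconnected; dropping frame");
            return;
        }
        if tx.send(bytes).is_err() {
            // The send loop has shut down; treat like a disconnect.
            tracing::warn!("send loop gone; dropping frame");
        }
    })
}
```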
Server Emit Closure Pattern
On the server side, the emit closure is typically provided as a parameter to read_bytes():
```mermaid
graph TB
    HANDLER["Handler returns response"]
    RESPOND["dispatcher.respond(\nresponse,\nchunk_size,\non_emit)"]
    subgraph "on_emit Closure"
        EMIT["on_emit: impl RpcEmit"]
        WRAPPER["Wraps send logic"]
        SEND["Send to writer task"]
    end
    WRITE["Write to transport"]
    HANDLER --> RESPOND
    RESPOND --> EMIT
    EMIT --> WRAPPER
    WRAPPER --> SEND
    SEND --> WRITE
```
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:166-173
- extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:132-134
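A minimal sketch of wiring that closure to a writer task (the mpsc channel and surrounding names are illustrative; only the parameter position of on_emit comes from the interface):

```rust
// `writer_tx` is an mpsc::UnboundedSender<Vec<u8>> feeding the task
// that owns the transport's write half (names illustrative).
let on_emit = {
    let tx = writer_tx.clone();
    move |bytes: Vec<u8>| {
        // If the writer task is gone the connection is already
        // closing, so the send error is deliberately ignored.
        let _ = tx.send(bytes);
    }
};

endpoint
    .read_bytes(&mut dispatcher, ctx, &incoming_bytes, on_emit)
    .await?;
```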
State Management and Error Handling
Proper state management is critical for custom transports to ensure graceful handling of disconnections, errors, and resource cleanup.
Connection State Tracking
```mermaid
stateDiagram-v2
    [*] --> Connecting : new() called
    Connecting --> Connected : Transport connected
    Connecting --> Error : Connection failed
    Connected --> Disconnecting : shutdown_async()
    Connected --> Error : I/O error
    Disconnecting --> Disconnected : State handler called
    Error --> Disconnected : State handler called
    Disconnected --> [*] : Resources cleaned up
```
State Change Handler Invocation
Transport implementations should invoke the state change handler at appropriate times:
| Event | State to Report | When to Invoke |
|---|---|---|
| Connection established | RpcTransportState::Connected | After successful connection, when handler is set |
| Connection lost (error) | RpcTransportState::Disconnected | When I/O error detected in receive loop |
| Explicit shutdown | RpcTransportState::Disconnected | When shutdown_async() called |
| Drop cleanup | RpcTransportState::Disconnected | In Drop::drop() implementation |
The Tokio client follows this pattern when the receive loop detects a disconnect; a simplified sketch (not the verbatim source):
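```rust
use std::sync::atomic::Ordering;

impl CustomClient {
    // Hedged sketch; field names (is_connected, state_change_handler)
    // are illustrative, not the Tokio client's exact internals.
    fn notify_disconnected(&self) {
        // Flip the shared flag first so is_connected() and the emit
        // function observe the disconnect immediately.
        self.is_connected.store(false, Ordering::SeqCst);

        // Then invoke the registered handler, if any.
        if let Some(handler) = self.state_change_handler.lock().unwrap().as_ref() {
            handler(RpcTransportState::Disconnected);
        }
    }
}
```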
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:315-334
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:79-108
Error Propagation Strategy
Custom transports should fail all pending requests when a disconnection occurs, so that awaiting callers receive a prompt error instead of hanging. A hedged sketch (the error value passed to fail_all_pending_requests is illustrative):
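```rust
// On transport teardown, cancel every in-flight request. The
// transport_disconnected_error() helper is hypothetical; use the
// error type the dispatcher actually expects.
async fn fail_pending(client: &impl RpcServiceCallerInterface) {
    let mut dispatcher = client.get_dispatcher().lock().await;
    dispatcher.fail_all_pending_requests(transport_disconnected_error());
}
```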
Sources:
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs:99-103
- extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs:167-292
Example: Mock Transport for Testing
The test suite includes a minimal mock transport implementation that demonstrates the core patterns without performing actual I/O.
Mock Client Structure
```mermaid
classDiagram
    class RpcServiceCallerInterface {
        <<interface>>
    }
    class MockRpcClient {
        +get_dispatcher() Arc~TokioMutex~RpcDispatcher~~
        +get_emit_fn() Arc~dyn Fn~
        +is_connected() bool
        +call_rpc_streaming()
        +set_state_change_handler()
    }
    class TestState {
        +SharedResponseSender response_sender_provider
        +Arc~AtomicBool~ is_connected_atomic
    }
    MockRpcClient ..|> RpcServiceCallerInterface
    MockRpcClient --> TestState : contains
```
Mock Implementation Highlights
The mock transport demonstrates minimal required functionality:
- Dispatcher: Returns a new dispatcher instance (no state sharing needed for the mock)
- Emit Function: No-op closure Arc::new(|_| {})
- Connection State: Uses AtomicBool for simple state tracking
- Streaming Calls: Creates channels and returns them directly without actual I/O
- State Handler: No-op implementation
Key simplifications in mock vs. production transport:
| Aspect | Production Transport | Mock Transport |
|---|---|---|
| I/O | Actual socket/stream operations | No-op or in-memory channels |
| Task Spawning | Spawns send/receive loop tasks | No background tasks |
| Error Handling | Detects I/O errors, handles disconnects | Minimal error simulation |
| Lifecycle | Complex connection management | Simple atomic bool |
| Threading | Requires Send + Sync across tasks | Simpler synchronization |
Testing Custom Transports
When implementing a custom transport, comprehensive testing should cover the connection lifecycle, state transitions, request/response correctness, concurrency, payload chunking, and error propagation.
Integration Test Pattern
Integration tests should verify end-to-end RPC communication over the real transport. A sketch of such a test, assuming a hypothetical spawn_test_server helper and a shared Add service definition:
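```rust
// Hedged test sketch: spawn_test_server, CustomClient, and Add are
// stand-ins for your transport's setup and a shared service
// definition; the assertions show the intended shape only.
#[tokio::test]
async fn custom_transport_round_trip() {
    // Server bound to an ephemeral local address.
    let addr = spawn_test_server().await;

    // Connect the custom client and confirm the reported state.
    let client = CustomClient::new(&addr).await.expect("connect failed");
    assert!(client.is_connected());

    // A simple prebuffered call should round-trip through the transport.
    let sum = Add::call(&client, vec![1.0, 2.0, 3.0])
        .await
        .expect("RPC failed");
    assert_eq!(sum, 6.0);
}
```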
Key Test Scenarios
Test implementations should validate:
| Scenario | Verification |
|---|---|
| Connection failure | new() returns error for invalid address/port |
| State transitions | Handler called with Connected then Disconnected |
| Pending request cleanup | Pending calls fail with cancellation error on disconnect |
| RPC success path | Method invocation returns expected result |
| Concurrent requests | Multiple simultaneous RPCs complete correctly |
| Large payloads | Chunking and reassembly work correctly |
| Handler registration | Methods can be registered before connection |
| Error propagation | Transport errors surface as RPC errors |
Sources:
- extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs:15-31
- extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs:167-292
Summary
Implementing a custom transport requires:
- Client-side: Implement RpcServiceCallerInterface with dispatcher management, an emit function, and state tracking
- Server-side: Implement RpcServiceEndpointInterface with handler registration and request processing
- Dispatcher Integration: Properly call the read_bytes() and respond() methods
- Emit Function: Implement reliable byte transmission with error handling
- State Management: Track connection state and invoke state change handlers
- Error Handling: Fail pending requests on disconnect and propagate errors
- Testing: Write comprehensive tests covering connection, data transfer, and error scenarios
The mock transport implementation in extensions/muxio-rpc-service-caller/tests/dynamic_channel_tests.rs:19-88 provides a minimal reference, while the Tokio WebSocket client in extensions/muxio-tokio-rpc-client/src/rpc_client.rs:25-336 demonstrates a production-ready implementation pattern.