
This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Custom Transport Implementation

Purpose and Scope

This document explains how to implement custom transports for the muxio RPC system beyond the provided WebSocket implementations. Custom transports enable the use of alternative communication channels such as Unix domain sockets, named pipes, HTTP/2, QUIC, or in-memory channels for testing.

For information about the existing WebSocket transport implementations, see Tokio RPC Server, Tokio RPC Client, and WASM RPC Client. For the underlying protocol details, see Binary Framing Protocol and RPC Dispatcher.


Transport Architecture Overview

Custom transports act as adapters between the transport-agnostic RpcDispatcher and the specific communication mechanism. The transport layer is responsible for moving bytes bidirectionally while the dispatcher handles multiplexing, request correlation, and protocol encoding/decoding.

```mermaid
graph TB
    APP["Application Code"]

    subgraph "Custom Transport Layer"
        CLIENT_IMPL["Custom Client Implementation\n(Your Code)"]
        SERVER_IMPL["Custom Server Implementation\n(Your Code)"]
    end

    subgraph "RPC Interface Layer"
        CALLER["RpcServiceCallerInterface"]
        ENDPOINT["RpcServiceEndpointInterface"]
    end

    subgraph "Core Multiplexing Layer"
        DISPATCHER_C["RpcDispatcher\n(Client-side)"]
        DISPATCHER_S["RpcDispatcher\n(Server-side)"]
    end

    subgraph "Transport Medium"
        CHANNEL["Custom I/O Channel\n(TCP, UDP, Unix Socket, etc.)"]
    end

    APP --> CLIENT_IMPL
    APP --> SERVER_IMPL

    CLIENT_IMPL -.implements.-> CALLER
    SERVER_IMPL -.implements.-> ENDPOINT

    CALLER --> DISPATCHER_C
    ENDPOINT --> DISPATCHER_S

    CLIENT_IMPL --> CHANNEL
    SERVER_IMPL --> CHANNEL

    DISPATCHER_C -.read_bytes/write_bytes.-> CLIENT_IMPL
    DISPATCHER_S -.read_bytes/respond.-> SERVER_IMPL
```

Transport Integration Points

Sources:


Client Transport Implementation

Client transports must implement the RpcServiceCallerInterface trait to enable RPC method invocation. This trait provides the contract for establishing connections, sending requests, and receiving responses.

Required Interface Methods

The RpcServiceCallerInterface trait requires the following methods:

| Method | Return Type | Purpose |
|---|---|---|
| get_dispatcher() | Arc<TokioMutex<RpcDispatcher<'static>>> | Provides access to the dispatcher for request management |
| is_connected() | bool | Indicates current connection state |
| get_emit_fn() | Arc<dyn Fn(Vec<u8>) + Send + Sync> | Returns function for sending encoded bytes over transport |
| set_state_change_handler() | async fn | Registers callback for connection state changes |
| call_rpc_streaming() | Result<(RpcStreamEncoder, DynamicReceiver), RpcServiceError> | Initiates streaming RPC calls (optional, has default impl) |
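
As a rough orientation, a custom client might satisfy this trait along the lines of the sketch below. The signatures are paraphrased from the table above and simplified; the real trait's generics, async-ness, and error types may differ, and the remaining required methods are omitted.

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use tokio::sync::{mpsc, Mutex as TokioMutex};

// Hypothetical skeleton; not the actual muxio trait definition.
struct CustomClient {
    dispatcher: Arc<TokioMutex<RpcDispatcher<'static>>>,
    is_connected: Arc<AtomicBool>,
    // Sender side of the outgoing-bytes queue drained by a send loop.
    outbound_tx: mpsc::UnboundedSender<Vec<u8>>,
}

impl RpcServiceCallerInterface for CustomClient {
    fn get_dispatcher(&self) -> Arc<TokioMutex<RpcDispatcher<'static>>> {
        self.dispatcher.clone()
    }

    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::SeqCst)
    }

    fn get_emit_fn(&self) -> Arc<dyn Fn(Vec<u8>) + Send + Sync> {
        let tx = self.outbound_tx.clone();
        let connected = self.is_connected.clone();
        Arc::new(move |bytes: Vec<u8>| {
            // Silently drop frames when the transport is down; never panic here.
            if connected.load(Ordering::SeqCst) {
                let _ = tx.send(bytes);
            }
        })
    }

    // set_state_change_handler() and call_rpc_streaming() omitted for brevity.
}
```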

Sources:

Client Transport Structure

Sources:

Minimal Client Implementation Pattern

A minimal client transport implementation follows this pattern:

  1. Dispatcher Management : Create and wrap RpcDispatcher in Arc<TokioMutex<_>>
  2. Connection State : Track connection state with Arc<AtomicBool>
  3. Emit Function : Implement function that sends bytes to the underlying transport
  4. Receive Loop : Spawn task that reads bytes and calls dispatcher.read_bytes()
  5. Send Loop : Spawn task that writes outgoing bytes to transport
  6. State Handler : Support registration of state change callbacks
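
The sketch below walks through steps 1-5 against a plain TCP stream. The transport choice, the dispatcher constructor, and the exact read_bytes signature are assumptions made for illustration.

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpStream,
    sync::{mpsc, Mutex as TokioMutex},
};

impl CustomClient {
    // Hypothetical constructor; error handling is kept minimal.
    async fn new(addr: &str) -> std::io::Result<Arc<Self>> {
        // 1-2. Connect, then create the dispatcher and connection flag.
        let stream = TcpStream::connect(addr).await?;
        let (mut reader, mut writer) = stream.into_split();
        let dispatcher = Arc::new(TokioMutex::new(RpcDispatcher::new()));
        let is_connected = Arc::new(AtomicBool::new(true));

        // 3. Channel carrying already-encoded frames to the send loop.
        let (outbound_tx, mut outbound_rx) = mpsc::unbounded_channel::<Vec<u8>>();

        // 4. Receive loop: feed raw bytes into the dispatcher.
        let recv_dispatcher = dispatcher.clone();
        let recv_connected = is_connected.clone();
        tokio::spawn(async move {
            let mut buf = vec![0u8; 64 * 1024];
            loop {
                match reader.read(&mut buf).await {
                    Ok(0) | Err(_) => {
                        recv_connected.store(false, Ordering::SeqCst);
                        break;
                    }
                    // Assumed dispatcher API, as listed in the tables on this page.
                    Ok(n) => {
                        let _ = recv_dispatcher.lock().await.read_bytes(&buf[..n]);
                    }
                }
            }
        });

        // 5. Send loop: write queued frames to the transport.
        tokio::spawn(async move {
            while let Some(bytes) = outbound_rx.recv().await {
                if writer.write_all(&bytes).await.is_err() {
                    break;
                }
            }
        });

        Ok(Arc::new(Self { dispatcher, is_connected, outbound_tx }))
    }
}
```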
```mermaid
graph TB
    NEW["CustomClient::new()"]

    subgraph "Initialization"
        CONNECT["Connect to transport"]
        CREATE_DISP["RpcDispatcher::new()"]
        CREATE_CHANNEL["Create send/receive channels"]
    end

    subgraph "Task Spawning"
        SPAWN_RECV["Spawn receive loop:\nReads bytes → dispatcher.read_bytes()"]
        SPAWN_SEND["Spawn send loop:\nWrites bytes to transport"]
    end

    subgraph "Result"
        RETURN["Return Arc<CustomClient>"]
    end

    NEW --> CONNECT
    CONNECT --> CREATE_DISP
    CREATE_DISP --> CREATE_CHANNEL
    CREATE_CHANNEL --> SPAWN_RECV
    SPAWN_RECV --> SPAWN_SEND
    SPAWN_SEND --> RETURN
```

Example structure from mock test implementation:

Sources:


Server Transport Implementation

Server transports must implement the RpcServiceEndpointInterface trait to handle incoming RPC requests. This trait manages handler registration and request processing.

Required Server Interface Methods

The RpcServiceEndpointInterface trait requires:

| Method | Parameters | Purpose |
|---|---|---|
| get_prebuffered_handlers() | - | Returns handlers lock for accessing registered method handlers |
| register_prebuffered() | method_id: u64, handler: F | Registers handler function for a specific RPC method |
| read_bytes() | dispatcher: &mut RpcDispatcher, context: C, bytes: &[u8], on_emit: E | Processes incoming bytes and dispatches to handlers |
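
For example, registering a handler for a single prebuffered method might look like the following sketch. The method-ID constant, the context parameter, the async registration call, and the handler's exact signature are assumptions based on the table above.

```rust
// Illustrative method ID shared between client and server service definitions.
const ECHO_METHOD_ID: u64 = 42;

// Hypothetical registration; the real handler signature may differ.
endpoint
    .register_prebuffered(ECHO_METHOD_ID, |request_bytes: Vec<u8>, _ctx| async move {
        // Decode the request, do the work, encode the response.
        // Serialization is application-defined; here the bytes are echoed back.
        Ok(request_bytes)
    })
    .await?;
```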

Sources:

```mermaid
sequenceDiagram
    participant Transport as "Custom Transport"
    participant ReadBytes as "read_bytes()"
    participant Dispatcher as "RpcDispatcher"
    participant Handlers as "Handler Registry"
    participant Handler as "User Handler"

    Note over Transport,Handler: Stage 1: Decode & Identify
    Transport->>ReadBytes: bytes from transport
    ReadBytes->>Dispatcher: dispatcher.read_bytes(bytes)
    Dispatcher-->>ReadBytes: Vec<request_id>
    loop For each finalized request
        ReadBytes->>Dispatcher: is_rpc_request_finalized(id)
        Dispatcher-->>ReadBytes: true
        ReadBytes->>Dispatcher: delete_rpc_request(id)
        Dispatcher-->>ReadBytes: RpcRequest
    end

    Note over Transport,Handler: Stage 2: Execute Handlers
    loop For each request
        ReadBytes->>Handlers: get handler for method_id
        Handlers-->>ReadBytes: handler function
        ReadBytes->>Handler: handler(request_bytes, context)
        Handler-->>ReadBytes: response_bytes
    end

    Note over Transport,Handler: Stage 3: Encode & Emit
    loop For each response
        ReadBytes->>Dispatcher: dispatcher.respond(response)
        Dispatcher->>Transport: on_emit(encoded_bytes)
    end
```

Server Request Processing Flow

The read_bytes method implements a three-stage pipeline for request processing:

Sources:

Server Implementation Pattern

A server transport implementation typically:

  1. Accepts incoming connections on the transport medium
  2. For each connection, creates an RpcDispatcher instance
  3. Spawns receive loop that calls endpoint.read_bytes() with incoming bytes
  4. Provides on_emit closure that sends response bytes back over transport
  5. Manages connection lifecycle and cleanup
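
A sketch of such a per-connection loop is shown below (the diagram that follows depicts the same flow). The TCP listener, the unit context, the MyEndpoint type, and the assumption that endpoint.read_bytes() is async all come from this illustration rather than from the muxio API itself.

```rust
use std::sync::Arc;
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpListener,
    sync::mpsc,
};

// `endpoint` is assumed to implement RpcServiceEndpointInterface.
async fn serve(endpoint: Arc<MyEndpoint>, listener: TcpListener) -> std::io::Result<()> {
    loop {
        let (stream, _) = listener.accept().await?;
        let endpoint = endpoint.clone();

        tokio::spawn(async move {
            let (mut reader, mut writer) = stream.into_split();
            // One dispatcher per connection.
            let mut dispatcher = RpcDispatcher::new();
            // Bridge between the on_emit closure and the writer task.
            let (tx, mut rx) = mpsc::unbounded_channel::<Vec<u8>>();

            // Writer task: flush encoded response frames to the socket.
            tokio::spawn(async move {
                while let Some(bytes) = rx.recv().await {
                    if writer.write_all(&bytes).await.is_err() {
                        break;
                    }
                }
            });

            // Receive loop: decode, invoke handlers, emit responses.
            let mut buf = vec![0u8; 64 * 1024];
            loop {
                match reader.read(&mut buf).await {
                    Ok(0) | Err(_) => break, // connection closed or failed
                    Ok(n) => {
                        let tx = tx.clone();
                        let on_emit = move |bytes: Vec<u8>| {
                            let _ = tx.send(bytes);
                        };
                        // Parameter order taken from the table above (assumed).
                        if endpoint
                            .read_bytes(&mut dispatcher, (), &buf[..n], on_emit)
                            .await
                            .is_err()
                        {
                            break;
                        }
                    }
                }
            }
        });
    }
}
```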
```mermaid
graph TB
    ACCEPT["Accept connection"]

    subgraph "Per-Connection Setup"
        CREATE_DISP["Create RpcDispatcher"]
        SPLIT["Split I/O into\nreader and writer"]
    end

    subgraph "Receive Loop"
        READ["Read bytes from transport"]
        PROCESS["endpoint.read_bytes(\ndispatcher,\ncontext,\nbytes,\non_emit)"]
        DECODE["Stage 1: Decode frames"]
        INVOKE["Stage 2: Invoke handlers"]
        EMIT["Stage 3: Emit responses"]
    end

    subgraph "Emit Closure"
        ON_EMIT["on_emit closure:\nsends bytes to writer"]
    end

    ACCEPT --> CREATE_DISP
    CREATE_DISP --> SPLIT
    SPLIT --> READ
    READ --> PROCESS
    PROCESS --> DECODE
    DECODE --> INVOKE
    INVOKE --> EMIT
    EMIT --> ON_EMIT
    ON_EMIT --> READ
```

Connection handler structure:

Sources:


```mermaid
stateDiagram-v2
    [*] --> Created: RpcDispatcher::new()

    Created --> Processing : Bytes received

    Processing --> Decoding : read_bytes(bytes)
    Decoding --> Pending : Partial request
    Decoding --> Finalized : Complete request

    Pending --> Processing : More bytes
    Finalized --> HandlerInvoked : Server - invoke handler
    Finalized --> ResponsePending : Client - await response

    HandlerInvoked --> Responding : respond()
    Responding --> Processing : More requests

    ResponsePending --> ResponseReceived : read_bytes(response)
    ResponseReceived --> Processing : More requests

    Processing --> Shutdown : fail_all_pending_requests()
    Shutdown --> [*]
```

Managing the RPC Dispatcher

The RpcDispatcher is the core component that handles protocol encoding/decoding, request/response correlation, and stream management. Both client and server transports must properly integrate with the dispatcher.

Dispatcher Lifecycle

Sources:

Key Dispatcher Operations

| Operation | Usage Context | Description |
|---|---|---|
| read_bytes(&mut self, bytes: &[u8]) | Client & Server | Decodes incoming bytes, returns IDs of affected requests |
| is_rpc_request_finalized(id: u32) | Server | Checks if request with given ID is complete |
| delete_rpc_request(id: u32) | Server | Removes and returns finalized request |
| respond(response, max_chunk_size, on_emit) | Server | Encodes and emits response |
| fail_all_pending_requests(error) | Client | Cancels all pending requests on disconnect |
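
Strung together, the server-side decode stage drives these operations roughly as in the fragment below. Return types and error handling are paraphrased from the table, so treat the exact shapes as assumptions.

```rust
// Hypothetical fragment from a server receive loop.
// `incoming` is a &[u8] slice read from the transport.
let request_ids = dispatcher.read_bytes(incoming)?;

for id in request_ids {
    // Only act on requests whose frames have all arrived.
    if dispatcher.is_rpc_request_finalized(id) {
        let request = dispatcher.delete_rpc_request(id);
        // ...invoke the registered handler for the request's method ID, then
        // encode and emit the reply via the provided on_emit closure:
        // dispatcher.respond(response, MAX_CHUNK_SIZE, &on_emit)?;
    }
}
```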

Sources:


```mermaid
graph TB
    EMIT_CALL["emit_fn(bytes: Vec<u8>)"]

    subgraph "Checks"
        CHECK_CONN["Check is_connected"]
        CHECK_SIZE["Check byte length"]
    end

    subgraph "Queueing"
        QUEUE["Send to internal channel\n(mpsc, crossbeam, etc.)"]
    end

    subgraph "Error Handling"
        LOG_ERROR["Log failure"]
        DROP["Drop bytes\n(don't panic)"]
    end

    EMIT_CALL --> CHECK_CONN
    CHECK_CONN -->|connected| CHECK_SIZE
    CHECK_CONN -->|disconnected| LOG_ERROR
    CHECK_SIZE --> QUEUE
    QUEUE -->|success| LOG_TRACE["Trace log success"]
    QUEUE -->|failure| LOG_ERROR
    LOG_ERROR --> DROP
    LOG_TRACE --> RETURN["Return"]
    DROP --> RETURN
```

Implementing the Emit Function

The emit function is the primary mechanism for sending encoded bytes to the underlying transport. It must handle queueing, flow control, and error conditions.
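
A minimal emit function that follows these rules might look like the sketch below, assuming an unbounded Tokio mpsc channel feeding the send loop and the tracing crate for logging.

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use tokio::sync::mpsc::UnboundedSender;

// Illustrative factory for the closure returned by get_emit_fn().
fn make_emit_fn(
    tx: UnboundedSender<Vec<u8>>,
    is_connected: Arc<AtomicBool>,
) -> Arc<dyn Fn(Vec<u8>) + Send + Sync> {
    Arc::new(move |bytes: Vec<u8>| {
        // Never panic in the emit path; the dispatcher may invoke it from
        // contexts that cannot recover from an unwind.
        if !is_connected.load(Ordering::SeqCst) {
            tracing::warn!("emit while disconnected; dropping {} bytes", bytes.len());
            return;
        }
        match tx.send(bytes) {
            Ok(()) => tracing::trace!("queued frame for transmission"),
            Err(err) => tracing::warn!("send loop gone; dropping frame: {err}"),
        }
    })
}
```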

Emit Function Requirements

Sources:

```mermaid
graph LR
    CALLER["RPC Caller invokes emit_fn"]

    subgraph "get_emit_fn() Implementation"
        CLOSURE["Arc<dyn Fn(Vec<u8>)>"]
        CHECK["is_connected check"]
        CONVERT["Vec<u8> → WsMessage::Binary"]
        SEND["tx.send(message)"]
    end

    subgraph "Send Loop Task"
        RECV["app_rx.recv()"]
        TRANSMIT["ws_sender.send(msg)"]
        IO["WebSocket I/O"]
    end

    CALLER --> CLOSURE
    CLOSURE --> CHECK
    CHECK -->|connected| CONVERT
    CHECK -->|disconnected| DROP["Drop & log warning"]
    CONVERT --> SEND
    SEND --> RECV
    RECV --> TRANSMIT
    TRANSMIT --> IO
```

WebSocket Client Emit Implementation

The Tokio WebSocket client implementation demonstrates a proper emit function pattern:

Sources:

```mermaid
graph TB
    HANDLER["Handler returns response"]
    RESPOND["dispatcher.respond(\nresponse,\nchunk_size,\non_emit)"]

    subgraph "on_emit Closure"
        EMIT["on_emit: impl RpcEmit"]
        WRAPPER["Wraps send logic"]
        SEND["Send to writer task"]
    end

    WRITE["Write to transport"]

    HANDLER --> RESPOND
    RESPOND --> EMIT
    EMIT --> WRAPPER
    WRAPPER --> SEND
    SEND --> WRITE
```

Server Emit Closure Pattern

On the server side, the emit closure is typically provided as a parameter to read_bytes():
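
A hypothetical version of that closure, assuming the writer task is fed by an unbounded channel as in the earlier server sketch:

```rust
// Illustrative only; `writer_tx` is the sender half drained by the writer task.
let writer_tx = writer_tx.clone();
let on_emit = move |encoded: Vec<u8>| {
    // Queue the encoded frame; never block or panic inside the emit path.
    if writer_tx.send(encoded).is_err() {
        tracing::warn!("writer task stopped; dropping response frame");
    }
};
```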

Sources:


```mermaid
stateDiagram-v2
    [*] --> Connecting : new() called

    Connecting --> Connected : Transport connected
    Connecting --> Error : Connection failed

    Connected --> Disconnecting : shutdown_async()
    Connected --> Error : I/O error

    Disconnecting --> Disconnected : State handler called
    Error --> Disconnected : State handler called

    Disconnected --> [*] : Resources cleaned up
```

State Management and Error Handling

Proper state management is critical for custom transports to ensure graceful handling of disconnections, errors, and resource cleanup.

Connection State Tracking

Sources:

State Change Handler Invocation

Transport implementations should invoke the state change handler at appropriate times:

| Event | State to Report | When to Invoke |
|---|---|---|
| Connection established | RpcTransportState::Connected | After successful connection, when handler is set |
| Connection lost (error) | RpcTransportState::Disconnected | When I/O error detected in receive loop |
| Explicit shutdown | RpcTransportState::Disconnected | When shutdown_async() called |
| Drop cleanup | RpcTransportState::Disconnected | In Drop::drop() implementation |
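
One way to store and invoke the registered callback is sketched below; the handler alias and notify helper are hypothetical, and RpcTransportState is assumed to carry the Connected/Disconnected variants listed above.

```rust
use std::sync::Arc;
use tokio::sync::Mutex as TokioMutex;

// Hypothetical storage for the callback passed to set_state_change_handler().
type StateChangeHandler = Arc<dyn Fn(RpcTransportState) + Send + Sync>;

struct StateNotifier {
    handler: TokioMutex<Option<StateChangeHandler>>,
}

impl StateNotifier {
    // Called whenever the connection state changes (see table above).
    async fn notify(&self, state: RpcTransportState) {
        if let Some(handler) = self.handler.lock().await.as_ref() {
            handler(state);
        }
    }
}
```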

Example from Tokio client:

Sources:

Error Propagation Strategy

Custom transports should fail pending requests when disconnection occurs:
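
For example, the receive loop's error path might look roughly like this; disconnect_error() is a hypothetical helper standing in for whatever error type fail_all_pending_requests() expects.

```rust
use std::sync::atomic::Ordering;

// Hypothetical disconnect path shared by the receive loop and shutdown logic.
async fn handle_disconnect(client: &CustomClient) {
    // Flip the flag first so the emit function starts dropping frames.
    client.is_connected.store(false, Ordering::SeqCst);

    // Wake every in-flight caller with an error instead of letting it hang.
    client
        .dispatcher
        .lock()
        .await
        .fail_all_pending_requests(disconnect_error());

    // Finally, report the transition to the registered state change handler.
}
```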

Sources:


```mermaid
classDiagram
    class MockRpcClient {
        +Arc~TokioMutex~RpcDispatcher~~ get_dispatcher()
        +Arc~dyn Fn~ get_emit_fn()
        +bool is_connected()
        +call_rpc_streaming()
        +set_state_change_handler()
    }

    class RpcServiceCallerInterface {
        <<interface>>
    }

    class TestState {
        +SharedResponseSender response_sender_provider
        +Arc~AtomicBool~ is_connected_atomic
    }

    MockRpcClient ..|> RpcServiceCallerInterface
    MockRpcClient --> TestState : contains
```

Example: Mock Transport for Testing

The test suite includes a minimal mock transport implementation that demonstrates the core patterns without actual I/O.

Mock Client Structure

Sources:

Mock Implementation Highlights

The mock transport demonstrates minimal required functionality:

  1. Dispatcher : Returns new dispatcher instance (no state sharing needed for mock)
  2. Emit Function : No-op closure Arc::new(|_| {})
  3. Connection State : Uses AtomicBool for simple state tracking
  4. Streaming Calls : Creates channels and returns them directly without actual I/O
  5. State Handler : No-op implementation
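
The corresponding trait methods in a mock like this can be nearly trivial; the fragment below is illustrative rather than a copy of the actual test code.

```rust
// Illustrative mock methods (not the repository's test implementation).
fn get_emit_fn(&self) -> Arc<dyn Fn(Vec<u8>) + Send + Sync> {
    Arc::new(|_| {}) // no-op: nothing is actually transmitted
}

fn is_connected(&self) -> bool {
    self.is_connected_atomic.load(Ordering::SeqCst)
}
```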

Key simplifications in mock vs. production transport:

| Aspect | Production Transport | Mock Transport |
|---|---|---|
| I/O | Actual socket/stream operations | No-op or in-memory channels |
| Task Spawning | Spawns send/receive loop tasks | No background tasks |
| Error Handling | Detects I/O errors, handles disconnects | Minimal error simulation |
| Lifecycle | Complex connection management | Simple atomic bool |
| Threading | Requires Send + Sync across tasks | Simpler synchronization |

Sources:


Testing Custom Transports

When implementing a custom transport, comprehensive testing should cover:

Test Coverage Areas

Sources:

Integration Test Pattern

Integration tests should verify end-to-end RPC communication:
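
A typical shape for such a test is sketched below. The spawn_test_server helper, the Echo service definition, and the CustomClient constructor are hypothetical names standing in for whatever the transport and shared service definitions provide.

```rust
#[tokio::test]
async fn custom_transport_round_trip() {
    // Start an in-process server on an ephemeral address (hypothetical helper).
    let (addr, _server_handle) = spawn_test_server().await;

    // Connect the custom client over the transport under test.
    let client = CustomClient::new(&addr).await.expect("client should connect");

    // Invoke a shared service definition end-to-end and check the result.
    let reply = Echo::call(&client, b"ping".to_vec())
        .await
        .expect("rpc should succeed");
    assert_eq!(reply, b"ping".to_vec());
}
```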

Sources:

Key Test Scenarios

Test implementations should validate:

| Scenario | Verification |
|---|---|
| Connection failure | new() returns error for invalid address/port |
| State transitions | Handler called with Connected then Disconnected |
| Pending request cleanup | Pending calls fail with cancellation error on disconnect |
| RPC success path | Method invocation returns expected result |
| Concurrent requests | Multiple simultaneous RPCs complete correctly |
| Large payloads | Chunking and reassembly work correctly |
| Handler registration | Methods can be registered before connection |
| Error propagation | Transport errors surface as RPC errors |

Sources:


Summary

Implementing a custom transport requires:

  1. Client-side : Implement RpcServiceCallerInterface with dispatcher management, emit function, and state tracking
  2. Server-side : Implement RpcServiceEndpointInterface with handler registration and request processing
  3. Dispatcher Integration : Properly call read_bytes() and respond() methods
  4. Emit Function : Implement reliable byte transmission with error handling
  5. State Management : Track connection state and invoke state change handlers
  6. Error Handling : Fail pending requests on disconnect and propagate errors
  7. Testing : Comprehensive tests covering connection, data transfer, and error scenarios

The mock transport implementation in extensions/muxio-rpc-service-caller/tests/dynamic_channel_tests.rs:19-88 provides a minimal reference, while the Tokio WebSocket client in extensions/muxio-tokio-rpc-client/src/rpc_client.rs:25-336 demonstrates a production-ready implementation pattern.
