This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
RPC Dispatcher
Relevant source files
- src/rpc/rpc_dispatcher.rs
- src/rpc/rpc_internals/rpc_header.rs
- src/rpc/rpc_internals/rpc_respondable_session.rs
- src/rpc/rpc_request_response.rs
- tests/rpc_dispatcher_tests.rs
Purpose and Scope
The RpcDispatcher is the central request coordination component in the muxio core library. It manages the lifecycle of RPC requests and responses, handling request correlation, stream multiplexing, and response routing over a binary framed transport.
This document covers the internal architecture, request/response flow, queue management, and usage patterns of the RpcDispatcher. For information about the underlying binary framing protocol, see Binary Framing Protocol. For details on the RpcRequest and RpcResponse data structures, see Request and Response Types.
Sources: src/rpc/rpc_dispatcher.rs:1-458
Overview
The RpcDispatcher operates in a non-async, callback-based model that is compatible with both WASM environments and multithreaded runtimes. It provides:
- Request Correlation: Assigns unique IDs to outbound requests and matches inbound responses
- Stream Multiplexing: Allows multiple concurrent requests over a single connection
- Chunked Streaming: Supports payloads split across multiple frames
- Response Buffering: Optional prebuffering of complete responses before delivery
- Mid-stream Cancellation: Ability to abort in-flight requests
- Thread-safe Queue: Synchronized tracking of inbound response metadata
The dispatcher wraps a RpcRespondableSession and maintains a thread-safe request queue for tracking active requests.
Important: A unique dispatcher instance should be used per client connection.
Sources: src/rpc/rpc_dispatcher.rs:20-51
Core Components
Diagram: RpcDispatcher Internal Structure
| Component | Type | Purpose |
|---|---|---|
| rpc_respondable_session | RpcRespondableSession<'a> | Manages stream lifecycles and response handlers |
| next_rpc_request_id | u32 | Monotonic ID generator for outbound requests |
| rpc_request_queue | Arc<Mutex<VecDeque<(u32, RpcRequest)>>> | Thread-safe queue of active inbound responses |
Sources: src/rpc/rpc_dispatcher.rs:36-51 src/rpc/rpc_internals/rpc_respondable_session.rs:21-28
Request Lifecycle
Outbound Request Flow
Diagram: Outbound Request Encoding and Transmission
The call() method follows these steps:
- ID Assignment: Increments next_rpc_request_id to generate a unique request identifier
- Header Construction: Creates an RpcHeader with RpcMessageType::Call, request ID, method ID, and metadata bytes
- Handler Registration: Installs the optional on_response callback in the session's response handler map
- Stream Initialization: Calls init_respondable_request() to obtain an RpcStreamEncoder
- Payload Transmission: Writes prebuffered payload bytes if provided
- Finalization: Optionally ends the stream if is_finalized is true
- Encoder Return: Returns the encoder to the caller for additional streaming if needed
Sources: src/rpc/rpc_dispatcher.rs:226-286
Inbound Response Flow
Diagram: Inbound Response Processing
The read_bytes() method processes incoming data in four steps:
- Frame Decoding: Passes bytes to RpcRespondableSession.read_bytes() for frame reassembly
- Event Handling: Decoded frames trigger RpcStreamEvent callbacks
- Queue Population: The catch-all handler maintains the request queue
- Active IDs Return: Returns a list of all request IDs currently in the queue
Sources: src/rpc/rpc_dispatcher.rs:362-374 src/rpc/rpc_internals/rpc_respondable_session.rs:93-173
Response Handling
Handler Registration and Dispatch
The dispatcher supports two types of response handlers:
Specific Response Handlers
Per-request handlers registered via the on_response parameter in call(). These are stored in the RpcRespondableSession.response_handlers map, keyed by request ID.
response_handlers: HashMap<u32, Box<dyn FnMut(RpcStreamEvent) + Send + 'a>>
Handler Lifecycle:
- Registered when call() is invoked with a non-None on_response
- Invoked for each RpcStreamEvent (Header, PayloadChunk, End)
- Automatically removed when the stream ends or errors
Sources: src/rpc/rpc_internals/rpc_respondable_session.rs:24
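The handler lifecycle above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: Event is a simplified stand-in for RpcStreamEvent, and HandlerTable, register, dispatch, and pending are hypothetical names, not muxio's API. Only the map's value type mirrors the response_handlers signature quoted earlier.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the stream events named in the text (assumption).
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    Header,
    PayloadChunk(Vec<u8>),
    End,
    Error,
}

/// Hypothetical per-request handler table keyed by request ID.
#[derive(Default)]
pub struct HandlerTable<'a> {
    handlers: HashMap<u32, Box<dyn FnMut(Event) + Send + 'a>>,
}

impl<'a> HandlerTable<'a> {
    /// Registers a handler for one request ID, replacing any previous one.
    pub fn register(&mut self, request_id: u32, handler: impl FnMut(Event) + Send + 'a) {
        self.handlers.insert(request_id, Box::new(handler));
    }

    /// Routes an event to its handler. Returns false when no handler is registered.
    /// The handler is removed once a terminal event (End or Error) has been delivered.
    pub fn dispatch(&mut self, request_id: u32, event: Event) -> bool {
        let is_terminal = matches!(event, Event::End | Event::Error);
        let Some(handler) = self.handlers.get_mut(&request_id) else {
            return false;
        };
        handler(event);
        if is_terminal {
            self.handlers.remove(&request_id);
        }
        true
    }

    /// Number of requests that still have a registered handler.
    pub fn pending(&self) -> usize {
        self.handlers.len()
    }
}
```

Removing the entry on the terminal event keeps the map from growing with completed requests, which matches the automatic removal described in the list.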
Catch-All Response Handler
A global fallback handler that receives all response events, regardless of whether a specific handler is registered. Installed via init_catch_all_response_handler() during dispatcher construction.
Primary Responsibilities:
- Populate the rpc_request_queue with incoming response metadata
- Accumulate payload bytes across multiple chunks
- Mark requests as finalized when the stream ends
Sources: src/rpc/rpc_dispatcher.rs:99-209
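A minimal sketch of the three responsibilities listed above, assuming simplified types: Event stands in for RpcStreamEvent, the RpcRequest struct here carries only the four fields this page documents, and catch_all is a hypothetical free function rather than the closure the dispatcher actually installs.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Simplified stand-in for RpcStreamEvent (assumption).
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    Header { request_id: u32, method_id: u64, metadata: Vec<u8> },
    PayloadChunk { request_id: u32, bytes: Vec<u8> },
    End { request_id: u32 },
}

/// Simplified stand-in for the queued request type (assumption).
#[derive(Debug, Clone, PartialEq, Default)]
pub struct RpcRequest {
    pub rpc_method_id: u64,
    pub rpc_param_bytes: Option<Vec<u8>>,
    pub rpc_prebuffered_payload_bytes: Option<Vec<u8>>,
    pub is_finalized: bool,
}

pub type RequestQueue = Arc<Mutex<VecDeque<(u32, RpcRequest)>>>;

/// Hypothetical catch-all handler: keeps the queue in sync with stream events.
pub fn catch_all(queue: &RequestQueue, event: Event) {
    // Fail fast on a poisoned lock, as the dispatcher is described as doing.
    let mut guard = queue.lock().expect("request queue mutex poisoned");
    match event {
        // Header: create the queue entry from the header metadata.
        Event::Header { request_id, method_id, metadata } => {
            guard.push_back((
                request_id,
                RpcRequest {
                    rpc_method_id: method_id,
                    rpc_param_bytes: Some(metadata),
                    rpc_prebuffered_payload_bytes: None,
                    is_finalized: false,
                },
            ));
        }
        // PayloadChunk: append bytes to the entry's accumulated payload.
        Event::PayloadChunk { request_id, bytes } => {
            if let Some((_, req)) = guard.iter_mut().find(|(id, _)| *id == request_id) {
                req.rpc_prebuffered_payload_bytes
                    .get_or_insert_with(Vec::new)
                    .extend_from_slice(&bytes);
            }
        }
        // End: mark the entry as finalized.
        Event::End { request_id } => {
            if let Some((_, req)) = guard.iter_mut().find(|(id, _)| *id == request_id) {
                req.is_finalized = true;
            }
        }
    }
}
```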
Prebuffering vs. Streaming
The dispatcher supports two response delivery modes, controlled by the prebuffer_response parameter in call():
| Mode | Behavior | Use Case |
|---|---|---|
| Prebuffering (true) | Accumulates all payload chunks into a single buffer, then delivers once via PayloadChunk event when stream ends | Complete request/response RPCs where the full payload is needed before processing |
| Streaming (false) | Delivers each payload chunk immediately as it arrives | Progressive rendering, large file transfers, streaming data |
Prebuffering Implementation:
- Tracks prebuffering flags per request: prebuffering_flags: HashMap<u32, bool>
- Accumulates bytes: prebuffered_responses: HashMap<u32, Vec<u8>>
- On RpcStreamEvent::End, emits the accumulated buffer as a single PayloadChunk, then emits End
Sources: src/rpc/rpc_internals/rpc_respondable_session.rs:112-147
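The two delivery modes can be sketched as a small state machine. The field names prebuffering_flags and prebuffered_responses come from the list above; everything else (the Event enum, the Prebuffer struct, set_mode, on_event) is a hypothetical simplification, and the real session invokes handlers rather than returning events.

```rust
use std::collections::HashMap;

/// Simplified stand-in for RpcStreamEvent (assumption).
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    PayloadChunk { request_id: u32, bytes: Vec<u8> },
    End { request_id: u32 },
}

/// Hypothetical model of per-request prebuffering state.
#[derive(Default)]
pub struct Prebuffer {
    prebuffering_flags: HashMap<u32, bool>,
    prebuffered_responses: HashMap<u32, Vec<u8>>,
}

impl Prebuffer {
    /// Records whether responses for this request should be prebuffered.
    pub fn set_mode(&mut self, request_id: u32, prebuffer: bool) {
        self.prebuffering_flags.insert(request_id, prebuffer);
    }

    /// Returns the events that should be delivered to the handler right now.
    pub fn on_event(&mut self, event: Event) -> Vec<Event> {
        match event {
            Event::PayloadChunk { request_id, bytes } => {
                let prebuffer = self.prebuffering_flags.get(&request_id).copied().unwrap_or(false);
                if prebuffer {
                    // Prebuffering: hold the bytes back until End arrives.
                    self.prebuffered_responses
                        .entry(request_id)
                        .or_default()
                        .extend_from_slice(&bytes);
                    Vec::new()
                } else {
                    // Streaming: pass the chunk through immediately.
                    vec![Event::PayloadChunk { request_id, bytes }]
                }
            }
            Event::End { request_id } => {
                let mut out = Vec::new();
                if self.prebuffering_flags.remove(&request_id).unwrap_or(false) {
                    // Emit everything accumulated so far as one chunk, then End.
                    let bytes = self.prebuffered_responses.remove(&request_id).unwrap_or_default();
                    out.push(Event::PayloadChunk { request_id, bytes });
                }
                out.push(Event::End { request_id });
                out
            }
        }
    }
}
```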
Request Queue Management
Queue Structure
Arc<Mutex<VecDeque<(u32, RpcRequest)>>>
The queue stores tuples of (request_id, RpcRequest) for all active inbound responses. Each entry represents a response that has received at least a Header event but may not yet be finalized.
Key Fields in QueuedRpcRequest:
| Field | Type | Description |
|---|---|---|
| rpc_method_id | u64 | Method identifier from the response header |
| rpc_param_bytes | Option<Vec<u8>> | Metadata bytes (converted from header) |
| rpc_prebuffered_payload_bytes | Option<Vec<u8>> | Accumulated payload chunks |
| is_finalized | bool | Whether End event has been received |
Sources: src/rpc/rpc_dispatcher.rs:50 src/rpc/rpc_request_response.rs:9-33
Queue Operations
Diagram: Request Queue Mutation Operations
Core Methods
get_rpc_request(header_id: u32) -> Option<MutexGuard<VecDeque<(u32, RpcRequest)>>>
Returns a lock on the entire queue if the specified request exists. The caller must search the queue again within the guard.
Rationale: A reference to a queue element cannot be returned directly, because it would outlive the MutexGuard and Rust's borrow checker rejects that.
Sources: src/rpc/rpc_dispatcher.rs:381-394
is_rpc_request_finalized(header_id: u32) -> Option<bool>
Checks if a request has received its End event. Returns None if the request is not in the queue.
Sources: src/rpc/rpc_dispatcher.rs:399-405
delete_rpc_request(header_id: u32) -> Option<RpcRequest>
Removes a request from the queue and transfers ownership to the caller. Typically used after confirming is_finalized is true.
Sources: src/rpc/rpc_dispatcher.rs:411-420
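The three accessors can be approximated as free functions over the same queue type. This is a sketch, not the dispatcher's code: the real methods take self, and the RpcRequest struct here is a simplified stand-in limited to the fields in the table above.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, MutexGuard};

/// Simplified stand-in for the queued request type (assumption).
#[derive(Debug, Clone, PartialEq, Default)]
pub struct RpcRequest {
    pub rpc_method_id: u64,
    pub rpc_param_bytes: Option<Vec<u8>>,
    pub rpc_prebuffered_payload_bytes: Option<Vec<u8>>,
    pub is_finalized: bool,
}

pub type Queue = VecDeque<(u32, RpcRequest)>;
pub type RequestQueue = Arc<Mutex<Queue>>;

/// Returns the guard for the whole queue if the request exists.
/// The caller searches again inside the guard, since an element
/// reference cannot escape the lock.
pub fn get_rpc_request(queue: &RequestQueue, header_id: u32) -> Option<MutexGuard<'_, Queue>> {
    let guard = queue.lock().expect("request queue mutex poisoned");
    if guard.iter().any(|(id, _)| *id == header_id) {
        Some(guard)
    } else {
        None
    }
}

/// Some(flag) if the request is queued, None otherwise.
pub fn is_rpc_request_finalized(queue: &RequestQueue, header_id: u32) -> Option<bool> {
    let guard = queue.lock().expect("request queue mutex poisoned");
    let finalized = guard
        .iter()
        .find(|(id, _)| *id == header_id)
        .map(|(_, req)| req.is_finalized);
    finalized
}

/// Removes the request and hands ownership to the caller.
pub fn delete_rpc_request(queue: &RequestQueue, header_id: u32) -> Option<RpcRequest> {
    let mut guard = queue.lock().expect("request queue mutex poisoned");
    let index = guard.iter().position(|(id, _)| *id == header_id)?;
    guard.remove(index).map(|(_, req)| req)
}
```

A server loop would typically call is_rpc_request_finalized for each active ID and call delete_rpc_request only when it returns Some(true).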
Outbound Response Flow
When acting as a server, the dispatcher sends responses using the respond() method:
Diagram: Server-Side Response Encoding
sequenceDiagram
participant App as "Server Application"
participant Dispatcher as "RpcDispatcher"
participant Session as "RpcRespondableSession"
participant Encoder as "RpcStreamEncoder"
participant Emit as "on_emit Callback"
App->>Dispatcher: respond(rpc_response, max_chunk_size, on_emit)
Dispatcher->>Dispatcher: Build RpcHeader
Note over Dispatcher: RpcMessageType::Response\nrequest_id, method_id\nmetadata = [result_status]
Dispatcher->>Session: start_reply_stream(header, ...)
Session-->>Dispatcher: RpcStreamEncoder
Dispatcher->>Encoder: write_bytes(payload)
Encoder->>Emit: Emit binary frames
alt "is_finalized == true"
Dispatcher->>Encoder: flush()
Dispatcher->>Encoder: end_stream()
end
Dispatcher-->>App: Return encoder
Key Differences from call():
- No request ID generation: uses rpc_response.rpc_request_id from the original request
- RpcMessageType::Response instead of Call
- Metadata contains only the rpc_result_status byte (if present)
- No response handler registration (responses don't receive responses)
Sources: src/rpc/rpc_dispatcher.rs:298-337
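The roles of max_chunk_size and on_emit can be illustrated in isolation. The sketch below only splits a payload and hands each piece to a callback; emit_chunked is a hypothetical helper, and real frames also carry header bytes defined by the binary framing protocol, which this example omits.

```rust
/// Hypothetical helper: splits a payload into pieces of at most
/// `max_chunk_size` bytes and passes each piece to `on_emit` in order.
pub fn emit_chunked(payload: &[u8], max_chunk_size: usize, mut on_emit: impl FnMut(&[u8])) {
    assert!(max_chunk_size > 0, "max_chunk_size must be non-zero");
    for chunk in payload.chunks(max_chunk_size) {
        on_emit(chunk);
    }
}
```

Because the callback is a plain FnMut, the caller decides where the bytes go: a WebSocket sink, a test buffer, or an in-memory pipe.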
Error Handling and Cleanup
Mutex Poisoning
The rpc_request_queue is protected by a Mutex. If a thread panics while holding the lock, the mutex becomes poisoned.
Dispatcher Behavior on Poisoned Lock:
The catch-all handler deliberately panics when it encounters a poisoned mutex.
Rationale:
- A poisoned queue indicates inconsistent internal state
- Continuing could cause incorrect routing, data loss, or undefined behavior
- Fast failure provides better safety and debugging signals
Alternative Approaches:
- Graceful recovery could be implemented with a configurable panic policy
- Error reporting mechanism could replace panics
Sources: src/rpc/rpc_dispatcher.rs:85-118
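The trade-off between failing fast and recovering can be shown with the standard library alone. This sketch uses a queue of bare u32 IDs instead of the dispatcher's real entries, and poison, push_or_panic, and push_recovering are hypothetical helper names. The first accessor mirrors the fail-fast policy described above; the second shows what a graceful-recovery variant might look like using PoisonError::into_inner.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Poisons the mutex by panicking in another thread while the lock is held.
/// The panic message from that thread is printed to stderr.
pub fn poison(queue: &Arc<Mutex<VecDeque<u32>>>) {
    let q = Arc::clone(queue);
    let _ = thread::spawn(move || {
        let _guard = q.lock().unwrap();
        panic!("simulated failure while holding the queue lock");
    })
    .join();
}

/// Fail-fast policy: a poisoned lock is treated as unrecoverable.
pub fn push_or_panic(queue: &Mutex<VecDeque<u32>>, id: u32) {
    queue
        .lock()
        .expect("request queue mutex poisoned")
        .push_back(id);
}

/// Recovering policy (the alternative): take the guard out of the
/// PoisonError and continue, accepting that the state may be inconsistent.
pub fn push_recovering(queue: &Mutex<VecDeque<u32>>, id: u32) {
    let mut guard = queue.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    guard.push_back(id);
}
```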
Connection Failure Cleanup
When a transport connection drops, pending requests must be notified to prevent indefinite hangs. The fail_all_pending_requests() method handles this:
Diagram: Pending Request Cleanup Flow
graph TB
ConnectionDrop["Connection Drop Detected"]
FailAll["fail_all_pending_requests(error)"]
TakeHandlers["Take ownership of\nresponse_handlers map"]
IterateHandlers["For each (request_id, handler)"]
CreateError["Create RpcStreamEvent::Error"]
InvokeHandler["Invoke handler(error_event)"]
ClearMap["response_handlers now empty"]
ConnectionDrop --> FailAll
FailAll --> TakeHandlers
TakeHandlers --> IterateHandlers
IterateHandlers --> CreateError
CreateError --> InvokeHandler
InvokeHandler --> ClearMap
Implementation Details:
- Ownership Transfer: Uses std::mem::take() to move handlers out of the map, leaving it empty
- Synthetic Error Event: Creates RpcStreamEvent::Error with the provided FrameDecodeError
- Handler Invocation: Calls each handler with the error event
- Automatic Cleanup: Handlers are automatically dropped after invocation
Usage Context: Transport implementations call this method in their disconnection handlers (e.g., WebSocket close events).
Sources: src/rpc/rpc_dispatcher.rs:427-456
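The cleanup steps can be sketched with std::mem::take on a handler map. Assumptions: Event is a simplified stand-in for RpcStreamEvent, the error is carried as a String rather than a FrameDecodeError, and Session, register, and pending are hypothetical names used only for this example.

```rust
use std::collections::HashMap;

/// Simplified stand-in for RpcStreamEvent (assumption).
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    End { request_id: u32 },
    Error { request_id: u32, reason: String },
}

type Handler<'a> = Box<dyn FnMut(Event) + Send + 'a>;

/// Hypothetical session holding one handler per pending request.
#[derive(Default)]
pub struct Session<'a> {
    response_handlers: HashMap<u32, Handler<'a>>,
}

impl<'a> Session<'a> {
    pub fn register(&mut self, request_id: u32, handler: impl FnMut(Event) + Send + 'a) {
        self.response_handlers.insert(request_id, Box::new(handler));
    }

    pub fn pending(&self) -> usize {
        self.response_handlers.len()
    }

    /// Notifies every pending handler of the failure and clears the map.
    pub fn fail_all_pending_requests(&mut self, reason: &str) {
        // Move the handlers out first; the map is left empty, so a handler
        // that re-enters the session cannot observe stale entries.
        let handlers = std::mem::take(&mut self.response_handlers);
        for (request_id, mut handler) in handlers {
            handler(Event::Error { request_id, reason: reason.to_string() });
            // `handler` is dropped at the end of each iteration.
        }
    }
}
```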
Thread Safety
The dispatcher achieves thread safety through:
Shared Request Queue
- Arc: Enables shared ownership across threads and callbacks
- Mutex: Ensures exclusive access during mutations
- VecDeque: Efficient push/pop operations for queue semantics
Handler Storage
Response handlers are stored as boxed trait objects of type Box<dyn FnMut(RpcStreamEvent) + Send + 'a>.
The Send bound allows handlers to be invoked from different threads if the dispatcher is shared across threads.
Sources: src/rpc/rpc_dispatcher.rs:50 src/rpc/rpc_internals/rpc_respondable_session.rs:24
Usage Patterns
Client-Side Pattern
1. Create RpcDispatcher
2. For each RPC call:
a. Build RpcRequest with method_id, params, payload
b. Call dispatcher.call() with on_response handler
c. Write bytes to transport via on_emit callback
3. When receiving data from transport:
a. Call dispatcher.read_bytes()
b. Response handlers are invoked automatically
Sources: tests/rpc_dispatcher_tests.rs:32-124
Server-Side Pattern
1. Create RpcDispatcher
2. When receiving data from transport:
a. Call dispatcher.read_bytes()
b. Returns list of active request IDs
3. For each active request ID:
a. Check is_rpc_request_finalized()
b. If finalized, delete_rpc_request() to retrieve full request
c. Process request (decode params, execute method)
d. Build RpcResponse with result
e. Call dispatcher.respond() to send response
4. Write response bytes to transport via on_emit callback
Sources: tests/rpc_dispatcher_tests.rs:126-202
Bidirectional Pattern
The same dispatcher instance can handle both client and server roles simultaneously:
Diagram: Bidirectional Request/Response Flow
graph TB
Dispatcher["RpcDispatcher"]
subgraph "Client Role"
OutboundCall["call()\nInitiate request"]
InboundResponse["read_bytes()\nReceive response"]
end
subgraph "Server Role"
InboundRequest["read_bytes()\nReceive request"]
OutboundResponse["respond()\nSend response"]
end
Dispatcher --> OutboundCall
Dispatcher --> InboundResponse
Dispatcher --> InboundRequest
Dispatcher --> OutboundResponse
OutboundCall -.emits.-> Transport["Transport Layer"]
Transport -.delivers.-> InboundRequest
OutboundResponse -.emits.-> Transport
Transport -.delivers.-> InboundResponse
This pattern enables peer-to-peer architectures where both endpoints can initiate requests and respond to requests.
Sources: src/rpc/rpc_dispatcher.rs:20-51
Implementation Notes
ID Generation
Request IDs are generated using increment_u32_id(), which provides monotonically incrementing IDs with wraparound.
Wraparound Behavior: After reaching u32::MAX, wraps to 0 and continues incrementing.
Collision Risk: With 4.29 billion possible IDs, collisions are extremely unlikely in typical usage. For long-running connections with billions of requests, consider implementing ID reuse detection.
Sources: src/rpc/rpc_dispatcher.rs:242
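The wraparound behavior, and one possible form of the suggested reuse detection, can be sketched as follows. The signature of increment_u32_id() is not shown on this page, so the example does not try to reproduce it; IdAllocator, allocate, and release are hypothetical names, and tracking in-flight IDs in a HashSet is only one way to detect reuse.

```rust
use std::collections::HashSet;

/// Hypothetical allocator: wrapping u32 counter that skips IDs still in flight.
pub struct IdAllocator {
    next: u32,
    in_flight: HashSet<u32>,
}

impl IdAllocator {
    pub fn new(start: u32) -> Self {
        Self { next: start, in_flight: HashSet::new() }
    }

    /// Returns the next free ID. `wrapping_add` moves from u32::MAX back to 0.
    /// Note: this loops forever if all 2^32 IDs are in flight at once, which
    /// this sketch treats as unreachable in practice.
    pub fn allocate(&mut self) -> u32 {
        loop {
            let id = self.next;
            self.next = self.next.wrapping_add(1);
            // `insert` returns false when the ID is already in flight.
            if self.in_flight.insert(id) {
                return id;
            }
        }
    }

    /// Marks an ID as reusable; returns false if it was not in flight.
    pub fn release(&mut self, id: u32) -> bool {
        self.in_flight.remove(&id)
    }
}
```

A caller would release an ID when its stream ends or errors, so the in-flight set stays proportional to the number of concurrent requests.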
Non-Async Design
The dispatcher uses callbacks instead of async/await for several reasons:
- WASM Compatibility: Avoids dependency on async runtimes that may not work in WASM
- Runtime Agnostic: Works with Tokio, async-std, or no runtime at all
- Deterministic: No hidden scheduling or context switching
- Zero-Cost: No Future state machines or executor overhead
Higher-level abstractions (like those in muxio-rpc-service-caller) can wrap the dispatcher with async interfaces when desired.
Sources: src/rpc/rpc_dispatcher.rs:26-27
Summary
The RpcDispatcher provides the core request/response coordination layer for muxio's RPC framework:
| Responsibility | Mechanism |
|---|---|
| Request Correlation | Unique request IDs with monotonic generation |
| Response Routing | Per-request handlers + catch-all fallback |
| Stream Management | Wraps RpcRespondableSession for encoder lifecycle |
| Payload Accumulation | Optional prebuffering or streaming delivery |
| Queue Management | Thread-safe VecDeque for tracking active requests |
| Error Propagation | Synthetic error events on connection failure |
| Thread Safety | Arc<Mutex<>> for shared state |
The dispatcher's non-async, callback-based design enables deployment across native and WASM environments while maintaining type safety and performance.
Sources: src/rpc/rpc_dispatcher.rs:1-458