This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
RPC Dispatcher
Relevant source files
- src/rpc/rpc_dispatcher.rs
- src/rpc/rpc_internals/rpc_header.rs
- src/rpc/rpc_internals/rpc_respondable_session.rs
- src/rpc/rpc_request_response.rs
- tests/rpc_dispatcher_tests.rs
Purpose and Scope
The RpcDispatcher is the central request coordination component in muxio’s Core Transport Layer. It sits above RpcSession and below the RPC Service Layer, managing the complete lifecycle of RPC requests and responses. The dispatcher handles request correlation via unique IDs, multiplexed stream management, and response routing over the binary framed transport.
This document covers the internal architecture, request/response flow, queue management, and usage patterns. For the underlying stream multiplexing, see Stream Multiplexing. For the RpcRequest and RpcResponse data structures, see Request and Response Types.
Sources: src/rpc/rpc_dispatcher.rs:1-458
Architectural Context
Diagram: RpcDispatcher in Layered Architecture
The RpcDispatcher operates in a non-async, callback-based model compatible with WASM and multithreaded runtimes. Key responsibilities:
| Responsibility | Implementation |
|---|---|
| Request Correlation | next_rpc_request_id: u32 with monotonic increment |
| Response Routing | Per-request handlers in response_handlers: HashMap<u32, Handler> |
| Stream Management | Wraps RpcRespondableSession for lifecycle control |
| Payload Accumulation | rpc_request_queue: Arc<Mutex<VecDeque<(u32, RpcRequest)>>> |
| Error Propagation | fail_all_pending_requests() on connection drop |
Important: Each RpcDispatcher instance is bound to a single connection. Do not share across connections.
Sources: src/rpc/rpc_dispatcher.rs:20-51 src/rpc/rpc_dispatcher.rs:36-51
Core Components and Data Structures
Diagram: RpcDispatcher Internal Structure
Primary Fields
| Field | Type | Line | Purpose |
|---|---|---|---|
| rpc_respondable_session | RpcRespondableSession<'a> | 37 | Delegates to RpcSession for frame encoding/decoding |
| next_rpc_request_id | u32 | 42 | Monotonic counter for outbound request ID generation |
| rpc_request_queue | Arc<Mutex<VecDeque<(u32, RpcRequest)>>> | 50 | Thread-safe queue tracking all active inbound requests |
RpcRespondableSession Internal State
| Field | Type | Purpose |
|---|---|---|
| rpc_session | RpcSession | Manages stream IDs and frame encoding/decoding |
| response_handlers | HashMap<u32, Box<dyn FnMut(RpcStreamEvent) + Send>> | Per-request response callbacks indexed by request_id |
| catch_all_response_handler | Option<Box<dyn FnMut(RpcStreamEvent) + Send>> | Global fallback handler for unmatched events |
| prebuffered_responses | HashMap<u32, Vec<u8>> | Accumulates payload bytes when prebuffering is enabled |
| prebuffering_flags | HashMap<u32, bool> | Tracks which requests should prebuffer responses |
Sources: src/rpc/rpc_dispatcher.rs:36-51 src/rpc/rpc_internals/rpc_respondable_session.rs:21-28
Request Lifecycle
Outbound Request Flow
Diagram: Outbound Request Encoding via RpcDispatcher::call()
The call() method at src/rpc/rpc_dispatcher.rs:227-286 executes the following steps:
- ID Assignment ([line 241](https://github.com/jzombie/rust-muxio/blob/30450c98/line 241)): Captures self.next_rpc_request_id as the unique rpc_request_id
- ID Increment ([line 242](https://github.com/jzombie/rust-muxio/blob/30450c98/line 242)): Advances next_rpc_request_id using increment_u32_id()
- Header Construction ([lines 252-257](https://github.com/jzombie/rust-muxio/blob/30450c98/lines 252-257)): Creates RpcHeader struct with:
  - rpc_msg_type: RpcMessageType::Call
  - rpc_request_id (from step 1)
  - rpc_method_id (from RpcRequest.rpc_method_id)
  - rpc_metadata_bytes (converted from RpcRequest.rpc_param_bytes)
- Handler Registration ([line 260-266](https://github.com/jzombie/rust-muxio/blob/30450c98/line 260-266)): Calls init_respondable_request() which:
  - Stores on_response handler in response_handlers HashMap
  - Sets prebuffering_flags[request_id] to control response buffering
- Payload Transmission ([lines 270-276](https://github.com/jzombie/rust-muxio/blob/30450c98/lines 270-276)): If rpc_prebuffered_payload_bytes exists, writes to encoder
- Stream Finalization ([lines 279-283](https://github.com/jzombie/rust-muxio/blob/30450c98/lines 279-283)): If is_finalized, calls flush() and end_stream()
- Encoder Return ([line 285](https://github.com/jzombie/rust-muxio/blob/30450c98/line 285)): Returns RpcStreamEncoder<E> for additional streaming
Sources: src/rpc/rpc_dispatcher.rs:227-286 src/rpc/rpc_internals/rpc_respondable_session.rs:42-68
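The ID assignment and handler registration steps above can be sketched with stand-in types. This is a minimal model, not muxio's code: ToyDispatcher, ToyEvent, and the simplified call() signature are hypothetical, header construction and frame encoding are omitted, and wrapping_add stands in for increment_u32_id(), whose exact behavior is assumed here.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for RpcStreamEvent.
#[derive(Debug, Clone, PartialEq)]
pub enum ToyEvent {
    PayloadChunk(Vec<u8>),
    End,
}

/// Boxed response callback, shaped like the handlers described above.
pub type Handler = Box<dyn FnMut(ToyEvent) + Send>;

/// Hypothetical, simplified model of the outbound half of a dispatcher.
pub struct ToyDispatcher {
    next_rpc_request_id: u32,
    response_handlers: HashMap<u32, Handler>,
    prebuffering_flags: HashMap<u32, bool>,
}

impl ToyDispatcher {
    pub fn new() -> Self {
        Self {
            next_rpc_request_id: 1, // assumed starting value
            response_handlers: HashMap::new(),
            prebuffering_flags: HashMap::new(),
        }
    }

    /// Assigns an ID, advances the counter, and registers the handler.
    /// Returns the ID that the request header would carry.
    pub fn call(&mut self, on_response: Option<Handler>, prebuffer_response: bool) -> u32 {
        // ID assignment: capture the current counter value.
        let rpc_request_id = self.next_rpc_request_id;
        // ID increment: assumed to wrap at u32::MAX.
        self.next_rpc_request_id = self.next_rpc_request_id.wrapping_add(1);
        // Handler registration: only when a callback was supplied.
        if let Some(handler) = on_response {
            self.response_handlers.insert(rpc_request_id, handler);
        }
        self.prebuffering_flags.insert(rpc_request_id, prebuffer_response);
        rpc_request_id
    }

    /// Number of requests that still have a registered handler.
    pub fn pending(&self) -> usize {
        self.response_handlers.len()
    }
}
```

Calling call() twice on a fresh ToyDispatcher yields two consecutive IDs, and pending() counts only the calls that supplied a handler.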
Inbound Response Flow
Diagram: Inbound Response Processing via RpcDispatcher::read_bytes()
The read_bytes() method at src/rpc/rpc_dispatcher.rs:362-374 processes incoming transport data:
- Frame Decoding ([line 364](https://github.com/jzombie/rust-muxio/blob/30450c98/line 364)): Delegates to self.rpc_respondable_session.read_bytes(bytes)
- Event Dispatch (src/rpc/rpc_internals/rpc_respondable_session.rs:93-173): RpcSession decodes frames into RpcStreamEvent enum
- Handler Invocation: For each event:
  - Specific Handler ([line 152](https://github.com/jzombie/rust-muxio/blob/30450c98/line 152)): Calls response_handlers[rpc_request_id] if registered
  - Catch-All Handler ([lines 102-208](https://github.com/jzombie/rust-muxio/blob/30450c98/lines 102-208)): Always invoked to populate rpc_request_queue
- Queue Mutations (via catch-all handler):
  - Header event: Creates new RpcRequest and pushes to queue ([line 140](https://github.com/jzombie/rust-muxio/blob/30450c98/line 140))
  - PayloadChunk event: Extends rpc_prebuffered_payload_bytes ([lines 154-157](https://github.com/jzombie/rust-muxio/blob/30450c98/lines 154-157))
  - End event: Sets is_finalized = true ([line 177](https://github.com/jzombie/rust-muxio/blob/30450c98/line 177))
- Active IDs Return ([lines 367-371](https://github.com/jzombie/rust-muxio/blob/30450c98/lines 367-371)): Locks queue and returns Vec<u32> of all request_ids
Sources: src/rpc/rpc_dispatcher.rs:362-374 src/rpc/rpc_dispatcher.rs:99-209 src/rpc/rpc_internals/rpc_respondable_session.rs:93-173
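The queue mutations listed above can be modelled as follows. This is a sketch under assumptions: ToyEvent and ToyRequest are hypothetical stand-ins for RpcStreamEvent and RpcRequest, the request ID is carried directly on each event, and frame decoding is skipped entirely.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for RpcStreamEvent, with the request ID inline.
pub enum ToyEvent {
    Header { id: u32, method_id: u64 },
    PayloadChunk { id: u32, bytes: Vec<u8> },
    End { id: u32 },
}

/// Hypothetical stand-in for RpcRequest.
#[derive(Debug, Default)]
pub struct ToyRequest {
    pub rpc_method_id: u64,
    pub rpc_prebuffered_payload_bytes: Option<Vec<u8>>,
    pub is_finalized: bool,
}

pub type Queue = Arc<Mutex<VecDeque<(u32, ToyRequest)>>>;

/// Applies one event to the queue, following the three mutations described.
pub fn apply_event(queue: &Queue, event: ToyEvent) {
    let mut q = queue.lock().expect("queue mutex poisoned");
    match event {
        // Header: create a new request and push it.
        ToyEvent::Header { id, method_id } => {
            let request = ToyRequest { rpc_method_id: method_id, ..Default::default() };
            q.push_back((id, request));
        }
        // PayloadChunk: extend the accumulated payload of the matching request.
        ToyEvent::PayloadChunk { id, bytes } => {
            if let Some((_, req)) = q.iter_mut().find(|(rid, _)| *rid == id) {
                req.rpc_prebuffered_payload_bytes
                    .get_or_insert_with(Vec::new)
                    .extend_from_slice(&bytes);
            }
        }
        // End: mark the matching request as finalized.
        ToyEvent::End { id } => {
            if let Some((_, req)) = q.iter_mut().find(|(rid, _)| *rid == id) {
                req.is_finalized = true;
            }
        }
    }
}

/// Returns the IDs of all queued requests, like the value read_bytes() returns.
pub fn active_ids(queue: &Queue) -> Vec<u32> {
    let q = queue.lock().expect("queue mutex poisoned");
    let ids = q.iter().map(|(id, _)| *id).collect();
    ids
}
```

Feeding a Header, two PayloadChunk events, and an End for the same ID leaves one finalized entry whose payload is the concatenation of both chunks.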
Response Handling Mechanisms
Dual Handler System
Diagram: Dual Handler Dispatch System
The dispatcher uses two parallel handler mechanisms at src/rpc/rpc_internals/rpc_respondable_session.rs:93-173:
Specific Response Handlers
Storage: response_handlers: HashMap<u32, Box<dyn FnMut(RpcStreamEvent) + Send + 'a>> ([line 24](https://github.com/jzombie/rust-muxio/blob/30450c98/line 24))
| Aspect | Implementation |
|---|---|
| Registration | In init_respondable_request() at [line 61-62](https://github.com/jzombie/rust-muxio/blob/30450c98/line 61-62) when on_response is Some |
| Invocation | At [line 152-154](https://github.com/jzombie/rust-muxio/blob/30450c98/line 152-154) for each RpcStreamEvent matching rpc_request_id |
| Removal | At [line 161-162](https://github.com/jzombie/rust-muxio/blob/30450c98/line 161-162) when End or Error event received |
| Use Case | Application-level response processing with custom callbacks |
Catch-All Response Handler
Storage: catch_all_response_handler: Option<Box<dyn FnMut(RpcStreamEvent) + Send + 'a>> ([line 25](https://github.com/jzombie/rust-muxio/blob/30450c98/line 25))
| Aspect | Implementation |
|---|---|
| Registration | In set_catch_all_response_handler() at [line 86-91](https://github.com/jzombie/rust-muxio/blob/30450c98/line 86-91) during RpcDispatcher::new() |
| Invocation | At [line 165-167](https://github.com/jzombie/rust-muxio/blob/30450c98/line 165-167) for all events not handled by specific handlers |
| Primary Role | Populates rpc_request_queue for queue-based processing pattern |
Catch-All Handler Responsibilities ([lines 102-208](https://github.com/jzombie/rust-muxio/blob/30450c98/lines 102-208)):
- Header Event ([lines 122-142](https://github.com/jzombie/rust-muxio/blob/30450c98/lines 122-142)): Creates new RpcRequest, pushes to rpc_request_queue
- PayloadChunk Event ([lines 144-169](https://github.com/jzombie/rust-muxio/blob/30450c98/lines 144-169)): Extends rpc_prebuffered_payload_bytes in existing request
- End Event ([lines 171-185](https://github.com/jzombie/rust-muxio/blob/30450c98/lines 171-185)): Sets is_finalized = true
- Error Event ([lines 187-206](https://github.com/jzombie/rust-muxio/blob/30450c98/lines 187-206)): Logs error (currently no queue removal)
Sources: src/rpc/rpc_internals/rpc_respondable_session.rs:24-25 src/rpc/rpc_dispatcher.rs:99-209
Prebuffering vs. Streaming Modes
The prebuffer_response: bool parameter in call() ([line 233](https://github.com/jzombie/rust-muxio/blob/30450c98/line 233)) controls response delivery mode:
| Mode | prebuffer_response | Implementation | Use Case |
|---|---|---|---|
| Prebuffering | true | Accumulates all PayloadChunk events into single buffer, delivers as one chunk when stream ends | Complete request/response RPCs where full payload needed before processing |
| Streaming | false | Delivers each PayloadChunk event immediately as received | Progressive rendering, large file transfers, real-time data |
Prebuffering Implementation (src/rpc/rpc_internals/rpc_respondable_session.rs:112-147):
Diagram: Prebuffering Control Flow
Key Data Structures:
- prebuffering_flags: HashMap<u32, bool> ([line 27](https://github.com/jzombie/rust-muxio/blob/30450c98/line 27)): Tracks mode per request, set at [line 57-58](https://github.com/jzombie/rust-muxio/blob/30450c98/line 57-58)
- prebuffered_responses: HashMap<u32, Vec<u8>> ([line 26](https://github.com/jzombie/rust-muxio/blob/30450c98/line 26)): Accumulates bytes for prebuffered requests
Prebuffering Sequence ([lines 112-147](https://github.com/jzombie/rust-muxio/blob/30450c98/lines 112-147)):
- Set Flag ([line 57-58](https://github.com/jzombie/rust-muxio/blob/30450c98/line 57-58)): prebuffering_flags.insert(rpc_request_id, true) in init_respondable_request()
- Accumulate Chunks ([line 127](https://github.com/jzombie/rust-muxio/blob/30450c98/line 127)): buffer.extend_from_slice(bytes) for each PayloadChunk event
- Flush on End ([lines 135-142](https://github.com/jzombie/rust-muxio/blob/30450c98/lines 135-142)): Emit synthetic PayloadChunk with full buffer, then emit End
- Cleanup ([line 145](https://github.com/jzombie/rust-muxio/blob/30450c98/line 145)): prebuffered_responses.remove(&rpc_id) after delivery
Sources: src/rpc/rpc_internals/rpc_respondable_session.rs:112-147 src/rpc/rpc_internals/rpc_respondable_session.rs:26-27
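The difference between the two modes can be illustrated with a small model of the prebuffering sequence. ToyPrebuffer and ToyEvent are hypothetical names, and returning the events to deliver (rather than invoking a handler) is a simplification chosen to keep the sketch short.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the payload-related RpcStreamEvent variants.
#[derive(Debug, Clone, PartialEq)]
pub enum ToyEvent {
    PayloadChunk { id: u32, bytes: Vec<u8> },
    End { id: u32 },
}

/// Hypothetical model of the per-request prebuffering state.
#[derive(Default)]
pub struct ToyPrebuffer {
    prebuffering_flags: HashMap<u32, bool>,
    prebuffered_responses: HashMap<u32, Vec<u8>>,
}

impl ToyPrebuffer {
    /// Set Flag: record the delivery mode for a request.
    pub fn register(&mut self, id: u32, prebuffer_response: bool) {
        self.prebuffering_flags.insert(id, prebuffer_response);
    }

    /// Returns the events that should reach the handler for one incoming event.
    pub fn on_event(&mut self, event: ToyEvent) -> Vec<ToyEvent> {
        let id = match &event {
            ToyEvent::PayloadChunk { id, .. } | ToyEvent::End { id } => *id,
        };
        let prebuffer = self.prebuffering_flags.get(&id).copied().unwrap_or(false);
        if !prebuffer {
            // Streaming mode: pass every event through immediately.
            return vec![event];
        }
        match event {
            // Accumulate Chunks: buffer the bytes and deliver nothing yet.
            ToyEvent::PayloadChunk { bytes, .. } => {
                self.prebuffered_responses
                    .entry(id)
                    .or_default()
                    .extend_from_slice(&bytes);
                Vec::new()
            }
            // Flush on End + Cleanup: remove the buffer, emit it as a single
            // synthetic chunk, then emit End.
            ToyEvent::End { .. } => {
                let bytes = self.prebuffered_responses.remove(&id).unwrap_or_default();
                vec![ToyEvent::PayloadChunk { id, bytes }, ToyEvent::End { id }]
            }
        }
    }
}
```

With prebuffering enabled, chunk events produce no output until End arrives; with it disabled, each chunk is returned as soon as it is seen.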
Request Queue Management
Queue Structure and Threading
Type: Arc<Mutex<VecDeque<(u32, RpcRequest)>>> at src/rpc/rpc_dispatcher.rs:50
Diagram: Request Queue Threading Model
The queue stores (request_id, RpcRequest) tuples for all active inbound requests. Each entry represents a request that has received at least a Header event.
RpcRequest Structure
Source: src/rpc/rpc_request_response.rs:10-33
| Field | Type | Mutability | Description |
|---|---|---|---|
| rpc_method_id | u64 | Set on Header | Method identifier from header |
| rpc_param_bytes | Option<Vec<u8>> | Set on Header | Metadata from RpcHeader.rpc_metadata_bytes |
| rpc_prebuffered_payload_bytes | Option<Vec<u8>> | Grows on Chunks | Accumulated via extend_from_slice() at [line 157](https://github.com/jzombie/rust-muxio/blob/30450c98/line 157) |
| is_finalized | bool | Set on End | true when End event received at [line 177](https://github.com/jzombie/rust-muxio/blob/30450c98/line 177) |
Sources: src/rpc/rpc_dispatcher.rs:50 src/rpc/rpc_request_response.rs:10-33
Queue Operation Methods
Diagram: Queue Mutation and Access Operations
Public API Methods
get_rpc_request(header_id: u32)
Signature: -> Option<MutexGuard<'_, VecDeque<(u32, RpcRequest)>>>
Location: src/rpc/rpc_dispatcher.rs:381-394
Behavior:
- Acquires lock: self.rpc_request_queue.lock() ([line 385](https://github.com/jzombie/rust-muxio/blob/30450c98/line 385))
- Searches for header_id: queue.iter().any(|(id, _)| *id == header_id) ([line 389](https://github.com/jzombie/rust-muxio/blob/30450c98/line 389))
- Returns entire MutexGuard if found, None otherwise
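Rationale: A reference to a single queue element cannot be returned, because it would have to outlive the MutexGuard that protects it. The method returns the guard instead, and the caller must search the queue again while holding it.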
Example Usage:
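A minimal sketch of the pattern, using stand-in types rather than muxio's own: ToyDispatcher, ToyRequest, and method_id_of are hypothetical, and treating a poisoned lock as None is an assumption made for this sketch.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, MutexGuard};

/// Hypothetical stand-in for RpcRequest.
#[derive(Debug, Default)]
pub struct ToyRequest {
    pub rpc_method_id: u64,
    pub is_finalized: bool,
}

/// Hypothetical dispatcher that owns only the request queue.
pub struct ToyDispatcher {
    pub rpc_request_queue: Arc<Mutex<VecDeque<(u32, ToyRequest)>>>,
}

impl ToyDispatcher {
    /// Returns the whole guard if header_id is queued, mirroring the
    /// signature described above. A poisoned lock yields None here.
    pub fn get_rpc_request(
        &self,
        header_id: u32,
    ) -> Option<MutexGuard<'_, VecDeque<(u32, ToyRequest)>>> {
        let queue = self.rpc_request_queue.lock().ok()?;
        if queue.iter().any(|(id, _)| *id == header_id) {
            Some(queue)
        } else {
            None
        }
    }
}

/// Caller side: search again while the guard is held, then copy out
/// whatever is needed before the guard is dropped.
pub fn method_id_of(dispatcher: &ToyDispatcher, header_id: u32) -> Option<u64> {
    let guard = dispatcher.get_rpc_request(header_id)?;
    let method_id = guard
        .iter()
        .find(|(id, _)| *id == header_id)
        .map(|(_, req)| req.rpc_method_id);
    method_id
}
```

The guard keeps the queue locked for as long as it is alive, so callers should copy out what they need and let it drop promptly.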
is_rpc_request_finalized(header_id: u32)
Signature: -> Option<bool>
Location: src/rpc/rpc_dispatcher.rs:399-405
Returns:
- Some(true): Request exists and is_finalized == true
- Some(false): Request exists but not finalized
- None: Request not found in queue
Implementation: [line 401-404](https://github.com/jzombie/rust-muxio/blob/30450c98/line 401-404) searches queue and returns req.is_finalized
delete_rpc_request(header_id: u32)
Signature: -> Option<RpcRequest>
Location: src/rpc/rpc_dispatcher.rs:411-420
Behavior:
- Locks queue: self.rpc_request_queue.lock() ([line 412](https://github.com/jzombie/rust-muxio/blob/30450c98/line 412))
- Finds index: queue.iter().position(|(id, _)| *id == header_id) ([line 414](https://github.com/jzombie/rust-muxio/blob/30450c98/line 414))
- Removes entry: queue.remove(index)? ([line 416](https://github.com/jzombie/rust-muxio/blob/30450c98/line 416))
- Returns owned RpcRequest, discarding request ID
Typical Usage: Call after is_rpc_request_finalized() returns true to consume the completed request.
Sources: src/rpc/rpc_dispatcher.rs:381-420
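The check-then-consume usage described for is_rpc_request_finalized() and delete_rpc_request() might look like the following on a toy queue. ToyQueue, ToyRequest, and take_if_finalized are hypothetical; the two method bodies follow the steps listed above but are not copied from the source.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Hypothetical stand-in for RpcRequest.
#[derive(Debug, PartialEq)]
pub struct ToyRequest {
    pub payload: Vec<u8>,
    pub is_finalized: bool,
}

/// Hypothetical wrapper around the (request_id, request) queue.
pub struct ToyQueue(pub Mutex<VecDeque<(u32, ToyRequest)>>);

impl ToyQueue {
    /// Some(flag) if the request is queued, None if absent or the lock failed.
    pub fn is_rpc_request_finalized(&self, header_id: u32) -> Option<bool> {
        let queue = self.0.lock().ok()?;
        let found = queue
            .iter()
            .find(|(id, _)| *id == header_id)
            .map(|(_, req)| req.is_finalized);
        found
    }

    /// Removes the entry and returns the owned request, dropping the ID.
    pub fn delete_rpc_request(&self, header_id: u32) -> Option<ToyRequest> {
        let mut queue = self.0.lock().ok()?;
        let index = queue.iter().position(|(id, _)| *id == header_id)?;
        let (_id, request) = queue.remove(index)?;
        Some(request)
    }

    /// Consumes a request only once its End event has been seen.
    pub fn take_if_finalized(&self, header_id: u32) -> Option<ToyRequest> {
        if self.is_rpc_request_finalized(header_id)? {
            self.delete_rpc_request(header_id)
        } else {
            None
        }
    }
}
```

Because the two calls lock the queue separately, another thread could in principle modify it between them; take_if_finalized tolerates that by returning whatever delete_rpc_request() finds.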
Server-Side Response Sending
respond() Method Flow
Diagram: RpcDispatcher::respond() Execution Flow
Method Signature: src/rpc/rpc_dispatcher.rs:298-337
Key Differences from call()
| Aspect | call() (Client) | respond() (Server) |
|---|---|---|
| Request ID | Generates new via next_rpc_request_id ([line 241](https://github.com/jzombie/rust-muxio/blob/30450c98/line 241)) | Uses rpc_response.rpc_request_id from original request ([line 308](https://github.com/jzombie/rust-muxio/blob/30450c98/line 308)) |
| Message Type | RpcMessageType::Call ([line 253](https://github.com/jzombie/rust-muxio/blob/30450c98/line 253)) | RpcMessageType::Response ([line 309](https://github.com/jzombie/rust-muxio/blob/30450c98/line 309)) |
| Metadata | rpc_request.rpc_param_bytes converted to rpc_metadata_bytes ([line 250](https://github.com/jzombie/rust-muxio/blob/30450c98/line 250)) | Only rpc_result_status byte if present ([lines 313-317](https://github.com/jzombie/rust-muxio/blob/30450c98/lines 313-317)) |
| Handler Registration | Optionally registers on_response handler ([line 264](https://github.com/jzombie/rust-muxio/blob/30450c98/line 264)) | No handler registration (responses don’t receive responses) |
| Prebuffering | Supports prebuffer_response parameter | Not applicable (prebuffering is for receiving, not sending) |
Metadata Encoding
The metadata field in response headers carries only the result status ([lines 313-317](https://github.com/jzombie/rust-muxio/blob/30450c98/lines 313-317)): the rpc_result_status byte, when one is present.
Convention: While muxio core doesn’t enforce semantics, 0 typically indicates success. See RPC Service Errors for error code conventions.
Sources: src/rpc/rpc_dispatcher.rs:298-337 src/rpc/rpc_internals/rpc_respondable_session.rs:70-82
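One way to picture the metadata encoding described above is a pair of small helpers. Both function names are hypothetical, and two details are assumptions: that the status is a single u8, and that an absent status is encoded as empty metadata.

```rust
/// Hypothetical helper: encodes an optional result status as header metadata.
/// An absent status becomes an empty byte vector (assumed, not confirmed).
pub fn status_to_metadata(rpc_result_status: Option<u8>) -> Vec<u8> {
    match rpc_result_status {
        Some(status) => vec![status],
        None => Vec::new(),
    }
}

/// Hypothetical inverse: reads the status back from the first metadata byte.
pub fn metadata_to_status(rpc_metadata_bytes: &[u8]) -> Option<u8> {
    rpc_metadata_bytes.first().copied()
}
```

Under the stated convention, status_to_metadata(Some(0)) would mark a successful response.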
Error Handling and Cleanup
Mutex Poisoning Strategy
The rpc_request_queue uses a Mutex for synchronization. If a thread panics while holding the lock, the mutex becomes poisoned.
Poisoning Detection at src/rpc/rpc_dispatcher.rs:104-118: the catch-all handler acquires the queue lock and panics if the mutex is poisoned.
Design Rationale:
| Aspect | Justification |
|---|---|
| Panic on Poison | Poisoned mutex indicates another thread panicked during queue mutation |
| No Recovery | Inconsistent state could cause incorrect routing, data loss, silent failures |
| Fast Failure | Explicit crash provides clear debugging signal vs. undefined behavior |
| Production Safety | Better to fail loudly than corrupt application state |
Alternative Implementations (future consideration):
- Configurable panic policy via builder pattern
- Error reporting mechanism instead of panic
- Queue reconstruction from handler state
Other Lock Sites:
- read_bytes() at [line 367-370](https://github.com/jzombie/rust-muxio/blob/30450c98/line 367-370): Maps poison to FrameDecodeError::CorruptFrame
- is_rpc_request_finalized() at [line 400](https://github.com/jzombie/rust-muxio/blob/30450c98/line 400): Returns None on lock failure
- delete_rpc_request() at [line 412](https://github.com/jzombie/rust-muxio/blob/30450c98/line 412): Returns None on lock failure
Sources: src/rpc/rpc_dispatcher.rs:85-118 src/rpc/rpc_dispatcher.rs:362-374
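The two poison-handling styles described in this section (panic at the mutation site, map to an error at a read site) can be contrasted with the standard library alone. ToyError, push_or_panic, active_ids, and poison are hypothetical names; ToyError::CorruptFrame only stands in for FrameDecodeError::CorruptFrame.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

pub type Queue = Arc<Mutex<VecDeque<u32>>>;

/// Hypothetical error type standing in for FrameDecodeError::CorruptFrame.
#[derive(Debug, PartialEq)]
pub enum ToyError {
    CorruptFrame,
}

/// Panics on a poisoned lock, as the mutation site is described.
pub fn push_or_panic(queue: &Queue, id: u32) {
    queue
        .lock()
        .expect("request queue mutex poisoned")
        .push_back(id);
}

/// Maps a poisoned lock to an error, as the read_bytes() lock site is described.
pub fn active_ids(queue: &Queue) -> Result<Vec<u32>, ToyError> {
    let guard = queue.lock().map_err(|_| ToyError::CorruptFrame)?;
    let ids = guard.iter().copied().collect();
    Ok(ids)
}

/// Poisons the mutex by panicking in another thread while the lock is held.
pub fn poison(queue: &Queue) {
    let q = Arc::clone(queue);
    let _ = thread::spawn(move || {
        let _guard = q.lock().unwrap();
        panic!("simulated panic while holding the lock");
    })
    .join();
}
```

After poison() runs, active_ids() returns Err(ToyError::CorruptFrame), while push_or_panic() would abort the calling thread with a panic.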
Connection Failure Cleanup
When transport connection drops, fail_all_pending_requests() at src/rpc/rpc_dispatcher.rs:427-456 prevents indefinite hangs:
Diagram: fail_all_pending_requests() Execution Flow
Implementation Steps:
- Ownership Transfer ([line 436](https://github.com/jzombie/rust-muxio/blob/30450c98/line 436)): std::mem::take(&mut self.rpc_respondable_session.response_handlers)
  - Moves all handlers out of the HashMap
  - Leaves response_handlers empty (prevents further invocations)
- Synthetic Error Creation ([lines 444-450](https://github.com/jzombie/rust-muxio/blob/30450c98/lines 444-450)): Builds the error_event that is passed to each pending handler
- Handler Invocation ([line 452](https://github.com/jzombie/rust-muxio/blob/30450c98/line 452)): Calls handler(error_event) for each pending request
  - Wakes async Futures waiting for responses
  - Triggers error handling in callback-based code
- Automatic Cleanup: Handlers dropped after loop completes
Usage Context: Transport implementations (e.g., muxio-tokio-rpc-client, muxio-wasm-rpc-client) call this in WebSocket close handlers.
Sources: src/rpc/rpc_dispatcher.rs:427-456
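The take-then-notify sequence above can be sketched as follows. ToySession and ToyEvent are hypothetical, and the fields of the error event (a request ID and a reason string) are assumptions; the real synthetic event is not shown in this document.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the error variant of RpcStreamEvent.
#[derive(Debug, Clone, PartialEq)]
pub enum ToyEvent {
    Error { rpc_request_id: u32, reason: String },
}

pub type Handler = Box<dyn FnMut(ToyEvent) + Send>;

/// Hypothetical session that holds only the per-request handlers.
#[derive(Default)]
pub struct ToySession {
    pub response_handlers: HashMap<u32, Handler>,
}

impl ToySession {
    /// Notifies every pending handler that its request can no longer complete.
    pub fn fail_all_pending_requests(&mut self, reason: &str) {
        // Ownership transfer: move every handler out, leaving the map empty
        // so that no handler can be invoked again later.
        let pending = std::mem::take(&mut self.response_handlers);
        for (rpc_request_id, mut handler) in pending {
            // Synthetic error creation, one event per pending request.
            let error_event = ToyEvent::Error {
                rpc_request_id,
                reason: reason.to_string(),
            };
            // Handler invocation; the handler is dropped when this
            // iteration ends.
            handler(error_event);
        }
    }
}
```

A transport's close handler would call fail_all_pending_requests() once; calling it again is harmless because the map is already empty.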
Thread Safety
The dispatcher achieves thread safety through:
Shared Request Queue
- Arc: Enables shared ownership across threads and callbacks
- Mutex: Ensures exclusive access during mutations
- VecDeque: Efficient push/pop operations for queue semantics
Handler Storage
Response handlers are stored as boxed trait objects of type Box<dyn FnMut(RpcStreamEvent) + Send + 'a>.
The Send bound allows handlers to be moved to, and invoked from, a different thread if the dispatcher is shared across threads.
Sources: src/rpc/rpc_dispatcher.rs:50 src/rpc/rpc_internals/rpc_respondable_session.rs:24
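To make the role of the Send bound concrete, the sketch below moves a boxed handler into another thread. ToyEvent, run_on_thread, and collecting_handler are hypothetical, and the alias uses an implicit 'static lifetime instead of the 'a shown earlier so that it can cross a thread::spawn boundary.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for RpcStreamEvent.
#[derive(Debug, Clone, PartialEq)]
pub enum ToyEvent {
    PayloadChunk(Vec<u8>),
    End,
}

/// Boxed handler; without `+ Send`, run_on_thread would not compile.
pub type Handler = Box<dyn FnMut(ToyEvent) + Send>;

/// Moves the handler to a new thread and feeds it the given events.
pub fn run_on_thread(mut handler: Handler, events: Vec<ToyEvent>) {
    thread::spawn(move || {
        for event in events {
            handler(event);
        }
    })
    .join()
    .expect("handler thread panicked");
}

/// Builds a handler that appends payload bytes to a shared sink.
pub fn collecting_handler(sink: Arc<Mutex<Vec<u8>>>) -> Handler {
    Box::new(move |event: ToyEvent| {
        if let ToyEvent::PayloadChunk(bytes) = event {
            sink.lock()
                .expect("sink mutex poisoned")
                .extend_from_slice(&bytes);
        }
    })
}
```

Shared state captured by a handler still needs its own synchronization, which is why the sink here is wrapped in Arc<Mutex<...>>.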
Usage Patterns
Client-Side Pattern
1. Create RpcDispatcher
2. For each RPC call:
a. Build RpcRequest with method_id, params, payload
b. Call dispatcher.call() with on_response handler
c. Write bytes to transport via on_emit callback
3. When receiving data from transport:
a. Call dispatcher.read_bytes()
b. Response handlers are invoked automatically
Sources: tests/rpc_dispatcher_tests.rs:32-124
Server-Side Pattern
1. Create RpcDispatcher
2. When receiving data from transport:
a. Call dispatcher.read_bytes()
b. Returns list of active request IDs
3. For each active request ID:
a. Check is_rpc_request_finalized()
b. If finalized, delete_rpc_request() to retrieve full request
c. Process request (decode params, execute method)
d. Build RpcResponse with result
e. Call dispatcher.respond() to send response
4. Write response bytes to transport via on_emit callback
Sources: tests/rpc_dispatcher_tests.rs:126-202
Bidirectional Pattern
The same dispatcher instance can handle both client and server roles simultaneously:
Diagram: Bidirectional Request/Response Flow
graph TB
Dispatcher["RpcDispatcher"]
subgraph "Client Role"
OutboundCall["call()\nInitiate request"]
InboundResponse["read_bytes()\nReceive response"]
end
subgraph "Server Role"
InboundRequest["read_bytes()\nReceive request"]
OutboundResponse["respond()\nSend response"]
end
Dispatcher --> OutboundCall
Dispatcher --> InboundResponse
Dispatcher --> InboundRequest
Dispatcher --> OutboundResponse
OutboundCall -.emits.-> Transport["Transport Layer"]
Transport -.delivers.-> InboundRequest
OutboundResponse -.emits.-> Transport
Transport -.delivers.-> InboundResponse
This pattern enables peer-to-peer architectures where both endpoints can initiate requests and respond to requests.
Sources: src/rpc/rpc_dispatcher.rs:20-51
Implementation Notes
Request ID Generation
Location: src/rpc/rpc_dispatcher.rs:241-242
Mechanism:
- Captures current self.next_rpc_request_id for the outgoing request
- Calls increment_u32_id() from src/utils/increment_u32_id.rs (implementation in crate::utils)
- Updates self.next_rpc_request_id with next value
Wraparound Behavior:
- After reaching u32::MAX (4,294,967,295), wraps to 0 and continues
- Provides monotonic sequence within 32-bit range
Collision Analysis:
| Connection Duration | Requests/Second | Time to Wraparound | Collision Risk |
|---|---|---|---|
| Short-lived (hours) | 1,000 | 49.7 days | Negligible |
| Long-lived (days) | 10,000 | 4.97 days | Very low |
| High-throughput | 100,000 | 11.9 hours | Consider ID reuse detection |
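The wraparound times in the table follow from dividing the 2^32 available IDs by the request rate. A small helper (the function name is hypothetical) reproduces them:

```rust
/// Seconds until a u32 counter has issued all 2^32 values at a steady rate.
pub fn seconds_to_wraparound(requests_per_second: u64) -> f64 {
    (u32::MAX as f64 + 1.0) / requests_per_second as f64
}

/// Same quantity expressed in days.
pub fn days_to_wraparound(requests_per_second: u64) -> f64 {
    seconds_to_wraparound(requests_per_second) / 86_400.0
}

/// Same quantity expressed in hours.
pub fn hours_to_wraparound(requests_per_second: u64) -> f64 {
    seconds_to_wraparound(requests_per_second) / 3_600.0
}
```

At 1,000 requests per second this gives about 4,294,967 seconds, which is the 49.7 days shown in the first row.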
Mitigation for Long-Running Connections:
- Track active request IDs in a HashSet before assignment
- Reject or delay requests if ID would collide with pending request
- Not currently implemented (acceptable for typical usage)
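Since this mitigation is not implemented, the following is purely a sketch of what it could look like. ToyIdAllocator and its methods are hypothetical, and wrapping_add stands in for increment_u32_id().

```rust
use std::collections::HashSet;

/// Hypothetical ID allocator that refuses to reuse an ID still in flight.
pub struct ToyIdAllocator {
    pub next_id: u32,
    pub active: HashSet<u32>,
}

impl ToyIdAllocator {
    pub fn new(start: u32) -> Self {
        Self { next_id: start, active: HashSet::new() }
    }

    /// Returns the next ID, or None if that ID still belongs to a pending
    /// request. On None the counter is left unchanged so the caller can
    /// reject the request or retry later.
    pub fn allocate(&mut self) -> Option<u32> {
        let id = self.next_id;
        // HashSet::insert returns false when the value was already present.
        if !self.active.insert(id) {
            return None;
        }
        self.next_id = self.next_id.wrapping_add(1);
        Some(id)
    }

    /// Marks a request as completed so its ID may be reused after wraparound.
    pub fn release(&mut self, id: u32) {
        self.active.remove(&id);
    }
}
```

The cost is one set entry per pending request, plus a release() call wherever a request completes or fails.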
Initialization: Starts at first ID from increment_u32_id() in [line 64](https://github.com/jzombie/rust-muxio/blob/30450c98/line 64) during RpcDispatcher::new()
Sources: src/rpc/rpc_dispatcher.rs:241-242 src/rpc/rpc_dispatcher.rs:64
Non-Async Design
The dispatcher uses callbacks instead of async/await for several reasons:
- WASM Compatibility: Avoids dependency on async runtimes that may not work in WASM
- Runtime Agnostic: Works with Tokio, async-std, or no runtime at all
- Deterministic: No hidden scheduling or context switching
- Zero-Cost: No Future state machines or executor overhead
Higher-level abstractions (like those in muxio-rpc-service-caller) can wrap the dispatcher with async interfaces when desired.
Sources: src/rpc/rpc_dispatcher.rs:26-27
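As a rough illustration of how a callback can be adapted to a waitable interface, the sketch below bridges a handler to a standard-library channel. It is not how muxio-rpc-service-caller does it: channel_handler is a hypothetical name, the handler takes raw bytes instead of an RpcStreamEvent, and a blocking std::sync::mpsc channel stands in for whatever async primitive a real wrapper would use.

```rust
use std::sync::mpsc;

/// Simplified handler shape: receives response bytes.
pub type Handler = Box<dyn FnMut(Vec<u8>) + Send>;

/// Builds a callback plus a receiver that the caller can wait on.
/// Every invocation of the callback forwards its bytes into the channel.
pub fn channel_handler() -> (Handler, mpsc::Receiver<Vec<u8>>) {
    let (tx, rx) = mpsc::channel();
    let handler: Handler = Box::new(move |bytes: Vec<u8>| {
        // A send error only means the receiver was dropped; ignore it.
        let _ = tx.send(bytes);
    });
    (handler, rx)
}
```

The handler half would be passed as the on_response callback, while the receiver half is what the higher-level code waits on.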
Summary
The RpcDispatcher provides the core request/response coordination layer for muxio’s RPC framework:
| Responsibility | Mechanism |
|---|---|
| Request Correlation | Unique request IDs with monotonic generation |
| Response Routing | Per-request handlers + catch-all fallback |
| Stream Management | Wraps RpcRespondableSession for encoder lifecycle |
| Payload Accumulation | Optional prebuffering or streaming delivery |
| Queue Management | Thread-safe VecDeque for tracking active requests |
| Error Propagation | Synthetic error events on connection failure |
| Thread Safety | Arc<Mutex<>> for shared state |
The dispatcher’s non-async, callback-based design enables deployment across native and WASM environments while maintaining type safety and performance.
Sources: src/rpc/rpc_dispatcher.rs:1-458