Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

GitHub

This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

RPC Dispatcher

Relevant source files

Purpose and Scope

The RpcDispatcher is the central request coordination component in the muxio core library. It manages the lifecycle of RPC requests and responses, handling request correlation, stream multiplexing, and response routing over a binary framed transport.

This document covers the internal architecture, request/response flow, queue management, and usage patterns of the RpcDispatcher. For information about the underlying binary framing protocol, see Binary Framing Protocol. For details on the RpcRequest and RpcResponse data structures, see Request and Response Types.

Sources: src/rpc/rpc_dispatcher.rs:1-458


Overview

The RpcDispatcher operates in a non-async , callback-based model that is compatible with both WASM environments and multithreaded runtimes. It provides:

  • Request Correlation : Assigns unique IDs to outbound requests and matches inbound responses
  • Stream Multiplexing : Allows multiple concurrent requests over a single connection
  • Chunked Streaming : Supports payloads split across multiple frames
  • Response Buffering : Optional prebuffering of complete responses before delivery
  • Mid-stream Cancellation : Ability to abort in-flight requests
  • Thread-safe Queue : Synchronized tracking of inbound response metadata

The dispatcher wraps a RpcRespondableSession and maintains a thread-safe request queue for tracking active requests.

Important : A unique dispatcher instance should be used per client connection.

Sources: src/rpc/rpc_dispatcher.rs:20-51


Core Components

Diagram: RpcDispatcher Internal Structure

ComponentTypePurpose
rpc_respondable_sessionRpcRespondableSession<'a>Manages stream lifecycles and response handlers
next_rpc_request_idu32Monotonic ID generator for outbound requests
rpc_request_queueArc<Mutex<VecDeque<(u32, RpcRequest)>>>Thread-safe queue of active inbound responses

Sources: src/rpc/rpc_dispatcher.rs:36-51 src/rpc/rpc_internals/rpc_respondable_session.rs:21-28


Request Lifecycle

Outbound Request Flow

Diagram: Outbound Request Encoding and Transmission

The call() method follows these steps:

  1. ID Assignment : Increments next_rpc_request_id to generate a unique request identifier
  2. Header Construction : Creates an RpcHeader with RpcMessageType::Call, request ID, method ID, and metadata bytes
  3. Handler Registration : Installs the optional on_response callback in the session's response handler map
  4. Stream Initialization : Calls init_respondable_request() to obtain an RpcStreamEncoder
  5. Payload Transmission : Writes prebuffered payload bytes if provided
  6. Finalization : Optionally ends the stream if is_finalized is true
  7. Encoder Return : Returns the encoder to the caller for additional streaming if needed

Sources: src/rpc/rpc_dispatcher.rs:226-286


Inbound Response Flow

Diagram: Inbound Response Processing

The read_bytes() method processes incoming data in three stages:

  1. Frame Decoding : Passes bytes to RpcRespondableSession.read_bytes() for frame reassembly
  2. Event Handling : Decoded frames trigger RpcStreamEvent callbacks
  3. Queue Population : The catch-all handler maintains the request queue
  4. Active IDs Return : Returns a list of all request IDs currently in the queue

Sources: src/rpc/rpc_dispatcher.rs:362-374 src/rpc/rpc_internals/rpc_respondable_session.rs:93-173


Response Handling

Handler Registration and Dispatch

The dispatcher supports two types of response handlers:

Specific Response Handlers

Per-request handlers registered via the on_response parameter in call(). These are stored in the RpcRespondableSession.response_handlers map, keyed by request ID.

response_handlers: HashMap<u32, Box<dyn FnMut(RpcStreamEvent) + Send + 'a>>

Handler Lifecycle:

  • Registered when call() is invoked with a non-None on_response
  • Invoked for each RpcStreamEvent (Header, PayloadChunk, End)
  • Automatically removed when stream ends or errors

Sources: src/rpc/rpc_internals/rpc_respondable_session.rs24

Catch-All Response Handler

A global fallback handler that receives all response events, regardless of whether a specific handler is registered. Installed via init_catch_all_response_handler() during dispatcher construction.

Primary Responsibilities:

  • Populate the rpc_request_queue with incoming response metadata
  • Accumulate payload bytes across multiple chunks
  • Mark requests as finalized when stream ends

Sources: src/rpc/rpc_dispatcher.rs:99-209


Prebuffering vs. Streaming

The dispatcher supports two response delivery modes, controlled by the prebuffer_response parameter in call():

ModeBehaviorUse Case
Prebuffering (true)Accumulates all payload chunks into a single buffer, then delivers once via PayloadChunk event when stream endsComplete request/response RPCs where the full payload is needed before processing
Streaming (false)Delivers each payload chunk immediately as it arrivesProgressive rendering, large file transfers, streaming data

Prebuffering Implementation:

  • Tracks prebuffering flags per request: prebuffering_flags: HashMap<u32, bool>
  • Accumulates bytes: prebuffered_responses: HashMap<u32, Vec<u8>>
  • On RpcStreamEvent::End, emits accumulated buffer as a single PayloadChunk, then emits End

Sources: src/rpc/rpc_internals/rpc_respondable_session.rs:112-147


Request Queue Management

Queue Structure

Arc<Mutex<VecDeque<(u32, RpcRequest)>>>

The queue stores tuples of (request_id, RpcRequest) for all active inbound responses. Each entry represents a response that has received at least a Header event but may not yet be finalized.

Key Fields in QueuedRpcRequest:

FieldTypeDescription
rpc_method_idu64Method identifier from the response header
rpc_param_bytesOption<Vec<u8>>Metadata bytes (converted from header)
rpc_prebuffered_payload_bytesOption<Vec<u8>>Accumulated payload chunks
is_finalizedboolWhether End event has been received

Sources: src/rpc/rpc_dispatcher.rs50 src/rpc/rpc_request_response.rs:9-33


Queue Operations

Diagram: Request Queue Mutation Operations

Core Methods

get_rpc_request(header_id: u32) -> Option<MutexGuard<VecDeque<(u32, RpcRequest)>>>

Returns a lock on the entire queue if the specified request exists. The caller must search the queue again within the guard.

Rationale : Cannot return a reference to a queue element directly due to Rust's borrow checker—the reference would outlive the MutexGuard.

Sources: src/rpc/rpc_dispatcher.rs:381-394


is_rpc_request_finalized(header_id: u32) -> Option<bool>

Checks if a request has received its End event. Returns None if the request is not in the queue.

Sources: src/rpc/rpc_dispatcher.rs:399-405


delete_rpc_request(header_id: u32) -> Option<RpcRequest>

Removes a request from the queue and transfers ownership to the caller. Typically used after confirming is_finalized is true.

Sources: src/rpc/rpc_dispatcher.rs:411-420


sequenceDiagram
    participant App as "Server Application"
    participant Dispatcher as "RpcDispatcher"
    participant Session as "RpcRespondableSession"
    participant Encoder as "RpcStreamEncoder"
    participant Emit as "on_emit Callback"
    
    App->>Dispatcher: respond(rpc_response, max_chunk_size, on_emit)
    
    Dispatcher->>Dispatcher: Build RpcHeader
    Note over Dispatcher: RpcMessageType::Response\nrequest_id, method_id\nmetadata = [result_status]
    
    Dispatcher->>Session: start_reply_stream(header, ...)
    Session-->>Dispatcher: RpcStreamEncoder
    
    Dispatcher->>Encoder: write_bytes(payload)
    Encoder->>Emit: Emit binary frames
    
    alt "is_finalized == true"
        Dispatcher->>Encoder: flush()
        Dispatcher->>Encoder: end_stream()
    end
    
    Dispatcher-->>App: Return encoder

Outbound Response Flow

When acting as a server, the dispatcher sends responses using the respond() method:

Diagram: Server-Side Response Encoding

Key Differences fromcall():

  1. No request ID generation—uses rpc_response.rpc_request_id from the original request
  2. RpcMessageType::Response instead of Call
  3. Metadata contains only the rpc_result_status byte (if present)
  4. No response handler registration (responses don't receive responses)

Sources: src/rpc/rpc_dispatcher.rs:298-337


Error Handling and Cleanup

Mutex Poisoning

The rpc_request_queue is protected by a Mutex. If a thread panics while holding the lock, the mutex becomes poisoned.

Dispatcher Behavior on Poisoned Lock:

The catch-all handler deliberately panics when encountering a poisoned mutex:

Rationale:

  • A poisoned queue indicates inconsistent internal state
  • Continuing could cause incorrect routing, data loss, or undefined behavior
  • Fast failure provides better safety and debugging signals

Alternative Approaches:

  • Graceful recovery could be implemented with a configurable panic policy
  • Error reporting mechanism could replace panics

Sources: src/rpc/rpc_dispatcher.rs:85-118


graph TB
    ConnectionDrop["Connection Drop Detected"]
FailAll["fail_all_pending_requests(error)"]
TakeHandlers["Take ownership of\nresponse_handlers map"]
IterateHandlers["For each (request_id, handler)"]
CreateError["Create RpcStreamEvent::Error"]
InvokeHandler["Invoke handler(error_event)"]
ClearMap["response_handlers now empty"]
ConnectionDrop --> FailAll
 
   FailAll --> TakeHandlers
 
   TakeHandlers --> IterateHandlers
 
   IterateHandlers --> CreateError
 
   CreateError --> InvokeHandler
 
   InvokeHandler --> ClearMap

Connection Failure Cleanup

When a transport connection drops, pending requests must be notified to prevent indefinite hangs. The fail_all_pending_requests() method handles this:

Diagram: Pending Request Cleanup Flow

Implementation Details:

  1. Ownership Transfer : Uses std::mem::take() to move handlers out of the map, leaving it empty
  2. Synthetic Error Event : Creates RpcStreamEvent::Error with the provided FrameDecodeError
  3. Handler Invocation : Calls each handler with the error event
  4. Automatic Cleanup : Handlers are automatically dropped after invocation

Usage Context : Transport implementations call this method in their disconnection handlers (e.g., WebSocket close events).

Sources: src/rpc/rpc_dispatcher.rs:427-456


Thread Safety

The dispatcher achieves thread safety through:

Shared Request Queue

  • Arc : Enables shared ownership across threads and callbacks
  • Mutex : Ensures exclusive access during mutations
  • VecDeque : Efficient push/pop operations for queue semantics

Handler Storage

Response handlers are stored as boxed trait objects:

The Send bound allows handlers to be invoked from different threads if the dispatcher is shared across threads.

Sources: src/rpc/rpc_dispatcher.rs50 src/rpc/rpc_internals/rpc_respondable_session.rs24


Usage Patterns

Client-Side Pattern

1. Create RpcDispatcher
2. For each RPC call:
   a. Build RpcRequest with method_id, params, payload
   b. Call dispatcher.call() with on_response handler
   c. Write bytes to transport via on_emit callback
3. When receiving data from transport:
   a. Call dispatcher.read_bytes()
   b. Response handlers are invoked automatically

Sources: tests/rpc_dispatcher_tests.rs:32-124


Server-Side Pattern

1. Create RpcDispatcher
2. When receiving data from transport:
   a. Call dispatcher.read_bytes()
   b. Returns list of active request IDs
3. For each active request ID:
   a. Check is_rpc_request_finalized()
   b. If finalized, delete_rpc_request() to retrieve full request
   c. Process request (decode params, execute method)
   d. Build RpcResponse with result
   e. Call dispatcher.respond() to send response
4. Write response bytes to transport via on_emit callback

Sources: tests/rpc_dispatcher_tests.rs:126-202


graph TB
    Dispatcher["RpcDispatcher"]
subgraph "Client Role"
        OutboundCall["call()\nInitiate request"]
InboundResponse["read_bytes()\nReceive response"]
end
    
    subgraph "Server Role"
        InboundRequest["read_bytes()\nReceive request"]
OutboundResponse["respond()\nSend response"]
end
    
 
   Dispatcher --> OutboundCall
 
   Dispatcher --> InboundResponse
 
   Dispatcher --> InboundRequest
 
   Dispatcher --> OutboundResponse
    
    OutboundCall -.emits.-> Transport["Transport Layer"]
Transport -.delivers.-> InboundRequest
    OutboundResponse -.emits.-> Transport
    Transport -.delivers.-> InboundResponse

Bidirectional Pattern

The same dispatcher instance can handle both client and server roles simultaneously:

Diagram: Bidirectional Request/Response Flow

This pattern enables peer-to-peer architectures where both endpoints can initiate requests and respond to requests.

Sources: src/rpc/rpc_dispatcher.rs:20-51


Implementation Notes

ID Generation

Request IDs are generated using increment_u32_id(), which provides monotonic incrementing IDs with wraparound:

Wraparound Behavior : After reaching u32::MAX, wraps to 0 and continues incrementing.

Collision Risk : With 4.29 billion possible IDs, collisions are extremely unlikely in typical usage. For long-running connections with billions of requests, consider implementing ID reuse detection.

Sources: src/rpc/rpc_dispatcher.rs242


Non-Async Design

The dispatcher uses callbacks instead of async/await for several reasons:

  1. WASM Compatibility : Avoids dependency on async runtimes that may not work in WASM
  2. Runtime Agnostic : Works with Tokio, async-std, or no runtime at all
  3. Deterministic : No hidden scheduling or context switching
  4. Zero-Cost : No Future state machines or executor overhead

Higher-level abstractions (like those in muxio-rpc-service-caller) can wrap the dispatcher with async interfaces when desired.

Sources: src/rpc/rpc_dispatcher.rs:26-27


Summary

The RpcDispatcher provides the core request/response coordination layer for muxio's RPC framework:

ResponsibilityMechanism
Request CorrelationUnique request IDs with monotonic generation
Response RoutingPer-request handlers + catch-all fallback
Stream ManagementWraps RpcRespondableSession for encoder lifecycle
Payload AccumulationOptional prebuffering or streaming delivery
Queue ManagementThread-safe VecDeque for tracking active requests
Error PropagationSynthetic error events on connection failure
Thread SafetyArc<Mutex<>> for shared state

The dispatcher's non-async, callback-based design enables deployment across native and WASM environments while maintaining type safety and performance.

Sources: src/rpc/rpc_dispatcher.rs:1-458

Dismiss

Refresh this wiki

Enter email to refresh