This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Core Library (muxio)
Relevant source files
- README.md
- src/rpc/rpc_dispatcher.rs
- src/rpc/rpc_internals/rpc_respondable_session.rs
- src/rpc/rpc_internals/rpc_session.rs
- src/rpc/rpc_internals/rpc_stream_decoder.rs
Purpose and Scope
The muxio core library provides the foundational stream multiplexing engine that enables multiple independent data streams to coexist over a single connection. This document covers the core library's architecture, components, and design principles. The core library itself is transport-agnostic and runtime-agnostic, operating through a callback-driven model without requiring any specific async runtime.
For detailed information about specific subsystems:
- Binary framing protocol implementation: see Binary Framing Protocol
- Request correlation and stream management: see RPC Dispatcher
- Data structures for requests and responses: see Request and Response Types
For information about RPC abstractions built on top of the core: see RPC Framework
For concrete transport implementations using the core: see Transport Implementations
Sources: README.md:1-61 src/rpc/rpc_dispatcher.rs:1-457
Layered Architecture
The core library follows a three-layer design where each layer has distinct responsibilities and interfaces with adjacent layers through well-defined boundaries.
graph TB
subgraph "Layer 3: Application Interface"
RpcDispatcher["RpcDispatcher\nRequest correlation\nResponse tracking"]
end
subgraph "Layer 2: Session Management"
RpcRespondableSession["RpcRespondableSession\nResponse handler registration\nPer-request callbacks"]
RpcSession["RpcSession\nStream lifecycle management\nStream ID allocation"]
end
subgraph "Layer 1: Binary Protocol"
RpcStreamEncoder["RpcStreamEncoder\nEncodes outbound streams"]
RpcStreamDecoder["RpcStreamDecoder\nDecodes inbound streams"]
FrameMuxStreamDecoder["FrameMuxStreamDecoder\nFrame reassembly"]
end
subgraph "Transport Layer (Not in Core)"
Transport["WebSocket / TCP / Custom\nByte transmission"]
end
RpcDispatcher -->|uses| RpcRespondableSession
RpcRespondableSession -->|wraps| RpcSession
RpcSession -->|creates| RpcStreamEncoder
RpcSession -->|manages| RpcStreamDecoder
RpcSession -->|feeds bytes to| FrameMuxStreamDecoder
RpcStreamEncoder -->|emits bytes via callback| Transport
Transport -->|passes received bytes| FrameMuxStreamDecoder
Architecture Layers
Sources: src/rpc/rpc_dispatcher.rs:20-51 src/rpc/rpc_internals/rpc_session.rs:15-24 src/rpc/rpc_internals/rpc_respondable_session.rs:14-28
| Layer | Component | Responsibility | File Location |
|---|---|---|---|
| 3 | RpcDispatcher | Request/response correlation, queue management, ID generation | src/rpc/rpc_dispatcher.rs |
| 2 | RpcRespondableSession | Response handler registration, prebuffering | src/rpc/rpc_internals/rpc_respondable_session.rs |
| 2 | RpcSession | Stream ID allocation, frame routing | src/rpc/rpc_internals/rpc_session.rs |
| 1 | RpcStreamEncoder | Outbound stream encoding | src/rpc/rpc_internals/rpc_stream_encoder.rs |
| 1 | RpcStreamDecoder | Inbound stream decoding | src/rpc/rpc_internals/rpc_stream_decoder.rs |
| 1 | FrameMuxStreamDecoder | Frame multiplexing/demultiplexing | src/frame/ |
Sources: src/rpc/rpc_dispatcher.rs:36-50 src/rpc/rpc_internals/rpc_session.rs:20-24 README.md:28-34
Core Components
RpcSession
RpcSession is the low-level stream multiplexing engine. It manages stream ID allocation, maintains per-stream decoder state, and routes incoming frames to the appropriate decoder.
Key responsibilities:
- Allocates monotonically increasing stream IDs using `next_stream_id`
- Maintains a `HashMap<u32, RpcStreamDecoder>` mapping stream IDs to decoders
- Processes incoming bytes through `FrameMuxStreamDecoder`
- Cleans up completed or cancelled streams

Public API:
- `init_request()` - Creates a new `RpcStreamEncoder` for an outbound stream
- `read_bytes()` - Processes incoming bytes and invokes event callbacks
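The routing behavior can be pictured with a small self-contained sketch; the names here are illustrative, not the crate's internals:

```rust
use std::collections::HashMap;

struct Decoder; // stand-in for per-stream RpcStreamDecoder state

// Route each incoming frame to the decoder for its stream ID, creating the
// decoder on first sight and cleaning it up once the stream ends.
fn route_frame(decoders: &mut HashMap<u32, Decoder>, stream_id: u32, is_end: bool) {
    decoders.entry(stream_id).or_insert(Decoder);
    if is_end {
        decoders.remove(&stream_id);
    }
}
```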
Sources: src/rpc/rpc_internals/rpc_session.rs:15-117
RpcRespondableSession
RpcRespondableSession wraps RpcSession and adds response handler tracking. It allows callers to register per-request callbacks that are invoked when response events arrive.
Key responsibilities:
- Maintains `response_handlers: HashMap<u32, Box<dyn FnMut(RpcStreamEvent)>>` for per-request callbacks
- Provides an optional `catch_all_response_handler` for unmatched events
- Implements prebuffering logic to accumulate payload chunks into single events
- Manages `prebuffering_flags: HashMap<u32, bool>` to control buffering per request

Public API:
- `init_respondable_request()` - Starts a request with an optional response callback and prebuffering
- `start_reply_stream()` - Initiates a response stream
- `set_catch_all_response_handler()` - Registers a global fallback handler
- `read_bytes()` - Processes bytes and routes events to registered handlers
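The prebuffering behavior can be illustrated with a self-contained sketch; the struct and method names here are illustrative, not the session's actual internals:

```rust
use std::collections::HashMap;

// Accumulate payload chunks per stream so the handler sees one event.
#[derive(Default)]
struct Prebuffer {
    chunks: HashMap<u32, Vec<u8>>, // stream_id -> accumulated payload
}

impl Prebuffer {
    fn on_chunk(&mut self, stream_id: u32, bytes: &[u8]) {
        self.chunks.entry(stream_id).or_default().extend_from_slice(bytes);
    }

    // On the End frame, the full payload is delivered in a single event.
    fn on_end(&mut self, stream_id: u32) -> Option<Vec<u8>> {
        self.chunks.remove(&stream_id)
    }
}
```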
Sources: src/rpc/rpc_internals/rpc_respondable_session.rs:14-178
RpcDispatcher
RpcDispatcher is the highest-level component, providing request correlation and queue management. It generates unique request IDs, tracks active requests in a shared queue, and handles request/response lifecycle.
Key responsibilities:
- Generates unique `rpc_request_id` values via `next_rpc_request_id: u32`
- Maintains `rpc_request_queue: Arc<Mutex<VecDeque<(u32, RpcRequest)>>>`
- Installs a catch-all response handler to populate the request queue
- Provides methods to query, finalize, and delete requests from the queue

Public API:
- `call()` - Initiates an RPC call with an `RpcRequest`, returns an `RpcStreamEncoder` (sketched below)
- `respond()` - Sends an RPC response with an `RpcResponse`
- `read_bytes()` - Processes incoming bytes, returns the list of active request IDs
- `get_rpc_request()` - Retrieves a request from the queue by ID
- `is_rpc_request_finalized()` - Checks if a request has received all payload chunks
- `delete_rpc_request()` - Removes and returns a request from the queue
- `fail_all_pending_requests()` - Cancels all pending requests with an error
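A hedged sketch of initiating a call; the `RpcRequest` field names come from the queue-entry description later on this page, while the import paths and `call()`'s exact parameter list are assumptions:

```rust
use muxio::rpc::{RpcDispatcher, RpcRequest}; // assumed import paths

fn send_ping(dispatcher: &mut RpcDispatcher) {
    let request = RpcRequest {
        rpc_method_id: 42,                                     // application-defined method ID
        rpc_param_bytes: None,
        rpc_prebuffered_payload_bytes: Some(b"ping".to_vec()),
        is_finalized: true,                                    // no further chunks follow
    };

    // The on_emit closure receives encoded frames; a transport forwards them.
    let _encoder = dispatcher.call(request, |bytes: &[u8]| {
        let _ = bytes; // hand to the WebSocket / TCP writer here
    });
}
```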
Sources: src/rpc/rpc_dispatcher.rs:36-457
Component Relationships
Initialization and Layering
Sources: src/rpc/rpc_dispatcher.rs:59-71 src/rpc/rpc_internals/rpc_respondable_session.rs:30-39 src/rpc/rpc_internals/rpc_session.rs:26-33
Stream Creation Flow
Sources: src/rpc/rpc_dispatcher.rs:226-286 src/rpc/rpc_internals/rpc_respondable_session.rs:42-68 src/rpc/rpc_internals/rpc_session.rs:35-50
Key Design Principles
Non-Async Callback-Driven Model
The core library does not use async/await or any specific async runtime. Instead, it operates through callbacks:
- Outbound data: Components accept `on_emit` callbacks implementing the `RpcEmit` trait
- Inbound events: Components invoke event handlers implementing `RpcResponseHandler`
- Thread safety: Shared state uses `Arc<Mutex<>>` for safe concurrent access
This design enables the core library to work in WASM environments, multithreaded native applications, and any async runtime without modification.
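A minimal self-contained sketch of the model, using a plain channel to play the transport's role (nothing below is muxio API):

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();

    // Plays the `on_emit` role: the core hands encoded frames to a callback
    // instead of performing I/O itself.
    let on_emit = move |bytes: &[u8]| {
        tx.send(bytes.to_vec()).expect("receiver dropped");
    };

    // Simulate an encoder emitting two frames.
    on_emit(&[0x01, 0x02]);
    on_emit(&[0x03]);

    // A transport (thread, async task, or WASM glue) drains and transmits.
    while let Ok(frame) = rx.try_recv() {
        println!("would transmit {} byte(s)", frame.len());
    }
}
```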
Sources: src/rpc/rpc_dispatcher.rs:26-30 README.md:34-35
Transport and Runtime Agnostic
The core library has zero dependencies on specific transports or async runtimes:
- Bytes are passed in/out via callbacks, not I/O operations
- No assumptions about WebSocket, TCP, or any protocol
- No tokio, async-std, or runtime dependencies in the core
Extension crates (e.g., muxio-tokio-rpc-client, muxio-wasm-rpc-client) provide transport bindings.
Sources: README.md:34-40 src/rpc/rpc_dispatcher.rs:20-35
Request Correlation via IDs
The core library uses two ID systems for multiplexing:
| ID Type | Scope | Generated By | Purpose |
|---|---|---|---|
| stream_id | Transport frame | RpcSession.next_stream_id | Distinguishes interleaved frame streams |
| rpc_request_id | RPC protocol | RpcDispatcher.next_rpc_request_id | Correlates requests with responses |
Both use monotonically increasing u32 values via increment_u32_id(). The stream_id operates at the framing layer, while rpc_request_id operates at the RPC layer and is encoded in the RpcHeader.
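A sketch of what such a helper could look like; the actual `increment_u32_id()` in src/utils/increment_u32_id.rs may differ, for example in its overflow policy:

```rust
// Hedged sketch: return the current ID and advance the counter.
// Assumption: IDs wrap on u32 overflow.
fn increment_u32_id(counter: &mut u32) -> u32 {
    let id = *counter;
    *counter = counter.wrapping_add(1);
    id
}

fn main() {
    let mut next_stream_id = 0u32;
    assert_eq!(increment_u32_id(&mut next_stream_id), 0);
    assert_eq!(increment_u32_id(&mut next_stream_id), 1);
}
```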
Sources: src/rpc/rpc_dispatcher.rs:41-42 src/rpc/rpc_internals/rpc_session.rs:21 src/utils/increment_u32_id.rs
Mutex Poisoning Policy
The RpcDispatcher maintains a shared rpc_request_queue protected by Mutex. If the mutex becomes poisoned (a thread panicked while holding the lock), the dispatcher panics immediately rather than attempting recovery.
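In code, the policy amounts to treating a poisoned lock as fatal. A minimal sketch, with `Vec<u8>` standing in for `RpcRequest` to keep it self-contained (the dispatcher's actual locking code may be structured differently):

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, MutexGuard};

// `Vec<u8>` stands in for `RpcRequest` so the sketch compiles on its own.
type SharedQueue = Arc<Mutex<VecDeque<(u32, Vec<u8>)>>>;

fn lock_queue(queue: &SharedQueue) -> MutexGuard<'_, VecDeque<(u32, Vec<u8>)>> {
    // A poisoned mutex means another thread panicked mid-update; treat the
    // queue as corrupt and abort rather than routing on bad state.
    queue
        .lock()
        .expect("rpc_request_queue mutex poisoned")
}
```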
This is a deliberate design choice prioritizing correctness over availability. A poisoned queue likely indicates corrupted state, and continuing execution could lead to silent data loss or incorrect routing.
Sources: src/rpc/rpc_dispatcher.rs:86-118
Stream Lifecycle Management
Outbound Request Lifecycle
Sources: src/rpc/rpc_dispatcher.rs:226-286
Inbound Response Lifecycle
Sources: src/rpc/rpc_dispatcher.rs:98-209 src/rpc/rpc_internals/rpc_stream_decoder.rs:53-185
Decoder State Machine
The RpcStreamDecoder maintains internal state as frames arrive.
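A hypothetical sketch of the machine's shape, inferred from the surrounding description; the actual variant names in rpc_stream_decoder.rs may differ:

```rust
// Illustrative only: the decoder advances through roughly these phases.
enum RpcStreamDecoderState {
    AwaitingHeader,   // accumulating bytes until the RpcHeader is complete
    ReceivingPayload, // header parsed; payload chunks arriving
    Done,             // End frame observed; the stream can be cleaned up
}
```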
Sources: src/rpc/rpc_internals/rpc_stream_decoder.rs:11-186
Request Queue Management
The RpcDispatcher provides a shared request queue accessible to application code for inspecting and managing active requests.
Queue Operations
| Method | Return Type | Purpose |
|---|---|---|
| read_bytes(&mut self, bytes: &[u8]) | Result<Vec<u32>, FrameDecodeError> | Processes incoming bytes, returns active request IDs |
| get_rpc_request(&self, header_id: u32) | Option<MutexGuard<VecDeque<(u32, RpcRequest)>>> | Locks the queue if header_id exists |
| is_rpc_request_finalized(&self, header_id: u32) | Option<bool> | Checks if the request received its End frame |
| delete_rpc_request(&self, header_id: u32) | Option<RpcRequest> | Removes and returns a request from the queue |
| fail_all_pending_requests(&mut self, error: FrameDecodeError) | () | Cancels all pending requests |
Sources: src/rpc/rpc_dispatcher.rs:362-456
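A hedged usage sketch combining these operations; the signatures follow the table above, while the surrounding glue (import path, function name, error logging) is illustrative:

```rust
use muxio::rpc::RpcDispatcher; // assumed import path

// Drain finalized requests after feeding newly received bytes to the core.
fn on_bytes_received(dispatcher: &mut RpcDispatcher, bytes: &[u8]) {
    match dispatcher.read_bytes(bytes) {
        Ok(active_ids) => {
            for id in active_ids {
                // Only consume requests whose End frame has arrived.
                if dispatcher.is_rpc_request_finalized(id) == Some(true) {
                    if let Some(request) = dispatcher.delete_rpc_request(id) {
                        let _ = request; // hand off to application logic
                    }
                }
            }
        }
        Err(err) => eprintln!("frame decode failed: {err:?}"),
    }
}
```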
Queue Entry Structure
Each entry in the rpc_request_queue is a tuple:
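```rust
(u32, RpcRequest)
```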
Where:
- The `u32` is the `rpc_request_id` (also called `header_id` in the API)
- The `RpcRequest` contains:
  - `rpc_method_id: u64`
  - `rpc_param_bytes: Option<Vec<u8>>`
  - `rpc_prebuffered_payload_bytes: Option<Vec<u8>>`
  - `is_finalized: bool`
The queue is populated automatically by the catch-all response handler installed during RpcDispatcher::new().
Sources: src/rpc/rpc_dispatcher.rs:45-50 src/rpc/rpc_dispatcher.rs:98-209
Integration with Transport Layer
The core library interfaces with transport implementations through callback boundaries, never performing I/O directly.
Outbound Data Flow
graph LR
Dispatcher["RpcDispatcher"]
Encoder["RpcStreamEncoder"]
OnEmit["on_emit callback\n(implements RpcEmit)"]
Transport["Transport Implementation\n(e.g., WebSocket)"]
Dispatcher -->|call creates| Encoder
Encoder -->|write_bytes emits via| OnEmit
OnEmit -->|sends bytes to| Transport
note1["Encoder chunks large payloads\nAdds frame headers\nNo I/O performed"]
Sources: src/rpc/rpc_dispatcher.rs:226-286
Inbound Data Flow
Sources: src/rpc/rpc_dispatcher.rs:362-374
Transport Requirements
Any transport implementation must:
- Provide byte transmission: Implement or accept callbacks matching the `RpcEmit` trait signature
- Feed received bytes: Call `RpcDispatcher.read_bytes()` with incoming data
- Handle connection lifecycle: Call `fail_all_pending_requests()` on disconnection
- Respect chunk boundaries: The core chunks large payloads; the transport must transmit frames as-is (see the sketch after this list)
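A hedged sketch of a minimal blocking transport loop satisfying these requirements; the import paths and the `disconnect_error()` helper are hypothetical, since constructing the right `FrameDecodeError` depends on the crate's actual variants, and the real tokio/WASM transports are structured differently:

```rust
use std::io::Read;
use std::net::TcpStream;

use muxio::frame::FrameDecodeError; // assumed import paths
use muxio::rpc::RpcDispatcher;

fn pump(mut socket: TcpStream, dispatcher: &mut RpcDispatcher) {
    let mut buf = [0u8; 8192];
    loop {
        match socket.read(&mut buf) {
            // Zero bytes or an I/O error: the connection is gone, so notify
            // every pending response handler before exiting.
            Ok(0) | Err(_) => {
                dispatcher.fail_all_pending_requests(disconnect_error());
                break;
            }
            // Feed raw bytes straight into the core; it performs frame
            // reassembly and routing internally.
            Ok(n) => {
                let _active_ids = dispatcher.read_bytes(&buf[..n]);
            }
        }
    }
}

// Hypothetical helper: map a disconnect to whatever FrameDecodeError
// variant the crate provides for cancelled/aborted streams.
fn disconnect_error() -> FrameDecodeError {
    unimplemented!("depends on the crate's actual error variants")
}
```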
The core library makes no assumptions about:
- Connection establishment
- Error handling semantics
- Threading model
- Async vs sync execution
Sources: README.md:34-40 src/rpc/rpc_dispatcher.rs:362-374
Thread Safety and Concurrency
The core library is designed for safe concurrent access across multiple threads:
| Component | Thread Safety Mechanism | Purpose |
|---|---|---|
| rpc_request_queue | Arc<Mutex<VecDeque<>>> | Allows multiple threads to inspect/modify the queue |
| Response handlers | Box<dyn FnMut + Send> | Handlers can be called from any thread |
| RpcDispatcher | Not Sync; use per-connection | Each connection should have its own dispatcher |
Per-Connection Isolation
The RpcDispatcher documentation explicitly states:
IMPORTANT: A unique dispatcher should be used per-client.
This design ensures that request IDs and stream IDs do not collide across different connections. Transport implementations should create one RpcDispatcher instance per active connection.
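A hedged sketch of that pattern, reusing the hypothetical `pump` loop from the transport sketch above (thread-per-connection shown only for brevity; real transports may be async):

```rust
use std::net::TcpListener;

fn serve(listener: TcpListener) {
    for socket in listener.incoming().flatten() {
        std::thread::spawn(move || {
            // A fresh dispatcher gives this connection its own stream and
            // request ID spaces, so IDs never collide across connections.
            let mut dispatcher = RpcDispatcher::new();
            pump(socket, &mut dispatcher);
        });
    }
}
```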
Sources: src/rpc/rpc_dispatcher.rs:35 src/rpc/rpc_dispatcher.rs:45-50
Error Handling
The core library propagates errors through Result types and error events:
| Error Type | Used By | Represents |
|---|---|---|
FrameEncodeError | Encoding operations | Failed to create valid frames |
FrameDecodeError | Decoding operations | Invalid or corrupt incoming frames |
RpcStreamEvent::Error | Event callbacks | Runtime stream processing errors |
When a decoding error occurs, the core library:
- Removes the affected stream decoder from `rpc_stream_decoders`
- Emits an `RpcStreamEvent::Error` event
- Returns `Err(FrameDecodeError)` to the caller
For connection-level failures, fail_all_pending_requests() should be called to notify all pending response handlers.
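A hedged sketch of a response handler reacting to such events; the import path and `RpcStreamEvent`'s exact variants and fields are assumptions here:

```rust
use muxio::rpc::RpcStreamEvent; // assumed import path

fn make_handler() -> impl FnMut(RpcStreamEvent) {
    move |event| match event {
        // Stream failed mid-flight; surface the failure to the caller.
        RpcStreamEvent::Error { .. } => { /* notify awaiting code */ }
        // Header, payload-chunk, and end events arrive here.
        _ => { /* feed decoding / prebuffering logic */ }
    }
}
```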
Sources: src/rpc/rpc_dispatcher.rs:427-456 src/rpc/rpc_internals/rpc_session.rs:53-117
Summary
The muxio core library provides a three-layer architecture for stream multiplexing:
- Binary Protocol Layer: Frame encoding/decoding with minimal overhead
- Session Management Layer: Stream lifecycle, ID allocation, handler registration
- Application Interface Layer: Request correlation, queue management, high-level operations
The non-async, callback-driven design enables deployment across native, WASM, and any async runtime without modification. Transport implementations integrate by passing bytes through the read_bytes() and on_emit callback boundaries.
For implementation details of specific layers, see Binary Framing Protocol, RPC Dispatcher, and Request and Response Types.
Sources: README.md:12-61 src/rpc/rpc_dispatcher.rs:20-51 src/rpc/rpc_internals/rpc_session.rs:15-24