Core Library (muxio)

Purpose and Scope

The muxio core library provides the foundational stream multiplexing engine that enables multiple independent data streams to coexist over a single connection. This document covers the core library's architecture, components, and design principles. The core library itself is transport-agnostic and runtime-agnostic, operating through a callback-driven model without requiring any specific async runtime.

For detailed information about specific subsystems:

  • For RPC abstractions built on top of the core, see RPC Framework
  • For concrete transport implementations using the core, see Transport Implementations

Sources: README.md:1-61 src/rpc/rpc_dispatcher.rs:1-457


Layered Architecture

The core library follows a three-layer design where each layer has distinct responsibilities and interfaces with adjacent layers through well-defined boundaries.

```mermaid
graph TB
    subgraph "Layer 3: Application Interface"
        RpcDispatcher["RpcDispatcher\nRequest correlation\nResponse tracking"]
    end

    subgraph "Layer 2: Session Management"
        RpcRespondableSession["RpcRespondableSession\nResponse handler registration\nPer-request callbacks"]
        RpcSession["RpcSession\nStream lifecycle management\nStream ID allocation"]
    end

    subgraph "Layer 1: Binary Protocol"
        RpcStreamEncoder["RpcStreamEncoder\nEncodes outbound streams"]
        RpcStreamDecoder["RpcStreamDecoder\nDecodes inbound streams"]
        FrameMuxStreamDecoder["FrameMuxStreamDecoder\nFrame reassembly"]
    end

    subgraph "Transport Layer (Not in Core)"
        Transport["WebSocket / TCP / Custom\nByte transmission"]
    end

    RpcDispatcher -->|uses| RpcRespondableSession
    RpcRespondableSession -->|wraps| RpcSession
    RpcSession -->|creates| RpcStreamEncoder
    RpcSession -->|manages| RpcStreamDecoder
    RpcSession -->|feeds bytes to| FrameMuxStreamDecoder
    RpcStreamEncoder -->|emits bytes via callback| Transport
    Transport -->|passes received bytes| FrameMuxStreamDecoder
```

Architecture Layers

Sources: src/rpc/rpc_dispatcher.rs:20-51 src/rpc/rpc_internals/rpc_session.rs:15-24 src/rpc/rpc_internals/rpc_respondable_session.rs:14-28

| Layer | Component | Responsibility | File Location |
|---|---|---|---|
| 3 | RpcDispatcher | Request/response correlation, queue management, ID generation | src/rpc/rpc_dispatcher.rs |
| 2 | RpcRespondableSession | Response handler registration, prebuffering | src/rpc/rpc_internals/rpc_respondable_session.rs |
| 2 | RpcSession | Stream ID allocation, frame routing | src/rpc/rpc_internals/rpc_session.rs |
| 1 | RpcStreamEncoder | Outbound stream encoding | src/rpc/rpc_internals/rpc_stream_encoder.rs |
| 1 | RpcStreamDecoder | Inbound stream decoding | src/rpc/rpc_internals/rpc_stream_decoder.rs |
| 1 | FrameMuxStreamDecoder | Frame multiplexing/demultiplexing | src/frame/ |

Sources: src/rpc/rpc_dispatcher.rs:36-50 src/rpc/rpc_internals/rpc_session.rs:20-24 README.md:28-34


Core Components

RpcSession

RpcSession is the low-level stream multiplexing engine. It manages stream ID allocation, maintains per-stream decoder state, and routes incoming frames to the appropriate decoder.

Key responsibilities:

  • Allocates monotonically increasing stream IDs using next_stream_id
  • Maintains a HashMap<u32, RpcStreamDecoder> mapping stream IDs to decoders
  • Processes incoming bytes through FrameMuxStreamDecoder
  • Cleans up completed or cancelled streams

Public API:

  • init_request() - Creates a new RpcStreamEncoder for an outbound stream
  • read_bytes() - Processes incoming bytes and invokes event callbacks
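
As a sketch of how these two calls fit together: the method names come from the API above, but the exact signatures and callback shapes are simplified assumptions, not the library's real interface.

```rust
// Hypothetical sketch -- method names come from the API above; the exact
// signatures are simplified (see rpc_session.rs for the real ones).
fn example(
    session: &mut RpcSession,
    header: RpcHeader,
    max_chunk_size: usize,
    incoming: &[u8],
) -> Result<(), FrameDecodeError> {
    // Outbound: init_request() allocates the next stream ID and returns an
    // RpcStreamEncoder; every frame it produces leaves via the callback,
    // since the core itself performs no I/O.
    let _encoder = session.init_request(header, max_chunk_size, |bytes: &[u8]| {
        let _ = bytes; // hand `bytes` to the transport (WebSocket, TCP, ...)
    });

    // Inbound: read_bytes() runs bytes through FrameMuxStreamDecoder, routes
    // each frame to the RpcStreamDecoder registered for its stream ID, and
    // surfaces decoded events through the callback.
    session.read_bytes(incoming, |event: RpcStreamEvent| {
        let _ = event; // header / payload / end / error events arrive here
    })
}
```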

Sources: src/rpc/rpc_internals/rpc_session.rs:15-117

RpcRespondableSession

RpcRespondableSession wraps RpcSession and adds response handler tracking. It allows callers to register per-request callbacks that are invoked when response events arrive.

Key responsibilities:

  • Maintains response_handlers: HashMap<u32, Box<dyn FnMut(RpcStreamEvent)>> for per-request callbacks
  • Provides optional catch_all_response_handler for unmatched events
  • Implements prebuffering logic to accumulate payload chunks into single events
  • Manages prebuffering_flags: HashMap<u32, bool> to control buffering per request

Public API:

  • init_respondable_request() - Starts a request with optional response callback and prebuffering
  • start_reply_stream() - Initiates a response stream
  • set_catch_all_response_handler() - Registers global fallback handler
  • read_bytes() - Processes bytes and routes events to registered handlers
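
A hedged sketch of handler registration follows; only the method names are taken from the API above, and the parameter order and shapes are assumptions.

```rust
// Hypothetical sketch -- only the method names are taken from the API above;
// parameter order and shapes are assumptions.
fn request_with_reply(sess: &mut RpcRespondableSession, header: RpcHeader) {
    // Per-request handler: with prebuffering enabled, payload chunks are
    // accumulated and delivered as a single event rather than chunk-by-chunk.
    sess.init_respondable_request(
        header,
        Some(Box::new(|event: RpcStreamEvent| {
            let _ = event; // events for this request's response stream only
        })),
        /* prebuffer: */ true,
        |bytes: &[u8]| {
            let _ = bytes; // on_emit: pass outbound bytes to the transport
        },
    );

    // Events matching no per-request handler land in the catch-all instead.
    sess.set_catch_all_response_handler(|event: RpcStreamEvent| {
        let _ = event;
    });
}
```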

Sources: src/rpc/rpc_internals/rpc_respondable_session.rs:14-178

RpcDispatcher

RpcDispatcher is the highest-level component, providing request correlation and queue management. It generates unique request IDs, tracks active requests in a shared queue, and handles request/response lifecycle.

Key responsibilities:

  • Generates unique rpc_request_id values via next_rpc_request_id: u32
  • Maintains rpc_request_queue: Arc<Mutex<VecDeque<(u32, RpcRequest)>>>
  • Installs a catch-all response handler to populate the request queue
  • Provides methods to query, finalize, and delete requests from the queue

Public API:

  • call() - Initiates an RPC call with an RpcRequest, returns RpcStreamEncoder
  • respond() - Sends an RPC response with an RpcResponse
  • read_bytes() - Processes incoming bytes, returns list of active request IDs
  • get_rpc_request() - Retrieves a request from the queue by ID
  • is_rpc_request_finalized() - Checks if a request has received all payload chunks
  • delete_rpc_request() - Removes and returns a request from the queue
  • fail_all_pending_requests() - Cancels all pending requests with an error
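
A hypothetical client-side sketch combining call() with the RpcRequest fields documented later on this page; the exact signature of call() is an assumption, and its optional response-handler argument is elided.

```rust
// Hypothetical client-side sketch; signatures are simplified.
fn start_call(
    dispatcher: &mut RpcDispatcher,
    params: Vec<u8>,
    on_emit: impl FnMut(&[u8]),
) {
    let request = RpcRequest {
        rpc_method_id: 42,                   // identifies the remote method
        rpc_param_bytes: Some(params),       // encoded call parameters
        rpc_prebuffered_payload_bytes: None, // optional eagerly-sent payload
        is_finalized: true,                  // no further chunks will follow
    };

    // call() assigns the next rpc_request_id, encodes it into the RpcHeader,
    // and returns an RpcStreamEncoder for streaming any remaining payload;
    // all produced frames leave through `on_emit`.
    let _encoder = dispatcher.call(request, on_emit);
}
```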

Sources: src/rpc/rpc_dispatcher.rs:36-457


Component Relationships

Initialization and Layering

Sources: src/rpc/rpc_dispatcher.rs:59-71 src/rpc/rpc_internals/rpc_respondable_session.rs:30-39 src/rpc/rpc_internals/rpc_session.rs:26-33

Stream Creation Flow

Sources: src/rpc/rpc_dispatcher.rs:226-286 src/rpc/rpc_internals/rpc_respondable_session.rs:42-68 src/rpc/rpc_internals/rpc_session.rs:35-50


Key Design Principles

Non-Async Callback-Driven Model

The core library does not use async/await or any specific async runtime. Instead, it operates through callbacks:

  • Outbound data: Components accept on_emit callbacks implementing the RpcEmit trait
  • Inbound events: Components invoke event handlers implementing RpcResponseHandler
  • Thread safety: Shared state uses Arc<Mutex<>> for safe concurrent access

This design enables the core library to work in WASM environments, multithreaded native applications, and any async runtime without modification.
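
To illustrate the idea in a self-contained way: an on_emit callback is just a closure, so any environment that can invoke a function can host the core. The buffer-draining pattern below is illustrative glue, not part of muxio.

```rust
use std::sync::{Arc, Mutex};

fn main() {
    // "Emitting" is just invoking a closure, so identical core code can run
    // under tokio, in WASM, or on a plain thread. Here the on_emit callback
    // appends to a shared buffer that a transport task could drain later.
    let outbound: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(Vec::new()));

    let sink = Arc::clone(&outbound);
    let on_emit = move |bytes: &[u8]| {
        sink.lock().expect("outbound buffer poisoned").extend_from_slice(bytes);
    };

    // Closures of this shape are assumed to satisfy the RpcEmit trait bound.
    on_emit(&[0x01, 0x02]);
    assert_eq!(outbound.lock().unwrap().as_slice(), &[0x01, 0x02]);
}
```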

Sources: src/rpc/rpc_dispatcher.rs:26-30 README.md:34-35

Transport and Runtime Agnostic

The core library has zero dependencies on specific transports or async runtimes:

  • Bytes are passed in/out via callbacks, not I/O operations
  • No assumptions about WebSocket, TCP, or any protocol
  • No tokio, async-std, or runtime dependencies in the core

Extension crates (e.g., muxio-tokio-rpc-client, muxio-wasm-rpc-client) provide transport bindings.

Sources: README.md:34-40 src/rpc/rpc_dispatcher.rs:20-35

Request Correlation via IDs

The core library uses two ID systems for multiplexing:

| ID Type | Scope | Generated By | Purpose |
|---|---|---|---|
| stream_id | Transport frame | RpcSession.next_stream_id | Distinguishes interleaved frame streams |
| rpc_request_id | RPC protocol | RpcDispatcher.next_rpc_request_id | Correlates requests with responses |

Both use monotonically increasing u32 values via increment_u32_id(). The stream_id operates at the framing layer, while rpc_request_id operates at the RPC layer and is encoded in the RpcHeader.
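
One plausible shape for increment_u32_id() is shown below for illustration; the real implementation lives in src/utils/increment_u32_id.rs, and the wrap-around behavior here is an assumption.

```rust
// A plausible shape for increment_u32_id(); the actual implementation in
// src/utils/increment_u32_id.rs may differ (wrap-around is an assumption).
fn increment_u32_id(id: &mut u32) -> u32 {
    let current = *id;
    *id = id.wrapping_add(1); // wraps at u32::MAX instead of panicking
    current
}

fn main() {
    let mut next_stream_id: u32 = 1;
    assert_eq!(increment_u32_id(&mut next_stream_id), 1);
    assert_eq!(increment_u32_id(&mut next_stream_id), 2);
    assert_eq!(next_stream_id, 3);
}
```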

Sources: src/rpc/rpc_dispatcher.rs:41-42 src/rpc/rpc_internals/rpc_session.rs:21 src/utils/increment_u32_id.rs

Mutex Poisoning Policy

The RpcDispatcher maintains a shared rpc_request_queue protected by Mutex. If the mutex becomes poisoned (a thread panicked while holding the lock), the dispatcher panics immediately rather than attempting recovery:
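
In outline, the pattern is the standard expect() on the lock result; the following is a representative sketch, not the verbatim source.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, MutexGuard};

// Representative pattern: the lock result is unwrapped with expect(), so a
// poisoned mutex aborts instead of being recovered via PoisonError::into_inner().
fn lock_queue<T>(queue: &Arc<Mutex<VecDeque<T>>>) -> MutexGuard<'_, VecDeque<T>> {
    queue
        .lock()
        .expect("rpc_request_queue poisoned; refusing to continue with corrupt state")
}
```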

This is a deliberate design choice prioritizing correctness over availability. A poisoned queue likely indicates corrupted state, and continuing execution could lead to silent data loss or incorrect routing.

Sources: src/rpc/rpc_dispatcher.rs:86-118


Stream Lifecycle Management

Outbound Request Lifecycle

Sources: src/rpc/rpc_dispatcher.rs:226-286

Inbound Response Lifecycle

Sources: src/rpc/rpc_dispatcher.rs:98-209 src/rpc/rpc_internals/rpc_stream_decoder.rs:53-185

Decoder State Machine

The RpcStreamDecoder maintains internal state as frames arrive:
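
Roughly, the progression can be pictured as a small state enum; this is a purely illustrative sketch, and the variant names are guesses, not the actual type in rpc_stream_decoder.rs.

```rust
// Illustrative only -- variant names are guesses at the general shape of the
// per-stream state, not the real enum in rpc_stream_decoder.rs.
enum DecoderState {
    AwaitingHeader,   // no complete RpcHeader decoded for this stream yet
    ReceivingPayload, // header parsed; payload chunks still arriving
    Finalized,        // End frame seen; the decoder can be removed
}
```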

Sources: src/rpc/rpc_internals/rpc_stream_decoder.rs:11-186


Request Queue Management

The RpcDispatcher provides a shared request queue accessible to application code for inspecting and managing active requests.

Queue Operations

| Method | Return Type | Purpose |
|---|---|---|
| read_bytes(&mut self, bytes: &[u8]) | Result<Vec<u32>, FrameDecodeError> | Processes incoming bytes, returns active request IDs |
| get_rpc_request(&self, header_id: u32) | Option<MutexGuard<VecDeque<(u32, RpcRequest)>>> | Locks queue if header_id exists |
| is_rpc_request_finalized(&self, header_id: u32) | Option<bool> | Checks if request received End frame |
| delete_rpc_request(&self, header_id: u32) | Option<RpcRequest> | Removes and returns request from queue |
| fail_all_pending_requests(&mut self, error: FrameDecodeError) | () | Cancels all pending requests |
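
Combining these methods, a receive-side poll loop might look like the following sketch; the method names and types come from the table, but the control flow itself is illustrative.

```rust
// Sketch of a receive-side poll loop built from the queue API above.
fn poll(dispatcher: &mut RpcDispatcher, incoming: &[u8]) {
    match dispatcher.read_bytes(incoming) {
        Ok(active_ids) => {
            for id in active_ids {
                // Act only once the request has received its End frame.
                if dispatcher.is_rpc_request_finalized(id) == Some(true) {
                    if let Some(request) = dispatcher.delete_rpc_request(id) {
                        let _ = request; // dispatch on request.rpc_method_id here
                    }
                }
            }
        }
        Err(err) => {
            // Corrupt framing is connection-fatal: notify every pending
            // response handler before tearing the connection down.
            dispatcher.fail_all_pending_requests(err);
        }
    }
}
```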

Sources: src/rpc/rpc_dispatcher.rs:362-456

Queue Entry Structure

Each entry in the rpc_request_queue is a tuple:
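
This is reconstructed from the queue type shown earlier and the field list below; the struct's declaration order is assumed.

```rust
// One queue entry: the rpc_request_id paired with its request state.
type QueueEntry = (u32, RpcRequest);

// Field layout taken from the list below; declaration order is assumed.
struct RpcRequest {
    rpc_method_id: u64,
    rpc_param_bytes: Option<Vec<u8>>,
    rpc_prebuffered_payload_bytes: Option<Vec<u8>>,
    is_finalized: bool,
}
```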

Where:

  • The u32 is the rpc_request_id (also called header_id in the API)
  • The RpcRequest contains:
    • rpc_method_id: u64
    • rpc_param_bytes: Option<Vec<u8>>
    • rpc_prebuffered_payload_bytes: Option<Vec<u8>>
    • is_finalized: bool

The queue is populated automatically by the catch-all response handler installed during RpcDispatcher::new().

Sources: src/rpc/rpc_dispatcher.rs:45-50 src/rpc/rpc_dispatcher.rs:98-209


Integration with Transport Layer

The core library interfaces with transport implementations through callback boundaries, never performing I/O directly.

Outbound Data Flow

```mermaid
graph LR
    Dispatcher["RpcDispatcher"]
    Encoder["RpcStreamEncoder"]
    OnEmit["on_emit callback\n(implements RpcEmit)"]
    Transport["Transport Implementation\n(e.g., WebSocket)"]

    Dispatcher -->|call creates| Encoder
    Encoder -->|write_bytes emits via| OnEmit
    OnEmit -->|sends bytes to| Transport
```

The encoder chunks large payloads and adds frame headers; no I/O is performed at this layer.

Sources: src/rpc/rpc_dispatcher.rs:226-286

Inbound Data Flow

Sources: src/rpc/rpc_dispatcher.rs:362-374

Transport Requirements

Any transport implementation must:

  1. Provide byte transmission: Implement or accept callbacks matching the RpcEmit trait signature
  2. Feed received bytes: Call RpcDispatcher.read_bytes() with incoming data
  3. Handle connection lifecycle: Call fail_all_pending_requests() on disconnection
  4. Respect chunk boundaries: The core chunks large payloads; transport must transmit frames as-is

The core library makes no assumptions about:

  • Connection establishment
  • Error handling semantics
  • Threading model
  • Async vs sync execution
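
Put together, a minimal transport loop satisfying requirements 2 and 3 might look like this sketch; Connection, recv(), and disconnect_error() are hypothetical stand-ins, and only the dispatcher methods come from the core.

```rust
// Skeletal transport glue; `Connection`, `recv()`, and `disconnect_error()`
// are hypothetical stand-ins -- only the dispatcher methods come from muxio.
fn run_connection(mut conn: Connection, dispatcher: &mut RpcDispatcher) {
    loop {
        match conn.recv() {
            // Requirement 2: feed every received byte into the core; frames
            // are transmitted as-is, so no reframing happens here.
            Ok(bytes) => {
                if let Err(err) = dispatcher.read_bytes(&bytes) {
                    // Decode failure: notify all in-flight handlers.
                    dispatcher.fail_all_pending_requests(err);
                    break;
                }
            }
            // Requirement 3: connection dropped -- fail pending requests with
            // a suitable FrameDecodeError (construction elided here).
            Err(_) => {
                dispatcher.fail_all_pending_requests(disconnect_error());
                break;
            }
        }
    }
}
```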

Sources: README.md:34-40 src/rpc/rpc_dispatcher.rs:362-374


Thread Safety and Concurrency

The core library is designed for safe concurrent access across multiple threads:

| Component | Thread Safety Mechanism | Purpose |
|---|---|---|
| rpc_request_queue | Arc<Mutex<VecDeque<(u32, RpcRequest)>>> | Allows multiple threads to inspect/modify the queue |
| Response handlers | Box<dyn FnMut + Send> | Handlers can be called from any thread |
| RpcDispatcher | Not Sync; use one per connection | Each connection should have its own dispatcher |

Per-Connection Isolation

The RpcDispatcher documentation explicitly states:

IMPORTANT: A unique dispatcher should be used per-client.

This design ensures that request IDs and stream IDs do not collide across different connections. Transport implementations should create one RpcDispatcher instance per active connection.

Sources: src/rpc/rpc_dispatcher.rs:35 src/rpc/rpc_dispatcher.rs:45-50


Error Handling

The core library propagates errors through Result types and error events:

| Error Type | Used By | Represents |
|---|---|---|
| FrameEncodeError | Encoding operations | Failed to create valid frames |
| FrameDecodeError | Decoding operations | Invalid or corrupt incoming frames |
| RpcStreamEvent::Error | Event callbacks | Runtime stream processing errors |

When a decoding error occurs, the core library:

  1. Removes the affected stream decoder from rpc_stream_decoders
  2. Emits an RpcStreamEvent::Error event
  3. Returns Err(FrameDecodeError) to the caller

For connection-level failures, fail_all_pending_requests() should be called to notify all pending response handlers.
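
A sketch of an event callback that reacts to the error event from step 2 follows; only the variant name appears in the table above, and its fields are assumptions.

```rust
// Sketch: distinguishing normal traffic from RpcStreamEvent::Error.
// The variant's fields are assumptions; only the name comes from the docs.
fn handle(event: RpcStreamEvent) {
    match event {
        RpcStreamEvent::Error { .. } => {
            // The core has already removed this stream's decoder (step 1);
            // surface the failure to whoever is awaiting the response.
        }
        _ => {
            // header / payload / end events for healthy streams
        }
    }
}
```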

Sources: src/rpc/rpc_dispatcher.rs:427-456 src/rpc/rpc_internals/rpc_session.rs:53-117


Summary

The muxio core library provides a three-layer architecture for stream multiplexing:

  1. Binary Protocol Layer: Frame encoding/decoding with minimal overhead
  2. Session Management Layer: Stream lifecycle, ID allocation, handler registration
  3. Application Interface Layer: Request correlation, queue management, high-level operations

The non-async, callback-driven design enables deployment across native, WASM, and any async runtime without modification. Transport implementations integrate by passing bytes through the read_bytes() and on_emit callback boundaries.

For implementation details of the layers above, see RPC Framework and Transport Implementations.

Sources: README.md:12-61 src/rpc/rpc_dispatcher.rs:20-51 src/rpc/rpc_internals/rpc_session.rs:15-24
