This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Stream Multiplexing
Purpose and Scope
This document describes the stream multiplexing mechanism in the Muxio core library, specifically focusing on the RpcSession component. Stream multiplexing enables multiple independent RPC request/response streams to be transmitted concurrently over a single underlying connection without interference.
This page covers the low-level mechanics of stream ID allocation, per-stream state management, frame routing, and cleanup. For information about the binary framing protocol that underlies stream multiplexing, see Binary Framing Protocol. For information about higher-level request/response correlation and RPC lifecycle management, see RPC Dispatcher.
Sources: src/rpc/rpc_internals/rpc_session.rs:1-118 README.md:15-36
Overview
The RpcSession struct is the central component for stream multiplexing. It operates at a layer above binary frame encoding/decoding but below RPC protocol semantics. Its primary responsibilities include:
- Allocating unique stream IDs for outbound requests
- Routing incoming frames to the appropriate per-stream decoder
- Managing the lifecycle of individual stream decoders
- Emitting stream events (header, payload chunks, completion) for higher layers to process
Each logical RPC call (request or response) is assigned a unique stream_id. Multiple streams can be active simultaneously, with their frames interleaved at the transport level. The RpcSession ensures that frames are correctly demultiplexed and reassembled into coherent stream events.
Sources: src/rpc/rpc_internals/rpc_session.rs:15-24
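To make the interleaving concrete, here is a self-contained sketch (plain Rust with a stand-in Frame type, not the library's API) of frames from two logical streams arriving interleaved over one connection and being grouped back into per-stream byte sequences by stream_id, which is the essence of what RpcSession does at a higher level:

```rust
use std::collections::HashMap;

// Stand-in frame type for illustration only; the real DecodedFrame lives in
// the binary framing layer and carries more fields (kind, flags, etc.).
struct Frame {
    stream_id: u32,
    payload: Vec<u8>,
}

fn main() {
    // Frames from streams 1 and 2 arrive interleaved over a single connection.
    let interleaved = vec![
        Frame { stream_id: 1, payload: b"req-A part 1 ".to_vec() },
        Frame { stream_id: 2, payload: b"req-B part 1 ".to_vec() },
        Frame { stream_id: 1, payload: b"req-A part 2".to_vec() },
        Frame { stream_id: 2, payload: b"req-B part 2".to_vec() },
    ];

    // Demultiplex: group payload bytes back into per-stream buffers by stream_id.
    let mut streams: HashMap<u32, Vec<u8>> = HashMap::new();
    for frame in interleaved {
        streams.entry(frame.stream_id).or_default().extend(frame.payload);
    }

    for (stream_id, bytes) in &streams {
        println!("stream {stream_id}: {}", String::from_utf8_lossy(bytes));
    }
}
```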
Architecture
Component Structure
Diagram 1: RpcSession Architecture
This diagram illustrates how RpcSession sits between the binary framing layer and the RPC protocol layer. The frame_mux_stream_decoder field processes raw bytes into DecodedFrame instances, which are then routed to individual RpcStreamDecoder instances stored in the rpc_stream_decoders HashMap based on their stream_id.
Sources: src/rpc/rpc_internals/rpc_session.rs:20-33 src/rpc/rpc_internals/rpc_stream_decoder.rs:11-18 src/rpc/rpc_internals/rpc_session.rs:52-60
Stream ID Allocation
When an outbound RPC request or response is initiated, RpcSession must allocate a unique stream ID. This is managed by the next_stream_id counter, which is incremented for each new stream.
Allocation Process
| Step | Action | Implementation |
|---|---|---|
| 1 | Capture current next_stream_id value | src/rpc/rpc_internals/rpc_session.rs:44 |
| 2 | Increment counter via increment_u32_id() | src/rpc/rpc_internals/rpc_session.rs:45 |
| 3 | Create RpcStreamEncoder with allocated ID | src/rpc/rpc_internals/rpc_session.rs:47-48 |
| 4 | Return encoder to caller | src/rpc/rpc_internals/rpc_session.rs:49 |
The increment_u32_id() utility function (src/utils.rs) wraps the counter on overflow, ensuring continuous operation.
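A minimal sketch of the wrapping behavior (illustrative only; the actual helper is increment_u32_id() in src/utils.rs and its exact signature may differ):

```rust
// Sketch of a wrapping stream-ID allocator. `wrapping_add` rolls over instead
// of panicking at u32::MAX, so allocation keeps working on long-lived
// connections with many streams.
fn increment_wrapping(current: u32) -> u32 {
    current.wrapping_add(1)
}

fn main() {
    let mut next_stream_id: u32 = u32::MAX - 1;
    for _ in 0..3 {
        let stream_id = next_stream_id;                       // capture current value
        next_stream_id = increment_wrapping(next_stream_id);  // advance the counter
        println!("allocated stream_id = {stream_id}");
    }
    // Prints 4294967294, 4294967295, 0: the counter wraps instead of overflowing.
}
```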
Diagram 2: Stream ID Allocation Sequence
Sources: src/rpc/rpc_internals/rpc_session.rs:35-50 src/utils.rs:5
Inbound Frame Processing
The read_bytes() method is the entry point for all incoming data. It processes raw bytes through the following pipeline:
Processing Pipeline

```mermaid
graph LR
RawBytes["input: &[u8]"] --> ReadBytes["RpcSession::read_bytes()"]
ReadBytes --> FrameMuxDecoder["frame_mux_stream_decoder\n.read_bytes(input)"]
FrameMuxDecoder --> |frames: Iterator<Result<DecodedFrame>>| FrameLoop["for frame_result in frames"]
FrameLoop --> ExtractStreamID["frame.inner.stream_id"]
ExtractStreamID --> LookupDecoder["rpc_stream_decoders\n.entry(stream_id)\n.or_default()"]
LookupDecoder --> DecodeFrame["rpc_stream_decoder\n.decode_rpc_frame(&frame)"]
DecodeFrame --> |Ok events: Vec<RpcStreamEvent>| EmitEvents["for event in events"]
EmitEvents --> CheckEnd["matches!(event,\nRpcStreamEvent::End)"]
CheckEnd --> |true| CleanupDecoder["rpc_stream_decoders\n.remove(&stream_id)"]
CheckEnd --> |false| CallbackInvoke["on_rpc_stream_event(event)"]
DecodeFrame --> |Err e| ErrorCleanup["rpc_stream_decoders\n.remove(&stream_id)\nemit Error event"]
```

Diagram 3: Inbound Frame Processing Pipeline
Sources: src/rpc/rpc_internals/rpc_session.rs:52-117
Per-Stream Decoder Management
Each unique stream_id encountered gets its own RpcStreamDecoder instance, which maintains the decoding state for that stream. These decoders are stored in the rpc_stream_decoders: HashMap<u32, RpcStreamDecoder> field and are lazily created on first access using the entry().or_default() pattern.
Decoders are automatically cleaned up when:
- An RpcStreamEvent::End is emitted after processing (lines 73-75)
- A FrameKind::Cancel or FrameKind::End frame is received (lines 98-100)
- A decoding error occurs in decode_rpc_frame() (line 82)
Sources: src/rpc/rpc_internals/rpc_session.rs:68 src/rpc/rpc_internals/rpc_session.rs:73-75 src/rpc/rpc_internals/rpc_session.rs:80-82 src/rpc/rpc_internals/rpc_session.rs:98-100
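The following self-contained sketch (standard-library types only; Frame, StubDecoder, StreamEvent, and route_frames are illustrative stand-ins, not the library's API) ties together the pipeline and the decoder lifecycle described above: decoders are created lazily with entry().or_default(), each frame is routed by stream_id, and a stream's decoder is dropped as soon as it ends or fails.

```rust
use std::collections::HashMap;

// Stand-in types for illustration; the real pipeline routes DecodedFrame into
// per-stream RpcStreamDecoder instances and emits RpcStreamEvent values.
struct Frame {
    stream_id: u32,
    payload: Vec<u8>,
    is_end: bool,
}

enum StreamEvent {
    Chunk(Vec<u8>),
    End,
}

#[derive(Default)]
struct StubDecoder {
    bytes_seen: usize,
}

impl StubDecoder {
    fn decode(&mut self, frame: &Frame) -> Result<Vec<StreamEvent>, String> {
        self.bytes_seen += frame.payload.len();
        let mut events = vec![StreamEvent::Chunk(frame.payload.clone())];
        if frame.is_end {
            events.push(StreamEvent::End);
        }
        Ok(events)
    }
}

// Route interleaved frames to lazily created per-stream decoders, removing a
// decoder as soon as its stream ends or fails to decode.
fn route_frames(frames: &[Frame], mut on_event: impl FnMut(u32, StreamEvent)) {
    let mut decoders: HashMap<u32, StubDecoder> = HashMap::new();

    for frame in frames {
        // Lazily create a decoder the first time this stream_id is seen.
        let result = decoders.entry(frame.stream_id).or_default().decode(frame);

        match result {
            Ok(events) => {
                for event in events {
                    if matches!(event, StreamEvent::End) {
                        // Stream finished: free its decoder and buffered state.
                        decoders.remove(&frame.stream_id);
                    }
                    on_event(frame.stream_id, event);
                }
            }
            Err(_err) => {
                // Decode failure: drop the decoder; the real session also
                // emits an error event for higher layers.
                decoders.remove(&frame.stream_id);
            }
        }
    }
}

fn main() {
    let frames = vec![
        Frame { stream_id: 7, payload: b"hello ".to_vec(), is_end: false },
        Frame { stream_id: 9, payload: b"other stream".to_vec(), is_end: true },
        Frame { stream_id: 7, payload: b"world".to_vec(), is_end: true },
    ];

    route_frames(&frames, |stream_id, event| match event {
        StreamEvent::Chunk(bytes) => println!("stream {stream_id}: {} bytes", bytes.len()),
        StreamEvent::End => println!("stream {stream_id}: end"),
    });
}
```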
RpcStreamDecoder State Machine
Each RpcStreamDecoder operates as a state machine that transitions through three states as it processes frames for a single stream.
State Transitions

```mermaid
stateDiagram-v2
[*] --> AwaitHeader: RpcStreamDecoder::new()
AwaitHeader --> AwaitHeader : buffer.len() < RPC_FRAME_FRAME_HEADER_SIZE or insufficient metadata
AwaitHeader --> AwaitPayload : Header complete - extract RpcHeader, state = AwaitPayload, emit Header event
AwaitPayload --> AwaitPayload: FrameKind::Data\nemit PayloadChunk event
AwaitPayload --> Done: frame.inner.kind == FrameKind::End\nstate = Done,\nemit End event
AwaitPayload --> [*]: frame.inner.kind == FrameKind::Cancel\nreturn ReadAfterCancel error
Done --> [*] : Decoder removed from HashMap
```

Diagram 4: RpcStreamDecoder State Machine
Sources: src/rpc/rpc_internals/rpc_stream_decoder.rs:20-24 src/rpc/rpc_internals/rpc_stream_decoder.rs:53-186 src/rpc/rpc_internals/rpc_stream_decoder.rs:64-65 src/rpc/rpc_internals/rpc_stream_decoder.rs:120 src/rpc/rpc_internals/rpc_stream_decoder.rs:156-157 src/rpc/rpc_internals/rpc_stream_decoder.rs:165-166
State Descriptions
| State | Purpose | Buffer Usage | Events Emitted |
|---|---|---|---|
| AwaitHeader | Accumulate bytes until complete RPC header is available | Accumulates all incoming bytes | RpcStreamEvent::Header when header complete |
| AwaitPayload | Process payload chunks after header extracted | Not used (data forwarded directly) | RpcStreamEvent::PayloadChunk for each frame |
| Done | Stream has completed | Not used | RpcStreamEvent::End on transition |
Sources: src/rpc/rpc_internals/rpc_stream_decoder.rs:11-42
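As a condensed sketch, the decoder's shape can be pictured roughly like this (names suffixed with Sketch are illustrative stand-ins; the real definitions live in src/rpc/rpc_internals/rpc_stream_decoder.rs):

```rust
use std::sync::Arc;

// Placeholder for the decoded header (message type, request ID, method ID,
// metadata bytes); its layout is covered in the Header Decoding section below.
struct RpcHeaderSketch;

// The three states a per-stream decoder moves through.
enum RpcStreamState {
    AwaitHeader,  // accumulating bytes until the fixed header + metadata arrive
    AwaitPayload, // header extracted; payload chunks are forwarded, not buffered
    Done,         // End frame seen; the decoder is about to be removed
}

// Per-stream decoding state, one instance per active stream_id.
struct RpcStreamDecoderSketch {
    state: RpcStreamState,
    buffer: Vec<u8>,                      // only grows while in AwaitHeader
    header: Option<Arc<RpcHeaderSketch>>, // shared with emitted events via Arc
}
```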
Header Decoding
The RPC header is embedded at the beginning of each stream and contains metadata necessary for routing and processing. The header structure is fixed-size with a variable-length metadata field:
Header Structure
| Field | Offset Constant | Size | Description |
|---|---|---|---|
| rpc_msg_type | RPC_FRAME_MSG_TYPE_OFFSET (0) | 1 byte | RpcMessageType enum (Call or Response) |
| rpc_request_id | RPC_FRAME_ID_OFFSET (1) | 4 bytes | Request correlation ID |
| rpc_method_id | RPC_FRAME_METHOD_ID_OFFSET (5) | 8 bytes | Method identifier (xxhash) |
| metadata_length | RPC_FRAME_METADATA_LENGTH_OFFSET (13) | 2 bytes | Length of metadata bytes |
| rpc_metadata_bytes | 15 | variable | Serialized metadata |
```mermaid
sequenceDiagram
participant Frame as "DecodedFrame"
participant Decoder as "RpcStreamDecoder"
participant Buffer as "self.buffer: Vec<u8>"
participant Events as "events: Vec<RpcStreamEvent>"
Note over Decoder: self.state = AwaitHeader
Frame->>Decoder: decode_rpc_frame(&frame)
Decoder->>Buffer: buffer.extend(&frame.inner.payload)
Decoder->>Buffer: Check buffer.len() >= RPC_FRAME_FRAME_HEADER_SIZE
alt "buffer.len() < RPC_FRAME_FRAME_HEADER_SIZE"
Decoder-->>Events: return Ok(events) // empty
else "Sufficient for header"
Decoder->>Decoder: rpc_msg_type = RpcMessageType::try_from(buffer[RPC_FRAME_MSG_TYPE_OFFSET])
Decoder->>Decoder: rpc_request_id = u32::from_le_bytes(buffer[RPC_FRAME_ID_OFFSET..RPC_FRAME_METHOD_ID_OFFSET])
Decoder->>Decoder: rpc_method_id = u64::from_le_bytes(buffer[RPC_FRAME_METHOD_ID_OFFSET..RPC_FRAME_METADATA_LENGTH_OFFSET])
Decoder->>Decoder: meta_len = u16::from_le_bytes(buffer[RPC_FRAME_METADATA_LENGTH_OFFSET..])
Decoder->>Buffer: Check buffer.len() >= header_size + meta_len
alt "Complete header + metadata available"
Decoder->>Decoder: rpc_metadata_bytes = buffer[15..15+meta_len].to_vec()
Decoder->>Decoder: header_arc = Arc::new(RpcHeader { ... })
Decoder->>Decoder: self.header = Some(header_arc.clone())
Decoder->>Decoder: self.state = AwaitPayload
Decoder->>Buffer: buffer.drain(..15+meta_len)
Decoder->>Events: events.push(RpcStreamEvent::Header { ... })
opt "!buffer.is_empty()"
Decoder->>Events: events.push(RpcStreamEvent::PayloadChunk { ... })
end
end
end
Decoder-->>Frame: return Ok(events)
```

Diagram 5: Header Decoding Process

The decoder buffers incoming data in buffer: Vec<u8> until at least RPC_FRAME_FRAME_HEADER_SIZE + metadata_length bytes are available, then extracts the header fields and transitions to the AwaitPayload state.
Sources: src/rpc/rpc_internals/rpc_stream_decoder.rs:60-145 src/constants.rs:13-17
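Based on the offsets in the table above, a self-contained sketch of the header-extraction step might look like this (the constants are written out inline and the types are stand-ins; the real decoder uses the named offset constants from src/constants.rs and the library's RpcHeader type):

```rust
// Fixed portion of the RPC header: 1 (msg type) + 4 (request id) + 8 (method id)
// + 2 (metadata length) = 15 bytes, followed by `metadata_length` metadata bytes.
const HEADER_FIXED_SIZE: usize = 15;

struct ParsedHeader {
    rpc_msg_type: u8,
    rpc_request_id: u32,
    rpc_method_id: u64,
    rpc_metadata_bytes: Vec<u8>,
}

/// Returns None until the buffer holds the full fixed header plus metadata;
/// on success also returns how many bytes were consumed so the caller can
/// drain them and treat the remainder as payload.
fn try_parse_header(buffer: &[u8]) -> Option<(ParsedHeader, usize)> {
    if buffer.len() < HEADER_FIXED_SIZE {
        return None; // keep accumulating while in AwaitHeader
    }

    let rpc_msg_type = buffer[0];
    let rpc_request_id = u32::from_le_bytes(buffer[1..5].try_into().unwrap());
    let rpc_method_id = u64::from_le_bytes(buffer[5..13].try_into().unwrap());
    let meta_len = u16::from_le_bytes(buffer[13..15].try_into().unwrap()) as usize;

    if buffer.len() < HEADER_FIXED_SIZE + meta_len {
        return None; // fixed header arrived but metadata is still incomplete
    }

    let rpc_metadata_bytes = buffer[15..15 + meta_len].to_vec();
    Some((
        ParsedHeader { rpc_msg_type, rpc_request_id, rpc_method_id, rpc_metadata_bytes },
        HEADER_FIXED_SIZE + meta_len,
    ))
}

fn main() {
    // Build a minimal example header: msg type 1, request id 42, method id 7,
    // followed by 3 metadata bytes and then 2 payload bytes.
    let mut bytes = vec![1u8];
    bytes.extend(42u32.to_le_bytes());
    bytes.extend(7u64.to_le_bytes());
    bytes.extend(3u16.to_le_bytes());
    bytes.extend([0xAA_u8, 0xBB, 0xCC]); // metadata
    bytes.extend([0x01_u8, 0x02]);       // payload remainder

    let (header, consumed) = try_parse_header(&bytes).expect("complete header");
    assert_eq!(header.rpc_request_id, 42);
    assert_eq!(header.rpc_method_id, 7);
    assert_eq!(header.rpc_metadata_bytes, vec![0xAA_u8, 0xBB, 0xCC]);
    assert_eq!(&bytes[consumed..], &[0x01_u8, 0x02][..]); // leftover becomes payload
    println!("parsed header for request {}", header.rpc_request_id);
}
```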
Concurrent Stream Example
Multiple streams can be active simultaneously, each at a different stage of processing. The following illustrates how three concurrent streams might be managed:

```mermaid
graph TB
subgraph "RpcSession State at Time T"
Session["RpcSession\nnext_stream_id = 104"]
subgraph "rpc_stream_decoders HashMap"
Stream101["stream_id: 101\nState: AwaitPayload\nheader: Some(Add request)\nBuffered: 256 bytes"]
Stream102["stream_id: 102\nState: AwaitHeader\nheader: None\nBuffered: 8 bytes"]
Stream103["stream_id: 103\nState: Done\nheader: Some(Echo response)\nReady for cleanup"]
end
end
IncomingFrame["Incoming Frame\nstream_id: 102\npayload: 24 bytes"] --> Session
Session --> |Route to| Stream102
Stream102 --> |Now 32 bytes total Header complete!| HeaderEvent["RpcStreamEvent::Header\nfor stream 102"]
Stream103 --> |Remove from HashMap| Cleanup["Cleanup"]
```

Diagram 6: Concurrent Stream Management Example
This illustrates a snapshot where stream 101 is actively receiving payload, stream 102 is still accumulating its header, and stream 103 has completed and is ready for cleanup.
Sources: src/rpc/rpc_internals/rpc_session.rs:22-32 src/rpc/rpc_internals/rpc_stream_decoder.rs:11-18
Stream Cleanup
Proper cleanup of stream decoders is essential to prevent memory leaks in long-lived connections with many sequential RPC calls. Cleanup occurs at several points:
Cleanup Triggers
| Trigger | Location | Cleanup Action |
|---|---|---|
| RpcStreamEvent::End emitted | src/rpc/rpc_internals/rpc_session.rs:73-75 | self.rpc_stream_decoders.remove(&stream_id) |
| FrameKind::End received | src/rpc/rpc_internals/rpc_session.rs:98-100 | self.rpc_stream_decoders.remove(&stream_id) |
| FrameKind::Cancel received | src/rpc/rpc_internals/rpc_session.rs:98-100 | self.rpc_stream_decoders.remove(&stream_id) |
| Decoding error in decode_rpc_frame() | src/rpc/rpc_internals/rpc_session.rs:82 | self.rpc_stream_decoders.remove(&stream_id) |
```mermaid
graph TD
Event["Stream Event"] --> CheckType{"Event Type?"}
CheckType --> |RpcStreamEvent::End| Cleanup1["Remove decoder\nfrom HashMap"]
CheckType --> |FrameKind::End| Cleanup2["Remove decoder\nfrom HashMap"]
CheckType --> |FrameKind::Cancel| Cleanup3["Remove decoder\nfrom HashMap"]
CheckType --> |Decode Error| Cleanup4["Remove decoder\nfrom HashMap"]
CheckType --> |Other| Continue["Continue processing"]
Cleanup1 --> Done["Stream resources freed"]
Cleanup2 --> Done
Cleanup3 --> Done
Cleanup4 --> Done
```

Diagram 7: Stream Cleanup Triggers

All cleanup operations use the HashMap::remove(&stream_id) method to immediately deallocate the RpcStreamDecoder instance and its buffered data.
Sources: src/rpc/rpc_internals/rpc_session.rs:73-100
Error Handling
When errors occur during frame decoding or stream processing, the RpcSession generates RpcStreamEvent::Error events and performs cleanup:
Error Scenarios
| Error Type | Source | Response |
|---|---|---|
| Invalid frame structure | FrameMuxStreamDecoder | Emit error event, continue with other streams |
| Corrupt RPC header | RpcStreamDecoder | Emit error event, remove decoder, return error |
| Read after cancel | FrameKind::Cancel received | Return FrameDecodeError::ReadAfterCancel |
All error events include:
- rpc_header: Header data if available
- rpc_request_id: Request ID if header was decoded
- rpc_method_id: Method ID if header was decoded
- frame_decode_error: The underlying error type
This information allows higher layers to perform appropriate error handling and reporting.
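As a hedged illustration of how a higher layer might use that information (the concrete shape of the error event is a stand-in here, with field names following the list above; header data is omitted from the sketch for brevity):

```rust
// Stand-in for the information carried by an error event; the real
// RpcStreamEvent::Error variant may be shaped differently.
struct StreamErrorInfo {
    rpc_request_id: Option<u32>, // present only if the header was decoded
    rpc_method_id: Option<u64>,  // present only if the header was decoded
    frame_decode_error: String,  // the underlying decode error, stringified here
}

// A higher layer can correlate the failure back to a pending request when the
// request ID is known, or report it as a connection-level problem otherwise.
fn handle_stream_error(err: &StreamErrorInfo) {
    match err.rpc_request_id {
        Some(request_id) => eprintln!(
            "RPC request {request_id} (method {:?}) failed: {}",
            err.rpc_method_id, err.frame_decode_error
        ),
        None => eprintln!(
            "stream failed before its header was decoded: {}",
            err.frame_decode_error
        ),
    }
}
```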
Sources: src/rpc/rpc_internals/rpc_session.rs:80-111 src/rpc/rpc_internals/rpc_stream_decoder.rs:165-166
Key Implementation Details
Thread Safety
RpcSession itself is not thread-safe and does not implement Send or Sync. This design choice aligns with the core philosophy of using callbacks rather than async/await. Higher-level components (like RpcDispatcher) are responsible for managing thread safety if needed.
Memory Efficiency
- Each stream decoder maintains its own buffer, but only while in the AwaitHeader state
- Once the header is extracted, payload chunks are forwarded directly without additional buffering
- Completed streams are immediately cleaned up, preventing unbounded memory growth
- The HashMap of decoders shrinks automatically as streams complete
Arc-Wrapped Headers
Headers are wrapped in Arc<RpcHeader> immediately upon decoding:
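A minimal sketch of the pattern, with an illustrative RpcHeader layout mirroring the header table above (the exact definition lives in the rpc_internals module):

```rust
use std::sync::Arc;

// Illustrative header layout mirroring the Header Structure table above; the
// real RpcHeader is defined alongside the stream decoder.
struct RpcHeader {
    rpc_msg_type: u8,
    rpc_request_id: u32,
    rpc_method_id: u64,
    rpc_metadata_bytes: Vec<u8>,
}

fn main() {
    // Wrapped once, immediately after the header is decoded...
    let header_arc = Arc::new(RpcHeader {
        rpc_msg_type: 0,
        rpc_request_id: 42,
        rpc_method_id: 7,
        rpc_metadata_bytes: vec![0u8; 64 * 1024], // possibly large metadata
    });

    // ...then every subsequent event for this stream gets a cheap pointer copy
    // instead of a deep clone of the metadata bytes.
    let for_header_event = Arc::clone(&header_arc);
    let for_payload_event = Arc::clone(&header_arc);

    assert_eq!(Arc::strong_count(&header_arc), 3);
    assert!(Arc::ptr_eq(&for_header_event, &for_payload_event)); // same allocation
    println!("shared header for request {}", header_arc.rpc_request_id);
}
```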
This allows multiple RpcStreamEvent instances for the same stream to share the same header data via Arc::clone() without deep cloning the rpc_metadata_bytes, which is particularly important for streams with large metadata.
Sources: src/rpc/rpc_internals/rpc_session.rs:15-33 src/rpc/rpc_internals/rpc_stream_decoder.rs:111-117 src/rpc/rpc_internals/rpc_stream_decoder.rs:13
Integration with Other Components
Relationship to Binary Framing
RpcSession depends on FrameMuxStreamDecoder from the binary framing layer but does not implement frame encoding/decoding itself. It operates at a higher level of abstraction, concerned with RPC-specific concepts like headers and stream events rather than raw frame structures.
See Binary Framing Protocol for details on frame encoding/decoding.
Relationship to RPC Dispatcher
RpcDispatcher (see RPC Dispatcher) sits above RpcSession and consumes the RpcStreamEvent callbacks. The dispatcher correlates requests with responses and manages the RPC protocol semantics, while RpcSession handles only the mechanical aspects of stream multiplexing.
Sources: README.md:29-35