Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

GitHub

This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Stream Multiplexing

Loading…

Stream Multiplexing

Relevant source files

Purpose and Scope

This document describes the stream multiplexing mechanism in the Muxio core library, specifically focusing on the RpcSession component. Stream multiplexing enables multiple independent RPC request/response streams to be transmitted concurrently over a single underlying connection without interference.

This page covers the low-level mechanics of stream ID allocation, per-stream state management, frame routing, and cleanup. For information about the binary framing protocol that underlies stream multiplexing, see Binary Framing Protocol. For information about higher-level request/response correlation and RPC lifecycle management, see RPC Dispatcher.

Sources: src/rpc/rpc_internals/rpc_session.rs:1-118 README.md:15-36


Overview

The RpcSession struct is the central component for stream multiplexing. It operates at a layer above binary frame encoding/decoding but below RPC protocol semantics. Its primary responsibilities include:

  • Allocating unique stream IDs for outbound requests
  • Routing incoming frames to the appropriate per-stream decoder
  • Managing the lifecycle of individual stream decoders
  • Emitting stream events (header, payload chunks, completion) for higher layers to process

Each logical RPC call (request or response) is assigned a unique stream_id. Multiple streams can be active simultaneously, with their frames interleaved at the transport level. The RpcSession ensures that frames are correctly demultiplexed and reassembled into coherent stream events.

Sources: src/rpc/rpc_internals/rpc_session.rs:15-24


Architecture

Component Structure

Diagram 1: RpcSession Architecture

This diagram illustrates how RpcSession sits between the binary framing layer and the RPC protocol layer. The frame_mux_stream_decoder field processes raw bytes into DecodedFrame instances, which are then routed to individual RpcStreamDecoder instances stored in the rpc_stream_decoders HashMap based on their stream_id.

Sources: src/rpc/rpc_internals/rpc_session.rs:20-33 src/rpc/rpc_internals/rpc_stream_decoder.rs:11-18 src/rpc/rpc_internals/rpc_session.rs:52-60


Stream ID Allocation

When an outbound RPC request or response is initiated, RpcSession must allocate a unique stream ID. This is managed by the next_stream_id counter, which is incremented for each new stream.

Allocation Process

StepActionImplementation
1Capture current next_stream_id valuesrc/rpc/rpc_internals/rpc_session.rs44
2Increment counter via increment_u32_id()src/rpc/rpc_internals/rpc_session.rs45
3Create RpcStreamEncoder with allocated IDsrc/rpc/rpc_internals/rpc_session.rs:47-48
4Return encoder to callersrc/rpc/rpc_internals/rpc_session.rs49

The increment_u32_id() utility function src/utils.rs wraps the counter on overflow, ensuring continuous operation.

Diagram 2: Stream ID Allocation Sequence

Sources: src/rpc/rpc_internals/rpc_session.rs:35-50 src/utils.rs5


Inbound Frame Processing

The read_bytes() method is the entry point for all incoming data. It processes raw bytes through the following pipeline:

graph LR
    RawBytes["input: &[u8]"] --> ReadBytes["RpcSession::read_bytes()"]
ReadBytes --> FrameMuxDecoder["frame_mux_stream_decoder\n.read_bytes(input)"]
FrameMuxDecoder --> |frames: Iterator<Result<DecodedFrame>>| FrameLoop["for frame_result in frames"]
FrameLoop --> ExtractStreamID["frame.inner.stream_id"]
ExtractStreamID --> LookupDecoder["rpc_stream_decoders\n.entry(stream_id)\n.or_default()"]
LookupDecoder --> DecodeFrame["rpc_stream_decoder\n.decode_rpc_frame(&frame)"]
DecodeFrame --> |Ok events: Vec<RpcStreamEvent>| EmitEvents["for event in events"]
EmitEvents --> CheckEnd["matches!(event,\nRpcStreamEvent::End)"]
CheckEnd --> |true| CleanupDecoder["rpc_stream_decoders\n.remove(&stream_id)"]
CheckEnd --> |false| CallbackInvoke["on_rpc_stream_event(event)"]
DecodeFrame --> |Err e| ErrorCleanup["rpc_stream_decoders\n.remove(&stream_id)\nemit Error event"]

Processing Pipeline

Diagram 3: Inbound Frame Processing Pipeline

Sources: src/rpc/rpc_internals/rpc_session.rs:52-117

Per-Stream Decoder Management

Each unique stream_id encountered gets its own RpcStreamDecoder instance, which maintains the decoding state for that stream. These decoders are stored in the rpc_stream_decoders: HashMap<u32, RpcStreamDecoder> field and are lazily created on first access using the entry().or_default() pattern.

Decoders are automatically cleaned up when:

  • An RpcStreamEvent::End is emitted after processing (line 73-75)
  • A FrameKind::Cancel or FrameKind::End frame is received (line 98-100)
  • A decoding error occurs in decode_rpc_frame() (line 82)

Sources: src/rpc/rpc_internals/rpc_session.rs68 src/rpc/rpc_internals/rpc_session.rs:73-75 src/rpc/rpc_internals/rpc_session.rs:80-82 src/rpc/rpc_internals/rpc_session.rs:98-100


RpcStreamDecoder State Machine

Each RpcStreamDecoder operates as a state machine that transitions through three states as it processes frames for a single stream.

stateDiagram-v2
    [*] --> AwaitHeader: RpcStreamDecoder::new()
    
    AwaitHeader --> AwaitHeader : buffer.len() < RPC_FRAME_FRAME_HEADER_SIZE or insufficient metadata
    AwaitHeader --> AwaitPayload : Header complete - extract RpcHeader, state = AwaitPayload, emit Header event
    
    AwaitPayload --> AwaitPayload: FrameKind::Data\nemit PayloadChunk event
    AwaitPayload --> Done: frame.inner.kind == FrameKind::End\nstate = Done,\nemit End event
    AwaitPayload --> [*]: frame.inner.kind == FrameKind::Cancel\nreturn ReadAfterCancel error
    
    Done --> [*] : Decoder removed from HashMap

State Transitions

Diagram 4: RpcStreamDecoder State Machine

Sources: src/rpc/rpc_internals/rpc_stream_decoder.rs:20-24 src/rpc/rpc_internals/rpc_stream_decoder.rs:53-186 src/rpc/rpc_internals/rpc_stream_decoder.rs:64-65 src/rpc/rpc_internals/rpc_stream_decoder.rs120 src/rpc/rpc_internals/rpc_stream_decoder.rs:156-157 src/rpc/rpc_internals/rpc_stream_decoder.rs:165-166

State Descriptions

StatePurposeBuffer UsageEvents Emitted
AwaitHeaderAccumulate bytes until complete RPC header is availableAccumulates all incoming bytesRpcStreamEvent::Header when header complete
AwaitPayloadProcess payload chunks after header extractedNot used (data forwarded directly)RpcStreamEvent::PayloadChunk for each frame
DoneStream has completedNot usedRpcStreamEvent::End on transition

Sources: src/rpc/rpc_internals/rpc_stream_decoder.rs:11-42


Header Decoding

The RPC header is embedded at the beginning of each stream and contains metadata necessary for routing and processing. The header structure is fixed-size with a variable-length metadata field:

Header Structure

FieldOffset ConstantSizeDescription
rpc_msg_typeRPC_FRAME_MSG_TYPE_OFFSET (0)1 byteRpcMessageType enum (Call or Response)
rpc_request_idRPC_FRAME_ID_OFFSET (1)4 bytesRequest correlation ID
rpc_method_idRPC_FRAME_METHOD_ID_OFFSET (5)8 bytesMethod identifier (xxhash)
metadata_lengthRPC_FRAME_METADATA_LENGTH_OFFSET (13)2 bytesLength of metadata bytes
rpc_metadata_bytes15variableSerialized metadata
sequenceDiagram
    participant Frame as "DecodedFrame"
    participant Decoder as "RpcStreamDecoder"
    participant Buffer as "self.buffer: Vec&lt;u8&gt;"
    participant Events as "events: Vec&lt;RpcStreamEvent&gt;"
    
    Note over Decoder: self.state = AwaitHeader
    Frame->>Decoder: decode_rpc_frame(&frame)
    Decoder->>Buffer: buffer.extend(&frame.inner.payload)
    Decoder->>Buffer: Check buffer.len() >= RPC_FRAME_FRAME_HEADER_SIZE
    
    alt "buffer.len() < RPC_FRAME_FRAME_HEADER_SIZE"
        Decoder-->>Events: return Ok(events) // empty
    else "Sufficient for header"
        Decoder->>Decoder: rpc_msg_type = RpcMessageType::try_from(buffer[RPC_FRAME_MSG_TYPE_OFFSET])
        Decoder->>Decoder: rpc_request_id = u32::from_le_bytes(buffer[RPC_FRAME_ID_OFFSET..RPC_FRAME_METHOD_ID_OFFSET])
        Decoder->>Decoder: rpc_method_id = u64::from_le_bytes(buffer[RPC_FRAME_METHOD_ID_OFFSET..RPC_FRAME_METADATA_LENGTH_OFFSET])
        Decoder->>Decoder: meta_len = u16::from_le_bytes(buffer[RPC_FRAME_METADATA_LENGTH_OFFSET..])
        Decoder->>Buffer: Check buffer.len() >= header_size + meta_len
        
        alt "Complete header + metadata available"
            Decoder->>Decoder: rpc_metadata_bytes = buffer[15..15+meta_len].to_vec()
            Decoder->>Decoder: header_arc = Arc::new(RpcHeader { ... })
            Decoder->>Decoder: self.header = Some(header_arc.clone())
            Decoder->>Decoder: self.state = AwaitPayload
            Decoder->>Buffer: buffer.drain(..15+meta_len)
            Decoder->>Events: events.push(RpcStreamEvent::Header { ... })
            
            opt "!buffer.is_empty()"
                Decoder->>Events: events.push(RpcStreamEvent::PayloadChunk { ... })
            end
        end
    end
    
    Decoder-->>Frame: return Ok(events)

The decoder buffers incoming data in buffer: Vec<u8> until at least RPC_FRAME_FRAME_HEADER_SIZE + metadata_length bytes are available, then extracts the header fields and transitions to AwaitPayload state.

Diagram 5: Header Decoding Process

Sources: src/rpc/rpc_internals/rpc_stream_decoder.rs:60-145 src/constants.rs:13-17


graph TB
    subgraph "RpcSession State at Time T"
        Session["RpcSession\nnext_stream_id = 104"]
subgraph "rpc_stream_decoders HashMap"
            Stream101["stream_id: 101\nState: AwaitPayload\nheader: Some(Add request)\nBuffered: 256 bytes"]
Stream102["stream_id: 102\nState: AwaitHeader\nheader: None\nBuffered: 8 bytes"]
Stream103["stream_id: 103\nState: Done\nheader: Some(Echo response)\nReady for cleanup"]
end
    end
    
 
   IncomingFrame["Incoming Frame\nstream_id: 102\npayload: 24 bytes"] --> Session
 
   Session --> |Route to| Stream102
 
   Stream102 --> |Now 32 bytes total Header complete!| HeaderEvent["RpcStreamEvent::Header\nfor stream 102"]
Stream103 --> |Remove from HashMap| Cleanup["Cleanup"]

Concurrent Stream Example

Multiple streams can be active simultaneously, each at different stages of processing. The following illustrates how three concurrent streams might be managed:

Diagram 6: Concurrent Stream Management Example

This illustrates a snapshot where stream 101 is actively receiving payload, stream 102 is still accumulating its header, and stream 103 has completed and is ready for cleanup.

Sources: src/rpc/rpc_internals/rpc_session.rs:22-32 src/rpc/rpc_internals/rpc_stream_decoder.rs:11-18


Stream Cleanup

Proper cleanup of stream decoders is essential to prevent memory leaks in long-lived connections with many sequential RPC calls. Cleanup occurs at several points:

Cleanup Triggers

TriggerLocationCleanup Action
RpcStreamEvent::End emittedsrc/rpc/rpc_internals/rpc_session.rs:73-75self.rpc_stream_decoders.remove(&stream_id)
FrameKind::End receivedsrc/rpc/rpc_internals/rpc_session.rs:98-100self.rpc_stream_decoders.remove(&stream_id)
FrameKind::Cancel receivedsrc/rpc/rpc_internals/rpc_session.rs:98-100self.rpc_stream_decoders.remove(&stream_id)
Decoding error in decode_rpc_frame()src/rpc/rpc_internals/rpc_session.rs82self.rpc_stream_decoders.remove(&stream_id)
graph TD
 
   Event["Stream Event"] --> CheckType{"Event Type?"}
CheckType --> |RpcStreamEvent::End| Cleanup1["Remove decoder\nfrom HashMap"]
CheckType --> |FrameKind::End| Cleanup2["Remove decoder\nfrom HashMap"]
CheckType --> |FrameKind::Cancel| Cleanup3["Remove decoder\nfrom HashMap"]
CheckType --> |Decode Error| Cleanup4["Remove decoder\nfrom HashMap"]
CheckType --> |Other| Continue["Continue processing"]
Cleanup1 --> Done["Stream resources freed"]
Cleanup2 --> Done
 
   Cleanup3 --> Done
 
   Cleanup4 --> Done

All cleanup operations use the HashMap::remove(&stream_id) method to immediately deallocate the RpcStreamDecoder instance and its buffered data.

Diagram 7: Stream Cleanup Triggers

Sources: src/rpc/rpc_internals/rpc_session.rs:73-100


Error Handling

When errors occur during frame decoding or stream processing, the RpcSession generates RpcStreamEvent::Error events and performs cleanup:

Error Scenarios

Error TypeSourceResponse
Invalid frame structureFrameMuxStreamDecoderEmit error event, continue with other streams
Corrupt RPC headerRpcStreamDecoderEmit error event, remove decoder, return error
Read after cancelFrameKind::Cancel receivedReturn FrameDecodeError::ReadAfterCancel

All error events include:

  • rpc_header: Header data if available
  • rpc_request_id: Request ID if header was decoded
  • rpc_method_id: Method ID if header was decoded
  • frame_decode_error: The underlying error type

This information allows higher layers to perform appropriate error handling and reporting.

Sources: src/rpc/rpc_internals/rpc_session.rs:80-111 src/rpc/rpc_internals/rpc_stream_decoder.rs:165-166


Key Implementation Details

Thread Safety

RpcSession itself is not thread-safe and does not implement Send or Sync. This design choice aligns with the core philosophy of using callbacks rather than async/await. Higher-level components (like RpcDispatcher) are responsible for managing thread safety if needed.

Memory Efficiency

  • Each stream decoder maintains its own buffer, but only while in AwaitHeader state
  • Once the header is extracted, payload chunks are forwarded directly without additional buffering
  • Completed streams are immediately cleaned up, preventing unbounded memory growth
  • The HashMap of decoders shrinks automatically as streams complete

Arc-Wrapped Headers

Headers are wrapped in Arc<RpcHeader> immediately upon decoding:

This allows multiple RpcStreamEvent instances for the same stream to share the same header data via Arc::clone() without deep cloning the rpc_metadata_bytes, which is particularly important for streams with large metadata.

Sources: src/rpc/rpc_internals/rpc_session.rs:15-33 src/rpc/rpc_internals/rpc_stream_decoder.rs:111-117 src/rpc/rpc_internals/rpc_stream_decoder.rs13


Integration with Other Components

Relationship to Binary Framing

RpcSession depends on FrameMuxStreamDecoder from the binary framing layer but does not implement frame encoding/decoding itself. It operates at a higher level of abstraction, concerned with RPC-specific concepts like headers and stream events rather than raw frame structures.

See Binary Framing Protocol for details on frame encoding/decoding.

Relationship to RPC Dispatcher

RpcDispatcher (see RPC Dispatcher) sits above RpcSession and consumes the RpcStreamEvent callbacks. The dispatcher correlates requests with responses and manages the RPC protocol semantics, while RpcSession handles only the mechanical aspects of stream multiplexing.

Sources: README.md:29-35