Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

GitHub

This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Streaming RPC Calls

Relevant source files

Purpose and Scope

This document describes the streaming RPC mechanism in rust-muxio, which allows bidirectional data transfer over RPC calls with chunked payloads and asynchronous processing. Streaming RPC is used when responses are large, dynamic in size, or need to be processed incrementally.

For information about one-shot RPC calls with complete request/response buffers, see Prebuffered RPC Calls. For the underlying service definition traits, see Service Definitions. For client-side invocation patterns, see Service Caller Interface.

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:1-406


Overview of Streaming RPC

Streaming RPC calls provide a mechanism for sending requests and receiving responses that may be too large to buffer entirely in memory, or where the response size is unknown at call time. Unlike prebuffered calls which return complete Result<T, RpcServiceError> values, streaming calls return:

  1. RpcStreamEncoder - For sending additional payload chunks to the server after the initial request
  2. DynamicReceiver - A stream that yields Result<Vec<u8>, RpcServiceError> chunks asynchronously

The streaming mechanism handles:

  • Chunked payload transmission and reassembly
  • Backpressure through bounded or unbounded channels
  • Error propagation and early termination
  • Request/response correlation across multiplexed streams

Key Distinction:

  • Prebuffered RPC : Entire response buffered in memory before returning to caller
  • Streaming RPC : Response chunks streamed incrementally as they arrive

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:33-73


Initiating a Streaming Call

The call_rpc_streaming method on RpcServiceCallerInterface initiates a streaming RPC call:

Method Parameters

ParameterTypeDescription
requestRpcRequestContains rpc_method_id, optional rpc_param_bytes, and optional rpc_prebuffered_payload_bytes
dynamic_channel_typeDynamicChannelTypeSpecifies Bounded or Unbounded channel for response streaming

Return Value

On success, returns a tuple containing:

  • RpcStreamEncoder - Used to send additional payload chunks after the initial request
  • DynamicReceiver - Stream that yields response chunks as Result<Vec<u8>, RpcServiceError>

On failure, returns RpcServiceError::Transport if the client is disconnected or if dispatcher registration fails.

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:32-54


Dynamic Channel Types

The DynamicChannelType enum determines the backpressure characteristics of the response stream:

graph LR
    DCT["DynamicChannelType"]
UNBOUNDED["Unbounded\nmpsc::unbounded()"]
BOUNDED["Bounded\nmpsc::channel(buffer_size)"]
DCT -->|No backpressure| UNBOUNDED
 
   DCT -->|Backpressure at buffer_size| BOUNDED
    
 
   UNBOUNDED -->|Creates| DS_UNBOUNDED["DynamicSender::Unbounded"]
UNBOUNDED -->|Creates| DR_UNBOUNDED["DynamicReceiver::Unbounded"]
BOUNDED -->|Creates| DS_BOUNDED["DynamicSender::Bounded"]
BOUNDED -->|Creates| DR_BOUNDED["DynamicReceiver::Bounded"]

Unbounded Channels

Created with DynamicChannelType::Unbounded. Uses mpsc::unbounded() internally, allowing unlimited buffering of response chunks. Suitable for:

  • Fast consumers that can process chunks quickly
  • Scenarios where response size is bounded and known to fit in memory
  • Testing and development

Risk: Unbounded channels can lead to unbounded memory growth if the receiver is slower than the sender.

Bounded Channels

Created with DynamicChannelType::Bounded. Uses mpsc::channel(DEFAULT_RPC_STREAM_CHANNEL_BUFFER_SIZE) where DEFAULT_RPC_STREAM_CHANNEL_BUFFER_SIZE is typically 8. Provides backpressure when the buffer is full. Suitable for:

  • Production systems with predictable memory usage
  • Long-running streams with unknown total size
  • Rate-limiting response processing

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:56-73 extensions/muxio-rpc-service-caller/tests/dynamic_channel_tests.rs:50-65


RpcStreamEncoder and DynamicReceiver

RpcStreamEncoder

The RpcStreamEncoder is created by RpcDispatcher::call() and provides methods to send additional payload chunks after the initial request. It wraps an RpcEmit trait implementation that sends binary frames over the transport.

Key characteristics:

  • Created with max_chunk_size from DEFAULT_SERVICE_MAX_CHUNK_SIZE
  • Automatically chunks large payloads into frames
  • Shares the same rpc_request_id as the original request
  • Can send multiple chunks before finalizing the stream

DynamicReceiver

The DynamicReceiver is a unified abstraction over mpsc::UnboundedReceiver and mpsc::Receiver that implements Stream<Item = Result<Vec<u8>, RpcServiceError>>.

VariantUnderlying TypeBackpressure
Unboundedmpsc::UnboundedReceiverNone
Boundedmpsc::ReceiverYes
graph TB
    subgraph "Call Flow"
        CALL["call_rpc_streaming()"]
DISPATCHER["RpcDispatcher::call()"]
ENCODER["RpcStreamEncoder"]
RECEIVER["DynamicReceiver"]
end
    
    subgraph "Response Flow"
        RECV_FN["recv_fn closure\n(RpcResponseHandler)"]
TX["DynamicSender"]
RX["DynamicReceiver"]
APP["Application code\n.next().await"]
end
    
 
   CALL -->|Creates channel| TX
 
   CALL -->|Creates channel| RX
 
   CALL -->|Registers| DISPATCHER
 
   DISPATCHER -->|Returns| ENCODER
 
   CALL -->|Returns| RECEIVER
    
 
   RECV_FN -->|send_and_ignore| TX
 
   TX -.->|mpsc| RX
 
   RX -->|yields chunks| APP

Both variants provide the same next() interface through the StreamExt trait, abstracting the channel type from the caller.

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:56-73 extensions/muxio-rpc-service-caller/src/caller_interface.rs:289-323


stateDiagram-v2
    [*] --> Waiting : recv_fn registered
    Waiting --> HeaderReceived: RpcStreamEvent::Header
    HeaderReceived --> Streaming: RpcResultStatus::Success
    HeaderReceived --> ErrorBuffering: RpcResultStatus::MethodNotFound\nRpcResultStatus::Fail\nRpcResultStatus::SystemError
    Streaming --> Streaming: RpcStreamEvent::PayloadChunk
    ErrorBuffering --> ErrorBuffering: RpcStreamEvent::PayloadChunk
    Streaming --> Complete: RpcStreamEvent::End
    ErrorBuffering --> Complete: RpcStreamEvent::End
    Waiting --> Error: RpcStreamEvent::Error
    HeaderReceived --> Error: RpcStreamEvent::Error
    Streaming --> Error: RpcStreamEvent::Error
    ErrorBuffering --> Error: RpcStreamEvent::Error
    Complete --> [*]
    Error --> [*]

Stream Event Processing

The recv_fn closure registered with the dispatcher handles four types of RpcStreamEvent:

Event Types and State Machine

RpcStreamEvent::Header

Received first for every RPC response. Contains RpcHeader with:

  • rpc_msg_type - Should be RpcMessageType::Response
  • rpc_request_id - Correlation ID matching the request
  • rpc_method_id - Method identifier
  • rpc_metadata_bytes - First byte contains RpcResultStatus

The recv_fn extracts RpcResultStatus from rpc_metadata_bytes[0] and stores it for subsequent processing. A readiness signal is sent via the oneshot channel to unblock the call_rpc_streaming future.

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:118-135

RpcStreamEvent::PayloadChunk

Contains a chunk of the response payload. Processing depends on the previously received RpcResultStatus:

StatusBehavior
SuccessChunk sent to DynamicSender with send_and_ignore(Ok(bytes))
MethodNotFound, Fail, SystemErrorChunk buffered in error_buffer for error message construction
None (not yet received)Chunk buffered defensively

The synchronous recv_fn uses StdMutex to protect shared state (tx_arc, status, error_buffer).

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:136-174

RpcStreamEvent::End

Signals stream completion. Final actions depend on RpcResultStatus:

  1. RpcResultStatus::MethodNotFound : Constructs RpcServiceError::Rpc with RpcServiceErrorCode::NotFound and buffered error payload
  2. RpcResultStatus::Fail : Sends RpcServiceError::Rpc with RpcServiceErrorCode::Fail
  3. RpcResultStatus::SystemError : Sends RpcServiceError::Rpc with RpcServiceErrorCode::System and buffered error payload
  4. RpcResultStatus::Success : Closes the channel normally (no error sent)

The DynamicSender is taken from the Option wrapper and dropped, closing the channel.

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:175-245

RpcStreamEvent::Error

Indicates a framing protocol error (e.g., malformed frames, decode errors). Sends RpcServiceError::Transport to the DynamicReceiver and also signals the readiness channel if still waiting for the header. The DynamicSender is dropped immediately.

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:246-285


Error Handling in Streams

Error Propagation Path

Pre-Dispatch Errors

Before the dispatcher registers the request, errors are returned immediately from call_rpc_streaming():

  • Disconnected client : RpcServiceError::Transport(io::ErrorKind::ConnectionAborted)
  • Dispatcher registration failure : RpcServiceError::Transport with error details

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:44-53 extensions/muxio-rpc-service-caller/src/caller_interface.rs:315-328

Post-Dispatch Errors

After the dispatcher registers the request, errors are sent through the DynamicReceiver stream:

  • Framing errors : RpcServiceError::Transport from RpcStreamEvent::Error
  • RPC-level errors : RpcServiceError::Rpc with appropriate RpcServiceErrorCode based on RpcResultStatus

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:185-238 extensions/muxio-rpc-service-caller/src/caller_interface.rs:246-285


sequenceDiagram
    participant Caller as "call_rpc_streaming()"
    participant Dispatcher as "RpcDispatcher"
    participant RecvFn as "recv_fn closure"
    participant ReadyChan as "oneshot channel"
    
    Caller->>ReadyChan: Create (ready_tx, ready_rx)
    Caller->>Dispatcher: call(request, recv_fn)
    Dispatcher-->>Caller: Returns encoder
    Caller->>ReadyChan: .await on ready_rx
    
    Note over RecvFn: Transport receives response
    RecvFn->>RecvFn: RpcStreamEvent::Header
    RecvFn->>RecvFn: Extract RpcResultStatus
    RecvFn->>ReadyChan: ready_tx.send(Ok(()))
    
    ReadyChan-->>Caller: Ok(())
    Caller-->>Caller: Return (encoder, receiver)

Readiness Signaling

The call_rpc_streaming method uses a oneshot channel to signal when the RPC stream is ready to be consumed. This ensures the caller doesn't begin processing until the header has been received and the RpcResultStatus is known.

Signaling Mechanism

Signaling on Error

If an error occurs before receiving the header (e.g., RpcStreamEvent::Error), the readiness channel is signaled with Err(io::Error) instead of Ok(()).

Implementation Details

The readiness sender is stored in Arc<StdMutex<Option<oneshot::Sender>>> and taken using mem::take() when signaling to ensure it's only used once. The recv_fn closure acquires this mutex synchronously with .lock().unwrap().

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:78-80 extensions/muxio-rpc-service-caller/src/caller_interface.rs:127-134 extensions/muxio-rpc-service-caller/src/caller_interface.rs:332-348


Complete Streaming RPC Flow

End-to-End Sequence

Synchronization Points

  1. Channel Creation : DynamicSender and DynamicReceiver created synchronously in call_rpc_streaming
  2. Dispatcher Registration : RpcDispatcher::call() registers the request and creates RpcStreamEncoder
  3. Readiness Await : call_rpc_streaming blocks on ready_rx.await until header received
  4. Header Processing : First RpcStreamEvent::Header unblocks the caller
  5. Chunk Processing : Each RpcStreamEvent::PayloadChunk flows through the channel
  6. Stream Termination : RpcStreamEvent::End closes the channel

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:33-349


Integration with Transport Implementations

Tokio RPC Client Usage

The RpcClient struct in muxio-tokio-rpc-client implements RpcServiceCallerInterface, providing the transport-specific get_emit_fn() that sends binary data over the WebSocket connection.

When streaming RPC is used:

  1. call_rpc_streaming() creates the channels and registers with dispatcher
  2. get_emit_fn() sends initial request frames via tx.send(WsMessage::Binary(chunk))
  3. Receive loop processes incoming WebSocket binary messages
  4. endpoint.read_bytes() called on received bytes, which dispatches to recv_fn
  5. recv_fn forwards chunks to DynamicSender, which application receives via DynamicReceiver

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:158-178 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:289-313

Connection State Impact

If the client disconnects during streaming:

  1. is_connected() returns false
  2. Subsequent call_rpc_streaming() attempts fail immediately with ConnectionAborted
  3. Pending streams receive RpcStreamEvent::Error from dispatcher's fail_all_pending_requests()
  4. Transport errors propagate through DynamicReceiver as RpcServiceError::Transport

Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:99-108 extensions/muxio-rpc-service-caller/src/caller_interface.rs:44-53


Testing Patterns

Mock Client Testing

Test the dynamic channel mechanism by creating mock implementations of RpcServiceCallerInterface:

Sources: extensions/muxio-rpc-service-caller/tests/dynamic_channel_tests.rs:101-167

Integration Testing

Full integration tests with real client/server validate streaming across the WebSocket transport, testing scenarios like:

  • Large payloads chunked correctly
  • Bounded channel backpressure
  • Early disconnect cancels pending streams
  • Error status codes propagate correctly

Sources: extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs:168-292

Dismiss

Refresh this wiki

Enter email to refresh