This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Streaming RPC Calls
Relevant source files
- extensions/muxio-rpc-service-caller/src/caller_interface.rs
- extensions/muxio-rpc-service-caller/src/lib.rs
- extensions/muxio-rpc-service-caller/tests/dynamic_channel_tests.rs
- extensions/muxio-tokio-rpc-client/src/rpc_client.rs
- extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs
Purpose and Scope
This document describes the streaming RPC mechanism in rust-muxio, which allows bidirectional data transfer over RPC calls with chunked payloads and asynchronous processing. Streaming RPC is used when responses are large, dynamic in size, or need to be processed incrementally.
For information about one-shot RPC calls with complete request/response buffers, see Prebuffered RPC Calls. For the underlying service definition traits, see Service Definitions. For client-side invocation patterns, see Service Caller Interface.
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:1-406
Overview of Streaming RPC
Streaming RPC calls provide a mechanism for sending requests and receiving responses that may be too large to buffer entirely in memory, or where the response size is unknown at call time. Unlike prebuffered calls which return complete Result<T, RpcServiceError> values, streaming calls return:
- RpcStreamEncoder: For sending additional payload chunks to the server after the initial request
- DynamicReceiver: A stream that yields Result<Vec<u8>, RpcServiceError> chunks asynchronously
The streaming mechanism handles:
- Chunked payload transmission and reassembly
- Backpressure through bounded or unbounded channels
- Error propagation and early termination
- Request/response correlation across multiplexed streams
Key Distinction:
- Prebuffered RPC : Entire response buffered in memory before returning to caller
- Streaming RPC : Response chunks streamed incrementally as they arrive
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:33-73
Initiating a Streaming Call
The call_rpc_streaming method on RpcServiceCallerInterface initiates a streaming RPC call.
Method Parameters
| Parameter | Type | Description |
|---|---|---|
| request | RpcRequest | Contains rpc_method_id, optional rpc_param_bytes, and optional rpc_prebuffered_payload_bytes |
| dynamic_channel_type | DynamicChannelType | Specifies Bounded or Unbounded channel for response streaming |
Return Value
On success, returns a tuple containing:
- RpcStreamEncoder: Used to send additional payload chunks after the initial request
- DynamicReceiver: Stream that yields response chunks as Result<Vec<u8>, RpcServiceError>
On failure, returns RpcServiceError::Transport if the client is disconnected or if dispatcher registration fails.
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:32-54
Dynamic Channel Types
The DynamicChannelType enum determines the backpressure characteristics of the response stream:
graph LR
DCT["DynamicChannelType"]
UNBOUNDED["Unbounded\nmpsc::unbounded()"]
BOUNDED["Bounded\nmpsc::channel(buffer_size)"]
DCT -->|No backpressure| UNBOUNDED
DCT -->|Backpressure at buffer_size| BOUNDED
UNBOUNDED -->|Creates| DS_UNBOUNDED["DynamicSender::Unbounded"]
UNBOUNDED -->|Creates| DR_UNBOUNDED["DynamicReceiver::Unbounded"]
BOUNDED -->|Creates| DS_BOUNDED["DynamicSender::Bounded"]
BOUNDED -->|Creates| DR_BOUNDED["DynamicReceiver::Bounded"]
Unbounded Channels
Created with DynamicChannelType::Unbounded. Uses mpsc::unbounded() internally, allowing unlimited buffering of response chunks. Suitable for:
- Fast consumers that can process chunks quickly
- Scenarios where response size is bounded and known to fit in memory
- Testing and development
Risk: Unbounded channels can lead to unbounded memory growth if the receiver is slower than the sender.
Bounded Channels
Created with DynamicChannelType::Bounded. Uses mpsc::channel(DEFAULT_RPC_STREAM_CHANNEL_BUFFER_SIZE) where DEFAULT_RPC_STREAM_CHANNEL_BUFFER_SIZE is typically 8. Provides backpressure when the buffer is full. Suitable for:
- Production systems with predictable memory usage
- Long-running streams with unknown total size
- Rate-limiting response processing
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:56-73 extensions/muxio-rpc-service-caller/tests/dynamic_channel_tests.rs:50-65
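The difference in backpressure can be sketched with the standard library alone. This is an analogy under stated assumptions, not the muxio implementation: std::sync::mpsc::channel stands in for the unbounded variant and std::sync::mpsc::sync_channel for the bounded one, and the function names fill_unbounded and fill_bounded are hypothetical. With no consumer running, the unbounded channel accepts every chunk, while the bounded channel stops accepting once its buffer is full.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

/// Pushes `n` one-byte chunks into an unbounded channel while nothing is
/// consuming, and returns how many sends were accepted.
fn fill_unbounded(n: usize) -> usize {
    let (tx, rx) = channel::<Vec<u8>>();
    let mut accepted = 0;
    for i in 0..n {
        // An unbounded sender never reports "full"; it fails only if the
        // receiver has been dropped.
        if tx.send(vec![i as u8]).is_ok() {
            accepted += 1;
        }
    }
    drop(rx); // the receiver stayed alive for the whole loop
    accepted
}

/// Pushes up to `n` chunks into a bounded channel of the given capacity
/// while nothing is consuming, and returns how many were accepted before
/// the buffer filled up.
fn fill_bounded(capacity: usize, n: usize) -> usize {
    let (tx, _rx) = sync_channel::<Vec<u8>>(capacity);
    let mut accepted = 0;
    for i in 0..n {
        match tx.try_send(vec![i as u8]) {
            Ok(()) => accepted += 1,
            // A full buffer is the backpressure signal: the producer must
            // wait (or give up) until the consumer drains something.
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

In this sketch the unbounded variant keeps every queued chunk in memory, which is the growth risk described above, whereas the bounded variant caps the number of in-flight chunks at its capacity.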
RpcStreamEncoder and DynamicReceiver
RpcStreamEncoder
The RpcStreamEncoder is created by RpcDispatcher::call() and provides methods to send additional payload chunks after the initial request. It wraps an RpcEmit trait implementation that sends binary frames over the transport.
Key characteristics:
- Created with max_chunk_size from DEFAULT_SERVICE_MAX_CHUNK_SIZE
- Automatically chunks large payloads into frames
- Shares the same rpc_request_id as the original request
- Can send multiple chunks before finalizing the stream
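The chunking behaviour can be illustrated with a small standalone function. This is a minimal sketch that assumes chunking means splitting a payload into consecutive slices of at most max_chunk_size bytes; chunk_payload is a hypothetical helper, and the real encoder also adds framing that is not modelled here.

```rust
/// Splits `payload` into consecutive frames of at most `max_chunk_size`
/// bytes. Only the final frame may be shorter than the limit.
fn chunk_payload(payload: &[u8], max_chunk_size: usize) -> Vec<Vec<u8>> {
    // `slice::chunks` panics on a zero size, so reject it explicitly.
    assert!(max_chunk_size > 0, "max_chunk_size must be non-zero");
    payload
        .chunks(max_chunk_size)
        .map(|frame| frame.to_vec())
        .collect()
}
```

Concatenating the frames in order reproduces the original payload, which is what lets the receiving side reassemble it.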
DynamicReceiver
The DynamicReceiver is a unified abstraction over mpsc::UnboundedReceiver and mpsc::Receiver that implements Stream<Item = Result<Vec<u8>, RpcServiceError>>.
| Variant | Underlying Type | Backpressure |
|---|---|---|
| Unbounded | mpsc::UnboundedReceiver | None |
| Bounded | mpsc::Receiver | Yes |
graph TB
subgraph "Call Flow"
CALL["call_rpc_streaming()"]
DISPATCHER["RpcDispatcher::call()"]
ENCODER["RpcStreamEncoder"]
RECEIVER["DynamicReceiver"]
end
subgraph "Response Flow"
RECV_FN["recv_fn closure\n(RpcResponseHandler)"]
TX["DynamicSender"]
RX["DynamicReceiver"]
APP["Application code\n.next().await"]
end
CALL -->|Creates channel| TX
CALL -->|Creates channel| RX
CALL -->|Registers| DISPATCHER
DISPATCHER -->|Returns| ENCODER
CALL -->|Returns| RECEIVER
RECV_FN -->|send_and_ignore| TX
TX -.->|mpsc| RX
RX -->|yields chunks| APP
Both variants provide the same next() interface through the StreamExt trait, abstracting the channel type from the caller.
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:56-73 extensions/muxio-rpc-service-caller/src/caller_interface.rs:289-323
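The idea of hiding the channel kind behind one type can be sketched with std::sync::mpsc. All names here (SketchSender, SketchChannelType, make_channel, collect_all) are hypothetical, and errors are simplified to String. In the standard library both channel kinds share one Receiver type, so only the sender side needs an enum in this sketch. The behaviour of send_and_ignore on a full bounded buffer is an assumption: this version silently drops the item, which may differ from the real implementation.

```rust
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender};

type Chunk = Result<Vec<u8>, String>;

/// Hypothetical stand-in for a sender that hides the channel kind.
enum SketchSender {
    Unbounded(Sender<Chunk>),
    Bounded(SyncSender<Chunk>),
}

impl SketchSender {
    /// Sends without reporting failure to the caller.
    /// Assumption: a full bounded buffer drops the item in this sketch.
    fn send_and_ignore(&self, item: Chunk) {
        match self {
            SketchSender::Unbounded(tx) => {
                let _ = tx.send(item);
            }
            SketchSender::Bounded(tx) => {
                let _ = tx.try_send(item);
            }
        }
    }
}

/// Hypothetical counterpart of the channel-type selector.
enum SketchChannelType {
    Unbounded,
    Bounded(usize),
}

fn make_channel(kind: SketchChannelType) -> (SketchSender, Receiver<Chunk>) {
    match kind {
        SketchChannelType::Unbounded => {
            let (tx, rx) = channel();
            (SketchSender::Unbounded(tx), rx)
        }
        SketchChannelType::Bounded(capacity) => {
            let (tx, rx) = sync_channel(capacity);
            (SketchSender::Bounded(tx), rx)
        }
    }
}

/// Pushes every chunk through the chosen channel kind and collects what
/// the consumer sees. The consumer code is identical for both kinds.
fn collect_all(kind: SketchChannelType, chunks: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
    let (tx, rx) = make_channel(kind);
    for chunk in chunks {
        tx.send_and_ignore(Ok(chunk));
    }
    drop(tx); // closing the sender ends the receiver's iteration
    rx.iter().filter_map(Result::ok).collect()
}
```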
stateDiagram-v2
[*] --> Waiting : recv_fn registered
Waiting --> HeaderReceived: RpcStreamEvent::Header
HeaderReceived --> Streaming: RpcResultStatus::Success
HeaderReceived --> ErrorBuffering: RpcResultStatus::MethodNotFound\nRpcResultStatus::Fail\nRpcResultStatus::SystemError
Streaming --> Streaming: RpcStreamEvent::PayloadChunk
ErrorBuffering --> ErrorBuffering: RpcStreamEvent::PayloadChunk
Streaming --> Complete: RpcStreamEvent::End
ErrorBuffering --> Complete: RpcStreamEvent::End
Waiting --> Error: RpcStreamEvent::Error
HeaderReceived --> Error: RpcStreamEvent::Error
Streaming --> Error: RpcStreamEvent::Error
ErrorBuffering --> Error: RpcStreamEvent::Error
Complete --> [*]
Error --> [*]
Stream Event Processing
The recv_fn closure registered with the dispatcher handles four types of RpcStreamEvent:
Event Types and State Machine
RpcStreamEvent::Header
Received first for every RPC response. Contains RpcHeader with:
- rpc_msg_type: Should be RpcMessageType::Response
- rpc_request_id: Correlation ID matching the request
- rpc_method_id: Method identifier
- rpc_metadata_bytes: First byte contains RpcResultStatus
The recv_fn extracts RpcResultStatus from rpc_metadata_bytes[0] and stores it for subsequent processing. A readiness signal is sent via the oneshot channel to unblock the call_rpc_streaming future.
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:118-135
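Reading the status from the first metadata byte might look like the following sketch. The numeric byte values are placeholders chosen for illustration and are not taken from the muxio source; SketchStatus and status_from_metadata are hypothetical names. How the real code treats an empty or unrecognized byte is not stated here, so this version simply returns None.

```rust
/// Hypothetical mirror of the four result statuses named in this document.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SketchStatus {
    Success,
    Fail,
    SystemError,
    MethodNotFound,
}

/// Decodes the status from the first metadata byte.
/// The byte-to-status mapping below is a placeholder, not the real encoding.
fn status_from_metadata(metadata: &[u8]) -> Option<SketchStatus> {
    match *metadata.first()? {
        0 => Some(SketchStatus::Success),
        1 => Some(SketchStatus::Fail),
        2 => Some(SketchStatus::SystemError),
        3 => Some(SketchStatus::MethodNotFound),
        _ => None,
    }
}
```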
RpcStreamEvent::PayloadChunk
Contains a chunk of the response payload. Processing depends on the previously received RpcResultStatus:
| Status | Behavior |
|---|---|
| Success | Chunk sent to DynamicSender with send_and_ignore(Ok(bytes)) |
| MethodNotFound, Fail, SystemError | Chunk buffered in error_buffer for error message construction |
| None (not yet received) | Chunk buffered defensively |
The synchronous recv_fn uses StdMutex to protect shared state (tx_arc, status, error_buffer).
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:136-174
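The routing rule in the table can be sketched as a synchronous handler over mutex-protected state. This is an illustration under assumptions: SketchShared, SketchStatus (reduced to two variants), on_payload_chunk, and route_chunks are hypothetical names, the channel is std::sync::mpsc rather than the real DynamicSender, and errors are simplified to String.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};

#[derive(Clone, Copy)]
enum SketchStatus {
    Success,
    Fail, // stands in for all non-success statuses in this sketch
}

/// State shared with the handler, guarded by a std Mutex because the
/// handler is synchronous.
struct SketchShared {
    status: Option<SketchStatus>,
    error_buffer: Vec<u8>,
    tx: Option<Sender<Result<Vec<u8>, String>>>,
}

/// Forwards the chunk on success; buffers it for any error status or when
/// no status has been seen yet.
fn on_payload_chunk(shared: &Arc<Mutex<SketchShared>>, bytes: &[u8]) {
    let mut state = shared.lock().unwrap();
    let status = state.status;
    match status {
        Some(SketchStatus::Success) => {
            if let Some(tx) = &state.tx {
                let _ = tx.send(Ok(bytes.to_vec()));
            }
        }
        Some(SketchStatus::Fail) | None => state.error_buffer.extend_from_slice(bytes),
    }
}

/// Test driver: feeds chunks through the handler and reports what was
/// forwarded to the stream and what ended up in the error buffer.
fn route_chunks(status: Option<SketchStatus>, chunks: Vec<Vec<u8>>) -> (Vec<Vec<u8>>, Vec<u8>) {
    let (tx, rx) = channel();
    let shared = Arc::new(Mutex::new(SketchShared {
        status,
        error_buffer: Vec::new(),
        tx: Some(tx),
    }));
    for chunk in &chunks {
        on_payload_chunk(&shared, chunk);
    }
    let buffered = {
        let mut state = shared.lock().unwrap();
        state.tx = None; // drop the sender so the receiver's iterator ends
        state.error_buffer.clone()
    };
    let forwarded = rx.iter().filter_map(Result::ok).collect();
    (forwarded, buffered)
}
```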
RpcStreamEvent::End
Signals stream completion. Final actions depend on RpcResultStatus:
- RpcResultStatus::MethodNotFound: Constructs RpcServiceError::Rpc with RpcServiceErrorCode::NotFound and buffered error payload
- RpcResultStatus::Fail: Sends RpcServiceError::Rpc with RpcServiceErrorCode::Fail
- RpcResultStatus::SystemError: Sends RpcServiceError::Rpc with RpcServiceErrorCode::System and buffered error payload
- RpcResultStatus::Success: Closes the channel normally (no error sent)
The DynamicSender is taken from the Option wrapper and dropped, closing the channel.
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:175-245
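The finalization step can be sketched as follows. The types (SketchStatus, SketchErrorCode, SketchRpcError) and functions (final_error, on_end) are hypothetical simplifications. Two assumptions are made: the buffered payload is interpreted as UTF-8 text, and the Fail case carries an empty message because the description above does not say whether it includes the buffer.

```rust
use std::sync::mpsc::Sender;

#[derive(Debug, PartialEq)]
enum SketchErrorCode {
    NotFound,
    Fail,
    System,
}

#[derive(Debug, PartialEq)]
struct SketchRpcError {
    code: SketchErrorCode,
    message: String,
}

#[derive(Clone, Copy)]
enum SketchStatus {
    Success,
    MethodNotFound,
    Fail,
    SystemError,
}

type SketchItem = Result<Vec<u8>, SketchRpcError>;

/// Maps the stored status to the final error, if any.
fn final_error(status: SketchStatus, error_buffer: &[u8]) -> Option<SketchRpcError> {
    // Assumption: the buffered payload is human-readable text.
    let buffered = String::from_utf8_lossy(error_buffer).into_owned();
    let (code, message) = match status {
        SketchStatus::Success => return None,
        SketchStatus::MethodNotFound => (SketchErrorCode::NotFound, buffered),
        // Assumption: no message is attached for Fail in this sketch.
        SketchStatus::Fail => (SketchErrorCode::Fail, String::new()),
        SketchStatus::SystemError => (SketchErrorCode::System, buffered),
    };
    Some(SketchRpcError { code, message })
}

/// Takes the sender out of its Option, emits the final error if there is
/// one, and drops the sender so the consumer sees the stream end.
fn on_end(tx_slot: &mut Option<Sender<SketchItem>>, status: SketchStatus, error_buffer: &[u8]) {
    if let Some(tx) = tx_slot.take() {
        if let Some(err) = final_error(status, error_buffer) {
            let _ = tx.send(Err(err));
        }
        // `tx` goes out of scope here, closing the channel.
    }
}
```

Because the sender is taken out of the Option, a second End event in this sketch finds None and does nothing.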
RpcStreamEvent::Error
Indicates a framing protocol error (e.g., malformed frames, decode errors). Sends RpcServiceError::Transport to the DynamicReceiver and also signals the readiness channel if still waiting for the header. The DynamicSender is dropped immediately.
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:246-285
Error Handling in Streams
Error Propagation Path
Pre-Dispatch Errors
Before the dispatcher registers the request, errors are returned immediately from call_rpc_streaming():
- Disconnected client : RpcServiceError::Transport(io::ErrorKind::ConnectionAborted)
- Dispatcher registration failure : RpcServiceError::Transport with error details
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:44-53 extensions/muxio-rpc-service-caller/src/caller_interface.rs:315-328
Post-Dispatch Errors
After the dispatcher registers the request, errors are sent through the DynamicReceiver stream:
- Framing errors : RpcServiceError::Transport from RpcStreamEvent::Error
- RPC-level errors : RpcServiceError::Rpc with appropriate RpcServiceErrorCode based on RpcResultStatus
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:185-238 extensions/muxio-rpc-service-caller/src/caller_interface.rs:246-285
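From the caller's point of view, the two error paths look different: a pre-dispatch error arrives as the return value of the call, while a post-dispatch error arrives as an item inside the stream. The following sketch models both with std::sync::mpsc. SketchError, start_stream, and drain are hypothetical names, and the stream is pre-scripted rather than fed by a transport.

```rust
use std::sync::mpsc::{channel, Receiver};

#[derive(Debug, PartialEq)]
enum SketchError {
    Transport(String),
    Rpc(String),
}

type SketchStream = Receiver<Result<Vec<u8>, SketchError>>;

/// Pre-dispatch path: fails immediately when disconnected. Otherwise
/// returns a stream that replays the scripted items and then ends.
fn start_stream(
    connected: bool,
    script: Vec<Result<Vec<u8>, SketchError>>,
) -> Result<SketchStream, SketchError> {
    if !connected {
        return Err(SketchError::Transport("connection aborted".to_string()));
    }
    let (tx, rx) = channel();
    for item in script {
        let _ = tx.send(item);
    }
    Ok(rx) // `tx` is dropped here, so the stream ends after the script
}

/// Post-dispatch path: consumes chunks until the stream ends, stopping at
/// the first in-stream error.
fn drain(stream: SketchStream) -> Result<Vec<u8>, SketchError> {
    let mut out = Vec::new();
    for item in stream {
        out.extend(item?);
    }
    Ok(out)
}
```

A consumer therefore needs to handle errors in two places: once when initiating the call and once per item while reading the stream.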
sequenceDiagram
participant Caller as "call_rpc_streaming()"
participant Dispatcher as "RpcDispatcher"
participant RecvFn as "recv_fn closure"
participant ReadyChan as "oneshot channel"
Caller->>ReadyChan: Create (ready_tx, ready_rx)
Caller->>Dispatcher: call(request, recv_fn)
Dispatcher-->>Caller: Returns encoder
Caller->>ReadyChan: .await on ready_rx
Note over RecvFn: Transport receives response
RecvFn->>RecvFn: RpcStreamEvent::Header
RecvFn->>RecvFn: Extract RpcResultStatus
RecvFn->>ReadyChan: ready_tx.send(Ok(()))
ReadyChan-->>Caller: Ok(())
Caller-->>Caller: Return (encoder, receiver)
Readiness Signaling
The call_rpc_streaming method uses a oneshot channel to signal when the RPC stream is ready to be consumed. This ensures the caller doesn't begin processing until the header has been received and the RpcResultStatus is known.
Signaling Mechanism
Signaling on Error
If an error occurs before receiving the header (e.g., RpcStreamEvent::Error), the readiness channel is signaled with Err(io::Error) instead of Ok(()).
Implementation Details
The readiness sender is stored in Arc<StdMutex<Option<oneshot::Sender>>> and taken using mem::take() when signaling to ensure it's only used once. The recv_fn closure acquires this mutex synchronously with .lock().unwrap().
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:78-80 extensions/muxio-rpc-service-caller/src/caller_interface.rs:127-134 extensions/muxio-rpc-service-caller/src/caller_interface.rs:332-348
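The take-once pattern described above can be sketched with the standard library. A std::sync::mpsc channel stands in for the oneshot channel (an assumption, since the concrete oneshot type is not named here), the error is simplified to String, and ReadySlot, new_ready_pair, and signal_ready are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

type Ready = Result<(), String>;
type ReadySlot = Arc<Mutex<Option<Sender<Ready>>>>;

/// Creates the shared sender slot and the receiving end the caller waits on.
fn new_ready_pair() -> (ReadySlot, Receiver<Ready>) {
    let (tx, rx) = channel();
    (Arc::new(Mutex::new(Some(tx))), rx)
}

/// Signals readiness at most once. Returns true if this call delivered the
/// signal, false if the sender had already been taken.
fn signal_ready(slot: &ReadySlot, outcome: Ready) -> bool {
    // mem::take swaps the Option with None, so later calls find nothing.
    let sender = std::mem::take(&mut *slot.lock().unwrap());
    match sender {
        Some(tx) => {
            let _ = tx.send(outcome);
            true
        }
        None => false,
    }
}
```

In this sketch, whichever event arrives first (a header or an error) wins; any later attempt to signal is a no-op.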
Complete Streaming RPC Flow
End-to-End Sequence
Synchronization Points
- Channel Creation : DynamicSender and DynamicReceiver created synchronously in call_rpc_streaming
- Dispatcher Registration : RpcDispatcher::call() registers the request and creates RpcStreamEncoder
- Readiness Await : call_rpc_streaming waits on ready_rx.await until the header is received
- Header Processing : First RpcStreamEvent::Header unblocks the caller
- Chunk Processing : Each RpcStreamEvent::PayloadChunk flows through the channel
- Stream Termination : RpcStreamEvent::End closes the channel
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:33-349
Integration with Transport Implementations
Tokio RPC Client Usage
The RpcClient struct in muxio-tokio-rpc-client implements RpcServiceCallerInterface, providing the transport-specific get_emit_fn() that sends binary data over the WebSocket connection.
When streaming RPC is used:
- call_rpc_streaming() creates the channels and registers with dispatcher
- get_emit_fn() sends initial request frames via tx.send(WsMessage::Binary(chunk))
- Receive loop processes incoming WebSocket binary messages
- endpoint.read_bytes() called on received bytes, which dispatches to recv_fn
- recv_fn forwards chunks to DynamicSender, which application receives via DynamicReceiver
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:158-178 extensions/muxio-tokio-rpc-client/src/rpc_client.rs:289-313
Connection State Impact
If the client disconnects during streaming:
- is_connected() returns false
- Subsequent call_rpc_streaming() attempts fail immediately with ConnectionAborted
- Pending streams receive RpcStreamEvent::Error from dispatcher's fail_all_pending_requests()
- Transport errors propagate through DynamicReceiver as RpcServiceError::Transport
Sources: extensions/muxio-tokio-rpc-client/src/rpc_client.rs:99-108 extensions/muxio-rpc-service-caller/src/caller_interface.rs:44-53
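The disconnect behaviour can be modelled with a small registry of pending streams. This is a simplified sketch, not the dispatcher's actual data structure: SketchPending and its methods are hypothetical, errors are plain strings, and std::sync::mpsc stands in for the real channels.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

type SketchItem = Result<Vec<u8>, String>;

/// Hypothetical registry of in-flight streams keyed by request id.
struct SketchPending {
    connected: bool,
    next_id: u32,
    streams: HashMap<u32, Sender<SketchItem>>,
}

impl SketchPending {
    fn new() -> Self {
        SketchPending { connected: true, next_id: 0, streams: HashMap::new() }
    }

    /// Registers a new stream, or fails immediately when disconnected.
    fn start_stream(&mut self) -> Result<Receiver<SketchItem>, String> {
        if !self.connected {
            return Err("connection aborted".to_string());
        }
        let (tx, rx) = channel();
        self.streams.insert(self.next_id, tx);
        self.next_id += 1;
        Ok(rx)
    }

    /// Marks the client disconnected and fails every pending stream with a
    /// transport-style error. Draining drops each sender, closing its stream.
    fn disconnect(&mut self) {
        self.connected = false;
        for (_, tx) in self.streams.drain() {
            let _ = tx.send(Err("transport error: disconnected".to_string()));
        }
    }
}
```

Each pending consumer in this model sees exactly one error item followed by the end of its stream, and any later call is rejected before a channel is created.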
Testing Patterns
Mock Client Testing
Test the dynamic channel mechanism by creating mock implementations of RpcServiceCallerInterface.
Sources: extensions/muxio-rpc-service-caller/tests/dynamic_channel_tests.rs:101-167
Integration Testing
Full integration tests with real client/server validate streaming across the WebSocket transport, testing scenarios like:
- Large payloads chunked correctly
- Bounded channel backpressure
- Early disconnect cancels pending streams
- Error status codes propagate correctly
Sources: extensions/muxio-tokio-rpc-client/tests/transport_state_tests.rs:168-292