This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Prebuffered RPC Calls
Relevant source files
- extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs
- extensions/muxio-rpc-service-caller/tests/prebuffered_caller_tests.rs
- extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs
- extensions/muxio-wasm-rpc-client/tests/prebuffered_integration_tests.rs
Purpose and Scope
This page documents the prebuffered RPC mechanism in the muxio system, which provides a complete request/response pattern for RPC calls. Prebuffered calls send the entire request payload upfront, wait for the complete response, and return a typed result. This is the simplest and most common RPC pattern in the system.
For information about defining RPC methods, see Service Definitions. For streaming RPC calls that handle chunked data incrementally, see Streaming RPC Calls. For the underlying client and server interfaces, see Service Caller Interface and Service Endpoint Interface.
Sources: extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:1-99
Overview of Prebuffered RPC
Prebuffered RPC calls represent a synchronous, request-response communication pattern where:
- The client encodes the entire request before sending
- The request is transmitted to the server (potentially in chunks if large)
- The server processes the complete request and produces a response
- The response is transmitted back to the client (potentially in chunks if large)
- The client decodes and returns the typed response
sequenceDiagram
participant App as "Application Code"
participant Trait as "RpcCallPrebuffered::call()"
participant Encode as "encode_request()"
participant Client as "RpcServiceCallerInterface"
participant Network as "Network Transport"
participant Server as "RPC Server"
participant Decode as "decode_response()"
App->>Trait: Add::call(&client, vec![1.0, 2.0, 3.0])
Trait->>Encode: encode_request(vec![1.0, 2.0, 3.0])
Encode-->>Trait: encoded_bytes
alt "Small payload (<64KB)"
Trait->>Trait: rpc_param_bytes = Some(encoded_bytes)
Trait->>Trait: rpc_prebuffered_payload_bytes = None
else "Large payload (>=64KB)"
Trait->>Trait: rpc_param_bytes = None
Trait->>Trait: rpc_prebuffered_payload_bytes = Some(encoded_bytes)
end
Trait->>Client: call_rpc_buffered(RpcRequest)
Client->>Network: transmit (chunked if needed)
Network->>Server: receive and reassemble
Server->>Server: process request
Server->>Network: transmit response (chunked if needed)
Network->>Client: receive and reassemble
Client-->>Trait: Result<Vec<u8>, RpcServiceError>
Trait->>Decode: decode_response(response_bytes)
Decode-->>Trait: Result<f64, io::Error>
Trait-->>App: Result<f64, RpcServiceError>
The term "prebuffered" refers to the fact that both request and response payloads are fully buffered before being processed by application code, as opposed to streaming approaches where data is processed incrementally.
Diagram: Complete Prebuffered RPC Call Flow
Sources: extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:30-98 extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:18-97
The RpcCallPrebuffered Trait
The RpcCallPrebuffered trait provides the high-level interface for making prebuffered RPC calls. It is automatically implemented for any type that implements RpcMethodPrebuffered.
Diagram: RpcCallPrebuffered Trait Hierarchy
graph TB
subgraph "Trait Definition"
RpcCallPrebuffered["RpcCallPrebuffered\n(trait)"]
call_method["call()\n(async method)"]
end
subgraph "Trait Bounds"
RpcMethodPrebuffered["RpcMethodPrebuffered\n(provides encode/decode)"]
Send["Send + Sync\n(thread-safe)"]
sized["Sized\n(known size)"]
end
subgraph "Blanket Implementation"
blanket["impl<T> RpcCallPrebuffered for T\nwhere T: RpcMethodPrebuffered"]
end
subgraph "Example Types"
Add["Add\n(example service)"]
Mult["Mult\n(example service)"]
Echo["Echo\n(example service)"]
end
RpcCallPrebuffered --> call_method
blanket --> RpcCallPrebuffered
RpcMethodPrebuffered --> blanket
Send --> blanket
sized --> blanket
Add -.implements.-> RpcMethodPrebuffered
Mult -.implements.-> RpcMethodPrebuffered
Echo -.implements.-> RpcMethodPrebuffered
Add -.gets.-> RpcCallPrebuffered
Mult -.gets.-> RpcCallPrebuffered
Echo -.gets.-> RpcCallPrebuffered
The trait signature is:
| Trait | Method | Parameters | Returns |
|---|---|---|---|
RpcCallPrebuffered | call() | rpc_client: &C, input: Self::Input | Result<Self::Output, RpcServiceError> |
Sources: extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:10-21 extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:23-28
Request Encoding and Transport Strategy
The RpcCallPrebuffered::call() implementation uses a smart transport strategy to handle arguments of any size. This strategy is necessary because the RPC header frame has a maximum size limit (typically 64KB).
Diagram: Argument Size-Based Transport Selection
graph TB
start["encode_request(input)"]
check{"encoded_args.len()\n>= DEFAULT_SERVICE_MAX_CHUNK_SIZE?"}
small["Small Arguments Path"]
large["Large Arguments Path"]
param_bytes["rpc_param_bytes = Some(encoded_args)\nrpc_prebuffered_payload_bytes = None"]
payload_bytes["rpc_param_bytes = None\nrpc_prebuffered_payload_bytes = Some(encoded_args)"]
create_request["Create RpcRequest\nwith is_finalized = true"]
send["call_rpc_buffered(request)"]
start --> check
check -->|No < 64KB| small
check -->|Yes >= 64KB| large
small --> param_bytes
large --> payload_bytes
param_bytes --> create_request
payload_bytes --> create_request
create_request --> send
Small Arguments Path (< 64KB)
When encoded arguments are smaller than DEFAULT_SERVICE_MAX_CHUNK_SIZE:
- Arguments are placed in
RpcRequest.rpc_param_bytes - The entire request (header + arguments) is transmitted in a single frame
- Most efficient for typical RPC calls
Large Arguments Path (>= 64KB)
When encoded arguments exceed the chunk size:
- Arguments are placed in
RpcRequest.rpc_prebuffered_payload_bytes - The
RpcDispatcherautomatically chunks the payload - The header is sent first, followed by payload chunks
- The server reassembles chunks before invoking the handler
RpcRequest Structure
| Field | Type | Purpose |
|---|---|---|
rpc_method_id | u64 | Compile-time generated method identifier |
rpc_param_bytes | Option<Vec<u8>> | Small arguments sent in header |
rpc_prebuffered_payload_bytes | Option<Vec<u8>> | Large arguments sent as chunked payload |
is_finalized | bool | Always true for prebuffered calls |
Sources: extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:49-73 extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:30-48
graph LR
subgraph "Server Processing"
handler["Method Handler\nprocess request"]
encode_response["encode_response(result)"]
end
subgraph "Network Layer"
chunk["Automatic Chunking\n(if response > 64KB)"]
reassemble["Automatic Reassembly\n(client-side)"]
end
subgraph "Client Processing"
call_buffered["call_rpc_buffered()"]
nested_result["Result<Result<Output, io::Error>, RpcServiceError>"]
decode["decode_response(bytes)"]
flatten["Flatten nested Result"]
final["Result<Output, RpcServiceError>"]
end
handler --> encode_response
encode_response --> chunk
chunk --> reassemble
reassemble --> call_buffered
call_buffered --> nested_result
nested_result --> decode
decode --> flatten
flatten --> final
Response Handling and Decoding
Once the request is sent via call_rpc_buffered(), the client waits for the complete response. The response may be chunked during transmission, but call_rpc_buffered() handles reassembly transparently.
Diagram: Response Processing Pipeline
The response handling involves nested Result types:
-
Outer Result :
Result<_, RpcServiceError>- Indicates whether the RPC infrastructure succeededOk: The request was sent and a response was receivedErr: Network error, serialization error, or remote RPC error
-
Inner Result :
Result<Output, io::Error>- Indicates whether decoding succeededOk: The response was successfully decoded into the typed outputErr: Deserialization error indecode_response()
The call() method flattens these nested results and converts the inner io::Error to RpcServiceError::Transport.
Sources: extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:75-96
Error Propagation
Prebuffered RPC calls can fail at multiple stages, all represented by RpcServiceError:
| Error Type | Cause | Example |
|---|---|---|
RpcServiceError::Rpc(code: NotFound) | Method not registered on server | Calling unregistered method |
RpcServiceError::Rpc(code: System) | Server handler returned error | Handler logic failure |
RpcServiceError::Rpc(code: Fail) | Application-level error | Business logic error |
RpcServiceError::Transport | Network or serialization error | Connection closed, decode failure |
RpcServiceError::Cancelled | Request cancelled | Client-side cancellation |
Diagram: Error Propagation Through Prebuffered Call
Sources: extensions/muxio-rpc-service-caller/tests/prebuffered_caller_tests.rs:135-177 extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:99-152 extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:205-240
Usage Examples
Basic Prebuffered Call
This single line:
- Encodes the
Vec<f64>usingAdd::encode_request() - Creates an
RpcRequestwithMETHOD_IDand encoded parameters - Transmits the request to the server
- Waits for and receives the complete response
- Decodes the response using
Add::decode_response() - Returns the typed
f64result
Concurrent Prebuffered Calls
Multiple prebuffered calls can be made concurrently over a single connection:
Each call is assigned a unique request ID by the RpcDispatcher, allowing responses to be correlated correctly even when they arrive out of order.
Large Payload Handling
The prebuffered mechanism transparently handles large payloads:
The chunking and reassembly happen automatically in the RpcDispatcher and RpcStreamEncoder/RpcStreamDecoder layers.
Sources: extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:81-96 extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:154-203
Testing Prebuffered Calls
graph LR
subgraph "Test Setup"
MockClient["MockRpcClient\n(implements RpcServiceCallerInterface)"]
SharedSender["Arc<Mutex<Option<DynamicSender>>>\n(response injection)"]
AtomicBool["Arc<AtomicBool>\n(connection state)"]
end
subgraph "Test Execution"
TestCode["Test code calls\nEcho::call(&mock_client, ...)"]
Background["Background task\ninjects response"]
end
subgraph "Verification"
Assert["Assert result matches\nexpected value or error"]
end
MockClient --> SharedSender
MockClient --> AtomicBool
TestCode --> MockClient
Background --> SharedSender
TestCode --> Assert
Unit Testing with Mock Clients
The prebuffered_caller_tests.rs file demonstrates testing RpcCallPrebuffered with a mock client implementation:
Diagram: Mock Client Testing Architecture
The mock client implementation:
- Returns a dummy
RpcDispatcherfromget_dispatcher() - Returns a no-op emit function from
get_emit_fn() - Provides a
DynamicSendervia shared state for response injection - Allows control of
is_connected()state viaAtomicBool
Integration Testing with Real Server
Integration tests use a real RpcServer and real clients (both Tokio and WASM):
| Test | Purpose | Files |
|---|---|---|
test_success_client_server_roundtrip | Validates successful RPC calls | tokio:19-97 wasm:39-142 |
test_error_client_server_roundtrip | Validates error propagation | tokio:99-152 wasm:144-227 |
test_large_prebuffered_payload_roundtrip | Validates chunked transmission | tokio:154-203 wasm:229-312 |
test_method_not_found_error | Validates NotFound error code | tokio:205-240 |
graph LR
WasmClient["RpcWasmClient\n(test subject)"]
Bridge["WebSocket Bridge\n(test harness)"]
TokioServer["RpcServer\n(real server)"]
WasmClient -->|emit callback| Bridge
Bridge -->|WebSocket frames| TokioServer
TokioServer -->|WebSocket frames| Bridge
Bridge -->|handle_message| WasmClient
The WASM integration tests use a WebSocket bridge to connect the RpcWasmClient to a real RpcServer:
Diagram: WASM Integration Test Architecture
Sources: extensions/muxio-rpc-service-caller/tests/prebuffered_caller_tests.rs:20-93 extensions/muxio-rpc-service-caller/tests/prebuffered_caller_tests.rs:97-133 extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:18-97 extensions/muxio-wasm-rpc-client/tests/prebuffered_integration_tests.rs:1-20
Key Implementation Details
Finalization Requirement
All prebuffered calls set is_finalized: true in the RpcRequest. This signals to the RpcDispatcher that no additional data will be sent after the initial request (and optional prebuffered payload), allowing it to optimize resource management.
Decode Closure Pattern
The call() method creates a decode closure and passes it to call_rpc_buffered():
This pattern allows the generic call_rpc_buffered() method to decode the response without knowing the specific output type.
Instrumentation
The call() method uses the #[instrument(skip(rpc_client, input))] attribute from the tracing crate, providing detailed logging at various trace levels:
debug: Method ID, entry/exit pointstrace: Request structure, result detailswarn: Large payload detection
Sources: extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:49-98 extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs71 extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:75-76
Dismiss
Refresh this wiki
Enter email to refresh