This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Prebuffered RPC Calls

Purpose and Scope

This page documents the prebuffered RPC mechanism in the muxio system, which provides a complete request/response pattern for RPC calls. Prebuffered calls send the entire request payload upfront, wait for the complete response, and return a typed result. This is the simplest and most common RPC pattern in the system.

For information about defining RPC methods, see Service Definitions. For streaming RPC calls that handle chunked data incrementally, see Streaming RPC Calls. For the underlying client and server interfaces, see Service Caller Interface and Service Endpoint Interface.

Sources: extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:1-99

Overview of Prebuffered RPC

Prebuffered RPC calls follow a request/response communication pattern in which each call resolves only after the full response has arrived:

  1. The client encodes the entire request before sending
  2. The request is transmitted to the server (potentially in chunks if large)
  3. The server processes the complete request and produces a response
  4. The response is transmitted back to the client (potentially in chunks if large)
  5. The client decodes and returns the typed response

The term "prebuffered" refers to the fact that both request and response payloads are fully buffered before being processed by application code, as opposed to streaming approaches where data is processed incrementally.

Diagram: Complete Prebuffered RPC Call Flow

sequenceDiagram
    participant App as "Application Code"
    participant Trait as "RpcCallPrebuffered::call()"
    participant Encode as "encode_request()"
    participant Client as "RpcServiceCallerInterface"
    participant Network as "Network Transport"
    participant Server as "RPC Server"
    participant Decode as "decode_response()"

    App->>Trait: Add::call(&client, vec![1.0, 2.0, 3.0])
    Trait->>Encode: encode_request(vec![1.0, 2.0, 3.0])
    Encode-->>Trait: encoded_bytes

    alt "Small payload (<64KB)"
        Trait->>Trait: rpc_param_bytes = Some(encoded_bytes)
        Trait->>Trait: rpc_prebuffered_payload_bytes = None
    else "Large payload (>=64KB)"
        Trait->>Trait: rpc_param_bytes = None
        Trait->>Trait: rpc_prebuffered_payload_bytes = Some(encoded_bytes)
    end

    Trait->>Client: call_rpc_buffered(RpcRequest)
    Client->>Network: transmit (chunked if needed)
    Network->>Server: receive and reassemble
    Server->>Server: process request
    Server->>Network: transmit response (chunked if needed)
    Network->>Client: receive and reassemble
    Client-->>Trait: Result<Vec<u8>, RpcServiceError>
    Trait->>Decode: decode_response(response_bytes)
    Decode-->>Trait: Result<f64, io::Error>
    Trait-->>App: Result<f64, RpcServiceError>

Sources: extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:30-98 extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:18-97

The RpcCallPrebuffered Trait

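The RpcCallPrebuffered trait provides the high-level interface for making prebuffered RPC calls. A blanket implementation supplies it automatically for any type that implements RpcMethodPrebuffered and is Send + Sync + Sized, so method types such as Add, Mult, and Echo gain call() without any extra code.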

Diagram: RpcCallPrebuffered Trait Hierarchy

graph TB
    subgraph "Trait Definition"
        RpcCallPrebuffered["RpcCallPrebuffered\n(trait)"]
        call_method["call()\n(async method)"]
    end

    subgraph "Trait Bounds"
        RpcMethodPrebuffered["RpcMethodPrebuffered\n(provides encode/decode)"]
        Send["Send + Sync\n(thread-safe)"]
        sized["Sized\n(known size)"]
    end

    subgraph "Blanket Implementation"
        blanket["impl&lt;T&gt; RpcCallPrebuffered for T\nwhere T: RpcMethodPrebuffered"]
    end

    subgraph "Example Types"
        Add["Add\n(example service)"]
        Mult["Mult\n(example service)"]
        Echo["Echo\n(example service)"]
    end

    RpcCallPrebuffered --> call_method
    blanket --> RpcCallPrebuffered
    RpcMethodPrebuffered --> blanket
    Send --> blanket
    sized --> blanket

    Add -.implements.-> RpcMethodPrebuffered
    Mult -.implements.-> RpcMethodPrebuffered
    Echo -.implements.-> RpcMethodPrebuffered

    Add -.gets.-> RpcCallPrebuffered
    Mult -.gets.-> RpcCallPrebuffered
    Echo -.gets.-> RpcCallPrebuffered

The trait signature is:

| Trait | Method | Parameters | Returns |
|---|---|---|---|
| RpcCallPrebuffered | call() | rpc_client: &C, input: Self::Input | Result<Self::Output, RpcServiceError> |
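The relationship between the two traits can be sketched as follows. This is a simplified, synchronous illustration rather than the crate's actual definitions: the real call() is async, and the reduced RpcServiceError, the byte-level call_rpc_buffered signature, the METHOD_ID value, and LoopbackClient are all assumptions made so the example is self-contained.

```rust
use std::io;

/// Stand-in for the crate's error type, reduced to one variant (assumption).
#[derive(Debug)]
pub enum RpcServiceError {
    Transport(io::Error),
}

/// Stand-in for the caller interface: request bytes in, response bytes out.
pub trait RpcServiceCallerInterface {
    fn call_rpc_buffered(&self, method_id: u64, request: Vec<u8>)
        -> Result<Vec<u8>, RpcServiceError>;
}

/// Supplies the method ID and the encode/decode logic for one RPC method.
pub trait RpcMethodPrebuffered {
    const METHOD_ID: u64;
    type Input;
    type Output;
    fn encode_request(input: Self::Input) -> Result<Vec<u8>, io::Error>;
    fn decode_response(bytes: &[u8]) -> Result<Self::Output, io::Error>;
}

/// High-level call interface built on top of RpcMethodPrebuffered.
pub trait RpcCallPrebuffered: RpcMethodPrebuffered + Sized + Send + Sync {
    fn call<C: RpcServiceCallerInterface>(
        rpc_client: &C,
        input: Self::Input,
    ) -> Result<Self::Output, RpcServiceError>;
}

/// Blanket implementation: every qualifying method type gets call() for free.
impl<T> RpcCallPrebuffered for T
where
    T: RpcMethodPrebuffered + Sized + Send + Sync,
{
    fn call<C: RpcServiceCallerInterface>(
        rpc_client: &C,
        input: Self::Input,
    ) -> Result<Self::Output, RpcServiceError> {
        let request = T::encode_request(input).map_err(RpcServiceError::Transport)?;
        let response = rpc_client.call_rpc_buffered(T::METHOD_ID, request)?;
        T::decode_response(&response).map_err(RpcServiceError::Transport)
    }
}

/// Example method type: only encode/decode are written by hand.
pub struct Echo;

impl RpcMethodPrebuffered for Echo {
    const METHOD_ID: u64 = 1; // hypothetical value
    type Input = Vec<u8>;
    type Output = Vec<u8>;
    fn encode_request(input: Vec<u8>) -> Result<Vec<u8>, io::Error> {
        Ok(input)
    }
    fn decode_response(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
        Ok(bytes.to_vec())
    }
}

/// Hypothetical client that returns the request bytes as the response.
pub struct LoopbackClient;

impl RpcServiceCallerInterface for LoopbackClient {
    fn call_rpc_buffered(&self, _method_id: u64, request: Vec<u8>)
        -> Result<Vec<u8>, RpcServiceError> {
        Ok(request)
    }
}
```

With these definitions, Echo::call(&LoopbackClient, vec![1, 2, 3]) returns Ok(vec![1, 2, 3]) even though Echo never implements RpcCallPrebuffered directly.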

Sources: extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:10-21 extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:23-28

Request Encoding and Transport Strategy

The RpcCallPrebuffered::call() implementation chooses between two transport paths based on the size of the encoded arguments, so that arguments of any size can be sent. The choice is necessary because the RPC header frame has a maximum size limit (typically 64KB).

Diagram: Argument Size-Based Transport Selection

graph TB
    start["encode_request(input)"]
    check{"encoded_args.len()\n&gt;= DEFAULT_SERVICE_MAX_CHUNK_SIZE?"}
    small["Small Arguments Path"]
    large["Large Arguments Path"]
    param_bytes["rpc_param_bytes = Some(encoded_args)\nrpc_prebuffered_payload_bytes = None"]
    payload_bytes["rpc_param_bytes = None\nrpc_prebuffered_payload_bytes = Some(encoded_args)"]
    create_request["Create RpcRequest\nwith is_finalized = true"]
    send["call_rpc_buffered(request)"]

    start --> check
    check -->|No < 64KB| small
    check -->|Yes &gt;= 64KB| large
    small --> param_bytes
    large --> payload_bytes
    param_bytes --> create_request
    payload_bytes --> create_request
    create_request --> send

Small Arguments Path (< 64KB)

When encoded arguments are smaller than DEFAULT_SERVICE_MAX_CHUNK_SIZE:

  • Arguments are placed in RpcRequest.rpc_param_bytes
  • The entire request (header + arguments) is transmitted in a single frame
  • Most efficient for typical RPC calls

Large Arguments Path (>= 64KB)

When encoded arguments are equal to or larger than DEFAULT_SERVICE_MAX_CHUNK_SIZE:

  • Arguments are placed in RpcRequest.rpc_prebuffered_payload_bytes
  • The RpcDispatcher automatically chunks the payload
  • The header is sent first, followed by payload chunks
  • The server reassembles chunks before invoking the handler

RpcRequest Structure

| Field | Type | Purpose |
|---|---|---|
| rpc_method_id | u64 | Compile-time generated method identifier |
| rpc_param_bytes | Option<Vec<u8>> | Small arguments sent in header |
| rpc_prebuffered_payload_bytes | Option<Vec<u8>> | Large arguments sent as chunked payload |
| is_finalized | bool | Always true for prebuffered calls |
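The size-based selection can be sketched as below. The struct mirrors the four fields in the table, but it is a stand-in rather than the crate's own type; the 64KB constant value and the build_prebuffered_request helper name are assumptions for illustration.

```rust
/// Assumed value; this page describes the limit as typically 64KB.
pub const DEFAULT_SERVICE_MAX_CHUNK_SIZE: usize = 64 * 1024;

/// Stand-in for RpcRequest with the fields listed in the table above.
#[derive(Debug, PartialEq)]
pub struct RpcRequest {
    pub rpc_method_id: u64,
    pub rpc_param_bytes: Option<Vec<u8>>,
    pub rpc_prebuffered_payload_bytes: Option<Vec<u8>>,
    pub is_finalized: bool,
}

/// Hypothetical helper: places the encoded arguments in the header field
/// or the payload field, depending on their size.
pub fn build_prebuffered_request(rpc_method_id: u64, encoded_args: Vec<u8>) -> RpcRequest {
    let (rpc_param_bytes, rpc_prebuffered_payload_bytes) =
        if encoded_args.len() >= DEFAULT_SERVICE_MAX_CHUNK_SIZE {
            // Large path: sent after the header as a chunked payload.
            (None, Some(encoded_args))
        } else {
            // Small path: travels inside the header frame.
            (Some(encoded_args), None)
        };

    RpcRequest {
        rpc_method_id,
        rpc_param_bytes,
        rpc_prebuffered_payload_bytes,
        // Prebuffered calls never send further data after the request.
        is_finalized: true,
    }
}
```

Note that a payload of exactly DEFAULT_SERVICE_MAX_CHUNK_SIZE bytes takes the large path, matching the >= comparison in the diagram.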

Sources: extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:49-73 extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:30-48

Response Handling and Decoding

Once the request is sent via call_rpc_buffered(), the client waits for the complete response. The response may be chunked during transmission, but call_rpc_buffered() handles reassembly transparently.

Diagram: Response Processing Pipeline

graph LR
    subgraph "Server Processing"
        handler["Method Handler\nprocess request"]
        encode_response["encode_response(result)"]
    end

    subgraph "Network Layer"
        chunk["Automatic Chunking\n(if response > 64KB)"]
        reassemble["Automatic Reassembly\n(client-side)"]
    end

    subgraph "Client Processing"
        call_buffered["call_rpc_buffered()"]
        nested_result["Result&lt;Result&lt;Output, io::Error&gt;, RpcServiceError&gt;"]
        decode["decode_response(bytes)"]
        flatten["Flatten nested Result"]
        final["Result&lt;Output, RpcServiceError&gt;"]
    end

    handler --> encode_response
    encode_response --> chunk
    chunk --> reassemble
    reassemble --> call_buffered
    call_buffered --> nested_result
    nested_result --> decode
    decode --> flatten
    flatten --> final

The response handling involves nested Result types:

  1. Outer Result: Result<_, RpcServiceError>. Indicates whether the RPC infrastructure succeeded.
    • Ok: The request was sent and a response was received
    • Err: Network error, serialization error, or remote RPC error
  2. Inner Result: Result<Output, io::Error>. Indicates whether decoding succeeded.
    • Ok: The response was successfully decoded into the typed output
    • Err: Deserialization error in decode_response()

The call() method flattens these nested results and converts the inner io::Error to RpcServiceError::Transport.
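The flattening step can be sketched as a three-arm match. The RpcServiceError enum here is a reduced stand-in (only the variants needed for the illustration), and flatten_response is a hypothetical helper name, not a function from the crate.

```rust
use std::io;

/// Reduced stand-in for the crate's error type (assumption).
#[derive(Debug)]
pub enum RpcServiceError {
    Transport(io::Error),
    Cancelled,
}

/// Collapses the nested result into a single Result, mapping a decode
/// failure (inner io::Error) to RpcServiceError::Transport.
pub fn flatten_response<T>(
    nested: Result<Result<T, io::Error>, RpcServiceError>,
) -> Result<T, RpcServiceError> {
    match nested {
        // RPC succeeded and the response decoded cleanly.
        Ok(Ok(output)) => Ok(output),
        // RPC succeeded, but decode_response() rejected the bytes.
        Ok(Err(decode_err)) => Err(RpcServiceError::Transport(decode_err)),
        // The RPC infrastructure itself failed; pass the error through.
        Err(rpc_err) => Err(rpc_err),
    }
}
```

Callers therefore handle one error type regardless of whether the failure occurred on the wire or while decoding.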

Sources: extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:75-96

Error Propagation

Prebuffered RPC calls can fail at multiple stages, all represented by RpcServiceError:

| Error Type | Cause | Example |
|---|---|---|
| RpcServiceError::Rpc(code: NotFound) | Method not registered on server | Calling unregistered method |
| RpcServiceError::Rpc(code: System) | Server handler returned error | Handler logic failure |
| RpcServiceError::Rpc(code: Fail) | Application-level error | Business logic error |
| RpcServiceError::Transport | Network or serialization error | Connection closed, decode failure |
| RpcServiceError::Cancelled | Request cancelled | Client-side cancellation |
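Caller code can distinguish these cases with a match. The sketch below models the error as a stand-in enum whose exact shape (a struct-like Rpc variant carrying a code and a message) is an assumption made for illustration; the crate's real type may be laid out differently. The describe function is likewise hypothetical.

```rust
use std::io;

/// Stand-in for the error codes named in the table above.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RpcServiceErrorCode {
    NotFound,
    System,
    Fail,
}

/// Stand-in for the crate's error type; the variant layout is assumed.
#[derive(Debug)]
pub enum RpcServiceError {
    Rpc { code: RpcServiceErrorCode, message: String },
    Transport(io::Error),
    Cancelled,
}

/// Hypothetical helper that maps each failure stage to a readable message.
pub fn describe(err: &RpcServiceError) -> String {
    match err {
        RpcServiceError::Rpc { code: RpcServiceErrorCode::NotFound, .. } => {
            "method not registered on server".to_string()
        }
        RpcServiceError::Rpc { code: RpcServiceErrorCode::System, message } => {
            format!("server handler error: {}", message)
        }
        RpcServiceError::Rpc { code: RpcServiceErrorCode::Fail, message } => {
            format!("application error: {}", message)
        }
        RpcServiceError::Transport(e) => format!("transport or decode failure: {}", e),
        RpcServiceError::Cancelled => "request cancelled".to_string(),
    }
}
```

Because the match is exhaustive, adding a new variant or code to the stand-in types forces every call site to decide how to handle it.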

Diagram: Error Propagation Through Prebuffered Call

Sources: extensions/muxio-rpc-service-caller/tests/prebuffered_caller_tests.rs:135-177 extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:99-152 extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:205-240

Usage Examples

Basic Prebuffered Call

A single call such as Add::call(&client, vec![1.0, 2.0, 3.0]).await:

  1. Encodes the Vec<f64> using Add::encode_request()
  2. Creates an RpcRequest with METHOD_ID and encoded parameters
  3. Transmits the request to the server
  4. Waits for and receives the complete response
  5. Decodes the response using Add::decode_response()
  6. Returns the typed f64 result
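The steps above can be traced in a self-contained sketch that replaces the network with a direct function call. Everything beyond the names Add, METHOD_ID, encode_request, and decode_response is an assumption: the little-endian f64 wire layout, the METHOD_ID value, the decode_request and encode_response helpers, and the serve and call_add functions are hypothetical, and the code is synchronous whereas the real call is async.

```rust
use std::io;

pub struct Add;

impl Add {
    pub const METHOD_ID: u64 = 0xADD; // hypothetical value

    /// Step 1: encode the inputs (assumed layout: 8 little-endian bytes each).
    pub fn encode_request(input: Vec<f64>) -> Vec<u8> {
        input.iter().flat_map(|v| v.to_le_bytes()).collect()
    }

    /// Server side: recover the inputs from the request bytes.
    pub fn decode_request(bytes: &[u8]) -> io::Result<Vec<f64>> {
        if bytes.len() % 8 != 0 {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "length is not a multiple of 8"));
        }
        Ok(bytes
            .chunks_exact(8)
            .map(|chunk| {
                let mut raw = [0u8; 8];
                raw.copy_from_slice(chunk);
                f64::from_le_bytes(raw)
            })
            .collect())
    }

    /// Server side: encode the sum as the response.
    pub fn encode_response(sum: f64) -> Vec<u8> {
        sum.to_le_bytes().to_vec()
    }

    /// Step 5: decode the response into the typed output.
    pub fn decode_response(bytes: &[u8]) -> io::Result<f64> {
        if bytes.len() != 8 {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "expected exactly 8 bytes"));
        }
        let mut raw = [0u8; 8];
        raw.copy_from_slice(bytes);
        Ok(f64::from_le_bytes(raw))
    }
}

/// Stand-in for the server: dispatches on the method ID and runs the handler.
pub fn serve(method_id: u64, request: &[u8]) -> io::Result<Vec<u8>> {
    if method_id != Add::METHOD_ID {
        return Err(io::Error::new(io::ErrorKind::NotFound, "unknown method id"));
    }
    let values = Add::decode_request(request)?;
    let sum: f64 = values.iter().sum();
    Ok(Add::encode_response(sum))
}

/// Steps 1 to 6 with the transport replaced by a direct call to serve().
pub fn call_add(input: Vec<f64>) -> io::Result<f64> {
    let request = Add::encode_request(input); // steps 1-2
    let response = serve(Add::METHOD_ID, &request)?; // steps 3-4
    Add::decode_response(&response) // steps 5-6
}
```

Under these assumptions, call_add(vec![1.0, 2.0, 3.0]) yields Ok(6.0).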

Concurrent Prebuffered Calls

Multiple prebuffered calls can be made concurrently over a single connection.

Each call is assigned a unique request ID by the RpcDispatcher, allowing responses to be correlated correctly even when they arrive out of order.
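The idea of request-ID correlation can be sketched with a table of pending calls. This is not the RpcDispatcher's implementation; PendingCalls, its methods, the u32 ID type, and the use of std::sync::mpsc channels are assumptions chosen to keep the example self-contained.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical table of in-flight calls, keyed by request ID.
pub struct PendingCalls {
    next_id: u32,
    waiting: HashMap<u32, Sender<Vec<u8>>>,
}

impl PendingCalls {
    pub fn new() -> Self {
        Self { next_id: 1, waiting: HashMap::new() }
    }

    /// Registers a new call and returns its unique ID together with the
    /// receiver on which that call's response will be delivered.
    pub fn register(&mut self) -> (u32, Receiver<Vec<u8>>) {
        let id = self.next_id;
        self.next_id += 1;
        let (tx, rx) = channel();
        self.waiting.insert(id, tx);
        (id, rx)
    }

    /// Routes a response to the call that owns `id`. Returns false when the
    /// ID is unknown or the caller has stopped waiting.
    pub fn resolve(&mut self, id: u32, response: Vec<u8>) -> bool {
        match self.waiting.remove(&id) {
            Some(tx) => tx.send(response).is_ok(),
            None => false,
        }
    }
}
```

Because each response is looked up by ID, the second call's response can arrive before the first without either caller receiving the wrong bytes.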

Large Payload Handling

The prebuffered mechanism transparently handles large payloads.

The chunking and reassembly happen automatically in the RpcDispatcher and RpcStreamEncoder/RpcStreamDecoder layers.
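What chunking and reassembly amount to can be sketched in a few lines. This illustrates the concept only, not the RpcStreamEncoder/RpcStreamDecoder code; the function names and the 64KB CHUNK_SIZE value are assumptions.

```rust
/// Assumed chunk size; this page describes the limit as typically 64KB.
pub const CHUNK_SIZE: usize = 64 * 1024;

/// Splits a payload into chunks of at most `chunk_size` bytes.
/// `chunk_size` must be non-zero (slice::chunks panics on zero).
pub fn split_into_chunks(payload: &[u8], chunk_size: usize) -> Vec<Vec<u8>> {
    payload.chunks(chunk_size).map(|chunk| chunk.to_vec()).collect()
}

/// Concatenates chunks, in arrival order, back into the original payload.
pub fn reassemble(chunks: &[Vec<u8>]) -> Vec<u8> {
    chunks.concat()
}
```

A 200,000-byte payload, for instance, splits into three full 65,536-byte chunks plus a final 3,392-byte chunk, and reassembling them restores the original bytes.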

Sources: extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:81-96 extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:154-203

Testing Prebuffered Calls

Unit Testing with Mock Clients

The prebuffered_caller_tests.rs file demonstrates testing RpcCallPrebuffered with a mock client implementation.

Diagram: Mock Client Testing Architecture

graph LR
    subgraph "Test Setup"
        MockClient["MockRpcClient\n(implements RpcServiceCallerInterface)"]
        SharedSender["Arc&lt;Mutex&lt;Option&lt;DynamicSender&gt;&gt;&gt;\n(response injection)"]
        AtomicBool["Arc&lt;AtomicBool&gt;\n(connection state)"]
    end

    subgraph "Test Execution"
        TestCode["Test code calls\nEcho::call(&mock_client, ...)"]
        Background["Background task\ninjects response"]
    end

    subgraph "Verification"
        Assert["Assert result matches\nexpected value or error"]
    end

    MockClient --> SharedSender
    MockClient --> AtomicBool
    TestCode --> MockClient
    Background --> SharedSender
    TestCode --> Assert

The mock client implementation:

  • Returns a dummy RpcDispatcher from get_dispatcher()
  • Returns a no-op emit function from get_emit_fn()
  • Provides a DynamicSender via shared state for response injection
  • Allows control of is_connected() state via AtomicBool
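The shared-state arrangement behind the mock can be sketched as follows. This is a reduced illustration, not the test file's code: it omits get_dispatcher() and get_emit_fn(), uses a std::sync::mpsc sender as a stand-in for DynamicSender, and the begin_call and inject_response method names are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Stand-in for DynamicSender (assumption): a plain channel sender.
pub type ResponseSender = Sender<Vec<u8>>;

/// Reduced mock client holding only the two pieces of shared state.
#[derive(Clone)]
pub struct MockRpcClient {
    pub response_sender: Arc<Mutex<Option<ResponseSender>>>,
    pub connected: Arc<AtomicBool>,
}

impl MockRpcClient {
    pub fn new() -> Self {
        Self {
            response_sender: Arc::new(Mutex::new(None)),
            connected: Arc::new(AtomicBool::new(true)),
        }
    }

    /// Connection state that the test can flip through the shared AtomicBool.
    pub fn is_connected(&self) -> bool {
        self.connected.load(Ordering::SeqCst)
    }

    /// Starts a call: stores the sender in shared state so that a
    /// background task can reach it, and returns the receiving end.
    pub fn begin_call(&self) -> Receiver<Vec<u8>> {
        let (tx, rx) = channel();
        *self.response_sender.lock().unwrap() = Some(tx);
        rx
    }

    /// Injects a response into the pending call, if there is one.
    pub fn inject_response(&self, bytes: Vec<u8>) -> bool {
        match self.response_sender.lock().unwrap().take() {
            Some(tx) => tx.send(bytes).is_ok(),
            None => false,
        }
    }
}
```

A test would clone the mock, start the call on one thread, and call inject_response from another, then assert on the value the receiver yields.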

Integration Testing with Real Server

Integration tests use a real RpcServer and real clients (both Tokio and WASM):

| Test | Purpose | Files (line ranges) |
|---|---|---|
| test_success_client_server_roundtrip | Validates successful RPC calls | tokio:19-97, wasm:39-142 |
| test_error_client_server_roundtrip | Validates error propagation | tokio:99-152, wasm:144-227 |
| test_large_prebuffered_payload_roundtrip | Validates chunked transmission | tokio:154-203, wasm:229-312 |
| test_method_not_found_error | Validates NotFound error code | tokio:205-240 |

The WASM integration tests use a WebSocket bridge to connect the RpcWasmClient to a real RpcServer.

Diagram: WASM Integration Test Architecture

graph LR
    WasmClient["RpcWasmClient\n(test subject)"]
    Bridge["WebSocket Bridge\n(test harness)"]
    TokioServer["RpcServer\n(real server)"]

    WasmClient -->|emit callback| Bridge
    Bridge -->|WebSocket frames| TokioServer
    TokioServer -->|WebSocket frames| Bridge
    Bridge -->|handle_message| WasmClient

Sources: extensions/muxio-rpc-service-caller/tests/prebuffered_caller_tests.rs:20-93 extensions/muxio-rpc-service-caller/tests/prebuffered_caller_tests.rs:97-133 extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:18-97 extensions/muxio-wasm-rpc-client/tests/prebuffered_integration_tests.rs:1-20

Key Implementation Details

Finalization Requirement

All prebuffered calls set is_finalized: true in the RpcRequest. This signals to the RpcDispatcher that no additional data will be sent after the initial request (and optional prebuffered payload), allowing it to optimize resource management.

Decode Closure Pattern

The call() method creates a decode closure that wraps the method's decode_response() and passes it to call_rpc_buffered().

This pattern allows the generic call_rpc_buffered() method to decode the response without knowing the specific output type.
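A minimal sketch of the decode-closure idea follows. The signature is an assumption and not the crate's API: the real call_rpc_buffered() is async and takes an RpcRequest, whereas this stand-in accepts already-received response bytes so that the example stays self-contained. decode_f64 is a hypothetical decoder.

```rust
use std::io;

/// Reduced stand-in for the crate's error type (assumption).
#[derive(Debug)]
pub enum RpcServiceError {
    Transport(io::Error),
}

/// Generic over the output type T: the function only knows that `decode`
/// turns response bytes into a T, not what T actually is.
pub fn call_rpc_buffered<T, F>(
    raw_response: Result<Vec<u8>, RpcServiceError>,
    decode: F,
) -> Result<Result<T, io::Error>, RpcServiceError>
where
    F: Fn(&[u8]) -> Result<T, io::Error>,
{
    // Infrastructure failures surface in the outer Result.
    let bytes = raw_response?;
    // Decode failures stay in the inner Result for the caller to flatten.
    Ok(decode(&bytes))
}

/// Hypothetical decoder for a single little-endian f64.
pub fn decode_f64(bytes: &[u8]) -> Result<f64, io::Error> {
    if bytes.len() != 8 {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "expected exactly 8 bytes"));
    }
    let mut raw = [0u8; 8];
    raw.copy_from_slice(bytes);
    Ok(f64::from_le_bytes(raw))
}
```

Passing decode_f64 (or a closure wrapping a method's decode_response) fixes T at the call site, which is what keeps the generic function independent of any particular RPC method.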

Instrumentation

The call() method uses the #[instrument(skip(rpc_client, input))] attribute from the tracing crate, which emits log events at several levels:

  • debug: Method ID, entry/exit points
  • trace: Request structure, result details
  • warn: Large payload detection

Sources: extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:49-98 extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:71 extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs:75-76
