Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

GitHub

This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Service Caller Interface

Relevant source files

Purpose and Scope

The Service Caller Interface defines the client-side abstraction for making RPC calls in the rust-muxio system. This interface, defined by the RpcServiceCallerInterface trait in the muxio-rpc-service-caller crate, provides the core logic for encoding requests, managing response streams, and handling errors that is shared by all client implementations.

This page covers the client-side RPC invocation mechanism. For server-side request handling, see Service Endpoint Interface. For information on defining RPC methods, see Service Definitions. For details on using prebuffered methods with this interface, see Prebuffered RPC Calls. For concrete implementations of this interface, see Tokio RPC Client and WASM RPC Client.

Sources: extensions/muxio-rpc-service-caller/Cargo.toml:1-22


Trait Definition

The RpcServiceCallerInterface trait defines the contract that all RPC clients must implement. It is runtime-agnostic and transport-agnostic, focusing solely on the mechanics of RPC invocation.

Core Methods

MethodReturn TypePurpose
get_dispatcher()Arc<TokioMutex<RpcDispatcher<'static>>>Provides access to the RPC dispatcher for request management
get_emit_fn()Arc<dyn Fn(Vec<u8>) + Send + Sync>Returns function to transmit encoded bytes to transport layer
is_connected()boolChecks current transport connection state
call_rpc_streaming()Result<(RpcStreamEncoder, DynamicReceiver), RpcServiceError>Initiates streaming RPC call with chunked response handling
call_rpc_buffered()Result<(RpcStreamEncoder, Result<T, RpcServiceError>), RpcServiceError>Initiates buffered RPC call that collects complete response
set_state_change_handler()asyncRegisters callback for transport state changes

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:25-405


RPC Call Flow

The following diagram illustrates how an RPC call flows through the service caller interface:

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:32-349

sequenceDiagram
    participant App as "Application Code"
    participant Trait as "RpcServiceCallerInterface"
    participant Dispatcher as "RpcDispatcher"
    participant SendFn as "send_fn Closure"
    participant RecvFn as "recv_fn Closure"
    participant Transport as "Transport Layer"
    participant DynChan as "DynamicChannel"
    
    App->>Trait: call_rpc_streaming(RpcRequest)
    Trait->>Trait: Check is_connected()
    
    alt Not Connected
        Trait-->>App: Err(ConnectionAborted)
    end
    
    Trait->>DynChan: Create channel (Bounded/Unbounded)
    Trait->>SendFn: Create emission closure
    Trait->>RecvFn: Create response handler
    Trait->>Dispatcher: dispatcher.call(request, send_fn, recv_fn)
    Dispatcher-->>Trait: RpcStreamEncoder
    
    Trait->>App: Wait on readiness channel
    
    Transport->>RecvFn: RpcStreamEvent::Header
    RecvFn->>RecvFn: Extract RpcResultStatus
    RecvFn->>Trait: Signal ready via oneshot
    
    Trait-->>App: Return (encoder, receiver)
    
    loop For each chunk
        Transport->>RecvFn: RpcStreamEvent::PayloadChunk
        RecvFn->>DynChan: Send Ok(bytes) or buffer error
    end
    
    Transport->>RecvFn: RpcStreamEvent::End
    RecvFn->>RecvFn: Check final status
    
    alt Success
        RecvFn->>DynChan: Close channel
    else Error
        RecvFn->>DynChan: Send Err(RpcServiceError)
    end

Streaming RPC Calls

Method Signature

The call_rpc_streaming method is the foundation for all RPC invocations:

Channel Types

The method accepts a DynamicChannelType parameter that determines the channel behavior:

Channel TypeBuffer SizeUse Case
UnboundedUnlimitedLarge or unpredictable response sizes
BoundedDEFAULT_RPC_STREAM_CHANNEL_BUFFER_SIZE (8)Controlled memory usage with backpressure

Components Created

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:33-73 extensions/muxio-rpc-service-caller/src/caller_interface.rs:77-96

Connection State Validation

Before initiating any RPC call, the interface checks connection state:

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:44-53


Buffered RPC Calls

The call_rpc_buffered method builds on call_rpc_streaming to provide a simpler interface for methods that return complete responses:

Method Signature

Buffering Strategy

The method accumulates all response chunks into a buffer, then applies the decode function once:

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:351-399


Response Handler Implementation

The recv_fn closure implements the response handling logic by processing RpcStreamEvent types:

stateDiagram-v2
    [*] --> WaitingForHeader
    
    WaitingForHeader --> ReadySignaled: RpcStreamEvent::Header
    ReadySignaled --> ProcessingChunks : Extract RpcResultStatus
    
    ProcessingChunks --> ProcessingChunks: RpcStreamEvent::PayloadChunk\n(Success → DynamicSender\nError → error_buffer)
    
    ProcessingChunks --> Completed: RpcStreamEvent::End
    ProcessingChunks --> ErrorState: RpcStreamEvent::Error
    
    Completed --> [*] : Close DynamicSender
    ErrorState --> [*] : Send error and close

Event Processing Flow

Status Handling

RpcResultStatusAction
SuccessForward payload chunks to DynamicSender
MethodNotFoundBuffer payload, send RpcServiceError::Rpc with NotFound code
FailSend RpcServiceError::Rpc with Fail code
SystemErrorBuffer error message, send RpcServiceError::Rpc with System code

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:102-287

Mutex Usage Pattern

The response handler uses std::sync::Mutex (not tokio::sync::Mutex) because it executes in a synchronous context:

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:75-115


Error Handling Strategy

Error Types

Error Propagation Points

Error SourceWhenError Type
Disconnected clientBefore call_rpc_streamingRpcServiceError::Transport(ConnectionAborted)
Dispatcher failureDuring dispatcher.call()RpcServiceError::Transport(io::Error::other)
Readiness timeoutNo header receivedRpcServiceError::Transport("channel closed prematurely")
Frame decode errorRpcStreamEvent::ErrorRpcServiceError::Transport(frame_decode_error)
Remote RPC errorRpcStreamEvent::End with error statusRpcServiceError::Rpc(code + message)

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:44-52 extensions/muxio-rpc-service-caller/src/caller_interface.rs:186-238 extensions/muxio-rpc-service-caller/src/caller_interface.rs:246-284


Implementation Requirements

Required Dependencies

Implementations of RpcServiceCallerInterface must maintain:

  1. RpcDispatcher instance - For managing request correlation and stream multiplexing
  2. Emit function - For transmitting encoded bytes to the transport layer
  3. Connection state - Boolean flag tracked by transport implementation

Trait Bounds

The trait requires Send + Sync for async context compatibility:

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:25-26


Integration with Service Definitions

The interface integrates with RpcMethodPrebuffered through the RpcCallPrebuffered extension trait, providing type-safe method invocation:

graph LR
    subgraph "Service Definition"
        Method["RpcMethodPrebuffered\ne.g., Echo"]
MethodID["METHOD_ID: u64"]
Encode["encode_request()"]
Decode["decode_response()"]
end
    
    subgraph "Caller Extension"
        CallTrait["RpcCallPrebuffered"]
CallFn["call(client, params)"]
end
    
    subgraph "Caller Interface"
        Interface["RpcServiceCallerInterface"]
Buffered["call_rpc_buffered()"]
end
    
 
   Method --> MethodID
 
   Method --> Encode
 
   Method --> Decode
    
 
   CallTrait --> Method
 
   CallFn --> Encode
 
   CallFn --> Decode
    
 
   CallFn --> Interface
 
   Interface --> Buffered

Example usage from tests:

Sources: extensions/muxio-rpc-service-caller/tests/prebuffered_caller_tests.rs203


Mock Implementation for Testing

The test suite demonstrates a minimal implementation:

The mock stores a shared DynamicSender that tests can use to inject response data:

Sources: extensions/muxio-rpc-service-caller/tests/prebuffered_caller_tests.rs:20-93


State Change Handling

The interface provides a method for registering transport state change callbacks:

This allows applications to react to connection state transitions. See Transport State Management for details on state transitions.

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:401-405


Relationship to Concrete Implementations

All three implement the same interface, differing only in their transport mechanisms:

  • TokioRpcClient (#5.2) - Uses Tokio async runtime and tokio-tungstenite for native WebSocket connections
  • WasmRpcClient (#5.3) - Uses wasm-bindgen and browser WebSocket APIs for WASM environments
  • MockRpcClient - Test implementation with injectable response channels

Sources: extensions/muxio-rpc-service-caller/tests/prebuffered_caller_tests.rs:24-93

Dismiss

Refresh this wiki

Enter email to refresh