This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Service Caller Interface

Purpose and Scope

The Service Caller Interface defines the core abstraction for client-side RPC invocation in Muxio. The RpcServiceCallerInterface trait provides a runtime-agnostic interface that allows application code to make RPC calls without depending on specific transport implementations. This abstraction enables the same client code to work across different runtimes (native Tokio, WASM) by implementing the trait for each platform.

For information about defining RPC services and methods, see Service Definitions. For server-side RPC handling, see Service Endpoint Interface. For concrete implementations of this interface, see Tokio RPC Client and WASM RPC Client.

Interface Overview

The RpcServiceCallerInterface is defined as an async trait that provides two primary RPC invocation patterns: streaming and buffered. It abstracts the underlying transport mechanism while exposing connection state and allowing customization through state change handlers.

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:25-405

graph TB
    subgraph "Application Layer"
        AppCode["Application Code"]
ServiceDef["RPC Method Definitions\n(RpcMethodPrebuffered)"]
end
    
    subgraph "Service Caller Interface"
        CallerTrait["RpcServiceCallerInterface\nasync trait"]
CallBuffered["call_rpc_buffered()\nReturns complete response"]
CallStreaming["call_rpc_streaming()\nReturns stream of chunks"]
GetDispatcher["get_dispatcher()\nAccess to RpcDispatcher"]
IsConnected["is_connected()\nConnection status"]
StateHandler["set_state_change_handler()\nState notifications"]
end
    
    subgraph "Concrete Implementations"
        TokioImpl["muxio-tokio-rpc-client\nRpcClient (Tokio)"]
WasmImpl["muxio-wasm-rpc-client\nRpcWasmClient (WASM)"]
end
    
    subgraph "Core Components"
        Dispatcher["RpcDispatcher\nRequest correlation"]
EmitFn["Emit Function\nSend bytes to transport"]
end
    
 
   AppCode --> ServiceDef
 
   ServiceDef --> CallerTrait
 
   CallerTrait --> CallBuffered
 
   CallerTrait --> CallStreaming
 
   CallerTrait --> GetDispatcher
 
   CallerTrait --> IsConnected
 
   CallerTrait --> StateHandler
    
    TokioImpl -.implements.-> CallerTrait
    WasmImpl -.implements.-> CallerTrait
    
 
   GetDispatcher --> Dispatcher
 
   CallBuffered --> Dispatcher
 
   CallStreaming --> Dispatcher
 
   CallBuffered --> EmitFn
 
   CallStreaming --> EmitFn

Trait Definition

The RpcServiceCallerInterface trait requires implementors to provide access to core components and implement multiple invocation patterns:

| Method | Return Type | Purpose |
| --- | --- | --- |
| get_dispatcher() | Arc<TokioMutex<RpcDispatcher<'static>>> | Provides access to the RPC dispatcher for request/response correlation |
| get_emit_fn() | Arc<dyn Fn(Vec<u8>) + Send + Sync> | Returns the function that sends binary frames to the transport |
| is_connected() | bool | Checks current connection state |
| call_rpc_streaming() | Result<(RpcStreamEncoder, DynamicReceiver), RpcServiceError> | Initiates streaming RPC call with incremental data reception |
| call_rpc_buffered() | Result<(RpcStreamEncoder, Result<T, RpcServiceError>), RpcServiceError> | Initiates buffered RPC call that returns complete response |
| set_state_change_handler() | async fn | Registers callback for transport state changes |

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:26-31 extensions/muxio-rpc-service-caller/src/caller_interface.rs:32-405

Core Components Access

Dispatcher Access

The get_dispatcher() method returns an Arc<TokioMutex<RpcDispatcher>> that allows the implementation to register RPC calls and manage request/response correlation. The TokioMutex is used because these methods are async and may need to await the lock.

Emit Function

The get_emit_fn() method returns a closure that the caller interface uses to send binary frames to the underlying transport. This function is called by the RpcStreamEncoder when writing request payloads.

Connection State

The is_connected() method allows implementations to check the connection state before attempting RPC calls. When false, the call_rpc_streaming() method immediately returns a ConnectionAborted error.

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:28-30 extensions/muxio-rpc-service-caller/src/caller_interface.rs:44-53
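The component accessors described above can be illustrated with a small stand-in. This is a minimal sketch, not the Muxio trait itself: `CallerComponents` and `RecordingClient` are hypothetical names, the methods are synchronous, and the emit function pushes frames into a vector rather than writing to a real transport. Only the `Arc<dyn Fn(Vec<u8>) + Send + Sync>` shape of the emit function and the boolean connection check are taken from the text.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical, synchronous stand-in for the component-access methods.
pub trait CallerComponents {
    fn get_emit_fn(&self) -> Arc<dyn Fn(Vec<u8>) + Send + Sync>;
    fn is_connected(&self) -> bool;
}

/// Illustrative client that records emitted frames instead of sending them.
pub struct RecordingClient {
    pub sent: Arc<Mutex<Vec<Vec<u8>>>>,
    pub connected: AtomicBool,
}

impl RecordingClient {
    pub fn new(connected: bool) -> Self {
        RecordingClient {
            sent: Arc::new(Mutex::new(Vec::new())),
            connected: AtomicBool::new(connected),
        }
    }
}

impl CallerComponents for RecordingClient {
    fn get_emit_fn(&self) -> Arc<dyn Fn(Vec<u8>) + Send + Sync> {
        // The closure owns its own handle to the frame log, so it can
        // outlive the borrow of `self`.
        let sent = Arc::clone(&self.sent);
        Arc::new(move |frame: Vec<u8>| {
            sent.lock().unwrap().push(frame);
        })
    }

    fn is_connected(&self) -> bool {
        self.connected.load(Ordering::SeqCst)
    }
}
```

A real implementation would hand the bytes to a WebSocket writer inside the closure; the recording variant is mainly useful for tests.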

Streaming RPC Pattern

The call_rpc_streaming() method provides the foundation for all RPC calls, supporting incremental data reception through dynamic channels. This pattern is used directly for streaming RPC calls and internally by the buffered pattern.

sequenceDiagram
    participant App as "Application"
    participant Interface as "RpcServiceCallerInterface"
    participant Dispatcher as "RpcDispatcher"
    participant Channel as "DynamicChannel\n(mpsc)"
    participant SendFn as "Emit Function"
    participant RecvFn as "Response Handler\n(Closure)"
    
    App->>Interface: call_rpc_streaming(request)
    Interface->>Interface: Check is_connected()
    Interface->>Channel: Create mpsc channel\n(Bounded/Unbounded)
    Interface->>Interface: Create send_fn closure
    Interface->>Interface: Create recv_fn closure
    
    Interface->>Dispatcher: call(request, send_fn, recv_fn)
    Dispatcher-->>Interface: Return RpcStreamEncoder
    
    Note over Interface,Channel: Wait for readiness signal
    RecvFn->>RecvFn: Receive RpcStreamEvent::Header
    RecvFn->>Channel: Send readiness via oneshot
    
    Interface-->>App: Return (encoder, receiver)
    
    loop "For each response chunk"
        RecvFn->>RecvFn: Receive RpcStreamEvent::PayloadChunk
        RecvFn->>Channel: Send Ok(bytes) to DynamicReceiver
        App->>Channel: next().await
        Channel-->>App: Some(Ok(bytes))
    end
    
    RecvFn->>RecvFn: Receive RpcStreamEvent::End
    RecvFn->>Channel: Close sender (drop)
    App->>Channel: next().await
    Channel-->>App: None

Streaming Call Flow

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:32-349

Dynamic Channel Types

The streaming method accepts a DynamicChannelType parameter that determines the channel buffering strategy:

| Channel Type | Buffer Size | Use Case |
| --- | --- | --- |
| DynamicChannelType::Bounded | DEFAULT_RPC_STREAM_CHANNEL_BUFFER_SIZE | Controlled memory usage, backpressure |
| DynamicChannelType::Unbounded | Unlimited | Maximum throughput, simple buffering |

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:56-73
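The trade-off between the two channel types can be sketched with the standard library. This is an illustration under stated assumptions: `ChannelKind`, `DynSender`, and `dyn_channel` are hypothetical names, and `std::sync::mpsc` stands in for whatever channel implementation the crate actually uses. The point is only that a bounded sender can refuse a value when its buffer is full, while an unbounded one keeps accepting.

```rust
use std::sync::mpsc::{self, Receiver, Sender, SyncSender};

/// Hypothetical selector mirroring the bounded/unbounded choice.
pub enum ChannelKind {
    Bounded(usize),
    Unbounded,
}

/// One sender type that hides which kind of channel is underneath.
pub enum DynSender<T> {
    Bounded(SyncSender<T>),
    Unbounded(Sender<T>),
}

impl<T> DynSender<T> {
    /// Non-blocking send. Returns false if the buffer is full or the
    /// receiver has been dropped.
    pub fn try_send(&self, value: T) -> bool {
        match self {
            DynSender::Bounded(tx) => tx.try_send(value).is_ok(),
            DynSender::Unbounded(tx) => tx.send(value).is_ok(),
        }
    }
}

pub fn dyn_channel<T>(kind: ChannelKind) -> (DynSender<T>, Receiver<T>) {
    match kind {
        ChannelKind::Bounded(capacity) => {
            let (tx, rx) = mpsc::sync_channel(capacity);
            (DynSender::Bounded(tx), rx)
        }
        ChannelKind::Unbounded => {
            let (tx, rx) = mpsc::channel();
            (DynSender::Unbounded(tx), rx)
        }
    }
}
```

With a bounded channel the producer has to react to a refused send (wait, drop, or retry), which is what gives the consumer backpressure.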

Response Handler Implementation

The recv_fn closure handles all incoming RpcStreamEvent variants synchronously using StdMutex for WASM compatibility. The handler maintains internal state to track response status and buffer error payloads:

stateDiagram-v2
    [*] --> WaitingHeader : recv_fn created
    WaitingHeader --> ProcessingPayload: RpcStreamEvent::Header\nExtract RpcResultStatus
    WaitingHeader --> SendReadiness : Send readiness signal
    SendReadiness --> ProcessingPayload
    
    ProcessingPayload --> BufferSuccess: RpcResultStatus::Success
    ProcessingPayload --> BufferError: RpcResultStatus::*Error
    
    BufferSuccess --> ProcessingPayload : More chunks
    BufferError --> ProcessingPayload : More chunks
    
    ProcessingPayload --> CompleteSuccess: RpcStreamEvent::End\nSuccess status
    ProcessingPayload --> CompleteError: RpcStreamEvent::End\nError status
    ProcessingPayload --> HandleError: RpcStreamEvent::Error
    
    CompleteSuccess --> [*] : Close channel
    CompleteError --> [*] : Send RpcServiceError\nClose channel
    HandleError --> [*] : Send Transport error\nClose channel

The response handler uses StdMutex instead of TokioMutex because it runs in a synchronous callback, where it cannot await an async lock. A standard mutex also keeps the handler usable in WASM environments.

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:91-287 extensions/muxio-rpc-service-caller/src/caller_interface.rs:75-79 extensions/muxio-rpc-service-caller/src/caller_interface.rs:94-96

Response Event Handling

The recv_fn closure processes four event types:

| Event Type | Actions | Channel Operation |
| --- | --- | --- |
| RpcStreamEvent::Header | Extract RpcResultStatus from metadata, send readiness signal | Signal header received via oneshot |
| RpcStreamEvent::PayloadChunk | If success: forward to receiver; otherwise buffer error payload | sender.send_and_ignore(Ok(bytes)) |
| RpcStreamEvent::End | Process final status, convert errors to RpcServiceError | Send final error or close channel |
| RpcStreamEvent::Error | Send transport error to both readiness and data channels | sender.send_and_ignore(Err(error)) |

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:118-286
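The event handling in the table above can be approximated as a small state machine behind a standard mutex. This is a simplified sketch, not the crate's code: `Event`, `Status`, `CallError`, and `make_recv_fn` are hypothetical stand-ins, the readiness oneshot is omitted, and `std::sync::mpsc` replaces the dynamic channel. It shows success chunks being forwarded, error payloads being buffered until the end of the stream, and the channel being closed by dropping the sender.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Mutex;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Status {
    Success,
    Fail,
}

/// Hypothetical stand-in for the stream events named in the text.
pub enum Event {
    Header(Status),
    PayloadChunk(Vec<u8>),
    End,
    Error(String),
}

#[derive(Debug, PartialEq)]
pub enum CallError {
    Rpc(Vec<u8>),
    Transport(String),
}

struct HandlerState {
    status: Option<Status>,
    error_buf: Vec<u8>,
    // Dropping the sender (setting this to None) closes the channel.
    sender: Option<Sender<Result<Vec<u8>, CallError>>>,
}

pub fn make_recv_fn() -> (impl Fn(Event) + Send + Sync, Receiver<Result<Vec<u8>, CallError>>) {
    let (tx, rx) = mpsc::channel();
    let state = Mutex::new(HandlerState { status: None, error_buf: Vec::new(), sender: Some(tx) });
    let recv_fn = move |event: Event| {
        let mut st = state.lock().unwrap();
        match event {
            Event::Header(status) => st.status = Some(status),
            Event::PayloadChunk(bytes) => {
                if st.status == Some(Status::Success) {
                    if let Some(tx) = &st.sender {
                        let _ = tx.send(Ok(bytes)); // ignore a dropped receiver
                    }
                } else {
                    st.error_buf.extend_from_slice(&bytes);
                }
            }
            Event::End => {
                if st.status != Some(Status::Success) {
                    let payload = std::mem::take(&mut st.error_buf);
                    if let Some(tx) = &st.sender {
                        let _ = tx.send(Err(CallError::Rpc(payload)));
                    }
                }
                st.sender = None;
            }
            Event::Error(message) => {
                if let Some(tx) = &st.sender {
                    let _ = tx.send(Err(CallError::Transport(message)));
                }
                st.sender = None;
            }
        }
    };
    (recv_fn, rx)
}
```

Because the closure only ever takes a short, non-async lock, it can be called from any synchronous transport callback.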

Buffered RPC Pattern

The call_rpc_buffered() method provides a higher-level interface for RPC calls where the entire response is buffered before being returned. This method is built on top of call_rpc_streaming() and is used by RpcMethodPrebuffered implementations.

sequenceDiagram
    participant App as "Application"
    participant Buffered as "call_rpc_buffered()"
    participant Streaming as "call_rpc_streaming()"
    participant Stream as "DynamicReceiver"
    participant Decode as "decode: F"
    
    App->>Buffered: call_rpc_buffered(request, decode)
    Buffered->>Streaming: call_rpc_streaming(request, Unbounded)
    Streaming-->>Buffered: Return (encoder, stream)
    
    Buffered->>Buffered: Create empty success_buf
    
    loop "Stream consumption"
        Buffered->>Stream: stream.next().await
        Stream-->>Buffered: Some(Ok(chunk))
        Buffered->>Buffered: success_buf.extend(chunk)
    end
    
    alt "Stream completed successfully"
        Stream-->>Buffered: None
        Buffered->>Decode: decode(&success_buf)
        Decode-->>Buffered: Return T
        Buffered-->>App: Ok((encoder, Ok(decoded)))
    else "Stream yielded error"
        Stream-->>Buffered: Some(Err(e))
        Buffered-->>App: Ok((encoder, Err(e)))
    end

Buffered Call Implementation

The buffered pattern always uses DynamicChannelType::Unbounded to avoid backpressure complications when consuming the entire stream.

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:351-399 extensions/muxio-rpc-service-caller/src/caller_interface.rs:368-370

Decode Function

The decode parameter is a closure that converts the buffered byte slice into the desired return type T. This function is provided by the RPC method implementation and typically uses bitcode::decode() for deserialization:

F: Fn(&[u8]) -> T + Send + Sync + 'static

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:363-365
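The buffered flow and the decode bound can be sketched without any async machinery. This is an assumption-laden illustration: `collect_buffered` and `decode_u32_le` are hypothetical helpers, a plain iterator stands in for the DynamicReceiver stream, and a little-endian integer decoder stands in for bitcode::decode(). The decode closure carries the same `Fn(&[u8]) -> T + Send + Sync + 'static` bound quoted above.

```rust
/// Drain every chunk into one buffer, then decode the whole buffer.
/// The first error chunk aborts the call and is returned as-is.
pub fn collect_buffered<T, E, I, F>(chunks: I, decode: F) -> Result<T, E>
where
    I: IntoIterator<Item = Result<Vec<u8>, E>>,
    F: Fn(&[u8]) -> T + Send + Sync + 'static,
{
    let mut success_buf = Vec::new();
    for chunk in chunks {
        success_buf.extend(chunk?);
    }
    Ok(decode(&success_buf))
}

/// Illustrative decoder: exactly four little-endian bytes become a u32.
pub fn decode_u32_le(bytes: &[u8]) -> Option<u32> {
    if bytes.len() != 4 {
        return None;
    }
    Some(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
}
```

Note that the decoder only runs once the stream has ended, so a response split across several chunks decodes the same way as a single-chunk response.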

Return Value Structure

Both streaming and buffered methods return a tuple containing an RpcStreamEncoder and the response data. The encoder allows the caller to write request payloads after initiating the call:

| Method | Return Type | Encoder Purpose | Response Type |
| --- | --- | --- | --- |
| call_rpc_streaming() | (RpcStreamEncoder, DynamicReceiver) | Write request payload | Stream of Result<Vec<u8>, RpcServiceError> |
| call_rpc_buffered() | (RpcStreamEncoder, Result<T, RpcServiceError>) | Write request payload | Complete decoded response |

The RpcStreamEncoder is returned even if the response contains an error, allowing the caller to properly finalize the request payload.

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:37-42 extensions/muxio-rpc-service-caller/src/caller_interface.rs:357-362
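The buffered return type nests one Result inside another, which is easy to misread. The sketch below uses hypothetical stand-ins (`Encoder`, `CallError`, `BufferedOutcome`, `describe`) to show the three cases a caller may want to distinguish: the call never started, the call started and succeeded, and the call started but the response carried an error. In the last two cases the encoder is still handed back.

```rust
#[derive(Debug, PartialEq)]
pub enum CallError {
    Transport(String),
    Rpc(String),
}

/// Stand-in for the encoder handle; the real type writes request payloads.
pub struct Encoder;

/// Same nesting as the buffered call's return value.
pub type BufferedOutcome<T> = Result<(Encoder, Result<T, CallError>), CallError>;

pub fn describe<T: std::fmt::Debug>(outcome: BufferedOutcome<T>) -> String {
    match outcome {
        // Outer error: no encoder exists because the call was never issued.
        Err(e) => format!("call never started: {:?}", e),
        // Outer Ok: the encoder is available whether or not the response is an error.
        Ok((_encoder, inner)) => match inner {
            Ok(value) => format!("response: {:?}", value),
            Err(e) => format!("call failed after start: {:?}", e),
        },
    }
}
```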

Connection State Management

State Change Handler

The set_state_change_handler() method allows applications to register callbacks that are invoked when the transport state changes. Implementations store these handlers and invoke them during connection lifecycle events.

The RpcTransportState enum indicates whether the transport is connected or disconnected. For more details on transport state management, see Transport State Management.

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:401-405
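One way an implementation might store and invoke a state change handler is sketched below. It is a simplification: the real method is an async fn, while this version is synchronous, and `TransportState`, `StateNotifier`, and `notify` are hypothetical names (the variant names are assumed from the connected/disconnected description, not copied from RpcTransportState).

```rust
use std::sync::Mutex;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum TransportState {
    Connected,
    Disconnected,
}

type Handler = Box<dyn Fn(TransportState) + Send + Sync>;

/// Holds at most one handler; registering a new one replaces the old one.
#[derive(Default)]
pub struct StateNotifier {
    handler: Mutex<Option<Handler>>,
}

impl StateNotifier {
    pub fn set_state_change_handler<F>(&self, f: F)
    where
        F: Fn(TransportState) + Send + Sync + 'static,
    {
        *self.handler.lock().unwrap() = Some(Box::new(f));
    }

    /// Called by the transport at lifecycle points such as connect or drop.
    /// Does nothing if no handler has been registered.
    pub fn notify(&self, state: TransportState) {
        if let Some(handler) = self.handler.lock().unwrap().as_ref() {
            handler(state);
        }
    }
}
```

In this sketch the handler runs while the lock is held, so a handler must not register another handler from inside the callback.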

graph LR
    CallStart["call_rpc_streaming()\ninvoked"]
    CheckConn{"is_connected()?"}
    RejectCall["Return ConnectionAborted error"]
    ProceedCall["Create channels and proceed"]

    CallStart --> CheckConn
    CheckConn -->|false| RejectCall
    CheckConn -->|true| ProceedCall

Connection Checks

The is_connected() method is checked at the beginning of call_rpc_streaming() to prevent RPC calls on disconnected transports.

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:44-53
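The guard itself is small. The sketch below assumes a hypothetical helper named `ensure_connected` and returns a bare `std::io::Error`; in the crate the error is wrapped in RpcServiceError::Transport, which is left out here to keep the example self-contained.

```rust
use std::io;

/// Fail fast before any channel or dispatcher work is done.
pub fn ensure_connected(is_connected: bool) -> Result<(), io::Error> {
    if is_connected {
        Ok(())
    } else {
        Err(io::Error::new(
            io::ErrorKind::ConnectionAborted,
            "RPC client is not connected",
        ))
    }
}
```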

Error Handling

The caller interface propagates errors from multiple sources through the RpcServiceError enum:

| Error Source | Error Type | Trigger |
| --- | --- | --- |
| Disconnected client | RpcServiceError::Transport(ConnectionAborted) | is_connected() returns false |
| Dispatcher call failure | RpcServiceError::Transport(io::Error) | dispatcher.call() returns error |
| Readiness channel closed | RpcServiceError::Transport(io::Error) | Oneshot channel drops before header received |
| Method not found | RpcServiceError::Rpc(NotFound) | RpcResultStatus::MethodNotFound in response |
| Application error | RpcServiceError::Rpc(Fail) | RpcResultStatus::Fail in response |
| System error | RpcServiceError::Rpc(System) | RpcResultStatus::SystemError in response |
| Frame decode error | RpcServiceError::Transport(io::Error) | RpcStreamEvent::Error received |

For comprehensive error handling documentation, see RPC Service Errors.

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:49-52 extensions/muxio-rpc-service-caller/src/caller_interface.rs:186-232 extensions/muxio-rpc-service-caller/src/caller_interface.rs:246-284
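The status-to-error mapping in the table can be written as a single match. The types below are local stand-ins that borrow the names used in the text; their exact fields in the crate are not shown in this page, so the `kind` and `message` layout is an assumption made for illustration.

```rust
use std::io;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RpcResultStatus {
    Success,
    MethodNotFound,
    Fail,
    SystemError,
}

#[derive(Debug, PartialEq)]
pub enum RpcErrorKind {
    NotFound,
    Fail,
    System,
}

/// Local stand-in; the field layout is assumed, not taken from the crate.
#[derive(Debug)]
pub enum RpcServiceError {
    Rpc { kind: RpcErrorKind, message: String },
    Transport(io::Error),
}

/// Convert a non-success response status plus its buffered payload into an error.
pub fn error_for_status(status: RpcResultStatus, payload: &[u8]) -> Option<RpcServiceError> {
    let kind = match status {
        RpcResultStatus::Success => return None,
        RpcResultStatus::MethodNotFound => RpcErrorKind::NotFound,
        RpcResultStatus::Fail => RpcErrorKind::Fail,
        RpcResultStatus::SystemError => RpcErrorKind::System,
    };
    let message = String::from_utf8_lossy(payload).into_owned();
    Some(RpcServiceError::Rpc { kind, message })
}

/// Transport-level failure for a client that is not connected.
pub fn disconnected_error() -> RpcServiceError {
    RpcServiceError::Transport(io::Error::new(
        io::ErrorKind::ConnectionAborted,
        "client is not connected",
    ))
}
```

Keeping transport failures and RPC-level failures in separate variants lets callers retry the former without misreading an application error as a network problem.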

Implementation Requirements

Types implementing RpcServiceCallerInterface must:

  1. Provide thread-safe dispatcher access: Return Arc<TokioMutex<RpcDispatcher>> from get_dispatcher()
  2. Implement emit function: Return closure that sends bytes to underlying transport
  3. Track connection state: Maintain boolean state returned by is_connected()
  4. Manage state handlers: Store and invoke state change handlers at appropriate lifecycle points
  5. Satisfy trait bounds: Implement Send + Sync for cross-thread usage

The trait uses #[async_trait::async_trait] to support async methods in trait definitions.

Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:25-26
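The thread-safety requirements in the list above can be checked at compile time. The sketch below uses a hypothetical `SketchClient` whose "dispatcher" is only a list of in-flight request IDs behind a standard mutex (the trait itself expects a TokioMutex around RpcDispatcher). The `assert_send_sync` helper fails to compile if the type is not Send + Sync.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Illustrative client: shared dispatcher state plus an atomic connection flag.
pub struct SketchClient {
    dispatcher: Arc<Mutex<Vec<u32>>>, // stand-in: IDs of in-flight requests
    connected: AtomicBool,
}

impl SketchClient {
    pub fn new() -> Self {
        SketchClient {
            dispatcher: Arc::new(Mutex::new(Vec::new())),
            connected: AtomicBool::new(false),
        }
    }

    pub fn register(&self, request_id: u32) {
        self.dispatcher.lock().unwrap().push(request_id);
    }

    pub fn in_flight(&self) -> usize {
        self.dispatcher.lock().unwrap().len()
    }

    pub fn set_connected(&self, value: bool) {
        self.connected.store(value, Ordering::SeqCst);
    }

    pub fn is_connected(&self) -> bool {
        self.connected.load(Ordering::SeqCst)
    }
}

/// Compiles only for types that can be shared across threads.
pub fn assert_send_sync<T: Send + Sync>() {}
```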

graph TB
    subgraph "Trait Definition"
        Trait["RpcServiceCallerInterface\nextensions/muxio-rpc-service-caller"]
end
    
    subgraph "Native Implementation"
        TokioClient["RpcClient\nmuxio-tokio-rpc-client"]
TokioRuntime["Tokio async runtime\ntokio-tungstenite WebSocket"]
end
    
    subgraph "Browser Implementation"
        WasmClient["RpcWasmClient\nmuxio-wasm-rpc-client"]
WasmBridge["wasm-bindgen bridge\nBrowser WebSocket API"]
end
    
    Trait -.implemented by.-> TokioClient
    Trait -.implemented by.-> WasmClient
    
 
   TokioClient --> TokioRuntime
 
   WasmClient --> WasmBridge
    
    style Trait fill:#f9f9f9,stroke:#333,stroke-width:2px

Platform-Specific Implementations

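The RpcServiceCallerInterface is implemented by two platform-specific clients: RpcClient in muxio-tokio-rpc-client, which runs on the Tokio async runtime over a tokio-tungstenite WebSocket, and RpcWasmClient in muxio-wasm-rpc-client, which uses a wasm-bindgen bridge to the browser WebSocket API.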

Both implementations provide the same interface to application code while adapting to their respective runtime environments. For implementation details, see Tokio RPC Client and WASM RPC Client.

graph LR
    subgraph "Application Code"
        Call["Add::call(&client, params)"]
end
    
    subgraph "Method Definition"
        Method["RpcMethodPrebuffered::call()\nType-safe wrapper"]
Encode["encode_request(params)\nSerialize arguments"]
Decode["decode_response(bytes)\nDeserialize result"]
end
    
    subgraph "Caller Interface"
        CallBuffered["call_rpc_buffered(request, decode)"]
BuildRequest["Build RpcRequest\nwith method_id and params"]
end
    
 
   Call --> Method
 
   Method --> Encode
 
   Encode --> BuildRequest
 
   BuildRequest --> CallBuffered
    CallBuffered -.async.-> Decode
 
   Decode --> Method
 
   Method --> Call

Integration with RPC Methods

RPC method definitions use the caller interface through the RpcMethodPrebuffered trait, which provides a type-safe wrapper around call_rpc_buffered().

For details on method definitions and the prebuffered pattern, see Service Definitions and Prebuffered RPC Calls.
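The encode, call, decode round trip can be sketched with two small traits. Everything here is hypothetical and synchronous: `BufferedCaller`, `PrebufferedMethod`, and `LoopbackCaller` are illustrative names, the request is encoded as raw little-endian f64 bytes rather than with bitcode, and the loopback caller computes the sum locally instead of talking to a server. It is meant to show the shape of a type-safe wrapper, not the RpcMethodPrebuffered API.

```rust
/// Hypothetical minimal caller: encoded request in, encoded response out.
pub trait BufferedCaller {
    fn call_buffered(&self, method_id: u64, request: Vec<u8>) -> Result<Vec<u8>, String>;
}

/// Hypothetical method definition pairing an ID with its codecs.
pub trait PrebufferedMethod {
    const METHOD_ID: u64;
    type Input;
    type Output;

    fn encode_request(input: Self::Input) -> Vec<u8>;
    fn decode_response(bytes: &[u8]) -> Result<Self::Output, String>;

    /// Type-safe wrapper: callers never touch raw bytes.
    fn call<C: BufferedCaller>(client: &C, input: Self::Input) -> Result<Self::Output, String> {
        let bytes = client.call_buffered(Self::METHOD_ID, Self::encode_request(input))?;
        Self::decode_response(&bytes)
    }
}

fn f64_from_le(chunk: &[u8]) -> f64 {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(chunk);
    f64::from_le_bytes(raw)
}

pub struct Add;

impl PrebufferedMethod for Add {
    const METHOD_ID: u64 = 1;
    type Input = Vec<f64>;
    type Output = f64;

    fn encode_request(input: Vec<f64>) -> Vec<u8> {
        let mut out = Vec::with_capacity(input.len() * 8);
        for value in &input {
            out.extend_from_slice(&value.to_le_bytes());
        }
        out
    }

    fn decode_response(bytes: &[u8]) -> Result<f64, String> {
        if bytes.len() != 8 {
            return Err(format!("expected 8 bytes, got {}", bytes.len()));
        }
        Ok(f64_from_le(bytes))
    }
}

/// Test double that plays the server role in-process.
pub struct LoopbackCaller;

impl BufferedCaller for LoopbackCaller {
    fn call_buffered(&self, method_id: u64, request: Vec<u8>) -> Result<Vec<u8>, String> {
        if method_id != Add::METHOD_ID {
            return Err("method not found".to_string());
        }
        let sum: f64 = request.chunks_exact(8).map(f64_from_le).sum();
        Ok(sum.to_le_bytes().to_vec())
    }
}
```

Application code then reads as `Add::call(&client, vec![1.0, 2.0, 3.0])`, with the method ID and byte layout kept in one place.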

Package Information

The RpcServiceCallerInterface is defined in the muxio-rpc-service-caller package, which provides generic, runtime-agnostic client logic:

| Package | Description | Key Dependencies |
| --- | --- | --- |
| muxio-rpc-service-caller | Generic RPC client interface and logic | muxio, muxio-rpc-service, async-trait, futures, tokio (sync only) |

The package uses minimal Tokio features (only sync for TokioMutex) to remain as platform-agnostic as possible while supporting async methods.

Sources: extensions/muxio-rpc-service-caller/Cargo.toml:1-22
