This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Service Caller Interface
Relevant source files
- extensions/muxio-rpc-service-caller/Cargo.toml
- extensions/muxio-rpc-service-caller/src/caller_interface.rs
- extensions/muxio-rpc-service-caller/tests/prebuffered_caller_tests.rs
Purpose and Scope
The Service Caller Interface defines the client-side abstraction for making RPC calls in the rust-muxio system. It is defined by the RpcServiceCallerInterface trait in the muxio-rpc-service-caller crate and provides the core logic, shared by all client implementations, for encoding requests, managing response streams, and handling errors.
This page covers the client-side RPC invocation mechanism. For server-side request handling, see Service Endpoint Interface. For information on defining RPC methods, see Service Definitions. For details on using prebuffered methods with this interface, see Prebuffered RPC Calls. For concrete implementations of this interface, see Tokio RPC Client and WASM RPC Client.
Sources: extensions/muxio-rpc-service-caller/Cargo.toml:1-22
Trait Definition
The RpcServiceCallerInterface trait defines the contract that all RPC clients must implement. It is runtime-agnostic and transport-agnostic, focusing solely on the mechanics of RPC invocation.
Core Methods
| Method | Return Type | Purpose |
|---|---|---|
| get_dispatcher() | Arc<TokioMutex<RpcDispatcher<'static>>> | Provides access to the RPC dispatcher for request management |
| get_emit_fn() | Arc<dyn Fn(Vec<u8>) + Send + Sync> | Returns the function that transmits encoded bytes to the transport layer |
| is_connected() | bool | Checks the current transport connection state |
| call_rpc_streaming() | Result<(RpcStreamEncoder, DynamicReceiver), RpcServiceError> | Initiates a streaming RPC call with chunked response handling |
| call_rpc_buffered() | Result<(RpcStreamEncoder, Result<T, RpcServiceError>), RpcServiceError> | Initiates a buffered RPC call that collects the complete response |
| set_state_change_handler() | async | Registers a callback for transport state changes |
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:25-405
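To make the shape of the accessor methods concrete, here is a minimal, std-only sketch. It is not the crate's trait: ServiceCallerSketch, SketchDispatcher, EmitFn, and LoopbackCaller are hypothetical names, std::sync::Mutex stands in for the Tokio mutex, and the call_rpc_* and state-handler methods are left out.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical placeholder for the real dispatcher type.
#[derive(Default)]
pub struct SketchDispatcher {
    pub next_request_id: u32,
}

/// Emit function shape, as listed in the Core Methods table.
pub type EmitFn = Arc<dyn Fn(Vec<u8>) + Send + Sync>;

/// Simplified stand-in for the caller trait (accessor methods only).
pub trait ServiceCallerSketch: Send + Sync {
    fn get_dispatcher(&self) -> Arc<Mutex<SketchDispatcher>>;
    fn get_emit_fn(&self) -> EmitFn;
    fn is_connected(&self) -> bool;
}

/// Hypothetical client that records emitted frames instead of sending them.
pub struct LoopbackCaller {
    dispatcher: Arc<Mutex<SketchDispatcher>>,
    sent: Arc<Mutex<Vec<Vec<u8>>>>,
    connected: bool,
}

impl LoopbackCaller {
    pub fn new(connected: bool) -> Self {
        Self {
            dispatcher: Arc::new(Mutex::new(SketchDispatcher::default())),
            sent: Arc::new(Mutex::new(Vec::new())),
            connected,
        }
    }

    /// Snapshot of every frame handed to the emit function so far.
    pub fn sent_frames(&self) -> Vec<Vec<u8>> {
        self.sent.lock().unwrap().clone()
    }
}

impl ServiceCallerSketch for LoopbackCaller {
    fn get_dispatcher(&self) -> Arc<Mutex<SketchDispatcher>> {
        Arc::clone(&self.dispatcher)
    }

    fn get_emit_fn(&self) -> EmitFn {
        let sent = Arc::clone(&self.sent);
        Arc::new(move |bytes: Vec<u8>| {
            sent.lock().unwrap().push(bytes);
        })
    }

    fn is_connected(&self) -> bool {
        self.connected
    }
}
```

In this sketch a transport only has to supply the three accessors; everything else in the real interface is built on top of them.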
RPC Call Flow
The following diagram illustrates how an RPC call flows through the service caller interface:
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:32-349
sequenceDiagram
participant App as "Application Code"
participant Trait as "RpcServiceCallerInterface"
participant Dispatcher as "RpcDispatcher"
participant SendFn as "send_fn Closure"
participant RecvFn as "recv_fn Closure"
participant Transport as "Transport Layer"
participant DynChan as "DynamicChannel"
App->>Trait: call_rpc_streaming(RpcRequest)
Trait->>Trait: Check is_connected()
alt Not Connected
Trait-->>App: Err(ConnectionAborted)
end
Trait->>DynChan: Create channel (Bounded/Unbounded)
Trait->>SendFn: Create emission closure
Trait->>RecvFn: Create response handler
Trait->>Dispatcher: dispatcher.call(request, send_fn, recv_fn)
Dispatcher-->>Trait: RpcStreamEncoder
Trait->>App: Wait on readiness channel
Transport->>RecvFn: RpcStreamEvent::Header
RecvFn->>RecvFn: Extract RpcResultStatus
RecvFn->>Trait: Signal ready via oneshot
Trait-->>App: Return (encoder, receiver)
loop For each chunk
Transport->>RecvFn: RpcStreamEvent::PayloadChunk
RecvFn->>DynChan: Send Ok(bytes) or buffer error
end
Transport->>RecvFn: RpcStreamEvent::End
RecvFn->>RecvFn: Check final status
alt Success
RecvFn->>DynChan: Close channel
else Error
RecvFn->>DynChan: Send Err(RpcServiceError)
end
Streaming RPC Calls
Method Signature
The call_rpc_streaming method is the foundation for all RPC invocations:
Channel Types
The method accepts a DynamicChannelType parameter that determines the channel behavior:
| Channel Type | Buffer Size | Use Case |
|---|---|---|
| Unbounded | Unlimited | Large or unpredictable response sizes |
| Bounded | DEFAULT_RPC_STREAM_CHANNEL_BUFFER_SIZE (8) | Controlled memory usage with backpressure |
Components Created
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:33-73 extensions/muxio-rpc-service-caller/src/caller_interface.rs:77-96
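The difference between the two channel types can be illustrated with the standard library's channels. This is only an analogy under stated assumptions: ChannelKind, SketchSender, make_channel, and SKETCH_BUFFER_SIZE are hypothetical names, and std::sync::mpsc stands in for whatever channel implementation the crate actually uses.

```rust
use std::sync::mpsc::{self, Receiver, Sender, SyncSender};

/// Assumed to mirror the documented default buffer size of 8.
pub const SKETCH_BUFFER_SIZE: usize = 8;

/// Hypothetical analogue of the channel type parameter.
pub enum ChannelKind {
    Bounded,
    Unbounded,
}

/// Hypothetical sender wrapper covering both channel flavors.
pub enum SketchSender {
    Bounded(SyncSender<Vec<u8>>),
    Unbounded(Sender<Vec<u8>>),
}

impl SketchSender {
    /// Non-blocking send. Returns false when a bounded buffer is full
    /// or the receiver has been dropped.
    pub fn try_send(&self, chunk: Vec<u8>) -> bool {
        match self {
            SketchSender::Bounded(tx) => tx.try_send(chunk).is_ok(),
            SketchSender::Unbounded(tx) => tx.send(chunk).is_ok(),
        }
    }
}

/// Creates a channel of the requested kind.
pub fn make_channel(kind: ChannelKind) -> (SketchSender, Receiver<Vec<u8>>) {
    match kind {
        ChannelKind::Bounded => {
            let (tx, rx) = mpsc::sync_channel(SKETCH_BUFFER_SIZE);
            (SketchSender::Bounded(tx), rx)
        }
        ChannelKind::Unbounded => {
            let (tx, rx) = mpsc::channel();
            (SketchSender::Unbounded(tx), rx)
        }
    }
}
```

With the bounded variant, a producer that outruns the consumer sees sends rejected once the buffer holds eight chunks, which is the backpressure trade-off the table describes. The unbounded variant keeps accepting chunks at the cost of unbounded memory growth.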
Connection State Validation
Before initiating any RPC call, the interface checks connection state:
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:44-53
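A guard of this kind might look roughly like the following. The function name ensure_connected, the SketchServiceError enum, and the error message are assumptions for illustration; only the ConnectionAborted error kind is taken from this page.

```rust
use std::io;

/// Hypothetical stand-in for the crate's service error type.
#[derive(Debug)]
pub enum SketchServiceError {
    Transport(io::Error),
}

/// Fails fast with ConnectionAborted when the transport is not connected,
/// so no request is encoded or dispatched on a dead connection.
pub fn ensure_connected(is_connected: bool) -> Result<(), SketchServiceError> {
    if is_connected {
        Ok(())
    } else {
        Err(SketchServiceError::Transport(io::Error::new(
            io::ErrorKind::ConnectionAborted,
            "RPC client is not connected", // message text is illustrative
        )))
    }
}
```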
Buffered RPC Calls
The call_rpc_buffered method builds on call_rpc_streaming to provide a simpler interface for methods that return complete responses:
Method Signature
Buffering Strategy
The method accumulates all response chunks into a buffer, then applies the decode function once:
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:351-399
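The accumulate-then-decode strategy can be sketched with a blocking std channel. The helper name collect_and_decode and its generic signature are hypothetical; the real method is async and works with the crate's own receiver and error types.

```rust
use std::sync::mpsc::Receiver;

/// Drains the stream until the sender side closes, appending every chunk to
/// one buffer, then runs the decode function exactly once on the full payload.
/// The first error item on the stream aborts the call and is returned as-is.
pub fn collect_and_decode<T, E, F>(
    rx: Receiver<Result<Vec<u8>, E>>,
    decode: F,
) -> Result<T, E>
where
    F: FnOnce(&[u8]) -> T,
{
    let mut buffer = Vec::new();
    for item in rx {
        buffer.extend_from_slice(&item?);
    }
    Ok(decode(&buffer))
}
```

Decoding once at the end means the decode function never sees a partial payload, at the cost of holding the whole response in memory.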
Response Handler Implementation
The recv_fn closure implements the response handling logic by processing RpcStreamEvent types:
stateDiagram-v2
[*] --> WaitingForHeader
WaitingForHeader --> ReadySignaled: RpcStreamEvent::Header
ReadySignaled --> ProcessingChunks : Extract RpcResultStatus
ProcessingChunks --> ProcessingChunks: RpcStreamEvent::PayloadChunk\n(Success → DynamicSender\nError → error_buffer)
ProcessingChunks --> Completed: RpcStreamEvent::End
ProcessingChunks --> ErrorState: RpcStreamEvent::Error
Completed --> [*] : Close DynamicSender
ErrorState --> [*] : Send error and close
Event Processing Flow
Status Handling
| RpcResultStatus | Action |
|---|---|
| Success | Forward payload chunks to DynamicSender |
| MethodNotFound | Buffer payload, send RpcServiceError::Rpc with NotFound code |
| Fail | Send RpcServiceError::Rpc with Fail code |
| SystemError | Buffer error message, send RpcServiceError::Rpc with System code |
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:102-287
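The event handling and status table above can be approximated by a small state holder. Everything here is a simplified assumption: SketchStatus, SketchEvent, SketchError, and ResponseHandler are hypothetical types, error codes are plain strings, and the readiness signal is omitted.

```rust
use std::sync::mpsc::Sender;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SketchStatus { Success, MethodNotFound, Fail, SystemError }

pub enum SketchEvent {
    Header(SketchStatus),
    PayloadChunk(Vec<u8>),
    End,
    Error(String),
}

#[derive(Debug, PartialEq)]
pub enum SketchError {
    Rpc { code: &'static str, message: String },
    Transport(String),
}

pub struct ResponseHandler {
    tx: Option<Sender<Result<Vec<u8>, SketchError>>>,
    status: Option<SketchStatus>,
    error_buffer: Vec<u8>,
}

impl ResponseHandler {
    pub fn new(tx: Sender<Result<Vec<u8>, SketchError>>) -> Self {
        Self { tx: Some(tx), status: None, error_buffer: Vec::new() }
    }

    pub fn on_event(&mut self, event: SketchEvent) {
        match event {
            // Remember the status so later chunks can be routed correctly.
            SketchEvent::Header(status) => self.status = Some(status),
            SketchEvent::PayloadChunk(bytes) => {
                if self.status == Some(SketchStatus::Success) {
                    if let Some(tx) = &self.tx {
                        let _ = tx.send(Ok(bytes)); // forward data chunk
                    }
                } else {
                    // Non-success payloads are treated as the error message.
                    self.error_buffer.extend_from_slice(&bytes);
                }
            }
            SketchEvent::End => {
                let code = match self.status {
                    Some(SketchStatus::MethodNotFound) => Some("NotFound"),
                    Some(SketchStatus::Fail) => Some("Fail"),
                    Some(SketchStatus::SystemError) => Some("System"),
                    _ => None,
                };
                if let (Some(code), Some(tx)) = (code, &self.tx) {
                    let message = String::from_utf8_lossy(&self.error_buffer).into_owned();
                    let _ = tx.send(Err(SketchError::Rpc { code, message }));
                }
                self.tx = None; // dropping the sender closes the channel
            }
            SketchEvent::Error(msg) => {
                if let Some(tx) = self.tx.take() {
                    let _ = tx.send(Err(SketchError::Transport(msg)));
                }
            }
        }
    }
}
```

Holding the sender in an Option lets the handler close the stream simply by dropping it, so the consumer's receive loop ends without a separate end-of-stream marker.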
Mutex Usage Pattern
The response handler uses std::sync::Mutex (not tokio::sync::Mutex) because it executes in a synchronous context:
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:75-115
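As an illustration of why a blocking mutex fits here, the sketch below shares state with a plain, non-async closure. The function name make_recv_fn and the shared byte buffer are assumptions; the point is only that std::sync::Mutex::lock can be called directly, whereas Tokio's async lock must be awaited and therefore cannot be used inside an ordinary closure body.

```rust
use std::sync::{Arc, Mutex};

/// Builds a synchronous callback that appends each chunk to shared state.
/// The lock is held only for the duration of one append.
pub fn make_recv_fn(shared: Arc<Mutex<Vec<u8>>>) -> impl FnMut(&[u8]) + Send {
    move |chunk: &[u8]| {
        let mut buffer = shared.lock().expect("mutex poisoned");
        buffer.extend_from_slice(chunk);
    }
}
```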
Error Handling Strategy
Error Types
Error Propagation Points
| Error Source | When | Error Type |
|---|---|---|
| Disconnected client | Before call_rpc_streaming | RpcServiceError::Transport(ConnectionAborted) |
| Dispatcher failure | During dispatcher.call() | RpcServiceError::Transport(io::Error::other) |
| Readiness timeout | No header received | RpcServiceError::Transport("channel closed prematurely") |
| Frame decode error | RpcStreamEvent::Error | RpcServiceError::Transport(frame_decode_error) |
| Remote RPC error | RpcStreamEvent::End with error status | RpcServiceError::Rpc(code + message) |
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:44-52 extensions/muxio-rpc-service-caller/src/caller_interface.rs:186-238 extensions/muxio-rpc-service-caller/src/caller_interface.rs:246-284
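One of the rows above, the readiness failure, can be sketched with a one-shot style channel: if the sending side is dropped before any header arrives, the waiting caller maps that into a transport error. The names readiness_channel, wait_until_ready, and SketchError are hypothetical, and a blocking std channel stands in for an async oneshot.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical stand-in for the crate's transport error variant.
#[derive(Debug, PartialEq)]
pub enum SketchError {
    Transport(String),
}

type Ready = Result<(), SketchError>;

/// Creates the channel the response handler uses to signal readiness.
pub fn readiness_channel() -> (Sender<Ready>, Receiver<Ready>) {
    mpsc::channel()
}

/// Blocks until the handler reports readiness. A dropped sender means no
/// header was ever received, which is surfaced as a transport error.
pub fn wait_until_ready(ready_rx: Receiver<Ready>) -> Ready {
    match ready_rx.recv() {
        Ok(outcome) => outcome,
        Err(_) => Err(SketchError::Transport(
            "channel closed prematurely".to_string(),
        )),
    }
}
```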
Implementation Requirements
Required Dependencies
Implementations of RpcServiceCallerInterface must maintain:
- RpcDispatcher instance - For managing request correlation and stream multiplexing
- Emit function - For transmitting encoded bytes to the transport layer
- Connection state - Boolean flag tracked by transport implementation
Trait Bounds
The trait requires Send + Sync for async context compatibility:
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:25-26
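What the Send + Sync bound buys can be shown with a toy trait. CallerSketch, AlwaysConnected, and check_from_other_thread are hypothetical names; the sketch uses an OS thread rather than an async task, but the same bounds are what allow a shared client to cross task boundaries.

```rust
use std::sync::Arc;
use std::thread;

/// Toy trait with the same supertrait bounds as described above.
pub trait CallerSketch: Send + Sync {
    fn is_connected(&self) -> bool;
}

pub struct AlwaysConnected;

impl CallerSketch for AlwaysConnected {
    fn is_connected(&self) -> bool {
        true
    }
}

/// Moves a shared trait object into another thread. This compiles only
/// because the supertrait bounds make `dyn CallerSketch` both Send and Sync.
pub fn check_from_other_thread(caller: Arc<dyn CallerSketch>) -> bool {
    let handle = thread::spawn(move || caller.is_connected());
    handle.join().expect("worker thread panicked")
}
```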
Integration with Service Definitions
The interface integrates with RpcMethodPrebuffered through the RpcCallPrebuffered extension trait, providing type-safe method invocation:
graph LR
subgraph "Service Definition"
Method["RpcMethodPrebuffered\ne.g., Echo"]
MethodID["METHOD_ID: u64"]
Encode["encode_request()"]
Decode["decode_response()"]
end
subgraph "Caller Extension"
CallTrait["RpcCallPrebuffered"]
CallFn["call(client, params)"]
end
subgraph "Caller Interface"
Interface["RpcServiceCallerInterface"]
Buffered["call_rpc_buffered()"]
end
Method --> MethodID
Method --> Encode
Method --> Decode
CallTrait --> Method
CallFn --> Encode
CallFn --> Decode
CallFn --> Interface
Interface --> Buffered
Example usage from tests:
Sources: extensions/muxio-rpc-service-caller/tests/prebuffered_caller_tests.rs203
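The original test snippet is not reproduced here. As a rough, std-only sketch of the relationship in the diagram, the code below pairs a method definition (method ID, encode, decode) with a generic call helper. PrebufferedMethodSketch, Echo's METHOD_ID value, and call_prebuffered are all assumptions; the real traits are async and return Result types.

```rust
/// Hypothetical analogue of a prebuffered method definition.
pub trait PrebufferedMethodSketch {
    const METHOD_ID: u64;
    type Input;
    type Output;
    fn encode_request(input: Self::Input) -> Vec<u8>;
    fn decode_response(bytes: &[u8]) -> Self::Output;
}

/// Echo method: request and response are the same raw bytes.
pub struct Echo;

impl PrebufferedMethodSketch for Echo {
    const METHOD_ID: u64 = 1; // arbitrary value for the sketch
    type Input = Vec<u8>;
    type Output = Vec<u8>;

    fn encode_request(input: Vec<u8>) -> Vec<u8> {
        input
    }

    fn decode_response(bytes: &[u8]) -> Vec<u8> {
        bytes.to_vec()
    }
}

/// Encodes the typed input, hands (method id, bytes) to the transport,
/// and decodes the returned bytes into the typed output.
pub fn call_prebuffered<M, F>(transport: F, input: M::Input) -> M::Output
where
    M: PrebufferedMethodSketch,
    F: FnOnce(u64, Vec<u8>) -> Vec<u8>,
{
    let request = M::encode_request(input);
    let response = transport(M::METHOD_ID, request);
    M::decode_response(&response)
}
```

Because the method type carries its own ID and codecs, the call site only names the method and passes typed parameters; the byte-level details stay inside the helper.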
Mock Implementation for Testing
The test suite demonstrates a minimal implementation:
The mock stores a shared DynamicSender that tests can use to inject response data:
Sources: extensions/muxio-rpc-service-caller/tests/prebuffered_caller_tests.rs:20-93
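The injectable-sender idea can be sketched as follows. MockCallerSketch and its start_call, inject, and finish methods are hypothetical and use std channels; the actual mock in the test suite implements the full caller interface.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};

type ChunkResult = Result<Vec<u8>, String>;

/// Hypothetical mock: the sender for the in-flight call sits in a shared
/// slot so test code can push response items into it from outside.
#[derive(Clone, Default)]
pub struct MockCallerSketch {
    response_tx: Arc<Mutex<Option<Sender<ChunkResult>>>>,
}

impl MockCallerSketch {
    /// Simulates starting a call: installs a fresh sender, returns the receiver.
    pub fn start_call(&self) -> Receiver<ChunkResult> {
        let (tx, rx) = mpsc::channel();
        *self.response_tx.lock().unwrap() = Some(tx);
        rx
    }

    /// Injects one response item. Returns false if no call is in flight.
    pub fn inject(&self, item: ChunkResult) -> bool {
        match self.response_tx.lock().unwrap().as_ref() {
            Some(tx) => tx.send(item).is_ok(),
            None => false,
        }
    }

    /// Drops the sender, which ends the response stream.
    pub fn finish(&self) {
        self.response_tx.lock().unwrap().take();
    }
}
```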
State Change Handling
The interface provides a method for registering transport state change callbacks:
This allows applications to react to connection state transitions. See Transport State Management for details on state transitions.
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:401-405
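A callback registry of this general kind could look like the sketch below. The state names in SketchTransportState, the StateNotifier type, and the notify method are assumptions, and the sketch is synchronous whereas the table above lists the real method as async.

```rust
use std::sync::Mutex;

/// Hypothetical state enum; the actual variants are not listed on this page.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SketchTransportState {
    Connecting,
    Connected,
    Disconnected,
}

type Handler = Box<dyn Fn(SketchTransportState) + Send + Sync>;

/// Holds at most one handler; registering a new one replaces the old one.
#[derive(Default)]
pub struct StateNotifier {
    handler: Mutex<Option<Handler>>,
}

impl StateNotifier {
    pub fn set_state_change_handler(
        &self,
        handler: impl Fn(SketchTransportState) + Send + Sync + 'static,
    ) {
        *self.handler.lock().unwrap() = Some(Box::new(handler));
    }

    /// Called by the transport on a state transition. Does nothing when
    /// no handler has been registered.
    pub fn notify(&self, state: SketchTransportState) {
        let guard = self.handler.lock().unwrap();
        if let Some(handler) = guard.as_ref() {
            handler(state);
        }
    }
}
```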
Relationship to Concrete Implementations
The following three clients implement the same interface, differing only in their transport mechanisms:
- TokioRpcClient (#5.2) - Uses Tokio async runtime and tokio-tungstenite for native WebSocket connections
- WasmRpcClient (#5.3) - Uses wasm-bindgen and browser WebSocket APIs for WASM environments
- MockRpcClient - Test implementation with injectable response channels
Sources: extensions/muxio-rpc-service-caller/tests/prebuffered_caller_tests.rs:24-93