This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Service Caller Interface
Relevant source files
- extensions/muxio-rpc-service-caller/Cargo.toml
- extensions/muxio-rpc-service-caller/src/caller_interface.rs
Purpose and Scope
The Service Caller Interface defines the core abstraction for client-side RPC invocation in Muxio. The RpcServiceCallerInterface trait provides a runtime-agnostic interface that allows application code to make RPC calls without depending on specific transport implementations. This abstraction enables the same client code to work across different runtimes (native Tokio, WASM) by implementing the trait for each platform.
For information about defining RPC services and methods, see Service Definitions. For server-side RPC handling, see Service Endpoint Interface. For concrete implementations of this interface, see Tokio RPC Client and WASM RPC Client.
Interface Overview
The RpcServiceCallerInterface is defined as an async trait that provides two primary RPC invocation patterns: streaming and buffered. It abstracts the underlying transport mechanism while exposing connection state and allowing customization through state change handlers.
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:25-405
graph TB
subgraph "Application Layer"
AppCode["Application Code"]
ServiceDef["RPC Method Definitions\n(RpcMethodPrebuffered)"]
end
subgraph "Service Caller Interface"
CallerTrait["RpcServiceCallerInterface\nasync trait"]
CallBuffered["call_rpc_buffered()\nReturns complete response"]
CallStreaming["call_rpc_streaming()\nReturns stream of chunks"]
GetDispatcher["get_dispatcher()\nAccess to RpcDispatcher"]
IsConnected["is_connected()\nConnection status"]
StateHandler["set_state_change_handler()\nState notifications"]
end
subgraph "Concrete Implementations"
TokioImpl["muxio-tokio-rpc-client\nRpcClient (Tokio)"]
WasmImpl["muxio-wasm-rpc-client\nRpcWasmClient (WASM)"]
end
subgraph "Core Components"
Dispatcher["RpcDispatcher\nRequest correlation"]
EmitFn["Emit Function\nSend bytes to transport"]
end
AppCode --> ServiceDef
ServiceDef --> CallerTrait
CallerTrait --> CallBuffered
CallerTrait --> CallStreaming
CallerTrait --> GetDispatcher
CallerTrait --> IsConnected
CallerTrait --> StateHandler
TokioImpl -.implements.-> CallerTrait
WasmImpl -.implements.-> CallerTrait
GetDispatcher --> Dispatcher
CallBuffered --> Dispatcher
CallStreaming --> Dispatcher
CallBuffered --> EmitFn
CallStreaming --> EmitFn
Trait Definition
The RpcServiceCallerInterface trait requires implementors to provide access to core components and implement multiple invocation patterns:
| Method | Return Type | Purpose |
|---|---|---|
| get_dispatcher() | Arc<TokioMutex<RpcDispatcher<'static>>> | Provides access to the RPC dispatcher for request/response correlation |
| get_emit_fn() | Arc<dyn Fn(Vec<u8>) + Send + Sync> | Returns the function that sends binary frames to the transport |
| is_connected() | bool | Checks current connection state |
| call_rpc_streaming() | Result<(RpcStreamEncoder, DynamicReceiver), RpcServiceError> | Initiates streaming RPC call with incremental data reception |
| call_rpc_buffered() | Result<(RpcStreamEncoder, Result<T, RpcServiceError>), RpcServiceError> | Initiates buffered RPC call that returns complete response |
| set_state_change_handler() | async fn | Registers callback for transport state changes |
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:26-31 extensions/muxio-rpc-service-caller/src/caller_interface.rs:32-405
Core Components Access
Dispatcher Access
The get_dispatcher() method returns an Arc<TokioMutex<RpcDispatcher>> that allows the implementation to register RPC calls and manage request/response correlation. The TokioMutex is used because these methods are async and may need to await the lock.
Emit Function
The get_emit_fn() method returns a closure that the caller interface uses to send binary frames to the underlying transport. This function is called by the RpcStreamEncoder when writing request payloads.
Connection State
The is_connected() method allows implementations to check the connection state before attempting RPC calls. When false, the call_rpc_streaming() method immediately returns a ConnectionAborted error.
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:28-30 extensions/muxio-rpc-service-caller/src/caller_interface.rs:44-53
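To make the emit function concrete, here is a minimal sketch using only the standard library. The closure type matches the one listed for get_emit_fn(); RecordingTransport and make_emit_fn are hypothetical names introduced for illustration and are not part of Muxio.

```rust
use std::sync::{Arc, Mutex};

/// Same shape as the closure type listed for `get_emit_fn()`.
type EmitFn = Arc<dyn Fn(Vec<u8>) + Send + Sync>;

/// Hypothetical stand-in for a transport: it only records outgoing frames.
#[derive(Default)]
struct RecordingTransport {
    frames: Mutex<Vec<Vec<u8>>>,
}

/// Builds an emit function that hands every frame to the transport.
/// A real client would write to a WebSocket here (assumption).
fn make_emit_fn(transport: Arc<RecordingTransport>) -> EmitFn {
    Arc::new(move |frame: Vec<u8>| {
        transport.frames.lock().unwrap().push(frame);
    })
}
```

Because the closure is wrapped in an Arc and is Send + Sync, it can be cloned cheaply and called from any thread that needs to write to the transport.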
Streaming RPC Pattern
The call_rpc_streaming() method provides the foundation for all RPC calls, supporting incremental data reception through dynamic channels. This pattern is used directly for streaming RPC calls and internally by the buffered pattern.
sequenceDiagram
participant App as "Application"
participant Interface as "RpcServiceCallerInterface"
participant Dispatcher as "RpcDispatcher"
participant Channel as "DynamicChannel\n(mpsc)"
participant SendFn as "Emit Function"
participant RecvFn as "Response Handler\n(Closure)"
App->>Interface: call_rpc_streaming(request)
Interface->>Interface: Check is_connected()
Interface->>Channel: Create mpsc channel\n(Bounded/Unbounded)
Interface->>Interface: Create send_fn closure
Interface->>Interface: Create recv_fn closure
Interface->>Dispatcher: call(request, send_fn, recv_fn)
Dispatcher-->>Interface: Return RpcStreamEncoder
Note over Interface,Channel: Wait for readiness signal
RecvFn->>RecvFn: Receive RpcStreamEvent::Header
RecvFn->>Channel: Send readiness via oneshot
Interface-->>App: Return (encoder, receiver)
loop "For each response chunk"
RecvFn->>RecvFn: Receive RpcStreamEvent::PayloadChunk
RecvFn->>Channel: Send Ok(bytes) to DynamicReceiver
App->>Channel: next().await
Channel-->>App: Some(Ok(bytes))
end
RecvFn->>RecvFn: Receive RpcStreamEvent::End
RecvFn->>Channel: Close sender (drop)
App->>Channel: next().await
Channel-->>App: None
Streaming Call Flow
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:32-349
Dynamic Channel Types
The streaming method accepts a DynamicChannelType parameter that determines the channel buffering strategy:
| Channel Type | Buffer Size | Use Case |
|---|---|---|
| DynamicChannelType::Bounded | DEFAULT_RPC_STREAM_CHANNEL_BUFFER_SIZE | Controlled memory usage, backpressure |
| DynamicChannelType::Unbounded | Unlimited | Maximum throughput, simple buffering |
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:56-73
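The trade-off between the two strategies can be sketched with standard-library channels. This is an analogy rather than the Muxio implementation: ChannelKind, ChunkSender, and open_channel are hypothetical names, and std::sync::mpsc stands in for whatever async channel the real code uses.

```rust
use std::sync::mpsc::{self, Receiver, Sender, SyncSender};

/// Hypothetical mirror of the two buffering strategies.
enum ChannelKind {
    Bounded(usize),
    Unbounded,
}

/// One sender type that hides which strategy was chosen.
enum ChunkSender {
    Bounded(SyncSender<Vec<u8>>),
    Unbounded(Sender<Vec<u8>>),
}

impl ChunkSender {
    /// Non-blocking send. Returns false when the chunk was not accepted,
    /// which for a bounded channel is the backpressure signal.
    fn try_push(&self, chunk: Vec<u8>) -> bool {
        match self {
            ChunkSender::Bounded(tx) => tx.try_send(chunk).is_ok(),
            ChunkSender::Unbounded(tx) => tx.send(chunk).is_ok(),
        }
    }
}

fn open_channel(kind: ChannelKind) -> (ChunkSender, Receiver<Vec<u8>>) {
    match kind {
        ChannelKind::Bounded(capacity) => {
            let (tx, rx) = mpsc::sync_channel(capacity);
            (ChunkSender::Bounded(tx), rx)
        }
        ChannelKind::Unbounded => {
            let (tx, rx) = mpsc::channel();
            (ChunkSender::Unbounded(tx), rx)
        }
    }
}
```

With a bounded channel the producer learns that the consumer is falling behind; with an unbounded one every chunk is accepted and memory grows until the consumer catches up.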
Response Handler Implementation
The recv_fn closure handles all incoming RpcStreamEvent variants synchronously using StdMutex for WASM compatibility. The handler maintains internal state to track response status and buffer error payloads:
stateDiagram-v2
[*] --> WaitingHeader : recv_fn created
WaitingHeader --> ProcessingPayload: RpcStreamEvent::Header\nExtract RpcResultStatus
WaitingHeader --> SendReadiness : Send readiness signal
SendReadiness --> ProcessingPayload
ProcessingPayload --> BufferSuccess: RpcResultStatus::Success
ProcessingPayload --> BufferError: RpcResultStatus::*Error
BufferSuccess --> ProcessingPayload : More chunks
BufferError --> ProcessingPayload : More chunks
ProcessingPayload --> CompleteSuccess: RpcStreamEvent::End\nSuccess status
ProcessingPayload --> CompleteError: RpcStreamEvent::End\nError status
ProcessingPayload --> HandleError: RpcStreamEvent::Error
CompleteSuccess --> [*] : Close channel
CompleteError --> [*] : Send RpcServiceError\nClose channel
HandleError --> [*] : Send Transport error\nClose channel
The response handler uses StdMutex instead of TokioMutex because it runs as a synchronous callback that cannot await a lock; a standard mutex also keeps the handler usable in WASM environments.
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:91-287 extensions/muxio-rpc-service-caller/src/caller_interface.rs:75-79 extensions/muxio-rpc-service-caller/src/caller_interface.rs:94-96
Response Event Handling
The recv_fn closure processes four event types:
| Event Type | Actions | Channel Operation |
|---|---|---|
| RpcStreamEvent::Header | Extract RpcResultStatus from metadata, send readiness signal | Signal header received via oneshot |
| RpcStreamEvent::PayloadChunk | If success: forward to receiver; otherwise buffer error payload | sender.send_and_ignore(Ok(bytes)) |
| RpcStreamEvent::End | Process final status, convert errors to RpcServiceError | Send final error or close channel |
| RpcStreamEvent::Error | Send transport error to both readiness and data channels | sender.send_and_ignore(Err(error)) |
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:118-286
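The event handling in the table can be approximated by a small synchronous state machine. The sketch below is an illustration under assumptions: StreamEvent, Status, CallError, and make_recv_fn are simplified, hypothetical stand-ins for the real types, a standard-library channel replaces the dynamic channel, and the readiness oneshot is omitted for brevity.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Mutex;

#[derive(Clone, Copy, Debug, PartialEq)]
enum Status { Success, Fail }

/// Simplified stand-in for the four stream events.
enum StreamEvent {
    Header(Status),
    PayloadChunk(Vec<u8>),
    End,
    Error(String),
}

#[derive(Debug, PartialEq)]
enum CallError {
    Rpc(Vec<u8>),      // buffered error payload from the remote side
    Transport(String), // framing or connection failure
}

struct HandlerState {
    status: Option<Status>,
    error_buf: Vec<u8>,
    sender: Option<Sender<Result<Vec<u8>, CallError>>>,
}

/// Returns a synchronous handler plus the receiving half of the data channel.
fn make_recv_fn() -> (
    impl Fn(StreamEvent) + Send + Sync,
    Receiver<Result<Vec<u8>, CallError>>,
) {
    let (tx, rx) = mpsc::channel();
    // A std Mutex suffices: the handler never awaits while holding the lock.
    let state = Mutex::new(HandlerState { status: None, error_buf: Vec::new(), sender: Some(tx) });
    let recv_fn = move |event: StreamEvent| {
        let mut st = state.lock().unwrap();
        match event {
            StreamEvent::Header(status) => st.status = Some(status),
            StreamEvent::PayloadChunk(bytes) => {
                if st.status == Some(Status::Success) {
                    // Success: forward the chunk to the consumer right away.
                    if let Some(tx) = &st.sender { let _ = tx.send(Ok(bytes)); }
                } else {
                    // Anything else: hold the bytes until End, then report them as an error.
                    st.error_buf.extend_from_slice(&bytes);
                }
            }
            StreamEvent::End => {
                if st.status != Some(Status::Success) {
                    let payload = std::mem::take(&mut st.error_buf);
                    if let Some(tx) = &st.sender { let _ = tx.send(Err(CallError::Rpc(payload))); }
                }
                st.sender = None; // dropping the sender closes the channel
            }
            StreamEvent::Error(msg) => {
                if let Some(tx) = st.sender.take() { let _ = tx.send(Err(CallError::Transport(msg))); }
            }
        }
    };
    (recv_fn, rx)
}
```

Send results are deliberately ignored, in the spirit of the send_and_ignore calls in the table: if the consumer has already dropped its receiver, there is nobody left to notify.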
Buffered RPC Pattern
The call_rpc_buffered() method provides a higher-level interface for RPC calls where the entire response is buffered before being returned. This method is built on top of call_rpc_streaming() and is used by RpcMethodPrebuffered implementations.
sequenceDiagram
participant App as "Application"
participant Buffered as "call_rpc_buffered()"
participant Streaming as "call_rpc_streaming()"
participant Stream as "DynamicReceiver"
participant Decode as "decode: F"
App->>Buffered: call_rpc_buffered(request, decode)
Buffered->>Streaming: call_rpc_streaming(request, Unbounded)
Streaming-->>Buffered: Return (encoder, stream)
Buffered->>Buffered: Create empty success_buf
loop "Stream consumption"
Buffered->>Stream: stream.next().await
Stream-->>Buffered: Some(Ok(chunk))
Buffered->>Buffered: success_buf.extend(chunk)
end
alt "Stream completed successfully"
Stream-->>Buffered: None
Buffered->>Decode: decode(&success_buf)
Decode-->>Buffered: Return T
Buffered-->>App: Ok((encoder, Ok(decoded)))
else "Stream yielded error"
Stream-->>Buffered: Some(Err(e))
Buffered-->>App: Ok((encoder, Err(e)))
end
Buffered Call Implementation
The buffered pattern always uses DynamicChannelType::Unbounded to avoid backpressure complications when consuming the entire stream.
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:351-399 extensions/muxio-rpc-service-caller/src/caller_interface.rs:368-370
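The consumption loop in the diagram reduces to "accumulate until the stream ends, stop at the first error, then decode". A minimal synchronous sketch follows, assuming a standard-library Receiver in place of DynamicReceiver; collect_buffered is a hypothetical helper name.

```rust
use std::sync::mpsc::Receiver;

/// Drains a stream of chunks into one buffer and decodes it.
/// The first error item ends the call and is returned as-is.
fn collect_buffered<T, E, F>(stream: Receiver<Result<Vec<u8>, E>>, decode: F) -> Result<T, E>
where
    F: Fn(&[u8]) -> T,
{
    let mut success_buf = Vec::new();
    // The loop ends when every sender has been dropped (stream closed).
    for item in stream {
        match item {
            Ok(chunk) => success_buf.extend_from_slice(&chunk),
            Err(e) => return Err(e),
        }
    }
    Ok(decode(&success_buf))
}
```

Decoding happens once, after the stream closes, so the decode function always sees the complete payload.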
Decode Function
The decode parameter is a closure that converts the buffered byte slice into the desired return type T. This function is provided by the RPC method implementation and typically uses bitcode::decode() for deserialization:
F: Fn(&[u8]) -> T + Send + Sync + 'static
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:363-365
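The Send + Sync + 'static bounds mean the decoder can be moved into another task or thread. The sketch below illustrates that with a plain thread; decode_on_worker and decode_u32_le are hypothetical helpers, and a hand-written little-endian decoder stands in for a real deserializer such as bitcode.

```rust
use std::thread;

/// Runs a decoder on a worker thread, which only compiles because of the
/// `Send + Sync + 'static` bounds on `F`.
fn decode_on_worker<T, F>(bytes: Vec<u8>, decode: F) -> T
where
    T: Send + 'static,
    F: Fn(&[u8]) -> T + Send + Sync + 'static,
{
    thread::spawn(move || decode(&bytes))
        .join()
        .expect("decoder panicked")
}

/// Example decoder: interprets exactly four bytes as a little-endian u32.
fn decode_u32_le(bytes: &[u8]) -> Option<u32> {
    if bytes.len() != 4 {
        return None;
    }
    Some(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
}
```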
Return Value Structure
Both streaming and buffered methods return a tuple containing an RpcStreamEncoder and the response data. The encoder allows the caller to write request payloads after initiating the call:
| Method | Return Type | Encoder Purpose | Response Type |
|---|---|---|---|
| call_rpc_streaming() | (RpcStreamEncoder, DynamicReceiver) | Write request payload | Stream of Result<Vec<u8>, RpcServiceError> |
| call_rpc_buffered() | (RpcStreamEncoder, Result<T, RpcServiceError>) | Write request payload | Complete decoded response |
The RpcStreamEncoder is returned even if the response contains an error, allowing the caller to properly finalize the request payload.
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:37-42 extensions/muxio-rpc-service-caller/src/caller_interface.rs:357-362
Connection State Management
State Change Handler
The set_state_change_handler() method allows applications to register a callback that is invoked when the transport state changes. Implementations store the handler and invoke it during connection lifecycle events.
The RpcTransportState enum indicates whether the transport is connected or disconnected. For more details on transport state management, see Transport State Management.
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:401-405
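One way an implementation might store and invoke such a handler is sketched below. It is synchronous and uses only the standard library, whereas the real method is async; TransportState, HandlerSlot, and notify are hypothetical names chosen for illustration.

```rust
use std::sync::Mutex;

/// Simplified stand-in for the transport state enum.
#[derive(Clone, Copy, Debug, PartialEq)]
enum TransportState {
    Connected,
    Disconnected,
}

type StateHandler = Box<dyn Fn(TransportState) + Send + Sync>;

/// Holds at most one handler; registering a new one replaces the old one.
#[derive(Default)]
struct HandlerSlot {
    handler: Mutex<Option<StateHandler>>,
}

impl HandlerSlot {
    fn set_state_change_handler(&self, handler: impl Fn(TransportState) + Send + Sync + 'static) {
        *self.handler.lock().unwrap() = Some(Box::new(handler));
    }

    /// Called by the client at lifecycle points such as connect and disconnect.
    /// Does nothing when no handler has been registered.
    fn notify(&self, state: TransportState) {
        if let Some(handler) = self.handler.lock().unwrap().as_ref() {
            handler(state);
        }
    }
}
```

In this sketch the lock is held while the handler runs, so a handler that tries to register a replacement from inside the callback would deadlock. A production client would need to decide how to handle that case.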
graph LR
CallStart["call_rpc_streaming()\ninvoked"]
CheckConn{"is_connected()?"}
RejectCall["Return ConnectionAborted error"]
ProceedCall["Create channels and proceed"]
CallStart --> CheckConn
CheckConn -->|false| RejectCall
CheckConn -->|true| ProceedCall
Connection Checks
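The is_connected() method is checked at the beginning of call_rpc_streaming() to prevent RPC calls on disconnected transports. When it returns false, the call is rejected with a ConnectionAborted error before any channels are created.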
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:44-53
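The guard can be sketched as an early return. This is an illustration only: Connection and begin_call are hypothetical names, and the error is a plain std::io::Error rather than the RpcServiceError wrapper.

```rust
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical client state holding just the connection flag.
struct Connection {
    connected: AtomicBool,
}

impl Connection {
    fn is_connected(&self) -> bool {
        self.connected.load(Ordering::SeqCst)
    }

    /// Rejects the call up front when the transport is down,
    /// before any per-call resources are allocated.
    fn begin_call(&self) -> Result<(), io::Error> {
        if !self.is_connected() {
            return Err(io::Error::new(
                io::ErrorKind::ConnectionAborted,
                "RPC client is not connected",
            ));
        }
        Ok(())
    }
}
```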
Error Handling
The caller interface propagates errors from multiple sources through the RpcServiceError enum:
| Error Source | Error Type | Trigger |
|---|---|---|
| Disconnected client | RpcServiceError::Transport(ConnectionAborted) | is_connected() returns false |
| Dispatcher call failure | RpcServiceError::Transport(io::Error) | dispatcher.call() returns error |
| Readiness channel closed | RpcServiceError::Transport(io::Error) | Oneshot channel drops before header received |
| Method not found | RpcServiceError::Rpc(NotFound) | RpcResultStatus::MethodNotFound in response |
| Application error | RpcServiceError::Rpc(Fail) | RpcResultStatus::Fail in response |
| System error | RpcServiceError::Rpc(System) | RpcResultStatus::SystemError in response |
| Frame decode error | RpcServiceError::Transport(io::Error) | RpcStreamEvent::Error received |
For comprehensive error handling documentation, see RPC Service Errors.
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:49-52 extensions/muxio-rpc-service-caller/src/caller_interface.rs:186-232 extensions/muxio-rpc-service-caller/src/caller_interface.rs:246-284
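The status-to-error mapping in the table can be written as a single match. The types below are simplified, hypothetical mirrors of the real enums (ResultStatus, RpcErrorKind, ServiceError), and treating the buffered payload as a UTF-8 message is an assumption made for the example.

```rust
use std::io;

/// Simplified mirror of the response status carried in the header.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ResultStatus { Success, MethodNotFound, Fail, SystemError }

#[derive(Debug, PartialEq)]
enum RpcErrorKind { NotFound, Fail, System }

/// Simplified mirror of the two error families in the table.
#[derive(Debug)]
enum ServiceError {
    Rpc { kind: RpcErrorKind, message: String },
    Transport(io::Error),
}

/// Converts a final status plus buffered payload into the caller-facing result.
fn status_to_result(status: ResultStatus, payload: Vec<u8>) -> Result<Vec<u8>, ServiceError> {
    let kind = match status {
        ResultStatus::Success => return Ok(payload),
        ResultStatus::MethodNotFound => RpcErrorKind::NotFound,
        ResultStatus::Fail => RpcErrorKind::Fail,
        ResultStatus::SystemError => RpcErrorKind::System,
    };
    // Assumption: the error payload is human-readable text.
    let message = String::from_utf8_lossy(&payload).into_owned();
    Err(ServiceError::Rpc { kind, message })
}

/// Error produced when a call is attempted on a disconnected client.
fn disconnected() -> ServiceError {
    ServiceError::Transport(io::Error::new(io::ErrorKind::ConnectionAborted, "not connected"))
}
```

Keeping remote failures (Rpc) separate from local or framing failures (Transport) lets callers decide whether a retry on a fresh connection makes sense.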
Implementation Requirements
Types implementing RpcServiceCallerInterface must:
- Provide thread-safe dispatcher access: Return Arc<TokioMutex<RpcDispatcher>> from get_dispatcher()
- Implement emit function: Return closure that sends bytes to underlying transport
- Track connection state: Maintain boolean state returned by is_connected()
- Manage state handlers: Store and invoke state change handlers at appropriate lifecycle points
- Satisfy trait bounds: Implement Send + Sync for cross-thread usage
The trait uses #[async_trait::async_trait] to support async methods in trait definitions.
Sources: extensions/muxio-rpc-service-caller/src/caller_interface.rs:25-26
graph TB
subgraph "Trait Definition"
Trait["RpcServiceCallerInterface\nextensions/muxio-rpc-service-caller"]
end
subgraph "Native Implementation"
TokioClient["RpcClient\nmuxio-tokio-rpc-client"]
TokioRuntime["Tokio async runtime\ntokio-tungstenite WebSocket"]
end
subgraph "Browser Implementation"
WasmClient["RpcWasmClient\nmuxio-wasm-rpc-client"]
WasmBridge["wasm-bindgen bridge\nBrowser WebSocket API"]
end
Trait -.implemented by.-> TokioClient
Trait -.implemented by.-> WasmClient
TokioClient --> TokioRuntime
WasmClient --> WasmBridge
style Trait fill:#f9f9f9,stroke:#333,stroke-width:2px
Platform-Specific Implementations
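The RpcServiceCallerInterface is implemented by two platform-specific clients: RpcClient in muxio-tokio-rpc-client, which runs on the Tokio async runtime over a tokio-tungstenite WebSocket, and RpcWasmClient in muxio-wasm-rpc-client, which uses a wasm-bindgen bridge to the browser WebSocket API.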
Both implementations provide the same interface to application code while adapting to their respective runtime environments. For implementation details, see Tokio RPC Client and WASM RPC Client.
graph LR
subgraph "Application Code"
Call["Add::call(&client, params)"]
end
subgraph "Method Definition"
Method["RpcMethodPrebuffered::call()\nType-safe wrapper"]
Encode["encode_request(params)\nSerialize arguments"]
Decode["decode_response(bytes)\nDeserialize result"]
end
subgraph "Caller Interface"
CallBuffered["call_rpc_buffered(request, decode)"]
BuildRequest["Build RpcRequest\nwith method_id and params"]
end
Call --> Method
Method --> Encode
Encode --> BuildRequest
BuildRequest --> CallBuffered
CallBuffered -.async.-> Decode
Decode --> Method
Method --> Call
Integration with RPC Methods
RPC method definitions use the caller interface through the RpcMethodPrebuffered trait, which provides a type-safe wrapper around call_rpc_buffered(): the method encodes its arguments, builds the request, and decodes the buffered response.
For details on method definitions and the prebuffered pattern, see Service Definitions and Prebuffered RPC Calls.
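The encode, call, decode round trip can be sketched with a synchronous stand-in. Everything here is hypothetical and simplified: PrebufferedMethod is not the real RpcMethodPrebuffered trait, the caller is modeled as a plain function instead of an async client, and hand-rolled little-endian encoding replaces the real serializer. Only the names Add, call, encode_request, and decode_response are taken from the diagram.

```rust
/// Hypothetical, synchronous stand-in for a prebuffered method definition.
trait PrebufferedMethod {
    const METHOD_ID: u64;
    type Input;
    type Output;

    fn encode_request(input: Self::Input) -> Vec<u8>;
    fn decode_response(bytes: &[u8]) -> Result<Self::Output, String>;

    /// Type-safe wrapper: callers pass typed arguments and get a typed result,
    /// while the caller function only ever sees a method ID and raw bytes.
    fn call<C>(caller: &C, input: Self::Input) -> Result<Self::Output, String>
    where
        C: Fn(u64, Vec<u8>) -> Result<Vec<u8>, String>,
    {
        let response = caller(Self::METHOD_ID, Self::encode_request(input))?;
        Self::decode_response(&response)
    }
}

fn f64_from_le(chunk: &[u8]) -> f64 {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(chunk);
    f64::from_le_bytes(raw)
}

struct Add;

impl PrebufferedMethod for Add {
    const METHOD_ID: u64 = 1; // arbitrary ID chosen for the example
    type Input = Vec<f64>;
    type Output = f64;

    fn encode_request(input: Vec<f64>) -> Vec<u8> {
        input.iter().flat_map(|n| n.to_le_bytes()).collect()
    }

    fn decode_response(bytes: &[u8]) -> Result<f64, String> {
        if bytes.len() != 8 {
            return Err(format!("expected 8 bytes, got {}", bytes.len()));
        }
        Ok(f64_from_le(bytes))
    }
}

/// Hypothetical in-process "server" used in place of a real transport.
fn loopback_server(method_id: u64, request: Vec<u8>) -> Result<Vec<u8>, String> {
    if method_id != Add::METHOD_ID {
        return Err("method not found".to_string());
    }
    let sum: f64 = request.chunks_exact(8).map(f64_from_le).sum();
    Ok(sum.to_le_bytes().to_vec())
}
```

Application code then reads as Add::call(&loopback_server, vec![1.0, 2.0, 3.5]), with no byte handling at the call site.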
Package Information
The RpcServiceCallerInterface is defined in the muxio-rpc-service-caller package, which provides generic, runtime-agnostic client logic:
| Package | Description | Key Dependencies |
|---|---|---|
| muxio-rpc-service-caller | Generic RPC client interface and logic | muxio, muxio-rpc-service, async-trait, futures, tokio (sync only) |
The package uses minimal Tokio features (only sync for TokioMutex) to remain as platform-agnostic as possible while supporting async methods.
Sources: extensions/muxio-rpc-service-caller/Cargo.toml:1-22