This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Service Endpoint Interface
Relevant source files
- extensions/muxio-rpc-service-caller/src/prebuffered/traits.rs
- extensions/muxio-rpc-service-endpoint/Cargo.toml
- extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs
- extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs
- extensions/muxio-wasm-rpc-client/tests/prebuffered_integration_tests.rs
Purpose and Scope
This document describes the RpcServiceEndpointInterface trait, which provides the server-side interface for handling incoming RPC requests in the rust-muxio system. This interface is responsible for registering method handlers, decoding incoming byte streams into RPC requests, dispatching those requests to the appropriate handlers, and encoding responses back to clients.
For information about the client-side interface for making RPC calls, see Service Caller Interface. For details about service method definitions, see Service Definitions.
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:1-138
Overview
The RpcServiceEndpointInterface<C> trait defines the contract that server-side endpoint implementations must fulfill. It is generic over a connection context type C, allowing each RPC handler to access connection-specific data such as authentication state, session information, or connection metadata.
Core Responsibilities
| Responsibility | Description |
|---|---|
| Handler Registration | Provides register_prebuffered method to associate method IDs with handler closures |
| Request Decoding | Processes incoming byte streams and identifies complete RPC requests |
| Request Dispatch | Routes requests to the appropriate handler based on METHOD_ID |
| Response Encoding | Encodes handler results back into binary frames for transmission |
| Concurrent Execution | Executes multiple handlers concurrently when multiple requests arrive |
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:8-14
Trait Definition
Diagram: RpcServiceEndpointInterface Trait Structure
The trait is parameterized by a connection context type C that must be Send + Sync + Clone + 'static. This context is passed to every handler invocation, enabling stateful request processing.
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:8-14
Handler Registration
The register_prebuffered Method
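Handlers are registered by calling register_prebuffered with a unique METHOD_ID and an asynchronous closure. Duplicate registrations are rejected at runtime: registering a second handler for a METHOD_ID that is already present returns an error instead of replacing the existing handler.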
Diagram: Handler Registration Flow
sequenceDiagram
participant App as "Application Code"
participant Endpoint as "RpcServiceEndpointInterface"
participant Handlers as "HandlersLock (WithHandlers)"
participant HashMap as "HashMap<u64, Handler>"
App->>Endpoint: register_prebuffered(METHOD_ID, handler)
Endpoint->>Handlers: with_handlers(|handlers| {...})
Handlers->>HashMap: entry(METHOD_ID)
alt METHOD_ID not registered
HashMap-->>Handlers: Entry::Vacant
Handlers->>HashMap: insert(Arc::new(wrapped_handler))
HashMap-->>Handlers: Ok(())
Handlers-->>Endpoint: Ok(())
Endpoint-->>App: Ok(())
else METHOD_ID already exists
HashMap-->>Handlers: Entry::Occupied
Handlers-->>Endpoint: Err(RpcServiceEndpointError::Handler)
Endpoint-->>App: Err("Handler already registered")
end
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:35-64
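The vacant/occupied branch in the flow above can be sketched with the standard library's HashMap entry API. This is a minimal, synchronous illustration under stated assumptions, not the crate's implementation: `Registry`, `RegisterError`, and the simplified `Handler` type are hypothetical names introduced here, and the real handlers are asynchronous.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the endpoint's registration error.
#[derive(Debug, PartialEq)]
enum RegisterError {
    AlreadyRegistered(u64),
}

/// Simplified synchronous handler (assumption: the real handlers return futures).
type Handler = Arc<dyn Fn(Vec<u8>) -> Vec<u8> + Send + Sync>;

/// Hypothetical registry that owns the handler map behind a lock.
struct Registry {
    handlers: RwLock<HashMap<u64, Handler>>,
}

impl Registry {
    fn new() -> Self {
        Self { handlers: RwLock::new(HashMap::new()) }
    }

    /// Inserts the handler only if `method_id` is not already taken.
    fn register<F>(&self, method_id: u64, handler: F) -> Result<(), RegisterError>
    where
        F: Fn(Vec<u8>) -> Vec<u8> + Send + Sync + 'static,
    {
        let mut map = self.handlers.write().expect("handler lock poisoned");
        match map.entry(method_id) {
            Entry::Vacant(slot) => {
                slot.insert(Arc::new(handler));
                Ok(())
            }
            // An existing handler is never overwritten.
            Entry::Occupied(_) => Err(RegisterError::AlreadyRegistered(method_id)),
        }
    }
}
```

Using the entry API means the lookup and the insert happen under a single write lock, so two concurrent registrations for the same ID cannot both succeed.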
Handler Signature
Handlers must conform to this signature:
| Component | Type |
|---|---|
| Input | Vec<u8> (raw request bytes) |
| Context | C (connection context) |
| Output | Future<Output = Result<Vec<u8>, Box<dyn Error + Send + Sync>>> |
The handler closure is wrapped in an Arc to allow shared ownership across multiple concurrent invocations.
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:41-44
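One way to store closures with the shape in the table behind a single type is to box the returned future and wrap the closure in an Arc. The sketch below assumes that approach; `HandlerFn`, `wrap_handler`, `echo_handler`, and the tiny `block_on` executor are hypothetical names used only for illustration, and production code would drive the future with a real async runtime.

```rust
use std::error::Error;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type HandlerError = Box<dyn Error + Send + Sync>;
type HandlerFuture = Pin<Box<dyn Future<Output = Result<Vec<u8>, HandlerError>> + Send>>;
/// Type-erased handler: (request bytes, context) -> future of response bytes.
type HandlerFn<C> = Arc<dyn Fn(Vec<u8>, C) -> HandlerFuture + Send + Sync>;

/// Wraps a user closure so differently typed handlers share one stored type.
fn wrap_handler<C, F, Fut>(handler: F) -> HandlerFn<C>
where
    C: Send + Sync + Clone + 'static,
    F: Fn(Vec<u8>, C) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Result<Vec<u8>, HandlerError>> + Send + 'static,
{
    Arc::new(move |bytes: Vec<u8>, ctx: C| -> HandlerFuture { Box::pin(handler(bytes, ctx)) })
}

/// Waker that does nothing; sufficient for futures that never suspend.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Demo-only executor that polls in a loop until the future completes.
fn block_on(mut fut: HandlerFuture) -> Result<Vec<u8>, HandlerError> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Example handler with a unit context: echoes the payload, rejects empty input.
fn echo_handler() -> HandlerFn<()> {
    wrap_handler(|bytes: Vec<u8>, _ctx: ()| async move {
        if bytes.is_empty() {
            return Err(HandlerError::from("empty request"));
        }
        Ok(bytes)
    })
}
```

Because the stored value is an Arc, cloning a handler for a concurrent invocation only increments a reference count; the closure itself is never copied.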
Example Usage
The integration tests referenced below demonstrate typical handler registration patterns.
Sources: extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:35-43
Request Processing Flow
The read_bytes Method
The read_bytes method is the core entry point for processing incoming data. It implements a three-stage pipeline that separates synchronous framing operations from asynchronous handler execution.
Diagram: Three-Stage Request Processing Pipeline
graph TB
subgraph "Stage 1: Decode Incoming Frames"
A["read_bytes(bytes)"] --> B["RpcDispatcher::read_bytes(bytes)"]
B --> C["Returns Vec<request_id>"]
C --> D["Check is_rpc_request_finalized(id)"]
D --> E["delete_rpc_request(id)"]
E --> F["Collect finalized_requests"]
end
subgraph "Stage 2: Execute RPC Handlers"
F --> G["For each (request_id, request)"]
G --> H["process_single_prebuffered_request"]
H --> I["Lookup handler by METHOD_ID"]
I --> J["Invoke handler(request_bytes, ctx)"]
J --> K["Handler returns response_bytes"]
K --> L["join_all(response_futures)"]
end
subgraph "Stage 3: Encode & Emit Responses"
L --> M["For each response"]
M --> N["dispatcher.respond()"]
N --> O["Chunk and serialize"]
O --> P["on_emit(bytes)"]
end
style A fill:#f9f9f9
style L fill:#f9f9f9
style P fill:#f9f9f9
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:78-137
Stage 1: Frame Decoding
The first stage is synchronous and processes the raw byte stream:
- Decode Frames: `dispatcher.read_bytes(bytes)` parses the binary framing protocol and returns a list of request IDs that were affected by the incoming data.
- Identify Complete Requests: For each request ID, check `is_rpc_request_finalized(id)` to determine whether the request has been fully received.
- Extract Request Data: Call `delete_rpc_request(id)` to remove the complete request from the dispatcher's internal state and obtain the full `RpcRequest` structure.
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:78-97
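The decode, check, and extract steps above can be illustrated with a toy model. Everything in this sketch is an assumption made for illustration: `Frame`, `ToyDispatcher`, and `collect_finalized` are hypothetical, and the frames are already-parsed structs rather than the binary wire format the real dispatcher consumes.

```rust
use std::collections::HashMap;

/// Hypothetical, pre-parsed frame (the real protocol is a binary format).
struct Frame {
    request_id: u32,
    payload: Vec<u8>,
    is_final: bool,
}

#[derive(Default)]
struct PendingRequest {
    payload: Vec<u8>,
    finalized: bool,
}

/// Toy stand-in for the dispatcher's request-tracking state.
#[derive(Default)]
struct ToyDispatcher {
    pending: HashMap<u32, PendingRequest>,
}

impl ToyDispatcher {
    /// Ingests frames and returns the IDs of the requests they touched.
    fn read_frames(&mut self, frames: Vec<Frame>) -> Vec<u32> {
        let mut touched = Vec::new();
        for frame in frames {
            let entry = self.pending.entry(frame.request_id).or_default();
            entry.payload.extend_from_slice(&frame.payload);
            entry.finalized |= frame.is_final;
            if !touched.contains(&frame.request_id) {
                touched.push(frame.request_id);
            }
        }
        touched
    }

    fn is_finalized(&self, id: u32) -> bool {
        self.pending.get(&id).map_or(false, |r| r.finalized)
    }

    /// Removes a request from internal state and returns its payload.
    fn take_request(&mut self, id: u32) -> Option<Vec<u8>> {
        self.pending.remove(&id).map(|r| r.payload)
    }
}

/// Stage 1: return fully received requests; partial ones stay buffered.
fn collect_finalized(dispatcher: &mut ToyDispatcher, frames: Vec<Frame>) -> Vec<(u32, Vec<u8>)> {
    let mut finalized = Vec::new();
    for id in dispatcher.read_frames(frames) {
        if dispatcher.is_finalized(id) {
            if let Some(payload) = dispatcher.take_request(id) {
                finalized.push((id, payload));
            }
        }
    }
    finalized
}
```

A request whose final frame has not arrived remains in `pending` and is picked up by a later call once the rest of its data has been read.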
Stage 2: Asynchronous Handler Execution
The second stage executes all handlers concurrently:
Diagram: Concurrent Handler Execution
- Lookup Handler: Use `METHOD_ID` from the request to find the registered handler in the handlers map.
- Invoke Handler: If found, execute the handler closure with the request bytes and connection context. If not found, generate a `NotFound` error response.
- Await All: Use `join_all` to wait for all handler futures to complete before proceeding to Stage 3.
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:99-126
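The lookup and fallback logic can be sketched as follows. To stay dependency-free the handlers here are synchronous and run one after another, whereas the endpoint described above awaits handler futures together with join_all. `Request`, `Response`, `process_request`, and `process_batch` are hypothetical names, not the crate's types.

```rust
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;

type HandlerError = Box<dyn Error + Send + Sync>;
/// Simplified synchronous handler (assumption: real handlers return futures).
type Handler<C> = Arc<dyn Fn(Vec<u8>, C) -> Result<Vec<u8>, HandlerError> + Send + Sync>;

/// Hypothetical request shape for illustration.
struct Request {
    request_id: u32,
    method_id: u64,
    bytes: Vec<u8>,
}

/// Hypothetical response shape for illustration.
#[derive(Debug, PartialEq)]
enum Response {
    Ok { request_id: u32, bytes: Vec<u8> },
    NotFound { request_id: u32, method_id: u64 },
    HandlerFailed { request_id: u32, message: String },
}

/// Routes one request by method ID; every outcome becomes a response.
fn process_request<C: Clone>(
    handlers: &HashMap<u64, Handler<C>>,
    ctx: &C,
    req: Request,
) -> Response {
    let Some(handler) = handlers.get(&req.method_id) else {
        return Response::NotFound { request_id: req.request_id, method_id: req.method_id };
    };
    // Each invocation receives its own clone of the connection context.
    match handler(req.bytes, ctx.clone()) {
        Ok(bytes) => Response::Ok { request_id: req.request_id, bytes },
        Err(e) => Response::HandlerFailed { request_id: req.request_id, message: e.to_string() },
    }
}

/// Produces exactly one response per request, in request order.
fn process_batch<C: Clone>(
    handlers: &HashMap<u64, Handler<C>>,
    ctx: &C,
    batch: Vec<Request>,
) -> Vec<Response> {
    batch.into_iter().map(|req| process_request(handlers, ctx, req)).collect()
}
```

Neither a missing handler nor a failing handler aborts the batch; both are turned into responses so that the client waiting on that request ID still receives an answer.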
Stage 3: Response Encoding
The third stage is synchronous and encodes responses back to the transport:
- Encode Response: Each `RpcResponse` is passed to `dispatcher.respond()` along with the chunk size and emit callback.
- Chunk and Serialize: The dispatcher chunks large responses and serializes them according to the binary framing protocol.
- Emit Bytes: The `on_emit` callback is invoked with each chunk of bytes, which the transport implementation sends over the network.
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:127-137
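The chunk-and-emit idea can be shown with a small standalone function. The 5-byte header used here (4-byte little-endian request ID plus a last-chunk flag) is an invented layout for the sketch and is not the muxio framing format; `emit_response` is likewise a hypothetical name.

```rust
/// Splits `payload` into chunks of at most `max_chunk_size` bytes and passes
/// each framed chunk to `on_emit`.
///
/// Hypothetical frame layout: [request_id: 4 bytes LE][is_last: 1 byte][chunk].
fn emit_response<E>(request_id: u32, payload: &[u8], max_chunk_size: usize, mut on_emit: E)
where
    E: FnMut(&[u8]),
{
    assert!(max_chunk_size > 0, "chunk size must be non-zero");

    // An empty payload still produces one frame so the receiver sees completion.
    let chunks: Vec<&[u8]> = if payload.is_empty() {
        vec![payload]
    } else {
        payload.chunks(max_chunk_size).collect()
    };

    let total = chunks.len();
    for (index, chunk) in chunks.into_iter().enumerate() {
        let mut frame = Vec::with_capacity(5 + chunk.len());
        frame.extend_from_slice(&request_id.to_le_bytes());
        frame.push(u8::from(index + 1 == total));
        frame.extend_from_slice(chunk);
        // The transport decides what "emit" means, e.g. writing to a socket.
        on_emit(&frame);
    }
}
```

Passing the sink as a callback keeps the encoding step independent of any particular transport: a test can collect frames into a vector, while a server can forward them to a socket.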
Connection Context
Generic Context Type C
The endpoint interface is generic over a connection context type C that represents per-connection state. This context is cloned and passed to every handler invocation, allowing handlers to access:
- Authentication credentials
- Session data
- Connection metadata (IP address, connection time, etc.)
- Per-connection resources (database connections, etc.)
Context Requirements
| Trait Bound | Reason |
|---|---|
| Send | Handlers run in async tasks that may move between threads |
| Sync | Multiple handlers may reference the same context concurrently |
| Clone | Each handler receives its own clone of the context |
| 'static | Contexts must outlive handler invocations |
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:9-11
Stateless Servers
For stateless servers that need no per-connection data, the context type can be the unit type ().
Sources: extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:37-43
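The trait bounds on C can be made concrete with a small example. `ConnectionContext`, `assert_valid_context`, and `run_with_context` are hypothetical names for this sketch, and OS threads stand in for the async tasks a real runtime would use.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical per-connection context; `Arc` keeps cloning cheap.
#[derive(Clone, Debug)]
struct ConnectionContext {
    peer_addr: Arc<str>,
    user_id: Option<u64>,
}

/// Compile-time check that a type satisfies the bounds required of `C`.
fn assert_valid_context<C: Send + Sync + Clone + 'static>() {}

/// Runs one invocation per request on its own thread, each with its own
/// clone of the context, and returns the results in request order.
fn run_with_context<C, F>(ctx: &C, requests: Vec<u8>, handler: F) -> Vec<String>
where
    C: Send + Sync + Clone + 'static,
    F: Fn(C, u8) -> String + Send + Sync + 'static,
{
    let handler = Arc::new(handler);
    let handles: Vec<_> = requests
        .into_iter()
        .map(|req| {
            let ctx = ctx.clone(); // Clone: every invocation owns a copy
            let handler = Arc::clone(&handler);
            // Send + 'static: the context moves into another thread
            thread::spawn(move || (*handler)(ctx, req))
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().expect("handler panicked"))
        .collect()
}
```

The unit type satisfies all four bounds, so `assert_valid_context::<()>()` compiles and a stateless server can pass `()` wherever a context is expected.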
Handler Storage with WithHandlers
The HandlersLock associated type must implement the WithHandlers<C> trait, which provides thread-safe access to the handler storage.
Diagram: Handler Storage Implementations
classDiagram
class WithHandlers~C~ {
<<trait>>
+with_handlers(F) Result~R~
}
class PrebufferedHandlers {
<<HashMap>>
u64 → Arc~HandlerFn~
}
class RwLock~PrebufferedHandlers~ {
+read() RwLockReadGuard
+write() RwLockWriteGuard
}
class TokioRwLock~PrebufferedHandlers~ {
+read() RwLockReadGuard
+write() RwLockWriteGuard
}
WithHandlers <|.. RwLock~PrebufferedHandlers~ : implements (std)
WithHandlers <|.. TokioRwLock~PrebufferedHandlers~ : implements (tokio)
RwLock --> PrebufferedHandlers : protects
TokioRwLock --> PrebufferedHandlers : protects
The with_handlers method provides a closure-based API for accessing the handler map, abstracting over different locking mechanisms (std RwLock vs tokio RwLock).
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:13 extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:46-62
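A closure-based accessor of this kind lets calling code stay generic over the lock type. The sketch below is a synchronous analogue built only on the standard library: `WithHandlerMap`, `register_echo`, and `call` are hypothetical names, and the trait signature shown is an assumption rather than the signature of WithHandlers<C> itself.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

/// Simplified synchronous handler (assumption: real handlers are async).
type Handler = Arc<dyn Fn(Vec<u8>) -> Vec<u8> + Send + Sync>;
type HandlerMap = HashMap<u64, Handler>;

/// Hypothetical closure-based accessor that hides the guard type.
trait WithHandlerMap {
    fn with_handlers<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut HandlerMap) -> R;
}

impl WithHandlerMap for RwLock<HandlerMap> {
    fn with_handlers<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut HandlerMap) -> R,
    {
        let mut guard = self.write().expect("handler lock poisoned");
        f(&mut *guard)
    }
}

impl WithHandlerMap for Mutex<HandlerMap> {
    fn with_handlers<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut HandlerMap) -> R,
    {
        let mut guard = self.lock().expect("handler lock poisoned");
        f(&mut *guard)
    }
}

/// Registers an echo handler; returns false if the ID is already taken.
fn register_echo<L: WithHandlerMap>(lock: &L, method_id: u64) -> bool {
    lock.with_handlers(|map| {
        if map.contains_key(&method_id) {
            return false;
        }
        map.insert(method_id, Arc::new(|bytes: Vec<u8>| bytes));
        true
    })
}

/// Clones the handler out of the map so the lock is released before it runs.
fn call<L: WithHandlerMap>(lock: &L, method_id: u64, bytes: Vec<u8>) -> Option<Vec<u8>> {
    let handler = lock.with_handlers(|map| map.get(&method_id).cloned())?;
    Some(handler(bytes))
}
```

Because `register_echo` and `call` only see the trait, the same code works with either lock, and the guard is dropped as soon as the closure returns.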
Error Handling
Registration Errors
The register_prebuffered method returns RpcServiceEndpointError if:
| Error | Condition |
|---|---|
| RpcServiceEndpointError::Handler | A handler for the given METHOD_ID is already registered |
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:49-52
Request Processing Errors
During read_bytes, errors can occur at multiple stages:
| Stage | Error Type | Handling |
|---|---|---|
| Frame Decoding | RpcServiceEndpointError::Dispatch | Returned immediately, processing stops |
| Handler Execution | Box<dyn Error + Send + Sync> | Converted to RpcResponse::error, sent to client |
| Response Encoding | RpcServiceEndpointError::Dispatch | Ignored (best-effort response delivery) |
Handler errors are caught and converted into error responses that are sent back to the client using the RPC error protocol. See RPC Service Errors for details on error serialization.
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:74-137
Integration with Transport Implementations
Server Implementation Pattern
Transport implementations like RpcServer expose an endpoint() method that returns a shared endpoint implementing RpcServiceEndpointInterface. Handlers are registered on that endpoint, and the transport feeds received bytes into its read_bytes method.
Diagram: Server-Endpoint Relationship
graph LR
A["RpcServer"] --> B["endpoint()"]
B --> C["Arc<RpcEndpoint>"]
C --> D["RpcServiceEndpointInterface impl"]
D --> E["register_prebuffered()"]
D --> F["read_bytes()"]
G["WebSocket Handler"] --> H["receive bytes"]
H --> F
F --> I["on_emit callback"]
I --> J["send bytes"]
Sources: extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:26-61
Typical Usage Pattern
Sources: extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:26-70
Cross-Platform Compatibility
The RpcServiceEndpointInterface is runtime-agnostic and can be implemented for different async runtimes:
| Implementation | Runtime | Locking Mechanism |
|---|---|---|
| muxio-tokio-rpc-server | Tokio | tokio::sync::RwLock |
| Custom implementations | Any | std::sync::RwLock or custom |
The trait's design with async_trait allows both Tokio-based and non-Tokio implementations to coexist.
Sources: extensions/muxio-rpc-service-endpoint/Cargo.toml:21-27
Performance Considerations
Concurrent Handler Execution
The read_bytes method uses join_all to run, concurrently, the handlers for every request that was completed by a single batch of incoming bytes. This improves throughput when several requests arrive together, because a slow handler does not block the others from making progress.
Zero-Copy Processing
Handlers receive Vec<u8> directly from the dispatcher without intermediate allocations. The binary framing protocol minimizes overhead during frame reassembly.
Handler Caching
Handlers are stored as Arc<Handler> in the handlers map. Dispatching a request clones the Arc, which only increments a reference count, so the same handler can serve multiple concurrent requests without copying the closure.
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:105-126
Testing
Integration tests validate the endpoint interface by creating real server-client connections:
Diagram: Integration Test Flow
graph TB
A["Integration Test"] --> B["Start RpcServer"]
B --> C["endpoint.register_prebuffered()"]
C --> D["server.serve_with_listener()"]
A --> E["Start RpcClient"]
E --> F["Method::call()"]
F --> G["WebSocket Transport"]
G --> H["Server read_bytes()"]
H --> I["Handler Execution"]
I --> J["Response"]
J --> K["Client receives result"]
K --> L["assert_eq!"]
Tests cover:
- Successful request-response roundtrips
- Error propagation from handlers
- Large payload handling (chunked transmission)
- Method not found errors
Sources: extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:19-97 extensions/muxio-wasm-rpc-client/tests/prebuffered_integration_tests.rs:40-142