This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Service Endpoint Interface
Relevant source files
- extensions/muxio-rpc-service-endpoint/Cargo.toml
- extensions/muxio-rpc-service-endpoint/src/endpoint.rs
- extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs
Purpose and Scope
This document describes the RpcServiceEndpointInterface trait, which provides the server-side abstraction for registering RPC method handlers and processing incoming requests in the muxio framework. This trait is runtime-agnostic and enables any platform-specific server implementation to handle RPC calls using a consistent interface.
For client-side RPC invocation, see Service Caller Interface. For details on defining RPC services, see Service Definitions.
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:1-138
Trait Overview
The RpcServiceEndpointInterface<C> trait defines the server-side contract for handling incoming RPC requests. It is generic over a connection context type C, which allows handlers to access per-connection state or metadata.
Core Methods
| Method | Purpose |
|---|---|
| register_prebuffered | Registers an async handler for a specific method ID |
| read_bytes | Processes incoming transport bytes, routes to handlers, and sends responses |
| get_prebuffered_handlers | Provides access to the handler registry (implementation detail) |
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:8-64
Trait Definition Structure
Diagram: Trait structure showing methods, associated types, and key dependencies
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:8-14 extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:35-64
Handler Registration
The register_prebuffered method registers an asynchronous handler function for a specific RPC method. Handlers are identified by a 64-bit method ID, typically generated using the rpc_method_id! macro from the service definition layer.
Handler Function Signature
Registration Flow
Diagram: Handler registration sequence showing duplicate detection
The handler is wrapped in an Arc and stored in a HashMap<u64, RpcPrebufferedHandler<C>>. If a handler for the given method_id already exists, registration fails with an RpcServiceEndpointError::Handler.
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:35-64 extensions/muxio-rpc-service-endpoint/src/endpoint.rs:12-23
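The registration flow described above can be sketched as follows. This is a minimal illustration and not the crate's code: `Registry`, `RegisterError`, `HandlerFuture`, and `PrebufferedHandler` are hypothetical stand-ins, and the handler's error type is assumed to be a plain `String`. It only shows the handler being wrapped in an `Arc`, stored under its 64-bit method ID, and rejected when the ID is already taken.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

// Hypothetical stand-ins that mirror the text; not the crate's definitions.
type HandlerFuture = Pin<Box<dyn Future<Output = Result<Vec<u8>, String>> + Send>>;
type PrebufferedHandler<C> = Arc<dyn Fn(Vec<u8>, C) -> HandlerFuture + Send + Sync>;

#[derive(Debug, PartialEq)]
enum RegisterError {
    AlreadyRegistered(u64),
}

struct Registry<C> {
    handlers: HashMap<u64, PrebufferedHandler<C>>,
}

impl<C: 'static> Registry<C> {
    fn new() -> Self {
        Self { handlers: HashMap::new() }
    }

    /// Stores `handler` under `method_id`, refusing to overwrite an existing entry.
    fn register_prebuffered<F, Fut>(
        &mut self,
        method_id: u64,
        handler: F,
    ) -> Result<(), RegisterError>
    where
        F: Fn(Vec<u8>, C) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Result<Vec<u8>, String>> + Send + 'static,
    {
        match self.handlers.entry(method_id) {
            Entry::Occupied(_) => Err(RegisterError::AlreadyRegistered(method_id)),
            Entry::Vacant(slot) => {
                // Box the returned future so handlers of different types share one map.
                let wrapped: PrebufferedHandler<C> = Arc::new(move |bytes: Vec<u8>, ctx: C| {
                    Box::pin(handler(bytes, ctx)) as HandlerFuture
                });
                slot.insert(wrapped);
                Ok(())
            }
        }
    }
}

/// Registers two handlers under the same ID and returns both outcomes.
fn register_twice() -> (Result<(), RegisterError>, Result<(), RegisterError>) {
    let mut registry: Registry<()> = Registry::new();
    let first = registry.register_prebuffered(42, |bytes: Vec<u8>, _ctx: ()| async move {
        Ok::<Vec<u8>, String>(bytes)
    });
    let second = registry.register_prebuffered(42, |_bytes: Vec<u8>, _ctx: ()| async move {
        Ok::<Vec<u8>, String>(Vec::new())
    });
    (first, second)
}
```

In this sketch the first registration succeeds and the second is rejected, which corresponds to the duplicate case that the endpoint reports as RpcServiceEndpointError::Handler.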
Three-Stage Request Processing Pipeline
The read_bytes method implements a three-stage pipeline for processing incoming RPC requests:
Stage 1: Decode and Identify
Incoming transport bytes are passed to the RpcDispatcher for frame decoding and stream demultiplexing. The dispatcher identifies which requests are now fully received (finalized) and ready for processing.
Diagram: Stage 1 - Synchronous decoding and request identification
graph LR
BYTES["Incoming bytes[]"]
DISPATCHER["RpcDispatcher::read_bytes()"]
REQUEST_IDS["Vec<request_id>"]
FINALIZED["Finalized requests\nVec<(id, RpcRequest)>"]
BYTES --> DISPATCHER
DISPATCHER --> REQUEST_IDS
REQUEST_IDS --> FINALIZED
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:78-97
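To make the idea of "finalized" requests concrete, here is a toy decoder. The frame layout and the `ToyDispatcher` type are assumptions made purely for illustration; they are not muxio's wire format or the RpcDispatcher API. The sketch buffers incoming bytes, reassembles payloads per request ID, and returns only the requests whose final frame has arrived.

```rust
use std::collections::HashMap;

/// Toy frame layout (an assumption for illustration, NOT muxio's wire format):
/// [request_id: u8][is_final: u8][payload_len: u8][payload bytes...]
#[derive(Default)]
struct ToyDispatcher {
    buffer: Vec<u8>,                 // bytes of a frame that has not fully arrived yet
    in_flight: HashMap<u8, Vec<u8>>, // partially received payloads, keyed by request ID
}

impl ToyDispatcher {
    /// Consumes transport bytes and returns the requests completed by this call.
    fn read_bytes(&mut self, bytes: &[u8]) -> Vec<(u8, Vec<u8>)> {
        self.buffer.extend_from_slice(bytes);
        let mut finalized = Vec::new();
        loop {
            if self.buffer.len() < 3 {
                break; // header incomplete
            }
            let id = self.buffer[0];
            let is_final = self.buffer[1] != 0;
            let len = self.buffer[2] as usize;
            if self.buffer.len() < 3 + len {
                break; // payload incomplete
            }
            let frame: Vec<u8> = self.buffer.drain(..3 + len).collect();
            self.in_flight.entry(id).or_default().extend_from_slice(&frame[3..]);
            if is_final {
                if let Some(payload) = self.in_flight.remove(&id) {
                    finalized.push((id, payload));
                }
            }
        }
        finalized
    }
}
```

Frames for different requests may interleave, and a frame may be split across calls; the caller only ever sees complete requests.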
Stage 2: Execute Handlers Asynchronously
Each finalized request is dispatched to its corresponding handler. The handler futures are awaited together with join_all, so they make progress concurrently and a slow handler does not hold up the other requests in the same batch.
Diagram: Stage 2 - Concurrent handler execution
graph TB
subgraph "Handler Execution"
REQ1["Request 1"]
REQ2["Request 2"]
REQ3["Request 3"]
HANDLER1["Handler Future 1"]
HANDLER2["Handler Future 2"]
HANDLER3["Handler Future 3"]
JOIN["futures::join_all"]
RESULTS["Vec<RpcResponse>"]
end
REQ1 --> HANDLER1
REQ2 --> HANDLER2
REQ3 --> HANDLER3
HANDLER1 --> JOIN
HANDLER2 --> JOIN
HANDLER3 --> JOIN
JOIN --> RESULTS
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:99-126
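The concurrency in this stage can be illustrated without any async runtime. The sketch below is an assumption-laden stand-in: `join_all_sketch` is a hand-rolled replacement for `futures::future::join_all`, `block_on` is a tiny busy-polling executor, and `YieldOnce` simulates a handler waiting on I/O. None of these names come from muxio.

```rust
use std::cell::RefCell;
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxFut<T> = Pin<Box<dyn Future<Output = T>>>;

/// Hand-rolled stand-in for `join_all`: polls every unfinished future on each
/// wake-up and returns the results in input order.
async fn join_all_sketch<T>(mut futs: Vec<BoxFut<T>>) -> Vec<T> {
    let mut results: Vec<Option<T>> = futs.iter().map(|_| None).collect();
    poll_fn(|cx| {
        let mut pending = false;
        for (slot, fut) in results.iter_mut().zip(futs.iter_mut()) {
            if slot.is_none() {
                match fut.as_mut().poll(cx) {
                    Poll::Ready(value) => *slot = Some(value),
                    Poll::Pending => pending = true,
                }
            }
        }
        if pending { Poll::Pending } else { Poll::Ready(()) }
    })
    .await;
    results.into_iter().flatten().collect()
}

/// Returns `Pending` exactly once, giving the other handlers a turn.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Tiny busy-polling executor, sufficient for a demo without an async runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Runs three hypothetical handlers and records when each starts and finishes.
fn run_batch() -> (Vec<Vec<u8>>, Vec<String>) {
    let log = Rc::new(RefCell::new(Vec::new()));
    let handlers: Vec<BoxFut<Vec<u8>>> = (1u8..=3)
        .map(|id| {
            let log = Rc::clone(&log);
            Box::pin(async move {
                log.borrow_mut().push(format!("start {id}"));
                YieldOnce(false).await; // simulated I/O wait
                log.borrow_mut().push(format!("finish {id}"));
                vec![id] // response payload
            }) as BoxFut<Vec<u8>>
        })
        .collect();
    let responses = block_on(join_all_sketch(handlers));
    let events = log.borrow().clone();
    (responses, events)
}
```

All three handlers start before any of them finishes, yet the responses come back in request order. Note that this is concurrency on a single task, not parallelism across threads.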
Stage 3: Encode and Emit Responses
Handler results are synchronously encoded into the RPC protocol format and emitted back to the transport layer via the RpcDispatcher::respond method.
Diagram: Stage 3 - Synchronous response encoding and emission
graph LR
RESPONSES["Handler Results\nVec<RpcResponse>"]
ENCODE["dispatcher.respond()"]
CHUNK["Chunk by max_chunk_size"]
EMIT["on_emit callback"]
TRANSPORT["Transport layer"]
RESPONSES --> ENCODE
ENCODE --> CHUNK
CHUNK --> EMIT
EMIT --> TRANSPORT
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:127-137
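The chunk-and-emit step can be sketched in a few lines. The function names and signatures below are hypothetical and do not reflect the actual `RpcDispatcher::respond` signature; the sketch only shows an already-encoded response being split by a maximum chunk size and handed to an emit callback.

```rust
/// Splits an encoded response into chunks of at most `max_chunk_size` bytes and
/// passes each chunk to `on_emit`. Returns the number of chunks emitted.
fn emit_chunked(encoded: &[u8], max_chunk_size: usize, mut on_emit: impl FnMut(&[u8])) -> usize {
    assert!(max_chunk_size > 0, "max_chunk_size must be non-zero");
    let mut count = 0;
    for chunk in encoded.chunks(max_chunk_size) {
        on_emit(chunk);
        count += 1;
    }
    count
}

/// Example callback: collect the chunks instead of writing them to a socket.
fn collect_chunks(encoded: &[u8], max_chunk_size: usize) -> Vec<Vec<u8>> {
    let mut outbox = Vec::new();
    emit_chunked(encoded, max_chunk_size, |chunk| outbox.push(chunk.to_vec()));
    outbox
}
```

A real transport would replace the collecting closure with a write to its connection, for example a WebSocket send.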
Complete Processing Flow
Diagram: Complete three-stage request processing sequence
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:66-138
Concrete Implementation
The RpcServiceEndpoint<C> struct provides a concrete implementation of the RpcServiceEndpointInterface trait.
Structure
Diagram: Structure of the concrete RpcServiceEndpoint implementation
graph TB
subgraph "RpcServiceEndpoint<C>"
ENDPOINT["RpcServiceEndpoint<C>"]
HANDLERS["prebuffered_handlers:\nArc<Mutex<HashMap<u64, Handler>>>"]
PHANTOM["_context: PhantomData<C>"]
ENDPOINT --> HANDLERS
ENDPOINT --> PHANTOM
end
subgraph "Handler Type Definition"
HANDLER_TYPE["RpcPrebufferedHandler<C>"]
FN_TYPE["Arc<Fn(Vec<u8>, C) -> Pin<Box<Future>>>"]
HANDLER_TYPE -.alias.-> FN_TYPE
end
subgraph "Mutex Abstraction"
FEATURE{{"tokio_support feature"}}
TOKIO_MUTEX["tokio::sync::Mutex"]
STD_MUTEX["std::sync::Mutex"]
FEATURE -->|enabled| TOKIO_MUTEX
FEATURE -->|disabled| STD_MUTEX
end
HANDLERS -.type.-> HANDLER_TYPE
HANDLERS -.implementation.-> FEATURE
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint.rs:25-54
Key Implementation Details
| Component | Type | Purpose |
|---|---|---|
| prebuffered_handlers | Arc<Mutex<HashMap<u64, RpcPrebufferedHandler<C>>>> | Thread-safe handler registry |
| _context | PhantomData<C> | Zero-cost marker for generic type C |
| Associated HandlersLock | Mutex<HashMap<u64, RpcPrebufferedHandler<C>>> | Provides access pattern for handler registry |
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint.rs:25-32 extensions/muxio-rpc-service-endpoint/src/endpoint.rs:56-67
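The field layout in the table can be mirrored in a small standalone struct. `EndpointSketch` and `Handler` are hypothetical names, the handler is simplified to a synchronous function (the real handler type returns a future), and `std::sync::Mutex` is assumed. The point is only to show why the registry sits behind `Arc<Mutex<...>>`: a second handle can register handlers from another thread.

```rust
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};

// Simplified handler alias: synchronous here to keep the sketch short.
type Handler<C> = Arc<dyn Fn(Vec<u8>, C) -> Vec<u8> + Send + Sync>;

struct EndpointSketch<C> {
    prebuffered_handlers: Arc<Mutex<HashMap<u64, Handler<C>>>>,
    _context: PhantomData<C>, // zero-sized marker for the context type
}

impl<C> EndpointSketch<C> {
    fn new() -> Self {
        Self {
            prebuffered_handlers: Arc::new(Mutex::new(HashMap::new())),
            _context: PhantomData,
        }
    }

    /// Hands out another reference to the same registry.
    fn handlers(&self) -> Arc<Mutex<HashMap<u64, Handler<C>>>> {
        Arc::clone(&self.prebuffered_handlers)
    }
}

/// Registers a handler on a worker thread and reports the registry size
/// as seen through the endpoint itself.
fn register_from_another_thread() -> usize {
    let endpoint: EndpointSketch<()> = EndpointSketch::new();
    let registry = endpoint.handlers();
    std::thread::spawn(move || {
        let echo: Handler<()> = Arc::new(|bytes: Vec<u8>, _ctx: ()| bytes);
        registry.lock().unwrap().insert(1, echo);
    })
    .join()
    .unwrap();
    let count = endpoint.prebuffered_handlers.lock().unwrap().len();
    count
}
```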
Connection Context Type Pattern
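The endpoint is generic over a context type C. The exact trait bounds are declared on the trait itself; at a minimum C must implement Clone, because the context is cloned for each handler invocation.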
This context is passed to every handler invocation and enables:
- Per-connection state: Track connection-specific data like authentication tokens, session IDs, or user information
- Shared resources: Provide access to database pools, configuration, or other application state
- Request metadata: Include connection metadata like remote address, protocol version, or timing information
Context Flow
Diagram: Context propagation from transport to handler
graph LR
TRANSPORT["Transport Layer\n(per connection)"]
CONTEXT["Context Instance\nC: Clone"]
ENDPOINT["RpcServiceEndpoint"]
READ_BYTES["read_bytes(context)"]
HANDLER["Handler(request, context)"]
TRANSPORT --> CONTEXT
CONTEXT --> ENDPOINT
ENDPOINT --> READ_BYTES
READ_BYTES --> HANDLER
When calling read_bytes, the transport layer provides a context instance which is cloned for each handler invocation. This allows handlers to access per-connection state without shared mutable references.
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:9-12 extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:68-76
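A context type that covers the three uses listed above might look like the following. Everything here is hypothetical: `ConnectionContext`, its fields, and the `whoami` handler are illustrative names, and handlers are written as plain synchronous functions for brevity. The sketch shows one context instance per connection being cloned for each handler call, with shared state kept behind an `Arc` so that clones still observe the same counter.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Clone)]
struct ConnectionContext {
    session_id: u64,                   // per-connection state
    peer_addr: String,                 // request metadata
    requests_served: Arc<AtomicUsize>, // shared resource; clones share the counter
}

/// Hypothetical handler: receives its own clone of the context by value.
fn whoami(_request: Vec<u8>, ctx: ConnectionContext) -> Vec<u8> {
    ctx.requests_served.fetch_add(1, Ordering::Relaxed);
    format!("session {} from {}", ctx.session_id, ctx.peer_addr).into_bytes()
}

/// Clones the connection's context once per request, as described above.
fn dispatch_all(requests: Vec<Vec<u8>>, ctx: &ConnectionContext) -> Vec<Vec<u8>> {
    requests.into_iter().map(|req| whoami(req, ctx.clone())).collect()
}

fn serve_two_requests() -> (Vec<String>, usize) {
    let ctx = ConnectionContext {
        session_id: 7,
        peer_addr: "127.0.0.1:9000".to_string(),
        requests_served: Arc::new(AtomicUsize::new(0)),
    };
    let replies = dispatch_all(vec![vec![], vec![]], &ctx)
        .into_iter()
        .map(|bytes| String::from_utf8(bytes).unwrap())
        .collect();
    (replies, ctx.requests_served.load(Ordering::Relaxed))
}
```

Because each handler owns its clone, no handler needs a mutable reference to connection state; anything that must be mutated across handlers goes behind a shared, synchronized handle.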
Runtime-Agnostic Mutex Abstraction
The endpoint uses conditional compilation to select the appropriate mutex implementation based on the runtime environment:
Feature-Based Selection
| Feature | Mutex Type | Use Case |
|---|---|---|
| Default (no features) | std::sync::Mutex | Blocking, non-async environments |
| tokio_support | tokio::sync::Mutex | Async/await with Tokio runtime |
This abstraction allows the same endpoint code to work in both synchronous and asynchronous contexts without runtime overhead or complex trait abstractions.
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint.rs:5-9 extensions/muxio-rpc-service-endpoint/Cargo.toml:23-27
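Feature-based selection of this kind is usually written with `cfg` attributes on a type alias. The snippet below is a sketch of the general pattern rather than the crate's source: the alias name `HandlersMutex`, the `Registry` type, and the `registered_name` helper are assumptions. With the feature disabled, only the `std::sync::Mutex` branch is compiled, so the Tokio path is never referenced.

```rust
use std::collections::HashMap;

// Selected when the `tokio_support` Cargo feature is enabled.
#[cfg(feature = "tokio_support")]
type HandlersMutex<T> = tokio::sync::Mutex<T>;

// Default: the standard library mutex.
#[cfg(not(feature = "tokio_support"))]
type HandlersMutex<T> = std::sync::Mutex<T>;

// Handler values are simplified to names to keep the sketch short.
type Registry = HandlersMutex<HashMap<u64, &'static str>>;

// Blocking lookup for the std mutex.
#[cfg(not(feature = "tokio_support"))]
fn registered_name(registry: &Registry, method_id: u64) -> Option<&'static str> {
    registry.lock().unwrap().get(&method_id).copied()
}

// Async lookup for the Tokio mutex; its lock is acquired with `.await`.
#[cfg(feature = "tokio_support")]
async fn registered_name(registry: &Registry, method_id: u64) -> Option<&'static str> {
    registry.lock().await.get(&method_id).copied()
}
```

The two branches differ in how the lock is acquired, which is why call sites need some uniform way to reach the map regardless of which mutex was selected.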
WithHandlers Trait
The WithHandlers<C> trait provides a uniform interface for accessing the handler registry regardless of the underlying mutex implementation:
Diagram: WithHandlers abstraction over different mutex types
graph TB
TRAIT["WithHandlers<C> trait"]
METHOD["with_handlers<F, R>(f: F) -> R"]
STD_IMPL["impl for std::sync::Mutex"]
TOKIO_IMPL["impl for tokio::sync::Mutex"]
TRAIT --> METHOD
TRAIT -.implemented by.-> STD_IMPL
TRAIT -.implemented by.-> TOKIO_IMPL
STD_IMPL --> LOCK_STD["lock().unwrap()"]
TOKIO_IMPL --> LOCK_TOKIO["lock().await"]
This trait enables the register_prebuffered method to work identically regardless of the feature flag, maintaining the runtime-agnostic design principle.
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:13 extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:46-62
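A closure-based accessor of this shape can be sketched as below. This is a simplified guess at the pattern, not the crate's trait: the generic parameter `C` is dropped, handler values are reduced to names, only the `std::sync::Mutex` implementation is shown, and `register` and `block_on` are helper names introduced for the demo. It relies on `async fn` in traits, which requires Rust 1.75 or later.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type Handlers = HashMap<u64, &'static str>; // handler values simplified to names

/// Runs a closure against the locked registry, hiding how the lock is taken.
trait WithHandlers {
    async fn with_handlers<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Handlers) -> R;
}

impl WithHandlers for Mutex<Handlers> {
    async fn with_handlers<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Handlers) -> R,
    {
        // std mutex: blocking lock. A Tokio implementation would use `lock().await`.
        let mut guard = self.lock().unwrap();
        f(&mut *guard)
    }
}

/// Registration written once against the trait, independent of the mutex type.
async fn register<L: WithHandlers>(
    lock: &L,
    method_id: u64,
    name: &'static str,
) -> Result<(), String> {
    lock.with_handlers(|handlers| {
        if handlers.contains_key(&method_id) {
            return Err(format!("method {method_id} already registered"));
        }
        handlers.insert(method_id, name);
        Ok(())
    })
    .await
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Tiny busy-polling executor so the sketch runs without an async runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn register_twice() -> (Result<(), String>, Result<(), String>) {
    let lock = Mutex::new(Handlers::new());
    let first = block_on(register(&lock, 7, "echo"));
    let second = block_on(register(&lock, 7, "echo_again"));
    (first, second)
}
```

The closure never sees the guard type, so the body of `register` stays identical whichever mutex backs the registry.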
Error Handling
The endpoint returns RpcServiceEndpointError for various failure conditions:
| Error Type | Cause | When |
|---|---|---|
| Handler(Box<dyn Error>) | Handler already registered | During register_prebuffered with duplicate method_id |
| Dispatcher(RpcDispatcherError) | Frame decode failure | During read_bytes if frames are malformed |
| Dispatcher(RpcDispatcherError) | Request tracking failure | During read_bytes if request state is inconsistent |
Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:33-34 extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:48-53 extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:74
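The two error variants in the table can be modelled with a small enum. The types below (`EndpointErrorSketch`, `DispatcherErrorSketch`) and the helper functions are hypothetical stand-ins that only mirror the variant shapes listed above. The sketch shows a `From` conversion letting dispatcher failures propagate with `?`, while duplicate registration produces the boxed handler error.

```rust
use std::error::Error;
use std::fmt;

#[derive(Debug)]
struct DispatcherErrorSketch(String); // stand-in for RpcDispatcherError

#[derive(Debug)]
enum EndpointErrorSketch {
    Handler(Box<dyn Error>),
    Dispatcher(DispatcherErrorSketch),
}

impl fmt::Display for EndpointErrorSketch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Handler(e) => write!(f, "handler error: {e}"),
            Self::Dispatcher(e) => write!(f, "dispatcher error: {}", e.0),
        }
    }
}

impl Error for EndpointErrorSketch {}

impl From<DispatcherErrorSketch> for EndpointErrorSketch {
    fn from(e: DispatcherErrorSketch) -> Self {
        Self::Dispatcher(e)
    }
}

/// Toy decode step: fails on empty input to simulate a malformed frame.
fn decode(bytes: &[u8]) -> Result<u8, DispatcherErrorSketch> {
    bytes
        .first()
        .copied()
        .ok_or_else(|| DispatcherErrorSketch("empty frame".into()))
}

/// Dispatcher errors surface through `?` thanks to the `From` impl.
fn read_bytes(bytes: &[u8]) -> Result<u8, EndpointErrorSketch> {
    let first = decode(bytes)?;
    Ok(first)
}

/// Duplicate registration is reported through the boxed `Handler` variant.
fn register(registered: &mut Vec<u64>, method_id: u64) -> Result<(), EndpointErrorSketch> {
    if registered.contains(&method_id) {
        let message = format!("handler for method {method_id} already registered");
        return Err(EndpointErrorSketch::Handler(message.into()));
    }
    registered.push(method_id);
    Ok(())
}
```

Callers can then match on the variant to tell a configuration mistake at startup (Handler) from a protocol-level failure on a live connection (Dispatcher).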
Integration with Platform Implementations
The RpcServiceEndpointInterface is implemented by platform-specific server types:
- Tokio Server: muxio-tokio-rpc-server wraps an RpcServiceEndpoint and handles WebSocket connections
- WASM Client: muxio-wasm-rpc-client can act as both client and server endpoint in bidirectional scenarios
Both implementations use the same handler registration API and benefit from compile-time type safety through shared service definitions (see Service Definitions).
Sources: extensions/muxio-rpc-service-endpoint/Cargo.toml:1-33