This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Service Endpoint Interface

Purpose and Scope

This document describes the RpcServiceEndpointInterface trait, which provides the server-side abstraction for registering RPC method handlers and processing incoming requests in the muxio framework. This trait is runtime-agnostic and enables any platform-specific server implementation to handle RPC calls using a consistent interface.

For client-side RPC invocation, see Service Caller Interface. For details on defining RPC services, see Service Definitions.

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:1-138


Trait Overview

The RpcServiceEndpointInterface<C> trait defines the server-side contract for handling incoming RPC requests. It is generic over a connection context type C, which allows handlers to access per-connection state or metadata.

Core Methods

| Method | Purpose |
| --- | --- |
| register_prebuffered | Registers an async handler for a specific method ID |
| read_bytes | Processes incoming transport bytes, routes to handlers, and sends responses |
| get_prebuffered_handlers | Provides access to the handler registry (implementation detail) |

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:8-64


Trait Definition Structure

Diagram: Trait structure showing methods, associated types, and key dependencies

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:8-14 extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:35-64


Handler Registration

The register_prebuffered method registers an asynchronous handler function for a specific RPC method. Handlers are identified by a 64-bit method ID, typically generated using the rpc_method_id! macro from the service definition layer.

Handler Function Signature

Registration Flow

Diagram: Handler registration sequence showing duplicate detection

The handler is wrapped in an Arc and stored in a HashMap<u64, RpcPrebufferedHandler<C>>. If a handler for the given method_id already exists, registration fails with an RpcServiceEndpointError::Handler.

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:35-64 extensions/muxio-rpc-service-endpoint/src/endpoint.rs:12-23
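The registration flow can be sketched as follows. This is a minimal illustration, not the crate's actual code: Registry, Handler, and HandlerFuture are hypothetical names, the future's output type is an assumption based on the handler type shown in the diagrams, and the error is simplified to a String where the real trait returns RpcServiceEndpointError::Handler.

```rust
use std::collections::HashMap;
use std::error::Error;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

/// Boxed future returned by a handler (the output type is an assumption).
type HandlerFuture =
    Pin<Box<dyn Future<Output = Result<Vec<u8>, Box<dyn Error + Send + Sync>>> + Send>>;

/// Hypothetical stand-in for `RpcPrebufferedHandler<C>`.
type Handler<C> = Arc<dyn Fn(Vec<u8>, C) -> HandlerFuture + Send + Sync>;

/// Hypothetical registry; the real endpoint keeps this map behind a mutex.
struct Registry<C> {
    handlers: HashMap<u64, Handler<C>>,
}

impl<C: 'static> Registry<C> {
    fn new() -> Self {
        Self { handlers: HashMap::new() }
    }

    /// Wraps `handler` in an `Arc` and stores it, rejecting duplicate IDs.
    fn register<F, Fut>(&mut self, method_id: u64, handler: F) -> Result<(), String>
    where
        F: Fn(Vec<u8>, C) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Result<Vec<u8>, Box<dyn Error + Send + Sync>>> + Send + 'static,
    {
        if self.handlers.contains_key(&method_id) {
            return Err(format!("handler already registered for method ID {method_id}"));
        }
        // Box the concrete future so handlers of different types share one map.
        let wrapped: Handler<C> =
            Arc::new(move |bytes: Vec<u8>, ctx: C| -> HandlerFuture {
                Box::pin(handler(bytes, ctx))
            });
        self.handlers.insert(method_id, wrapped);
        Ok(())
    }
}
```

Checking for an existing entry before inserting means a second registration never silently replaces the first handler; the caller sees the error and can decide how to react.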


Three-Stage Request Processing Pipeline

The read_bytes method implements a three-stage pipeline for processing incoming RPC requests:

Stage 1: Decode and Identify

Incoming transport bytes are passed to the RpcDispatcher for frame decoding and stream demultiplexing. The dispatcher identifies which requests are now fully received (finalized) and ready for processing.

Diagram: Stage 1 - Synchronous decoding and request identification

graph LR
    BYTES["Incoming bytes[]"]
    DISPATCHER["RpcDispatcher::read_bytes()"]
    REQUEST_IDS["Vec&lt;request_id&gt;"]
    FINALIZED["Finalized requests\nVec&lt;(id, RpcRequest)&gt;"]
    BYTES --> DISPATCHER
    DISPATCHER --> REQUEST_IDS
    REQUEST_IDS --> FINALIZED

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:78-97
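The idea of buffering chunks until a request is finalized can be illustrated with a toy demultiplexer. This sketch does not reproduce RpcDispatcher or the muxio wire format: Frame, Demux, and read_frames are hypothetical, and frames are assumed to be already decoded.

```rust
use std::collections::HashMap;

/// Hypothetical decoded frame: one chunk of a request's payload.
struct Frame {
    request_id: u32,
    payload: Vec<u8>,
    is_final: bool,
}

/// Hypothetical demultiplexer that buffers chunks per request ID.
#[derive(Default)]
struct Demux {
    pending: HashMap<u32, Vec<u8>>,
}

impl Demux {
    /// Feeds frames in arrival order and returns the requests that
    /// became complete (finalized) during this call.
    fn read_frames(&mut self, frames: Vec<Frame>) -> Vec<(u32, Vec<u8>)> {
        let mut finalized = Vec::new();
        for frame in frames {
            // Append this chunk to the buffer for its request.
            self.pending
                .entry(frame.request_id)
                .or_default()
                .extend_from_slice(&frame.payload);
            if frame.is_final {
                // The request is complete: hand its buffer to the caller.
                if let Some(body) = self.pending.remove(&frame.request_id) {
                    finalized.push((frame.request_id, body));
                }
            }
        }
        finalized
    }
}
```

Requests whose final chunk has not yet arrived stay in the pending map, so a single call may return zero, one, or several finalized requests.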

Stage 2: Execute Handlers Asynchronously

Each finalized request is dispatched to its corresponding handler. The handler futures are awaited together with join_all, so multiple requests make progress concurrently and a slow handler does not hold up the others. Results are collected in the same order as the requests.

Diagram: Stage 2 - Concurrent handler execution

graph TB
    subgraph "Handler Execution"
        REQ1["Request 1"]
        REQ2["Request 2"]
        REQ3["Request 3"]
        HANDLER1["Handler Future 1"]
        HANDLER2["Handler Future 2"]
        HANDLER3["Handler Future 3"]
        JOIN["futures::join_all"]
        RESULTS["Vec&lt;RpcResponse&gt;"]
    end

    REQ1 --> HANDLER1
    REQ2 --> HANDLER2
    REQ3 --> HANDLER3

    HANDLER1 --> JOIN
    HANDLER2 --> JOIN
    HANDLER3 --> JOIN

    JOIN --> RESULTS

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:99-126
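To show what joining handler futures means in practice, here is a standard-library-only sketch. It swaps futures::join_all for a hand-written busy-polling loop so that it runs without external crates; join_all_blocking, YieldOnce, and echo_handler are hypothetical names and are not part of muxio.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;

/// Waker that does nothing; enough for a busy-polling demo executor.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls every unfinished future in turn until all have completed and
/// returns the outputs in input order, as `join_all` does.
fn join_all_blocking<T>(mut futures: Vec<BoxFuture<T>>) -> Vec<T> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut results: Vec<Option<T>> = futures.iter().map(|_| None).collect();
    while results.iter().any(Option::is_none) {
        for (slot, fut) in results.iter_mut().zip(futures.iter_mut()) {
            // Never poll a future again once it has produced its output.
            if slot.is_none() {
                if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
                    *slot = Some(value);
                }
            }
        }
    }
    results.into_iter().flatten().collect()
}

/// Future that returns `Pending` once, forcing the handlers to interleave.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical handler: echoes the request bytes after yielding once.
async fn echo_handler(request: Vec<u8>) -> Vec<u8> {
    YieldOnce(false).await;
    request
}
```

Every handler in this sketch is suspended once before any of them finishes, which is the interleaving that join_all provides on a single task. A real executor would park until woken instead of spinning.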

Stage 3: Encode and Emit Responses

Handler results are synchronously encoded into the RPC protocol format and emitted back to the transport layer via the RpcDispatcher::respond method.

Diagram: Stage 3 - Synchronous response encoding and emission

graph LR
    RESPONSES["Handler Results\nVec&lt;RpcResponse&gt;"]
    ENCODE["dispatcher.respond()"]
    CHUNK["Chunk by max_chunk_size"]
    EMIT["on_emit callback"]
    TRANSPORT["Transport layer"]
    RESPONSES --> ENCODE
    ENCODE --> CHUNK
    CHUNK --> EMIT
    EMIT --> TRANSPORT

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:127-137
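The chunk-and-emit step can be sketched in a few lines. This is an illustration of the pattern only: emit_chunked is a hypothetical helper, and the real RpcDispatcher::respond signature and framing are not reproduced here.

```rust
/// Splits an already-encoded response into chunks of at most
/// `max_chunk_size` bytes and passes each one to `on_emit` in order.
fn emit_chunked<F>(encoded: &[u8], max_chunk_size: usize, mut on_emit: F)
where
    F: FnMut(&[u8]),
{
    assert!(max_chunk_size > 0, "max_chunk_size must be non-zero");
    for chunk in encoded.chunks(max_chunk_size) {
        // The callback hands the bytes to the transport layer.
        on_emit(chunk);
    }
}
```

Because the callback is invoked synchronously, the transport sees chunks in the order they were produced; the last chunk may be shorter than max_chunk_size.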

Complete Processing Flow

Diagram: Complete three-stage request processing sequence

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:66-138


Concrete Implementation

The RpcServiceEndpoint<C> struct provides a concrete implementation of the RpcServiceEndpointInterface trait.

Structure

Diagram: Structure of the concrete RpcServiceEndpoint implementation

graph TB
    subgraph "RpcServiceEndpoint&lt;C&gt;"
        ENDPOINT["RpcServiceEndpoint&lt;C&gt;"]
        HANDLERS["prebuffered_handlers:\nArc&lt;Mutex&lt;HashMap&lt;u64, Handler&gt;&gt;&gt;"]
        PHANTOM["_context: PhantomData&lt;C&gt;"]
        ENDPOINT --> HANDLERS
        ENDPOINT --> PHANTOM
    end

    subgraph "Handler Type Definition"
        HANDLER_TYPE["RpcPrebufferedHandler&lt;C&gt;"]
        FN_TYPE["Arc&lt;Fn(Vec&lt;u8&gt;, C) -> Pin&lt;Box&lt;Future&gt;&gt;&gt;"]
        HANDLER_TYPE -.alias.-> FN_TYPE
    end

    subgraph "Mutex Abstraction"
        FEATURE{{"tokio_support feature"}}
        TOKIO_MUTEX["tokio::sync::Mutex"]
        STD_MUTEX["std::sync::Mutex"]
        FEATURE -->|enabled| TOKIO_MUTEX
        FEATURE -->|disabled| STD_MUTEX
    end

    HANDLERS -.type.-> HANDLER_TYPE
    HANDLERS -.implementation.-> FEATURE

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint.rs:25-54

Key Implementation Details

| Component | Type | Purpose |
| --- | --- | --- |
| prebuffered_handlers | Arc<Mutex<HashMap<u64, RpcPrebufferedHandler<C>>>> | Thread-safe handler registry |
| _context | PhantomData<C> | Zero-cost marker for generic type C |
| Associated HandlersLock | Mutex<HashMap<u64, RpcPrebufferedHandler<C>>> | Provides access pattern for handler registry |

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint.rs:25-32 extensions/muxio-rpc-service-endpoint/src/endpoint.rs:56-67
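The field layout in the table can be mirrored in a small sketch. Endpoint and its handlers method are hypothetical names, the handler is simplified to a synchronous function so the example needs no async runtime, and std::sync::Mutex is assumed (the default feature set).

```rust
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};

/// Simplified synchronous handler used only for this sketch.
type Handler<C> = Arc<dyn Fn(Vec<u8>, C) -> Vec<u8> + Send + Sync>;

/// Hypothetical mirror of the endpoint's field layout.
struct Endpoint<C> {
    prebuffered_handlers: Arc<Mutex<HashMap<u64, Handler<C>>>>,
    _context: PhantomData<C>,
}

impl<C> Endpoint<C> {
    fn new() -> Self {
        Self {
            prebuffered_handlers: Arc::new(Mutex::new(HashMap::new())),
            _context: PhantomData,
        }
    }

    /// Returns a shared handle to the registry, similar in spirit to
    /// `get_prebuffered_handlers`.
    fn handlers(&self) -> Arc<Mutex<HashMap<u64, Handler<C>>>> {
        Arc::clone(&self.prebuffered_handlers)
    }
}
```

Cloning the Arc gives each caller a handle to the same map, and PhantomData<C> occupies no space at runtime.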


Connection Context Type Pattern

The endpoint is generic over a context type C, which must satisfy:

This context is passed to every handler invocation and enables:

  • Per-connection state: Track connection-specific data like authentication tokens, session IDs, or user information
  • Shared resources: Provide access to database pools, configuration, or other application state
  • Request metadata: Include connection metadata like remote address, protocol version, or timing information

Context Flow

Diagram: Context propagation from transport to handler

graph LR
    TRANSPORT["Transport Layer\n(per connection)"]
    CONTEXT["Context Instance\nC: Clone"]
    ENDPOINT["RpcServiceEndpoint"]
    READ_BYTES["read_bytes(context)"]
    HANDLER["Handler(request, context)"]
    TRANSPORT --> CONTEXT
    CONTEXT --> ENDPOINT
    ENDPOINT --> READ_BYTES
    READ_BYTES --> HANDLER

When calling read_bytes, the transport layer provides a context instance which is cloned for each handler invocation. This allows handlers to access per-connection state without shared mutable references.

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:9-12 extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:68-76
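Cloning the context per invocation can be sketched as follows. ConnCtx, handle, and dispatch_all are hypothetical, the handler is synchronous for brevity, and the only bound assumed on the context is Clone, as noted in the context flow diagram.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical per-connection context. Cloning is cheap because the
/// shared counter lives behind an `Arc`.
#[derive(Clone)]
struct ConnCtx {
    session_id: u64,
    requests_seen: Arc<AtomicUsize>,
}

/// Hypothetical handler that owns its clone of the context.
fn handle(request: Vec<u8>, ctx: ConnCtx) -> Vec<u8> {
    ctx.requests_seen.fetch_add(1, Ordering::SeqCst);
    // Prefix the response with the session ID in big-endian form.
    let mut response = ctx.session_id.to_be_bytes().to_vec();
    response.extend(request);
    response
}

/// Dispatches each request with a fresh clone of the connection context.
fn dispatch_all(requests: Vec<Vec<u8>>, ctx: &ConnCtx) -> Vec<Vec<u8>> {
    requests
        .into_iter()
        .map(|req| handle(req, ctx.clone()))
        .collect()
}
```

Each handler gets its own ConnCtx value, yet all clones observe the same counter through the Arc, so shared state needs no mutable reference to the context itself.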


Runtime-Agnostic Mutex Abstraction

The endpoint uses conditional compilation to select the appropriate mutex implementation based on the runtime environment:

Feature-Based Selection

| Feature | Mutex Type | Use Case |
| --- | --- | --- |
| Default (no features) | std::sync::Mutex | Blocking, non-async environments |
| tokio_support | tokio::sync::Mutex | Async/await with Tokio runtime |

Because the mutex type is selected at compile time, the same endpoint code works in both blocking and Tokio-based builds, with no runtime dispatch between the two implementations.

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint.rs:5-9 extensions/muxio-rpc-service-endpoint/Cargo.toml:23-27
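Feature-based selection of this kind is usually written with cfg attributes on the imports. The sketch below assumes the tokio_support feature name from the table; Registry and new_registry are hypothetical, and the exact imports in the crate may differ.

```rust
// Select the mutex type at compile time. Only one of these imports is
// compiled in any given build, so the rest of the code just says `Mutex`.
#[cfg(feature = "tokio_support")]
use tokio::sync::Mutex;

#[cfg(not(feature = "tokio_support"))]
use std::sync::Mutex;

use std::collections::HashMap;

/// Hypothetical registry alias built on whichever `Mutex` was selected.
type Registry = Mutex<HashMap<u64, String>>;

/// Builds an empty registry; `Mutex::new` exists on both mutex types.
fn new_registry() -> Registry {
    Mutex::new(HashMap::new())
}
```

Construction is identical for both types, but locking is not: std::sync::Mutex::lock blocks and returns a Result, while tokio::sync::Mutex::lock is awaited.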


WithHandlers Trait

The WithHandlers<C> trait provides a uniform interface for accessing the handler registry regardless of the underlying mutex implementation:

Diagram: WithHandlers abstraction over different mutex types

graph TB
    TRAIT["WithHandlers&lt;C&gt; trait"]
    METHOD["with_handlers&lt;F, R&gt;(f: F) -> R"]
    STD_IMPL["impl for std::sync::Mutex"]
    TOKIO_IMPL["impl for tokio::sync::Mutex"]
    TRAIT --> METHOD
    TRAIT -.implemented by.-> STD_IMPL
    TRAIT -.implemented by.-> TOKIO_IMPL

    STD_IMPL --> LOCK_STD["lock().unwrap()"]
    TOKIO_IMPL --> LOCK_TOKIO["lock().await"]

This trait enables the register_prebuffered method to work identically regardless of the feature flag, maintaining the runtime-agnostic design principle.

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:13 extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:46-62
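A reduced version of this pattern, covering only the std::sync::Mutex case, might look like the sketch below. The trait here drops the C parameter and uses a placeholder Handlers type, so it is an approximation of the idea rather than the crate's definition; the Tokio implementation would await the lock instead.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Placeholder handler map so the sketch stays self-contained.
type Handlers = HashMap<u64, fn(Vec<u8>) -> Vec<u8>>;

/// Hypothetical reduction of `WithHandlers<C>`: run a closure with
/// exclusive access to the registry, whatever lock guards it.
trait WithHandlers {
    fn with_handlers<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Handlers) -> R;
}

impl WithHandlers for Mutex<Handlers> {
    fn with_handlers<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Handlers) -> R,
    {
        // Blocking path, matching the `lock().unwrap()` branch of the diagram.
        let mut guard = self.lock().unwrap();
        f(&mut *guard)
    }
}
```

Callers never touch the guard directly, so code written against with_handlers does not need to know which lock type is in use.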


Error Handling

The endpoint returns RpcServiceEndpointError for various failure conditions:

| Error Type | Cause | When |
| --- | --- | --- |
| Handler(Box<dyn Error>) | Handler already registered | During register_prebuffered with duplicate method_id |
| Dispatcher(RpcDispatcherError) | Frame decode failure | During read_bytes if frames are malformed |
| Dispatcher(RpcDispatcherError) | Request tracking failure | During read_bytes if request state is inconsistent |

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:33-34 extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:48-53 extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:74
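Callers typically distinguish the two variants when deciding how to react. The sketch below mirrors the variants from the table with hypothetical types: EndpointError stands in for RpcServiceEndpointError, DispatcherError is a placeholder for RpcDispatcherError, and is_connection_error shows one possible policy rather than anything the crate prescribes.

```rust
use std::error::Error;
use std::fmt;

/// Placeholder for the dispatcher's error type.
#[derive(Debug)]
struct DispatcherError(String);

/// Hypothetical mirror of the two variants listed in the table.
#[derive(Debug)]
enum EndpointError {
    Handler(Box<dyn Error>),
    Dispatcher(DispatcherError),
}

impl fmt::Display for EndpointError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            EndpointError::Handler(e) => write!(f, "handler error: {e}"),
            EndpointError::Dispatcher(e) => write!(f, "dispatcher error: {}", e.0),
        }
    }
}

impl Error for EndpointError {}

/// One possible policy: dispatcher errors concern a connection's byte
/// stream, while handler errors arise at registration time.
fn is_connection_error(err: &EndpointError) -> bool {
    matches!(err, EndpointError::Dispatcher(_))
}
```

With a split like this, a server could close the affected connection on a dispatcher error while treating a duplicate registration as a startup failure.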


Integration with Platform Implementations

The RpcServiceEndpointInterface is implemented by platform-specific server types:

Both implementations use the same handler registration API and benefit from compile-time type safety through shared service definitions (see Service Definitions).

Sources: extensions/muxio-rpc-service-endpoint/Cargo.toml:1-33