Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

GitHub

This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Service Endpoint Interface

Relevant source files

Purpose and Scope

This document describes the RpcServiceEndpointInterface trait, which provides the server-side interface for handling incoming RPC requests in the rust-muxio system. This interface is responsible for registering method handlers, decoding incoming byte streams into RPC requests, dispatching those requests to the appropriate handlers, and encoding responses back to clients.

For information about the client-side interface for making RPC calls, see Service Caller Interface. For details about service method definitions, see Service Definitions.

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:1-138


Overview

The RpcServiceEndpointInterface<C> trait defines the contract that server-side endpoint implementations must fulfill. It is generic over a connection context type C, allowing each RPC handler to access connection-specific data such as authentication state, session information, or connection metadata.

Core Responsibilities

ResponsibilityDescription
Handler RegistrationProvides register_prebuffered method to associate method IDs with handler closures
Request DecodingProcesses incoming byte streams and identifies complete RPC requests
Request DispatchRoutes requests to the appropriate handler based on METHOD_ID
Response EncodingEncodes handler results back into binary frames for transmission
Concurrent ExecutionExecutes multiple handlers concurrently when multiple requests arrive

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:8-14


Trait Definition

Diagram: RpcServiceEndpointInterface Trait Structure

The trait is parameterized by a connection context type C that must be Send + Sync + Clone + 'static. This context is passed to every handler invocation, enabling stateful request processing.

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:8-14


Handler Registration

The register_prebuffered Method

Handlers are registered by calling register_prebuffered with a unique METHOD_ID and an asynchronous closure. The method ensures that duplicate registrations are prevented at runtime.

Diagram: Handler Registration Flow

sequenceDiagram
    participant App as "Application Code"
    participant Endpoint as "RpcServiceEndpointInterface"
    participant Handlers as "HandlersLock (WithHandlers)"
    participant HashMap as "HashMap<u64, Handler>"
    
    App->>Endpoint: register_prebuffered(METHOD_ID, handler)
    Endpoint->>Handlers: with_handlers(|handlers| {...})
    Handlers->>HashMap: entry(METHOD_ID)
    
    alt METHOD_ID not registered
        HashMap-->>Handlers: Entry::Vacant
        Handlers->>HashMap: insert(Arc::new(wrapped_handler))
        HashMap-->>Handlers: Ok(())
        Handlers-->>Endpoint: Ok(())
        Endpoint-->>App: Ok(())
    else METHOD_ID already exists
        HashMap-->>Handlers: Entry::Occupied
        Handlers-->>Endpoint: Err(RpcServiceEndpointError::Handler)
        Endpoint-->>App: Err("Handler already registered")
    end

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:35-64

Handler Signature

Handlers must conform to this signature:

ComponentType
InputVec<u8> (raw request bytes)
ContextC (connection context)
OutputFuture<Output = Result<Vec<u8>, Box<dyn Error + Send + Sync>>>

The handler closure is wrapped in an Arc to allow shared ownership across multiple concurrent invocations.

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:41-44

Example Usage

Integration tests demonstrate typical handler registration patterns:

Sources: extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:35-43


Request Processing Flow

The read_bytes Method

The read_bytes method is the core entry point for processing incoming data. It implements a three-stage pipeline that separates synchronous framing operations from asynchronous handler execution.

Diagram: Three-Stage Request Processing Pipeline

graph TB
    subgraph "Stage 1: Decode Incoming Frames"
 
       A["read_bytes(bytes)"] --> B["RpcDispatcher::read_bytes(bytes)"]
B --> C["Returns Vec<request_id>"]
C --> D["Check is_rpc_request_finalized(id)"]
D --> E["delete_rpc_request(id)"]
E --> F["Collect finalized_requests"]
end
    
    subgraph "Stage 2: Execute RPC Handlers"
 
       F --> G["For each (request_id, request)"]
G --> H["process_single_prebuffered_request"]
H --> I["Lookup handler by METHOD_ID"]
I --> J["Invoke handler(request_bytes, ctx)"]
J --> K["Handler returns response_bytes"]
K --> L["join_all(response_futures)"]
end
    
    subgraph "Stage 3: Encode & Emit Responses"
 
       L --> M["For each response"]
M --> N["dispatcher.respond()"]
N --> O["Chunk and serialize"]
O --> P["on_emit(bytes)"]
end
    
    style A fill:#f9f9f9
    style L fill:#f9f9f9
    style P fill:#f9f9f9

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:78-137

Stage 1: Frame Decoding

The first stage is synchronous and processes the raw byte stream:

  1. Decode Frames : dispatcher.read_bytes(bytes) parses the binary framing protocol and returns a list of request IDs that were affected by the incoming data.

  2. Identify Complete Requests : For each request ID, check is_rpc_request_finalized(id) to determine if the request is fully received.

  3. Extract Request Data : Call delete_rpc_request(id) to remove the complete request from the dispatcher's internal state and obtain the full RpcRequest structure.

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:78-97

Stage 2: Asynchronous Handler Execution

The second stage executes all handlers concurrently:

Diagram: Concurrent Handler Execution

  1. Lookup Handler : Use METHOD_ID from the request to find the registered handler in the handlers map.

  2. Invoke Handler : If found, execute the handler closure with the request bytes and connection context. If not found, generate a NotFound error response.

  3. Await All : Use join_all to wait for all handler futures to complete before proceeding to Stage 3.

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:99-126

Stage 3: Response Encoding

The third stage is synchronous and encodes responses back to the transport:

  1. Encode Response : Each RpcResponse is passed to dispatcher.respond() along with the chunk size and emit callback.

  2. Chunk and Serialize : The dispatcher chunks large responses and serializes them according to the binary framing protocol.

  3. Emit Bytes : The on_emit callback is invoked with each chunk of bytes, which the transport implementation sends over the network.

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:127-137


Connection Context

Generic Context Type C

The endpoint interface is generic over a connection context type C that represents per-connection state. This context is cloned and passed to every handler invocation, allowing handlers to access:

  • Authentication credentials
  • Session data
  • Connection metadata (IP address, connection time, etc.)
  • Per-connection resources (database connections, etc.)

Context Requirements

Trait BoundReason
SendHandlers run in async tasks that may move between threads
SyncMultiple handlers may reference the same context concurrently
CloneEach handler receives its own clone of the context
'staticContexts must outlive handler invocations

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:9-11

Stateless Servers

For stateless servers, the context type can be ():

Sources: extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:37-43


classDiagram
    class WithHandlers~C~ {<<trait>>\n+with_handlers(F) Result~R~}
    
    class PrebufferedHandlers {<<HashMap>>\nu64 → Arc~HandlerFn~}
    
    class RwLock~PrebufferedHandlers~ {+read() RwLockReadGuard\n+write() RwLockWriteGuard}
    
    class TokioRwLock~PrebufferedHandlers~ {+read() RwLockReadGuard\n+write() RwLockWriteGuard}
    
    WithHandlers <|.. RwLock~PrebufferedHandlers~ : implements (std)
    WithHandlers <|.. TokioRwLock~PrebufferedHandlers~ : implements (tokio)
    RwLock --> PrebufferedHandlers : protects
    TokioRwLock --> PrebufferedHandlers : protects

Handler Storage with WithHandlers

The HandlersLock associated type must implement the WithHandlers<C> trait, which provides thread-safe access to the handler storage.

Diagram: Handler Storage Implementations

The with_handlers method provides a closure-based API for accessing the handler map, abstracting over different locking mechanisms (std RwLock vs tokio RwLock).

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs13 extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:46-62


Error Handling

Registration Errors

The register_prebuffered method returns RpcServiceEndpointError if:

ErrorCondition
RpcServiceEndpointError::HandlerA handler for the given METHOD_ID is already registered

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:49-52

Request Processing Errors

During read_bytes, errors can occur at multiple stages:

StageError TypeHandling
Frame DecodingRpcServiceEndpointError::DispatchReturned immediately, processing stops
Handler ExecutionBox<dyn Error>Converted to RpcResponse::error, sent to client
Response EncodingRpcServiceEndpointError::DispatchIgnored (best-effort response delivery)

Handler errors are caught and converted into error responses that are sent back to the client using the RPC error protocol. See RPC Service Errors for details on error serialization.

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:74-137


graph LR
 
   A["RpcServer"] --> B["endpoint()"]
B --> C["Arc<RpcEndpoint>"]
C --> D["RpcServiceEndpointInterface impl"]
D --> E["register_prebuffered()"]
D --> F["read_bytes()"]
G["WebSocket Handler"] --> H["receive bytes"]
H --> F
 
   F --> I["on_emit callback"]
I --> J["send bytes"]

Integration with Transport Implementations

Server Implementation Pattern

Transport implementations like RpcServer implement RpcServiceEndpointInterface and provide an endpoint() method to obtain a reference for handler registration:

Diagram: Server-Endpoint Relationship

Sources: extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:26-61

Typical Usage Pattern

Sources: extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:26-70


Cross-Platform Compatibility

The RpcServiceEndpointInterface is runtime-agnostic and can be implemented for different async runtimes:

ImplementationRuntimeLocking Mechanism
muxio-tokio-rpc-serverTokiotokio::sync::RwLock
Custom implementationsAnystd::sync::RwLock or custom

The trait's design with async_trait allows both Tokio-based and non-Tokio implementations to coexist.

Sources: extensions/muxio-rpc-service-endpoint/Cargo.toml:21-27


Performance Considerations

Concurrent Handler Execution

The read_bytes method uses join_all to execute all handlers that can be dispatched from a single batch of incoming bytes. This maximizes throughput when multiple requests arrive simultaneously.

Zero-Copy Processing

Handlers receive Vec<u8> directly from the dispatcher without intermediate allocations. The binary framing protocol minimizes overhead during frame reassembly.

Handler Caching

Handlers are stored as Arc<Handler> in the handlers map, allowing them to be cloned efficiently when dispatching to multiple concurrent requests.

Sources: extensions/muxio-rpc-service-endpoint/src/endpoint_interface.rs:105-126


graph TB
 
   A["Integration Test"] --> B["Start RpcServer"]
B --> C["endpoint.register_prebuffered()"]
C --> D["server.serve_with_listener()"]
A --> E["Start RpcClient"]
E --> F["Method::call()"]
F --> G["WebSocket Transport"]
G --> H["Server read_bytes()"]
H --> I["Handler Execution"]
I --> J["Response"]
J --> K["Client receives result"]
K --> L["assert_eq!"]

Testing

Integration tests validate the endpoint interface by creating real server-client connections:

Diagram: Integration Test Flow

Tests cover:

  • Successful request-response roundtrips
  • Error propagation from handlers
  • Large payload handling (chunked transmission)
  • Method not found errors

Sources: extensions/muxio-tokio-rpc-client/tests/prebuffered_integration_tests.rs:19-97 extensions/muxio-wasm-rpc-client/tests/prebuffered_integration_tests.rs:40-142

Dismiss

Refresh this wiki

Enter email to refresh