This documentation is part of the "Projects with Books" initiative at zenOSmosis.

The source code for this project is available on GitHub.

Error Handling

Purpose and Scope

This document describes error handling strategies, error types, and failure modes throughout the rust-muxio RPC system. It covers how errors are detected, propagated across layers, and delivered to calling code. This includes transport failures, framing errors, RPC-level failures, and critical system failures like mutex poisoning.

For information about defining service errors in your own RPC methods, see Creating Service Definitions. For connection lifecycle management and state changes, see Connection Lifecycle and State Management.


Error Type Hierarchy

The muxio system uses a layered error model that mirrors its architectural layers. Each layer defines specific error types appropriate to its abstraction level.

```mermaid
graph TB
    RpcServiceError["RpcServiceError"]
    RpcError["RpcServiceError::Rpc"]
    TransportError["RpcServiceError::Transport"]
    RpcServiceErrorPayload["RpcServiceErrorPayload"]
    RpcServiceErrorCode["RpcServiceErrorCode"]
    IoError["std::io::Error"]
    FrameDecodeError["FrameDecodeError"]
    FrameEncodeError["FrameEncodeError"]
    RpcResultStatus["RpcResultStatus"]

    RpcServiceError --> RpcError
    RpcServiceError --> TransportError
    RpcError --> RpcServiceErrorPayload
    RpcServiceErrorPayload --> RpcServiceErrorCode
    TransportError --> IoError
    IoError -.wraps.-> FrameDecodeError
    IoError -.wraps.-> FrameEncodeError

    RpcServiceErrorCode --> NotFound["NotFound"]
    RpcServiceErrorCode --> Fail["Fail"]
    RpcServiceErrorCode --> System["System"]
    RpcResultStatus --> Success["Success"]
    RpcResultStatus --> MethodNotFound["MethodNotFound"]
    RpcResultStatus --> FailStatus["Fail"]
    RpcResultStatus --> SystemError["SystemError"]
    RpcResultStatus -.maps_to.-> RpcServiceErrorCode
```

Error Type Relationships

Sources:

RpcServiceError

RpcServiceError is the primary error type exposed to application code when making RPC calls. It has two variants:

| Variant | Description | Contains |
|---|---|---|
| Rpc | Remote service returned an error | RpcServiceErrorPayload with code and message |
| Transport | Connection or framing failure | std::io::Error |

Sources:
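The following minimal Rust sketch makes the two variants concrete and shows how application code might branch on them. The types are simplified stand-ins written for this page. Only the variant names and the code/message fields come from the table above. `describe` is a hypothetical helper and is not part of muxio.

```rust
use std::io;

// Simplified stand-ins for the muxio error types (illustration only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcServiceErrorCode {
    NotFound,
    Fail,
    System,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcServiceErrorPayload {
    pub code: RpcServiceErrorCode,
    pub message: String,
}

#[derive(Debug)]
pub enum RpcServiceError {
    /// The remote service answered, but with an error.
    Rpc(RpcServiceErrorPayload),
    /// The call never completed: connection or framing failure.
    Transport(io::Error),
}

/// Hypothetical helper showing how application code can branch on the variant.
pub fn describe(err: &RpcServiceError) -> String {
    match err {
        RpcServiceError::Rpc(payload) => {
            format!("remote error {:?}: {}", payload.code, payload.message)
        }
        RpcServiceError::Transport(io_err) => format!("transport error: {}", io_err),
    }
}
```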

RpcServiceErrorCode

Application-level error codes that indicate why an RPC call failed:

| Code | Meaning | Typical Cause |
|---|---|---|
| NotFound | Method does not exist | Client calls an unregistered method, or method ID mismatch |
| Fail | Method executed but failed | Handler returned an error |
| System | Internal system error | Serialization failure, internal panic, resource exhaustion |

Sources:

RpcResultStatus

Wire-format status codes transmitted in RPC response headers. These are converted to RpcServiceErrorCode on the client side:

| Status | Wire Byte | Maps To |
|---|---|---|
| Success | 0x00 | (no error) |
| MethodNotFound | N/A | RpcServiceErrorCode::NotFound |
| Fail | N/A | RpcServiceErrorCode::Fail |
| SystemError | N/A | RpcServiceErrorCode::System |

Sources:
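Here is a minimal sketch of the client-side mapping in the table, using the simplified enums below. It deliberately leaves out decoding from the wire byte, because this page documents only the Success byte (0x00). `to_error_code` is a hypothetical name.

```rust
// Simplified stand-ins mirroring the table above (illustration only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcResultStatus {
    Success,
    MethodNotFound,
    Fail,
    SystemError,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcServiceErrorCode {
    NotFound,
    Fail,
    System,
}

/// Map a wire status to an application error code.
/// `None` means the call succeeded and there is no error to report.
pub fn to_error_code(status: RpcResultStatus) -> Option<RpcServiceErrorCode> {
    match status {
        RpcResultStatus::Success => None,
        RpcResultStatus::MethodNotFound => Some(RpcServiceErrorCode::NotFound),
        RpcResultStatus::Fail => Some(RpcServiceErrorCode::Fail),
        RpcResultStatus::SystemError => Some(RpcServiceErrorCode::System),
    }
}
```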

FrameDecodeError and FrameEncodeError

Low-level errors in the binary framing protocol:

  • FrameDecodeError: occurs when incoming bytes cannot be parsed as valid frames (corrupt header, invalid stream ID, etc.)
  • FrameEncodeError: occurs when outgoing data cannot be serialized into frames (buffer issues, invalid state, etc.)

These errors are typically wrapped in io::Error and surfaced as RpcServiceError::Transport.

Sources:
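The wrapping step can be shown with the standard library alone. `io::Error::new` boxes any `Error + Send + Sync` value, and `get_ref` combined with `downcast_ref` recovers it later. This sketch makes several assumptions:

- The stand-in `FrameDecodeError` has only the `CorruptFrame` variant named on this page.
- `ErrorKind::InvalidData` is an assumed kind.
- `wrap_decode_error` and `frame_error_of` are hypothetical helpers.

```rust
use std::error::Error;
use std::fmt;
use std::io;

// Stand-in for FrameDecodeError; other variants are omitted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FrameDecodeError {
    CorruptFrame,
}

impl fmt::Display for FrameDecodeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FrameDecodeError::CorruptFrame => write!(f, "corrupt frame"),
        }
    }
}

impl Error for FrameDecodeError {}

/// Wrap a framing error in io::Error (the InvalidData kind is an assumption).
pub fn wrap_decode_error(err: FrameDecodeError) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, err)
}

/// Recover the original framing error from a wrapped io::Error, if present.
pub fn frame_error_of(err: &io::Error) -> Option<&FrameDecodeError> {
    err.get_ref()?.downcast_ref::<FrameDecodeError>()
}
```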


Error Propagation Through Layers

Errors flow through multiple layers before reaching application code. The propagation path depends on whether the error originates from transport, framing, RPC protocol, or service logic.

```mermaid
sequenceDiagram
    participant App as "Application Code"
    participant Caller as "RpcServiceCallerInterface"
    participant Dispatcher as "RpcDispatcher"
    participant Session as "RpcSession"
    participant Transport as "WebSocket Transport"

    Note over Transport: Transport Error
    Transport->>Session: read_bytes() returns Err
    Session->>Dispatcher: FrameDecodeError
    Dispatcher->>Dispatcher: fail_all_pending_requests()
    Dispatcher->>Caller: RpcStreamEvent::Error
    Caller->>Caller: Convert to RpcServiceError::Transport
    Caller->>App: Err(RpcServiceError::Transport)

    Note over Transport: RPC Method Error
    Transport->>Session: Valid frames, status=Fail
    Session->>Dispatcher: RpcStreamEvent::Header (status byte)
    Dispatcher->>Caller: status=RpcResultStatus::Fail
    Caller->>Caller: Convert to RpcServiceError::Rpc
    Caller->>App: Err(RpcServiceError::Rpc)
```

Error Flow Diagram

Sources:

Streaming vs Buffered Error Delivery

The system supports two error delivery modes depending on the RPC call type:

Streaming Error Delivery

For streaming RPC calls using call_rpc_streaming(), errors are sent through the DynamicReceiver channel as they occur:

```mermaid
graph LR
    Error["Error Occurs"] --> RecvFn["recv_fn closure"]
    RecvFn --> Status["Parse RpcResultStatus"]
    Status --> ErrorBuffer["Buffer error payload"]
    ErrorBuffer --> End["RpcStreamEvent::End"]
    End --> Send["sender.send(Err(...))"]
    Send --> AppCode["Application receives Err from stream"]
```

Sources:
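From the application's side, streaming delivery means each chunk can be handled as soon as it arrives, and an `Err` item ends processing early. In this sketch, `std::sync::mpsc` stands in for `DynamicReceiver` and a simplified `CallError` replaces `RpcServiceError`. `consume_stream` is a hypothetical helper.

```rust
use std::sync::mpsc::Receiver;

// Simplified stand-in for RpcServiceError (illustration only).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CallError {
    Fail(String),
    Transport(String),
}

/// Handle each chunk as soon as it arrives and stop at the first error.
/// Returns how many chunks were handled when the stream ends cleanly.
pub fn consume_stream(
    rx: Receiver<Result<Vec<u8>, CallError>>,
    mut on_chunk: impl FnMut(&[u8]),
) -> Result<usize, CallError> {
    let mut handled = 0;
    for item in rx {
        let chunk = item?; // an Err item ends the stream early for this caller
        on_chunk(&chunk);
        handled += 1;
    }
    Ok(handled)
}
```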

Buffered Error Delivery

For prebuffered RPC calls using call_rpc_buffered(), errors are accumulated until the stream ends, then returned as a Result<T, RpcServiceError>:

```mermaid
graph LR
    Stream["call_rpc_streaming"] --> Loop["while let Some(result)"]
    Loop --> CheckResult{"result?"}
    CheckResult -->|Ok| Accumulate["success_buf.extend"]
    CheckResult -->|Err| StoreError["err = Some(e); break"]
    Accumulate --> Loop
    StoreError --> Return["Err(rpc_service_error)"]
    Loop -->|None| Decode["decode(success_buf)"]
    Decode --> ReturnOk["Ok(T)"]
```

Sources:
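The buffered path in the diagram can be sketched as follows: accumulate into `success_buf`, remember the first error, and decode at end of stream. `std::sync::mpsc::Receiver` stands in for the real stream and `CallError` is a simplified error type. `collect_buffered` and its `decode` parameter are hypothetical.

```rust
use std::sync::mpsc::Receiver;

// Simplified stand-in for RpcServiceError (illustration only).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CallError {
    Fail(String),
    Transport(String),
}

pub fn collect_buffered<T>(
    rx: Receiver<Result<Vec<u8>, CallError>>,
    decode: impl FnOnce(&[u8]) -> T,
) -> Result<T, CallError> {
    let mut success_buf = Vec::new();
    let mut err = None;
    // `recv` returns Err once every sender is dropped: the stream has ended.
    while let Ok(result) = rx.recv() {
        match result {
            Ok(bytes) => success_buf.extend_from_slice(&bytes),
            Err(e) => {
                err = Some(e);
                break;
            }
        }
    }
    match err {
        Some(e) => Err(e),
        None => Ok(decode(&success_buf)),
    }
}
```

Stopping at the first error discards any successful chunks already received. The caller therefore sees either a fully decoded value or an error, never a half-decoded one.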


Error Handling in recv_fn Closure

The recv_fn closure in RpcServiceCallerInterface is the primary mechanism for receiving and transforming RPC stream events into application-level errors. It handles four event types:

```mermaid
graph TB
    RecvFn["recv_fn(RpcStreamEvent)"]
    Header["Header Event"]
    Payload["PayloadChunk Event"]
    End["End Event"]
    Error["Error Event"]

    RecvFn --> Header
    RecvFn --> Payload
    RecvFn --> End
    RecvFn --> Error

    Header --> ParseStatus["Parse RpcResultStatus from metadata"]
    ParseStatus --> StoreStatus["Store in status Mutex"]
    StoreStatus --> SendReady["Send readiness signal"]
    Payload --> CheckStatus{"status?"}
    CheckStatus -->|Success| SendChunk["sender.send(Ok(bytes))"]
    CheckStatus -->|Error status| BufferError["error_buffer.extend(bytes)"]
    End --> FinalStatus{"final status?"}
    FinalStatus -->|MethodNotFound| SendNotFound["sender.send(Err(NotFound))"]
    FinalStatus -->|Fail| SendFail["sender.send(Err(Fail))"]
    FinalStatus -->|SystemError| SendSystem["sender.send(Err(SystemError))"]
    FinalStatus -->|Success| Close["Close channel normally"]
    Error --> CreateError["Create Transport error"]
    CreateError --> SendError["sender.send(Err(Transport))"]
    SendError --> DropSender["Drop sender"]
```

RpcStreamEvent Processing

Sources:
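The event handling in the diagram can be condensed into a single synchronous callback. This sketch rests on several assumptions:

- Simplified `Status`, `Event` and `CallError` types replace `RpcResultStatus`, `RpcStreamEvent` and `RpcServiceError`.
- `std::sync::mpsc::Sender` replaces the real sender.
- The buffered error payload is UTF-8 text.
- The readiness signal sent on the Header event is omitted.

`make_recv_fn` is a hypothetical name.

```rust
use std::sync::{mpsc, Mutex};

// Simplified stand-ins (illustration only, not the muxio definitions).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status { Success, MethodNotFound, Fail, SystemError }

#[derive(Debug)]
pub enum Event { Header(Status), PayloadChunk(Vec<u8>), End, Error(String) }

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CallError { NotFound(String), Fail(String), System(String), Transport(String) }

pub type Item = Result<Vec<u8>, CallError>;

/// Build a synchronous callback that turns stream events into channel items.
pub fn make_recv_fn(sender: mpsc::Sender<Item>) -> impl Fn(Event) {
    let status = Mutex::new(Status::Success);
    let error_buffer = Mutex::new(Vec::<u8>::new());
    let sender = Mutex::new(Some(sender));

    move |event: Event| {
        let mut guard = sender.lock().unwrap();
        match event {
            // Remember the status so later chunks know where to go.
            Event::Header(s) => *status.lock().unwrap() = s,
            Event::PayloadChunk(bytes) => {
                if *status.lock().unwrap() == Status::Success {
                    if let Some(tx) = guard.as_ref() {
                        let _ = tx.send(Ok(bytes));
                    }
                } else {
                    // Error payloads are held back until End.
                    error_buffer.lock().unwrap().extend_from_slice(&bytes);
                }
            }
            Event::End => {
                // Assumption: the buffered error payload is UTF-8 text.
                let msg = String::from_utf8_lossy(&error_buffer.lock().unwrap()).into_owned();
                let err = match *status.lock().unwrap() {
                    Status::Success => None,
                    Status::MethodNotFound => Some(CallError::NotFound(msg)),
                    Status::Fail => Some(CallError::Fail(msg)),
                    Status::SystemError => Some(CallError::System(msg)),
                };
                if let (Some(e), Some(tx)) = (err, guard.as_ref()) {
                    let _ = tx.send(Err(e));
                }
                *guard = None; // drop the sender: the receiver sees end of stream
            }
            Event::Error(reason) => {
                // Send one Transport error, then drop the sender.
                if let Some(tx) = guard.take() {
                    let _ = tx.send(Err(CallError::Transport(reason)));
                }
            }
        }
    }
}
```

All state lives behind a `std::sync::Mutex`, so the returned closure is `Fn` and never needs to `.await`. This follows the synchronous design used for recv_fn.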

Error Buffering Logic

When a non-Success status is received, payload chunks are buffered into error_buffer instead of being sent to the application. This allows the complete error message to be assembled:

| Event Sequence | Status | Action |
|---|---|---|
| Header arrives | MethodNotFound | Store status, buffer subsequent payloads |
| PayloadChunk arrives | MethodNotFound | Append to error_buffer |
| PayloadChunk arrives | MethodNotFound | Append to error_buffer |
| End arrives | MethodNotFound | Decode error_buffer as error message, send Err(...) |

Sources:


Disconnection and Transport Errors

Transport-level failures require special handling to prevent hanging requests and ensure prompt error delivery.

Connection State Checks

Before initiating any RPC call, the caller checks the connection state.

Sources:

Readiness Channel Errors

The call_rpc_streaming() method uses a oneshot channel to signal when the RPC call is ready (header received). If this channel closes prematurely, it indicates a transport failure:

```mermaid
graph TB
    DispatcherCall["dispatcher.call()"] --> WaitReady["ready_rx.await"]
    WaitReady --> CheckResult{"Result?"}
    CheckResult -->|Ok| ReturnEncoder["Return (encoder, rx)"]
    CheckResult -->|Err| ReturnError["Return Err(Transport)"]
    TransportFail["Transport fails"] --> SendError["ready_tx.send(Err(io::Error))"]
    SendError --> WaitReady
    ChannelDrop["Handler drops ready_tx"] --> ChannelClosed["ready_rx returns Err"]
    ChannelClosed --> CheckResult
```

Sources:
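The waiting side of the readiness check can be sketched with a bounded standard-library channel standing in for the async oneshot channel. The sketch covers three outcomes. The channel may deliver `Ok`, it may deliver a transport error, or it may close without any message, which also counts as a transport failure. `ready_channel`, `wait_ready` and the `ConnectionAborted` kind for the closed-channel case are assumptions of this sketch. In the real caller the error would surface as `RpcServiceError::Transport`.

```rust
use std::io;
use std::sync::mpsc;

// A bounded channel of capacity 1 stands in for the async oneshot channel.
pub type ReadySender = mpsc::SyncSender<io::Result<()>>;
pub type ReadyReceiver = mpsc::Receiver<io::Result<()>>;

pub fn ready_channel() -> (ReadySender, ReadyReceiver) {
    mpsc::sync_channel(1)
}

/// Block until the call is ready, or classify why it never became ready.
pub fn wait_ready(ready_rx: &ReadyReceiver) -> io::Result<()> {
    match ready_rx.recv() {
        // Header received: the caller can start using the stream.
        Ok(Ok(())) => Ok(()),
        // The transport reported a failure through the channel.
        Ok(Err(e)) => Err(e),
        // Every sender was dropped without a message: also a transport failure.
        Err(mpsc::RecvError) => Err(io::Error::new(
            io::ErrorKind::ConnectionAborted,
            "readiness channel closed before the call was ready",
        )),
    }
}
```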

RpcStreamEvent::Error Handling

When a FrameDecodeError occurs during stream processing, the session generates an RpcStreamEvent::Error:

```mermaid
graph TB
    FrameError["FrameDecodeError occurs"] --> CreateEvent["RpcStreamEvent::Error"]
    CreateEvent --> RecvFn["recv_fn(Error event)"]
    RecvFn --> WrapError["Wrap as io::Error::ConnectionAborted"]
    WrapError --> NotifyReady["Send to ready_tx if pending"]
    NotifyReady --> NotifyStream["Send to DynamicSender"]
    NotifyStream --> Drop["Drop sender, close channel"]
```

Sources:
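This sketch covers the notification side of an Error event and follows the steps in the diagram. It wraps the failure as `ConnectionAborted`, notifies a readiness waiter that is still pending, forwards the error to the stream, and drops both senders. `CallState`, its fields and `on_error_event` are hypothetical, and standard-library channels stand in for the real ones.

```rust
use std::io;
use std::sync::mpsc;

/// Hypothetical per-call state held by the response callback.
pub struct CallState {
    pub ready_tx: Option<mpsc::SyncSender<io::Result<()>>>,
    pub stream_tx: Option<mpsc::Sender<io::Result<Vec<u8>>>>,
}

impl CallState {
    /// React to an Error event for this call.
    pub fn on_error_event(&mut self, reason: &str) {
        // io::Error is not Clone, so build a fresh one for each receiver.
        let make_err = || io::Error::new(io::ErrorKind::ConnectionAborted, reason.to_string());
        // Notify a caller that is still waiting for readiness, if any.
        if let Some(ready_tx) = self.ready_tx.take() {
            let _ = ready_tx.send(Err(make_err()));
        }
        // Forward to the stream, then drop the sender so the channel closes.
        if let Some(stream_tx) = self.stream_tx.take() {
            let _ = stream_tx.send(Err(make_err()));
        }
    }
}
```

`Option::take` ensures each sender is used at most once. A second Error event finds both fields empty and does nothing.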


Critical Failure Modes

Certain failures are considered unrecoverable and result in immediate panic or cleanup.

Mutex Poisoning

The rpc_request_queue in RpcDispatcher is protected by a Mutex. If a thread panics while holding this lock, the mutex becomes “poisoned.” This is treated as a critical failure:

```mermaid
graph TB
    LockAttempt["queue.lock()"] --> CheckResult{"Result?"}
    CheckResult -->|Ok| ProcessEvent["Process RpcStreamEvent"]
    CheckResult -->|Err poisoned| Panic["panic!()"]
    Panic --> CrashMsg["'Request queue mutex poisoned'"]
    CrashMsg --> Note["Note: Prevents data corruption\nand undefined behavior"]
```

Mutex Poisoning Handling

The rationale for panicking on mutex poisoning is documented in src/rpc/rpc_dispatcher.rs:85-97:

If the lock is poisoned, it likely means another thread panicked while holding the mutex. The internal state of the request queue may now be inconsistent or partially mutated. Continuing execution could result in incorrect dispatch behavior, undefined state transitions, or silent data loss. This should be treated as a critical failure and escalated appropriately.

Sources:
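The escalation policy can be sketched directly with `std::sync::Mutex`. `lock_queue` and the `Vec<u32>` queue element type are assumptions for illustration. The panic message mirrors the one in the diagram.

```rust
use std::sync::{Mutex, MutexGuard};

/// Lock the request queue, treating poisoning as unrecoverable.
pub fn lock_queue(queue: &Mutex<Vec<u32>>) -> MutexGuard<'_, Vec<u32>> {
    match queue.lock() {
        Ok(guard) => guard,
        // Another thread panicked mid-update, so the queue may be partially
        // mutated. Continuing could dispatch against inconsistent state.
        Err(_) => panic!("Request queue mutex poisoned"),
    }
}
```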

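FrameDecodeError as Critical Failure

Decode errors can also be raised while parsing the incoming bytes, but read_bytes() has one more failure path. After rpc_respondable_session.read_bytes() processes the bytes, the dispatcher locks rpc_request_queue to collect the active request IDs. If that lock is poisoned, read_bytes() returns FrameDecodeError::CorruptFrame instead of the ID list. On this path, poisoning is reported as an error rather than a panic:

```mermaid
graph LR
    ReadBytes["read_bytes()"] --> SessionRead["rpc_respondable_session.read_bytes()"]
    SessionRead --> LockQueue["rpc_request_queue.lock()"]
    LockQueue --> CheckLock{"lock()?"}
    CheckLock -->|Ok| ReturnIds["Ok(active_request_ids)"]
    CheckLock -->|Err poisoned| CorruptFrame["Err(FrameDecodeError::CorruptFrame)"]
```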

Sources:
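For contrast with the panicking path, this sketch shows the non-panicking conversion from the CorruptFrame diagram, using `map_err` on the lock result. The `(u32, String)` queue entries, the single-variant `FrameDecodeError` stand-in and `active_request_ids` are assumptions made for the example.

```rust
use std::sync::Mutex;

// Stand-in with only the variant named on this page.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FrameDecodeError {
    CorruptFrame,
}

/// Collect the IDs of active requests. A poisoned lock is reported as
/// CorruptFrame instead of panicking.
pub fn active_request_ids(
    queue: &Mutex<Vec<(u32, String)>>,
) -> Result<Vec<u32>, FrameDecodeError> {
    let guard = queue.lock().map_err(|_| FrameDecodeError::CorruptFrame)?;
    Ok(guard.iter().map(|(id, _)| *id).collect())
}
```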


fail_all_pending_requests Cleanup

When a connection drops, all pending RPC requests must be notified to prevent hanging futures. The fail_all_pending_requests() method performs this cleanup:

```mermaid
graph TB
    ConnDrop["Connection Dropped"] --> FailAll["fail_all_pending_requests(error)"]
    FailAll --> TakeHandlers["mem::take(response_handlers)"]
    TakeHandlers --> Iterate["For each (request_id, handler)"]
    Iterate --> CreateSynthetic["Create synthetic Error event"]
    CreateSynthetic --> CallHandler["handler(error_event)"]
    CallHandler --> WakesFuture["Wakes waiting Future/stream"]
    WakesFuture --> Iterate
    Iterate --> Done["All handlers notified"]
```

Cleanup Flow

The synthetic error event structure:

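```rust
// Built once per pending request when the connection drops.
RpcStreamEvent::Error {
    rpc_header: None,                  // no header was received for this failure
    rpc_request_id: Some(request_id),  // the pending request being failed
    rpc_method_id: None,
    frame_decode_error: error.clone(), // each handler gets its own copy
}
```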

Sources:

Handler Cleanup Guarantee

Taking ownership of the handlers (mem::take) ensures:

  1. The response_handlers map is immediately cleared
  2. No new events can be routed to removed handlers
  3. Each handler is called exactly once with the error
  4. Waiting futures/streams are unblocked promptly

Sources:
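Here is a minimal sketch of the `mem::take` pattern. A simplified `StreamEvent` and a `HashMap` of boxed callbacks stand in for the dispatcher's `response_handlers`. `Dispatcher`, `register` and `pending` are hypothetical. As in the event structure shown earlier, the synthetic event carries the request ID and a cloned error.

```rust
use std::collections::HashMap;
use std::mem;

// Simplified stand-in for RpcStreamEvent::Error (illustration only).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamEvent {
    Error { rpc_request_id: Option<u32>, error: String },
}

pub type Handler = Box<dyn FnMut(StreamEvent)>;

#[derive(Default)]
pub struct Dispatcher {
    response_handlers: HashMap<u32, Handler>,
}

impl Dispatcher {
    pub fn register(&mut self, request_id: u32, handler: Handler) {
        self.response_handlers.insert(request_id, handler);
    }

    pub fn pending(&self) -> usize {
        self.response_handlers.len()
    }

    /// Notify every pending request of `error` exactly once.
    pub fn fail_all_pending_requests(&mut self, error: String) {
        // Swap in an empty map first. The field is cleared immediately,
        // so no later event can be routed to the handlers being failed.
        let handlers = mem::take(&mut self.response_handlers);
        for (request_id, mut handler) in handlers {
            handler(StreamEvent::Error {
                rpc_request_id: Some(request_id),
                error: error.clone(),
            });
        }
    }
}
```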


Error Handling in Prebuffering

The RpcRespondableSession supports prebuffering mode where response payloads are accumulated before delivery. Error handling in this mode differs from streaming:

```mermaid
graph TB
    Prebuffering{"prebuffer_response?"}
    Prebuffering -->|true| AccumulateMode["Accumulate mode"]
    Prebuffering -->|false| StreamMode["Stream mode"]

    AccumulateMode --> HeaderEvt["Header Event"]
    HeaderEvt --> CallHandler["Call handler with Header"]
    HeaderEvt --> PayloadEvt["PayloadChunk Events"]
    PayloadEvt --> BufferBytes["buffer.extend_from_slice(bytes)"]
    BufferBytes --> PayloadEvt
    PayloadEvt --> EndEvt["End Event"]
    EndEvt --> SendAll["Send entire buffer at once"]
    SendAll --> CallEndHandler["Call handler with End"]

    StreamMode --> StreamHeader["Header Event"]
    StreamHeader --> StreamPayload["PayloadChunk Events"]
    StreamPayload --> CallHandlerImmediate["Call handler for each chunk"]
    CallHandlerImmediate --> StreamPayload
    StreamPayload --> StreamEnd["End Event"]
```

Prebuffering Error Accumulation

In prebuffering mode, if an error status is detected, the entire error payload is still buffered until the End event, then delivered as a single chunk.

Sources:
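The two modes can be sketched as a small forwarding function. With prebuffering enabled, chunks are held back and passed to the handler as one chunk just before End, whether they carry a success payload or an error message. `Event` and `deliver` are simplified, hypothetical stand-ins for the session's internals.

```rust
use std::mem;

// Simplified stand-in for the session's stream events (illustration only).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    Header,
    PayloadChunk(Vec<u8>),
    End,
}

/// Forward `events` to `handler`. When `prebuffer_response` is true, payload
/// chunks are accumulated and delivered as one chunk right before End,
/// regardless of whether they carry a success payload or an error message.
pub fn deliver(events: Vec<Event>, prebuffer_response: bool, mut handler: impl FnMut(Event)) {
    let mut buffer = Vec::new();
    for event in events {
        match event {
            Event::PayloadChunk(bytes) if prebuffer_response => {
                buffer.extend_from_slice(&bytes);
            }
            Event::End if prebuffer_response => {
                handler(Event::PayloadChunk(mem::take(&mut buffer)));
                handler(Event::End);
            }
            // Stream mode, or a Header in either mode: pass straight through.
            other => handler(other),
        }
    }
}
```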


Standard Error Handling Patterns

Pattern 1: Immediate Rejection on Disconnect

Always check connection state before starting expensive operations.

Sources:
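Here is a sketch of this pattern using a hypothetical `Client` that tracks connectivity in an `AtomicBool`. The check runs before any work, so a disconnected client fails fast with a transport error. The `ConnectionAborted` kind and the reduced `RpcServiceError` are assumptions.

```rust
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug)]
pub enum RpcServiceError {
    Transport(io::Error), // the Rpc variant is omitted in this sketch
}

/// Hypothetical client wrapper; the real caller tracks state via its own API.
pub struct Client {
    connected: AtomicBool,
}

impl Client {
    pub fn new(connected: bool) -> Self {
        Client { connected: AtomicBool::new(connected) }
    }

    pub fn set_connected(&self, value: bool) {
        self.connected.store(value, Ordering::SeqCst);
    }

    /// Reject immediately when disconnected, before doing any work.
    pub fn call<T>(&self, work: impl FnOnce() -> T) -> Result<T, RpcServiceError> {
        if !self.connected.load(Ordering::SeqCst) {
            return Err(RpcServiceError::Transport(io::Error::new(
                io::ErrorKind::ConnectionAborted,
                "client is not connected",
            )));
        }
        Ok(work())
    }
}
```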

Pattern 2: Error Conversion at Boundaries

Convert lower-level errors to RpcServiceError at API boundaries.

Sources:
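In Rust this is usually done with a `From` impl, so that `?` performs the conversion at the API boundary. The sketch assumes a simplified `RpcServiceError` and a hypothetical `send_frame` helper that fails with `io::Error`.

```rust
use std::fmt;
use std::io;

// Simplified stand-in for RpcServiceError (illustration only).
#[derive(Debug)]
pub enum RpcServiceError {
    Rpc(String),
    Transport(io::Error),
}

impl fmt::Display for RpcServiceError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RpcServiceError::Rpc(msg) => write!(f, "rpc error: {}", msg),
            RpcServiceError::Transport(e) => write!(f, "transport error: {}", e),
        }
    }
}

impl std::error::Error for RpcServiceError {}

// Every io::Error crossing the boundary becomes a Transport error.
impl From<io::Error> for RpcServiceError {
    fn from(e: io::Error) -> Self {
        RpcServiceError::Transport(e)
    }
}

// Hypothetical low-level write that fails once the socket is gone.
fn send_frame(connected: bool, bytes: &[u8]) -> io::Result<usize> {
    if connected {
        Ok(bytes.len())
    } else {
        Err(io::Error::new(io::ErrorKind::BrokenPipe, "socket closed"))
    }
}

/// Public API: `?` applies the From conversion at the boundary.
pub fn call_method(connected: bool, payload: &[u8]) -> Result<usize, RpcServiceError> {
    let written = send_frame(connected, payload)?;
    Ok(written)
}
```

With the conversion in a single `From` impl, internal helpers can keep returning `io::Result` while every public method returns `RpcServiceError`.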

Pattern 3: Synchronous Error Handling in Callbacks

The recv_fn closure is synchronous, so it uses StdMutex rather than an async mutex. This avoids async context issues inside the callback.

Sources:

Pattern 4: Tracing for Error Diagnosis

All error paths include structured logging using the tracing crate.

Sources:


Summary Table of Error Types and Handling

| Error Type | Layer | Handling Strategy | Recoverable? |
|---|---|---|---|
| RpcServiceError::Rpc | Application | Return to caller | Yes |
| RpcServiceError::Transport | Transport | Return to caller, clean up handlers | No (requires reconnect) |
| FrameDecodeError | Framing | Wrapped in io::Error, propagated up | No |
| FrameEncodeError | Framing | Wrapped in io::Error, propagated up | No |
| Mutex poisoning | Internal | panic!() | No |
| Connection closed | Transport | fail_all_pending_requests() | No (requires reconnect) |
| Method not found | RPC Protocol | RpcServiceErrorCode::NotFound | Yes |
| Handler failure | Application | RpcServiceErrorCode::Fail or System | Yes |

Sources: