This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Error Handling
Relevant source files
- extensions/muxio-rpc-service-caller/src/caller_interface.rs
- src/rpc/rpc_dispatcher.rs
- src/rpc/rpc_internals/rpc_respondable_session.rs
Purpose and Scope
This document describes error handling strategies, error types, and failure modes throughout the rust-muxio RPC system. It covers how errors are detected, propagated across layers, and delivered to calling code. This includes transport failures, framing errors, RPC-level failures, and critical system failures like mutex poisoning.
For information about defining service errors in your own RPC methods, see Creating Service Definitions. For connection lifecycle management and state changes, see Connection Lifecycle and State Management.
Error Type Hierarchy
The muxio system uses a layered error model that mirrors its architectural layers. Each layer defines specific error types appropriate to its abstraction level.
graph TB
RpcServiceError["RpcServiceError"]
RpcError["RpcServiceError::Rpc"]
TransportError["RpcServiceError::Transport"]
RpcServiceErrorPayload["RpcServiceErrorPayload"]
RpcServiceErrorCode["RpcServiceErrorCode"]
IoError["std::io::Error"]
FrameDecodeError["FrameDecodeError"]
FrameEncodeError["FrameEncodeError"]
RpcResultStatus["RpcResultStatus"]
RpcServiceError --> RpcError
RpcServiceError --> TransportError
RpcError --> RpcServiceErrorPayload
RpcServiceErrorPayload --> RpcServiceErrorCode
TransportError --> IoError
IoError -.wraps.-> FrameDecodeError
RpcServiceErrorCode --> NotFound["NotFound"]
RpcServiceErrorCode --> Fail["Fail"]
RpcServiceErrorCode --> System["System"]
RpcResultStatus --> Success["Success"]
RpcResultStatus --> MethodNotFound["MethodNotFound"]
RpcResultStatus --> FailStatus["Fail"]
RpcResultStatus --> SystemError["SystemError"]
RpcResultStatus -.maps_to.-> RpcServiceErrorCode
Error Type Relationships
Sources:
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:16-17
- extensions/muxio-rpc-service/src/error.rs (referenced)
RpcServiceError
RpcServiceError is the primary error type exposed to application code when making RPC calls. It has two variants:
| Variant | Description | Contains |
|---|---|---|
| Rpc | Remote service returned an error | RpcServiceErrorPayload with code and message |
| Transport | Connection or framing failure | std::io::Error |
Sources:
- extensions/muxio-rpc-service-caller/src/caller_interface.rs42
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:254-257
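A minimal sketch of how calling code might branch on the two variants described above. The type definitions here are hypothetical stand-ins that mirror the shapes in the table; the real definitions live in the muxio crates and may differ in field names and derives. The describe helper is an illustration only, not part of the library.

```rust
use std::io;

// Hypothetical stand-ins mirroring the documented shapes (assumption).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcServiceErrorCode {
    NotFound,
    Fail,
    System,
}

#[derive(Debug)]
pub struct RpcServiceErrorPayload {
    pub code: RpcServiceErrorCode,
    pub message: String,
}

#[derive(Debug)]
pub enum RpcServiceError {
    /// The remote service answered, but with an error.
    Rpc(RpcServiceErrorPayload),
    /// The connection or framing layer failed.
    Transport(io::Error),
}

/// Illustrative helper: render an error for logs or user feedback.
pub fn describe(err: &RpcServiceError) -> String {
    match err {
        RpcServiceError::Rpc(p) => format!("remote error {:?}: {}", p.code, p.message),
        RpcServiceError::Transport(e) => format!("transport error: {e}"),
    }
}
```

Matching on the variant first keeps the decision "did the remote side answer at all?" separate from the decision "what did it say?".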
RpcServiceErrorCode
Application-level error codes that indicate why an RPC call failed:
| Code | Meaning | Typical Cause |
|---|---|---|
| NotFound | Method does not exist | Client calls unregistered method or method ID mismatch |
| Fail | Method executed but failed | Handler returned an error |
| System | Internal system error | Serialization failure, internal panic, resource exhaustion |
Sources:
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:193-198
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:204-210
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:216-228
RpcResultStatus
Wire-format status codes transmitted in RPC response headers. These are converted to RpcServiceErrorCode on the client side:
| Status | Wire Byte | Maps To |
|---|---|---|
| Success | 0x00 | (no error) |
| MethodNotFound | N/A | RpcServiceErrorCode::NotFound |
| Fail | N/A | RpcServiceErrorCode::Fail |
| SystemError | N/A | RpcServiceErrorCode::System |
Sources:
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:120-126
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:186-232
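The status-to-code mapping above can be sketched as a single match. Both enums below are hypothetical stand-ins with the variant names from the tables, and error_code_for is an illustrative name, not a function from the library. Wire byte values are deliberately left out because only the Success byte is listed.

```rust
// Hypothetical stand-ins using the documented variant names (assumption).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcResultStatus {
    Success,
    MethodNotFound,
    Fail,
    SystemError,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcServiceErrorCode {
    NotFound,
    Fail,
    System,
}

/// Map a response status to the client-side error code.
/// Returns None for Success because no error is raised in that case.
pub fn error_code_for(status: RpcResultStatus) -> Option<RpcServiceErrorCode> {
    match status {
        RpcResultStatus::Success => None,
        RpcResultStatus::MethodNotFound => Some(RpcServiceErrorCode::NotFound),
        RpcResultStatus::Fail => Some(RpcServiceErrorCode::Fail),
        RpcResultStatus::SystemError => Some(RpcServiceErrorCode::System),
    }
}
```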
FrameDecodeError and FrameEncodeError
Low-level errors in the binary framing protocol:
- FrameDecodeError: Occurs when incoming bytes cannot be parsed as valid frames (corrupt header, invalid stream ID, etc.)
- FrameEncodeError: Occurs when outgoing data cannot be serialized into frames (buffer issues, invalid state, etc.)
These errors are typically wrapped in io::Error and surfaced as RpcServiceError::Transport.
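The wrapping step can be sketched as follows. FrameDecodeError here is a hypothetical one-variant stand-in (the real enum has more variants), the Rpc variant of RpcServiceError is omitted for brevity, and to_transport_error is an illustrative name. The ConnectionAborted kind follows the error-event diagram later in this document; other call sites may use a different kind.

```rust
use std::{error::Error, fmt, io};

// Hypothetical stand-in; the real enum has additional variants (assumption).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FrameDecodeError {
    CorruptFrame,
}

impl fmt::Display for FrameDecodeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FrameDecodeError::CorruptFrame => write!(f, "corrupt frame"),
        }
    }
}

impl Error for FrameDecodeError {}

// Simplified: only the Transport variant is modeled in this sketch.
#[derive(Debug)]
pub enum RpcServiceError {
    Transport(io::Error),
}

/// Wrap a framing error in io::Error and surface it as a transport failure.
pub fn to_transport_error(err: FrameDecodeError) -> RpcServiceError {
    RpcServiceError::Transport(io::Error::new(io::ErrorKind::ConnectionAborted, err))
}
```

Because the framing error is stored as the inner error, callers can still reach it through io::Error::get_ref() when they need the original cause.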
Error Propagation Through Layers
Errors flow through multiple layers before reaching application code. The propagation path depends on whether the error originates from transport, framing, RPC protocol, or service logic.
sequenceDiagram
participant App as "Application Code"
participant Caller as "RpcServiceCallerInterface"
participant Dispatcher as "RpcDispatcher"
participant Session as "RpcSession"
participant Transport as "WebSocket Transport"
Note over Transport: Transport Error
Transport->>Session: read_bytes() returns Err
Session->>Dispatcher: FrameDecodeError
Dispatcher->>Dispatcher: fail_all_pending_requests()
Dispatcher->>Caller: RpcStreamEvent::Error
Caller->>Caller: Convert to RpcServiceError::Transport
Caller->>App: Err(RpcServiceError::Transport)
Note over Transport: RPC Method Error
Transport->>Session: Valid frames, status=Fail
Session->>Dispatcher: RpcStreamEvent::Header (status byte)
Dispatcher->>Caller: status=RpcResultStatus::Fail
Caller->>Caller: Convert to RpcServiceError::Rpc
Caller->>App: Err(RpcServiceError::Rpc)
Error Flow Diagram
Sources:
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:246-284
- src/rpc/rpc_dispatcher.rs:187-206
- src/rpc/rpc_dispatcher.rs:422-456
Streaming vs Buffered Error Delivery
The system supports two error delivery modes depending on the RPC call type:
graph LR
Error["Error Occurs"] --> RecvFn["recv_fn closure"]
RecvFn --> Status["Parse RpcResultStatus"]
Status --> ErrorBuffer["Buffer error payload"]
ErrorBuffer --> End["RpcStreamEvent::End"]
End --> Send["sender.send(Err(...))"]
Send --> AppCode["Application receives Err from stream"]
Streaming Error Delivery
For streaming RPC calls using call_rpc_streaming(), errors are sent through the DynamicReceiver channel as they occur:
Sources:
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:136-174
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:175-244
graph LR
Stream["call_rpc_streaming"] --> Loop["while let Some(result)"]
Loop --> CheckResult{"result?"}
CheckResult -->|Ok| Accumulate["success_buf.extend"]
CheckResult -->|Err| StoreError["err = Some(e); break"]
Accumulate --> Loop
StoreError --> Return["Err(rpc_service_error)"]
Loop -->|None| Decode["decode(success_buf)"]
Decode --> ReturnOk["Ok(T)"]
Buffered Error Delivery
For prebuffered RPC calls using call_rpc_buffered(), errors are accumulated until the stream ends, then returned as a Result<T, RpcServiceError>:
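The accumulate-or-fail loop can be sketched with a standard library channel standing in for the response stream. This is an approximation under stated assumptions: the real method is async and reads from a DynamicReceiver, RpcServiceError is reduced to string-carrying variants, and collect_buffered is a hypothetical name.

```rust
use std::sync::mpsc::Receiver;

// Simplified stand-in for the real error type (assumption).
#[derive(Debug, PartialEq)]
pub enum RpcServiceError {
    Rpc(String),
    Transport(String),
}

/// Drain the stream into one buffer, stopping at the first error.
/// On a clean end of stream, decode the accumulated bytes.
pub fn collect_buffered<T>(
    rx: Receiver<Result<Vec<u8>, RpcServiceError>>,
    decode: impl FnOnce(&[u8]) -> T,
) -> Result<T, RpcServiceError> {
    let mut success_buf = Vec::new();
    let mut err = None;

    // recv() returns Err once every sender is dropped, which ends the loop.
    while let Ok(result) = rx.recv() {
        match result {
            Ok(chunk) => success_buf.extend(chunk),
            Err(e) => {
                err = Some(e);
                break;
            }
        }
    }

    match err {
        Some(e) => Err(e),
        None => Ok(decode(&success_buf)),
    }
}
```

Any chunks received before the error are discarded, so the caller sees either a fully decoded value or an error, never a partial result.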
graph TB
RecvFn["recv_fn(RpcStreamEvent)"]
Header["Header Event"]
Payload["PayloadChunk Event"]
End["End Event"]
Error["Error Event"]
RecvFn --> Header
RecvFn --> Payload
RecvFn --> End
RecvFn --> Error
Header --> ParseStatus["Parse RpcResultStatus from metadata"]
ParseStatus --> StoreStatus["Store in status Mutex"]
StoreStatus --> SendReady["Send readiness signal"]
Payload --> CheckStatus{"status?"}
CheckStatus -->|Success| SendChunk["sender.send(Ok(bytes))"]
CheckStatus -->|Error status| BufferError["error_buffer.extend(bytes)"]
End --> FinalStatus{"final status?"}
FinalStatus -->|MethodNotFound| SendNotFound["sender.send(Err(NotFound))"]
FinalStatus -->|Fail| SendFail["sender.send(Err(Fail))"]
FinalStatus -->|SystemError| SendSystem["sender.send(Err(SystemError))"]
FinalStatus -->|Success| Close["Close channel normally"]
Error --> CreateError["Create Transport error"]
CreateError --> SendError["sender.send(Err(Transport))"]
SendError --> DropSender["Drop sender"]
Error Handling in recv_fn Closure
The recv_fn closure in RpcServiceCallerInterface is the primary mechanism for receiving and transforming RPC stream events into application-level errors. It handles four event types:
RpcStreamEvent Processing
Sources:
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:91-287
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:119-135
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:136-174
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:175-244
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:246-284
Error Buffering Logic
When a non-Success status is received, payload chunks are buffered into error_buffer instead of being sent to the application. This allows the complete error message to be assembled:
| Event Sequence | Status | Action |
|---|---|---|
| Header arrives | MethodNotFound | Store status, buffer subsequent payloads |
| PayloadChunk arrives | MethodNotFound | Append to error_buffer |
| PayloadChunk arrives | MethodNotFound | Append to error_buffer |
| End arrives | MethodNotFound | Decode error_buffer as error message, send Err(...) |
Sources:
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:152-159
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:177-181
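The buffering sequence in the table can be modeled as a small state machine. All names below (Status, Event, Delivered, ErrorBufferingReceiver) are hypothetical, and decoding the error buffer as lossy UTF-8 is an assumption; the real closure may decode the payload differently.

```rust
// Hypothetical stand-ins for the status and event types (assumption).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Success,
    MethodNotFound,
    Fail,
    SystemError,
}

pub enum Event {
    Header(Status),
    PayloadChunk(Vec<u8>),
    End,
}

/// What the application-facing channel would receive.
#[derive(Debug, PartialEq, Eq)]
pub enum Delivered {
    Chunk(Vec<u8>),
    Error { status: Status, message: String },
    Closed,
}

#[derive(Default)]
pub struct ErrorBufferingReceiver {
    status: Option<Status>,
    error_buffer: Vec<u8>,
}

impl ErrorBufferingReceiver {
    pub fn on_event(&mut self, event: Event) -> Option<Delivered> {
        match event {
            Event::Header(status) => {
                self.status = Some(status);
                None
            }
            Event::PayloadChunk(bytes) => match self.status {
                // Success payloads go straight to the application.
                Some(Status::Success) => Some(Delivered::Chunk(bytes)),
                // Error payloads are held back until End.
                Some(_) => {
                    self.error_buffer.extend(bytes);
                    None
                }
                // No header yet: ignored in this sketch.
                None => None,
            },
            Event::End => match self.status {
                Some(Status::Success) | None => Some(Delivered::Closed),
                Some(status) => Some(Delivered::Error {
                    status,
                    // Assumption: the error payload is text.
                    message: String::from_utf8_lossy(&self.error_buffer).into_owned(),
                }),
            },
        }
    }
}
```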
Disconnection and Transport Errors
Transport-level failures require special handling to prevent hanging requests and ensure prompt error delivery.
Connection State Checks
Before initiating any RPC call, the caller checks connection state:
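A minimal sketch of such a guard, assuming the connection state is available as a boolean. The function names and the ConnectionAborted kind are assumptions for illustration; the real check lives in the caller interface and may use a different error kind or message.

```rust
use std::io;

// Simplified: only the Transport variant is modeled in this sketch.
#[derive(Debug)]
pub enum RpcServiceError {
    Transport(io::Error),
}

/// Reject immediately when the client is not connected.
pub fn ensure_connected(is_connected: bool) -> Result<(), RpcServiceError> {
    if is_connected {
        Ok(())
    } else {
        Err(RpcServiceError::Transport(io::Error::new(
            io::ErrorKind::ConnectionAborted,
            "RPC call attempted on a disconnected client",
        )))
    }
}

/// Run an operation only if the connection check passes.
/// The closure stands in for request encoding and dispatch.
pub fn call_if_connected<T>(
    is_connected: bool,
    op: impl FnOnce() -> T,
) -> Result<T, RpcServiceError> {
    ensure_connected(is_connected)?;
    Ok(op())
}
```

Checking first means no request is encoded or registered with the dispatcher when the call cannot succeed.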
graph TB
DispatcherCall["dispatcher.call()"] --> WaitReady["ready_rx.await"]
WaitReady --> CheckResult{"Result?"}
CheckResult -->|Ok| ReturnEncoder["Return (encoder, rx)"]
CheckResult -->|Err| ReturnError["Return Err(Transport)"]
TransportFail["Transport fails"] --> SendError["ready_tx.send(Err(io::Error))"]
SendError --> WaitReady
ChannelDrop["Handler drops ready_tx"] --> ChannelClosed["ready_rx returns Err"]
ChannelClosed --> CheckResult
Readiness Channel Errors
The call_rpc_streaming() method uses a oneshot channel to signal when the RPC call is ready (header received). If this channel closes prematurely, it indicates a transport failure:
Sources:
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:78-80
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:333-347
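The three outcomes of waiting on the readiness channel can be sketched with a standard library channel standing in for the oneshot channel. The function name wait_until_ready and the error message are hypothetical, and the real code awaits the receiver asynchronously rather than blocking.

```rust
use std::io;
use std::sync::mpsc::Receiver;

/// Wait for the readiness signal.
/// - Ok(Ok(()))  : header received, the call is ready
/// - Ok(Err(e))  : the handler reported a transport error
/// - Err(_)      : the sender was dropped without sending anything
pub fn wait_until_ready(ready_rx: Receiver<Result<(), io::Error>>) -> Result<(), io::Error> {
    match ready_rx.recv() {
        Ok(Ok(())) => Ok(()),
        Ok(Err(e)) => Err(e),
        Err(_) => Err(io::Error::new(
            io::ErrorKind::ConnectionAborted,
            "readiness channel closed before the response header arrived",
        )),
    }
}
```

Treating a dropped sender as an error is what prevents a caller from waiting forever when the handler is torn down during a disconnect.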
graph TB
FrameError["FrameDecodeError occurs"] --> CreateEvent["RpcStreamEvent::Error"]
CreateEvent --> RecvFn["recv_fn(Error event)"]
RecvFn --> WrapError["Wrap as io::Error::ConnectionAborted"]
WrapError --> NotifyReady["Send to ready_tx if pending"]
NotifyReady --> NotifyStream["Send to DynamicSender"]
NotifyStream --> Drop["Drop sender, close channel"]
RpcStreamEvent::Error Handling
When a FrameDecodeError occurs during stream processing, the session generates an RpcStreamEvent::Error:
Critical Failure Modes
Certain failures are considered unrecoverable and result in immediate panic or cleanup.
graph TB
LockAttempt["queue.lock()"] --> CheckResult{"Result?"}
CheckResult -->|Ok| ProcessEvent["Process RpcStreamEvent"]
CheckResult -->|Err poisoned| Panic["panic!()"]
Panic --> CrashMsg["'Request queue mutex poisoned'"]
CrashMsg --> Note["Note: Prevents data corruption\nand undefined behavior"]
Mutex Poisoning
The rpc_request_queue in RpcDispatcher is protected by a Mutex. If a thread panics while holding this lock, the mutex becomes “poisoned.” This is treated as a critical failure:
Mutex Poisoning Handling
The rationale for panicking on mutex poisoning is documented in src/rpc/rpc_dispatcher.rs:85-97:
If the lock is poisoned, it likely means another thread panicked while holding the mutex. The internal state of the request queue may now be inconsistent or partially mutated. Continuing execution could result in incorrect dispatch behavior, undefined state transitions, or silent data loss. This should be treated as a critical failure and escalated appropriately.
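The panic-on-poison policy can be sketched with a plain queue behind a Mutex. The queue type, push_request, and poison are hypothetical names for illustration; only the panic message is taken from the diagram above.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Add a request ID to the queue, escalating a poisoned lock to a panic
/// instead of continuing with possibly inconsistent state.
pub fn push_request(queue: &Mutex<VecDeque<u32>>, id: u32) {
    let mut guard = queue.lock().expect("Request queue mutex poisoned");
    guard.push_back(id);
}

/// Demonstration helper: poison the mutex by panicking in another
/// thread while it holds the lock.
pub fn poison(queue: Arc<Mutex<VecDeque<u32>>>) {
    let _ = thread::spawn(move || {
        let _guard = queue.lock().unwrap();
        panic!("simulated failure while holding the lock");
    })
    .join();
}
```

After poison() runs, every later lock() call returns Err, so push_request panics rather than mutating the queue. Recovering the guard with into_inner() is possible in Rust, but that would contradict the rationale quoted above.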
graph LR
ReadBytes["read_bytes()"] --> SessionRead["rpc_respondable_session.read_bytes()"]
SessionRead --> LockQueue["rpc_request_queue.lock()"]
LockQueue --> CheckLock{"lock()?"}
CheckLock -->|Ok| ReturnIds["Ok(active_request_ids)"]
CheckLock -->|Err poisoned| CorruptFrame["Err(FrameDecodeError::CorruptFrame)"]
FrameDecodeError as Critical Failure
On the read_bytes() path, a poisoned request queue lock is not escalated with a panic. Instead, the failure to acquire the lock is reported to the caller as FrameDecodeError::CorruptFrame:
graph TB
ConnDrop["Connection Dropped"] --> FailAll["fail_all_pending_requests(error)"]
FailAll --> TakeHandlers["mem::take(response_handlers)"]
TakeHandlers --> Iterate["For each (request_id, handler)"]
Iterate --> CreateSynthetic["Create synthetic Error event"]
CreateSynthetic --> CallHandler["handler(error_event)"]
CallHandler --> WakesFuture["Wakes waiting Future/stream"]
WakesFuture --> Iterate
Iterate --> Done["All handlers notified"]
fail_all_pending_requests Cleanup
When a connection drops, all pending RPC requests must be notified to prevent hanging futures. The fail_all_pending_requests() method performs this cleanup:
Cleanup Flow
The synthetic error event structure:
RpcStreamEvent::Error {
    rpc_header: None,
    rpc_request_id: Some(request_id),
    rpc_method_id: None,
    frame_decode_error: error.clone(),
}
Handler Cleanup Guarantee
Taking ownership of the handlers (mem::take) ensures:
- The response_handlers map is immediately cleared
- No new events can be routed to removed handlers
- Each handler is called exactly once with the error
- Waiting futures/streams are unblocked promptly
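The take-then-notify pattern can be sketched as follows. Dispatcher, ErrorEvent, and the handler signature are simplified, hypothetical stand-ins; the real event carries more fields and the real handlers receive RpcStreamEvent values.

```rust
use std::collections::HashMap;
use std::mem;

// Hypothetical one-variant stand-in (assumption).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FrameDecodeError {
    CorruptFrame,
}

// Reduced version of the synthetic error event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ErrorEvent {
    pub rpc_request_id: Option<u32>,
    pub frame_decode_error: FrameDecodeError,
}

type Handler = Box<dyn FnMut(ErrorEvent) + Send>;

#[derive(Default)]
pub struct Dispatcher {
    response_handlers: HashMap<u32, Handler>,
}

impl Dispatcher {
    pub fn register(&mut self, request_id: u32, handler: Handler) {
        self.response_handlers.insert(request_id, handler);
    }

    pub fn pending(&self) -> usize {
        self.response_handlers.len()
    }

    /// Notify every pending handler once, then forget all of them.
    pub fn fail_all_pending_requests(&mut self, error: FrameDecodeError) {
        // mem::take swaps in an empty map, so the dispatcher holds no
        // handlers while the notifications run.
        let handlers = mem::take(&mut self.response_handlers);
        for (request_id, mut handler) in handlers {
            handler(ErrorEvent {
                rpc_request_id: Some(request_id),
                frame_decode_error: error.clone(),
            });
        }
    }
}
```

Because the map is moved out before iteration, each handler is consumed by the loop and dropped after its single call.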
graph TB
Prebuffering{"prebuffer_response?"}
Prebuffering -->|true| AccumulateMode["Accumulate mode"]
Prebuffering -->|false| StreamMode["Stream mode"]
AccumulateMode --> HeaderEvt["Header Event"]
HeaderEvt --> CallHandler["Call handler with Header"]
HeaderEvt --> PayloadEvt["PayloadChunk Events"]
PayloadEvt --> BufferBytes["buffer.extend_from_slice(bytes)"]
BufferBytes --> PayloadEvt
PayloadEvt --> EndEvt["End Event"]
EndEvt --> SendAll["Send entire buffer at once"]
SendAll --> CallEndHandler["Call handler with End"]
StreamMode --> StreamHeader["Header Event"]
StreamHeader --> StreamPayload["PayloadChunk Events"]
StreamPayload --> CallHandlerImmediate["Call handler for each chunk"]
CallHandlerImmediate --> StreamPayload
StreamPayload --> StreamEnd["End Event"]
Error Handling in Prebuffering
The RpcRespondableSession supports prebuffering mode where response payloads are accumulated before delivery. Error handling in this mode differs from streaming:
Prebuffering Error Accumulation
In prebuffering mode, if an error status is detected, the entire error payload is still buffered until the End event, then delivered as a single chunk.
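The difference between the two modes can be sketched with a small session type. Session, Event, and the handler signature are hypothetical simplifications; only the prebuffer_response flag name comes from the diagram above.

```rust
use std::mem;

// Reduced stand-in for the stream events (assumption).
#[derive(Debug, PartialEq, Eq)]
pub enum Event {
    Header,
    PayloadChunk(Vec<u8>),
    End,
}

pub struct Session {
    prebuffer_response: bool,
    buffer: Vec<u8>,
}

impl Session {
    pub fn new(prebuffer_response: bool) -> Self {
        Session { prebuffer_response, buffer: Vec::new() }
    }

    /// Route one event to the handler, holding payloads back when
    /// prebuffering is enabled.
    pub fn on_event(&mut self, event: Event, handler: &mut impl FnMut(Event)) {
        match event {
            // Accumulate mode: keep the bytes, deliver nothing yet.
            Event::PayloadChunk(bytes) if self.prebuffer_response => {
                self.buffer.extend_from_slice(&bytes)
            }
            // Accumulate mode: flush everything as one chunk, then End.
            Event::End if self.prebuffer_response => {
                handler(Event::PayloadChunk(mem::take(&mut self.buffer)));
                handler(Event::End);
            }
            // Stream mode, and Header in either mode: pass through.
            other => handler(other),
        }
    }
}
```

The same path is taken whether the payload is a result or an error message, which matches the behavior described above: the error payload arrives as a single chunk at End.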
Standard Error Handling Patterns
Pattern 1: Immediate Rejection on Disconnect
Always check connection state before starting expensive operations:
Pattern 2: Error Conversion at Boundaries
Convert lower-level errors to RpcServiceError at API boundaries:
Pattern 3: Synchronous Error Handling in Callbacks
The recv_fn closure is synchronous and uses StdMutex to avoid async context issues:
Sources:
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:75-77
- extensions/muxio-rpc-service-caller/src/caller_interface.rs112
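A minimal sketch of the synchronous-callback pattern, assuming the shared state is a status slot behind a standard library mutex. Status and make_recv_fn are hypothetical names; the real closure handles full RpcStreamEvent values and several shared fields.

```rust
use std::sync::{Arc, Mutex as StdMutex};

// Hypothetical reduced status type (assumption).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Success,
    Fail,
}

/// Build a synchronous callback that records the latest status.
/// A std mutex can be locked without .await, so the closure can run
/// from non-async code.
pub fn make_recv_fn(
    status: Arc<StdMutex<Option<Status>>>,
) -> impl FnMut(Status) + Send + 'static {
    move |incoming| {
        // The guard is dropped at the end of the closure body, so the
        // lock is held only for the duration of the assignment.
        let mut slot = status.lock().expect("status mutex poisoned");
        *slot = Some(incoming);
    }
}
```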
Pattern 4: Tracing for Error Diagnosis
All error paths include structured logging using tracing:
Sources:
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:249-252
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:268-273
Summary Table of Error Types and Handling
| Error Type | Layer | Handling Strategy | Recoverable? |
|---|---|---|---|
| RpcServiceError::Rpc | Application | Return to caller | Yes |
| RpcServiceError::Transport | Transport | Return to caller, cleanup handlers | No (requires reconnect) |
| FrameDecodeError | Framing | Wrapped in io::Error, propagated up | No |
| FrameEncodeError | Framing | Wrapped in io::Error, propagated up | No |
| Mutex poisoning | Internal | panic!() | No |
| Connection closed | Transport | fail_all_pending_requests() | No (requires reconnect) |
| Method not found | RPC Protocol | RpcServiceErrorCode::NotFound | Yes |
| Handler failure | Application | RpcServiceErrorCode::Fail or System | Yes |
Sources:
- extensions/muxio-rpc-service-caller/src/caller_interface.rs:16-17
- src/rpc/rpc_dispatcher.rs1
- src/rpc/rpc_dispatcher.rs:85-118
- src/rpc/rpc_dispatcher.rs:422-456