CircuitBreaker

class CircuitBreaker(val config: CircuitBreakerConfig = defaultCircuitBreakerConfig()) : FlowEventListenerImpl<CircuitBreakerEvent>

The Circuit Breaker is a reactive resilience mechanism that protects a system component from overloading or failing. By monitoring the health of the system, the circuit breaker short-circuits execution requests when it detects that the component is not behaving as expected. After a configurable timeout, it allows a limited number of test requests to pass through to see whether the component has recovered. Depending on the test results, the circuit breaker either resumes normal operation or continues to short-circuit requests. A circuit breaker is initialized with a configuration that, through pre-configured policies, defines its behaviour.

The circuit breaker implements the following state machine:

                failure rate exceeds
+--------+       or equals threshold       +------+
| Closed | ------------------------------> | Open |
+--------+                                 +------+
    ^                                        |  ^
    |                                  after |  | failure rate
    |                                timeout |  | exceeds or
    |                                        |  | equals threshold
    |      failure rate                      v  |
    |      below threshold              +-----------+
    |---------------------------------- | Half-Open |
                                        +-----------+

  • In the Closed state, the circuit breaker allows calls to execute the underlying operation, while recording the success or failure of these calls. When the failure rate equals or exceeds a (configurable) threshold, the circuit breaker transitions to the Open state.

  • In the Open state, the circuit breaker rejects all received calls for a (configurable) amount of time and then transitions to the HalfOpen state.

  • In the HalfOpen state, the circuit breaker allows a (configurable) number of calls to test whether the underlying operation is still failing. After all calls have been attempted, the circuit breaker transitions back to the Open state if the newly calculated failure rate exceeds or equals the threshold; otherwise, it transitions to the Closed state.
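The three transitions described above can be sketched as a small, dependency-free state machine. This is only an illustration of the rules in the list, not the library's implementation: the names `State`, `StateMachineSketch`, `permittedHalfOpenCalls` and the three `on...` functions are hypothetical, and the timeout is modelled as an explicit call rather than a real timer.

```kotlin
// Hypothetical names throughout: an illustration of the documented rules only.
enum class State { Closed, Open, HalfOpen }

class StateMachineSketch(
    private val failureRateThreshold: Double, // e.g. 0.5 means 50% of calls failed
    private val permittedHalfOpenCalls: Int,  // number of test calls allowed in HalfOpen
) {
    var state: State = State.Closed
        private set

    // Outcomes of the test calls made while HalfOpen (true = success).
    private val halfOpenOutcomes = mutableListOf<Boolean>()

    // Closed -> Open when the observed failure rate equals or exceeds the threshold.
    fun onFailureRateInClosed(failureRate: Double) {
        if (state == State.Closed && failureRate >= failureRateThreshold) {
            state = State.Open
        }
    }

    // Open -> HalfOpen once the configured wait time has elapsed.
    fun onOpenTimeoutElapsed() {
        if (state == State.Open) {
            halfOpenOutcomes.clear()
            state = State.HalfOpen
        }
    }

    // HalfOpen -> Open or Closed, decided after all permitted test calls were attempted.
    fun onHalfOpenCallCompleted(success: Boolean) {
        if (state != State.HalfOpen) return
        halfOpenOutcomes.add(success)
        if (halfOpenOutcomes.size < permittedHalfOpenCalls) return
        val failureRate = halfOpenOutcomes.count { !it }.toDouble() / halfOpenOutcomes.size
        state = if (failureRate >= failureRateThreshold) State.Open else State.Closed
    }
}
```

With a threshold of 0.5 and two permitted test calls, one success followed by one failure yields a failure rate of exactly 0.5, so the sketch returns to Open; two successes move it to Closed.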

Examples of usage:

// use predefined policies
val defaultCircuitBreaker = CircuitBreaker()

// use custom policies
val circuitBreaker = CircuitBreaker(
    circuitBreakerConfig {
        failureRateThreshold = 0.5
        slidingWindow(size = 10, minimumThroughput = 10)
        waitDurationInOpenState = 10.seconds
        recordResultPredicate { it == "success" }
        recordExceptionPredicate { it is NetworkError }
    }
)

// get the current state of the circuit breaker
val observedState = circuitBreaker.currentState()

// wire the circuit breaker
circuitBreaker.wire()

// execute an operation under the circuit breaker
val result = circuitBreaker.executeOperation {
    // operation
}

// listen to specific events
circuitBreaker.onCallNotPermitted {
    // action
}

// listen to all events
circuitBreaker.onEvent {
    // action
}

// cancel all registered listeners
circuitBreaker.cancelListeners()

// manually:
// - override the circuit breaker state
circuitBreaker.transitionToOpenState()
// - reset the circuit breaker
circuitBreaker.reset()
// - record an operation success
circuitBreaker.recordSuccess()
// - record an operation failure
circuitBreaker.recordFailure()
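The example configuration combines `failureRateThreshold` with `slidingWindow(size = 10, minimumThroughput = 10)`. One plausible reading of how these interact is sketched below, assuming a count-based window that keeps the most recent `size` outcomes and only reports a failure rate once `minimumThroughput` calls have been recorded. That interpretation, and the names `CountBasedWindowSketch`, `failureRateOrNull` and `shouldOpen`, are assumptions made for illustration and are not taken from the library.

```kotlin
// Hypothetical helper, not part of the documented API.
class CountBasedWindowSketch(
    private val size: Int,              // maximum number of outcomes kept
    private val minimumThroughput: Int, // outcomes required before a rate is reported
) {
    // Most recent outcomes, oldest first (true = success, false = failure).
    private val outcomes = ArrayDeque<Boolean>()

    fun record(success: Boolean) {
        if (outcomes.size == size) outcomes.removeFirst() // evict the oldest outcome
        outcomes.addLast(success)
    }

    // Returns null until enough calls were recorded to judge the failure rate.
    fun failureRateOrNull(): Double? {
        if (outcomes.size < minimumThroughput) return null
        return outcomes.count { !it }.toDouble() / outcomes.size
    }

    // Forgets all recorded outcomes, as a reset would.
    fun clear() = outcomes.clear()
}

// True when the window holds enough data and the rate equals or exceeds the threshold.
fun shouldOpen(window: CountBasedWindowSketch, failureRateThreshold: Double): Boolean {
    val rate = window.failureRateOrNull() ?: return false
    return rate >= failureRateThreshold
}
```

Under this reading, a breaker with `minimumThroughput = 10` never opens on the first nine calls, even if all of them fail, which avoids tripping on a handful of early errors.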

Constructors

constructor(config: CircuitBreakerConfig = defaultCircuitBreakerConfig())

Properties

open override val events: MutableSharedFlow<CircuitBreakerEvent>

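A mutable shared flow that emits events of type CircuitBreakerEvent. Such events can be listened to through onEvent, or through the event-specific functions such as onCallNotPermitted and onStateTransition.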

open override val scope: CoroutineScope

The scope in which each listener will be launched upon registration.

Functions

open override fun cancelListeners()

Cancels all registered listeners. Subsequent registrations should not be affected.

Returns the current state of the circuit breaker as a snapshot.

private inline suspend fun <R> executeAndDispatch(block: Supplier<R>): R

Executes the given operation and dispatches the necessary events based on its result and the configuration.

suspend fun <R> executeOperation(block: Supplier<R>): R

Executes the given operation decorated by this circuit breaker and returns its result, while handling any possible failure and emitting the necessary events.

private suspend fun <R> handleFailure(throwable: Throwable): R

Handles the possible failure of an operation decorated by the circuit breaker, by emitting the necessary events and rethrowing the exception.

suspend fun onCallNotPermitted(action: suspend (CircuitBreakerEvent.CallNotPermitted) -> Unit): Job

Executes the given action when a request is not permitted by the circuit breaker. This happens either when the circuit breaker is in the Open state, or when it is still in the HalfOpen state and the permitted number of calls for that state has been exceeded.

open suspend override fun onEvent(action: suspend (CircuitBreakerEvent) -> Unit): Job

Executes the given action when a circuit breaker event occurs. This function can be used to listen to all circuit breaker events.

suspend fun onOperationFailure(action: suspend (CircuitBreakerEvent.RecordedFailure) -> Unit): Job

Executes the given action when an operation fails.

suspend fun onOperationSuccess(action: suspend (CircuitBreakerEvent.RecordedSuccess) -> Unit): Job

Executes the given action when an operation succeeds.

suspend fun onReset(action: suspend (CircuitBreakerEvent.Reset) -> Unit): Job

Executes the given action when the circuit breaker resets.

protected inline suspend fun <EventType : CircuitBreakerEvent> onSpecificEvent(crossinline action: suspend (EventType) -> Unit): Job

Registers a listener that will be called when a specific subtype of Event is emitted.
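The idea of listening only to one event subtype can be illustrated with a reified type check. The sketch below deliberately swaps the documented `MutableSharedFlow` and coroutine `Job` machinery for plain synchronous callbacks so that it needs no dependencies. `SketchEvent`, `EventBusSketch` and `emit` are hypothetical stand-ins, not the library's types.

```kotlin
// Hypothetical event hierarchy standing in for CircuitBreakerEvent.
sealed interface SketchEvent {
    object CallNotPermitted : SketchEvent
    data class StateTransition(val from: String, val to: String) : SketchEvent
}

// Synchronous stand-in for the flow-based listener; not the library's implementation.
class EventBusSketch {
    private val listeners = mutableListOf<(SketchEvent) -> Unit>()

    // Registers a listener that receives every event.
    fun onEvent(action: (SketchEvent) -> Unit) {
        listeners += action
    }

    // Registers a listener that receives only events of the requested subtype E.
    inline fun <reified E : SketchEvent> onSpecificEvent(crossinline action: (E) -> Unit) {
        onEvent { event -> if (event is E) action(event) }
    }

    // Delivers the event to all currently registered listeners.
    fun emit(event: SketchEvent) = listeners.forEach { it(event) }

    // Drops all current listeners; later registrations still work.
    fun cancelListeners() = listeners.clear()
}
```

A caller could then register `bus.onSpecificEvent<SketchEvent.StateTransition> { ... }` and receive state transitions only, while an `onEvent` listener on the same bus sees every event.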

suspend fun onStateTransition(action: suspend (CircuitBreakerEvent.StateTransition) -> Unit): Job

Executes the given action when the circuit breaker transitions to a new state.

suspend fun recordFailure()

Records a failed operation execution.

suspend fun recordSuccess()

Records a successful operation execution.

suspend fun reset()

Resets the circuit breaker to the Closed state, clearing the sliding window of any recorded results.

private inline suspend fun <R> safeExecute(block: Supplier<R>): R

Executes the given operation and returns its result, while handling any possible failure.

private suspend fun throwCallIsNotPermitted(): Nothing

Transitions the circuit breaker to the Closed state, maintaining the recorded results.

Transitions the circuit breaker to the HalfOpen state, maintaining the recorded results.

suspend fun transitionToOpenState()

Transitions the circuit breaker to the Open state, maintaining the recorded results.

suspend fun wire()

Checks the current state of the circuit breaker and decides whether the operation is allowed to proceed. If the circuit breaker is in the Open state, or is in the HalfOpen state and the number of attempted calls exceeds the permitted number of calls for that state, a CallNotPermittedException is thrown; otherwise, the operation is allowed to proceed.
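The permission check described above can be sketched as a small gate that a caller consults before running an operation. The sketch is non-suspending and self-contained: `PermissionGateSketch`, `CallNotPermittedSketchException`, `BreakerState`, `permittedHalfOpenCalls` and `guarded` are hypothetical names, and counting an attempt on each `wire()` call in the HalfOpen state is an assumption about how attempts are tracked.

```kotlin
// Hypothetical, non-suspending stand-ins for the documented API.
class CallNotPermittedSketchException : RuntimeException("call not permitted")

enum class BreakerState { Closed, Open, HalfOpen }

class PermissionGateSketch(private val permittedHalfOpenCalls: Int) {
    private var halfOpenAttempts = 0

    // Changing the state restarts the count of HalfOpen attempts.
    var state: BreakerState = BreakerState.Closed
        set(value) {
            field = value
            halfOpenAttempts = 0
        }

    // Rejects in Open, and in HalfOpen once the number of attempted calls
    // exceeds the permitted number; otherwise lets the call proceed.
    fun wire() {
        when (state) {
            BreakerState.Closed -> Unit
            BreakerState.Open -> throw CallNotPermittedSketchException()
            BreakerState.HalfOpen -> {
                halfOpenAttempts++
                if (halfOpenAttempts > permittedHalfOpenCalls) {
                    throw CallNotPermittedSketchException()
                }
            }
        }
    }
}

// Runs the operation only if the gate permits it; otherwise returns the fallback.
fun <R> guarded(gate: PermissionGateSketch, fallback: R, operation: () -> R): R =
    try {
        gate.wire()
        operation()
    } catch (e: CallNotPermittedSketchException) {
        fallback
    }
```

Catching the rejection and returning a fallback is one way for a caller to degrade gracefully while the breaker is open; rethrowing or surfacing the exception is equally valid and depends on the caller's needs.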