RateLimiter

class RateLimiter(val config: RateLimiterConfig = defaultRateLimiterConfig(), val semaphoreState: SemaphoreState = InMemorySemaphoreState()) : FlowEventListenerImpl<RateLimiterEvent>, SuspendableSemaphore, AutoCloseable

The Rate Limiter is a proactive resilience mechanism that limits the number of requests made to a system component, thereby controlling resource consumption and protecting the system from overload. A rate limiter is initialized with a configuration whose pre-configured policies define its behaviour.

In this implementation, the rate limiter uses a counting semaphore synchronization primitive to control the number of permits available for requests and, if configured, an internal queue to hold excess requests that are waiting for permits to become available. Since a rate limiter can be used in distributed architectures, the semaphore state can be stored in a shared data store, such as a database, by implementing the SemaphoreState interface. The rate limiter is itself a resource that must be closed when it is no longer needed, as it may hold resources that need to be released (e.g., semaphore state if stored externally).
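The permit accounting described above can be illustrated with a minimal sketch. It is not this library's implementation: it assumes the JDK's blocking java.util.concurrent.Semaphore as a stand-in for the suspendable semaphore, and the names tryTake and permitScenario are hypothetical.

```kotlin
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit

/**
 * Hypothetical helper (not part of the library): tries to take [permits] from
 * [semaphore], waiting at most [timeoutMillis] ms. Returns true on success.
 */
fun tryTake(semaphore: Semaphore, permits: Int, timeoutMillis: Long): Boolean =
    semaphore.tryAcquire(permits, timeoutMillis, TimeUnit.MILLISECONDS)

/** Walks through a small acquire/release scenario and records each outcome. */
fun permitScenario(): List<Boolean> {
    // Two permits in total; `true` makes the semaphore fair, so waiting
    // threads are served in arrival order, much like a queue of requests.
    val semaphore = Semaphore(2, true)
    val outcomes = mutableListOf<Boolean>()

    outcomes += tryTake(semaphore, 2, 50) // both permits are free
    outcomes += tryTake(semaphore, 1, 50) // none left: waits 50 ms, then gives up
    semaphore.release(2)                  // hand both permits back
    outcomes += tryTake(semaphore, 1, 50) // a permit is available again

    return outcomes
}
```

Calling permitScenario() returns [true, false, true]: the second attempt fails because every permit is already held, and the third succeeds once the permits have been released.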

Note: How long a request holds its permits is determined by the duration of the suspending function that the rate limiter decorates, and is therefore not controlled by the rate limiter itself. It is the caller's responsibility to handle timeouts properly so that a request does not hold permits indefinitely.
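One way a caller might bound the time permits are held is sketched below. This is an assumption-laden illustration, not the library's behaviour: it uses the JDK's blocking Semaphore and an ExecutorService instead of coroutines, and runBounded and boundedDemo are hypothetical names. In coroutine code, the analogous tool would be kotlinx.coroutines' withTimeout wrapped around the decorated work.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

/**
 * Hypothetical helper (not part of the library): runs [work] while holding
 * [permits], but waits for it at most [limitMillis] ms.
 * Returns null if the work did not finish in time.
 */
fun <R : Any> runBounded(
    semaphore: Semaphore,
    executor: ExecutorService,
    permits: Int,
    limitMillis: Long,
    work: () -> R,
): R? {
    semaphore.acquire(permits)
    val future = executor.submit(Callable { work() })
    return try {
        future.get(limitMillis, TimeUnit.MILLISECONDS)
    } catch (e: TimeoutException) {
        future.cancel(true) // interrupt the overrunning work
        null
    } finally {
        semaphore.release(permits) // permits return whether or not the work finished
    }
}

/** Runs a deliberately slow task and reports the result and the free permits. */
fun boundedDemo(): Pair<String?, Int> {
    val semaphore = Semaphore(1)
    val executor = Executors.newSingleThreadExecutor()
    try {
        val result = runBounded(semaphore, executor, permits = 1, limitMillis = 100) {
            Thread.sleep(5_000) // simulates a call that hangs
            "done"
        }
        return result to semaphore.availablePermits()
    } finally {
        executor.shutdownNow()
    }
}
```

Here boundedDemo() yields a null result with the single permit available again, because the deadline expires long before the simulated call completes. Note that this sketch relies on the work responding to interruption; work that ignores it would keep running after its permits were returned.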

Parameters

config

The configuration for the rate limiter mechanism.

semaphoreState

The state of the semaphore. Defaults to an in-memory semaphore state.

Constructors

constructor(config: RateLimiterConfig = defaultRateLimiterConfig(), semaphoreState: SemaphoreState = InMemorySemaphoreState())

Properties

open override val events: MutableSharedFlow<RateLimiterEvent>

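A mutable shared flow that emits events of type RateLimiterEvent. Such events can be listened to by registering a listener with onEvent or, for a specific event subtype, with onSpecificEvent.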

open override val scope: CoroutineScope

The scope in which each listener will be launched upon registration.

private val wasDisposed: AtomicBoolean

Functions

open suspend override fun acquire(permits: Int, timeout: Duration)

Acquires the given number of permits from the semaphore, suspending for up to the specified timeout if the permits are not available.

inline suspend fun <R> call(permits: Int = 1, timeout: Duration = config.baseTimeoutDuration, block: Supplier<R>): R

Decorates a Supplier with this rate limiter.
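The general shape of such a decoration (acquire, run, always release) can be sketched as follows. This is an illustration under stated assumptions rather than the library's code: it uses the JDK's blocking Semaphore, and limited, decoratorDemo, and PermitsUnavailableException are hypothetical names. How the real call reacts when permits cannot be obtained within the timeout is not stated here, so throwing an exception is an assumption of the sketch.

```kotlin
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit

/** Hypothetical exception for this sketch, thrown when permits are not obtained in time. */
class PermitsUnavailableException(message: String) : RuntimeException(message)

/**
 * Hypothetical decorator (not part of the library): returns a new function
 * that runs [block] only while holding [permits] from [semaphore].
 */
fun <R> limited(
    semaphore: Semaphore,
    permits: Int = 1,
    timeoutMillis: Long = 100,
    block: () -> R,
): () -> R = {
    if (!semaphore.tryAcquire(permits, timeoutMillis, TimeUnit.MILLISECONDS)) {
        throw PermitsUnavailableException("no $permits permit(s) within $timeoutMillis ms")
    }
    try {
        block()
    } finally {
        semaphore.release(permits) // always give the permits back, even if block throws
    }
}

/** Exercises the decorator once with a free permit and once without. */
fun decoratorDemo(): Triple<Int, Int, Boolean> {
    val semaphore = Semaphore(1)
    val guarded = limited(semaphore) { 21 * 2 }
    val value = guarded()                          // runs with one permit held
    val permitsAfter = semaphore.availablePermits() // the permit has been returned

    semaphore.acquire()                            // the only permit is now taken
    val blocked = limited(semaphore, timeoutMillis = 10) { 0 }
    val rejected = try {
        blocked()
        false
    } catch (e: PermitsUnavailableException) {
        true
    }
    return Triple(value, permitsAfter, rejected)
}
```

decoratorDemo() returns (42, 1, true): the first call runs and returns its permit, while the second is rejected because no permit becomes free within 10 ms. Releasing in a finally block is the key design choice, since it keeps a failing block from leaking permits.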

open override fun cancelListeners()

Cancels all listeners registered. Subsequent registrations should not be affected.

open override fun close()

Closes the rate limiter, releasing any resources it may hold (e.g., semaphore state if stored externally).

open suspend override fun onEvent(action: suspend (RateLimiterEvent) -> Unit): Job

Registers a listener that will be called when an event of type Event is emitted.

protected inline suspend fun <EventType : RateLimiterEvent> onSpecificEvent(crossinline action: suspend (EventType) -> Unit): Job

Registers a listener that will be called when a specific subtype of Event is emitted.

open suspend override fun release(permits: Int)

Releases the given number of permits back to the semaphore.