wrongmove/rec/circuit_breaker.py

189 lines
6.4 KiB
Python
Raw Normal View History

"""Circuit breaker pattern for protecting against cascading failures."""
from __future__ import annotations
import enum
import logging
import time
from dataclasses import dataclass
from rec.exceptions import CircuitBreakerOpenError
# Module-wide logger. Records are emitted under the "uvicorn.error" logger
# name rather than under this module's own name.
logger = logging.getLogger("uvicorn.error")
class CircuitState(enum.Enum):
    """Enumerates the three states a circuit breaker can be in.

    Each member's value is the lowercase form of its name.
    """

    # Normal operation: requests are let through.
    CLOSED = "closed"

    # Too many failures were seen: requests are blocked.
    OPEN = "open"

    # Probing phase: checking whether the service has recovered.
    HALF_OPEN = "half_open"
@dataclass
class CircuitBreaker:
    """Failure-counting gate that stops calls to an unhealthy dependency.

    The breaker moves between three states:

    - CLOSED: ``call()`` never raises. Every recorded failure bumps a counter
      and every recorded success zeroes it.
    - OPEN: entered once the counter reaches ``failure_threshold``. ``call()``
      raises ``CircuitBreakerOpenError`` until ``recovery_timeout`` seconds
      have passed since the last recorded failure.
    - HALF_OPEN: entered by the first ``call()`` made after that timeout. The
      next recorded success closes the circuit; the next recorded failure
      re-opens it. ``call()`` itself does not limit how many requests pass
      while the breaker is in this state.

    Attributes:
        failure_threshold: Consecutive failures required to open the circuit.
        recovery_timeout: Seconds after the last failure before a probe is
            allowed.
        state: The state the breaker is currently in.
        failure_count: Failures recorded since the last success or reset.
        last_failure_time: ``time.time()`` value of the most recent failure
            (0.0 when none has been recorded or after ``reset()``).
        last_state_change: ``time.time()`` value of the most recent state
            change; always overwritten with the current time on construction.
    """

    failure_threshold: int
    recovery_timeout: float
    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    last_failure_time: float = 0.0
    last_state_change: float = 0.0

    def __post_init__(self) -> None:
        """Stamp the creation moment as the first state change."""
        self.last_state_change = time.time()

    def call(self) -> None:
        """Gate a request: return silently if it may proceed.

        Raises:
            CircuitBreakerOpenError: The circuit is open and the recovery
                timeout has not yet elapsed since the last failure.
        """
        now = time.time()

        # CLOSED and HALF_OPEN both let the request through untouched.
        if self.state != CircuitState.OPEN:
            return

        since_last_failure = now - self.last_failure_time
        if since_last_failure >= self.recovery_timeout:
            # Waited long enough: let this request through as a probe.
            self._transition_to_half_open()
            return

        remaining = self.recovery_timeout - since_last_failure
        raise CircuitBreakerOpenError(
            f"Circuit breaker is open. "
            f"Waiting {remaining:.1f}s "
            f"before retry."
        )

    def record_success(self) -> None:
        """Note that a request succeeded."""
        # A success while probing means the service is back.
        if self.is_half_open:
            self._transition_to_closed()

        # Any success breaks the run of consecutive failures.
        self.failure_count = 0

    def record_failure(self) -> None:
        """Note that a request failed, opening the circuit when warranted."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        # A failed probe re-opens immediately; in CLOSED the threshold decides.
        probe_failed = self.state == CircuitState.HALF_OPEN
        threshold_reached = (
            self.state == CircuitState.CLOSED
            and self.failure_count >= self.failure_threshold
        )
        if probe_failed or threshold_reached:
            self._transition_to_open()

    def _enter_state(self, new_state: CircuitState) -> None:
        """Store ``new_state`` and stamp the moment of the change."""
        self.state = new_state
        self.last_state_change = time.time()

    def _transition_to_open(self) -> None:
        """Move to OPEN and log a warning."""
        self._enter_state(CircuitState.OPEN)
        logger.warning(
            f"Circuit breaker OPENED after {self.failure_count} consecutive failures. "
            f"Will retry in {self.recovery_timeout}s"
        )

    def _transition_to_half_open(self) -> None:
        """Move to HALF_OPEN and log the probe attempt."""
        self._enter_state(CircuitState.HALF_OPEN)
        logger.info("Circuit breaker entering HALF_OPEN state, testing service recovery")

    def _transition_to_closed(self) -> None:
        """Move to CLOSED, clear the failure counter and log the recovery."""
        self._enter_state(CircuitState.CLOSED)
        self.failure_count = 0
        logger.info("Circuit breaker CLOSED, service recovered")

    def reset(self) -> None:
        """Force the breaker back to CLOSED and clear its failure history."""
        self._enter_state(CircuitState.CLOSED)
        self.failure_count = 0
        self.last_failure_time = 0.0
        logger.info("Circuit breaker manually reset to CLOSED state")

    @property
    def is_open(self) -> bool:
        """Whether the breaker is in the OPEN state."""
        return self.state == CircuitState.OPEN

    @property
    def is_closed(self) -> bool:
        """Whether the breaker is in the CLOSED state."""
        return self.state == CircuitState.CLOSED

    @property
    def is_half_open(self) -> bool:
        """Whether the breaker is in the HALF_OPEN state."""
        return self.state == CircuitState.HALF_OPEN

    @property
    def state_as_int(self) -> int:
        """Numeric code of the current state, for metrics.

        0 = closed, 1 = half_open, 2 = open.
        """
        codes = {
            CircuitState.CLOSED: 0,
            CircuitState.HALF_OPEN: 1,
            CircuitState.OPEN: 2,
        }
        return codes[self.state]
# ---------------------------------------------------------------------------
# Global circuit breaker instance used by the scraper
# ---------------------------------------------------------------------------
# Module-level slot for the shared breaker; stays None until one is stored.
_global_circuit_breaker: CircuitBreaker | None = None


def get_circuit_breaker() -> CircuitBreaker | None:
    """Fetch the module-level circuit breaker.

    Returns:
        The stored breaker instance, or ``None`` when none has been stored.
    """
    return _global_circuit_breaker
def set_global_circuit_breaker(cb: CircuitBreaker) -> None:
    """Store ``cb`` as the module-level circuit breaker.

    Intended to be called during scraper initialisation. Any previously
    stored instance is replaced.

    Args:
        cb: The breaker instance to store.
    """
    # Rebind the module global rather than creating a function-local name.
    global _global_circuit_breaker
    _global_circuit_breaker = cb
def register_circuit_breaker_gauge() -> None:
    """Register an observable gauge that reports the circuit breaker state.

    The gauge is named ``circuit_breaker_state`` and reports 0 (closed),
    1 (half_open) or 2 (open). It reports 0 while no global circuit breaker
    has been set.

    Registration is best-effort and never raises: when OpenTelemetry cannot
    be imported the function silently does nothing, and any other error
    during registration is logged as a warning.
    """
    try:
        from opentelemetry.metrics import Observation, get_meter

        def _observe_cb_state(options: object) -> list[Observation]:
            # Observable-instrument callbacks must return API-level
            # Observation objects; the SDK converts them to its own internal
            # measurement type itself.
            cb = get_circuit_breaker()
            value = cb.state_as_int if cb is not None else 0
            return [Observation(value)]

        get_meter(__name__).create_observable_gauge(
            "circuit_breaker_state",
            callbacks=[_observe_cb_state],
            description="Circuit breaker state: 0=closed, 1=half_open, 2=open",
        )
    except ImportError:
        # Metrics are optional: without OpenTelemetry there is nothing to do.
        return
    except Exception:
        # Keep start-up alive, but do not hide unexpected registration errors.
        logger.warning(
            "Failed to register circuit_breaker_state gauge", exc_info=True
        )