"""
Persistent GitHub backoff state management.
Backoff is a global switch to reduce GitHub API traffic when nearing
rate limits or during maintenance. State is persisted in the DB so it
survives restarts, with an in-memory cache for fast reads.
"""
from __future__ import annotations

import threading
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
# Prefer the project's observability hook; fall back to a no-op so this module
# stays importable (and backoff stays usable) when that hook is unavailable.
try:
    from observability import emit_event  # type: ignore
except Exception:  # pragma: no cover
    # Deliberately broad: any failure while importing the hook must not
    # prevent the backoff switch from working.
    def emit_event(event: str, severity: str = "info", **fields):  # type: ignore
        """No-op stand-in used when ``observability.emit_event`` cannot be imported.

        Accepts the same call shape used in this module (event name, severity,
        arbitrary keyword fields), ignores everything and returns ``None``.
        """
        return None
# Module-wide lock guarding the cached backoff state. It is re-entrant (RLock)
# so a method that already holds it can call another method that takes it too.
_LOCK = threading.RLock()
@dataclass
class BackoffInfo:
    """Snapshot of the GitHub backoff switch.

    The record only stores state; whether backoff is currently in effect is
    answered by :meth:`is_active`, which also takes the expiry into account.
    """

    # Whether backoff has been switched on.
    enabled: bool
    # Free-text explanation of why the state was last changed.
    reason: str = ""
    # Time of the last change. Built per instance: a plain
    # ``datetime.now(...)`` default would be evaluated once, when the class is
    # defined, and then shared by every instance created without the argument.
    updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    # Moment after which an enabled backoff stops counting as active;
    # ``None`` means it never expires on its own.
    expires_at: Optional[datetime] = None

    def is_active(self) -> bool:
        """Return ``True`` when backoff is enabled and has not expired yet."""
        if not self.enabled:
            return False
        deadline = self.expires_at
        if deadline is None:
            return True
        if deadline.utcoffset() is None:
            # Comparing a naive datetime with the aware "now" below would raise
            # TypeError. NOTE(review): naive values are read as UTC here, which
            # matches the UTC timestamps this module produces -- confirm for
            # values coming from other writers.
            deadline = deadline.replace(tzinfo=timezone.utc)
        return datetime.now(timezone.utc) < deadline
class BackoffState:
    """Backoff state persisted in the DB with an in-memory cache.

    The state lives in a single document of the ``users`` collection and is
    mirrored in ``self._cached``. All public methods run under the module-level
    re-entrant ``_LOCK``. Persistence is best-effort: when the database cannot
    be reached the in-memory value is still updated and returned.
    """

    # ``_id`` of the document that holds the persisted state.
    _STATE_DOC_ID = "__global_state__"

    def __init__(self) -> None:
        # Last known state; stays ``None`` until the first get/enable/disable.
        self._cached: Optional[BackoffInfo] = None

    # --- DB helpers ---
    @staticmethod
    def _users_collection() -> Any:
        """Return ``database.db.db.users``, or ``None`` when it is not available.

        May raise (for example ``ImportError``); callers wrap the call in their
        own ``try`` block.
        """
        from database import db  # lazy import to avoid cycles

        handle = getattr(db, "db", None)
        if handle is None:
            return None
        # Compare with ``None`` instead of truth-testing the handles.
        # NOTE(review): if these are PyMongo Database/Collection objects,
        # ``bool()`` on them raises NotImplementedError, which the callers'
        # broad ``except`` would turn into "no persistence" -- confirm the
        # driver in use.
        return getattr(handle, "users", None)

    def _load_from_db(self) -> Optional[BackoffInfo]:
        """Read the persisted state.

        Returns:
            A ``BackoffInfo`` built from the stored document, or ``None`` when
            the collection is unavailable, the document is missing or not a
            dict, or any error occurs while reading.
        """
        try:
            users = self._users_collection()
            if users is None:
                return None
            doc = users.find_one({"_id": self._STATE_DOC_ID})
            if not isinstance(doc, dict):
                return None
            stored = (doc.get("github") or {}).get("backoff") or {}
            return BackoffInfo(
                enabled=bool(stored.get("enabled", False)),
                reason=str(stored.get("reason", "") or ""),
                # A missing or unparsable timestamp falls back to "now".
                updated_at=self._parse_dt(stored.get("updated_at")) or datetime.now(timezone.utc),
                expires_at=self._parse_dt(stored.get("expires_at")),
            )
        except Exception:
            # Best-effort read: the caller falls back to the memory cache.
            return None

    def _save_to_db(self, info: BackoffInfo) -> None:
        """Persist ``info`` (upsert) under the key read by ``_load_from_db``.

        Errors are swallowed on purpose so a database outage never breaks the
        in-memory switch.
        """
        try:
            users = self._users_collection()
            if users is None:
                return
            payload = {
                "github": {
                    "backoff": {
                        "enabled": bool(info.enabled),
                        "reason": str(info.reason or ""),
                        "updated_at": info.updated_at,
                        "expires_at": info.expires_at,
                    }
                }
            }
            # NOTE(review): ``$set`` on the whole "github" key replaces that
            # sub-document; if other data is ever stored beside "backoff" it
            # would be overwritten here -- verify against other writers.
            users.update_one({"_id": self._STATE_DOC_ID}, {"$set": payload}, upsert=True)
        except Exception:
            # Non-fatal: remain with memory cache
            pass

    @staticmethod
    def _parse_dt(val: Any) -> Optional[datetime]:
        """Convert a stored value into a timezone-aware ``datetime``.

        Args:
            val: ``None``, a ``datetime``, or anything whose ``str()`` is an
                ISO-8601 timestamp (a trailing ``Z`` is accepted).

        Returns:
            ``None`` for ``None`` or unparsable input. Aware datetimes are
            returned unchanged; naive ones get UTC attached so they can be
            compared with ``datetime.now(timezone.utc)`` without a TypeError.
            NOTE(review): reading naive values as UTC matches what this module
            writes -- confirm for values written elsewhere.
        """
        if val is None:
            return None
        if isinstance(val, datetime):
            parsed = val
        else:
            try:
                text = str(val).strip()
                if text[-1:] in ("Z", "z"):
                    # ``fromisoformat`` rejects the "Z" suffix before Python 3.11.
                    text = text[:-1] + "+00:00"
                parsed = datetime.fromisoformat(text)
            except Exception:
                return None
        if parsed.utcoffset() is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    # --- Public API ---
    def get(self, refresh: bool = False) -> BackoffInfo:
        """Return the current state, loading it from the DB when needed.

        Args:
            refresh: When true, re-read the DB even if a cached value exists.
                If that read yields nothing, the existing cache is kept.

        Returns:
            The cached ``BackoffInfo`` (a disabled default when nothing could
            be loaded). An enabled state whose expiry has passed is switched
            off before being returned.
        """
        with _LOCK:
            if refresh or self._cached is None:
                loaded = self._load_from_db()
                if loaded is not None:
                    self._cached = loaded
                elif self._cached is None:
                    self._cached = BackoffInfo(enabled=False)
            current = self._cached
            deadline = self._parse_dt(current.expires_at)
            # Auto-disable and persist so a restart/refresh won't re-enable.
            # Only done on the enabled -> disabled transition; otherwise every
            # read after the expiry would trigger another DB write.
            if current.enabled and deadline is not None and datetime.now(timezone.utc) >= deadline:
                current.enabled = False
                self._save_to_db(current)
            return current

    def enable(self, *, reason: str = "", ttl_minutes: Optional[int] = None) -> BackoffInfo:
        """Switch backoff on, persist it and emit ``github_backoff_enabled``.

        Args:
            reason: Why backoff is enabled; an empty value is stored as "manual".
            ttl_minutes: Minutes until the backoff expires; ``None`` or a
                non-positive value means no expiry.

        Returns:
            The new ``BackoffInfo``.
        """
        with _LOCK:
            expires_at = None
            if ttl_minutes is not None and ttl_minutes > 0:
                expires_at = datetime.now(timezone.utc) + timedelta(minutes=int(ttl_minutes))
            info = BackoffInfo(
                enabled=True,
                reason=reason or "manual",
                updated_at=datetime.now(timezone.utc),
                expires_at=expires_at,
            )
            self._cached = info
            self._save_to_db(info)
            emit_event(
                "github_backoff_enabled",
                severity="warn",
                reason=info.reason,
                ttl_minutes=int(ttl_minutes or 0),
            )
            return info

    def disable(self, *, reason: str = "") -> BackoffInfo:
        """Switch backoff off, persist it and emit ``github_backoff_disabled``.

        Args:
            reason: Why backoff is disabled; an empty value is stored as "manual".

        Returns:
            The new ``BackoffInfo`` (disabled, without an expiry).
        """
        with _LOCK:
            info = BackoffInfo(
                enabled=False,
                reason=reason or "manual",
                updated_at=datetime.now(timezone.utc),
                expires_at=None,
            )
            self._cached = info
            self._save_to_db(info)
            emit_event("github_backoff_disabled", severity="info", reason=info.reason)
            return info

    def toggle(self, *, reason: str = "", ttl_minutes: Optional[int] = None) -> BackoffInfo:
        """Disable backoff when it is active, otherwise enable it.

        Args:
            reason: Stored reason; an empty value is stored as "toggle".
            ttl_minutes: Passed to ``enable`` when the toggle turns backoff on.

        Returns:
            The ``BackoffInfo`` produced by ``disable`` or ``enable``.
        """
        # Hold the re-entrant lock across the read and the write so two
        # concurrent toggles cannot both act on the same observed state.
        with _LOCK:
            label = reason or "toggle"
            if self.get().is_active():
                return self.disable(reason=label)
            return self.enable(reason=label, ttl_minutes=ttl_minutes)
# Global singleton: one module-level BackoffState instance, created at import time.
state = BackoffState()