"""
Auto-remediation manager for critical incidents.
- Persists incidents to data/incidents_log.json (append-only JSON lines)
- Triggers remediation actions based on incident type
- Adds Grafana annotations for visibility (best-effort)
- Implements simple 15-minute recurrence detection and adaptive threshold bump hook
Environment variables (optional):
- GRAFANA_URL, GRAFANA_API_TOKEN for annotations
Notes:
- File I/O is constrained under data/ per workspace safety rules
- All operations are best-effort; never raise from public APIs
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any, Dict, Optional, List, Protocol, runtime_checkable, cast
import hashlib
import json
import os
import threading
import uuid
try: # optional dependency
from http_sync import request as _http_request # type: ignore
except Exception: # pragma: no cover
_http_request = None # type: ignore
# Prefer a module-level 'requests' attribute so tests can monkeypatch it easily.
# Import is optional; tests can override 'requests' on this module regardless.
try: # optional dependency in runtime; present in project requirements
import requests # type: ignore
except Exception: # pragma: no cover
requests = None # type: ignore
def _emit_event(event: str, severity: str = "info", **fields) -> None:
    """Best-effort structured event emitter.

    Resolves the 'observability' module at call time (not import time) so
    test stubs injected into sys.modules are honored. Never raises.
    """
    try:
        import importlib
        emitter = getattr(importlib.import_module('observability'), 'emit_event', None)
        if not callable(emitter):
            return
        emitter(event, severity=severity, **fields)
    except Exception:
        return
@runtime_checkable
class _CacheLike(Protocol):
    """Structural interface for the optional cache backend.

    ``clear_all`` is preferred when available; ``delete_pattern('*')`` is the
    fallback used by ``_clear_internal_cache``.
    """
    def clear_all(self) -> int: ...
    def delete_pattern(self, pattern: str) -> int: ...
# Optional cache manager for remediation (typed via Protocol)
try:
from cache_manager import cache as _cache_instance
_cache: Optional[_CacheLike] = cast("Optional[_CacheLike]", _cache_instance)
except Exception: # pragma: no cover
_cache = None
# Optional DB reconnection
try:
from database.manager import DatabaseManager # type: ignore
except Exception: # pragma: no cover
DatabaseManager = None # type: ignore
_LOCK = threading.Lock()
_DATA_DIR = os.path.join("data")
_INCIDENTS_FILE = os.path.join(_DATA_DIR, "incidents_log.json")
_FIFTEEN_MIN = 15 * 60
def _ensure_dirs() -> None:
    """Create the data directory if missing (best-effort, never raises)."""
    try:
        os.makedirs(_DATA_DIR, exist_ok=True)
    except Exception:
        pass
def _now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    utc_now = datetime.now(timezone.utc)
    return utc_now.isoformat()
def _hash_incident_key(name: str, metric: str, details: Dict[str, Any]) -> str:
    """Return a stable 12-hex-char grouping key for an incident type.

    Only ``name`` and ``metric`` participate in the hash: volatile
    ``details`` (e.g. current metric values) are intentionally excluded so
    repeat occurrences of the same incident type map to the same key.
    """
    payload = {"name": name, "metric": metric}
    canonical = json.dumps(payload, sort_keys=True, ensure_ascii=False)
    digest = hashlib.sha1(canonical.encode("utf-8")).hexdigest()
    return digest[:12]
def _write_incident(record: Dict[str, Any]) -> None:
    """Append one incident record as a JSON line (best-effort, never raises).

    The append-only JSONL format keeps writes simple and robust; the module
    lock serializes concurrent appenders.
    """
    try:
        _ensure_dirs()
        serialized = json.dumps(record, ensure_ascii=False)
        with _LOCK, open(_INCIDENTS_FILE, "a", encoding="utf-8") as fh:
            fh.write(serialized + "\n")
    except Exception:
        pass
def _read_incidents(limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """Load incident records from the JSONL log.

    Returns the last ``limit`` records when a positive limit is given,
    otherwise all of them. Blank or malformed lines are skipped; any I/O
    failure yields an empty list.
    """
    try:
        records: List[Dict[str, Any]] = []
        with _LOCK:
            if not os.path.exists(_INCIDENTS_FILE):
                return []
            with open(_INCIDENTS_FILE, "r", encoding="utf-8") as fh:
                for raw in fh:
                    text = (raw or "").strip()
                    if not text:
                        continue
                    try:
                        records.append(json.loads(text))
                    except Exception:
                        continue
        if limit and limit > 0:
            return records[-limit:]
        return records
    except Exception:
        return []
def _grafana_annotate(text: str) -> None:
    """Post a Grafana annotation for visibility (best-effort, never raises).

    No-op unless both GRAFANA_URL and GRAFANA_API_TOKEN are set. Tries the
    module-level 'requests' attribute first (easy to stub in tests), then
    falls back to the pooled http_sync client.
    """
    base_url = os.getenv("GRAFANA_URL")
    api_token = os.getenv("GRAFANA_API_TOKEN")
    if not base_url or not api_token:
        return
    endpoint = base_url.rstrip("/") + "/api/annotations"
    headers = {"Authorization": f"Bearer {api_token}", "Content-Type": "application/json"}
    body = {
        "time": int(datetime.now(timezone.utc).timestamp() * 1000),
        "tags": ["codebot", "incident"],
        "text": text,
    }
    try:
        http = globals().get("requests", None)
        if http is not None and hasattr(http, "post"):
            try:
                http.post(endpoint, json=body, headers=headers, timeout=5)
                return
            except Exception:
                pass  # fall through to the pooled client
        if _http_request is not None:
            try:
                _http_request('POST', endpoint, json=body, headers=headers, timeout=5)
            except Exception:
                pass
    except Exception:
        # Annotation is purely informational; never propagate failures.
        pass
def _restart_service(name: str) -> bool:
    """Record a service-restart attempt as an event.

    This project does not manage OS processes, so the "restart" is an event
    emission only. Returns False only if even that setup fails.
    """
    try:
        _emit_event("service_restart_attempt", severity="anomaly", service=str(name), handled=True)
    except Exception:
        return False
    return True
def _clear_internal_cache() -> bool:
    """Flush the optional in-process cache and emit an event (best-effort).

    Prefers ``clear_all``; falls back to ``delete_pattern('*')`` when the
    backend lacks a bulk clear. Returns True unless the outer flow fails.
    """
    try:
        backend = _cache
        if backend is not None:
            if hasattr(backend, "clear_all"):
                try:
                    backend.clear_all()
                except Exception:
                    pass
            else:
                try:
                    backend.delete_pattern("*")
                except Exception:
                    pass
        _emit_event("cache_clear_attempt", severity="anomaly", handled=True)
        return True
    except Exception:
        return False
def _reconnect_mongodb() -> bool:
    """Force a best-effort MongoDB reconnect via the shared singleton manager.

    Uses the existing manager from ``database.db`` rather than constructing a
    new instance; to force a fresh connection it closes and reopens
    best-effort when those hooks exist.
    """
    try:
        try:
            from database import db as mgr  # type: ignore
        except Exception:
            mgr = None  # type: ignore
        if mgr is not None:
            try:
                closer = getattr(mgr, "close_connection", None)
                if callable(closer):
                    closer()
            except Exception:
                pass
            try:
                opener = getattr(mgr, "connect", None)
                if callable(opener):
                    opener()
            except Exception:
                pass
            # Touching .db initializes the connection lazily (best-effort).
            _ = getattr(mgr, "db", None)
        _emit_event("mongodb_reconnect_attempt", severity="anomaly", handled=True)
        return True
    except Exception:
        return False
def _detect_recurring(kind_key: str, now_ts: float) -> bool:
    """Return True if the same incident kind occurred within the last 15 min.

    Scans the most recent 200 records newest-first; the first record of this
    kind with a parseable timestamp decides the answer, so the scan stops at
    the latest prior occurrence. Any failure reads as "not recurring".
    """
    try:
        window_start = now_ts - _FIFTEEN_MIN
        for rec in reversed(_read_incidents(limit=200)):
            if rec.get("kind_key") != kind_key:
                continue
            stamp = rec.get("ts") or rec.get("timestamp")
            try:
                occurred = datetime.fromisoformat(str(stamp)).timestamp()
            except Exception:
                continue  # unparseable timestamp: try the next older record
            return occurred >= window_start
        return False
    except Exception:
        return False
def _bump_adaptive_thresholds(metric: str, factor: float = 1.2) -> None:
    """Scale the adaptive threshold for *metric* by *factor* and export gauges.

    Reads current thresholds from alert_manager and re-exports them through
    the metrics gauges; silently no-ops when those modules are unavailable.
    """
    try:
        from alert_manager import get_thresholds_snapshot  # type: ignore
        from metrics import set_adaptive_observability_gauges  # type: ignore
        snapshot = get_thresholds_snapshot() or {}
        err = float(snapshot.get("error_rate_percent", {}).get("threshold", 0.0) or 0.0)
        lat = float(snapshot.get("latency_seconds", {}).get("threshold", 0.0) or 0.0)
        scale = float(factor)
        if metric == "error_rate_percent" and err > 0.0:
            err *= scale
        if metric == "latency_seconds" and lat > 0.0:
            lat *= scale
        set_adaptive_observability_gauges(
            error_rate_threshold_percent=err if err > 0.0 else None,
            latency_threshold_seconds=lat if lat > 0.0 else None,
        )
    except Exception:
        return
# NOTE: replaced stray "[תיעוד]" placeholder — as a bare expression it raised NameError at import
def _select_remediation_action(name_l: str, metric_l: str, skip_remediation: bool) -> str:
    """Classify the incident, run the matching remediation, return its label.

    ``name_l``/``metric_l`` are lowercase forms of the incident name/metric.
    External-source incidents are never remediated ("skipped_external");
    unclassified incidents yield "none". Remediation helpers are themselves
    best-effort and never raise.
    """
    if skip_remediation:
        return "skipped_external"
    if "error rate" in name_l or metric_l == "error_rate_percent":
        _restart_service("webapp")
        return "restart_service:webapp"
    if "latency" in name_l or metric_l == "latency_seconds":
        _clear_internal_cache()
        return "clear_cache"
    if "db" in name_l or "mongo" in name_l or metric_l == "db_connection_errors":
        _reconnect_mongodb()
        return "reconnect_mongodb"
    return "none"
def _apply_recurring_bump(metric: str, threshold: float) -> None:
    """For a recurring incident: bump thresholds 1.2x and export the gauges.

    The pre-bump snapshot is captured first so the exported gauge reflects
    exactly one multiplication (bump_threshold also scales the internal
    value, and reading the snapshot afterwards would double-bump).
    Best-effort throughout; never raises.
    """
    before_err = before_lat = None
    try:
        from alert_manager import get_thresholds_snapshot  # type: ignore
        snap_before = get_thresholds_snapshot() or {}
        before_err = float(snap_before.get("error_rate_percent", {}).get("threshold", 0.0) or 0.0)
        before_lat = float(snap_before.get("latency_seconds", {}).get("threshold", 0.0) or 0.0)
    except Exception:
        before_err = before_lat = None
    # Bump internal thresholds by 1.2x
    try:
        from alert_manager import bump_threshold  # type: ignore
        bump_threshold(kind=str(metric), factor=1.2)
    except Exception:
        pass
    # Export gauges that reflect exactly one bump (no double multiplication)
    try:
        from metrics import set_adaptive_observability_gauges  # type: ignore
        if str(metric) == "error_rate_percent":
            base = before_err if (before_err and before_err > 0.0) else float(threshold)
            new_val = (base * 1.2) if base and base > 0.0 else None
            set_adaptive_observability_gauges(error_rate_threshold_percent=new_val)
        elif str(metric) == "latency_seconds":
            base = before_lat if (before_lat and before_lat > 0.0) else float(threshold)
            new_val = (base * 1.2) if base and base > 0.0 else None
            set_adaptive_observability_gauges(latency_threshold_seconds=new_val)
    except Exception:
        pass
def handle_critical_incident(name: str, metric: str, value: float, threshold: float, details: Optional[Dict[str, Any]] = None) -> str:
    """Main entrypoint: log incident, attempt remediation, annotate Grafana, and return incident_id.

    name examples: "High Error Rate", "High Latency", "DB Connection Errors"
    metric examples: "error_rate_percent", "latency_seconds", "db_connection_errors"

    Returns the incident UUID on success, or "" if the handler itself failed
    (this function never raises).
    """
    try:
        now = datetime.now(timezone.utc)
        incident_id = str(uuid.uuid4())
        kind_key = _hash_incident_key(str(name), str(metric), details or {})
        recurring = _detect_recurring(kind_key, now.timestamp())
        # Incidents attributed to an external source are logged but never remediated.
        try:
            source_label = str((details or {}).get("source") or "").strip().lower()
        except Exception:
            source_label = ""
        skip_remediation = source_label == "external"
        action = _select_remediation_action((name or "").lower(), (metric or "").lower(), skip_remediation)
        _write_incident({
            "incident_id": incident_id,
            "ts": now.isoformat(),
            "severity": "critical",
            "name": str(name),
            "metric": str(metric),
            "value": float(value),
            "threshold": float(threshold),
            "response_action": action,
            "recurring_issue": bool(recurring),
            "kind_key": kind_key,
            "source": source_label or None,
        })
        # Increment actual incidents counter for Grafana 'Predicted vs Actual'
        try:
            from metrics import actual_incidents_total  # type: ignore
            if actual_incidents_total is not None:
                actual_incidents_total.labels(metric=str(metric)).inc()
        except Exception:
            pass
        event_name = "AUTO_REMEDIATION_SKIPPED" if skip_remediation else "AUTO_REMEDIATION_EXECUTED"
        event_fields: Dict[str, Any] = {
            "incident_id": incident_id,
            "name": str(name),
            "handled": (not skip_remediation),
        }
        if skip_remediation:
            event_fields["reason"] = "external_source"
        _emit_event(event_name, severity="anomaly", **event_fields)
        try:
            _grafana_annotate(f"{name} — action={action} recurring={recurring}")
        except Exception:
            pass
        if recurring:
            _apply_recurring_bump(str(metric), float(threshold))
        return incident_id
    except Exception:
        return ""
# NOTE: replaced stray "[תיעוד]" placeholder — as a bare expression it raised NameError at import
def get_incidents(limit: int = 50) -> List[Dict[str, Any]]:
    """Return up to ``limit`` most recent incidents (limit clamped to 1..500)."""
    clamped = min(500, max(1, int(limit)))
    return _read_incidents(limit=clamped)