# Source: alert_manager

"""
Adaptive alert manager for Smart Observability v4.

- Maintains rolling 3h window of request samples (status, latency)
- Recomputes adaptive thresholds every 5 minutes based on mean + 3*sigma
- Emits critical alerts when current 5m stats breach adaptive thresholds
- Sends critical alerts to Telegram and Grafana annotations and logs dispatches

Environment variables used (optional; fail-open if missing):

- ALERT_TELEGRAM_BOT_TOKEN, ALERT_TELEGRAM_CHAT_ID
- GRAFANA_URL (e.g. https://grafana.example.com)
- GRAFANA_API_TOKEN (Bearer token)
- ALERT_COOLDOWN_SEC (override global cooldown between alerts)
- ALERT_THRESHOLD_SCALE / ALERT_ERROR_THRESHOLD_SCALE / ALERT_LATENCY_THRESHOLD_SCALE
  (multiply adaptive thresholds to widen the "normal" band)
- ALERT_MIN_SAMPLE_COUNT / ALERT_ERROR_MIN_SAMPLE_COUNT / ALERT_LATENCY_MIN_SAMPLE_COUNT
  (require a minimum number of samples in the evaluation window before alerting)
"""
from __future__ import annotations

from collections import Counter, defaultdict, deque
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Deque, Dict, List, Tuple, Optional
import asyncio
import math
import os
import threading
import time
import uuid

try:
    from http_sync import request  # type: ignore
except Exception:  # pragma: no cover
    request = None  # type: ignore

try:  # structured events (optional)
    from observability import emit_event  # type: ignore
except Exception:  # pragma: no cover
    def emit_event(event: str, severity: str = "info", **fields):  # type: ignore
        """No-op fallback when the observability module is unavailable."""
        return None
# --- Configuration helpers --- def _parse_float_env(name: str, default: float) -> float: try: raw = os.getenv(name) if raw is None or not str(raw).strip(): return float(default) return float(raw) except Exception: return float(default) def _parse_int_env(name: str, default: int) -> int: try: raw = os.getenv(name) if raw is None or not str(raw).strip(): return int(default) return int(float(raw)) except Exception: return int(default) def _get_positive_float(name: str, default: float) -> float: val = _parse_float_env(name, default) return val if val > 0 else float(default) def _get_non_negative_int(name: str, default: int) -> int: val = _parse_int_env(name, default) return val if val >= 0 else int(default) # --- Startup warmup gate (avoid noisy internal alerts during deploy/cold start) --- # בזמן warmup אנחנו חוסמים את ה-monitor הפנימי לחלוטין כדי למנוע רעשי דיפלוי. _DEFAULT_STARTUP_GRACE_PERIOD_SECONDS = 1200 # 20 minutes _STARTUP_GRACE_PERIOD_SECONDS = _get_non_negative_int( "ALERT_STARTUP_GRACE_PERIOD_SECONDS", _DEFAULT_STARTUP_GRACE_PERIOD_SECONDS, ) _START_TIME = time.time() # --- In-memory state --- _WINDOW_SEC = 3 * 60 * 60 # 3 hours _RECOMPUTE_EVERY_SEC = 5 * 60 # 5 minutes _COOLDOWN_SEC = 5 * 60 # minimum gap between alerts of same type _COOLDOWN_SEC = _get_non_negative_int("ALERT_COOLDOWN_SEC", _COOLDOWN_SEC) _DEFAULT_THRESHOLD_SCALE = _get_positive_float("ALERT_THRESHOLD_SCALE", 1.0) _THRESHOLD_SCALES = { "error_rate_percent": _get_positive_float("ALERT_ERROR_THRESHOLD_SCALE", _DEFAULT_THRESHOLD_SCALE), "latency_seconds": _get_positive_float("ALERT_LATENCY_THRESHOLD_SCALE", _DEFAULT_THRESHOLD_SCALE), } # Optional minimum floors to prevent zero/near-zero thresholds causing noise bursts _MIN_ERR_RATE_FLOOR = max(0.0, _parse_float_env("ALERT_MIN_ERROR_RATE_PERCENT", 5.0)) _MIN_LATENCY_FLOOR = max(0.0, _parse_float_env("ALERT_MIN_LATENCY_SECONDS", 1.0)) _DEFAULT_MIN_SAMPLE_COUNT = _get_non_negative_int("ALERT_MIN_SAMPLE_COUNT", 15) _ERROR_MIN_SAMPLE_COUNT = 
_get_non_negative_int("ALERT_ERROR_MIN_SAMPLE_COUNT", _DEFAULT_MIN_SAMPLE_COUNT) _LATENCY_MIN_SAMPLE_COUNT = _get_non_negative_int("ALERT_LATENCY_MIN_SAMPLE_COUNT", _DEFAULT_MIN_SAMPLE_COUNT) _MIN_SAMPLE_REQUIREMENTS = { "error_rate_percent": _ERROR_MIN_SAMPLE_COUNT, "latency_seconds": _LATENCY_MIN_SAMPLE_COUNT, } _EXTERNAL_WARNING_THRESHOLD = max(0.0, _parse_float_env("ALERT_EXTERNAL_ERROR_RATE_THRESHOLD", 20.0)) _EXTERNAL_WARNING_MIN_SAMPLE_COUNT = max(1, _get_non_negative_int("ALERT_EXTERNAL_MIN_SAMPLE_COUNT", 5)) _Sample = Tuple[float, bool, float, str] _samples: Deque[_Sample] = deque(maxlen=200_000) # (ts, is_error, latency_s, source) _error_contexts: Deque[Tuple[float, Dict[str, Any]]] = deque(maxlen=5_000) _ERROR_CONTEXT_WINDOW_SEC = 5 * 60 _REQUEST_CONTEXTS: Deque[Tuple[float, Dict[str, Any]]] = deque(maxlen=20_000) _REQUEST_CONTEXT_WINDOW_SEC = max(60, int(os.getenv("REQUEST_CONTEXT_WINDOW_SECONDS", "900") or 900)) _last_recompute_ts: float = 0.0 # Guard for lazy state initialization (avoid races under concurrent calls). _MONGO_SERVERSTATUS_STATE_LOCK = threading.Lock() @dataclass class _MetricThreshold: mean: float = 0.0 std: float = 0.0 threshold: float = 0.0 # mean + 3*sigma updated_at_ts: float = 0.0 _thresholds = { "error_rate_percent": _MetricThreshold(), "latency_seconds": _MetricThreshold(), } _last_alert_ts: Dict[str, float] = defaultdict(float) # Log of dispatches for audit/observability v4 _DISPATCH_LOG_MAX = int(os.getenv("ALERT_DISPATCH_LOG_MAX", "500") or 500) _dispatch_log: Deque[Dict[str, Any]] = deque(maxlen=max(50, _DISPATCH_LOG_MAX))
def reset_state_for_tests() -> None:
    """Clear in-memory buffers (for unit tests).

    Not intended for production use.
    """
    global _START_TIME, _last_recompute_ts
    _samples.clear()
    # Rewind the process start time so the startup warmup gate never blocks tests.
    try:
        _START_TIME = time.time() - float(_STARTUP_GRACE_PERIOD_SECONDS or 0) - 1.0
    except Exception:
        _START_TIME = time.time() - 3600.0
    _last_recompute_ts = 0.0
    for metric_name in list(_thresholds.keys()):
        _thresholds[metric_name] = _MetricThreshold()
    _dispatch_log.clear()
    _last_alert_ts.clear()
    _error_contexts.clear()
    _REQUEST_CONTEXTS.clear()
def _now() -> float: return time.time() def _normalize_sample_source(source: Optional[str]) -> str: try: text = str(source or "").strip().lower() except Exception: text = "" if text.startswith("external"): return "external" return "internal"
def note_request(
    status_code: int,
    duration_seconds: float,
    ts: Optional[float] = None,
    *,
    context: Optional[Dict[str, Any]] = None,
    source: Optional[str] = None,
) -> None:
    """Record a single request sample into the 3h rolling window.

    Args:
        status_code: status >= 500 is counted as an error.
        duration_seconds: request latency; clamped to >= 0.
        ts: optional epoch seconds (for tests). Defaults to current time.
        context: optional metadata kept (sanitized) for alert enrichment.
        source: routed through _normalize_sample_source ("external"/"internal").
    """
    try:
        sample_ts = float(ts) if ts is not None else float(_now())
        failed = int(status_code) >= 500
        origin = _normalize_sample_source(source)
        _samples.append((sample_ts, bool(failed), max(0.0, float(duration_seconds)), origin))
        if context:
            if failed:
                _record_error_context(sample_ts, context)
            _record_request_context(sample_ts, duration_seconds, origin, context)
        _evict_older_than(sample_ts - _WINDOW_SEC)
        _recompute_if_due(sample_ts)
    except Exception:
        return
def _record_request_context(
    ts: float,
    duration_seconds: float,
    source: str,
    context: Dict[str, Any],
) -> None:
    """Keep a small rolling buffer of request contexts for quick-fix enrichment.

    This is intentionally best-effort and keeps only a minimal subset of keys
    to avoid memory bloat and accidental PII retention.
    """
    try:
        # Only keep contexts that can help explain slowness/root cause.
        queue_delay_ms = None
        for key in ("queue_delay_ms", "queue_delay", "queue_time_ms", "queue_ms"):
            try:
                raw = context.get(key)
            except Exception:
                raw = None
            if raw in (None, ""):
                continue
            try:
                queue_delay_ms = int(float(raw))
            except Exception:
                queue_delay_ms = None
            if queue_delay_ms is not None:
                queue_delay_ms = max(0, queue_delay_ms)
                break
        duration_ms = int(max(0.0, float(duration_seconds)) * 1000.0)
        kept: Dict[str, Any] = {
            "source": str(source or ""),
            "duration_ms": duration_ms,
        }
        if queue_delay_ms is not None:
            kept["queue_delay_ms"] = int(queue_delay_ms)
        # Minimal routing context (no message text)
        for key in ("path", "endpoint", "route", "method", "handler", "command", "component", "request_id"):
            try:
                val = context.get(key)
            except Exception:
                val = None
            if val in (None, ""):
                continue
            try:
                kept[key] = str(val)[:256]
            except Exception:
                continue
        _REQUEST_CONTEXTS.append((float(ts), kept))
        cutoff = float(ts) - float(_REQUEST_CONTEXT_WINDOW_SEC)
        while _REQUEST_CONTEXTS and _REQUEST_CONTEXTS[0][0] < cutoff:
            _REQUEST_CONTEXTS.popleft()
    except Exception:
        return


def _evict_older_than(min_ts: float) -> None:
    """Drop samples older than ``min_ts`` from the rolling window (best-effort)."""
    try:
        while _samples and _samples[0][0] < min_ts:
            _samples.popleft()
    except Exception:
        return


def _recompute_if_due(now_ts: Optional[float] = None) -> None:
    """Recompute adaptive thresholds at most once per _RECOMPUTE_EVERY_SEC."""
    global _last_recompute_ts
    t = float(now_ts if now_ts is not None else _now())
    if (t - _last_recompute_ts) < _RECOMPUTE_EVERY_SEC:
        return
    _last_recompute_ts = t
    try:
        _recompute_thresholds(t)
    except Exception:
        return


def _recompute_thresholds(now_ts: float) -> None:
    """Rebuild mean+3*sigma thresholds from per-minute buckets of the last 3 hours."""
    start_ts = now_ts - _WINDOW_SEC
    per_minute: Dict[int, Dict[str, float]] = defaultdict(
        lambda: {"count": 0.0, "errors": 0.0, "lat_sum": 0.0}
    )
    # Only internal samples feed the adaptive thresholds.
    for ts, is_err, lat, sample_source in list(_samples):
        if ts < start_ts:
            continue
        if sample_source != "internal":
            continue
        minute = int(ts // 60)
        bucket = per_minute[minute]
        bucket["count"] += 1.0
        bucket["errors"] += 1.0 if is_err else 0.0
        bucket["lat_sum"] += float(lat)

    # If no data, keep thresholds at zero
    if not per_minute:
        for k in _thresholds:
            _thresholds[k] = _MetricThreshold(updated_at_ts=now_ts)
        _update_gauges(None, None, None, None)
        return

    # Compute per-minute error rate (%) and avg latency (s)
    err_rates: List[float] = []
    avg_lats: List[float] = []
    for _, bucket in sorted(per_minute.items()):
        c = bucket["count"]
        if c <= 0:
            continue
        err_rates.append((bucket["errors"] / c) * 100.0)
        avg_lats.append(bucket["lat_sum"] / c)

    def _mean_std(values: List[float]) -> Tuple[float, float]:
        # Population mean/std; a single bucket yields std == 0.
        if not values:
            return 0.0, 0.0
        mean = sum(values) / float(len(values))
        if len(values) == 1:
            return mean, 0.0
        var = sum((v - mean) ** 2 for v in values) / float(len(values))
        return mean, math.sqrt(var)

    err_mean, err_std = _mean_std(err_rates)
    lat_mean, lat_std = _mean_std(avg_lats)
    err_thr = max(0.0, err_mean + 3.0 * err_std) * _THRESHOLD_SCALES["error_rate_percent"]
    lat_thr = max(0.0, lat_mean + 3.0 * lat_std) * _THRESHOLD_SCALES["latency_seconds"]
    # Apply safety floors (configurable via env)
    err_thr = max(err_thr, _MIN_ERR_RATE_FLOOR)
    lat_thr = max(lat_thr, _MIN_LATENCY_FLOOR)
    _thresholds["error_rate_percent"] = _MetricThreshold(
        mean=err_mean, std=err_std, threshold=err_thr, updated_at_ts=now_ts
    )
    _thresholds["latency_seconds"] = _MetricThreshold(
        mean=lat_mean, std=lat_std, threshold=lat_thr, updated_at_ts=now_ts
    )
    # Update Prometheus gauges (best-effort, lazy import to avoid cycles)
    try:
        # Keep consistency: thresholds are computed from internal samples only,
        # so the "current" gauges must reflect the same source (and not include
        # external / circuit-breaker failures).
        cur_err = get_current_error_rate_percent(window_sec=5 * 60, source="internal")
        cur_lat = get_current_avg_latency_seconds(window_sec=5 * 60, source="internal")
        _update_gauges(err_thr, lat_thr, cur_err, cur_lat)
    except Exception:
        _update_gauges(err_thr, lat_thr, None, None)


def _update_gauges(
    err_thr_pct: Optional[float],
    lat_thr_sec: Optional[float],
    cur_err_pct: Optional[float],
    cur_lat_sec: Optional[float],
) -> None:
    """Push threshold/current values to Prometheus gauges (no-op if metrics missing)."""
    try:
        from metrics import set_adaptive_observability_gauges  # type: ignore

        set_adaptive_observability_gauges(
            error_rate_threshold_percent=err_thr_pct,
            latency_threshold_seconds=lat_thr_sec,
            current_error_rate_percent=cur_err_pct,
            current_latency_avg_seconds=cur_lat_sec,
        )
    except Exception:
        return


def _update_external_error_rate_gauge(value: Optional[float]) -> None:
    """Push the external error-rate gauge (no-op if metrics missing)."""
    try:
        from metrics import set_external_error_rate_percent  # type: ignore

        set_external_error_rate_percent(value)
    except Exception:
        return


def _collect_recent_samples(window_sec: int, source_filter: Optional[str] = None) -> Tuple[int, int, float]:
    """Return (total, errors, latency_sum) for the trailing window.

    Walks the deque from the newest end and stops at the first sample that is
    older than the window. Optional source_filter must be "internal"/"external".
    """
    try:
        now_ts = _now()
        start = now_ts - max(1, int(window_sec))
        total = 0
        errors = 0
        sum_lat = 0.0
        target_source = None
        if source_filter:
            lowered = str(source_filter).strip().lower()
            if lowered in {"internal", "external"}:
                target_source = lowered
        for ts, is_err, lat, sample_source in reversed(_samples):
            if ts < start:
                break
            if target_source is not None and sample_source != target_source:
                continue
            total += 1
            if is_err:
                errors += 1
            sum_lat += float(lat)
        return total, errors, sum_lat
    except Exception:
        return 0, 0, 0.0


def _sanitize_error_context(context: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Whitelist + truncate context fields; returns None when nothing survives."""
    if not isinstance(context, dict):
        return None
    allowed_keys = ("command", "handler", "path", "method", "request_id", "user_id", "source", "component")
    sanitized: Dict[str, str] = {}
    for key in allowed_keys:
        try:
            value = context.get(key)
        except Exception:
            continue
        if value in (None, ""):
            continue
        try:
            text = str(value).strip()
        except Exception:
            continue
        if not text:
            continue
        # IDs stay short; descriptive fields get a bit more room.
        limit = 64 if key in {"request_id", "user_id"} else 200
        sanitized[key] = text[:limit]
    return sanitized or None


def _record_error_context(ts: float, context: Optional[Dict[str, Any]]) -> None:
    """Append a sanitized error context to the rolling buffer (best-effort)."""
    sanitized = _sanitize_error_context(context)
    if not sanitized:
        return
    try:
        _error_contexts.append((ts, sanitized))
    except Exception:
        pass


def _recent_error_contexts(now_ts: float, window_sec: int = _ERROR_CONTEXT_WINDOW_SEC) -> List[Dict[str, Any]]:
    """Evict stale error contexts and return the remaining ones (oldest first)."""
    cutoff = now_ts - max(1, int(window_sec))
    while _error_contexts and _error_contexts[0][0] < cutoff:
        try:
            _error_contexts.popleft()
        except Exception:
            break
    return [ctx for _, ctx in list(_error_contexts)]


def _queue_delay_stats(now_ts: float, *, window_sec: int = 300, source: Optional[str] = None) -> Dict[str, Any]:
    """Compute best-effort queue-delay stats from recent request contexts."""
    try:
        window = max(1, int(window_sec))
    except Exception:
        window = 300
    cutoff = float(now_ts) - float(window)
    target_source = None
    if source:
        try:
            s = str(source).strip().lower()
        except Exception:
            s = ""
        if s in {"internal", "external"}:
            target_source = s
    # Evict old entries (best-effort)
    try:
        while _REQUEST_CONTEXTS and _REQUEST_CONTEXTS[0][0] < cutoff:
            _REQUEST_CONTEXTS.popleft()
    except Exception:
        pass
    values: List[int] = []
    try:
        for ts, ctx in list(_REQUEST_CONTEXTS):
            if ts < cutoff:
                continue
            if not isinstance(ctx, dict):
                continue
            if target_source is not None:
                try:
                    if str(ctx.get("source") or "").strip().lower() != target_source:
                        continue
                except Exception:
                    continue
            q = ctx.get("queue_delay_ms")
            if q in (None, ""):
                continue
            try:
                values.append(max(0, int(float(q))))
            except Exception:
                continue
    except Exception:
        return {}
    if not values:
        return {}
    values.sort()
    n = len(values)
    avg = int(round(sum(values) / float(n)))
    # P95 via "nearest-rank" (avoid underestimating for small samples).
    # Example: n=10 -> ceil(0.95*10)-1 = 9 (max element), not 8.
    try:
        p95_idx = int(math.ceil(0.95 * float(n))) - 1
    except Exception:
        p95_idx = n - 1
    p95_idx = max(0, min(n - 1, int(p95_idx)))
    return {
        "queue_delay_ms_avg": avg,
        "queue_delay_ms_p95": int(values[p95_idx]),
        "queue_delay_ms_max": int(values[-1]),
        "queue_delay_samples": int(n),
    }


def _system_resource_snapshot() -> Dict[str, Any]:
    """Best-effort resource snapshot (CPU/RAM + in-flight requests)."""
    out: Dict[str, Any] = {}
    try:
        import psutil  # type: ignore

        try:
            out["cpu_percent"] = float(psutil.cpu_percent(interval=None))
        except Exception:
            pass
        try:
            vm = psutil.virtual_memory()
            out["memory_percent"] = float(getattr(vm, "percent", 0.0) or 0.0)
        except Exception:
            pass
    except Exception:
        pass
    # Active requests + RSS are tracked in metrics (best-effort, avoid hard dependency)
    try:
        from metrics import get_active_requests_count, get_current_memory_usage  # type: ignore

        try:
            out["active_requests"] = int(get_active_requests_count())
        except Exception:
            pass
        try:
            out["memory_mb"] = round(float(get_current_memory_usage()), 2)
        except Exception:
            pass
    except Exception:
        pass
    return {k: v for k, v in out.items() if v not in (None, "")}


def _mongo_connections_snapshot() -> Dict[str, Any]:
    """Best-effort MongoDB connections snapshot.

    NOTE: This is *not* PyMongo client-side pool utilization; it's server-reported
    connections.current/available which is still a useful pressure signal.
    """
    # Important: this function is called from request paths
    # (metrics.record_request_outcome -> check_and_emit_alerts), so it must not
    # perform blocking IO. We return a cache and refresh in the background
    # (best-effort).
    out: Dict[str, Any] = {}
    try:
        from database import db as db_manager  # type: ignore
    except Exception:
        return {}
    try:
        client = getattr(db_manager, "client", None)
    except Exception:
        client = None
    if client is None:
        return {}

    # --- cache + refresh scheduling ---
    # Module-level cache to avoid blocking the main thread / event loop.
    # We keep the state on the function object to minimize global clutter and
    # preserve backwards compatibility.
    state = getattr(_mongo_connections_snapshot, "_state", None)
    if state is None:
        # Avoid creating multiple independent state dicts under concurrency.
        with _MONGO_SERVERSTATUS_STATE_LOCK:
            state = getattr(_mongo_connections_snapshot, "_state", None)
            if state is None:
                state = {
                    "lock": threading.Lock(),
                    "ts": 0.0,
                    "data": {},
                    "inflight": False,
                }
                setattr(_mongo_connections_snapshot, "_state", state)

    def _refresh_interval_seconds() -> float:
        # Default 5s to match guides. Allow tuning via env.
        try:
            return max(1.0, float(os.getenv("MONGO_SERVERSTATUS_REFRESH_SEC", "5") or "5"))
        except Exception:
            return 5.0

    def _parse_status(status_obj: Any) -> Dict[str, Any]:
        # Extract connections.current/available from a serverStatus document.
        try:
            connections = status_obj.get("connections") if isinstance(status_obj, dict) else None
        except Exception:
            connections = None
        if not isinstance(connections, dict):
            return {}
        current = connections.get("current")
        available = connections.get("available")
        try:
            cur_i = int(current)
            avail_i = int(available)
        except Exception:
            return {}
        total = max(0, cur_i) + max(0, avail_i)
        util = (float(max(0, cur_i)) / float(total) * 100.0) if total > 0 else 0.0
        return {
            "db_connections_current": int(max(0, cur_i)),
            "db_connections_available": int(max(0, avail_i)),
            "db_pool_utilization_pct": round(util, 1),
        }

    def _set_cache(data: Dict[str, Any]) -> None:
        try:
            with state["lock"]:
                state["data"] = dict(data or {})
                state["ts"] = float(time.time())
        except Exception:
            return

    def _mark_inflight(val: bool) -> None:
        try:
            with state["lock"]:
                state["inflight"] = bool(val)
        except Exception:
            return

    def _should_refresh(now_ts: float) -> bool:
        try:
            with state["lock"]:
                last_ts = float(state.get("ts") or 0.0)
                inflight = bool(state.get("inflight"))
        except Exception:
            last_ts = 0.0
            inflight = False
        age = max(0.0, float(now_ts - last_ts))
        return (not inflight) and (age >= _refresh_interval_seconds())

    def _schedule_refresh() -> None:
        # Atomically check + mark inflight under the same lock to prevent TOCTOU
        # races that can trigger multiple concurrent refreshes.
        now_ts = time.time()
        try:
            with state["lock"]:
                last_ts = float(state.get("ts") or 0.0)
                inflight = bool(state.get("inflight"))
                age = max(0.0, float(now_ts - last_ts))
                if inflight or age < _refresh_interval_seconds():
                    return
                state["inflight"] = True
        except Exception:
            return
        # Try async Motor first (if admin.command is awaitable), otherwise
        # offload sync PyMongo to executor/thread.
        try:
            cmd = getattr(getattr(client, "admin", None), "command", None)
        except Exception:
            cmd = None

        def _finalize() -> None:
            _mark_inflight(False)

        # Async path (Motor-like)
        try:
            if callable(cmd):
                try:
                    loop = asyncio.get_running_loop()
                except RuntimeError:
                    loop = None
                if loop is not None:
                    try:
                        maybe_coro = cmd("serverStatus")  # type: ignore[misc]
                    except Exception:
                        maybe_coro = None
                    if maybe_coro is not None and asyncio.iscoroutine(maybe_coro):
                        try:
                            task = loop.create_task(maybe_coro)
                        except Exception:
                            # Prevent "coroutine was never awaited" warnings when
                            # task creation fails (e.g. loop is closing).
                            try:
                                maybe_coro.close()  # type: ignore[attr-defined]
                            except Exception:
                                pass
                            raise

                        def _done_cb(fut: "asyncio.Future[Any]") -> None:  # type: ignore[name-defined]
                            try:
                                res = fut.result()
                                _set_cache(_parse_status(res))
                            except Exception:
                                pass
                            finally:
                                _finalize()

                        task.add_done_callback(_done_cb)
                        return
        except Exception:
            # If anything fails here, fall back to sync executor below.
            pass

        # Sync path (PyMongo-like): run in executor if loop exists, otherwise
        # spawn a daemon thread.
        def _job_sync() -> None:
            try:
                status_obj = client.admin.command("serverStatus")  # type: ignore[attr-defined]
                _set_cache(_parse_status(status_obj))
            except Exception:
                pass
            finally:
                _finalize()

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None
        if loop is not None:
            try:
                loop.run_in_executor(None, _job_sync)
                return
            except Exception:
                pass
        try:
            threading.Thread(target=_job_sync, name="mongo-serverstatus-refresh", daemon=True).start()
        except Exception:
            # If we can't even spawn a thread, just mark done and move on.
            _finalize()

    # Kick a refresh if stale, but always return cached snapshot immediately.
    try:
        _schedule_refresh()
    except Exception:
        pass
    try:
        with state["lock"]:
            cached = dict(state.get("data") or {})
    except Exception:
        cached = {}
    return cached


def _derive_feature_from_command(command: Optional[str]) -> Optional[str]:
    """Derive a short feature label from a command string ("ns:feature" -> "feature")."""
    if not command:
        return None
    try:
        text = str(command).strip()
    except Exception:
        return None
    if not text:
        return None
    candidate = text.split(":")[-1] if ":" in text else text
    candidate = candidate.strip()
    return candidate or None


def _format_error_rate_meta_summary(
    label: Optional[str],
    current_pct: float,
    threshold_pct: float,
    request_id: Optional[str],
) -> str:
    """Build the short human-readable summary attached to high-error-rate alerts."""
    parts: List[str] = []
    if label:
        parts.append(label)
    parts.append(f"error_rate {current_pct:.2f}% > {max(threshold_pct, 0.0):.2f}%")
    if request_id:
        parts.append(f"request {request_id[:8]}")
    return " · ".join(parts)


def _build_high_error_rate_meta(
    current_pct: float,
    threshold_pct: float,
    sample_count: int,
    now_ts: float,
    window_sec: int,
    preferred_source: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble alert metadata for a high-error-rate breach from recent error contexts."""
    contexts = _recent_error_contexts(now_ts, window_sec)
    normalized_pref = (preferred_source or "").strip().lower() or None
    if normalized_pref:
        # Prefer contexts matching the alerting source, but only if any match.
        filtered = [ctx for ctx in contexts if (ctx.get("source") or "internal").lower() == normalized_pref]
        if filtered:
            contexts = filtered
    meta: Dict[str, Any] = {
        "alert_type": "high_error_rate",
        "metric": "error_rate_percent",
        "metric_name": "error_rate_percent",
        "graph_metric": "error_rate_percent",
        "error_rate_percent": round(current_pct, 4),
        "threshold_percent": round(threshold_pct, 4),
        "sample_count": int(sample_count),
        "window_seconds": int(window_sec),
        "recent_error_context_count": len(contexts),
    }
    dominant_label = None
    if contexts:
        commands = [ctx.get("command") for ctx in contexts if ctx.get("command")]
        dominant_command = None
        if commands:
            counts = Counter(commands)
            dominant_command = counts.most_common(1)[0][0]
        if dominant_command:
            meta["command"] = dominant_command
            feature = _derive_feature_from_command(dominant_command)
            if feature:
                meta["feature"] = feature
                dominant_label = feature
            else:
                dominant_label = dominant_command
        # Fill routing details from the most recent error context.
        latest = contexts[-1]
        endpoint = latest.get("path") or latest.get("handler")
        if endpoint:
            meta["endpoint"] = endpoint
            if dominant_label is None:
                dominant_label = endpoint
        method = latest.get("method")
        if method:
            meta["method"] = method
        source = latest.get("source")
        if source and "source" not in meta:
            meta["source"] = source
        component = latest.get("component")
        if component:
            meta["component"] = component
        request_id = latest.get("request_id")
        if request_id:
            meta["request_id"] = request_id
        user_id = latest.get("user_id")
        if user_id:
            meta["user_id"] = user_id
    else:
        request_id = None
    # The summary only references a request id that actually made it into meta.
    if "request_id" not in meta:
        request_id = None
    else:
        request_id = str(meta["request_id"])
    label = dominant_label or "high_error_rate"
    meta["meta_summary"] = _format_error_rate_meta_summary(label, current_pct, threshold_pct, request_id)
    return meta


def _enrich_alert_details(key: str, details: Dict[str, Any], now_ts: float) -> Dict[str, Any]:
    """Enrich alert payloads per metric kind (latency or error-rate); pass-through otherwise."""
    if str(key) == "latency_seconds":
        enriched = dict(details or {})
        # Normalize alert_type so dashboards/runbooks can match reliably.
        enriched.setdefault("alert_type", "slow_response")
        # Provide a consistent duration_ms signal for quick-fix decisioning.
        if "duration_ms" not in enriched:
            try:
                cur = float(enriched.get("current_seconds") or 0.0)
                if cur > 0:
                    enriched["duration_ms"] = int(round(cur * 1000.0))
            except Exception:
                pass
        # Best-effort queue-delay stats from recent request contexts.
        try:
            window_sec = int(enriched.get("window_seconds") or 300)
        except Exception:
            window_sec = 300
        try:
            enriched.update(_queue_delay_stats(now_ts, window_sec=window_sec, source=enriched.get("source")))
        except Exception:
            pass
        # Best-effort resource snapshot (cpu/mem + active_requests).
        try:
            enriched.update(_system_resource_snapshot())
        except Exception:
            pass
        # Best-effort DB "pool" pressure snapshot (connections utilization).
        try:
            enriched.update(_mongo_connections_snapshot())
        except Exception:
            pass
        return enriched
    if str(key) != "error_rate_percent":
        return details
    try:
        current_pct = float(details.get("current_percent") or details.get("error_rate_percent") or 0.0)
    except Exception:
        current_pct = 0.0
    try:
        threshold_pct = float(
            details.get("threshold_percent")
            or _thresholds.get("error_rate_percent", _MetricThreshold()).threshold
            or 0.0
        )
    except Exception:
        threshold_pct = 0.0
    try:
        sample_count = int(details.get("sample_count") or 0)
    except Exception:
        sample_count = 0
    try:
        window_sec = int(details.get("window_seconds") or _ERROR_CONTEXT_WINDOW_SEC)
    except Exception:
        window_sec = _ERROR_CONTEXT_WINDOW_SEC
    preferred_source = None
    try:
        src = details.get("source")
        if src:
            preferred_source = str(src).strip().lower() or None
    except Exception:
        preferred_source = None
    meta = _build_high_error_rate_meta(
        current_pct,
        threshold_pct,
        sample_count,
        now_ts,
        window_sec,
        preferred_source=preferred_source,
    )
    # Merge meta but never clobber an explicitly provided source.
    existing_source = details.get("source")
    details.update(meta)
    if existing_source is not None:
        details["source"] = existing_source
    return details
def get_current_error_rate_percent(window_sec: int = 300, *, source: Optional[str] = None) -> float:
    """Return the error-rate percentage for the last window (default 5 minutes)."""
    try:
        total, errors, _unused = _collect_recent_samples(window_sec, source_filter=source)
        return (100.0 * float(errors) / float(total)) if total > 0 else 0.0
    except Exception:
        return 0.0
def get_current_avg_latency_seconds(window_sec: int = 300, *, source: Optional[str] = None) -> float:
    """Return the average latency in seconds for the last window (default 5 minutes)."""
    try:
        total, _errs, latency_sum = _collect_recent_samples(window_sec, source_filter=source)
        return (float(latency_sum) / float(total)) if total > 0 else 0.0
    except Exception:
        return 0.0
def get_request_stats_between(
    start_dt: datetime,
    end_dt: datetime,
    *,
    source: Optional[str] = None,
    percentiles: Tuple[int, ...] = (50, 95, 99),
) -> Dict[str, float]:
    """Best-effort request stats for an arbitrary time window from the in-memory buffer.

    Returns:
        - total (int as float)
        - errors (int as float)
        - p50/p95/p99 (seconds) when latency samples exist

    Note: history is only available for up to ~3 hours (_WINDOW_SEC).
    """
    try:
        # Normalize inputs to UTC timestamps
        if start_dt.tzinfo is None:
            start_dt = start_dt.replace(tzinfo=timezone.utc)
        else:
            start_dt = start_dt.astimezone(timezone.utc)
        if end_dt.tzinfo is None:
            end_dt = end_dt.replace(tzinfo=timezone.utc)
        else:
            end_dt = end_dt.astimezone(timezone.utc)
    except Exception:
        start_dt = start_dt.replace(tzinfo=timezone.utc)
        end_dt = end_dt.replace(tzinfo=timezone.utc)
    start_ts = float(start_dt.timestamp())
    end_ts = float(end_dt.timestamp())
    if end_ts <= start_ts:
        return {"total": 0.0, "errors": 0.0}
    src_norm = _normalize_sample_source(source) if source else None
    total = 0
    errors = 0
    lats: List[float] = []
    try:
        for ts, is_err, lat, sample_source in list(_samples):
            if ts < start_ts or ts > end_ts:
                continue
            if src_norm and sample_source != src_norm:
                continue
            total += 1
            if is_err:
                errors += 1
            try:
                lats.append(float(lat))
            except Exception:
                continue
    except Exception:
        return {"total": float(total), "errors": float(errors)}
    out: Dict[str, float] = {"total": float(total), "errors": float(errors)}
    if not lats:
        return out
    lats.sort()

    def _nearest_rank(p: int) -> float:
        # Nearest-rank percentile over the sorted latency list.
        n = len(lats)
        if n <= 0:
            return 0.0
        try:
            k = int(math.ceil((float(p) / 100.0) * float(n))) - 1
        except Exception:
            k = 0
        k = max(0, min(n - 1, k))
        return float(lats[k])

    try:
        pcts = tuple(int(p) for p in (percentiles or (50, 95, 99)))
    except Exception:
        pcts = (50, 95, 99)
    for p in pcts:
        if 0 < int(p) < 100:
            out[f"p{int(p)}"] = _nearest_rank(int(p))
    return out
def bump_threshold(kind: str, factor: float = 1.2) -> None:
    """Multiply the current adaptive threshold by a factor and refresh gauges (best-effort).

    No-op when the metric kind is unknown or its threshold is not yet positive.
    """
    try:
        thr = _thresholds.get(kind)
        if not thr:
            return
        current = float(thr.threshold or 0.0)
        if current <= 0.0:
            return
        thr.threshold = current * float(factor)
        thr.updated_at_ts = _now()
        try:
            cur_err = get_current_error_rate_percent(window_sec=5 * 60, source="internal")
            cur_lat = get_current_avg_latency_seconds(window_sec=5 * 60, source="internal")
        except Exception:
            cur_err = None
            cur_lat = None
        if kind == "error_rate_percent":
            _update_gauges(thr.threshold, None, cur_err, cur_lat)
        elif kind == "latency_seconds":
            _update_gauges(None, thr.threshold, cur_err, cur_lat)
    except Exception:
        return
def get_thresholds_snapshot() -> Dict[str, Dict[str, float]]:
    """Return the latest adaptive thresholds for external consumers/tests."""
    snapshot: Dict[str, Dict[str, float]] = {}
    try:
        for metric, thr in _thresholds.items():
            snapshot[metric] = {
                "mean": float(thr.mean or 0.0),
                "std": float(thr.std or 0.0),
                "threshold": float(thr.threshold or 0.0),
                "updated_at_ts": float(thr.updated_at_ts or 0.0),
            }
    except Exception:
        pass
    return snapshot
def _should_fire(kind: str, value: float, sample_count: Optional[int] = None) -> bool:
    """Decide whether ``value`` breaches the adaptive threshold for ``kind``.

    When ``sample_count`` is supplied, the per-metric minimum-sample
    requirement must also be satisfied before the alert may fire.
    """
    entry = _thresholds.get(kind)
    if not entry:
        return False
    if sample_count is not None:
        try:
            required = int(_MIN_SAMPLE_REQUIREMENTS.get(kind, 0))
        except Exception:
            required = 0
        if sample_count < max(0, required):
            return False
    limit = max(0.0, float(entry.threshold or 0.0))
    return value > limit


def _emit_warning_once(key: str, name: str, summary: str, details: Dict[str, Any], now_ts: float) -> None:
    """Emit a warning-severity alert at most once per cooldown window per key.

    Prefers the internal_alerts pipeline; falls back to a structured
    observability event. All failures are swallowed.
    """
    try:
        details = dict(details or {})
    except Exception:
        details = {}
    previous = _last_alert_ts.get(key, 0.0)
    if (now_ts - previous) < _COOLDOWN_SEC:
        return
    _last_alert_ts[key] = now_ts
    try:
        from internal_alerts import emit_internal_alert  # type: ignore

        emit_internal_alert(name=name, severity="warning", summary=summary, **details)
    except Exception:
        try:
            emit_event("external_warning", severity="warning", name=name, summary=summary, **details)
        except Exception:
            pass
def check_and_emit_alerts(now_ts: Optional[float] = None) -> None:
    """Evaluate current stats vs adaptive thresholds and emit critical alerts when breached.

    Cooldowns ensure we do not spam more than once per 5 minutes per alert type.
    """
    # Warmup gate: suppress all internal alerting during the startup grace period
    # to avoid deploy/cold-start noise.
    try:
        uptime = time.time() - float(_START_TIME or 0.0)
        if float(_STARTUP_GRACE_PERIOD_SECONDS or 0) > 0 and uptime < float(_STARTUP_GRACE_PERIOD_SECONDS):
            return
    except Exception:
        # Fail-open: if anything goes wrong, do not block alerting.
        pass
    t = float(now_ts if now_ts is not None else _now())
    try:
        _recompute_if_due(t)
        window_sec = 5 * 60
        # Separate internal (our own handlers) from external (downstream services) samples.
        internal_total, internal_errors, internal_latency_sum = _collect_recent_samples(window_sec, source_filter="internal")
        external_total, external_errors, _ = _collect_recent_samples(window_sec, source_filter="external")
        # Error rate (internal only)
        cur_err_pct = (float(internal_errors) / float(internal_total) * 100.0) if internal_total > 0 else 0.0
        try:
            err_threshold = float(_thresholds.get("error_rate_percent", _MetricThreshold()).threshold or 0.0)
        except Exception:
            err_threshold = 0.0
        if internal_total > 0 and _should_fire("error_rate_percent", cur_err_pct, internal_total):
            details = {
                "current_percent": round(cur_err_pct, 4),
                "sample_count": int(internal_total),
                "error_count": int(internal_errors),
                "threshold_percent": round(err_threshold, 4),
                "window_seconds": int(window_sec),
                "metric": "error_rate_percent",
                "graph_metric": "error_rate_percent",
                "source": "internal",
            }
            # Attach external counts for context when available.
            if external_total:
                details["external_sample_count"] = int(external_total)
                details["external_error_count"] = int(external_errors)
            _emit_critical_once(
                key="error_rate_percent",
                name="High Error Rate",
                summary=f"error_rate={cur_err_pct:.2f}% > threshold={_thresholds['error_rate_percent'].threshold:.2f}%",
                details=details,
                now_ts=t,
            )
        # Latency (average over the window, internal only)
        cur_lat = (float(internal_latency_sum) / float(internal_total)) if internal_total > 0 else 0.0
        if internal_total > 0 and _should_fire("latency_seconds", cur_lat, internal_total):
            _emit_critical_once(
                key="latency_seconds",
                name="High Latency",
                summary=f"avg_latency={cur_lat:.3f}s > threshold={_thresholds['latency_seconds'].threshold:.3f}s",
                details={
                    "current_seconds": round(cur_lat, 4),
                    "sample_count": int(internal_total),
                    "window_seconds": int(window_sec),
                    "source": "internal",
                },
                now_ts=t,
            )
        # External tracking (warning only) — external failures never page as critical.
        external_err_pct = (float(external_errors) / float(external_total) * 100.0) if external_total > 0 else 0.0
        _update_external_error_rate_gauge(external_err_pct if external_total > 0 else 0.0)
        if (
            external_total >= _EXTERNAL_WARNING_MIN_SAMPLE_COUNT
            and external_err_pct >= _EXTERNAL_WARNING_THRESHOLD
        ):
            warning_details = {
                "current_percent": round(external_err_pct, 4),
                "sample_count": int(external_total),
                "error_count": int(external_errors),
                "window_seconds": int(window_sec),
                "threshold_percent": round(_EXTERNAL_WARNING_THRESHOLD, 4),
                "source": "external",
            }
            # Best-effort: derive "which service" from recent external error contexts.
            # This relies on metrics.record_request_outcome(...) feeding alert_manager.note_request(...)
            # with a context that includes component/command/path. If unavailable, we keep it empty.
            try:
                contexts = _recent_error_contexts(t, window_sec)
                ext = [c for c in (contexts or []) if str(c.get("source") or "").lower().startswith("external")]
                if ext:
                    def _most_common(key: str) -> Optional[str]:
                        # Most frequent non-empty value for `key` among external error contexts.
                        vals = [str(c.get(key)).strip() for c in ext if c.get(key)]
                        if not vals:
                            return None
                        return Counter(vals).most_common(1)[0][0]
                    # Prefer component → command → path/handler
                    service_label = (
                        _most_common("component")
                        or _most_common("command")
                        or _most_common("path")
                        or _most_common("handler")
                    )
                    if service_label:
                        warning_details["service"] = service_label
            except Exception:
                pass
            _emit_warning_once(
                key="external_error_rate_percent",
                name="External Service Degraded",
                summary=f"external_error_rate={external_err_pct:.2f}% > threshold={_EXTERNAL_WARNING_THRESHOLD:.2f}%",
                details=warning_details,
                now_ts=t,
            )
    except Exception:
        return
def _emit_critical_once(key: str, name: str, summary: str, details: Dict[str, Any], now_ts: float) -> None:
    # Emit a critical alert for `key` at most once per cooldown window,
    # triggering best-effort auto-remediation and external delivery.
    # Defensive copy so enrichment never mutates the caller's dict.
    try:
        details = dict(details or {})
    except Exception:
        details = {}
    try:
        details = _enrich_alert_details(key, details, now_ts)
    except Exception:
        pass
    # Per-key cooldown gate.
    last = _last_alert_ts.get(key, 0.0)
    if (now_ts - last) < _COOLDOWN_SEC:
        return
    _last_alert_ts[key] = now_ts
    # Auto-remediation & incident logging (best-effort)
    # Safety switch: never trigger remediation for a drill.
    is_drill = False
    try:
        is_drill = bool(details.get("is_drill")) or bool((details.get("metadata") or {}).get("is_drill"))
    except Exception:
        is_drill = False
    if is_drill:
        try:
            emit_event(
                "drill_remediation_skipped",
                severity="anomaly",
                handled=True,
                name=str(name),
                metric=str(key),
            )
        except Exception:
            pass
    else:
        try:
            from remediation_manager import handle_critical_incident  # type: ignore
            # Pick the current metric value matching the alert key.
            try:
                if key == "error_rate_percent":
                    current_val = float(details.get("current_percent", 0.0) or 0.0)
                elif key == "latency_seconds":
                    current_val = float(details.get("current_seconds", 0.0) or 0.0)
                else:
                    current_val = 0.0
            except Exception:
                current_val = 0.0
            try:
                thr_val = float(_thresholds.get(key, _MetricThreshold()).threshold or 0.0)
            except Exception:
                thr_val = 0.0
            handle_critical_incident(name=name, metric=key, value=current_val, threshold=thr_val, details=details)
        except Exception:
            pass
    try:
        from internal_alerts import emit_internal_alert  # type: ignore
    except Exception:
        emit_internal_alert = None  # type: ignore
    try:
        if emit_internal_alert is not None:
            # internal_alerts will forward critical alerts via alert_manager.forward_critical_alert
            # to avoid duplicate dispatches, do not call _notify_critical_external when this path is taken
            emit_internal_alert(name=name, severity="critical", summary=summary, **details)
        else:
            # Fallback: ensure external delivery directly
            _notify_critical_external(name=name, summary=summary, details=details)
    except Exception:
        # Still attempt external sinks for visibility
        try:
            _notify_critical_external(name=name, summary=summary, details=details)
        except Exception:
            pass
def forward_critical_alert(name: str, summary: str, **details: Any) -> None:
    """Public API: forward a critical alert to sinks and log dispatches.

    Thin wrapper around the module-internal dispatcher so callers
    (e.g. internal_alerts) never have to touch private helpers directly.
    """
    _notify_critical_external(name, summary, details)
def _notify_critical_external(name: str, summary: str, details: Dict[str, Any]) -> None:
    """Send critical alert to sinks with silence enforcement and log dispatches.

    Enforcement:
    - If a matching active silence exists (pattern on name, optional severity),
      do not send to sinks but still record the alert in DB with silenced=true
      for transparency.
    """
    # Safety switch: a drill is never sent to the sinks, but it IS persisted to
    # the DB so it remains visible in Observability.
    try:
        is_drill = bool(details.get("is_drill")) or bool((details.get("metadata") or {}).get("is_drill"))
    except Exception:
        is_drill = False
    if is_drill:
        try:
            from monitoring.alerts_storage import record_alert  # type: ignore
            record_alert(
                alert_id=str(uuid.uuid4()),
                name=str(name),
                severity="critical",
                summary=str(summary),
                source="drill",
                silenced=True,
                details=details if isinstance(details, dict) else None,
            )
        except Exception:
            pass
        try:
            emit_event(
                "drill_critical_blocked",
                severity="anomaly",
                handled=True,
                name=str(name),
            )
        except Exception:
            pass
        return
    alert_id = str(uuid.uuid4())
    text = _format_text(name=name, severity="CRITICAL", summary=summary, details=details)
    # Silences check (best-effort)
    silenced = False
    silence_info: Optional[Dict[str, Any]] = None
    try:
        from monitoring.silences import is_silenced  # type: ignore
        silenced, silence_info = is_silenced(name=name, severity="critical")
    except Exception:
        silenced, silence_info = False, None
    # Persist to MongoDB (best-effort) for unified counts across services; include silenced flag
    try:
        from monitoring.alerts_storage import record_alert  # type: ignore
        record_alert(
            alert_id=alert_id,
            name=str(name),
            severity="critical",
            summary=str(summary),
            source="alert_manager",
            silenced=bool(silenced),
            details=details if isinstance(details, dict) else None,
        )
    except Exception:
        pass
    if silenced:
        # Emit observability event that alert was silenced (best-effort)
        try:
            emit_event(
                "alert_silenced",
                severity="info",
                alert_id=alert_id,
                name=name,
                silence_id=str((silence_info or {}).get("_id") or ""),
                until=str((silence_info or {}).get("until_ts") or ""),
            )
        except Exception:
            pass
    else:
        # Telegram
        _dispatch("telegram", alert_id, _send_telegram, text)
        # Grafana annotation
        _dispatch("grafana", alert_id, _send_grafana_annotation, name, summary)
    try:
        emit_event(
            "critical_alert_dispatched",
            severity="anomaly",
            alert_id=alert_id,
            name=name,
            handled=(not silenced),
        )
    except Exception:
        pass


def _dispatch(sink: str, alert_id: str, fn, *args) -> None:  # type: ignore[no-untyped-def]
    # Run a single sink delivery and append the outcome to the dispatch log,
    # recording success/failure without ever raising.
    ts = datetime.now(timezone.utc).isoformat()
    ok = False
    try:
        fn(*args)
        ok = True
    except Exception:
        ok = False
    finally:
        _dispatch_log.append({"ts": ts, "alert_id": alert_id, "sink": sink, "ok": bool(ok)})


def _format_text(name: str, severity: str, summary: str, details: Dict[str, Any]) -> str:
    # Build the human-readable alert message sent to sinks (Telegram text body).
    parts = [f"[{severity}] {name}"]
    if summary:
        parts.append(str(summary).strip())
    # Short allowlist of useful context
    def _get(d: Dict[str, Any], key: str) -> Optional[str]:
        # Non-raising lookup; treats None/empty-string values as missing.
        try:
            v = d.get(key)
            return str(v) if v is not None and v != "" else None
        except Exception:
            return None
    service = _get(details, "service") or _get(details, "component")
    environment = _get(details, "env") or _get(details, "environment")
    request_id = _get(details, "request_id") or _get(details, "request-id") or _get(details, "x-request-id")
    if service:
        parts.append(f"service: {service}")
    if environment:
        parts.append(f"env: {environment}")
    if request_id:
        parts.append(f"request_id: {request_id}")
    # Append short sanitized details preview (without fields we already promoted)
    if details:
        sensitive = {"token", "password", "secret", "authorization"}
        promoted = {"service", "component", "env", "environment", "request_id", "request-id", "x-request-id"}
        sentry_meta = {"sentry", "sentry_url", "sentry-permalink", "sentry_permalink"}
        def _is_allowed(k: str) -> bool:
            lk = k.lower()
            return lk not in sensitive and lk not in promoted and lk not in sentry_meta
        safe_items = [(k, v) for k, v in details.items() if _is_allowed(str(k))]
        if safe_items:
            # Cap the preview at 6 key=value pairs to keep the message short.
            parts.append(
                ", ".join(f"{k}={v}" for k, v in safe_items[:6])
            )
    # Best-effort Sentry link (permalink or derived from request_id)
    sentry_direct = _get(details, "sentry_permalink") or _get(details, "sentry_url") or _get(details, "sentry")
    link = _build_sentry_link(direct_url=sentry_direct, request_id=request_id, error_signature=_get(details, "error_signature"))
    if link:
        parts.append(f"Sentry: {link}")
    # Best-effort: include Quick Fix suggestion (derived from runbooks + dynamic rules).
    try:
        from services.observability_dashboard import get_quick_fix_actions  # type: ignore
        snapshot = {
            "name": str(name or ""),
            "severity": str(severity or "").lower(),
            "summary": str(summary or ""),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "alert_type": str(details.get("alert_type") or ""),
            "metadata": details if isinstance(details, dict) else {},
        }
        actions = get_quick_fix_actions(snapshot, ui_context="bot_alert")
        if actions:
            top = actions[0] if isinstance(actions[0], dict) else None
            if top:
                label = str(top.get("label") or "").strip()
                hint = ""
                if str(top.get("type") or "").lower() == "copy" and top.get("payload"):
                    hint = f" → {top.get('payload')}"
                elif top.get("href"):
                    hint = f" → {top.get('href')}"
                if label:
                    parts.append(f"Quick Fix: {label}{hint}")
    except Exception:
        pass
    return "\n".join([p for p in parts if p])


def _build_sentry_link(
    direct_url: Optional[str] = None,
    request_id: Optional[str] = None,
    error_signature: Optional[str] = None,
) -> Optional[str]:
    # Best-effort Sentry deep-link: prefer a direct permalink, otherwise derive
    # an org issues-search URL from env (dashboard URL or DSN host + org slug).
    try:
        if direct_url:
            return str(direct_url)
        dashboard = os.getenv("SENTRY_DASHBOARD_URL") or os.getenv("SENTRY_PROJECT_URL")
        if dashboard:
            base_url = dashboard.rstrip("/")
        else:
            dsn = os.getenv("SENTRY_DSN") or ""
            host = None
            if dsn:
                try:
                    from urllib.parse import urlparse
                    parsed = urlparse(dsn)
                    raw_host = parsed.hostname or ""
                except Exception:
                    raw_host = ""
                # Preserve region when present: o123.ingest.eu.sentry.io -> eu.sentry.io
                if ".ingest." in raw_host:
                    try:
                        host = raw_host.split(".ingest.", 1)[1]
                    except Exception:
                        host = None
                elif raw_host.startswith("ingest."):
                    host = raw_host[len("ingest."):]
                elif raw_host == "sentry.io" or raw_host.endswith(".sentry.io"):
                    host = "sentry.io"
                else:
                    host = raw_host or None
            host = host or "sentry.io"
            org = os.getenv("SENTRY_ORG") or os.getenv("SENTRY_ORG_SLUG")
            if not org:
                return None
            base_url = f"https://{host}/organizations/{org}/issues"
        from urllib.parse import quote_plus
        if request_id:
            q = quote_plus(f'request_id:"{request_id}"')
            return f"{base_url}/?query={q}&statsPeriod=24h"
        if error_signature:
            q = quote_plus(f'error_signature:"{error_signature}"')
            return f"{base_url}/?query={q}&statsPeriod=24h"
        return None
    except Exception:
        return None


def _send_telegram(text: str) -> None:
    # Deliver the alert text via the Telegram Bot API; no-op when the bot
    # token/chat id are unset or the HTTP client is unavailable.
    token = os.getenv("ALERT_TELEGRAM_BOT_TOKEN")
    chat_id = os.getenv("ALERT_TELEGRAM_CHAT_ID")
    if not token or not chat_id or request is None:
        return
    api = f"https://api.telegram.org/bot{token}/sendMessage"
    try:
        from telegram_api import parse_telegram_json_from_response, require_telegram_ok
        resp = request('POST', api, json={"chat_id": chat_id, "text": text}, timeout=5)
        body = parse_telegram_json_from_response(resp, url=api)
        require_telegram_ok(body, url=api)
    except Exception as e:
        # do not raise
        try:
            from telegram_api import TelegramAPIError
            if isinstance(e, TelegramAPIError):
                emit_event(
                    "telegram_api_error",
                    severity="warn",
                    handled=True,
                    context="alert_manager.sendMessage",
                    error_code=getattr(e, "error_code", None),
                    description=getattr(e, "description", None),
                )
        except Exception:
            pass
        pass


def _send_grafana_annotation(name: str, summary: str) -> None:
    # Create a Grafana annotation for the alert; no-op when GRAFANA_URL /
    # GRAFANA_API_TOKEN are unset or the HTTP client is unavailable.
    base = os.getenv("GRAFANA_URL")
    token = os.getenv("GRAFANA_API_TOKEN")
    if not base or not token or request is None:
        return
    url = base.rstrip("/") + "/api/annotations"
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    payload = {
        "time": int(_now() * 1000),
        "tags": ["codebot", "alert", "critical"],
        "text": f"{name}: {summary}",
    }
    try:
        request('POST', url, json=payload, headers=headers, timeout=5)
    except Exception:
        # swallow errors
        pass
def get_dispatch_log(limit: int = 50) -> List[Dict[str, Any]]:
    """Return the most recent sink-dispatch records, oldest first.

    Args:
        limit: maximum number of entries; non-positive yields an empty list.

    Never raises — any internal failure also yields an empty list.
    """
    try:
        if limit <= 0:
            return []
        entries = list(_dispatch_log)
        return entries[-limit:]
    except Exception:
        return []