הראה קוד מקור ל internal_alerts

"""
Internal alerts system for CodeBot (post-MVP).

- Stores recent alerts in-memory for ChatOps consumption
- Emits structured events via observability.emit_event
- Optionally forwards alerts to sinks via alert_forwarder (Slack/Telegram)
  or directly to Telegram if alert_forwarder is unavailable.

Environment variables used by the Telegram fallback:
- ALERT_TELEGRAM_BOT_TOKEN
- ALERT_TELEGRAM_CHAT_ID

This module is intentionally lightweight and fail-open. It should never raise.
"""
from __future__ import annotations

from collections import deque
from datetime import datetime, timezone
from typing import Any, Dict, List
import os
import logging

try:
    from prometheus_client import Counter, REGISTRY
except Exception:  # pragma: no cover
    Counter = REGISTRY = None  # type: ignore

logger = logging.getLogger(__name__)

try:  # runtime optional
    from http_sync import request  # type: ignore
except Exception:  # pragma: no cover
    request = None  # type: ignore

try:  # observability event emission (optional)
    from observability import emit_event  # type: ignore
except Exception:  # pragma: no cover
[תיעוד] def emit_event(event: str, severity: str = "info", **fields): # type: ignore return None
# Optional sink forwarder (Slack/Telegram) – used when available try: from alert_forwarder import forward_alerts # type: ignore except Exception: # pragma: no cover forward_alerts = None # type: ignore _MAX = int(os.getenv("INTERNAL_ALERTS_BUFFER", "200") or 200) _ALERTS: "deque[Dict[str, Any]]" = deque(maxlen=max(10, _MAX)) def _get_prom_counter(name: str, documentation: str, labelnames: List[str]): if Counter is None: return None if REGISTRY is not None: try: existing = getattr(REGISTRY, "_names_to_collectors", {}).get(name) except Exception: existing = None if existing is not None: return existing try: return Counter(name, documentation, labelnames) except ValueError: if REGISTRY is not None: try: return getattr(REGISTRY, "_names_to_collectors", {}).get(name) except Exception: return None return None internal_alerts_total = _get_prom_counter( "internal_alerts_total", "Total internal alerts emitted", ["name", "severity"], ) _SENSITIVE_DETAIL_KEYS = {"token", "password", "secret", "authorization", "auth"} _PROMOTED_DETAIL_KEYS = { "service", "component", "app", "application", "env", "environment", "namespace", "cluster", "request_id", "request-id", "x-request-id", "x_request_id", "instance", "pod", "hostname", "host", } _SENTRY_META_KEYS = {"sentry", "sentry_url", "sentry-permalink", "sentry_permalink"} def _is_empty_promoted_value(value: Any) -> bool: """Return True when a detail value is "empty" and should NOT be promoted. Observability dashboard consumers treat top-level alert fields as "high-signal". We intentionally avoid promoting empty meta fields (None/""/[]/{}/False) since they create noisy rows like error_signature/is_new_error that hide real content. """ if value is None: return True # Important: do not promote False (especially is_new_error=False) if value is False: return True if isinstance(value, str) and not value.strip(): return True if isinstance(value, dict) and not value: return True if isinstance(value, list) and not value: return True return False def _coerce_str(value: Any) -> str: try: return str(value) except Exception: return "" def _first_detail(details: Dict[str, Any], keys) -> Any: for key in keys: try: if key in details: val = details.get(key) if val not in (None, ""): return _coerce_str(val) except Exception: continue return None def _details_preview(details: Dict[str, Any]) -> str | None: if not isinstance(details, dict): return None safe_items: List[str] = [] def _short(val: Any, *, limit: int = 180) -> str: try: text = _coerce_str(val) except Exception: text = "" if len(text) > limit: return text[: max(0, limit - 1)] + "…" return text def _compact_labels(val: Any) -> str | None: if not isinstance(val, dict): return None # Keep only high-signal fields used by anomaly alerts keys = ("top_slow_endpoint", "active_requests", "recent_errors_5m", "avg_memory_usage_mb") parts: List[str] = [] for k in keys: try: v = val.get(k) except Exception: v = None if v in (None, ""): continue parts.append(f"{k}={_short(v, limit=80)}") if parts: return ", ".join(parts) return None def _compact_slow_endpoints(val: Any) -> str | None: if not isinstance(val, list) or not val: return None # Prefer max_duration and sort slowest-first for stable messaging items: List[Dict[str, Any]] = [x for x in val if isinstance(x, dict)] try: items.sort(key=lambda x: float(x.get("max_duration") or x.get("avg_duration") or 0.0), reverse=True) except Exception: pass parts: List[str] = [] for item in items[:4]: if not isinstance(item, dict): continue try: method = str(item.get("method") or "GET").upper() endpoint = str(item.get("endpoint") or "unknown") count = int(item.get("count") or 0) mx = float(item.get("max_duration") or item.get("avg_duration") or 0.0) parts.append(f"{method} {endpoint}: {mx:.2f}s (n={max(1, count)})") except Exception: continue if parts: return "; ".join(parts) return None for key, value in details.items(): try: lk = str(key).lower() except Exception: continue if lk in _SENSITIVE_DETAIL_KEYS or lk in _PROMOTED_DETAIL_KEYS or lk in _SENTRY_META_KEYS: continue # Special compaction for noisy structured fields if lk == "labels": compact = _compact_labels(value) if compact: safe_items.append(compact) if len(safe_items) >= 6: break # Never fall back to raw labels dict in preview (too noisy) continue if lk == "slow_endpoints": compact = _compact_slow_endpoints(value) if compact: safe_items.append(f"slow_endpoints={compact}") if len(safe_items) >= 6: break # Never fall back to raw slow_endpoints list in preview (too noisy) continue safe_items.append(f"{key}={_coerce_str(value)}") if len(safe_items) >= 6: break if safe_items: return ", ".join(safe_items) return None def _build_forward_payload(name: str, severity: str, summary: str, details: Dict[str, Any]) -> Dict[str, Any]: alert = { "status": "firing", "labels": {"alertname": str(name or "InternalAlert"), "severity": str(severity or "warn")}, "annotations": {"summary": str(summary or "")}, } if not isinstance(details, dict) or not details: return alert service = _first_detail(details, ("service", "component", "app", "application")) environment = _first_detail(details, ("env", "environment", "namespace", "cluster")) instance = _first_detail(details, ("instance", "pod", "hostname", "host")) request_id = _first_detail(details, ("request_id", "request-id", "x-request-id", "x_request_id")) if service: alert["labels"]["service"] = service if environment: alert["labels"]["env"] = environment if instance: alert["labels"]["instance"] = instance if request_id: alert["labels"]["request_id"] = request_id generator_url = _first_detail(details, ("generator_url", "grafana_url", "dashboard_url")) if generator_url: alert["generatorURL"] = generator_url sentry_direct = _first_detail(details, ("sentry_permalink", "sentry-permalink", "sentry_url", "sentry")) if sentry_direct: alert["annotations"]["sentry_permalink"] = sentry_direct error_signature = _first_detail(details, ("error_signature", "signature")) if error_signature: alert["annotations"]["error_signature"] = error_signature # Promote a small allowlist of numeric/meta fields into annotations for better formatting downstream. # Keep values as strings to match Alertmanager annotations behavior. try: promote_keys = ( # Generic metric fields (alert_manager / internal) "current_percent", "threshold_percent", "sample_count", "error_count", "window_seconds", "current_seconds", "threshold_seconds", "source", "metric", "graph_metric", # Sentry meta (webhook/poll) "sentry_short_id", "sentry_issue_id", "sentry_last_seen", "sentry_first_seen", "project", "level", "action", ) for k in promote_keys: if k not in details: continue try: v = details.get(k) except Exception: v = None if v in (None, ""): continue alert["annotations"][k] = _coerce_str(v) except Exception: pass # Anomaly formatting helpers: flatten nested labels + compact slow endpoints list. try: if str(name or "").strip().lower() == "anomaly_detected": raw_labels = details.get("labels") if isinstance(details, dict) else None if isinstance(raw_labels, dict): for k in ("top_slow_endpoint", "active_requests", "recent_errors_5m", "avg_memory_usage_mb"): try: v = raw_labels.get(k) except Exception: v = None if v in (None, ""): continue alert["annotations"][k] = _coerce_str(v) raw_slow = details.get("slow_endpoints") if isinstance(details, dict) else None if isinstance(raw_slow, list) and raw_slow: items: List[Dict[str, Any]] = [x for x in raw_slow if isinstance(x, dict)] try: items.sort(key=lambda x: float(x.get("max_duration") or x.get("avg_duration") or 0.0), reverse=True) except Exception: pass compact_parts: List[str] = [] for item in items[:6]: if not isinstance(item, dict): continue try: method = str(item.get("method") or "GET").upper() endpoint = str(item.get("endpoint") or "unknown") count = int(item.get("count") or 0) mx = float(item.get("max_duration") or item.get("avg_duration") or 0.0) compact_parts.append(f"{method} {endpoint}: {mx:.2f}s (n={max(1, count)})") except Exception: continue if compact_parts: alert["annotations"]["slow_endpoints_compact"] = "; ".join(compact_parts) except Exception: pass preview = _details_preview(details) if preview: alert["annotations"]["details_preview"] = preview return alert def _severity_rank(value: str | None) -> int: try: v = (value or "").strip().lower() if v in {"critical", "fatal", "crit"}: return 4 if v in {"error", "err", "errors"}: return 3 if v in {"warning", "warn"}: return 2 if v in {"info", "notice", "anomaly"}: return 1 if v in {"debug", "trace"}: return 0 except Exception: return 1 return 1 def _min_direct_telegram_rank() -> int: raw = os.getenv("ALERT_TELEGRAM_MIN_SEVERITY", "info") return _severity_rank(raw) def _should_send_direct_telegram(severity: str) -> bool: try: return _severity_rank(severity) >= _min_direct_telegram_rank() except Exception: return True def _format_text(name: str, severity: str, summary: str, details: Dict[str, Any]) -> str: parts = [f"[{severity.upper()}] {name}"] if summary: parts.append(str(summary)) if details: # Keep it short, avoid leaking sensitive data safe = {k: v for k, v in details.items() if k.lower() not in {"token", "password", "secret", "authorization"}} if safe: parts.append("details=" + ", ".join(f"{k}={v}" for k, v in list(safe.items())[:6])) # Add Sentry Link if configured sentry_url = os.getenv("SENTRY_DASHBOARD_URL") sentry_org = os.getenv("SENTRY_ORG") if sentry_url: parts.append(f"\n[פתח ב-Sentry]({sentry_url})") elif sentry_org and os.getenv("SENTRY_DSN"): # Construct search link import urllib.parse query = urllib.parse.quote(f"is:unresolved {name}") link = f"https://{sentry_org}.sentry.io/issues/?query={query}" parts.append(f"\n[פתח ב-Sentry]({link})") return "\n".join(parts) def _send_telegram(text: str, severity: str = "info") -> None: if not _should_send_direct_telegram(severity): return token = os.getenv("ALERT_TELEGRAM_BOT_TOKEN") chat_id = os.getenv("ALERT_TELEGRAM_CHAT_ID") if not token or not chat_id or request is None: return try: api = f"https://api.telegram.org/bot{token}/sendMessage" payload = {"chat_id": chat_id, "text": text} from telegram_api import parse_telegram_json_from_response, require_telegram_ok resp = request('POST', api, json=payload, timeout=5) body = parse_telegram_json_from_response(resp, url=api) require_telegram_ok(body, url=api) except Exception as e: # Don't raise try: from telegram_api import TelegramAPIError if isinstance(e, TelegramAPIError): emit_event( "telegram_api_error", severity="warn", handled=True, context="internal_alerts.sendMessage", error_code=getattr(e, "error_code", None), description=getattr(e, "description", None), ) except Exception: pass return
[תיעוד] def emit_internal_alert(name: str, severity: str = "info", summary: str = "", **details: Any) -> None: """Emit an internal alert: store in-memory, forward to sinks, and log. severity: "info" | "warn" | "error" | "critical" | "anomaly" """ try: # תמיכה לאחור: יש קריאות שמעבירות details=dict(...) במקום kwargs (למשל tests) details_payload: Dict[str, Any] = {} try: if isinstance(details, dict) and "details" in details and isinstance(details.get("details"), dict): nested = details.get("details") or {} if len(details) == 1: details_payload = dict(nested) else: # מיזוג: nested details + שאר ה-kwargs (שיישארו כ-"details" אמיתיים) details_payload = dict(nested) for k, v in details.items(): if k == "details": continue details_payload[k] = v else: details_payload = dict(details or {}) except Exception: details_payload = dict(details or {}) def _is_drill(d: Any) -> bool: if not isinstance(d, dict): return False try: if bool(d.get("is_drill")): return True except Exception: pass try: meta = d.get("metadata") if isinstance(meta, dict) and bool(meta.get("is_drill")): return True except Exception: pass return False rec = { "ts": datetime.now(timezone.utc).isoformat(), "name": str(name), "severity": str(severity), "summary": str(summary), } if details_payload: def _coerce_detail_value(v: Any) -> Any: # חשוב: לא להפוך dict/list למחרוזות — זה שובר את תצוגת ה-Observability. if v is None or isinstance(v, (dict, list, int, float, bool)): return v return str(v) rec["details"] = {k: _coerce_detail_value(v) for k, v in details_payload.items()} _ALERTS.append(rec) is_drill = _is_drill(details_payload) if internal_alerts_total is not None: try: internal_alerts_total.labels(str(name or "InternalAlert"), str(severity or "info")).inc() except Exception: pass # Emit structured log/event as well # חשוב: אל תשלח את חומרת ההתראה כ-severity של emit_event, # אחרת observability.emit_event יתעד זאת כ-error/critical ב-Sentry וייצור Issues "internal_alert". # במקום זה, משדרים תמיד כאנומליה (לצורך תיעוד/דאשבורד) ושומרים את החומרה המקורית בשדה נפרד. emit_event( "internal_alert", severity="anomaly", alert_severity=str(severity), name=str(name), summary=str(summary), is_drill=bool(is_drill), handled=True if is_drill else None, ) # 🔧 הערכת כללים ויזואליים לפני שליחה try: from services.rules_evaluator import evaluate_alert_rules, execute_matched_actions # חשוב: `details` חייב לשמור על המטען המקורי כפי שנבנה בתחילת הפונקציה. # לא מחליפים את `alert_payload["details"]` במילון אחר (למשל מטא של Sentry), # ורק לאחר יצירת ה-payload מבצעים "קידום" שדות לרמה העליונה. safe_details_payload: Dict[str, Any] = dict(details_payload or {}) alert_payload = { "name": str(name), "rule_name": str(name), "severity": str(severity), "summary": str(summary), "details": safe_details_payload, # ברירת מחדל, אבל מאפשרים למקור שהגיע ב-details לשלוט (למשל "sentry_poll") "source": _coerce_str(safe_details_payload.get("source")) or "internal_alerts", "silenced": False, } # חשוב: מנוע הכללים/לוגים מצפים לעיתים לשדות בטופ-לבל (למשל alert_type), # לכן אנחנו "מקדמים" את פרטי ה-alert גם לרמה העליונה – בלי לדרוס שדות ליבה. try: for k, v in safe_details_payload.items(): if _is_empty_promoted_value(v): continue # Never allow a nested "details" key to overwrite the original details payload. # This was causing the original details to disappear (and only Sentry fields remained). if k == "details": continue # Allow details to override specific routing keys at top-level (e.g. source). if k in alert_payload and k not in {"source"}: continue # Avoid overriding even routing keys with empty values. if k == "source" and _is_empty_promoted_value(v): continue alert_payload[k] = v except Exception: pass evaluation = evaluate_alert_rules(alert_payload) if evaluation: execute_matched_actions(evaluation) # אם הכלל דרש suppress, לא נשלח if alert_payload.get("silenced"): try: logger.info( "Alert silenced by rule: %s", alert_payload.get("silenced_by_rule"), ) except Exception: pass return # דלג על שליחה except Exception as e: try: logger.warning("Rules evaluation failed: %s", e) except Exception: pass # Note: Do not persist here to avoid double counting with alert_manager. # For critical alerts – use alert_manager for Telegram + Grafana annotations with dispatch log if str(severity).lower() == "critical": # Safety switch: תרגול לא יוצא לסינקים קריטיים (טלגרם/גרפנה/אוטומציות) if is_drill: try: from monitoring.alerts_storage import record_alert # type: ignore record_alert( alert_id=None, name=str(name), severity="critical", summary=str(summary), source="internal_alerts", details=details_payload if isinstance(details_payload, dict) else None, ) except Exception: pass return try: from alert_manager import forward_critical_alert # type: ignore forward_critical_alert(name=str(name), summary=str(summary), **(details_payload or {})) except Exception: # Fallback to Telegram only try: _send_telegram(_format_text(name, severity, summary, details_payload), severity=str(severity)) except Exception: pass # Best-effort: persist critical alert when fallback path used try: from monitoring.alerts_storage import record_alert # type: ignore record_alert( alert_id=None, name=str(name), severity="critical", summary=str(summary), source="internal_alerts", details=details_payload if isinstance(details_payload, dict) else None, ) except Exception: pass else: # Prefer alert_forwarder when available; otherwise Telegram fallback # Safety switch: תרגול לא יוצא לסינקים if is_drill: try: from monitoring.alerts_storage import record_alert # type: ignore record_alert( alert_id=None, name=str(name), severity=str(severity), summary=str(summary), source="internal_alerts", details=details_payload if isinstance(details_payload, dict) else None, ) except Exception: pass return try: if forward_alerts is not None: alert = _build_forward_payload(name, severity, summary, details_payload) forward_alerts([alert]) else: _send_telegram(_format_text(name, severity, summary, details_payload), severity=str(severity)) except Exception: # Never break on sinks; try fallback Telegram try: _send_telegram(_format_text(name, severity, summary, details_payload), severity=str(severity)) except Exception: pass # Best-effort: persist non-critical alert (single write) try: from monitoring.alerts_storage import record_alert # type: ignore record_alert( alert_id=None, name=str(name), severity=str(severity), summary=str(summary), source="internal_alerts", details=details_payload if isinstance(details_payload, dict) else None, ) except Exception: pass except Exception: return
[תיעוד] def get_recent_alerts(limit: int = 5) -> List[Dict[str, Any]]: try: if limit <= 0: return [] return list(_ALERTS)[-limit:] except Exception: return []