# alert_forwarder — source module

"""
Forward incoming Alertmanager alerts to Slack and/or Telegram.

Environment variables used:
- SLACK_WEBHOOK_URL: Incoming webhook URL for Slack (optional)
- ALERT_TELEGRAM_BOT_TOKEN: Bot token for Telegram (optional)
- ALERT_TELEGRAM_CHAT_ID: Chat ID (or channel) to send alerts to (optional)
- ALERT_TELEGRAM_SUPPRESS_ALERTS: Comma-separated alert names to skip on Telegram (optional)

If none are configured, alerts will still be emitted as structured events.
"""
from __future__ import annotations

import math
import os
import re
from dataclasses import dataclass
from threading import Lock, Timer
from time import monotonic
from typing import Any, Dict, List, Optional

# Graceful degradation for HTTP client: prefer pooled http_sync for retries/backoff,
# fallback to plain requests when pooler is unavailable.
try:  # pragma: no cover
    from http_sync import request as _pooled_request  # type: ignore
except Exception:  # pragma: no cover
    _pooled_request = None  # type: ignore
try:  # pragma: no cover
    import requests as _requests  # type: ignore
except Exception:  # pragma: no cover
    _requests = None  # type: ignore

try:
    from observability import emit_event  # type: ignore
except Exception:  # pragma: no cover
    # Fallback: no-op emitter so callers never have to guard emit_event()
    # when the observability package is unavailable.
    def emit_event(event: str, severity: str = "info", **fields):  # type: ignore
        return None
# --- Module constants ---------------------------------------------------------

_PUBLIC_WEBAPP_URL_DEFAULT = "https://code-keeper-webapp.onrender.com"
_DASHBOARD_PATH = "/admin/observability"
_TELEGRAM_DASHBOARD_BUTTON_TEXT = "Open Dashboard"

# Startup grace period: suppress noisy performance alerts right after deploy/cold start.
_MODULE_START_MONOTONIC = monotonic()
_DEFAULT_STARTUP_GRACE_PERIOD_SECONDS = 1200.0  # 20 minutes
try:
    _STARTUP_GRACE_PERIOD_SECONDS = float(
        os.getenv(
            "ALERT_STARTUP_GRACE_PERIOD_SECONDS",
            str(int(_DEFAULT_STARTUP_GRACE_PERIOD_SECONDS)),
        )
        or _DEFAULT_STARTUP_GRACE_PERIOD_SECONDS
    )
except Exception:
    _STARTUP_GRACE_PERIOD_SECONDS = float(_DEFAULT_STARTUP_GRACE_PERIOD_SECONDS)
_STARTUP_GRACE_PERIOD_SECONDS = max(0.0, float(_STARTUP_GRACE_PERIOD_SECONDS))

_IGNORED_ON_STARTUP = {
    # Alerts that commonly flap during cold start / initial DB connection warmup
    "AppLatencyEWMARegression",
    "MongoLatencyPredictiveSpike",
    "SlowMongoConnection",
    "HighLatency",
}

_DEFAULT_TELEGRAM_SUPPRESS_ALERTS = {
    # Noisy anomaly alert: keep in Slack, suppress in Telegram by default.
    "AppLatencyEWMARegression",
}

# How long to aggregate repeated "anomaly" alerts before sending one batched
# Telegram message (0 disables batching).
_ANOMALY_WINDOW_SECONDS = max(
    0.0,
    float(os.getenv("ALERT_ANOMALY_BATCH_WINDOW_SECONDS", "180") or 180),
)
_ANOMALY_LOCK: Lock = Lock()
_DIGIT_RE = re.compile(r"\d+")


def _normalize_alert_name(value: Any) -> str:
    """Lower-cased, stripped alert name; empty string on any failure."""
    try:
        return str(value or "").strip().lower()
    except Exception:
        return ""


def _parse_alert_list(raw: Optional[str], *, default: Optional[set[str]] = None) -> set[str]:
    """Parse a comma/semicolon/newline separated list of alert names.

    When *raw* is None (env var unset) the normalized *default* set is used.
    An explicitly empty string yields an empty set (i.e. suppress nothing).
    """
    if raw is None:
        items = {_normalize_alert_name(item) for item in (default or set())}
        return {item for item in items if item}
    try:
        value = str(raw or "")
    except Exception:
        value = ""
    value = value.strip()
    if not value:
        return set()
    parts = value.replace("\n", ",").replace(";", ",").split(",")
    items: set[str] = set()
    for part in parts:
        name = part.strip()
        if name:
            normalized = _normalize_alert_name(name)
            if normalized:
                items.add(normalized)
    return items


_TELEGRAM_SUPPRESS_ALERTS = _parse_alert_list(
    os.getenv("ALERT_TELEGRAM_SUPPRESS_ALERTS"),
    default=_DEFAULT_TELEGRAM_SUPPRESS_ALERTS,
)


def _is_telegram_suppressed(alert_name: str) -> bool:
    """True when this alert name is on the Telegram suppress list."""
    name = _normalize_alert_name(alert_name)
    return bool(name and name in _TELEGRAM_SUPPRESS_ALERTS)


@dataclass
class _AnomalyBatch:
    """In-flight aggregation of repeated anomaly alerts for one bucket key."""
    key: str
    representative_alert: Dict[str, Any]  # latest alert; formatted when flushed
    count: int
    started_at: float  # monotonic() at batch creation
    window_seconds: float
    timer: Timer  # fires _flush_anomaly_batch when the window elapses


_ANOMALY_BATCHES: Dict[str, _AnomalyBatch] = {}


def _safe_int(value: Any, default: int = 0) -> int:
    """Best-effort int conversion (accepts "3.7" style strings)."""
    try:
        return int(float(str(value).strip()))
    except Exception:
        return int(default)


def _safe_float(value: Any, default: float = 0.0) -> float:
    """Best-effort float conversion; strips '%' and 's' unit suffix characters.

    NOTE(review): .replace("s", "") removes every 's' in the string, not just a
    trailing unit — acceptable for the numeric annotation values seen here.
    """
    try:
        return float(str(value).strip().replace("%", "").replace("s", ""))
    except Exception:
        return float(default)


def _build_public_dashboard_url() -> str:
    """Build a public URL to the observability dashboard (never internal generatorURL)."""
    try:
        public_url = str(os.getenv("PUBLIC_URL", _PUBLIC_WEBAPP_URL_DEFAULT) or "").strip()
    except Exception:
        public_url = ""
    if not public_url:
        public_url = _PUBLIC_WEBAPP_URL_DEFAULT
    public_url = public_url.rstrip("/")
    return f"{public_url}{_DASHBOARD_PATH}"


def _fmt_minutes(window_seconds: Any) -> str:
    """Render a seconds value as a rounded-up minute label ("—" when unknown)."""
    sec = _safe_int(window_seconds, 0)
    if sec <= 0:
        return "—"
    minutes = max(1, int(math.ceil(sec / 60.0)))
    return f"{minutes} minutes"


def _clean_sentry_noise(text: str) -> str:
    """Remove very noisy Sentry/driver fragments for chat notifications."""
    try:
        t = str(text or "")
    except Exception:
        return ""
    # Drop TopologyDescription tail (very long) without regex (avoid ReDoS warnings)
    try:
        lowered = t.lower()
        marker = "topology description:"
        idx = lowered.find(marker)
        if idx >= 0:
            t = t[:idx].rstrip(" ,")
    except Exception:
        pass
    # Trim excessive whitespace without regex
    try:
        t = " ".join(t.split()).strip()
    except Exception:
        pass
    # Hard cap to avoid huge messages
    if len(t) > 260:
        t = t[:259] + "…"
    return t


def _extract_host_from_text(text: str) -> Optional[str]:
    """Pull a MongoDB Atlas hostname (*.mongodb.net) out of free text, or None."""
    # Avoid regex: scan for ".mongodb.net" suffix and expand token leftwards.
    try:
        s = str(text or "")
    except Exception:
        return None
    lowered = s.lower()
    suffix = ".mongodb.net"
    idx = lowered.find(suffix)
    if idx < 0:
        return None
    end = idx + len(suffix)
    # Expand left until non-host character
    allowed = set("abcdefghijklmnopqrstuvwxyz0123456789-._")
    i = idx - 1
    while i >= 0:
        ch = lowered[i]
        if ch in allowed:
            i -= 1
            continue
        break
    start = i + 1
    host = s[start:end].strip()
    return host or None


def _extract_connect_timeout_ms(text: str) -> Optional[int]:
    """Parse the numeric value after "connectTimeoutMS:" in free text, or None."""
    # Avoid regex: locate "connectTimeoutMS:" and parse number until "ms"
    try:
        s = str(text or "")
    except Exception:
        return None
    lowered = s.lower()
    key = "connecttimeoutms:"
    idx = lowered.find(key)
    if idx < 0:
        return None
    j = idx + len(key)
    # Skip whitespace
    while j < len(s) and s[j].isspace():
        j += 1
    # Read number (digits + dot)
    k = j
    while k < len(s) and (s[k].isdigit() or s[k] == "."):
        k += 1
    num = s[j:k].strip()
    if not num:
        return None
    try:
        return int(float(num))
    except Exception:
        return None


def _parse_avg_threshold_from_summary(summary: str) -> tuple[Optional[float], Optional[float]]:
    """Parse `avg_rt=3.737s (threshold 3.000s)` style summaries."""
    try:
        s = str(summary or "")
    except Exception:
        return None, None

    def _extract_float_after(label: str) -> Optional[float]:
        # Find *label*, skip optional whitespace/'=' and read a float literal.
        try:
            low = s.lower()
            idx = low.find(label.lower())
            if idx < 0:
                return None
            j = idx + len(label)
            while j < len(s) and s[j].isspace():
                j += 1
            if j < len(s) and s[j] == "=":
                j += 1
            while j < len(s) and s[j].isspace():
                j += 1
            k = j
            while k < len(s) and (s[k].isdigit() or s[k] == "."):
                k += 1
            num = s[j:k].strip()
            return float(num) if num else None
        except Exception:
            return None

    avg = _extract_float_after("avg_rt")
    thr = _extract_float_after("threshold")
    return avg, thr


def _parse_percent_pair_from_summary(summary: str) -> tuple[Optional[float], Optional[float]]:
    """Parse `...=60.00% > threshold=20.00%` style summaries."""
    try:
        s = str(summary or "")
    except Exception:
        return None, None

    def _extract_percent_after(label: str) -> Optional[float]:
        try:
            low = s.lower()
            idx = low.find(label.lower())
            if idx < 0:
                return None
            j = idx + len(label)
            while j < len(s) and s[j].isspace():
                j += 1
            if j < len(s) and s[j] == "=":
                j += 1
            while j < len(s) and s[j].isspace():
                j += 1
            k = j
            while k < len(s) and (s[k].isdigit() or s[k] == "."):
                k += 1
            num = s[j:k].strip()
            # Require a following '%' to reduce false positives
            if not num:
                return None
            # NOTE(review): this check is currently a no-op (body is `pass`),
            # so a missing '%' is tolerated by design.
            if "%" not in s[k : k + 3]:
                # tolerate "60.0%," or "60.0% "
                pass
            return float(num)
        except Exception:
            return None

    # Current is typically the first "...=NN%" token; use the first '=' occurrence.
    cur = None
    try:
        eq = s.find("=")
        if eq >= 0:
            tmp = s[eq + 1 :]
            k = 0
            while k < len(tmp) and tmp[k].isspace():
                k += 1
            j = k
            while j < len(tmp) and (tmp[j].isdigit() or tmp[j] == "."):
                j += 1
            num = tmp[k:j].strip()
            if num:
                cur = float(num)
    except Exception:
        cur = None
    thr = _extract_percent_after("threshold")
    return cur, thr


def _format_external_service_degraded(
    *,
    service: Optional[str],
    current_percent: Optional[float],
    threshold_percent: Optional[float],
    sample_count: int,
    error_count: int,
    window_seconds: int,
) -> str:
    """Render the chat message body for an "External Service Degraded" alert."""
    svc = service or "Unknown"
    cur = f"{current_percent:.0f}%" if current_percent is not None else "—"
    thr = f"{threshold_percent:.0f}%" if threshold_percent is not None else "—"
    window = _fmt_minutes(window_seconds)
    return "\n".join(
        [
            "⚠️ External Service Degraded",
            f"Service: {svc}",
            f"📉 Error Rate: {cur} (Threshold: {thr})",
            "📊 Stats:",
            f"• Errors: {error_count} / {max(0, sample_count)} requests",
            f"• Window: {window}",
            "🛑 Action Required: Check external provider status page.",
        ]
    )


def _parse_slow_endpoints_compact(value: Optional[str]) -> List[Dict[str, Any]]:
    """Parse the semicolon-separated compact slow-endpoint list into dicts.

    Each entry looks like "METHOD endpoint: 10.53s (n=1)"; malformed chunks
    are skipped silently.
    """
    if not value:
        return []
    try:
        raw = str(value)
    except Exception:
        return []
    entries: List[Dict[str, Any]] = []
    for chunk in [c.strip() for c in raw.split(";") if c.strip()]:
        # Format from internal_alerts: "METHOD endpoint: 10.53s (n=1)" (avoid regex)
        try:
            parts = chunk.split(None, 1)
            if len(parts) != 2:
                continue
            method = str(parts[0] or "").strip().upper()
            rest = str(parts[1] or "").strip()
            if not method or not rest:
                continue
            if ": " not in rest:
                continue
            endpoint_part, tail = rest.rsplit(": ", 1)
            endpoint = endpoint_part.strip()
            if not endpoint:
                continue
            # tail: "10.53s (n=1)"
            if "s" not in tail:
                continue
            sec_text = tail.split("s", 1)[0].strip()
            seconds = float(sec_text)
            count = 1
            n_idx = tail.find("(n=")
            if n_idx >= 0:
                close = tail.find(")", n_idx)
                if close > n_idx:
                    n_text = tail[n_idx + 3 : close].strip()
                    if n_text.isdigit():
                        count = int(n_text)
            entries.append({"method": method, "endpoint": endpoint, "seconds": seconds, "count": count})
        except Exception:
            continue
    return entries


def _format_anomaly_alert(
    *,
    avg_rt: Optional[float],
    threshold: Optional[float],
    top_slow_endpoint: Optional[str],
    slow_endpoints_compact: Optional[str],
    active_requests: Optional[str],
    memory_mb: Optional[str],
    recent_errors_5m: Optional[str],
) -> str:
    """Render the "System Anomaly Detected" chat message body."""
    avg_s = f"{avg_rt:.2f}s" if avg_rt is not None else "—"
    thr_s = f"{threshold:.2f}s" if threshold is not None else "—"
    entries = _parse_slow_endpoints_compact(slow_endpoints_compact)
    main = entries[0] if entries else None
    others = entries[1:4] if len(entries) > 1 else []
    # Fallback: parse "GET index (10.525s)" when compact list is missing
    if main is None and top_slow_endpoint:
        try:
            raw = str(top_slow_endpoint or "").strip()
            if raw:
                parts = raw.split(None, 1)
                if len(parts) == 2:
                    method = parts[0].strip().upper()
                    rest = parts[1].strip()
                    lpar = rest.rfind("(")
                    rpar = rest.rfind(")")
                    if lpar >= 0 and rpar > lpar:
                        endpoint = rest[:lpar].strip()
                        dur_raw = rest[lpar + 1 : rpar].strip()
                        if dur_raw.endswith("s"):
                            dur_raw = dur_raw[:-1].strip()
                        seconds = float(dur_raw)
                        main = {"method": method, "endpoint": endpoint, "seconds": seconds, "count": 1}
        except Exception:
            main = None
    health_lines: List[str] = []
    if active_requests is not None:
        health_lines.append(f"• Active Req: {active_requests}")
    if memory_mb is not None:
        try:
            mem = float(str(memory_mb).strip())
            health_lines.append(f"• Memory: {mem:.0f} MB")
        except Exception:
            health_lines.append(f"• Memory: {memory_mb} MB")
    if recent_errors_5m is not None:
        health_lines.append(f"• Errors (5m): {recent_errors_5m}")
    lines = [
        "🐢 System Anomaly Detected",
        f"Avg Response: {avg_s} (Threshold: {thr_s})",
    ]
    if main:
        lines.extend(
            [
                "🐌 Main Bottleneck:",
                f"{main.get('method')} {main.get('endpoint')}",
                f"⏱️ {float(main.get('seconds') or 0.0):.2f}s",
            ]
        )
    if others:
        lines.append("📉 Also Slow in this Window:")
        for item in others:
            try:
                lines.append(f"• {item.get('method')} {item.get('endpoint')}: {float(item.get('seconds') or 0.0):.2f}s")
            except Exception:
                continue
    if health_lines:
        lines.append("📊 Resource Usage:")
        lines.extend(health_lines)
    return "\n".join(lines)


def _format_sentry_issue(
    *,
    short_id: Optional[str],
    summary: str,
    last_seen: Optional[str],
    sentry_link: Optional[str],
) -> str:
    """Render a Sentry-origin alert with heuristic, human-friendly titles."""
    raw = str(summary or "")
    cleaned = _clean_sentry_noise(raw)
    sid = short_id or "—"
    lowered = raw.lower()
    # Heuristics for clearer titles
    if "_operationcancelled" in lowered or "operation cancelled" in lowered:
        title = "🧹 DB Pool Maintenance Warning"
        body = "Background cleanup task failed due to network instability."
        host = _extract_host_from_text(raw)
        if host:
            body += f"\nTarget: {host}"
        text = "\n".join([title, f"Source: Sentry ({sid})", f"Error: Operation Cancelled", body])
    elif "ssl handshake failed" in lowered:
        title = "🔐 SSL Handshake Failed"
        host = _extract_host_from_text(raw) or "Unknown Host"
        timeout_ms = _extract_connect_timeout_ms(raw)
        timeout_line = f"⏱️ Timeout: {timeout_ms}ms ({(timeout_ms/1000.0):.0f}s)" if timeout_ms else "⏱️ Timeout: —"
        text = "\n".join(
            [
                title,
                f"Source: Sentry ({sid})",
                f"Target: {host}",
                timeout_line,
                _clean_sentry_noise(raw),
            ]
        )
    elif ("replicasetnoprimary" in lowered) or ("primary()" in lowered) or ("no replica set members match selector" in lowered):
        title = "🚨 Critical Database Error"
        text = "\n".join(
            [
                title,
                f"Source: Sentry ({sid})",
                "💀 Issue: MongoDB Connection Failed",
                cleaned or "—",
            ]
        )
    else:
        title = "🚨 Sentry Alert"
        text = "\n".join([title, f"Source: Sentry ({sid})", cleaned or "—"])
    if last_seen:
        text += f"\n📅 Last Seen: {last_seen}"
    if sentry_link:
        text += f"\n🔗 Action: {sentry_link}"
    return text


def _format_alert_text(alert: Dict[str, Any], *, include_dashboard_link: bool = True) -> str:
    """Build the outgoing chat text for one alert.

    Tries specialized templates (external-service degradation, anomaly,
    Sentry-origin) first, then falls back to a generic labeled layout.
    """
    labels = alert.get("labels", {}) or {}
    annotations = alert.get("annotations", {}) or {}
    status = str(alert.get("status") or "firing")
    severity = str(labels.get("severity") or labels.get("level") or "info").upper()
    name = labels.get("alertname") or labels.get("name") or "Alert"
    # Short summary/description
    summary = annotations.get("summary") or annotations.get("description") or ""
    dashboard_line = f"📊 Dashboard: {_build_public_dashboard_url()}"

    def _with_dashboard(text: str) -> str:
        if include_dashboard_link:
            return f"{text}\n{dashboard_line}"
        return text

    # Useful context (allowlist of common labels)
    def _first(keys: List[str]) -> Optional[str]:
        # Labels win over annotations for the same key.
        for k in keys:
            v = labels.get(k)
            if v:
                return str(v)
            v = annotations.get(k)
            if v:
                return str(v)
        return None

    service = _first(["service", "app", "application", "job", "component"])
    environment = _first(["env", "environment", "namespace", "cluster"])  # loose mapping
    instance = _first(["instance", "pod", "hostname"])  # k8s or host
    request_id = _first(["request_id", "request-id", "x-request-id", "x_request_id"])  # if present
    # --- Specialized templates for high-signal alerts ---
    try:
        if str(name or "").strip() == "External Service Degraded":
            cur, thr = _parse_percent_pair_from_summary(str(summary or ""))
            # Prefer explicit annotations from internal_alerts; fallback to details_preview parsing
            cur = _safe_float(_first(["current_percent"])) if _first(["current_percent"]) else cur
            thr = _safe_float(_first(["threshold_percent"])) if _first(["threshold_percent"]) else thr
            sample_count = _safe_int(_first(["sample_count"]), 0)
            error_count = _safe_int(_first(["error_count"]), 0)
            window_seconds = _safe_int(_first(["window_seconds"]), 0)
            msg = _format_external_service_degraded(
                service=service,
                current_percent=cur,
                threshold_percent=thr,
                sample_count=sample_count,
                error_count=error_count,
                window_seconds=window_seconds,
            )
            sentry_link = _build_sentry_link(
                direct_url=_first(["sentry_permalink", "sentry_url", "sentry"]),
                request_id=request_id,
                error_signature=_first(["error_signature", "signature"]),
            )
            if sentry_link:
                msg += f"\nSentry: {sentry_link}"
            return _with_dashboard(msg)
        if str(name or "").strip().lower() == "anomaly_detected":
            avg, thr = _parse_avg_threshold_from_summary(str(summary or ""))
            msg = _format_anomaly_alert(
                avg_rt=avg,
                threshold=thr,
                top_slow_endpoint=_first(["top_slow_endpoint"]),
                slow_endpoints_compact=_first(["slow_endpoints_compact"]),
                active_requests=_first(["active_requests"]),
                memory_mb=_first(["avg_memory_usage_mb"]),
                recent_errors_5m=_first(["recent_errors_5m"]),
            )
            return _with_dashboard(msg)
        # Only treat as a Sentry-origin alert when the alert itself is named as such.
        # Regular alerts may still include a Sentry permalink for convenience.
        if str(name or "").strip().lower().startswith("sentry:"):
            msg = _format_sentry_issue(
                short_id=_first(["sentry_short_id"]) or (str(name).split(":", 1)[-1].strip() if ":" in str(name) else None),
                summary=str(summary or ""),
                last_seen=_first(["sentry_last_seen"]),
                sentry_link=_build_sentry_link(
                    direct_url=_first(["sentry_permalink", "sentry_url", "sentry"]),
                    request_id=request_id,
                    error_signature=_first(["error_signature", "signature"]),
                ),
            )
            return _with_dashboard(msg)
    except Exception:
        # If specialized formatting fails, fall back to generic.
        pass
    parts = [f"🔔 {name} ({severity})"]
    if summary:
        parts.append(str(summary))
    if service:
        parts.append(f"service: {service}")
    if environment:
        parts.append(f"env: {environment}")
    if instance:
        parts.append(f"instance: {instance}")
    if request_id:
        parts.append(f"request_id: {request_id}")
    detail_preview = annotations.get("details_preview") or annotations.get("details")
    if detail_preview:
        parts.append(str(detail_preview))
    # Best-effort: add Sentry link (issue/events search) when configured
    sentry_link = _build_sentry_link(
        direct_url=_first(["sentry_permalink", "sentry_url", "sentry"]),
        request_id=request_id,
        error_signature=_first(["error_signature", "signature"]),
    )
    if sentry_link:
        parts.append(f"Sentry: {sentry_link}")
    # Always prefer a stable public dashboard link over alertmanager's generatorURL
    if include_dashboard_link:
        parts.append(dashboard_line)
    return "\n".join(parts)


def _build_sentry_link(
    direct_url: Optional[str] = None,
    request_id: Optional[str] = None,
    error_signature: Optional[str] = None,
) -> Optional[str]:
    """Build a Sentry UI link.

    Priority:
    1) If a direct permalink is provided – return it.
    2) If request_id is present – build an events/issues query for that request_id.
    3) Else, if error_signature is present – build a query by signature.
    Returns None when Sentry env is not configured.
    """
    try:
        # 1) Direct URL from alert annotations/labels
        if direct_url:
            return str(direct_url)
        # Derive UI base from explicit dashboard URL or DSN + ORG
        dashboard = os.getenv("SENTRY_DASHBOARD_URL") or os.getenv("SENTRY_PROJECT_URL")
        if dashboard:
            base_url = dashboard.rstrip("/")
            # Ensure we point to org scope when given a project URL
            # We'll still append ?query=... which both endpoints accept.
        else:
            dsn = os.getenv("SENTRY_DSN") or ""
            host = None
            if dsn:
                try:
                    from urllib.parse import urlparse
                    parsed = urlparse(dsn)
                    raw_host = parsed.hostname or ""
                except Exception:
                    raw_host = ""
                # Preserve regional subdomain when present, e.g. o123.ingest.eu.sentry.io -> eu.sentry.io
                if ".ingest." in raw_host:
                    try:
                        host = raw_host.split(".ingest.", 1)[1]
                    except Exception:
                        host = None
                elif raw_host.startswith("ingest."):
                    host = raw_host[len("ingest."):]
                elif raw_host == "sentry.io" or raw_host.endswith(".sentry.io"):
                    host = "sentry.io"
                else:
                    host = raw_host or None
            host = host or "sentry.io"
            org = os.getenv("SENTRY_ORG") or os.getenv("SENTRY_ORG_SLUG")
            if not org:
                return None
            base_url = f"https://{host}/organizations/{org}/issues"
        from urllib.parse import quote_plus
        if request_id:
            q = quote_plus(f'request_id:"{request_id}"')
            return f"{base_url}/?query={q}&statsPeriod=24h"
        if error_signature:
            q = quote_plus(f'error_signature:"{error_signature}"')
            return f"{base_url}/?query={q}&statsPeriod=24h"
        return None
    except Exception:
        return None


def _format_duration_label(seconds: float) -> str:
    """Render a duration in whole minutes (Hebrew UI label; "N דקות")."""
    try:
        seconds = float(seconds)
    except Exception:
        seconds = 60.0
    minutes = max(1, int(math.ceil(seconds / 60.0)))
    return f"{minutes} דקות"


def _format_anomaly_batch_text(
    alert: Dict[str, Any],
    count: int,
    duration_seconds: float,
    *,
    include_dashboard_link: bool,
) -> str:
    """Format one batched anomaly message: alert body + occurrence count line."""
    base = _format_alert_text(alert, include_dashboard_link=include_dashboard_link)
    duration = _format_duration_label(duration_seconds)
    return f"{base}\n{count} מופעים ב-{duration}"


def _anomaly_bucket_key(alert: Dict[str, Any]) -> str:
    """Compute a stable grouping key for anomaly batching.

    Key combines alert name, service, environment and an error signature
    (falling back to a digit-normalized summary prefix).
    """
    labels = alert.get("labels", {}) or {}
    annotations = alert.get("annotations", {}) or {}

    def _pick(sources: List[Dict[str, Any]], keys: List[str]) -> str:
        # First non-empty value across keys, searched source-by-source.
        for key in keys:
            for src in sources:
                try:
                    val = src.get(key)
                except Exception:
                    continue
                if val not in (None, ""):
                    return str(val).strip().lower()
        return ""

    alert_name = _pick([labels], ["alertname", "name"]) or "anomaly"
    service = _pick([labels], ["service", "app", "application", "job", "component"])
    environment = _pick([labels], ["env", "environment", "namespace", "cluster"])
    signature = _pick([annotations, labels], ["error_signature", "signature"])
    if not signature:
        summary = _pick([annotations], ["summary", "description"])
        if summary:
            # Normalize digits to group summaries that differ only in numbers.
            signature = _DIGIT_RE.sub("0", summary.lower())[:80]
    parts = [alert_name]
    if service:
        parts.append(service)
    if environment:
        parts.append(environment)
    if signature:
        parts.append(signature)
    return "|".join(parts)


def _queue_anomaly_alert(alert: Dict[str, Any]) -> bool:
    """Add an anomaly alert to its batch; returns True when it was queued.

    Returns False when batching is disabled so the caller sends immediately.
    A new batch arms a daemon Timer that flushes after the window elapses.
    """
    if _ANOMALY_WINDOW_SECONDS <= 0:
        return False
    key = _anomaly_bucket_key(alert)
    timer: Timer | None = None
    with _ANOMALY_LOCK:
        batch = _ANOMALY_BATCHES.get(key)
        if batch:
            batch.count += 1
            batch.representative_alert = alert
            return True
        timer = Timer(_ANOMALY_WINDOW_SECONDS, _flush_anomaly_batch, args=(key,))
        timer.daemon = True
        _ANOMALY_BATCHES[key] = _AnomalyBatch(
            key=key,
            representative_alert=alert,
            count=1,
            started_at=monotonic(),
            window_seconds=_ANOMALY_WINDOW_SECONDS,
            timer=timer,
        )
    # Start outside the lock so the flush callback can acquire it safely.
    if timer is not None:
        timer.start()
    return True


def _flush_anomaly_batch(key: str) -> None:
    """Timer callback: pop the batch for *key* and send a single Telegram message."""
    with _ANOMALY_LOCK:
        batch = _ANOMALY_BATCHES.pop(key, None)
    if not batch:
        return
    duration_seconds = max(batch.window_seconds, monotonic() - batch.started_at)
    text = _format_anomaly_batch_text(
        batch.representative_alert,
        batch.count,
        duration_seconds,
        # Telegram: keep body clean, rely on inline button only.
        include_dashboard_link=False,
    )
    _post_to_telegram(text)


def _reset_anomaly_batches_for_tests() -> None:
    """Cancel all pending batch timers and clear state (test helper)."""
    with _ANOMALY_LOCK:
        for batch in _ANOMALY_BATCHES.values():
            try:
                batch.timer.cancel()
            except Exception:
                pass
        _ANOMALY_BATCHES.clear()


def _is_monkeypatched_pooled() -> bool:
    """Detect if _pooled_request was monkeypatched by tests.

    If _pooled_request is a callable and is not the original http_sync.request,
    we consider it monkeypatched and prefer using it.
    """
    try:
        from http_sync import request as _original  # type: ignore
    except Exception:  # pragma: no cover
        _original = None  # type: ignore
    try:
        return callable(_pooled_request) and (_pooled_request is not _original)
    except Exception:
        return False


def _use_pooled_http() -> bool:
    """Whether to prefer pooled HTTP client over requests.

    - Explicit opt-in via ALERTS_USE_POOLED_HTTP (1/true/yes/on)
    - Or when tests monkeypatch _pooled_request to a stub
    """
    val = str(os.getenv("ALERTS_USE_POOLED_HTTP", "")).strip().lower()
    if val in {"1", "true", "yes", "on"}:
        return True
    return _is_monkeypatched_pooled()


def _post_to_slack(text: str) -> None:
    """Best-effort POST of *text* to the configured Slack incoming webhook.

    Failures are swallowed and reported via emit_event (alerting must not raise).
    """
    url = os.getenv("SLACK_WEBHOOK_URL")
    if not url:
        return
    try:
        prefer_pooled = _use_pooled_http()
        if prefer_pooled and _pooled_request is not None:
            _pooled_request('POST', url, json={"text": text}, timeout=5)
        elif _requests is not None:
            _requests.post(url, json={"text": text}, timeout=5)
        elif _pooled_request is not None:
            # Last resort: pooled client even without the opt-in flag.
            _pooled_request('POST', url, json={"text": text}, timeout=5)
        else:
            raise RuntimeError("no http client available")
    except Exception:
        emit_event("alert_forward_slack_error", severity="warn")


def _post_to_telegram(text: str) -> None:
    """Best-effort sendMessage to the configured Telegram chat.

    Attaches an inline "Open Dashboard" button; validates the API response and
    reports failures via emit_event instead of raising.
    """
    token = os.getenv("ALERT_TELEGRAM_BOT_TOKEN")
    chat_id = os.getenv("ALERT_TELEGRAM_CHAT_ID")
    if not token or not chat_id:
        return
    try:
        api = f"https://api.telegram.org/bot{token}/sendMessage"
        action_url = _build_public_dashboard_url()
        payload = {
            "chat_id": chat_id,
            "text": text,
            "disable_web_page_preview": True,
            "reply_markup": {
                "inline_keyboard": [[{"text": _TELEGRAM_DASHBOARD_BUTTON_TEXT, "url": action_url}]],
            },
        }
        prefer_pooled = _use_pooled_http()
        from telegram_api import parse_telegram_json_from_response, require_telegram_ok
        if prefer_pooled and _pooled_request is not None:
            resp = _pooled_request('POST', api, json=payload, timeout=5)
        elif _requests is not None:
            resp = _requests.post(api, json=payload, timeout=5)
        elif _pooled_request is not None:
            resp = _pooled_request('POST', api, json=payload, timeout=5)
        else:
            raise RuntimeError("no http client available")
        body = parse_telegram_json_from_response(resp, url=api)
        require_telegram_ok(body, url=api)
    except Exception as e:
        # Emit a detailed event for Telegram API errors, then the generic one.
        try:
            from telegram_api import TelegramAPIError
            if isinstance(e, TelegramAPIError):
                emit_event(
                    "telegram_api_error",
                    severity="warn",
                    handled=True,
                    context="alert_forwarder.sendMessage",
                    error_code=getattr(e, "error_code", None),
                    description=getattr(e, "description", None),
                )
        except Exception:
            pass
        emit_event("alert_forward_telegram_error", severity="warn")


def _severity_rank(value: str | None) -> int:
    """Map a severity label to a numeric rank (0=debug .. 4=critical; default 1)."""
    try:
        v = (value or "").strip().lower()
        if v in {"critical", "fatal", "crit"}:
            return 4
        if v in {"error", "err", "errors"}:
            return 3
        if v in {"warning", "warn"}:
            return 2
        if v in {"info", "notice", "anomaly"}:
            return 1
        if v in {"debug", "trace"}:
            return 0
    except Exception:
        return 1
    return 1


def _min_telegram_severity_rank() -> int:
    """Minimum severity rank required for Telegram delivery (env-configurable)."""
    # Default: send all severities unless explicitly raised via env
    raw = os.getenv("ALERT_TELEGRAM_MIN_SEVERITY", "info")
    return _severity_rank(raw)
def forward_alerts(alerts: List[Dict[str, Any]]) -> None:
    """Forward a list of Alertmanager alerts to configured sinks, respecting silences (best-effort).

    Per alert: startup-grace suppression -> receipt event -> silence check ->
    Slack -> Telegram (severity-gated, suppress-list aware, anomaly-batched).
    Any per-alert failure is swallowed and reported via emit_event.
    """
    if not isinstance(alerts, list):
        return
    min_tg_rank = _min_telegram_severity_rank()
    is_startup = (
        _STARTUP_GRACE_PERIOD_SECONDS > 0
        and (monotonic() - _MODULE_START_MONOTONIC) < _STARTUP_GRACE_PERIOD_SECONDS
    )
    for alert in alerts:
        try:
            labels = alert.get("labels", {}) or {}
            alert_name = str(labels.get("alertname") or labels.get("name") or "")
            # Skip alerts known to flap right after deploy/cold start.
            if is_startup and alert_name in _IGNORED_ON_STARTUP:
                try:
                    emit_event(
                        "alert_suppressed_startup",
                        severity="info",
                        alertname=alert_name,
                        grace_seconds=float(_STARTUP_GRACE_PERIOD_SECONDS),
                        handled=True,
                    )
                except Exception:
                    pass
                continue
            # Clean Telegram UI: rely on inline button only (no dashboard URL in body).
            text_telegram = _format_alert_text(alert, include_dashboard_link=False)
            # Slack (and similar sinks): include dashboard URL in the message text.
            text_slack = _format_alert_text(alert, include_dashboard_link=True)
            severity = str(labels.get("severity") or labels.get("level") or "info")
            # Emit the base receipt event consistently as anomaly to reflect detection,
            # while preserving the original label in a separate field for observability.
            mapped_severity = "anomaly"
            emit_event(
                "alert_received",
                severity=mapped_severity,
                alertname=alert_name,
                severity_label=str(severity),
                status=str(alert.get("status") or ""),
                handled=False,
            )
            # Silence enforcement (pattern on name). Fail-open on errors.
            try:
                from monitoring.silences import is_silenced  # type: ignore
                name = alert_name
                sev_norm = str(severity or "").lower() or None
                silenced, silence_info = is_silenced(name=name, severity=sev_norm)
            except Exception:
                silenced, silence_info = False, None
            if silenced:
                try:
                    emit_event(
                        "alert_silenced",
                        severity="info",
                        name=str(labels.get("alertname") or labels.get("name") or ""),
                        silence_id=str((silence_info or {}).get("_id") or ""),
                        until=str((silence_info or {}).get("until_ts") or ""),
                    )
                except Exception:
                    pass
                # Do not send to sinks
                continue
            _post_to_slack(text_slack)
            # Send to Telegram only if severity >= configured minimum
            if _severity_rank(severity) >= min_tg_rank:
                if _is_telegram_suppressed(alert_name):
                    try:
                        emit_event(
                            "alert_telegram_suppressed",
                            severity="info",
                            alertname=alert_name,
                            reason="suppressed_list",
                        )
                    except Exception:
                        pass
                    continue
                sev_norm = str(severity or "").strip().lower()
                # Anomalies are batched into a single delayed Telegram message.
                if sev_norm == "anomaly" and _queue_anomaly_alert(alert):
                    continue
                _post_to_telegram(text_telegram)
        except Exception:
            emit_event("alert_forward_error", severity="warn")