"""
Forward incoming Alertmanager alerts to Slack and/or Telegram.
Environment variables used:
- SLACK_WEBHOOK_URL: Incoming webhook URL for Slack (optional)
- ALERT_TELEGRAM_BOT_TOKEN: Bot token for Telegram (optional)
- ALERT_TELEGRAM_CHAT_ID: Chat ID (or channel) to send alerts to (optional)
- ALERT_TELEGRAM_SUPPRESS_ALERTS: Comma-separated alert names to skip on Telegram (optional)
If none are configured, alerts will still be emitted as structured events.
"""
from __future__ import annotations
import math
import os
import re
from dataclasses import dataclass
from threading import Lock, Timer
from time import monotonic
from typing import Any, Dict, List, Optional
# Graceful degradation for HTTP client: prefer pooled http_sync for retries/backoff,
# fallback to plain requests when pooler is unavailable.
try: # pragma: no cover
from http_sync import request as _pooled_request # type: ignore
except Exception: # pragma: no cover
_pooled_request = None # type: ignore
try: # pragma: no cover
import requests as _requests # type: ignore
except Exception: # pragma: no cover
_requests = None # type: ignore
try:
from observability import emit_event # type: ignore
except Exception: # pragma: no cover
[תיעוד]
def emit_event(event: str, severity: str = "info", **fields): # type: ignore
return None
_PUBLIC_WEBAPP_URL_DEFAULT = "https://code-keeper-webapp.onrender.com"
_DASHBOARD_PATH = "/admin/observability"
_TELEGRAM_DASHBOARD_BUTTON_TEXT = "Open Dashboard"
# Startup grace period: suppress noisy performance alerts right after deploy/cold start.
_MODULE_START_MONOTONIC = monotonic()
_DEFAULT_STARTUP_GRACE_PERIOD_SECONDS = 1200.0 # 20 minutes
try:
_STARTUP_GRACE_PERIOD_SECONDS = float(
os.getenv(
"ALERT_STARTUP_GRACE_PERIOD_SECONDS",
str(int(_DEFAULT_STARTUP_GRACE_PERIOD_SECONDS)),
)
or _DEFAULT_STARTUP_GRACE_PERIOD_SECONDS
)
except Exception:
_STARTUP_GRACE_PERIOD_SECONDS = float(_DEFAULT_STARTUP_GRACE_PERIOD_SECONDS)
_STARTUP_GRACE_PERIOD_SECONDS = max(0.0, float(_STARTUP_GRACE_PERIOD_SECONDS))
_IGNORED_ON_STARTUP = {
# Alerts that commonly flap during cold start / initial DB connection warmup
"AppLatencyEWMARegression",
"MongoLatencyPredictiveSpike",
"SlowMongoConnection",
"HighLatency",
}
_DEFAULT_TELEGRAM_SUPPRESS_ALERTS = {
# Noisy anomaly alert: keep in Slack, suppress in Telegram by default.
"AppLatencyEWMARegression",
}
_ANOMALY_WINDOW_SECONDS = max(
0.0,
float(os.getenv("ALERT_ANOMALY_BATCH_WINDOW_SECONDS", "180") or 180),
)
_ANOMALY_LOCK: Lock = Lock()
_DIGIT_RE = re.compile(r"\d+")
def _normalize_alert_name(value: Any) -> str:
try:
return str(value or "").strip().lower()
except Exception:
return ""
def _parse_alert_list(raw: Optional[str], *, default: Optional[set[str]] = None) -> set[str]:
    """Parse a comma/semicolon/newline separated alert-name list into a normalized set.

    When *raw* is None, the normalized *default* set is returned; an explicitly
    empty (or whitespace-only) string disables the default and yields an empty set.
    """
    if raw is None:
        normalized_defaults = (_normalize_alert_name(item) for item in (default or set()))
        return {name for name in normalized_defaults if name}
    try:
        text = str(raw or "")
    except Exception:
        text = ""
    text = text.strip()
    if not text:
        return set()
    # Accept ',', ';', and newlines as separators.
    tokens = text.replace("\n", ",").replace(";", ",").split(",")
    normalized = (_normalize_alert_name(tok.strip()) for tok in tokens if tok.strip())
    return {name for name in normalized if name}
# Effective Telegram suppression list: an env override wins when set; otherwise
# the built-in default set is used (names normalized by _parse_alert_list).
_TELEGRAM_SUPPRESS_ALERTS = _parse_alert_list(
    os.getenv("ALERT_TELEGRAM_SUPPRESS_ALERTS"),
    default=_DEFAULT_TELEGRAM_SUPPRESS_ALERTS,
)
def _is_telegram_suppressed(alert_name: str) -> bool:
    """True when *alert_name* (case-insensitive) is on the Telegram suppression list."""
    normalized = _normalize_alert_name(alert_name)
    if not normalized:
        return False
    return normalized in _TELEGRAM_SUPPRESS_ALERTS
@dataclass
class _AnomalyBatch:
    """Mutable aggregation state for one anomaly bucket awaiting flush."""

    key: str  # bucket key produced by _anomaly_bucket_key
    representative_alert: Dict[str, Any]  # most recent alert payload for the bucket
    count: int  # number of alerts coalesced into this batch
    started_at: float  # monotonic() timestamp of the first alert
    window_seconds: float  # batching window configured at enqueue time
    timer: Timer  # daemon timer that triggers the flush
# Active batches keyed by bucket key; all access is guarded by _ANOMALY_LOCK.
_ANOMALY_BATCHES: Dict[str, _AnomalyBatch] = {}
def _safe_int(value: Any, default: int = 0) -> int:
try:
return int(float(str(value).strip()))
except Exception:
return int(default)
def _safe_float(value: Any, default: float = 0.0) -> float:
try:
return float(str(value).strip().replace("%", "").replace("s", ""))
except Exception:
return float(default)
def _build_public_dashboard_url() -> str:
    """Build a public URL to the observability dashboard (never internal generatorURL)."""
    try:
        configured = str(os.getenv("PUBLIC_URL", _PUBLIC_WEBAPP_URL_DEFAULT) or "").strip()
    except Exception:
        configured = ""
    base = (configured or _PUBLIC_WEBAPP_URL_DEFAULT).rstrip("/")
    return base + _DASHBOARD_PATH
def _fmt_minutes(window_seconds: Any) -> str:
    """Human-readable minute label for a window in seconds; em-dash when unknown."""
    seconds = _safe_int(window_seconds, 0)
    if seconds <= 0:
        return "—"
    minutes = max(1, int(math.ceil(seconds / 60.0)))
    return f"{minutes} minutes"
def _clean_sentry_noise(text: str) -> str:
"""Remove very noisy Sentry/driver fragments for chat notifications."""
try:
t = str(text or "")
except Exception:
return ""
# Drop TopologyDescription tail (very long) without regex (avoid ReDoS warnings)
try:
lowered = t.lower()
marker = "topology description:"
idx = lowered.find(marker)
if idx >= 0:
t = t[:idx].rstrip(" ,")
except Exception:
pass
# Trim excessive whitespace without regex
try:
t = " ".join(t.split()).strip()
except Exception:
pass
# Hard cap to avoid huge messages
if len(t) > 260:
t = t[:259] + "…"
return t
def _extract_host_from_text(text: str) -> Optional[str]:
# Avoid regex: scan for ".mongodb.net" suffix and expand token leftwards.
try:
s = str(text or "")
except Exception:
return None
lowered = s.lower()
suffix = ".mongodb.net"
idx = lowered.find(suffix)
if idx < 0:
return None
end = idx + len(suffix)
# Expand left until non-host character
allowed = set("abcdefghijklmnopqrstuvwxyz0123456789-._")
i = idx - 1
while i >= 0:
ch = lowered[i]
if ch in allowed:
i -= 1
continue
break
start = i + 1
host = s[start:end].strip()
return host or None
def _extract_connect_timeout_ms(text: str) -> Optional[int]:
# Avoid regex: locate "connectTimeoutMS:" and parse number until "ms"
try:
s = str(text or "")
except Exception:
return None
lowered = s.lower()
key = "connecttimeoutms:"
idx = lowered.find(key)
if idx < 0:
return None
j = idx + len(key)
# Skip whitespace
while j < len(s) and s[j].isspace():
j += 1
# Read number (digits + dot)
k = j
while k < len(s) and (s[k].isdigit() or s[k] == "."):
k += 1
num = s[j:k].strip()
if not num:
return None
try:
return int(float(num))
except Exception:
return None
def _parse_avg_threshold_from_summary(summary: str) -> tuple[Optional[float], Optional[float]]:
"""Parse `avg_rt=3.737s (threshold 3.000s)` style summaries."""
try:
s = str(summary or "")
except Exception:
return None, None
def _extract_float_after(label: str) -> Optional[float]:
try:
low = s.lower()
idx = low.find(label.lower())
if idx < 0:
return None
j = idx + len(label)
while j < len(s) and s[j].isspace():
j += 1
if j < len(s) and s[j] == "=":
j += 1
while j < len(s) and s[j].isspace():
j += 1
k = j
while k < len(s) and (s[k].isdigit() or s[k] == "."):
k += 1
num = s[j:k].strip()
return float(num) if num else None
except Exception:
return None
avg = _extract_float_after("avg_rt")
thr = _extract_float_after("threshold")
return avg, thr
def _parse_percent_pair_from_summary(summary: str) -> tuple[Optional[float], Optional[float]]:
"""Parse `...=60.00% > threshold=20.00%` style summaries."""
try:
s = str(summary or "")
except Exception:
return None, None
def _extract_percent_after(label: str) -> Optional[float]:
try:
low = s.lower()
idx = low.find(label.lower())
if idx < 0:
return None
j = idx + len(label)
while j < len(s) and s[j].isspace():
j += 1
if j < len(s) and s[j] == "=":
j += 1
while j < len(s) and s[j].isspace():
j += 1
k = j
while k < len(s) and (s[k].isdigit() or s[k] == "."):
k += 1
num = s[j:k].strip()
# Require a following '%' to reduce false positives
if not num:
return None
if "%" not in s[k : k + 3]:
# tolerate "60.0%," or "60.0% "
pass
return float(num)
except Exception:
return None
# Current is typically the first "...=NN%" token; use the first '=' occurrence.
cur = None
try:
eq = s.find("=")
if eq >= 0:
tmp = s[eq + 1 :]
k = 0
while k < len(tmp) and tmp[k].isspace():
k += 1
j = k
while j < len(tmp) and (tmp[j].isdigit() or tmp[j] == "."):
j += 1
num = tmp[k:j].strip()
if num:
cur = float(num)
except Exception:
cur = None
thr = _extract_percent_after("threshold")
return cur, thr
def _format_external_service_degraded(
    *,
    service: Optional[str],
    current_percent: Optional[float],
    threshold_percent: Optional[float],
    sample_count: int,
    error_count: int,
    window_seconds: int,
) -> str:
    """Render the chat message for an 'External Service Degraded' alert."""
    current_label = "—" if current_percent is None else f"{current_percent:.0f}%"
    threshold_label = "—" if threshold_percent is None else f"{threshold_percent:.0f}%"
    lines = [
        "⚠️ External Service Degraded",
        f"Service: {service or 'Unknown'}",
        f"📉 Error Rate: {current_label} (Threshold: {threshold_label})",
        "📊 Stats:",
        f"• Errors: {error_count} / {max(0, sample_count)} requests",
        f"• Window: {_fmt_minutes(window_seconds)}",
        "🛑 Action Required: Check external provider status page.",
    ]
    return "\n".join(lines)
def _parse_slow_endpoints_compact(value: Optional[str]) -> List[Dict[str, Any]]:
if not value:
return []
try:
raw = str(value)
except Exception:
return []
entries: List[Dict[str, Any]] = []
for chunk in [c.strip() for c in raw.split(";") if c.strip()]:
# Format from internal_alerts: "METHOD endpoint: 10.53s (n=1)" (avoid regex)
try:
parts = chunk.split(None, 1)
if len(parts) != 2:
continue
method = str(parts[0] or "").strip().upper()
rest = str(parts[1] or "").strip()
if not method or not rest:
continue
if ": " not in rest:
continue
endpoint_part, tail = rest.rsplit(": ", 1)
endpoint = endpoint_part.strip()
if not endpoint:
continue
# tail: "10.53s (n=1)"
if "s" not in tail:
continue
sec_text = tail.split("s", 1)[0].strip()
seconds = float(sec_text)
count = 1
n_idx = tail.find("(n=")
if n_idx >= 0:
close = tail.find(")", n_idx)
if close > n_idx:
n_text = tail[n_idx + 3 : close].strip()
if n_text.isdigit():
count = int(n_text)
entries.append({"method": method, "endpoint": endpoint, "seconds": seconds, "count": count})
except Exception:
continue
return entries
def _format_anomaly_alert(
    *,
    avg_rt: Optional[float],
    threshold: Optional[float],
    top_slow_endpoint: Optional[str],
    slow_endpoints_compact: Optional[str],
    active_requests: Optional[str],
    memory_mb: Optional[str],
    recent_errors_5m: Optional[str],
) -> str:
    """Render the chat message for an 'anomaly_detected' alert.

    Prefers the compact slow-endpoint list; falls back to parsing the single
    `top_slow_endpoint` annotation ("GET index (10.525s)") when the compact
    list is missing. Optional resource stats are appended only when provided.
    """
    avg_s = f"{avg_rt:.2f}s" if avg_rt is not None else "—"
    thr_s = f"{threshold:.2f}s" if threshold is not None else "—"
    entries = _parse_slow_endpoints_compact(slow_endpoints_compact)
    main = entries[0] if entries else None
    # Show at most three additional slow endpoints beyond the main bottleneck.
    others = entries[1:4] if len(entries) > 1 else []
    # Fallback: parse "GET index (10.525s)" when compact list is missing
    if main is None and top_slow_endpoint:
        try:
            raw = str(top_slow_endpoint or "").strip()
            if raw:
                parts = raw.split(None, 1)
                if len(parts) == 2:
                    method = parts[0].strip().upper()
                    rest = parts[1].strip()
                    lpar = rest.rfind("(")
                    rpar = rest.rfind(")")
                    if lpar >= 0 and rpar > lpar:
                        endpoint = rest[:lpar].strip()
                        dur_raw = rest[lpar + 1 : rpar].strip()
                        if dur_raw.endswith("s"):
                            dur_raw = dur_raw[:-1].strip()
                        seconds = float(dur_raw)
                        main = {"method": method, "endpoint": endpoint, "seconds": seconds, "count": 1}
        except Exception:
            main = None
    health_lines: List[str] = []
    if active_requests is not None:
        health_lines.append(f"• Active Req: {active_requests}")
    if memory_mb is not None:
        try:
            mem = float(str(memory_mb).strip())
            health_lines.append(f"• Memory: {mem:.0f} MB")
        except Exception:
            # Non-numeric memory value: include it verbatim rather than drop it.
            health_lines.append(f"• Memory: {memory_mb} MB")
    if recent_errors_5m is not None:
        health_lines.append(f"• Errors (5m): {recent_errors_5m}")
    lines = [
        "🐢 System Anomaly Detected",
        f"Avg Response: {avg_s} (Threshold: {thr_s})",
    ]
    if main:
        lines.extend(
            [
                "🐌 Main Bottleneck:",
                f"{main.get('method')} {main.get('endpoint')}",
                f"⏱️ {float(main.get('seconds') or 0.0):.2f}s",
            ]
        )
    if others:
        lines.append("📉 Also Slow in this Window:")
        for item in others:
            try:
                lines.append(f"• {item.get('method')} {item.get('endpoint')}: {float(item.get('seconds') or 0.0):.2f}s")
            except Exception:
                continue
    if health_lines:
        lines.append("📊 Resource Usage:")
        lines.extend(health_lines)
    return "\n".join(lines)
def _format_sentry_issue(
    *,
    short_id: Optional[str],
    summary: str,
    last_seen: Optional[str],
    sentry_link: Optional[str],
) -> str:
    """Render a chat message for a Sentry-originated alert.

    Applies title heuristics for known failure classes (DB pool maintenance
    cancellation, SSL handshake failure, Mongo primary loss); otherwise emits
    a generic Sentry alert with the noise-cleaned summary. Appends last-seen
    and permalink footers when available.
    """
    raw = str(summary or "")
    cleaned = _clean_sentry_noise(raw)
    sid = short_id or "—"
    lowered = raw.lower()
    # Heuristics for clearer titles
    if "_operationcancelled" in lowered or "operation cancelled" in lowered:
        title = "🧹 DB Pool Maintenance Warning"
        body = "Background cleanup task failed due to network instability."
        host = _extract_host_from_text(raw)
        if host:
            body += f"\nTarget: {host}"
        # Fix: plain literal instead of a placeholder-less f-string (flake8 F541).
        text = "\n".join([title, f"Source: Sentry ({sid})", "Error: Operation Cancelled", body])
    elif "ssl handshake failed" in lowered:
        title = "🔐 SSL Handshake Failed"
        host = _extract_host_from_text(raw) or "Unknown Host"
        timeout_ms = _extract_connect_timeout_ms(raw)
        timeout_line = f"⏱️ Timeout: {timeout_ms}ms ({(timeout_ms/1000.0):.0f}s)" if timeout_ms else "⏱️ Timeout: —"
        text = "\n".join(
            [
                title,
                f"Source: Sentry ({sid})",
                f"Target: {host}",
                timeout_line,
                _clean_sentry_noise(raw),
            ]
        )
    elif ("replicasetnoprimary" in lowered) or ("primary()" in lowered) or ("no replica set members match selector" in lowered):
        title = "🚨 Critical Database Error"
        text = "\n".join(
            [
                title,
                f"Source: Sentry ({sid})",
                "💀 Issue: MongoDB Connection Failed",
                cleaned or "—",
            ]
        )
    else:
        title = "🚨 Sentry Alert"
        text = "\n".join([title, f"Source: Sentry ({sid})", cleaned or "—"])
    if last_seen:
        text += f"\n📅 Last Seen: {last_seen}"
    if sentry_link:
        text += f"\n🔗 Action: {sentry_link}"
    return text
def _format_alert_text(alert: Dict[str, Any], *, include_dashboard_link: bool = True) -> str:
    """Render an alert into a human-readable chat message.

    Dispatches to specialized templates for known high-signal alerts
    ("External Service Degraded", "anomaly_detected", "sentry:*"); otherwise
    builds a generic message from labels/annotations. Falls back to the
    generic format if specialized rendering raises.
    """
    labels = alert.get("labels", {}) or {}
    annotations = alert.get("annotations", {}) or {}
    # NOTE(review): `status` is read but not used in the rendered text.
    status = str(alert.get("status") or "firing")
    severity = str(labels.get("severity") or labels.get("level") or "info").upper()
    name = labels.get("alertname") or labels.get("name") or "Alert"
    # Short summary/description
    summary = annotations.get("summary") or annotations.get("description") or ""
    dashboard_line = f"📊 Dashboard: {_build_public_dashboard_url()}"

    def _with_dashboard(text: str) -> str:
        # Append the dashboard link only when the sink wants it in the body.
        if include_dashboard_link:
            return f"{text}\n{dashboard_line}"
        return text

    # Useful context (allowlist of common labels)
    def _first(keys: List[str]) -> Optional[str]:
        # First truthy value across labels, then annotations, for the given keys.
        for k in keys:
            v = labels.get(k)
            if v:
                return str(v)
            v = annotations.get(k)
            if v:
                return str(v)
        return None

    service = _first(["service", "app", "application", "job", "component"])
    environment = _first(["env", "environment", "namespace", "cluster"])  # loose mapping
    instance = _first(["instance", "pod", "hostname"])  # k8s or host
    request_id = _first(["request_id", "request-id", "x-request-id", "x_request_id"])  # if present
    # --- Specialized templates for high-signal alerts ---
    try:
        if str(name or "").strip() == "External Service Degraded":
            cur, thr = _parse_percent_pair_from_summary(str(summary or ""))
            # Prefer explicit annotations from internal_alerts; fallback to details_preview parsing
            cur = _safe_float(_first(["current_percent"])) if _first(["current_percent"]) else cur
            thr = _safe_float(_first(["threshold_percent"])) if _first(["threshold_percent"]) else thr
            sample_count = _safe_int(_first(["sample_count"]), 0)
            error_count = _safe_int(_first(["error_count"]), 0)
            window_seconds = _safe_int(_first(["window_seconds"]), 0)
            msg = _format_external_service_degraded(
                service=service,
                current_percent=cur,
                threshold_percent=thr,
                sample_count=sample_count,
                error_count=error_count,
                window_seconds=window_seconds,
            )
            sentry_link = _build_sentry_link(
                direct_url=_first(["sentry_permalink", "sentry_url", "sentry"]),
                request_id=request_id,
                error_signature=_first(["error_signature", "signature"]),
            )
            if sentry_link:
                msg += f"\nSentry: {sentry_link}"
            return _with_dashboard(msg)
        if str(name or "").strip().lower() == "anomaly_detected":
            avg, thr = _parse_avg_threshold_from_summary(str(summary or ""))
            msg = _format_anomaly_alert(
                avg_rt=avg,
                threshold=thr,
                top_slow_endpoint=_first(["top_slow_endpoint"]),
                slow_endpoints_compact=_first(["slow_endpoints_compact"]),
                active_requests=_first(["active_requests"]),
                memory_mb=_first(["avg_memory_usage_mb"]),
                recent_errors_5m=_first(["recent_errors_5m"]),
            )
            return _with_dashboard(msg)
        # Only treat as a Sentry-origin alert when the alert itself is named as such.
        # Regular alerts may still include a Sentry permalink for convenience.
        if str(name or "").strip().lower().startswith("sentry:"):
            msg = _format_sentry_issue(
                short_id=_first(["sentry_short_id"]) or (str(name).split(":", 1)[-1].strip() if ":" in str(name) else None),
                summary=str(summary or ""),
                last_seen=_first(["sentry_last_seen"]),
                sentry_link=_build_sentry_link(
                    direct_url=_first(["sentry_permalink", "sentry_url", "sentry"]),
                    request_id=request_id,
                    error_signature=_first(["error_signature", "signature"]),
                ),
            )
            return _with_dashboard(msg)
    except Exception:
        # If specialized formatting fails, fall back to generic.
        pass
    # Generic format: title line plus whichever context fields are present.
    parts = [f"🔔 {name} ({severity})"]
    if summary:
        parts.append(str(summary))
    if service:
        parts.append(f"service: {service}")
    if environment:
        parts.append(f"env: {environment}")
    if instance:
        parts.append(f"instance: {instance}")
    if request_id:
        parts.append(f"request_id: {request_id}")
    detail_preview = annotations.get("details_preview") or annotations.get("details")
    if detail_preview:
        parts.append(str(detail_preview))
    # Best-effort: add Sentry link (issue/events search) when configured
    sentry_link = _build_sentry_link(
        direct_url=_first(["sentry_permalink", "sentry_url", "sentry"]),
        request_id=request_id,
        error_signature=_first(["error_signature", "signature"]),
    )
    if sentry_link:
        parts.append(f"Sentry: {sentry_link}")
    # Always prefer a stable public dashboard link over alertmanager's generatorURL
    if include_dashboard_link:
        parts.append(dashboard_line)
    return "\n".join(parts)
def _build_sentry_link(
    direct_url: Optional[str] = None,
    request_id: Optional[str] = None,
    error_signature: Optional[str] = None,
) -> Optional[str]:
    """Build a Sentry UI link.
    Priority:
    1) If a direct permalink is provided – return it.
    2) If request_id is present – build an events/issues query for that request_id.
    3) Else, if error_signature is present – build a query by signature.
    Returns None when Sentry env is not configured.
    """
    try:
        # 1) Direct URL from alert annotations/labels
        if direct_url:
            return str(direct_url)
        # Derive UI base from explicit dashboard URL or DSN + ORG
        dashboard = os.getenv("SENTRY_DASHBOARD_URL") or os.getenv("SENTRY_PROJECT_URL")
        if dashboard:
            base_url = dashboard.rstrip("/")
            # Ensure we point to org scope when given a project URL
            # We'll still append ?query=... which both endpoints accept.
        else:
            dsn = os.getenv("SENTRY_DSN") or ""
            host = None
            if dsn:
                # Lazy import: only needed on this (rare) configuration path.
                try:
                    from urllib.parse import urlparse
                    parsed = urlparse(dsn)
                    raw_host = parsed.hostname or ""
                except Exception:
                    raw_host = ""
                # Preserve regional subdomain when present, e.g. o123.ingest.eu.sentry.io -> eu.sentry.io
                if ".ingest." in raw_host:
                    try:
                        host = raw_host.split(".ingest.", 1)[1]
                    except Exception:
                        host = None
                elif raw_host.startswith("ingest."):
                    host = raw_host[len("ingest."):]
                elif raw_host == "sentry.io" or raw_host.endswith(".sentry.io"):
                    host = "sentry.io"
                else:
                    host = raw_host or None
            host = host or "sentry.io"
            org = os.getenv("SENTRY_ORG") or os.getenv("SENTRY_ORG_SLUG")
            if not org:
                return None
            base_url = f"https://{host}/organizations/{org}/issues"
        from urllib.parse import quote_plus
        if request_id:
            q = quote_plus(f'request_id:"{request_id}"')
            return f"{base_url}/?query={q}&statsPeriod=24h"
        if error_signature:
            q = quote_plus(f'error_signature:"{error_signature}"')
            return f"{base_url}/?query={q}&statsPeriod=24h"
        return None
    except Exception:
        # Link building is best-effort; never let it break alert delivery.
        return None
def _format_duration_label(seconds: float) -> str:
try:
seconds = float(seconds)
except Exception:
seconds = 60.0
minutes = max(1, int(math.ceil(seconds / 60.0)))
return f"{minutes} דקות"
def _format_anomaly_batch_text(
    alert: Dict[str, Any],
    count: int,
    duration_seconds: float,
    *,
    include_dashboard_link: bool,
) -> str:
    """Format a batched anomaly alert, appending an occurrence-count footer (Hebrew)."""
    body = _format_alert_text(alert, include_dashboard_link=include_dashboard_link)
    footer = f"{count} מופעים ב-{_format_duration_label(duration_seconds)}"
    return f"{body}\n{footer}"
def _anomaly_bucket_key(alert: Dict[str, Any]) -> str:
    """Build a stable batching key of the form alertname|service|env|signature.

    When no explicit error signature exists, a digit-normalized summary prefix
    (80 chars) stands in so numerically-varying messages coalesce together.
    """
    labels = alert.get("labels", {}) or {}
    annotations = alert.get("annotations", {}) or {}

    def _first_nonempty(sources: List[Dict[str, Any]], keys: List[str]) -> str:
        # First non-empty value across the given keys (key-major order).
        for key in keys:
            for source in sources:
                try:
                    candidate = source.get(key)
                except Exception:
                    continue
                if candidate not in (None, ""):
                    return str(candidate).strip().lower()
        return ""

    alert_name = _first_nonempty([labels], ["alertname", "name"]) or "anomaly"
    service = _first_nonempty([labels], ["service", "app", "application", "job", "component"])
    environment = _first_nonempty([labels], ["env", "environment", "namespace", "cluster"])
    signature = _first_nonempty([annotations, labels], ["error_signature", "signature"])
    if not signature:
        summary = _first_nonempty([annotations], ["summary", "description"])
        if summary:
            signature = _DIGIT_RE.sub("0", summary.lower())[:80]
    key_parts = [alert_name]
    key_parts.extend(part for part in (service, environment, signature) if part)
    return "|".join(key_parts)
def _queue_anomaly_alert(alert: Dict[str, Any]) -> bool:
    """Coalesce an anomaly alert into its time-window batch.

    Returns True when the alert was absorbed into a (new or existing) batch,
    False when batching is disabled (window <= 0) and the caller should send
    the alert immediately.
    """
    if _ANOMALY_WINDOW_SECONDS <= 0:
        return False
    key = _anomaly_bucket_key(alert)
    timer: Timer | None = None
    with _ANOMALY_LOCK:
        batch = _ANOMALY_BATCHES.get(key)
        if batch:
            # Existing batch: bump the count and keep the freshest payload.
            batch.count += 1
            batch.representative_alert = alert
            return True
        timer = Timer(_ANOMALY_WINDOW_SECONDS, _flush_anomaly_batch, args=(key,))
        timer.daemon = True
        _ANOMALY_BATCHES[key] = _AnomalyBatch(
            key=key,
            representative_alert=alert,
            count=1,
            started_at=monotonic(),
            window_seconds=_ANOMALY_WINDOW_SECONDS,
            timer=timer,
        )
    # Start the timer outside the lock to avoid holding it during scheduling.
    if timer is not None:
        timer.start()
    return True
def _flush_anomaly_batch(key: str) -> None:
    """Timer callback: pop the batch for *key* and post its summary to Telegram."""
    with _ANOMALY_LOCK:
        pending = _ANOMALY_BATCHES.pop(key, None)
    if pending is None:
        return
    # Report at least the configured window, even if the timer fired early.
    duration_seconds = max(pending.window_seconds, monotonic() - pending.started_at)
    message = _format_anomaly_batch_text(
        pending.representative_alert,
        pending.count,
        duration_seconds,
        # Telegram: keep body clean, rely on inline button only.
        include_dashboard_link=False,
    )
    _post_to_telegram(message)
def _reset_anomaly_batches_for_tests() -> None:
    """Cancel every pending batch timer and clear all batch state (test helper)."""
    with _ANOMALY_LOCK:
        for pending in tuple(_ANOMALY_BATCHES.values()):
            try:
                pending.timer.cancel()
            except Exception:
                pass
        _ANOMALY_BATCHES.clear()
def _is_monkeypatched_pooled() -> bool:
"""Detect if _pooled_request was monkeypatched by tests.
If _pooled_request is a callable and is not the original http_sync.request,
we consider it monkeypatched and prefer using it.
"""
try:
from http_sync import request as _original # type: ignore
except Exception: # pragma: no cover
_original = None # type: ignore
try:
return callable(_pooled_request) and (_pooled_request is not _original)
except Exception:
return False
def _use_pooled_http() -> bool:
    """Whether to prefer the pooled HTTP client over requests.

    True when ALERTS_USE_POOLED_HTTP is set to a truthy value (1/true/yes/on),
    or when tests have monkeypatched _pooled_request to a stub.
    """
    flag = str(os.getenv("ALERTS_USE_POOLED_HTTP", "")).strip().lower()
    return flag in {"1", "true", "yes", "on"} or _is_monkeypatched_pooled()
def _post_to_slack(text: str) -> None:
    """Post *text* to the configured Slack webhook (no-op when unset).

    Client preference: pooled http_sync when enabled, then requests, then
    pooled as a last resort. Any failure only emits a warn event.
    """
    url = os.getenv("SLACK_WEBHOOK_URL")
    if not url:
        return
    payload = {"text": text}
    try:
        if _use_pooled_http() and _pooled_request is not None:
            _pooled_request('POST', url, json=payload, timeout=5)
        elif _requests is not None:
            _requests.post(url, json=payload, timeout=5)
        elif _pooled_request is not None:
            _pooled_request('POST', url, json=payload, timeout=5)
        else:
            raise RuntimeError("no http client available")
    except Exception:
        emit_event("alert_forward_slack_error", severity="warn")
def _post_to_telegram(text: str) -> None:
    """Send *text* to the configured Telegram chat with an inline dashboard button.

    No-op unless both ALERT_TELEGRAM_BOT_TOKEN and ALERT_TELEGRAM_CHAT_ID are
    set. API failures are swallowed and only reported via emit_event.
    """
    token = os.getenv("ALERT_TELEGRAM_BOT_TOKEN")
    chat_id = os.getenv("ALERT_TELEGRAM_CHAT_ID")
    if not token or not chat_id:
        return
    try:
        api = f"https://api.telegram.org/bot{token}/sendMessage"
        action_url = _build_public_dashboard_url()
        payload = {
            "chat_id": chat_id,
            "text": text,
            "disable_web_page_preview": True,
            "reply_markup": {
                "inline_keyboard": [[{"text": _TELEGRAM_DASHBOARD_BUTTON_TEXT, "url": action_url}]],
            },
        }
        prefer_pooled = _use_pooled_http()
        from telegram_api import parse_telegram_json_from_response, require_telegram_ok
        if prefer_pooled and _pooled_request is not None:
            resp = _pooled_request('POST', api, json=payload, timeout=5)
        elif _requests is not None:
            resp = _requests.post(api, json=payload, timeout=5)
        elif _pooled_request is not None:
            resp = _pooled_request('POST', api, json=payload, timeout=5)
        else:
            raise RuntimeError("no http client available")
        # Raise on non-OK Telegram responses so the except path can record it.
        body = parse_telegram_json_from_response(resp, url=api)
        require_telegram_ok(body, url=api)
    except Exception as e:
        try:
            from telegram_api import TelegramAPIError
            if isinstance(e, TelegramAPIError):
                # Dedicated event with Telegram's error details for diagnosis.
                emit_event(
                    "telegram_api_error",
                    severity="warn",
                    handled=True,
                    context="alert_forwarder.sendMessage",
                    error_code=getattr(e, "error_code", None),
                    description=getattr(e, "description", None),
                )
        except Exception:
            pass
        emit_event("alert_forward_telegram_error", severity="warn")
def _severity_rank(value: str | None) -> int:
try:
v = (value or "").strip().lower()
if v in {"critical", "fatal", "crit"}:
return 4
if v in {"error", "err", "errors"}:
return 3
if v in {"warning", "warn"}:
return 2
if v in {"info", "notice", "anomaly"}:
return 1
if v in {"debug", "trace"}:
return 0
except Exception:
return 1
return 1
def _min_telegram_severity_rank() -> int:
    """Minimum severity rank for Telegram delivery (env-tunable; 'info' default sends all)."""
    configured = os.getenv("ALERT_TELEGRAM_MIN_SEVERITY", "info")
    return _severity_rank(configured)
def forward_alerts(alerts: List[Dict[str, Any]]) -> None:
    """Forward a list of Alertmanager alerts to configured sinks, respecting silences (best-effort).

    Pipeline per alert:
    1. Drop known-noisy alerts during the startup grace window.
    2. Emit an `alert_received` event (severity mapped to "anomaly", original
       label preserved in `severity_label`).
    3. Skip alerts matched by an active silence (fail-open on lookup errors).
    4. Post to Slack; post to Telegram only when severity passes the
       configured minimum, the alert is not on the Telegram suppression list,
       and anomaly batching did not absorb it.

    Fix: removed a stray corrupted token that preceded this function and
    raised NameError at module import time.
    """
    if not isinstance(alerts, list):
        return
    min_tg_rank = _min_telegram_severity_rank()
    is_startup = (
        _STARTUP_GRACE_PERIOD_SECONDS > 0
        and (monotonic() - _MODULE_START_MONOTONIC) < _STARTUP_GRACE_PERIOD_SECONDS
    )
    for alert in alerts:
        try:
            labels = alert.get("labels", {}) or {}
            alert_name = str(labels.get("alertname") or labels.get("name") or "")
            if is_startup and alert_name in _IGNORED_ON_STARTUP:
                try:
                    emit_event(
                        "alert_suppressed_startup",
                        severity="info",
                        alertname=alert_name,
                        grace_seconds=float(_STARTUP_GRACE_PERIOD_SECONDS),
                        handled=True,
                    )
                except Exception:
                    pass
                continue
            # Clean Telegram UI: rely on inline button only (no dashboard URL in body).
            text_telegram = _format_alert_text(alert, include_dashboard_link=False)
            # Slack (and similar sinks): include dashboard URL in the message text.
            text_slack = _format_alert_text(alert, include_dashboard_link=True)
            severity = str(labels.get("severity") or labels.get("level") or "info")
            # Emit the base receipt event consistently as anomaly to reflect detection,
            # while preserving the original label in a separate field for observability.
            mapped_severity = "anomaly"
            emit_event(
                "alert_received",
                severity=mapped_severity,
                alertname=alert_name,
                severity_label=str(severity),
                status=str(alert.get("status") or ""),
                handled=False,
            )
            # Silence enforcement (pattern on name). Fail-open on errors.
            try:
                from monitoring.silences import is_silenced  # type: ignore
                name = alert_name
                sev_norm = str(severity or "").lower() or None
                silenced, silence_info = is_silenced(name=name, severity=sev_norm)
            except Exception:
                silenced, silence_info = False, None
            if silenced:
                try:
                    emit_event(
                        "alert_silenced",
                        severity="info",
                        name=str(labels.get("alertname") or labels.get("name") or ""),
                        silence_id=str((silence_info or {}).get("_id") or ""),
                        until=str((silence_info or {}).get("until_ts") or ""),
                    )
                except Exception:
                    pass
                # Do not send to sinks
                continue
            _post_to_slack(text_slack)
            # Send to Telegram only if severity >= configured minimum
            if _severity_rank(severity) >= min_tg_rank:
                if _is_telegram_suppressed(alert_name):
                    try:
                        emit_event(
                            "alert_telegram_suppressed",
                            severity="info",
                            alertname=alert_name,
                            reason="suppressed_list",
                        )
                    except Exception:
                        pass
                    continue
                sev_norm = str(severity or "").strip().lower()
                # Anomaly alerts are coalesced into a time-window batch when enabled.
                if sev_norm == "anomaly" and _queue_anomaly_alert(alert):
                    continue
                _post_to_telegram(text_telegram)
        except Exception:
            # Per-alert isolation: one bad alert must never block the rest.
            emit_event("alert_forward_error", severity="warn")