Source code for monitoring.alerts_storage

"""
MongoDB-backed alerts storage with TTL and simple counters.

Design goals:
- Fail-open: never throw from public APIs
- Lazy init: connect to Mongo only on first use
- Config via env only (avoid importing global config to prevent cycles)
- TTL-based retention (default 30 days) to keep the collection bounded

Environment variables:
- ALERTS_DB_ENABLED: "true/1/yes/on" to enable writes (falls back to METRICS_DB_ENABLED)
- DISABLE_DB / DISABLE_ALERTS_READS: explicit kill switches (see _write_enabled/_read_enabled)
- MONGODB_URL: Mongo connection string (defaults to mongodb://localhost:27017 when unset)
- DATABASE_NAME: DB name (default: code_keeper_bot)
- ALERTS_COLLECTION: Collection name (default: alerts_log)
- ALERTS_TTL_DAYS: TTL for documents (default: 30)

Public API:
- record_alert(*, alert_id, name, severity, summary="", source="", silenced=False, details=None) -> None
- count_alerts_since(since_dt) -> tuple[int, int]
- count_alerts_last_hours(hours=24) -> tuple[int, int]
- fetch/aggregate helpers (fetch_alerts, fetch_alerts_by_type, aggregate_alert_*) used by the Observability dashboard
"""
from __future__ import annotations

from datetime import datetime, timezone, timedelta
from typing import Any, Dict, List, Optional, Tuple
import hashlib
import os
import re


def _is_true(val: Optional[str]) -> bool:
    return str(val or "").lower() in {"1", "true", "yes", "on"}


def _write_enabled() -> bool:
    """Check if WRITING alerts to DB is enabled.

    Explicit opt-in wins over global disable to support tests and targeted writes.
    """
    if _is_true(os.getenv("ALERTS_DB_ENABLED")):
        return True
    if _is_true(os.getenv("DISABLE_DB")):
        return False
    # Fall back to metrics DB flag when explicit alerts flag is not set
    return _is_true(os.getenv("METRICS_DB_ENABLED"))


def _read_enabled() -> bool:
    """Check if READING alerts from DB is enabled.

    Reads from the DB stay enabled at all times (unless explicitly disabled),
    so the Observability dashboard can show historical data even when writes
    are disabled.
    """
    if _is_true(os.getenv("DISABLE_ALERTS_READS")):
        return False
    # Explicit opt-in wins over global disable (tests often set DISABLE_DB=1 by default):
    # when ALERTS_DB_ENABLED=true or METRICS_DB_ENABLED=true, reads are allowed
    # even if DISABLE_DB=true.
    if _is_true(os.getenv("ALERTS_DB_ENABLED")):
        return True
    if _is_true(os.getenv("METRICS_DB_ENABLED")):
        return True
    if _is_true(os.getenv("DISABLE_DB")):
        return False
    # Fallback: allow reads when a MongoDB URL is configured (even without the flags)
    return bool(os.getenv("MONGODB_URL"))
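
# Illustrative read-gating outcomes implied by the checks above (hypothetical
# environments, not an exhaustive truth table):
#   DISABLE_ALERTS_READS=1               -> reads disabled (always wins)
#   ALERTS_DB_ENABLED=1 and DISABLE_DB=1 -> reads enabled (explicit opt-in wins)
#   DISABLE_DB=1 only                    -> reads disabled
#   only MONGODB_URL set                 -> reads enabled (URL fallback)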


def _enabled() -> bool:
    """Legacy function - now delegates to _write_enabled for backwards compatibility."""
    return _write_enabled()


_client = None  # type: ignore
_collection = None  # type: ignore
_catalog_collection = None  # type: ignore
_init_failed = False  # True when initialization permanently failed (e.g., pymongo missing)
_write_disabled = False  # True when writes are intentionally disabled (not a failure)

_SENSITIVE_DETAIL_KEYS = {
    "token",
    "password",
    "secret",
    "authorization",
    "auth",
    "email",
    "phone",
    "session",
    "cookie",
}
_ENDPOINT_HINT_KEYS = ("endpoint", "path", "route", "url", "request_path")
_ALERT_TYPE_HINT_KEYS = ("alert_type", "type", "category", "kind")
_DETAIL_TEXT_LIMIT = 512


def _safe_str(value: Any, *, limit: int = 256) -> str:
    try:
        # Important: do not use `value or ""` here, because falsy values
        # (such as 0, False, [], {}) would silently become an empty string
        # and lose information in the Observability dashboard.
        if value is None:
            text = ""
        else:
            text = str(value).strip()
    except Exception:
        text = ""
    if limit and len(text) > limit:
        return text[:limit]
    return text


def _sanitize_signature(raw: Any) -> str:
    """
    Normalize a potentially user-controlled signature value into a safe identifier.

    We hash the input into a fixed-length hex string so that arbitrary content
    cannot affect the MongoDB query structure and only acts as an opaque key.
    """
    try:
        # Use a bounded, stringified representation as hashing input.
        normalized = _safe_str(raw, limit=512).strip()
        if not normalized:
            return ""
        norm_l = normalized.lower()
        # New format: 64-hex sha256
        try:
            if re.fullmatch(r"[0-9a-f]{64}", norm_l):
                return norm_l
        except Exception:
            pass
        # Legacy: 16-hex -> migrate key-space by hashing into 64-hex
        try:
            if re.fullmatch(r"[0-9a-f]{16}", norm_l):
                return hashlib.sha256(norm_l.encode("utf-8", errors="ignore")).hexdigest()
        except Exception:
            pass
        # Default: hash arbitrary input into a fixed 64-hex key
        return hashlib.sha256(norm_l.encode("utf-8", errors="ignore")).hexdigest()
    except Exception:
        # Fail-closed for sanitization: empty string means "no signature"
        return ""


def _sanitize_details(details: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Fail-open preservation of details for the DB/UI, without deleting fields.

    Principles:
    - Keys are never dropped (including sentry_issue_id/labels/slow_endpoints, etc.).
    - Sensitive keys are kept, but their value is written as <REDACTED> instead of disappearing.
    - dict/list types are preserved so they are not flattened into empty strings (e.g. []/{}).
    - Only long strings are truncated; in edge cases values are converted to a representable object.
    """
    if not isinstance(details, dict):
        return {}

    seen: set[int] = set()

    def _sanitize_value(key_hint: str, value: Any, *, depth: int) -> Any:
        # Redact by key name (case-insensitive) but do not drop the key
        try:
            lk = str(key_hint).lower()
        except Exception:
            lk = ""
        if lk in _SENSITIVE_DETAIL_KEYS:
            return "<REDACTED>"

        # Preserve explicit None (do not delete keys)
        if value is None:
            return None

        # Keep primitives as-is (Mongo-friendly)
        if isinstance(value, bool):
            return bool(value)
        if isinstance(value, (int, float)):
            return value
        if isinstance(value, str):
            return _safe_str(value, limit=_DETAIL_TEXT_LIMIT)

        # Preserve dict/list structures (and sanitize recursively).
        # Important: never call _safe_str on dict/list/tuple, even at the depth
        # limit, so complex objects are not turned into "broken" strings in the UI.
        if isinstance(value, (dict, list, tuple)):
            if depth <= 0:
                # Keep the type and return a shallow copy (no further recursion)
                # to avoid data corruption: do not reset to {} / [] and do not
                # convert to str().
                try:
                    if isinstance(value, dict):
                        # Important: even at the depth limit we must apply shallow
                        # redaction to sensitive keys, otherwise secrets
                        # (password/token/secret, etc.) could leak.
                        out: Dict[str, Any] = {}
                        for k, v in value.items():
                            try:
                                sk = str(k)
                            except Exception:
                                sk = "<unprintable-key>"
                            try:
                                lk = sk.lower()
                            except Exception:
                                lk = ""
                            if lk in _SENSITIVE_DETAIL_KEYS:
                                out[sk] = "<REDACTED>"
                            else:
                                out[sk] = v
                        return out
                    if isinstance(value, list):
                        return list(value)
                    # tuple -> list (Mongo-friendly)
                    return list(value)
                except Exception:
                    # Fail-open: if even a shallow copy fails, return a safe empty structure
                    return {} if isinstance(value, dict) else []

            try:
                obj_id = id(value)
            except Exception:
                obj_id = 0
            if obj_id and obj_id in seen:
                return "<CYCLE>"
            if obj_id:
                seen.add(obj_id)
            try:
                if isinstance(value, dict):
                    out: Dict[str, Any] = {}
                    for k2, v2 in value.items():
                        out[str(k2)] = _sanitize_value(str(k2), v2, depth=depth - 1)
                    return out
                if isinstance(value, list):
                    return [_sanitize_value(key_hint, v2, depth=depth - 1) for v2 in value]
                # tuple -> list (Mongo-friendly)
                return [_sanitize_value(key_hint, v2, depth=depth - 1) for v2 in list(value)]
            finally:
                # Do not try to remove from seen; cycle protection is best-effort.
                pass

        # Fallback: safe string representation (non-container values are always
        # stringified, regardless of remaining depth)
        return _safe_str(value, limit=_DETAIL_TEXT_LIMIT)

    clean: Dict[str, Any] = {}
    for key, value in details.items():
        # Never drop keys – stringify key best-effort
        try:
            sk = str(key)
        except Exception:
            sk = "<unprintable-key>"
        clean[sk] = _sanitize_value(sk, value, depth=6)
    return clean
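
# Illustrative behavior (hypothetical payload): keys are kept, sensitive values
# are masked, and falsy/container values survive intact:
#   _sanitize_details({"token": "s3cr3t", "count": 0, "labels": ["a", "b"]})
#   -> {"token": "<REDACTED>", "count": 0, "labels": ["a", "b"]}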


def _extract_endpoint(details: Dict[str, Any]) -> Optional[str]:
    for key in _ENDPOINT_HINT_KEYS:
        try:
            value = details.get(key)
        except Exception:
            continue
        if value not in (None, ""):
            text = _safe_str(value, limit=256)
            if text:
                return text
    return None


def _extract_alert_type(name: str, details: Dict[str, Any]) -> Optional[str]:
    for key in _ALERT_TYPE_HINT_KEYS:
        try:
            value = details.get(key)
        except Exception:
            continue
        if value not in (None, ""):
            return _safe_str(value, limit=128).lower()
    if name and name.lower() == "deployment_event":
        return "deployment_event"
    return None


def _extract_duration(details: Dict[str, Any]) -> Optional[float]:
    for key in ("duration_seconds", "duration", "duration_secs", "duration_ms"):
        try:
            value = details.get(key)
        except Exception:
            continue
        if value in (None, ""):
            continue
        try:
            num = float(value)
        except Exception:
            continue
        if key.endswith("_ms"):
            num = num / 1000.0
        if num >= 0:
            return num
    return None
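
# Illustrative: {"duration_ms": 1500} -> 1.5, {"duration_seconds": 2} -> 2.0;
# negative or non-numeric values are skipped and the function yields None.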


def _build_search_blob(name: str, summary: str, details: Dict[str, Any]) -> str:
    parts = [name or "", summary or ""]
    if details:
        for key, value in details.items():
            try:
                parts.append(f"{key}:{value}")
            except Exception:
                continue
    text = " | ".join(part for part in parts if part)
    return _safe_str(text, limit=2048)


def _build_time_filter(start_dt: Optional[datetime], end_dt: Optional[datetime]) -> Dict[str, Any]:
    if not start_dt and not end_dt:
        return {}
    match: Dict[str, Any] = {}
    window: Dict[str, Any] = {}
    if start_dt:
        window["$gte"] = start_dt
    if end_dt:
        window["$lte"] = end_dt
    if window:
        match["ts_dt"] = window
    return match
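
# Illustrative: _build_time_filter(start, None) -> {"ts_dt": {"$gte": start}};
# both bounds None -> {} (no time filter applied).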


def _get_collection(*, for_read: bool = True):  # pragma: no cover - exercised indirectly
    """Get the MongoDB collection for alerts storage.

    Args:
        for_read: If True, allows connection even when writes are disabled.
                  This enables the Observability dashboard to show historical data.
    """
    global _client, _collection, _init_failed, _write_disabled

    # If already initialized successfully, return the collection
    if _collection is not None:
        return _collection

    # If initialization permanently failed (e.g., pymongo missing), don't retry
    if _init_failed:
        return None

    # If writes are disabled and this is a write request, return None
    # but allow read requests to proceed with initialization
    if _write_disabled and not for_read:
        return None

    # Check if we should connect based on read/write mode
    enabled = _read_enabled() if for_read else _write_enabled()
    if not enabled:
        # Mark write_disabled (not init_failed) so reads can still work
        if not for_read:
            _write_disabled = True
        return None

    try:
        try:
            from pymongo import MongoClient  # type: ignore
            from pymongo import ASCENDING  # type: ignore
        except Exception:
            _init_failed = True
            return None

        # Allow tests/environments without explicit URL to fall back to localhost.
        # This keeps public APIs fail-open and enables unit-test fakes for pymongo.
        mongo_url = os.getenv("MONGODB_URL") or "mongodb://localhost:27017"

        db_name = os.getenv("DATABASE_NAME") or "code_keeper_bot"
        coll_name = os.getenv("ALERTS_COLLECTION") or "alerts_log"

        _client = MongoClient(
            mongo_url,
            maxPoolSize=20,
            minPoolSize=0,
            serverSelectionTimeoutMS=2000,
            socketTimeoutMS=5000,
            connectTimeoutMS=2000,
            retryWrites=True,
            retryReads=True,
        )
        db = _client[db_name]
        _collection = db[coll_name]
        # Best-effort ping
        try:
            _client.admin.command("ping")
        except Exception:
            pass

        # Ensure indexes (best-effort). TTL requires a Date field.
        try:
            try:
                ttl_days = int(os.getenv("ALERTS_TTL_DAYS", "30") or "30")
            except Exception:
                ttl_days = 30
            # TTL cannot be updated in-place; ignore errors if it already exists differently.
            if ttl_days > 0:
                _collection.create_index([("ts_dt", ASCENDING)], expireAfterSeconds=ttl_days * 24 * 3600)  # type: ignore[attr-defined]
        except Exception:
            pass
        try:
            _collection.create_index([("_key", ASCENDING)], unique=True, sparse=True)  # type: ignore[attr-defined]
        except Exception:
            pass

        return _collection
    except Exception:
        _init_failed = True
        return None


def _get_catalog_collection(*, for_read: bool = True):  # pragma: no cover - exercised indirectly
    """Return (and lazily create) the alert types catalog collection.

    Args:
        for_read: If True, allows connection even when writes are disabled.
    """
    global _catalog_collection
    if _catalog_collection is not None:
        return _catalog_collection
    if _init_failed:
        return None
    # Ensure base client is initialized (same DB/cluster settings)
    try:
        coll = _get_collection(for_read=for_read)
        if coll is None:
            return None
    except Exception:
        return None
    try:
        # Reuse the same client/db, create a separate collection
        db_name = os.getenv("DATABASE_NAME") or "code_keeper_bot"
        catalog_name = os.getenv("ALERT_TYPES_CATALOG_COLLECTION") or "alert_types_catalog"
        db = _client[db_name]  # type: ignore[index]
        _catalog_collection = db[catalog_name]
        # Best-effort indexes
        try:
            from pymongo import ASCENDING  # type: ignore

            _catalog_collection.create_index([("alert_type", ASCENDING)], unique=True)  # type: ignore[attr-defined]
            _catalog_collection.create_index([("last_seen_dt", ASCENDING)])  # type: ignore[attr-defined]
        except Exception:
            pass
        return _catalog_collection
    except Exception:
        return None


def _isoformat_utc(value: Optional[datetime]) -> Optional[str]:
    """Return ISO string with UTC tzinfo for Mongo datetimes."""
    if not isinstance(value, datetime):
        return None
    if value.tzinfo is None:
        dt = value.replace(tzinfo=timezone.utc)
    else:
        try:
            dt = value.astimezone(timezone.utc)
        except Exception:
            dt = value.replace(tzinfo=timezone.utc)
    return dt.isoformat()


def _build_key(alert_id: Optional[str], name: str, severity: str, summary: str, ts_dt: datetime) -> str:
    if alert_id:
        return f"id:{alert_id}"
    # Fallback: stable hash by minute bucket
    minute_bucket = ts_dt.replace(second=0, microsecond=0).isoformat()
    raw = "|".join([name.strip(), severity.strip().lower(), summary.strip(), minute_bucket])
    digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()
    return f"h:{digest}"


def compute_error_signature(error_data: Dict[str, Any]) -> str:
    """
    Compute a unique signature for an error.

    The signature is based on:
    - the error type
    - the file name and line (when available)
    - the first 3 lines of the stack trace
    """
    try:
        def _deep_find_first_string(
            d: Any,
            keys: Tuple[str, ...],
            *,
            max_depth: int = 10,
            max_nodes: int = 2000,
        ) -> str:
            """
            Best-effort deep search for a string-like value by keys.

            Important: used to find sentry_issue_id even when nested under
            error_data/details/metadata.
            """
            if max_depth < 0 or max_nodes <= 0:
                return ""
            keys_lower = {(_safe_str(k, limit=128) or "").lower() for k in keys if k}
            if not keys_lower:
                return ""
            visited: set[int] = set()
            stack: List[Tuple[Any, int]] = [(d, 0)]
            nodes = 0
            while stack:
                current, depth = stack.pop()
                nodes += 1
                if nodes > max_nodes:
                    break
                # Cycle protection for containers
                if isinstance(current, (dict, list, tuple)):
                    try:
                        oid = id(current)
                    except Exception:
                        oid = 0
                    if oid and oid in visited:
                        continue
                    if oid:
                        visited.add(oid)
                if isinstance(current, dict):
                    # 1) Check direct keys
                    try:
                        items = list(current.items())
                    except Exception:
                        items = []
                    for k, v in items:
                        try:
                            k_norm = str(k).lower()
                        except Exception:
                            continue
                        if k_norm in keys_lower and v not in (None, ""):
                            # Important: do not convert dict/list to str here;
                            # we are looking for a stable identifier (string/int).
                            if isinstance(v, (dict, list, tuple)):
                                # Keep searching inside instead of converting to str
                                pass
                            else:
                                try:
                                    stripped = str(v).strip()
                                except Exception:
                                    stripped = ""
                                if stripped:
                                    return stripped
                        # 2) Recurse into nested payloads (generic, not allowlist-only)
                        if depth < max_depth and isinstance(v, (dict, list, tuple)):
                            stack.append((v, depth + 1))
                elif isinstance(current, (list, tuple)):
                    if depth >= max_depth:
                        continue
                    try:
                        seq = list(current)
                    except Exception:
                        seq = []
                    for v in seq:
                        if isinstance(v, (dict, list, tuple)):
                            stack.append((v, depth + 1))
                else:
                    continue
            return ""

        def _normalize_error_text(text: Any) -> str:
            """
            Normalize error/trace text before hashing to keep signatures stable across:
            - dynamic hex memory addresses (0x7f...)
            - absolute file paths that differ between environments
            - noisy line numbers (optional)
            """
            if text in (None, ""):
                return ""
            try:
                s = str(text)
            except Exception:
                return ""
            # Normalize line endings
            s = s.replace("\r\n", "\n").replace("\r", "\n")
            # Replace hex memory addresses (0x7f..., 0x0000...)
            s = re.sub(r"0x[0-9a-fA-F]+", "<ADDR>", s)

            def _normalize_inline_absolute_paths(value: str) -> str:
                """
                Replace absolute path tokens inside free-form text with their basename.

                Implemented as a linear scan (no regex) to avoid ReDoS/CodeQL warnings.
                """
                exts = (".py", ".js", ".ts", ".tsx", ".java", ".go", ".rb")
                start_delims = set(" \t\r\n\"'([{<,;=|")
                # Stop token at whitespace, quotes, and common punctuation around
                # paths in tracebacks/logs
                stop_chars = set(" \t\r\n\"'([{<,;|)]}>")

                def _has_known_ext(filename: str) -> bool:
                    try:
                        low = filename.lower()
                    except Exception:
                        return False
                    return any(low.endswith(ext) for ext in exts)

                def _basename(path_like: str) -> str:
                    # Keep only the last path segment (works for both / and \)
                    if "/" in path_like:
                        path_like = path_like.rsplit("/", 1)[-1]
                    if "\\" in path_like:
                        path_like = path_like.rsplit("\\", 1)[-1]
                    return path_like

                out: List[str] = []
                i = 0
                n = len(value)
                while i < n:
                    ch = value[i]
                    # Detect start of absolute path token
                    start = None
                    if ch == "/" and (i == 0 or value[i - 1] in start_delims):
                        start = i
                    elif (
                        ch.isalpha()
                        and i + 2 < n
                        and value[i + 1] == ":"
                        and value[i + 2] == "\\"
                        and (i == 0 or value[i - 1] in start_delims)
                    ):
                        start = i
                    if start is None:
                        out.append(ch)
                        i += 1
                        continue
                    j = start
                    while j < n and value[j] not in stop_chars:
                        j += 1
                    token = value[start:j]
                    # Handle optional trailing :<digits> (line numbers),
                    # normalize digits to <LINE>
                    line_suffix = ""
                    k = token.rfind(":")
                    if k != -1 and k + 1 < len(token):
                        tail = token[k + 1 :]
                        if tail.isdigit():
                            token_base = token[:k]
                            line_suffix = ":<LINE>"
                        else:
                            token_base = token
                    else:
                        token_base = token
                    candidate = _basename(token_base)
                    if _has_known_ext(candidate):
                        out.append(candidate + line_suffix)
                    else:
                        # Not a filename token we recognize; keep original
                        out.append(value[start:j])
                    i = j
                return "".join(out)

            # Normalize inline absolute paths without regex (safe for untrusted input)
            try:
                s = _normalize_inline_absolute_paths(s)
            except Exception:
                pass
            # Normalize common traceback patterns: `File "...", line 123`
            s = re.sub(r"\bline\s+\d+\b", "line <LINE>", s)
            # Normalize file:line patterns for Python files
            s = re.sub(r"(\b[\w.\-]+\.py):\d+\b", r"\1:<LINE>", s)
            # Collapse excessive spaces/tabs (keep newlines)
            s = re.sub(r"[ \t]+", " ", s).strip()
            return s

        def _normalize_file_name(value: Any) -> str:
            if value in (None, ""):
                return ""
            try:
                s = str(value).strip()
            except Exception:
                return ""
            if not s:
                return ""
            # Keep only basename for stability across environments
            if "/" in s:
                s = s.rsplit("/", 1)[-1]
            if "\\" in s:
                s = s.rsplit("\\", 1)[-1]
            return s

        # --- Sentry-first: for Sentry errors we have a stable identifier (Issue ID) ---
        # This prevents empty signatures and keeps them consistent even without stack/file.
        sentry_issue_id_raw = _deep_find_first_string(
            error_data or {},
            (
                "sentry_issue_id",
                "sentryIssueId",
                "issue_id",
                "issueId",
                "sentry_issue",
            ),
            max_depth=10,
        )
        sentry_issue_id = _normalize_error_text(sentry_issue_id_raw or "")
        if sentry_issue_id:
            signature_input = f"sentry_issue_id:{sentry_issue_id}"
            return hashlib.sha256(signature_input.encode()).hexdigest()[:16]

        # --- Generic error signature (code/runtime errors) ---
        components = [
            str((error_data or {}).get("error_type", "") or ""),
            _normalize_file_name((error_data or {}).get("file", "") or ""),
            str((error_data or {}).get("line", "") or ""),
        ]
        # Include summary/title when present (useful for Sentry-like alerts
        # that don't carry a stack)
        summary = _normalize_error_text(
            (error_data or {}).get("summary")
            or (error_data or {}).get("title")
            or (error_data or {}).get("message")
            or ""
        )
        if summary:
            components.append(summary)
        # Include normalized message when present (improves signal when
        # stack/file info is missing)
        error_message = _normalize_error_text((error_data or {}).get("error_message", "") or "")
        if error_message:
            components.append(error_message)
        # Add the normalized stack trace
        stack = _normalize_error_text((error_data or {}).get("stack_trace", "") or "")
        if stack:
            # Take the first 3 lines
            lines = [l.strip() for l in str(stack).split("\n") if l.strip()][:3]
            components.extend(lines)
        signature_input = "|".join(components)
        # If we have no stable information at all (every component is empty),
        # do not compute a signature; a constant hash would make all alerts
        # "share" the same signature and break is_new_error.
        try:
            has_signal = any(bool(str(c).strip()) for c in components)
        except Exception:
            has_signal = False
        if not has_signal:
            return ""
        return hashlib.sha256(signature_input.encode()).hexdigest()[:16]
    except Exception:
        # Fail-open: return a stable fallback
        try:
            return hashlib.sha256(str(error_data).encode()).hexdigest()[:16]
        except Exception:
            return "0" * 16


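# Illustrative usage (payloads hypothetical):
#   compute_error_signature({"sentry_issue_id": "12345"})
#   -> 16-hex prefix of sha256("sentry_issue_id:12345")
#   compute_error_signature({"error_type": "ValueError", "file": "/app/x.py", "line": 10})
#   -> 16-hex prefix of sha256("ValueError|x.py|10")

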
def is_new_error(signature: str) -> bool:
    """Check whether the error is new (not seen in the last 30 days)."""
    # CodeQL/NoSQL injection:
    # - cast to str and normalize
    # - if 16-hex: hash to 64-hex (new storage format)
    # - if already 64-hex: use as-is (avoid double-hash)
    # - regex-validate the exact variable used in the Mongo query
    try:
        safe_sig = str(signature or "").strip().lower()
        if not safe_sig:
            return False
        # 1) 64-hex already -> use as-is
        if re.fullmatch(r"[0-9a-f]{64}", safe_sig):
            query_signature = safe_sig
        # 2) legacy 16-hex -> migrate key space by hashing to 64-hex
        elif re.fullmatch(r"[0-9a-f]{16}", safe_sig):
            query_signature = hashlib.sha256(safe_sig.encode("utf-8", errors="ignore")).hexdigest()
            query_signature = str(query_signature).strip().lower()
        else:
            return False
        # Force to str (fail-closed) before any DB query
        query_signature = str(query_signature).strip().lower()
        # Final hard validation on the exact variable used in the Mongo query
        if not re.fullmatch(r"[0-9a-f]{64}", str(query_signature)):
            return False
    except Exception:
        return False
    if not _write_enabled():
        return False
    if _init_failed:
        return False
    try:
        # Make sure the client is initialized (best-effort)
        _ = _get_collection(for_read=False)
        if _client is None:
            return False
        db_name = os.getenv("DATABASE_NAME") or "code_keeper_bot"
        collection = _client[db_name]["error_signatures"]
        # best-effort indexes
        try:
            from pymongo import ASCENDING  # type: ignore

            collection.create_index([("signature", ASCENDING)], unique=True, background=True)
            collection.create_index([("last_seen", ASCENDING)], background=True)
        except Exception:
            pass
        now = datetime.now(timezone.utc)
        cutoff = now - timedelta(days=30)
        # Bulk/atomic: a single query that returns the previous document
        # (if any) and updates/upserts at the same time
        try:
            from pymongo import ReturnDocument  # type: ignore

            prev = collection.find_one_and_update(
                {"signature": query_signature},  # lgtm[py/nosql-injection]
                {
                    "$set": {"last_seen": now},
                    "$inc": {"count": 1},
                    "$setOnInsert": {"first_seen": now},
                },
                upsert=True,
                return_document=ReturnDocument.BEFORE,
            )
            if isinstance(prev, dict):
                last_seen = prev.get("last_seen")
                if isinstance(last_seen, datetime) and last_seen >= cutoff:
                    return False
            return True
        except Exception:
            # Fallback: two queries (keeps compatibility with environments
            # that lack find_one_and_update)
            existing = collection.find_one({"signature": query_signature, "last_seen": {"$gte": cutoff}})  # lgtm[py/nosql-injection]
            collection.update_one(
                {"signature": query_signature},  # lgtm[py/nosql-injection]
                {
                    "$set": {"last_seen": now},
                    "$inc": {"count": 1},
                    "$setOnInsert": {"first_seen": now},
                },
                upsert=True,
            )
            return existing is None
    except Exception:
        return False


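# Illustrative flow (hypothetical; pairs with compute_error_signature above):
#   sig = compute_error_signature({"error_type": "KeyError", "file": "app.py", "line": 7})
#   first_time = is_new_error(sig)  # True on first sighting, False again within 30 days
#   (is_new_error also upserts last_seen/count in the error_signatures collection)

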
def enrich_alert_with_signature(alert_data: Dict[str, Any]) -> Dict[str, Any]:
    """Enrich the alert payload with a signature and novelty information."""
    # Important: the project may already carry an error_signature field with a
    # "taxonomic" identifier (e.g. OOM_KILLED). It must not be overwritten.
    # The fingerprint (hash) is stored in a separate field.

    # Read existing fields defensively (do not assume a "clean" dict)
    try:
        existing_is_new = alert_data.get("is_new_error")
    except Exception:
        existing_is_new = None
    try:
        existing_hash = alert_data.get("error_signature_hash")
    except Exception:
        existing_hash = None

    # Use an existing hash only when it actually has content
    # (guards against empty fields written in the past)
    signature_hash_existing = _safe_str(existing_hash, limit=128)
    # Normalize any existing hash into a safe signature format.
    if signature_hash_existing:
        signature_hash_existing = _sanitize_signature(signature_hash_existing)
    signature_hash = signature_hash_existing
    if not signature_hash:
        # Compute a deterministic hash from the error data and sanitize it.
        computed = compute_error_signature(alert_data or {})
        signature_hash = _sanitize_signature(computed)

    # Compute a dedicated, sanitized signature for DB lookups to avoid using any
    # externally supplied value in Mongo queries.
    db_signature = ""
    try:
        computed_db = compute_error_signature(alert_data or {})
        db_signature = _sanitize_signature(computed_db)
    except Exception:
        db_signature = ""
    # If we failed to compute a dedicated DB signature, fall back to the primary hash.
    if not db_signature:
        db_signature = signature_hash

    # If there is no real signature, do not add signature fields at all
    # (and clean up empty fields if they already exist)
    if not signature_hash:
        try:
            if not _safe_str(alert_data.get("error_signature_hash"), limit=128):
                alert_data.pop("error_signature_hash", None)
        except Exception:
            pass
        try:
            # Do not overwrite a "taxonomic" error_signature, but clean it if empty
            if not _safe_str(alert_data.get("error_signature"), limit=128):
                alert_data.pop("error_signature", None)
        except Exception:
            pass
        try:
            # If is_new_error=False was added in the past without a signature,
            # prefer not to keep it at all
            if alert_data.get("is_new_error") in (False, None, ""):
                alert_data.pop("is_new_error", None)
        except Exception:
            pass
        return alert_data

    # Idempotency: only when a hash already existed AND is_new_error is present
    # do we skip the DB round-trip. If someone arrived with is_new_error=True
    # but no hash (e.g. sentry_polling), we must not "trust" it.
    if existing_is_new in (True, False) and bool(signature_hash_existing):
        is_new = bool(existing_is_new)
    else:
        is_new = is_new_error(db_signature)

    # Additive only: do not create a new dict, do not overwrite existing metadata
    alert_data["error_signature_hash"] = signature_hash
    alert_data["is_new_error"] = is_new
    # Compatibility: when error_signature is missing (or empty), default to the hash.
    # If a taxonomic identifier exists (e.g. OOM_KILLED), leave it alone.
    try:
        existing = alert_data.get("error_signature")
    except Exception:
        existing = None
    if existing in (None, "", False):
        alert_data["error_signature"] = signature_hash
    return alert_data


def record_alert(
    *,
    alert_id: Optional[str],
    name: str,
    severity: str,
    summary: str = "",
    source: str = "",
    silenced: bool = False,
    details: Optional[Dict[str, Any]] = None,
) -> None:
    """Insert (or upsert via unique key) a single alert record.

    - When alert_id is provided, use it for de-duplication via a unique key.
    - Otherwise use a stable hash based on name/severity/summary/minute.
    """
    if not _write_enabled():
        return
    if _init_failed:
        return
    try:
        coll = _get_collection(for_read=False)
        if coll is None:
            return
        now = datetime.now(timezone.utc)
        key = _build_key(alert_id, name or "", severity or "", summary or "", now)
        # Important: enrich the raw payload first, sanitize for the DB second.
        # This prevents dict/list values from being destroyed before the signature
        # is computed (e.g. a sentry_issue_id found inside a nested structure).
        details_payload: Dict[str, Any] = dict(details or {}) if isinstance(details, dict) else {}
        try:
            enrich_alert_with_signature(details_payload)
        except Exception:
            pass
        clean_details = _sanitize_details(details_payload)
        endpoint = _extract_endpoint(clean_details) if clean_details else None
        alert_type = _extract_alert_type(str(name or ""), clean_details)
        duration_seconds = _extract_duration(clean_details)
        search_blob = _build_search_blob(str(name or ""), str(summary or ""), clean_details)
        doc = {
            "ts_dt": now,
            "name": str(name or "alert"),
            "severity": str(severity or "info").lower(),
            "summary": str(summary or ""),
            "source": str(source or ""),
            "_key": key,
            "search_blob": search_blob,
        }
        # Transparency: mark whether this alert was silenced at dispatch time
        try:
            doc["silenced"] = bool(silenced)
        except Exception:
            doc["silenced"] = False
        if clean_details:
            doc["details"] = clean_details
        if endpoint:
            doc["endpoint"] = endpoint
        if alert_type:
            doc["alert_type"] = alert_type
        if duration_seconds is not None:
            doc["duration_seconds"] = float(duration_seconds)
        if alert_id:
            doc["alert_id"] = str(alert_id)
        try:
            # Upsert by key (idempotent). update_one has better semantics with a unique key.
            coll.update_one({"_key": key}, {"$setOnInsert": doc}, upsert=True)  # type: ignore[attr-defined]
        except Exception:
            # Fall back to insert (ignore dup errors silently)
            try:
                coll.insert_one(doc)  # type: ignore[attr-defined]
            except Exception:
                pass
        # --- Catalog (Registry): persist observed alert_type forever (best-effort) ---
        try:
            # Do not pollute catalog with drills
            if clean_details and bool(clean_details.get("is_drill")):
                return
        except Exception:
            pass
        try:
            if alert_type:
                _upsert_alert_type_catalog(
                    alert_type=alert_type,
                    name=str(name or "alert"),
                    summary=str(summary or ""),
                    seen_dt=now,
                )
        except Exception:
            pass
    except Exception:
        return


def _upsert_alert_type_catalog(
    *,
    alert_type: str,
    name: str,
    summary: str,
    seen_dt: datetime,
) -> None:
    coll = _get_catalog_collection()
    if coll is None:
        return
    key = _safe_str(alert_type, limit=128).lower()
    if not key:
        return
    now = seen_dt if isinstance(seen_dt, datetime) else datetime.now(timezone.utc)
    payload = {
        "alert_type": key,
        "last_seen_dt": now,
        "last_seen_name": _safe_str(name, limit=128),
        "last_seen_summary": _safe_str(summary, limit=256),
        "updated_at": now,
    }
    try:
        coll.update_one(
            {"alert_type": key},
            {
                "$setOnInsert": {"first_seen_dt": now, "created_at": now},
                "$set": payload,
                "$inc": {"total_count": 1},
            },
            upsert=True,
        )  # type: ignore[attr-defined]
    except Exception:
        return


def fetch_alert_type_catalog(
    *,
    min_total_count: int = 1,
    limit: int = 5000,
) -> List[Dict[str, Any]]:
    """Return catalog of all observed alert types (fail-open).

    Each row includes: alert_type, total_count, first_seen_dt, last_seen_dt,
    sample_name, sample_title
    """
    coll = _get_catalog_collection()
    if coll is None:
        return []
    try:
        min_total = int(min_total_count)
    except Exception:
        min_total = 1
    min_total = max(1, min_total)
    try:
        lim = int(limit)
    except Exception:
        lim = 5000
    lim = max(1, min(50_000, lim))
    try:
        match: Dict[str, Any] = {"total_count": {"$gte": min_total}}
        cursor = (
            coll.find(
                match,
                {
                    "_id": 0,
                    "alert_type": 1,
                    "total_count": 1,
                    "first_seen_dt": 1,
                    "last_seen_dt": 1,
                    "last_seen_name": 1,
                    "last_seen_summary": 1,
                },
            )  # type: ignore[attr-defined]
            .sort([("last_seen_dt", -1)])  # type: ignore[attr-defined]
            .limit(lim)  # type: ignore[attr-defined]
        )
    except Exception:
        return []
    out: List[Dict[str, Any]] = []
    try:
        for doc in cursor:
            try:
                a_type = _safe_str(doc.get("alert_type"), limit=128).lower()
                if not a_type:
                    continue
                out.append(
                    {
                        "alert_type": a_type,
                        "count": int(doc.get("total_count", 0) or 0),
                        "first_seen_dt": doc.get("first_seen_dt"),
                        "last_seen_dt": doc.get("last_seen_dt"),
                        "sample_name": _safe_str(doc.get("last_seen_name"), limit=128),
                        "sample_title": _safe_str(doc.get("last_seen_summary"), limit=256),
                    }
                )
            except Exception:
                continue
    except Exception:
        return []
    return out


def fetch_alerts_by_type(
    *,
    alert_type: str,
    limit: int = 100,
    include_details: bool = True,
) -> List[Dict[str, Any]]:
    """Fetch recent alerts of a specific type with Sentry details.

    Returns a list of dicts, for example::

        {
            "alert_id": str,
            "ts_dt": datetime,
            "name": str,
            "summary": str,
            "sentry_issue_id": Optional[str],
            "sentry_permalink": Optional[str],
            "sentry_short_id": Optional[str],
        }
    """
    coll = _get_collection()
    if coll is None:
        return []
    normalized_type = _safe_str(alert_type, limit=128).lower()
    if not normalized_type:
        return []
    try:
        limit_int = max(1, min(500, int(limit)))
    except Exception:
        limit_int = 100
    safe_pattern = re.escape(normalized_type)
    match = {
        "alert_type": {"$regex": f"^{safe_pattern}$", "$options": "i"},
        "details.is_drill": {"$ne": True},
    }
    projection = {
        "_id": 0,
        "alert_id": 1,
        "ts_dt": 1,
        "name": 1,
        "summary": 1,
    }
    if include_details:
        projection["details.sentry_issue_id"] = 1
        projection["details.sentry_permalink"] = 1
        projection["details.sentry_short_id"] = 1
        projection["details.error_signature"] = 1
    try:
        cursor = coll.find(match, projection).sort([("ts_dt", -1)]).limit(limit_int)  # type: ignore[attr-defined]
    except Exception:
        return []
    out: List[Dict[str, Any]] = []
    for doc in cursor:
        try:
            details = doc.get("details") or {}
            out.append(
                {
                    "alert_id": str(doc.get("alert_id") or ""),
                    "ts_dt": doc.get("ts_dt"),
                    "name": _safe_str(doc.get("name"), limit=128),
                    "summary": _safe_str(doc.get("summary"), limit=256),
                    "sentry_issue_id": _safe_str(details.get("sentry_issue_id"), limit=64),
                    "sentry_permalink": _safe_str(details.get("sentry_permalink"), limit=512),
                    "sentry_short_id": _safe_str(details.get("sentry_short_id"), limit=32),
                    "error_signature": _safe_str(details.get("error_signature"), limit=128),
                }
            )
        except Exception:
            continue
    return out


def count_alerts_since(since_dt: datetime) -> tuple[int, int]:
    """Return (total, critical) counts since the given datetime (UTC recommended)."""
    if not _read_enabled():
        return 0, 0
    if _init_failed:
        return 0, 0
    try:
        coll = _get_collection(for_read=True)
        if coll is None:
            return 0, 0
        match: Dict[str, Any] = {"ts_dt": {"$gte": since_dt}}
        # Default: exclude Drill alerts from stats to prevent metric pollution
        match["details.is_drill"] = {"$ne": True}
        # Bulk: one aggregate query instead of two count_documents calls
        pipeline = [
            {"$match": match},
            {
                "$group": {
                    "_id": None,
                    "total": {"$sum": 1},
                    "critical": {
                        "$sum": {
                            "$cond": [
                                {
                                    "$eq": [
                                        {"$toLower": {"$ifNull": ["$severity", "info"]}},
                                        "critical",
                                    ]
                                },
                                1,
                                0,
                            ]
                        }
                    },
                }
            },
        ]
        rows = list(coll.aggregate(pipeline))  # type: ignore[attr-defined]
        if not rows:
            return 0, 0
        doc = rows[0] if isinstance(rows[0], dict) else {}
        return int(doc.get("total", 0) or 0), int(doc.get("critical", 0) or 0)
    except Exception:
        return 0, 0


def count_alerts_last_hours(hours: int = 24) -> tuple[int, int]:
    if hours <= 0:
        return 0, 0
    since = datetime.now(timezone.utc) - timedelta(hours=int(hours))
    return count_alerts_since(since)


def list_recent_alert_ids(limit: int = 10) -> List[str]:
    """Return recent alert identifiers from the DB (fail-open).

    Preference order: document ``alert_id`` when present, otherwise the stable
    unique ``_key`` used for de-duplication. Results are ordered by ``ts_dt``
    descending and truncated to ``limit``.
    """
    if not _read_enabled():
        return []
    if _init_failed:
        return []
    try:
        coll = _get_collection(for_read=True)
        if coll is None:
            return []
        try:
            # Projection keeps payload small; sorting by time desc
            cursor = (
                coll.find({}, {"alert_id": 1, "_key": 1, "ts_dt": 1})  # type: ignore[attr-defined]
                .sort([("ts_dt", -1)])  # type: ignore[attr-defined]
                .limit(max(1, min(200, int(limit or 10))))  # type: ignore[attr-defined]
            )
        except Exception:
            return []
        out: List[str] = []
        try:
            for doc in cursor:  # type: ignore[assignment]
                try:
                    ident = doc.get("alert_id") or doc.get("_key")
                    if ident:
                        out.append(str(ident))
                except Exception:
                    continue
        except Exception:
            return []
        return out
    except Exception:
        return []


def fetch_alerts(
    *,
    start_dt: Optional[datetime],
    end_dt: Optional[datetime],
    severity: Optional[str] = None,
    alert_type: Optional[str] = None,
    endpoint: Optional[str] = None,
    search: Optional[str] = None,
    page: int = 1,
    per_page: int = 50,
) -> Tuple[List[Dict[str, Any]], int]:
    """Return paginated alert documents filtered by the provided criteria."""
    coll = _get_collection()
    if coll is None:
        return [], 0
    try:
        per_page = max(1, min(200, int(per_page)))
    except Exception:
        per_page = 50
    try:
        page = max(1, int(page))
    except Exception:
        page = 1
    skip = (page - 1) * per_page
    match = _build_time_filter(start_dt, end_dt)
    if severity:
        match["severity"] = str(severity).lower()
    if alert_type:
        match["alert_type"] = str(alert_type).lower()
    if endpoint:
        match["endpoint"] = str(endpoint)
    if search:
        pattern = _safe_str(search, limit=256)
        if pattern:
            match["$or"] = [
                {"name": {"$regex": pattern, "$options": "i"}},
                {"summary": {"$regex": pattern, "$options": "i"}},
                {"search_blob": {"$regex": pattern, "$options": "i"}},
            ]
    projection = {
        "_id": 0,
        "ts_dt": 1,
        "name": 1,
        "severity": 1,
        "summary": 1,
        "details": 1,
        "duration_seconds": 1,
        "alert_type": 1,
        "endpoint": 1,
        "source": 1,
        "silenced": 1,
    }
    try:
        cursor = (
            coll.find(match, projection)  # type: ignore[attr-defined]
            .sort("ts_dt", -1)  # type: ignore[attr-defined]
            .skip(skip)  # type: ignore[attr-defined]
            .limit(per_page)  # type: ignore[attr-defined]
        )
    except Exception:
        return [], 0
    alerts: List[Dict[str, Any]] = []
    for doc in cursor:
        ts = doc.get("ts_dt")
        ts_iso = _isoformat_utc(ts)
        alerts.append(
            {
                "timestamp": ts_iso,
                "name": doc.get("name"),
                "severity": doc.get("severity"),
                "summary": doc.get("summary"),
                "metadata": doc.get("details") or {},
                "duration_seconds": doc.get("duration_seconds"),
                "alert_type": doc.get("alert_type"),
                "endpoint": doc.get("endpoint"),
                "source": doc.get("source"),
                "silenced": bool(doc.get("silenced", False)),
            }
        )
    try:
        total = int(coll.count_documents(match))  # type: ignore[attr-defined]
    except Exception:
        total = len(alerts)
    return alerts, total


def aggregate_alert_type_stats(
    *,
    start_dt: Optional[datetime],
    end_dt: Optional[datetime],
    min_count: int = 1,
    limit: int = 500,
) -> List[Dict[str, Any]]:
    """Aggregate active alert_types with counts and last_seen (exclude drills).

    Returns a list of dicts::

        {
            "alert_type": str,
            "count": int,
            "last_seen_dt": datetime,
            "sample_title": str,
            "sample_name": str,
        }

    Fail-open: returns [] on any error / when storage is unavailable.
    """
    coll = _get_collection()
    if coll is None:
        return []
    try:
        min_count_int = int(min_count)
    except Exception:
        min_count_int = 1
    min_count_int = max(1, min_count_int)
    try:
        limit_int = int(limit)
    except Exception:
        limit_int = 500
    limit_int = max(1, min(2000, limit_int))
    match = _build_time_filter(start_dt, end_dt)
    # Default: exclude Drill alerts from analytics helpers
    match["details.is_drill"] = {"$ne": True}
    # Only consider documents with a real alert_type (avoid grouping null/empty)
    match["alert_type"] = {"$type": "string", "$ne": ""}
    pipeline = [
        {"$match": match},
        {"$sort": {"ts_dt": -1}},
        {
            "$group": {
                "_id": {"$toLower": "$alert_type"},
                "count": {"$sum": 1},
                "last_seen_dt": {"$first": "$ts_dt"},
                "sample_title": {"$first": "$summary"},
                "sample_name": {"$first": "$name"},
            }
        },
        {"$match": {"count": {"$gte": min_count_int}}},
        {"$sort": {"count": -1, "last_seen_dt": -1}},
        {"$limit": limit_int},
    ]
    try:
        rows = list(coll.aggregate(pipeline))  # type: ignore[attr-defined]
    except Exception:
        return []
    out: List[Dict[str, Any]] = []
    for row in rows:
        try:
            alert_type = _safe_str(row.get("_id"), limit=128).lower()
            if not alert_type:
                continue
            last_seen_dt = row.get("last_seen_dt")
            if not isinstance(last_seen_dt, datetime):
                continue
            out.append(
                {
                    "alert_type": alert_type,
                    "count": int(row.get("count", 0) or 0),
                    "last_seen_dt": last_seen_dt,
                    "sample_title": _safe_str(row.get("sample_title"), limit=256),
                    "sample_name": _safe_str(row.get("sample_name"), limit=128),
                }
            )
        except Exception:
            continue
    return out


def aggregate_alert_summary(
    *,
    start_dt: Optional[datetime],
    end_dt: Optional[datetime],
) -> Dict[str, int]:
    """Aggregate alert counts by severity and deployment flag."""
    coll = _get_collection()
    if coll is None:
        return {"total": 0, "critical": 0, "anomaly": 0, "deployment": 0}
    match = _build_time_filter(start_dt, end_dt)
    # Default: exclude Drill alerts from summary/analytics
    match["details.is_drill"] = {"$ne": True}
    pipeline = [
        {"$match": match},
        {
            "$group": {
                "_id": None,
                "total": {"$sum": 1},
                "critical": {
                    "$sum": {
                        "$cond": [{"$eq": ["$severity", "critical"]}, 1, 0],
                    }
                },
                "anomaly": {
                    "$sum": {
                        "$cond": [{"$eq": ["$severity", "anomaly"]}, 1, 0],
                    }
                },
                "deployment": {
                    "$sum": {
                        "$cond": [
                            {
                                "$or": [
                                    {"$eq": ["$alert_type", "deployment_event"]},
                                    {"$eq": ["$name", "deployment_event"]},
                                ]
                            },
                            1,
                            0,
                        ]
                    }
                },
            }
        },
    ]
    try:
        result = list(coll.aggregate(pipeline))  # type: ignore[attr-defined]
        if not result:
            return {"total": 0, "critical": 0, "anomaly": 0, "deployment": 0}
        doc = result[0]
        return {
            "total": int(doc.get("total", 0)),
            "critical": int(doc.get("critical", 0)),
            "anomaly": int(doc.get("anomaly", 0)),
            "deployment": int(doc.get("deployment", 0)),
        }
    except Exception:
        return {"total": 0, "critical": 0, "anomaly": 0, "deployment": 0}


def fetch_alert_timestamps(
    *,
    start_dt: Optional[datetime],
    end_dt: Optional[datetime],
    severity: Optional[str] = None,
    alert_type: Optional[str] = None,
    limit: int = 500,
) -> List[datetime]:
    """Return recent alert timestamps matching the given filters."""
    coll = _get_collection()
    if coll is None:
        return []
    match = _build_time_filter(start_dt, end_dt)
    # Default: exclude Drill alerts from analytics helpers
    match["details.is_drill"] = {"$ne": True}
    if severity:
        match["severity"] = str(severity).lower()
    if alert_type:
        match["alert_type"] = str(alert_type).lower()
    try:
        cursor = (
            coll.find(match, {"ts_dt": 1})  # type: ignore[attr-defined]
            .sort("ts_dt", -1)  # type: ignore[attr-defined]
            .limit(max(1, limit))  # type: ignore[attr-defined]
        )
    except Exception:
        return []
    out: List[datetime] = []
    for doc in cursor:
        ts = doc.get("ts_dt")
        if isinstance(ts, datetime):
            out.append(ts)
    return out


def aggregate_alert_timeseries(
    *,
    start_dt: Optional[datetime],
    end_dt: Optional[datetime],
    granularity_seconds: int,
) -> List[Dict[str, Any]]:
    """Aggregate alert counts per severity over time buckets."""
    coll = _get_collection()
    if coll is None:
        return []
    try:
        bucket_seconds = max(1, int(granularity_seconds or 60))
    except Exception:
        bucket_seconds = 3600
    bucket_ms = bucket_seconds * 1000
    match = _build_time_filter(start_dt, end_dt)
    # Default: exclude Drill alerts from timeseries to prevent metric pollution
    match["details.is_drill"] = {"$ne": True}
    pipeline = [
        {"$match": match},
        {
            "$project": {
                "bucket": {
                    "$toDate": {
                        "$subtract": [
                            {"$toLong": "$ts_dt"},
                            {"$mod": [{"$toLong": "$ts_dt"}, bucket_ms]},
                        ]
                    }
                },
                "severity": {
                    "$toLower": {"$ifNull": ["$severity", "info"]},
                },
            }
        },
        {
            "$group": {
                "_id": {"bucket": "$bucket", "severity": "$severity"},
                "count": {"$sum": 1},
            }
        },
        {
            "$group": {
                "_id": "$_id.bucket",
                "counts": {
                    "$push": {
                        "severity": "$_id.severity",
                        "count": "$count",
                    }
                },
                "total": {"$sum": "$count"},
            }
        },
        {"$sort": {"_id": 1}},
    ]
    try:
        rows = list(coll.aggregate(pipeline))  # type: ignore[attr-defined]
    except Exception:
        return []
    result: List[Dict[str, Any]] = []
    for row in rows:
        bucket = row.get("_id")
        ts_iso = _isoformat_utc(bucket)
        counts = {"critical": 0, "anomaly": 0, "warning": 0, "info": 0}
        for entry in row.get("counts", []):
            severity = str(entry.get("severity") or "info").lower()
            if severity not in counts:
                if severity.startswith("crit"):
                    severity = "critical"
                elif severity.startswith("warn"):
                    severity = "warning"
                elif severity.startswith("anom"):
                    severity = "anomaly"
                else:
                    severity = "info"
            counts[severity] += int(entry.get("count", 0))
        counts["total"] = int(row.get("total", 0))
        counts["timestamp"] = ts_iso
        result.append(counts)
    return result