הראה קוד מקור ל monitoring.silences

"""
Mongo-backed alert silences for ChatOps.

Goals:
- Fail-open: never raise; return safe defaults
- Lazy init with env-only config (avoid global config import cycles)
- TTL on until_ts so expired silences auto-clean
- Active flag to allow manual unsilence before TTL

ENV:
- ALERTS_DB_ENABLED or METRICS_DB_ENABLED: enable DB access
- MONGODB_URL, DATABASE_NAME
- ALERTS_SILENCES_COLLECTION (default: alerts_silences)
- SILENCE_MAX_DAYS (default: 7)
- SILENCES_MAX_ACTIVE (default: 50)
"""
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple
import os
import re
import uuid


def _is_true(val: Optional[str]) -> bool:
    return str(val or "").lower() in {"1", "true", "yes", "on"}


def _enabled() -> bool:
    # Explicit enables override the global disable flag used by the test harness
    if _is_true(os.getenv("ALERTS_DB_ENABLED")) or _is_true(os.getenv("METRICS_DB_ENABLED")):
        return True
    if _is_true(os.getenv("DISABLE_DB")):
        return False
    return False


_client = None  # type: ignore
_collection = None  # type: ignore
_init_failed = False


def _get_collection():  # pragma: no cover
    global _client, _collection, _init_failed
    if _collection is not None or _init_failed:
        return _collection
    if not _enabled():
        _init_failed = True
        return None
    try:
        try:
            from pymongo import MongoClient, ASCENDING  # type: ignore
        except Exception:
            _init_failed = True
            return None
        mongo_url = os.getenv("MONGODB_URL")
        db_name = os.getenv("DATABASE_NAME") or "code_keeper_bot"
        coll_name = os.getenv("ALERTS_SILENCES_COLLECTION") or "alerts_silences"
        client_kwargs = dict(
            maxPoolSize=20,
            minPoolSize=0,
            serverSelectionTimeoutMS=2000,
            socketTimeoutMS=5000,
            connectTimeoutMS=2000,
            retryWrites=True,
            retryReads=True,
            tz_aware=True,
        )
        # Allow operation without explicit MONGODB_URL (useful for tests with stubbed MongoClient)
        try:
            if mongo_url:
                _client = MongoClient(mongo_url, **client_kwargs)
            else:
                # Try without URI; fall back to localhost if signature requires a URI
                try:
                    _client = MongoClient(**client_kwargs)
                except TypeError:
                    _client = MongoClient("mongodb://localhost:27017", **client_kwargs)
        except Exception:
            _init_failed = True
            return None
        db = _client[db_name]
        _collection = db[coll_name]
        try:
            _client.admin.command("ping")
        except Exception:
            pass
        # Indexes: TTL on until_ts, plus pattern and active
        try:
            _collection.create_index([("until_ts", ASCENDING)], expireAfterSeconds=0)  # type: ignore[attr-defined]
        except Exception:
            pass
        try:
            _collection.create_index([("pattern", ASCENDING)])  # type: ignore[attr-defined]
        except Exception:
            pass
        try:
            _collection.create_index([("active", ASCENDING)])  # type: ignore[attr-defined]
        except Exception:
            pass
        try:
            _collection.create_index([("severity", ASCENDING)])  # type: ignore[attr-defined]
        except Exception:
            pass
        return _collection
    except Exception:
        _init_failed = True
        return None


[תיעוד] def parse_duration_to_seconds(text: str, *, max_days: int = 7) -> Optional[int]: """Parse duration like "30s", "5m", "2h", "3d" to seconds, bounded by max_days. Returns None on invalid. """ try: s = (text or "").strip().lower() if not s: return None match = re.fullmatch(r"\s*(\d+)\s*([smhd])\s*", s) if not match: return None num = int(match.group(1)) unit = match.group(2) if unit == "s": seconds = num elif unit == "m": seconds = num * 60 elif unit == "h": seconds = num * 3600 else: seconds = num * 86400 max_seconds = max(1, int(max_days or 7)) * 86400 if seconds <= 0: return None if seconds > max_seconds: return max_seconds return seconds except Exception: return None
def _now_dt() -> datetime: return datetime.now(timezone.utc) def _max_active_allowed() -> int: try: return max(1, int(os.getenv("SILENCES_MAX_ACTIVE", "50") or 50)) except Exception: return 50
[תיעוד] def create_silence( *, pattern: str, duration_seconds: int, created_by: int, reason: str = "", severity: Optional[str] = None, force: bool = False, ) -> Optional[Dict[str, Any]]: """Create a new active silence. Returns stored document on success, else None.""" if not _enabled() or _init_failed: return None try: patt = (pattern or "").strip() if not patt: return None # Guard against overly broad patterns unless forced dangerous = {".*", "^.*$", "(?s).*", ".+", "^.+$"} if patt in dangerous and not force: return None coll = _get_collection() if coll is None: return None # Limit active silences now = _now_dt() active_count = int(coll.count_documents({"active": True, "until_ts": {"$gte": now}})) # type: ignore[attr-defined] if active_count >= _max_active_allowed(): return None # Normalize severity sev = (severity or "").strip().lower() or None # Bound duration to SILENCE_MAX_DAYS try: max_days = int(os.getenv("SILENCE_MAX_DAYS", "7") or 7) except Exception: max_days = 7 max_seconds = max(1, max_days) * 86400 dur = max(1, int(duration_seconds or 0)) if dur > max_seconds: dur = max_seconds until = now + timedelta(seconds=dur) doc = { "_id": uuid.uuid4().hex, "pattern": patt, "severity": sev, "created_by": int(created_by or 0), "created_at": now, "until_ts": until, "reason": str(reason or ""), "active": True, } try: coll.insert_one(doc) # type: ignore[attr-defined] except Exception: return None return doc except Exception: return None
def _iter_active(coll) -> List[Dict[str, Any]]: try: now = _now_dt() cursor = coll.find({"active": True, "until_ts": {"$gte": now}}).sort("until_ts", 1) # type: ignore[attr-defined] return list(cursor) except Exception: return []
[תיעוד] def is_silenced(name: str, severity: Optional[str] = None) -> Tuple[bool, Optional[Dict[str, Any]]]: """Return (True, silence_doc) if any active silence matches alert name and optional severity.""" if not _enabled() or _init_failed: return False, None try: coll = _get_collection() if coll is None: return False, None items = _iter_active(coll) if not items: return False, None sev_in = (severity or "").strip().lower() for it in items: try: patt = str(it.get("pattern") or "") sev = (it.get("severity") or "").strip().lower() if sev and sev_in and sev != sev_in: continue if patt: try: if re.search(patt, name or "", flags=re.IGNORECASE): return True, it except re.error: # Invalid regex in DB: fall back to exact compare if patt.lower() == (name or "").lower(): return True, it except Exception: continue return False, None except Exception: return False, None
[תיעוד] def unsilence_by_id(silence_id: str) -> bool: if not _enabled() or _init_failed: return False try: coll = _get_collection() if coll is None: return False res = coll.update_one({"_id": str(silence_id)}, {"$set": {"active": False}}) # type: ignore[attr-defined] try: return bool(getattr(res, "modified_count", 0) or getattr(res, "matched_count", 0)) except Exception: return False except Exception: return False
[תיעוד] def unsilence_by_pattern(pattern: str) -> int: if not _enabled() or _init_failed: return 0 try: coll = _get_collection() if coll is None: return 0 res = coll.update_many({"pattern": str(pattern or ""), "active": True}, {"$set": {"active": False}}) # type: ignore[attr-defined] try: return int(getattr(res, "modified_count", 0) or 0) except Exception: return 0 except Exception: return 0
[תיעוד] def list_active_silences(limit: int = 50) -> List[Dict[str, Any]]: if not _enabled() or _init_failed: return [] try: coll = _get_collection() if coll is None: return [] items = _iter_active(coll) if not items: return [] return items[: max(1, int(limit or 50))] except Exception: return []