# Show source code for monitoring.metrics_storage

"""
Lightweight DB-backed metrics storage with safe, best-effort batching.

Design goals:
- Fail-open: never break app flow if DB unavailable or pymongo missing
- No import-time DB connections; initialize lazily on first flush
- Use environment variables only (avoid importing config to prevent cycles)
- Memory-safety in misconfiguration: drop/cap buffer when storage unavailable

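Environment variables:
- METRICS_DB_ENABLED: "true/1/yes/on" to enable DB writes (default: false); also enables DB reads
- MONGODB_URL: Mongo connection string (required when enabled; if set, DB reads are allowed even without METRICS_DB_ENABLED)
- DATABASE_NAME: Database name (default: code_keeper_bot)
- METRICS_COLLECTION: Collection name (default: service_metrics)
- METRICS_BATCH_SIZE: Max documents per insert batch (default: 50)
- METRICS_FLUSH_INTERVAL_SEC: Time-based flush threshold (default: 5 seconds)
- METRICS_MAX_BUFFER: Max queued items in memory (default: 5000)
- METRICS_ROLLUP_SECONDS: Rollup bucket size in seconds for DB writes (default: 60, clamped to 1..3600)
- DISABLE_DB: "true" disables both DB reads and DB writes
- DISABLE_METRICS_WRITES: "true" is an emergency kill-switch for DB writes
- DISABLE_METRICS_READS: "true" disables DB reads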
"""
from __future__ import annotations

import math
import os
import sys
import time
from collections import deque
from datetime import datetime, timezone
from threading import Event, Lock, Thread
from typing import Any, Dict, List, Optional, Tuple

# Structured event emission is optional: use the observability hook when it can be
# imported, otherwise fall back to a silent no-op so this module never hard-depends on it.
try:  # pragma: no cover
    from observability import emit_event  # type: ignore
except Exception:  # pragma: no cover

    def emit_event(event: str, severity: str = "info", **fields: Any) -> None:  # type: ignore
        """Discard the event; stand-in used when ``observability.emit_event`` is unavailable."""
def _is_true(val: Optional[str]) -> bool:
    """Return True if *val* is a truthy flag string ("1"/"true"/"yes"/"on", case-insensitive)."""
    return str(val or "").lower() in {"1", "true", "yes", "on"}


def _write_enabled() -> bool:
    """Check if WRITING metrics to DB is enabled.

    Safety guards:
    - Default is OFF (so MongoDB is not loaded unintentionally).
    - DISABLE_DB=true or DISABLE_METRICS_WRITES=true stops writes immediately.
    """
    if _is_true(os.getenv("DISABLE_DB")):
        return False
    if _is_true(os.getenv("DISABLE_METRICS_WRITES")):
        return False
    return _is_true(os.getenv("METRICS_DB_ENABLED"))


def _read_enabled() -> bool:
    """Check if READING metrics from DB is enabled.

    Reads stay available unless explicitly disabled (DISABLE_DB or
    DISABLE_METRICS_READS), so the Observability dashboard can show historical
    data even while writes are disabled. Reads additionally require either
    METRICS_DB_ENABLED=true or a configured MONGODB_URL.
    """
    if _is_true(os.getenv("DISABLE_DB")):
        return False
    if _is_true(os.getenv("DISABLE_METRICS_READS")):
        return False
    # METRICS_DB_ENABLED=true implies reads are allowed.
    if _is_true(os.getenv("METRICS_DB_ENABLED")):
        return True
    # Fallback: allow reads whenever a MongoDB URL is configured (even without METRICS_DB_ENABLED).
    return bool(os.getenv("MONGODB_URL"))


def _enabled() -> bool:
    """Legacy function - now delegates to _write_enabled for backwards compatibility."""
    return _write_enabled()


# Lazily-initialized PyMongo client/collection
_client = None  # type: ignore
_collection = None  # type: ignore
_init_failed = False  # True when initialization permanently failed (e.g., pymongo missing)
_write_disabled = False  # True when writes are intentionally disabled (not a failure)

# Documents waiting to be inserted (FIFO; oldest at the left end).
_buf: deque[Dict[str, Any]] = deque()
# Open rollup documents keyed by (bucket_start_epoch_seconds, method, path).
_agg: Dict[Tuple[int, str, str], Dict[str, Any]] = {}
# Guards _buf, _agg and _last_flush_ts.
_lock = Lock()
# Time of the last successful insert; starts at import time, so the first
# time-based flush waits one full interval.
_last_flush_ts: float = time.time()

# Background flush worker (to avoid blocking request path)
_worker_started = False
_worker_event = Event()
_start_lock = Lock()


def _is_pytest() -> bool:
    """Return True when running under pytest (the background worker is never started there)."""
    return bool(os.getenv("PYTEST_CURRENT_TEST")) or ("pytest" in sys.modules)


def _start_worker_if_needed() -> None:
    """Start the daemon flush thread once per process (never under pytest)."""
    global _worker_started
    if _worker_started or _is_pytest():
        return
    # Double-checked locking to avoid spawning multiple workers under concurrency
    with _start_lock:
        if _worker_started or _is_pytest():
            return
        try:
            t = Thread(target=_worker_loop, name="metrics-db-writer", daemon=True)
            t.start()
            _worker_started = True
        except Exception:
            # Fail-open: never block app due to background thread
            return


def _worker_loop() -> None:  # pragma: no cover
    """Flush queued metrics in background so UI requests aren't blocked by DB inserts."""
    while True:
        try:
            interval = int(os.getenv("METRICS_FLUSH_INTERVAL_SEC", "5") or "5")
        except Exception:
            interval = 5
        try:
            # Wake on signal or periodically
            _worker_event.wait(timeout=max(1, interval))
            _worker_event.clear()
        except Exception:
            pass
        try:
            flush(force=False)
        except Exception:
            pass


def _build_time_match(start_dt: Optional[datetime], end_dt: Optional[datetime]) -> Dict[str, Any]:
    """Build a ``$match`` filter for request docs, optionally bounded on ``ts`` (inclusive)."""
    # Support both legacy per-request docs and rollup docs.
    match: Dict[str, Any] = {"type": {"$in": ["request", "request_agg"]}}
    if start_dt or end_dt:
        window: Dict[str, Any] = {}
        if start_dt:
            window["$gte"] = start_dt
        if end_dt:
            window["$lte"] = end_dt
        if window:
            match["ts"] = window
    return match


def _max_buffer_size() -> int:
    """Return METRICS_MAX_BUFFER (minimum 1; 5000 if unset or invalid)."""
    try:
        return max(1, int(os.getenv("METRICS_MAX_BUFFER", "5000") or "5000"))
    except Exception:
        return 5000


def _rollup_seconds() -> int:
    """Return rollup bucket size in seconds for DB writes (clamped to 1..3600, default 60)."""
    try:
        value = int(os.getenv("METRICS_ROLLUP_SECONDS", "60") or "60")
    except Exception:
        value = 60
    return max(1, min(3600, value))


def _bucket_start_dt(now: datetime, bucket_seconds: int) -> datetime:
    """Floor a datetime to bucket_seconds and return it as an aware UTC datetime."""
    try:
        ts = int(now.timestamp())
    except Exception:
        ts = int(time.time())
    b = int(bucket_seconds) if bucket_seconds else 60
    b = max(1, b)
    bucket_ts = ts - (ts % b)
    return datetime.fromtimestamp(bucket_ts, tz=timezone.utc)


def _safe_label(value: Any, *, default: str, limit: int = 160) -> str:
    """Normalize a potentially noisy label into a bounded string."""
    try:
        text = str(value or "").strip()
    except Exception:
        text = ""
    if not text:
        return default
    # Keep labels low-cardinality-ish (no newlines, bounded length)
    text = text.replace("\n", " ").replace("\r", " ").replace("\t", " ")
    if len(text) > limit:
        text = text[:limit]
    return text


def _drain_agg_to_buf_unlocked() -> None:
    """Move rollup entries from _agg into _buf. Caller must hold _lock."""
    if not _agg:
        return
    try:
        for doc in _agg.values():
            _buf.append(doc)
    finally:
        _agg.clear()


def _get_collection(*, for_read: bool = True):  # pragma: no cover - exercised indirectly
    """Get the MongoDB collection for metrics storage, initializing lazily.

    Args:
        for_read: If True, allows connection even when writes are disabled.
            This enables the Observability dashboard to show historical data.

    Returns:
        The collection, or None when storage is disabled or unavailable. A
        missing pymongo / MONGODB_URL or a client construction error sets
        ``_init_failed`` permanently; a disabled write request sets
        ``_write_disabled`` (both flags are never reset here).
    """
    global _client, _collection, _init_failed, _write_disabled

    # If already initialized successfully, return the collection
    if _collection is not None:
        return _collection

    # If initialization permanently failed (e.g., pymongo missing), don't retry
    if _init_failed:
        return None

    # If writes are disabled and this is a write request, return None
    # but allow read requests to proceed with initialization
    if _write_disabled and not for_read:
        return None

    # Check if we should connect based on read/write mode
    enabled = _read_enabled() if for_read else _write_enabled()
    if not enabled:
        # Mark write_disabled (not init_failed) so reads can still work
        if not for_read:
            _write_disabled = True
        return None

    try:
        try:
            from pymongo import MongoClient  # type: ignore
        except Exception:
            _init_failed = True
            emit_event("metrics_db_pymongo_missing", severity="warn")
            return None

        mongo_url = os.getenv("MONGODB_URL")
        if not mongo_url:
            _init_failed = True
            emit_event("metrics_db_missing_url", severity="warn")
            return None

        db_name = os.getenv("DATABASE_NAME") or "code_keeper_bot"
        coll_name = os.getenv("METRICS_COLLECTION") or "service_metrics"

        # NOTE(review): tz_aware is not passed, so per PyMongo's default the
        # datetimes read back are naive (UTC) — readers must not treat them as local time.
        _client = MongoClient(
            mongo_url,
            maxPoolSize=20,
            minPoolSize=0,
            serverSelectionTimeoutMS=2000,
            socketTimeoutMS=5000,
            connectTimeoutMS=2000,
            retryWrites=True,
            retryReads=True,
        )
        db = _client[db_name]
        _collection = db[coll_name]
        try:
            _client.admin.command("ping")
        except Exception:
            # Connection might still succeed later; do not fail-hard
            pass
        emit_event("metrics_db_initialized", severity="info", collection=coll_name)
        return _collection
    except Exception as e:
        _init_failed = True
        emit_event("metrics_db_init_error", severity="warn", error=str(e))
        return None


def _bulk_write_errors(exc: BaseException) -> Optional[List[Dict[str, Any]]]:
    """Return per-document write errors if *exc* looks like a PyMongo BulkWriteError.

    Detection is duck-typed (``exc.details["writeErrors"]`` is a list) so this
    module does not have to import pymongo. Returns None for any other exception.
    """
    details = getattr(exc, "details", None)
    if not isinstance(details, dict):
        return None
    write_errors = details.get("writeErrors")
    return write_errors if isinstance(write_errors, list) else None


def _flush_once(now_ts: float) -> bool:
    """Insert at most one batch of queued metric docs.

    Drains open rollups into the buffer, trims the buffer to METRICS_MAX_BUFFER
    (dropping the oldest), then pops up to METRICS_BATCH_SIZE docs and inserts
    them with the lock released.

    Args:
        now_ts: Recorded as ``_last_flush_ts`` after a successful insert.

    Returns:
        True only when a batch was inserted; False when storage is unavailable,
        the queue is empty, or the insert failed.
    """
    global _last_flush_ts

    coll = _get_collection(for_read=False)  # Writing metrics requires write access
    if coll is None:
        # If initialization failed permanently, clear buffer to prevent leaks
        if _init_failed:
            with _lock:
                try:
                    _agg.clear()
                    _buf.clear()
                except Exception:
                    pass
        return False

    # Convert current rollups into buffer before popping a batch
    try:
        with _lock:
            _drain_agg_to_buf_unlocked()
            # Cap buffer to prevent unbounded growth under persistent failures
            try:
                max_buf = _max_buffer_size()
                while len(_buf) > max_buf:
                    _buf.popleft()
            except Exception:
                pass
    except Exception:
        # best-effort; continue
        pass

    # Pop a batch under lock, but perform IO without holding the lock
    try:
        batch_size = int(os.getenv("METRICS_BATCH_SIZE", "50") or "50")
    except Exception:
        batch_size = 50
    items: List[Dict[str, Any]] = []
    try:
        with _lock:
            while _buf and len(items) < max(1, batch_size):
                items.append(_buf.popleft())
    except Exception:
        # Never lose already-popped items
        try:
            with _lock:
                for it in reversed(items):
                    _buf.appendleft(it)
        except Exception:
            pass
        return False

    if not items:
        return False

    try:
        coll.insert_many(items, ordered=False)  # type: ignore[attr-defined]
    except Exception as e:
        write_errors = _bulk_write_errors(e)
        if write_errors is not None:
            # Partial failure of an unordered bulk insert: every doc without a
            # per-document error was persisted, and per-document errors
            # (duplicate key, validation, ...) would fail the same way on retry.
            # Re-queuing would store docs twice or pin a poison batch at the
            # queue head, so the failed docs are dropped (best-effort metrics).
            emit_event(
                "metrics_db_batch_insert_error",
                severity="warn",
                error=str(e),
                count=len(items),
                dropped=len(write_errors),
            )
            return False
        # Outcome unknown (network error, timeout, ...): re-queue the whole batch
        # at the head. insert_many sets ``_id`` on each doc in place, so any doc
        # that actually reached the server resurfaces as a duplicate-key write
        # error on retry and is dropped by the branch above, not stored twice.
        with _lock:
            for it in reversed(items):
                _buf.appendleft(it)
        emit_event("metrics_db_batch_insert_error", severity="warn", error=str(e), count=len(items))
        return False

    with _lock:
        _last_flush_ts = now_ts
    return True
def flush(force: bool = False) -> None:
    """Persist queued metrics to MongoDB, best-effort and bounded.

    Returns immediately when DB writes are off, either because the explicit
    ``DISABLE_METRICS_WRITES`` kill-switch is set or because ``_enabled()`` is False.

    Unless *force* is True, it also returns when less than
    ``METRICS_FLUSH_INTERVAL_SEC`` seconds (default 5, minimum 1) have passed
    since ``_last_flush_ts``.

    Otherwise it calls ``_flush_once`` repeatedly, at most 100 times, and stops
    as soon as one call reports that no batch was inserted.

    Args:
        force: Skip the time-based threshold (the kill-switches still apply).
    """
    # Emergency kill-switch is checked first and explicitly; by default it is OFF,
    # i.e. writes are allowed whenever METRICS_DB_ENABLED=true.
    if _is_true(os.getenv("DISABLE_METRICS_WRITES")) or not _enabled():
        return

    started = time.time()
    try:
        min_gap = max(1, int(os.getenv("METRICS_FLUSH_INTERVAL_SEC", "5") or "5"))
    except Exception:
        min_gap = 5

    if not force and started - _last_flush_ts < min_gap:
        return

    # Drain in bounded rounds so a large backlog cannot monopolise the caller.
    rounds_left = 100
    while rounds_left > 0 and _flush_once(time.time()):
        rounds_left -= 1
def _bump_counter(doc: Dict[str, Any], field: str, cast: Any, delta: Any) -> None:
    """Add *delta* to ``doc[field]`` after coercing it with *cast*; reset the field to *delta* if that fails."""
    try:
        doc[field] = cast(doc.get(field, 0) or 0) + delta
    except Exception:
        doc[field] = delta


def enqueue_request_metric(
    status_code: int,
    duration_seconds: float,
    *,
    request_id: str | None = None,
    extra: Optional[Dict[str, Any]] = None,
) -> None:
    """Fold one HTTP request into the in-memory rollup for later DB persistence.

    The write path is opt-in via ``METRICS_DB_ENABLED=true``, and it is skipped
    when ``DISABLE_METRICS_WRITES`` or ``DISABLE_DB`` is set. Requests are not
    stored one document per request. Instead, they are aggregated into
    ``request_agg`` documents keyed by (rollup bucket, method, path), which keeps
    MongoDB load low. ``request_id`` is accepted but not stored in the rollup.

    The DB is never touched on this path; the background writer is woken instead.
    Any error is swallowed (fail-open).

    Args:
        status_code: HTTP status; values >= 500 count as errors.
        duration_seconds: Request latency; negative values are clamped to 0.
        request_id: Accepted for interface compatibility (unused).
        extra: Optional context with ``method``, ``path`` and ``handler`` labels.
    """
    # Emergency kill-switch first, then the general write gate.
    if _is_true(os.getenv("DISABLE_METRICS_WRITES")) or not _enabled():
        return
    # Storage known to be unusable (pymongo missing / bad URL): drop immediately.
    if _init_failed:
        return

    try:
        now = datetime.now(timezone.utc)
        rollup_sec = _rollup_seconds()
        bucket_dt = _bucket_start_dt(now, rollup_sec)
        bucket_ts = int(bucket_dt.timestamp())

        # Low-cardinality labels taken from the optional context dict.
        method, path, handler = "UNKNOWN", "unknown", ""
        if isinstance(extra, dict):
            method = _safe_label(extra.get("method"), default="UNKNOWN", limit=16).upper()
            handler = _safe_label(extra.get("handler"), default="", limit=160)
            path = _safe_label(extra.get("path") or handler, default="unknown", limit=160)

        dur = max(0.0, float(duration_seconds))
        error_inc = int(int(status_code) >= 500)

        with _lock:
            # Storage may have been marked as failed by another thread meanwhile.
            if _init_failed:
                return

            key = (bucket_ts, method, path)
            doc = _agg.get(key)
            if doc is None:
                doc = _agg[key] = {
                    "ts": bucket_dt,
                    "type": "request_agg",
                    "bucket_seconds": int(rollup_sec),
                    "method": method,
                    "path": path,
                    # Handler is kept when provided (useful for debugging).
                    "handler": handler or None,
                    "count": 0,
                    "sum_duration": 0.0,
                    "max_duration": 0.0,
                    "error_count": 0,
                }

            _bump_counter(doc, "count", int, 1)
            _bump_counter(doc, "sum_duration", float, dur)
            try:
                prev_max = float(doc.get("max_duration", 0.0) or 0.0)
            except Exception:
                prev_max = 0.0
            doc["max_duration"] = max(prev_max, dur)
            _bump_counter(doc, "error_count", int, error_inc)

            # Bound rollups + pending inserts together. Under runaway label
            # cardinality, move rollups into the buffer and shed the oldest docs.
            try:
                cap = _max_buffer_size()
                while len(_agg) + len(_buf) > cap:
                    _drain_agg_to_buf_unlocked()
                    if not _buf:
                        break
                    _buf.popleft()
            except Exception:
                pass

        # Hand off to the background writer rather than flushing inline.
        _start_worker_if_needed()
        try:
            _worker_event.set()
        except Exception:
            pass
    except Exception:
        # Fail-open: never raise
        return
def aggregate_request_timeseries(
    *,
    start_dt: Optional[datetime],
    end_dt: Optional[datetime],
    granularity_seconds: int,
) -> List[Dict[str, Any]]:
    """Aggregate request metrics into fixed time buckets.

    Both legacy per-request documents and ``request_agg`` rollups are counted.
    Buckets are aligned to multiples of *granularity_seconds* since the epoch.
    Rollup documents carry their rollup-bucket start as ``ts``, so buckets finer
    than the rollup size cannot be resolved.

    Args:
        start_dt: Inclusive lower bound on ``ts`` (None = unbounded).
        end_dt: Inclusive upper bound on ``ts`` (None = unbounded).
        granularity_seconds: Bucket width. Falsy values mean 1 second;
            non-int-convertible values fall back to 60.

    Returns:
        One dict per non-empty bucket, oldest first, with these keys:

        - ``timestamp``: ISO-8601 UTC string, or None if the bucket is not a date.
        - ``count``, ``avg_duration``, ``max_duration``.
        - ``error_count``: requests with status >= 500.

        An empty list when storage is unavailable or the query fails.
    """
    coll = _get_collection()
    if coll is None:
        return []

    try:
        bucket_seconds = max(1, int(granularity_seconds or 1))
    except Exception:
        bucket_seconds = 60
    bucket_ms = bucket_seconds * 1000

    match = _build_time_match(start_dt, end_dt)
    pipeline = [
        {"$match": match},
        {
            "$project": {
                # Floor the epoch-millisecond timestamp to the bucket width.
                "bucket": {
                    "$toDate": {
                        "$subtract": [
                            {"$toLong": "$ts"},
                            {"$mod": [{"$toLong": "$ts"}, bucket_ms]},
                        ]
                    }
                },
                # Support both legacy per-request docs and rollup docs
                "count": {
                    "$cond": [
                        {"$eq": ["$type", "request_agg"]},
                        {"$ifNull": ["$count", 0]},
                        1,
                    ]
                },
                "sum_duration": {
                    "$cond": [
                        {"$eq": ["$type", "request_agg"]},
                        {"$ifNull": ["$sum_duration", 0.0]},
                        {"$ifNull": ["$duration_seconds", 0.0]},
                    ]
                },
                "max_duration": {
                    "$cond": [
                        {"$eq": ["$type", "request_agg"]},
                        {"$ifNull": ["$max_duration", 0.0]},
                        {"$ifNull": ["$duration_seconds", 0.0]},
                    ]
                },
                "error_count": {
                    "$cond": [
                        {"$eq": ["$type", "request_agg"]},
                        {"$ifNull": ["$error_count", 0]},
                        {
                            "$cond": [
                                {"$gte": ["$status_code", 500]},
                                1,
                                0,
                            ]
                        },
                    ]
                },
            }
        },
        {
            "$group": {
                "_id": "$bucket",
                "count": {"$sum": "$count"},
                "sum_duration": {"$sum": "$sum_duration"},
                "max_duration": {"$max": "$max_duration"},
                "error_count": {"$sum": "$error_count"},
            }
        },
        {
            "$project": {
                "count": 1,
                "avg_duration": {
                    "$cond": [
                        {"$gt": ["$count", 0]},
                        {"$divide": ["$sum_duration", "$count"]},
                        0.0,
                    ]
                },
                "max_duration": 1,
                "error_count": 1,
            }
        },
        {"$sort": {"_id": 1}},
    ]

    try:
        rows = list(coll.aggregate(pipeline))  # type: ignore[attr-defined]
    except Exception:
        return []

    out: List[Dict[str, Any]] = []
    for row in rows:
        bucket = row.get("_id")
        ts_iso = None
        if isinstance(bucket, datetime):
            if bucket.tzinfo is None:
                # BSON dates are UTC; PyMongo returns them naive unless tz_aware=True.
                # astimezone() on a naive value would assume *local* time and shift it.
                bucket = bucket.replace(tzinfo=timezone.utc)
            ts_iso = bucket.astimezone(timezone.utc).isoformat()
        out.append(
            {
                "timestamp": ts_iso,
                "count": int(row.get("count", 0) or 0),
                "avg_duration": float(row.get("avg_duration", 0.0) or 0.0),
                "max_duration": float(row.get("max_duration", 0.0) or 0.0),
                "error_count": int(row.get("error_count", 0) or 0),
            }
        )
    return out
def aggregate_top_endpoints(
    *,
    start_dt: Optional[datetime],
    end_dt: Optional[datetime],
    limit: int = 5,
) -> List[Dict[str, Any]]:
    """Return the slowest HTTP endpoints within the given time window.

    Endpoints are grouped by (path, method) across legacy per-request documents
    and ``request_agg`` rollups. They are ranked by worst-case ``max_duration``.
    At most *limit* entries are returned; *limit* is clamped to 1..50, and 5 is
    used if it is not int-convertible.

    Returns ``[]`` when storage is unavailable or the query fails.
    """
    coll = _get_collection()
    if coll is None:
        return []

    try:
        top_n = max(1, min(50, int(limit)))
    except Exception:
        top_n = 5

    # Shared fragments: rollup docs carry pre-aggregated fields, while each legacy
    # doc contributes exactly one request with its own duration.
    is_rollup = {"$eq": ["$type", "request_agg"]}
    legacy_duration = {"$ifNull": ["$duration_seconds", 0.0]}

    per_doc = {
        "path": {"$ifNull": ["$path", {"$ifNull": ["$handler", "unknown"]}]},
        "method": {"$ifNull": ["$method", "UNKNOWN"]},
        "count": {"$cond": [is_rollup, {"$ifNull": ["$count", 0]}, 1]},
        "sum_duration": {"$cond": [is_rollup, {"$ifNull": ["$sum_duration", 0.0]}, legacy_duration]},
        "max_duration": {"$cond": [is_rollup, {"$ifNull": ["$max_duration", 0.0]}, legacy_duration]},
    }
    per_endpoint = {
        "_id": {"path": "$path", "method": "$method"},
        "count": {"$sum": "$count"},
        "sum_duration": {"$sum": "$sum_duration"},
        "max_duration": {"$max": "$max_duration"},
    }
    summary = {
        "count": 1,
        "avg_duration": {
            "$cond": [{"$gt": ["$count", 0]}, {"$divide": ["$sum_duration", "$count"]}, 0.0]
        },
        "max_duration": 1,
    }
    pipeline = [
        {"$match": _build_time_match(start_dt, end_dt)},
        {"$project": per_doc},
        {"$group": per_endpoint},
        {"$project": summary},
        {"$sort": {"max_duration": -1}},
        {"$limit": top_n},
    ]

    try:
        rows = list(coll.aggregate(pipeline))  # type: ignore[attr-defined]
    except Exception:
        return []

    def _as_endpoint(row: Dict[str, Any]) -> Dict[str, Any]:
        ident = row.get("_id") or {}
        return {
            "endpoint": str(ident.get("path") or "unknown"),
            "method": str(ident.get("method") or "UNKNOWN"),
            "count": int(row.get("count", 0)),
            "avg_duration": float(row.get("avg_duration", 0.0) or 0.0),
            "max_duration": float(row.get("max_duration", 0.0) or 0.0),
        }

    return [_as_endpoint(row) for row in rows]
def average_request_duration(
    *,
    start_dt: Optional[datetime],
    end_dt: Optional[datetime],
) -> Optional[float]:
    """Return the average request duration for a given window.

    The mean is weighted by request count across legacy per-request documents
    and ``request_agg`` rollups.

    It returns None when storage is unavailable, the query fails, nothing
    matches, or the result is not numeric. It returns 0.0 when documents match
    but their total count is 0.
    """
    coll = _get_collection()
    if coll is None:
        return None

    is_rollup = {"$eq": ["$type", "request_agg"]}
    per_doc = {
        "count": {"$cond": [is_rollup, {"$ifNull": ["$count", 0]}, 1]},
        "sum_duration": {
            "$cond": [
                is_rollup,
                {"$ifNull": ["$sum_duration", 0.0]},
                {"$ifNull": ["$duration_seconds", 0.0]},
            ]
        },
    }
    totals = {
        "_id": None,
        "count": {"$sum": "$count"},
        "sum_duration": {"$sum": "$sum_duration"},
    }
    mean = {
        "avg_duration": {
            "$cond": [{"$gt": ["$count", 0]}, {"$divide": ["$sum_duration", "$count"]}, 0.0]
        }
    }
    pipeline = [
        {"$match": _build_time_match(start_dt, end_dt)},
        {"$project": per_doc},
        {"$group": totals},
        {"$project": mean},
    ]

    try:
        rows = list(coll.aggregate(pipeline))  # type: ignore[attr-defined]
    except Exception:
        return None
    if not rows:
        return None

    avg = rows[0].get("avg_duration")
    try:
        return float(avg)
    except Exception:
        return None
def aggregate_error_ratio(
    *,
    start_dt: Optional[datetime],
    end_dt: Optional[datetime],
) -> Dict[str, int]:
    """Return total/error counts for the window.

    ``total`` is the number of requests and ``errors`` is the number with
    status >= 500. Both legacy per-request documents and ``request_agg``
    rollups are counted.

    Returns ``{"total": 0, "errors": 0}`` when storage is unavailable, the
    query fails, or nothing matches.
    """
    zero = {"total": 0, "errors": 0}

    coll = _get_collection()
    if coll is None:
        return zero

    is_rollup = {"$eq": ["$type", "request_agg"]}
    legacy_is_error = {"$cond": [{"$gte": ["$status_code", 500]}, 1, 0]}
    per_doc = {
        "count": {"$cond": [is_rollup, {"$ifNull": ["$count", 0]}, 1]},
        "error_count": {"$cond": [is_rollup, {"$ifNull": ["$error_count", 0]}, legacy_is_error]},
    }
    totals = {
        "_id": None,
        "total": {"$sum": "$count"},
        "errors": {"$sum": "$error_count"},
    }
    pipeline = [
        {"$match": _build_time_match(start_dt, end_dt)},
        {"$project": per_doc},
        {"$group": totals},
    ]

    try:
        rows = list(coll.aggregate(pipeline))  # type: ignore[attr-defined]
    except Exception:
        return zero
    if not rows:
        return zero

    first = rows[0]
    return {"total": int(first.get("total", 0)), "errors": int(first.get("errors", 0))}
def find_by_request_id(
    request_id: str,
    *,
    limit: int = 20,
) -> List[Dict[str, Any]]:
    """Find metrics records by request_id.

    Used by the triage service as a fallback when Sentry is unavailable.

    It matches documents whose ``request_id`` equals the stripped *request_id*
    or starts with it. Results are newest first (by ``ts``) and capped at
    *limit*, clamped to 1..100. Only documents that store a ``request_id`` can
    match; the ``request_agg`` rollups written by ``enqueue_request_metric``
    do not include one.

    Returns an empty list on any failure (fail-open).
    """
    if not request_id:
        return []
    coll = _get_collection()
    if coll is None:
        return []
    try:
        import re

        rid = str(request_id).strip()
        if not rid:
            return []
        max_items = max(1, min(100, int(limit)))

        # Regex safety: escape special characters to prevent injection (see alert_tags_storage.py)
        rid_escaped = re.escape(rid)
        # Search by exact match or partial match (prefix)
        query: Dict[str, Any] = {
            "$or": [
                {"request_id": rid},
                {"request_id": {"$regex": f"^{rid_escaped}"}},
            ]
        }
        cursor = (
            coll.find(query)  # type: ignore[attr-defined]
            .sort("ts", -1)
            .limit(max_items)
        )

        results: List[Dict[str, Any]] = []
        for doc in cursor:
            try:
                ts = doc.get("ts")
                ts_iso = ""
                if isinstance(ts, datetime):
                    if ts.tzinfo is None:
                        # BSON dates are UTC; PyMongo returns them naive unless tz_aware=True.
                        # astimezone() on a naive value would assume *local* time and shift it.
                        ts = ts.replace(tzinfo=timezone.utc)
                    ts_iso = ts.astimezone(timezone.utc).isoformat()
                results.append({
                    "timestamp": ts_iso,
                    "request_id": str(doc.get("request_id") or ""),
                    "status_code": int(doc.get("status_code", 0) or 0),
                    "duration_seconds": float(doc.get("duration_seconds", 0.0) or 0.0),
                    "path": str(doc.get("path") or doc.get("handler") or ""),
                    "method": str(doc.get("method") or ""),
                })
            except Exception:
                # Skip malformed documents rather than failing the whole lookup.
                continue
        return results
    except Exception:
        return []
def aggregate_latency_percentiles(
    *,
    start_dt: Optional[datetime],
    end_dt: Optional[datetime],
    percentiles: Tuple[int, ...] = (50, 95, 99),
    sample_limit: int = 5000,
) -> Dict[str, float]:
    """Return latency percentiles (seconds) for the given window.

    Keys are ``"p<N>"`` (e.g. ``"p95"``). Only percentiles strictly between 0
    and 100 are kept. If none remain, or *percentiles* is not int-convertible,
    (50, 95, 99) is used.

    Percentiles need raw per-request samples, so only legacy
    ``type == "request"`` documents are considered; rollups are not sufficient.

    Best-effort:
    - First try MongoDB's ``$percentile`` accumulator (MongoDB 7.0+, approximate).
    - Otherwise, sample up to *sample_limit* (clamped to 1..50,000) most-recent
      records and compute nearest-rank percentiles in Python.

    Returns ``{}`` when storage is unavailable, nothing matches, or both paths fail.
    """
    coll = _get_collection()
    if coll is None:
        return {}

    try:
        pcts = tuple(int(p) for p in (percentiles or (50, 95, 99)))
    except Exception:
        pcts = (50, 95, 99)
    pcts = tuple(p for p in pcts if 0 < int(p) < 100)
    if not pcts:
        pcts = (50, 95, 99)

    # Same time window as the other readers, restricted to raw per-request docs.
    match = _build_time_match(start_dt, end_dt)
    match["type"] = "request"

    # Attempt Mongo-native percentile aggregation ($percentile: MongoDB 7.0+).
    try:
        p_array = [float(p) / 100.0 for p in pcts]
        pipeline = [
            {"$match": match},
            {
                "$group": {
                    "_id": None,
                    "p": {
                        "$percentile": {
                            "input": "$duration_seconds",
                            "p": p_array,
                            "method": "approximate",
                        }
                    },
                }
            },
        ]
        rows = list(coll.aggregate(pipeline))  # type: ignore[attr-defined]
        if not rows:
            # $group over an empty match emits no rows; the sampling fallback
            # would run the same filter and find nothing either.
            return {}
        arr = rows[0].get("p")
        if isinstance(arr, list) and len(arr) == len(pcts):
            out: Dict[str, float] = {}
            for p, value in zip(pcts, arr):
                try:
                    out[f"p{p}"] = float(value)
                except Exception:
                    continue
            if out:
                return out
    except Exception:
        # Operator unsupported (older server) or query failed: fall back below.
        pass

    # Fallback: sample durations and compute percentiles in Python (nearest-rank).
    try:
        max_items = max(1, min(50_000, int(sample_limit)))
    except Exception:
        max_items = 5000
    try:
        cursor = (
            coll.find(match, {"duration_seconds": 1, "_id": 0})  # type: ignore[attr-defined]
            .sort("ts", -1)
            .limit(max_items)
        )
        values: List[float] = []
        for doc in cursor:
            try:
                values.append(float(doc.get("duration_seconds", 0.0) or 0.0))
            except Exception:
                continue
        if not values:
            return {}
        values.sort()

        n = len(values)
        result: Dict[str, float] = {}
        for p in pcts:
            # nearest-rank: index ceil(p/100 * N) - 1, clamped to [0, N-1]
            k = int(math.ceil((float(p) / 100.0) * float(n))) - 1
            result[f"p{p}"] = float(values[max(0, min(n - 1, k))])
        return result
    except Exception:
        return {}