הראה קוד מקור ל services.investigation_service

"""
Investigation/Triage service – collects data for /triage.

Design:
- Sentry-first: search events by request_id or free-text query
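- Fallback: when Sentry is not configured or returns no events, bare request_id queries are looked up in the in-memory error buffer and the metrics storage; otherwise the timeline stays empty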
- Grafana links: best-effort using GRAFANA_URL env
- HTML rendering: minimal, self-contained and safe (escapes content)

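Env:
- GRAFANA_URL: base URL of Grafana, e.g. https://grafana.example.com
- SENTRY_DSN: used to derive the Sentry UI base URL (falls back to https://sentry.io)
- SENTRY_ORG/SENTRY_ORG_SLUG: organization slug for Sentry UI links (no Sentry links are built without it)
- PUBLIC_BASE_URL/WEBAPP_URL: used for building absolute links when relevant (optional)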
"""
from __future__ import annotations

import os
import html
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from urllib.parse import quote_plus, urlparse

try:  # Optional sentry integration
    import integrations_sentry as sentry_client  # type: ignore
except Exception:  # pragma: no cover
    sentry_client = None  # type: ignore


def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string.

    Fail-open: any unexpected error yields an empty string instead of raising.
    """
    try:
        current = datetime.now(tz=timezone.utc)
        return current.isoformat()
    except Exception:
        return ""


def _grafana_links_for_request(request_id: str) -> List[Dict[str, str]]:
    """Build best-effort Grafana links for a request id.

    Args:
        request_id: The request id (or raw query) to embed in the links.

    Returns:
        A list of ``{"name", "url"}`` dicts, or an empty list when the
        ``GRAFANA_URL`` environment variable is not set.
    """
    base = (os.getenv("GRAFANA_URL") or "").rstrip("/")
    if not base:
        return []
    raw = str(request_id or "")
    # The value lands inside a URL query string, so it must be URL-encoded.
    # HTML escaping is the renderer's job; doing it here would both corrupt
    # the URL ("&", "=", "#", spaces) and double-escape it in the HTML output.
    rid = quote_plus(raw)
    logs_query = quote_plus(f'"request_id:{raw}"')
    # Best-effort panels; actual dashboards may differ between deployments
    return [
        {"name": "Logs (24h)", "url": f"{base}/explore?orgId=1&query={logs_query}"},
        {"name": "Latency (5m)", "url": f"{base}/d/lZcyP/latency?orgId=1&var_request_id={rid}&from=now-5m&to=now"},
        {"name": "Errors (24h)", "url": f"{base}/d/err01/errors?orgId=1&var_request_id={rid}&from=now-24h&to=now"},
    ]


def _sentry_ui_base() -> Optional[str]:
    """Return the best-effort base URL of the Sentry web UI.

    The host is derived from ``SENTRY_DSN`` when it can be parsed:
    any ``sentry.io`` host (including ``*.sentry.io``) maps to
    ``https://sentry.io``, a leading ``ingest.`` label is stripped from
    other hosts, and any other host is used as-is. Without a usable DSN the
    result is ``https://sentry.io``.
    """
    default_base = "https://sentry.io"
    try:
        raw_dsn = os.getenv("SENTRY_DSN") or ""
        if not raw_dsn:
            return default_base
        try:
            hostname = urlparse(raw_dsn).hostname or ""
        except Exception:
            hostname = ""
        if not hostname:
            return default_base
        # Common DSN hosts: o123.ingest.sentry.io, ingest.sentry.io, sentry.io, self-hosted domains
        if hostname == "sentry.io" or hostname.endswith(".sentry.io"):
            return default_base
        ingest_prefix = "ingest."
        if hostname.startswith(ingest_prefix):
            return "https://" + hostname[len(ingest_prefix):]
        return "https://" + hostname
    except Exception:
        return default_base


def _sentry_links_for_request(request_id: str) -> List[Dict[str, str]]:
    """Build Sentry UI links (Issues and Discover) for a triage query.

    The same heuristic as the Sentry search in ``triage()`` is applied:
    input without ':', '=' or a space is treated as a request id and
    searched as ``request_id:"<token>"``; anything else is passed through as
    a raw Sentry query.

    Returns:
        Two ``{"name", "url"}`` dicts, or an empty list when the input is
        blank or neither ``SENTRY_ORG`` nor ``SENTRY_ORG_SLUG`` is set.
    """
    token = str(request_id or "").strip()
    if not token:
        return []
    ui_base = _sentry_ui_base()
    org_slug = os.getenv("SENTRY_ORG") or os.getenv("SENTRY_ORG_SLUG")
    if not org_slug:
        return []

    is_raw_query = any(marker in token for marker in (":", "=", " "))
    search_expr = token if is_raw_query else f'request_id:"{token}"'
    encoded = quote_plus(search_expr)

    org_root = f"{ui_base}/organizations/{org_slug}"
    return [
        # Issues search (24h) is the most universally supported view
        {
            "name": "Sentry Issues (24h)",
            "url": f"{org_root}/issues/?query={encoded}&statsPeriod=24h",
        },
        # Discover results (if enabled) – optional but useful
        {
            "name": "Sentry Discover",
            "url": f"{org_root}/discover/results/?query={encoded}&sort=-timestamp",
        },
    ]


def _summarize_timeline_text(timeline: List[Dict[str, Any]], limit: int = 10) -> str:
    """Render the first ``limit`` timeline events as a numbered plain-text list.

    Args:
        timeline: Event dicts; the time is read from ``timestamp`` (or ``ts``)
            and the text from ``message`` (or ``title``).
        limit: Maximum number of events to include. Values ``<= 0`` produce
            an empty string.

    Returns:
        One line per event in the form ``"<n>. <timestamp> <message>"``,
        joined with newlines.
    """
    # Honor the limit exactly, including the limit == 0 case
    if limit <= 0:
        return ""
    lines: List[str] = []
    for index, item in enumerate((timeline or [])[:limit], 1):
        ts = str(item.get("timestamp") or item.get("ts") or "")
        message = str(item.get("message") or item.get("title") or "")
        # Separate timestamp and message with a space; skip whichever is
        # missing so no stray whitespace is produced.
        body = " ".join(part for part in (ts, message) if part)
        lines.append(f"{index}. {body}")
    return "\n".join(lines)


def _safe_http_url(url: Any) -> str:
    """Return ``url`` (stripped) if it is an http(s) URL, otherwise ""."""
    candidate = str(url or "").strip()
    if not candidate:
        return ""
    try:
        scheme = urlparse(candidate).scheme.lower()
    except Exception:
        return ""
    return candidate if scheme in ("http", "https") else ""


def render_triage_html(result: Dict[str, Any]) -> str:
    """Render a compact triage HTML report.

    The HTML is intentionally simple and self-contained. All dynamic text is
    HTML-escaped, and URLs are only turned into links when they use the
    http or https scheme: ``html.escape`` alone does not neutralize
    ``javascript:``/``data:`` URLs placed in an ``href``.

    Args:
        result: Triage result dict. Reads ``request_id`` (or ``query``),
            ``timeline``, ``grafana_links`` and ``sentry_links``.

    Returns:
        An HTML document string containing at most 20 timeline rows and at
        most 6 links (Sentry links first, then Grafana links).
    """
    rid = html.escape(str(result.get("request_id") or result.get("query") or ""))
    timeline: List[Dict[str, Any]] = list(result.get("timeline") or [])
    links: List[Dict[str, str]] = list(result.get("grafana_links") or [])
    sentry_links: List[Dict[str, str]] = list(result.get("sentry_links") or [])

    rows: List[str] = []
    for item in timeline[:20]:
        ts = html.escape(str(item.get("timestamp") or item.get("ts") or ""))
        msg = html.escape(str(item.get("message") or item.get("title") or ""))
        url = _safe_http_url(item.get("url") or item.get("permalink"))
        if url:
            msg = f'<a href="{html.escape(url)}" target="_blank" rel="noopener">{msg}</a>'
        rows.append(f"<tr><td>{ts}</td><td>{msg}</td></tr>")

    link_tags: List[str] = []
    for ln in (sentry_links + links)[:6]:
        name = html.escape(str(ln.get("name") or "Link"))
        url = _safe_http_url(ln.get("url"))
        if url:
            link_tags.append(f'<a href="{html.escape(url)}" target="_blank" rel="noopener">{name}</a>')

    links_html = " ".join(link_tags)
    # Placeholder row keeps the table well-formed when there are no events
    body_rows = "".join(rows) or '<tr><td colspan="2">אין אירועים.</td></tr>'

    html_out = f"""
<!doctype html>
<meta charset="utf-8" />
<title>/triage – {rid}</title>
<style>
  body {{ font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, "Helvetica Neue", Arial; padding: 12px; }}
  h1 {{ font-size: 18px; margin: 0 0 8px; }}
  .meta {{ color: #6b7280; font-size: 12px; margin-bottom: 10px; }}
  table {{ width: 100%; border-collapse: collapse; }}
  th, td {{ text-align: left; border-bottom: 1px solid #eee; padding: 6px 4px; font-size: 13px; }}
  .links a {{ margin-right: 12px; color: #2563eb; text-decoration: none; }}
</style>
<h1>🔎 Triage: {rid}</h1>
<div class="meta">נוצר: {_now_iso()}</div>
<div class="links">{links_html}</div>
<table>
  <thead><tr><th>זמן</th><th>אירוע</th></tr></thead>
  <tbody>
    {body_rows}
  </tbody>
</table>
"""
    return html_out
def _search_local_errors(request_id: str, limit: int = 20) -> List[Dict[str, Any]]:
    """Look up a request id in the in-memory error buffer (fallback source).

    Best-effort and fail-open: if the ``observability`` module cannot be
    imported or anything else goes wrong, whatever was collected so far
    (possibly nothing) is returned.

    Args:
        request_id: Request id to look for; compared case-insensitively
            after stripping whitespace.
        limit: Maximum number of timeline events to return.

    Returns:
        Timeline event dicts with ``timestamp``, ``message``, ``url`` and
        ``source`` (always ``"local_buffer"``) keys.
    """
    matches: List[Dict[str, Any]] = []
    try:
        from observability import get_recent_errors  # type: ignore

        recent = get_recent_errors(limit=200)
        needle = str(request_id or "").strip().lower()
        if not needle:
            return []
        for entry in recent:
            try:
                candidate = str(entry.get("request_id") or "").strip().lower()
                # Substring containment also covers exact and prefix matches
                if needle in candidate:
                    matches.append(
                        {
                            "timestamp": str(entry.get("ts") or entry.get("timestamp") or ""),
                            "message": str(
                                entry.get("error") or entry.get("message") or entry.get("event") or ""
                            ),
                            "url": "",
                            "source": "local_buffer",
                        }
                    )
            except Exception:
                continue
    except Exception:
        pass
    return matches[:limit]


def _search_metrics_storage(request_id: str, limit: int = 20) -> List[Dict[str, Any]]:
    """Look up a request id in the DB-backed metrics storage (fallback source).

    Best-effort and fail-open: if ``monitoring.metrics_storage`` cannot be
    imported or the lookup fails, whatever was collected so far (possibly
    nothing) is returned. Records that cannot be converted are skipped.

    Args:
        request_id: Request id passed through to ``find_by_request_id``.
        limit: Maximum number of records to request and to return.

    Returns:
        Timeline event dicts with ``timestamp``, ``message``, ``url`` and
        ``source`` (always ``"metrics_db"``) keys. The message has the form
        ``"<method> <path> status=<code> duration=<seconds>s"``, with empty
        method or path omitted.
    """
    found: List[Dict[str, Any]] = []
    try:
        from monitoring.metrics_storage import find_by_request_id  # type: ignore

        for record in find_by_request_id(request_id, limit=limit):
            try:
                status_code = int(record.get("status_code", 0) or 0)
                seconds = float(record.get("duration_seconds", 0.0) or 0.0)
                route = str(record.get("path") or "")
                verb = str(record.get("method") or "")
                pieces = [piece for piece in (verb, route) if piece]
                pieces += [f"status={status_code}", f"duration={seconds:.3f}s"]
                found.append(
                    {
                        "timestamp": str(record.get("timestamp") or ""),
                        "message": " ".join(pieces),
                        "url": "",
                        "source": "metrics_db",
                    }
                )
            except Exception:
                continue
    except Exception:
        pass
    return found[:limit]
async def triage(query_or_request_id: str, limit: int = 20) -> Dict[str, Any]:
    """Collect triage data for a request id or a free-text query.

    Sentry is searched first when the optional Sentry client is available
    and configured. If that yields no events and the input is a bare token
    (no ':', '=' or space), the in-memory error buffer and the metrics
    storage are searched instead. Every lookup is best-effort: failures are
    swallowed and leave the timeline as it was.

    Args:
        query_or_request_id: A request id or a raw Sentry query.
        limit: Maximum number of timeline events to collect.

    Returns:
        A dict with ``query``, ``request_id``, ``timeline``,
        ``summary_text``, ``grafana_links``, ``sentry_links`` and
        ``summary_html`` keys.
    """
    query = str(query_or_request_id or "").strip()
    # A bare token is treated as a request id; anything else as a raw query
    is_bare_token = not any(marker in query for marker in (":", "=", " "))
    timeline: List[Dict[str, Any]] = []

    # Sentry-first search
    try:
        if sentry_client and sentry_client.is_configured():  # type: ignore[attr-defined]
            sentry_query = f'request_id:"{query}"' if is_bare_token else query
            events = await sentry_client.search_events(sentry_query, limit=limit)  # type: ignore[attr-defined]
            # Normalize fields for rendering
            for event in events:
                timeline.append(
                    {
                        "timestamp": event.get("timestamp") or "",
                        "message": event.get("message") or "",
                        "url": event.get("url") or "",
                        "source": "sentry",
                    }
                )
    except Exception:
        pass

    # Fallback: search local sources when Sentry returns nothing. This covers:
    # 1. Sentry is not configured
    # 2. The request was successful (status 200) and not sent to Sentry
    # 3. The event hasn't been indexed in Sentry yet
    if not timeline and is_bare_token:
        try:
            # Sub-limits keep the combined result within `limit`; the odd
            # slot goes to the local error buffer.
            local_quota = (limit + 1) // 2
            metrics_quota = limit - local_quota

            # 1. In-memory error buffer
            timeline.extend(_search_local_errors(query, limit=local_quota))
            # 2. DB-backed metrics storage
            timeline.extend(_search_metrics_storage(query, limit=metrics_quota))

            # Newest first, then truncate to the overall limit
            try:
                timeline.sort(key=lambda entry: str(entry.get("timestamp") or ""), reverse=True)
                timeline = timeline[:limit]
            except Exception:
                pass
        except Exception:
            pass

    # Links (best-effort)
    grafana_links = _grafana_links_for_request(query)
    sentry_links = _sentry_links_for_request(query)

    result: Dict[str, Any] = {
        "query": query,
        "request_id": query,
        "timeline": timeline,
        "summary_text": _summarize_timeline_text(timeline),
        "grafana_links": grafana_links,
        "sentry_links": sentry_links,
    }
    # The HTML report is rendered from the assembled result itself
    result["summary_html"] = render_triage_html(result)
    return result