# Source listing: services.webserver

import asyncio
import atexit
import logging
import os
import secrets
from dataclasses import dataclass
import hashlib
import hmac
import re
from typing import Any, Dict, Optional, Tuple

# Configure structured logging and Sentry as early as possible,
# and install sensitive data redaction on log handlers before Sentry hooks logging.
try:
    from observability import setup_structlog_logging, init_sentry, get_log_level_from_env  # type: ignore
    setup_structlog_logging(get_log_level_from_env("INFO"))
    try:
        from utils import install_sensitive_filter  # type: ignore
        install_sensitive_filter()
    except Exception as e:
        # Record the exception instead of a bare pass – the automated flow treats this as a handled anomaly.
        try:
            from observability import emit_event  # type: ignore
            emit_event(
                "install_sensitive_filter_failed",
                severity="anomaly",
                operation="startup",
                handled=True,
                error=str(e),
            )
        except Exception:
            # Last-resort fallback when emit_event itself is unavailable.
            logging.getLogger(__name__).warning(
                "install_sensitive_filter_failed", extra={"operation": "startup", "handled": True}
            )
    init_sentry()
except Exception as e:
    # Fail-open: do not block startup – but log a warning instead of silently passing.
    logging.getLogger(__name__).warning(
        "observability_init_failed", extra={"operation": "startup", "handled": True, "error": str(e)}
    )

from aiohttp import web
import json
import time
from services.db_health_service import (
    get_db_health_service,
    InvalidCollectionNameError,
    CollectionAccessDeniedError,
    MAX_SKIP,
    clean_db_health_filter_value,
)
from services.sentry_utils import first_int
try:
    # Correlation for web requests
    from observability import generate_request_id, bind_request_id  # type: ignore
except Exception:  # pragma: no cover
[תיעוד] def generate_request_id(): # type: ignore return ""
[תיעוד] def bind_request_id(_rid: str) -> None: # type: ignore return None
try: from metrics import ( metrics_endpoint_bytes, metrics_content_type, record_request_outcome, record_request_queue_delay, note_request_started, note_request_finished, note_deployment_started, note_deployment_shutdown, ) except Exception: # pragma: no cover metrics_endpoint_bytes = lambda: b"" # type: ignore metrics_content_type = lambda: "text/plain; charset=utf-8" # type: ignore def record_request_outcome(status_code: int, duration_seconds: float, **_kwargs) -> None: # type: ignore return None def record_request_queue_delay(method: str, endpoint: str | None, delay_seconds: float, **_kwargs) -> None: # type: ignore return None def note_request_started() -> None: # type: ignore return None def note_request_finished() -> None: # type: ignore return None def note_deployment_started(_summary: str = "Service starting up") -> None: # type: ignore return None def note_deployment_shutdown(_summary: str = "Service shutting down") -> None: # type: ignore return None from html import escape as html_escape # הערה: לא נייבא את code_sharing כ-reference קבוע כדי לאפשר monkeypatch דינמי בטסטים. # במקום זאת נפתור את ה-service בזמן ריצה בתוך ה-handler. # Optional structured logging/event emission and error counter (fail-open) try: # type: ignore from observability import emit_event # type: ignore except Exception: # pragma: no cover
[תיעוד] def emit_event(event: str, severity: str = "info", **fields): # type: ignore return None
try: from metrics import errors_total # type: ignore except Exception: # pragma: no cover errors_total = None # type: ignore try: _AI_REQUEST_TIMEOUT = max(5.0, min(20.0, float(os.getenv("OBS_AI_EXPLAIN_TIMEOUT", "10")))) except ValueError: _AI_REQUEST_TIMEOUT = 10.0 _AI_ROUTE_TOKEN = os.getenv("OBS_AI_EXPLAIN_TOKEN") or os.getenv("AI_EXPLAIN_TOKEN") or "" logger = logging.getLogger(__name__) # --- DB Health auth (Token-based) --- DB_HEALTH_TOKEN = os.getenv("DB_HEALTH_TOKEN", "") def _constant_time_compare(a: str, b: str) -> bool: """השוואה בזמן קבוע למניעת timing attacks. משתמש ב-hmac.compare_digest שמבצע השוואה בזמן קבוע ללא קיצור-דרך על אי-התאמה ראשונה. """ try: return hmac.compare_digest( a.encode("utf-8") if isinstance(a, str) else a, b.encode("utf-8") if isinstance(b, str) else b, ) except (TypeError, AttributeError): return False @web.middleware async def db_health_auth_middleware(request: web.Request, handler): """Middleware להגנה על endpoints רגישים (DB + Jobs Monitor).""" if ( request.path.startswith("/api/db/") or request.path.startswith("/api/jobs") or request.path.startswith("/api/debug/") ): if not DB_HEALTH_TOKEN: # אם לא מוגדר token, חסום לגמרי return web.json_response({"error": "disabled"}, status=403) auth = request.headers.get("Authorization", "") or "" # עדיפות ל-header; fallback ל-query param (?token=...) רק ל-endpoint תחזוקה, # כדי לא להחליש את מצב האבטחה של /api/db/* ו-/api/jobs/* provided_token = "" if auth.startswith("Bearer "): provided_token = auth[7:] # הסר את "Bearer " else: allow_query_token = request.path == "/api/debug/maintenance_cleanup" if allow_query_token: try: provided_token = str(request.query.get("token") or "") except Exception: provided_token = "" if not provided_token: return web.json_response({"error": "unauthorized"}, status=401) # השוואה בזמן קבוע למניעת timing attacks! 
# secrets.compare_digest או hmac.compare_digest if not _constant_time_compare(provided_token, DB_HEALTH_TOKEN): return web.json_response({"error": "unauthorized"}, status=401) return await handler(request) # AI explain service is optional in minimal envs. # IMPORTANT: Tests monkeypatch `services.webserver.ai_explain_service`, so keep the attribute always present. try: # type: ignore from services import ai_explain_service as ai_explain_service # type: ignore except Exception: # pragma: no cover class _AiExplainServiceStub: class AiExplainError(RuntimeError): pass async def generate_ai_explanation(self, *_a, **_k): # type: ignore[no-untyped-def] raise self.AiExplainError("service_unavailable") ai_explain_service = _AiExplainServiceStub() # type: ignore # --- Queue delay (request queueing) instrumentation --- _QUEUE_DELAY_HEADERS = ("X-Queue-Start", "X-Request-Start") _QUEUE_DELAY_EVENT_NAME = "access_logs" _QUEUE_DELAY_EPOCH_RE = re.compile(r"(-?\d+(?:\.\d+)?)") def _queue_delay_warn_threshold_ms() -> int: try: return max(0, int(float(os.getenv("QUEUE_DELAY_WARN_MS", "500") or 500))) except Exception: return 500 def _parse_request_start_to_epoch_seconds(raw: str | None) -> float | None: """Parse request start header into epoch seconds (best-effort). Supported shapes: - "t=1700000000.123" (seconds, float) - "1700000000" (seconds, int) - "1700000000123" (milliseconds) - "1700000000123456" (microseconds) - "1700000000123456789" (nanoseconds) """ try: text = str(raw or "").strip() except Exception: return None if not text: return None # Common prefix: "t=..." if text.lower().startswith("t="): text = text.split("=", 1)[1].strip() m = _QUEUE_DELAY_EPOCH_RE.search(text) if not m: return None token = m.group(1) if not token: return None # Float token => treat as seconds (e.g., "1700000000.123") if "." 
in token: try: value = float(token) except Exception: return None return value if value > 0 else None # Integer token => infer unit from digit length try: value_int = int(token) except Exception: return None if value_int <= 0: return None digits = len(token.lstrip("+-")) # epoch seconds ~ 10 digits, ms ~ 13, us ~ 16, ns ~ 19 if digits <= 10: return float(value_int) if digits <= 13: return float(value_int) / 1_000.0 if digits <= 16: return float(value_int) / 1_000_000.0 return float(value_int) / 1_000_000_000.0 def _compute_queue_delay_ms(headers: Any, *, now_epoch_seconds: float) -> Tuple[int, str | None]: """Return (queue_delay_ms, source_header) with fail-open behavior.""" for header_name in _QUEUE_DELAY_HEADERS: try: raw = headers.get(header_name) except Exception: raw = None if not raw: continue ts = _parse_request_start_to_epoch_seconds(raw) if ts is None: continue try: delay_ms = int(round(max(0.0, float(now_epoch_seconds - float(ts)) * 1000.0))) except Exception: delay_ms = 0 return delay_ms, header_name return 0, None def _bind_queue_delay_context(queue_delay_ms: int, source_header: str | None) -> None: """Bind queue delay to structlog contextvars (best-effort).""" try: import structlog # type: ignore payload: Dict[str, Any] = {"queue_delay": int(queue_delay_ms)} if source_header: payload["queue_delay_source"] = str(source_header) structlog.contextvars.bind_contextvars(**payload) except Exception: return # --- Sentry webhook: in-memory de-dup to avoid bursts --- _SENTRY_DEDUP: dict[str, float] = {} def _sentry_dedup_window_seconds() -> int: try: return max(0, int(float(os.getenv("SENTRY_WEBHOOK_DEDUP_WINDOW_SECONDS", "300") or 300))) except Exception: return 300 def _sentry_secret() -> str: # Prefer explicit Sentry webhook secret; fallback to generic webhook secret if set. 
return str(os.getenv("SENTRY_WEBHOOK_SECRET") or os.getenv("WEBHOOK_SECRET") or "").strip() def _sha256_hmac_hex(secret: str, msg: bytes) -> str: try: return hmac.new(secret.encode("utf-8"), msg, hashlib.sha256).hexdigest() except Exception: return "" def _constant_time_equals(a: str, b: str) -> bool: try: return hmac.compare_digest(str(a or ""), str(b or "")) except Exception: return False def _verify_sentry_webhook(request: web.Request, body: bytes) -> bool: """Best-effort verification for Sentry webhook calls. Supported modes: - HMAC signature headers (preferred when provided by Sentry) - Bearer token / query param token fallback (for setups where headers are not configurable) """ secret = _sentry_secret() if not secret: # Explicit opt-in: if no secret configured, allow (fail-open). return True # 1) Token fallback: Authorization: Bearer <secret> or ?token=<secret> try: auth = str(request.headers.get("Authorization") or "").strip() if auth.lower().startswith("bearer "): token = auth.split(" ", 1)[1].strip() if token and _constant_time_equals(token, secret): return True except Exception: pass try: token = str(request.query.get("token") or "").strip() if token and _constant_time_equals(token, secret): return True except Exception: pass # 2) HMAC signature headers (Sentry varies between integrations) # Common headers: # - X-Sentry-Hook-Signature / X-Sentry-Signature # - X-Sentry-Hook-Timestamp / X-Sentry-Timestamp try: sig = ( request.headers.get("X-Sentry-Hook-Signature") or request.headers.get("X-Sentry-Signature") or request.headers.get("Sentry-Hook-Signature") or request.headers.get("Sentry-Signature") or "" ) sig = str(sig or "").strip() except Exception: sig = "" try: ts = ( request.headers.get("X-Sentry-Hook-Timestamp") or request.headers.get("X-Sentry-Timestamp") or request.headers.get("Sentry-Hook-Timestamp") or request.headers.get("Sentry-Timestamp") or "" ) ts = str(ts or "").strip() except Exception: ts = "" if not sig: return False # Accept both common 
signing shapes: # - HMAC(secret, body) # - HMAC(secret, f"{timestamp}.{body}") try: candidate_a = _sha256_hmac_hex(secret, body) if candidate_a and _constant_time_equals(candidate_a, sig): return True except Exception: pass if ts: try: candidate_b = _sha256_hmac_hex(secret, ts.encode("utf-8") + b"." + body) if candidate_b and _constant_time_equals(candidate_b, sig): return True except Exception: pass return False @dataclass class _SentryAlert: name: str summary: str severity: str dedup_key: str details: Dict[str, Any] def _map_sentry_level_to_severity(level: str | None) -> str: v = str(level or "").strip().lower() if v in {"fatal", "critical"}: return "critical" if v in {"error", "err"}: return "error" if v in {"warning", "warn"}: return "warning" if v in {"info"}: return "info" # Unknown -> keep it visible but not noisy return "warning" def _truncate(text: str, limit: int) -> str: try: s = str(text or "").strip() except Exception: s = "" if limit and len(s) > limit: return s[: max(0, limit - 1)] + "…" return s def _extract_sentry_alert(payload: Any) -> _SentryAlert | None: if not isinstance(payload, dict) or not payload: return None action = str(payload.get("action") or payload.get("trigger") or payload.get("status") or "").strip().lower() data = payload.get("data") if isinstance(payload.get("data"), dict) else {} issue = ( data.get("issue") if isinstance(data.get("issue"), dict) else (payload.get("issue") if isinstance(payload.get("issue"), dict) else {}) ) event = ( data.get("event") if isinstance(data.get("event"), dict) else (payload.get("event") if isinstance(payload.get("event"), dict) else {}) ) project = ( data.get("project") if isinstance(data.get("project"), dict) else (payload.get("project") if isinstance(payload.get("project"), dict) else {}) ) title = ( payload.get("title") or issue.get("title") or event.get("title") or event.get("message") or payload.get("message") or "" ) title_s = _truncate(str(title), 220) issue_id = str(issue.get("id") or 
payload.get("issue_id") or "").strip() short_id = str(issue.get("shortId") or issue.get("short_id") or payload.get("shortId") or payload.get("short_id") or "").strip() permalink = str( issue.get("permalink") or payload.get("permalink") or event.get("permalink") or payload.get("url") or "" ).strip() project_slug = str(project.get("slug") or payload.get("project_slug") or payload.get("project") or "").strip() level = str(event.get("level") or payload.get("level") or payload.get("level_name") or "").strip().lower() or None severity = _map_sentry_level_to_severity(level) # Drop resolved notifications by default (still record as info if they come through). if action in {"resolved", "resolved_issue", "issue_resolved"}: severity = "info" # Heuristic: some Sentry issues are background/maintenance noise (e.g. pymongo pool housekeeping). # Downgrade them to warning so they don't look like user-facing errors in Telegram/Observability. try: lowered_title = str(title_s or "").lower() # NOTE: do not override "resolved" notifications; they should remain informational, # otherwise we emit new warning-level alerts and create new dedup keys. if ( severity != "info" and ("_operationcancelled" in lowered_title or "operation cancelled" in lowered_title) ): severity = "warning" except Exception: pass # Stable identifiers for dedup primary_id = issue_id or short_id or str(event.get("id") or event.get("event_id") or "").strip() dedup_key = "|".join([x for x in [primary_id, project_slug, severity, action] if x]) if not dedup_key: # Worst-case fallback: title bucket dedup_key = f"title:{title_s[:80]}" display_id = short_id or (issue_id[:8] if issue_id else "issue") name = _truncate(f"Sentry: {display_id}", 128) occurrence_count = first_int( issue.get("count"), issue.get("eventCount"), issue.get("occurrence_count"), payload.get("occurrence_count"), payload.get("count"), event.get("count"), ) # Important: order matters for UI display. Put alert_type and Sentry IDs first. 
details: Dict[str, Any] = { "alert_type": "sentry_issue", "sentry_issue_id": issue_id or None, "sentry_short_id": short_id or None, "sentry_permalink": permalink or None, "sentry_event_id": str(event.get("id") or event.get("event_id") or "").strip() or None, "project": project_slug or None, "level": level or None, "action": action or None, "logger": str(event.get("logger") or payload.get("logger") or "").strip() or None, "culprit": str(issue.get("culprit") or event.get("culprit") or "").strip() or None, "environment": str(event.get("environment") or payload.get("environment") or "").strip() or None, "occurrence_count": occurrence_count, } # Remove None values to keep storage clean details = {k: v for k, v in details.items() if v not in (None, "")} summary = title_s or "Sentry alert" return _SentryAlert(name=name, summary=summary, severity=severity, dedup_key=dedup_key, details=details) def _should_emit_sentry_alert(dedup_key: str) -> bool: window = _sentry_dedup_window_seconds() if window <= 0: return True now = time.time() # Lazy cleanup of old entries (best-effort) try: cutoff = now - float(window) for k, ts in list(_SENTRY_DEDUP.items()): if ts < cutoff: _SENTRY_DEDUP.pop(k, None) except Exception: pass last = _SENTRY_DEDUP.get(dedup_key) if last is not None and (now - float(last)) < float(window): return False _SENTRY_DEDUP[dedup_key] = now return True
[תיעוד] def create_app() -> web.Application: # הוסף middleware שמייצר ומקשר request_id לכל בקשה נכנסת @web.middleware async def _request_id_mw(request: web.Request, handler): req_id = generate_request_id() or "" start = time.perf_counter() wall_now = time.time() handler_name = getattr(handler, "__name__", None) or handler.__class__.__name__ queue_delay_ms, queue_delay_source = _compute_queue_delay_ms( request.headers, now_epoch_seconds=float(wall_now) ) _bind_queue_delay_context(queue_delay_ms, queue_delay_source) try: note_request_started() except Exception: pass try: bind_request_id(req_id) except Exception as e: try: emit_event( "bind_request_id_failed", severity="anomaly", operation="request_id_middleware", handled=True, request_id=req_id, error=str(e), ) except Exception: logging.getLogger(__name__).warning( "bind_request_id_failed", extra={"operation": "request_id_middleware", "handled": True} ) # המשך עיבוד try: response = await handler(request) finally: try: note_request_finished() except Exception: pass try: if hasattr(response, "headers") and req_id: response.headers["X-Request-ID"] = req_id except Exception as e: try: emit_event( "set_request_id_header_failed", severity="anomaly", operation="request_id_middleware", handled=True, request_id=req_id, error=str(e), ) except Exception: logging.getLogger(__name__).warning( "set_request_id_header_failed", extra={"operation": "request_id_middleware", "handled": True} ) # Update unified request metrics (best-effort) try: duration = max(0.0, float(time.perf_counter() - start)) status = int(getattr(response, "status", 0) or 0) route_name = None try: route = getattr(request.match_info, "route", None) route_name = getattr(route, "name", None) except Exception: route_name = None path_label = getattr(request, "path", "") method_label = getattr(request, "method", "GET") handler_label = route_name or handler_name or path_label record_request_outcome( status, duration, source="aiohttp", handler=handler_label, 
path=path_label, method=method_label, cache_hit=None, ) try: record_request_queue_delay( method_label, route_name or handler_name or path_label, float(queue_delay_ms) / 1000.0, ) except Exception: pass # Structured access log (best-effort) try: access_fields: Dict[str, Any] = { "request_id": req_id, "method": method_label, "path": path_label, "handler": handler_label, "status_code": status, "duration_ms": int(duration * 1000), "queue_delay": int(queue_delay_ms), } if queue_delay_source: access_fields["queue_delay_source"] = str(queue_delay_source) # Silence noisy monitoring endpoints when request is "ok". # - For health/metrics: skip only successes (<400) but keep 4xx/5xx. # - For favicon: also skip 404/4xx noise, keep 5xx. # - For root availability probes: skip HEAD / when "ok" (<400), but keep 4xx/5xx. silent_paths = {"/metrics", "/health", "/healthz", "/favicon.ico"} is_silent_path = path_label in silent_paths is_root_check = (path_label == "/" and str(method_label).upper() == "HEAD") should_silence = False if is_silent_path or is_root_check: ok_threshold = 500 if path_label == "/favicon.ico" else 400 should_silence = int(status) < int(ok_threshold) if not should_silence: emit_event(_QUEUE_DELAY_EVENT_NAME, severity="info", **access_fields) except Exception: pass # Warning when queue delay is suspiciously high try: threshold = _queue_delay_warn_threshold_ms() if threshold > 0 and int(queue_delay_ms) >= int(threshold): warn_fields: Dict[str, Any] = { "request_id": req_id, "queue_delay": int(queue_delay_ms), "threshold_ms": int(threshold), "method": method_label, "path": path_label, "handler": handler_label, } if queue_delay_source: warn_fields["queue_delay_source"] = str(queue_delay_source) emit_event("queue_delay_high", severity="warning", **warn_fields) except Exception: pass except Exception as e: try: emit_event( "record_request_outcome_failed", severity="anomaly", operation="request_metrics", handled=True, request_id=req_id, error=str(e), ) except Exception: 
logging.getLogger(__name__).warning( "record_request_outcome_failed", extra={"operation": "request_metrics", "handled": True} ) return response app = web.Application(middlewares=[_request_id_mw, db_health_auth_middleware]) # --- Query Performance Profiler routes (best-effort) --- # NOTE: המנגנון יכול להיות מנוטרל "קשיח" ברמת הקוד בתוך DatabaseManager.ENABLE_PROFILING. try: profiler_enabled_env = str(os.getenv("PROFILER_ENABLED", "true") or "").strip().lower() in {"1", "true", "yes", "y", "on"} except Exception: profiler_enabled_env = True if profiler_enabled_env: try: from database import db_manager # type: ignore # Hard-disable: אל תרשום routes/שירות אם DatabaseManager נועל את הפיצ'ר if not bool(getattr(db_manager, "ENABLE_PROFILING", True)): try: emit_event("profiler_disabled_by_code_flag", severity="info") except Exception: pass else: from services.query_profiler_service import PersistentQueryProfilerService # type: ignore from handlers.profiler_handler import setup_profiler_routes # type: ignore try: # שיכוך כאבים: ברירת מחדל גבוהה יותר כדי לא לתעד כל latency "רגיל" threshold_ms = int(float(os.getenv("PROFILER_SLOW_THRESHOLD_MS", "1000") or 1000)) except Exception: threshold_ms = 1000 profiler_service = PersistentQueryProfilerService(db_manager, slow_threshold_ms=threshold_ms) app["profiler_service"] = profiler_service setup_profiler_routes(app, profiler_service) except Exception as e: try: emit_event( "profiler_routes_init_failed", severity="warn", handled=True, error=str(e), ) except Exception: pass async def on_startup(app: web.Application): """אתחול שירותים בעליית השרת.""" try: # אתחול מוקדם של DB Health Service svc = await get_db_health_service() app["db_health_service"] = svc logger.info("DB Health Service initialized") except Exception as e: logger.warning(f"DB Health Service init failed: {e}") async def on_cleanup(app: web.Application): """ניקוי משאבים בכיבוי השרת.""" svc = app.get("db_health_service") if svc and hasattr(svc, "close"): await svc.close() 
app.on_startup.append(on_startup) app.on_cleanup.append(on_cleanup) async def health(request: web.Request) -> web.Response: return web.json_response({"status": "ok"}) async def metrics_view(request: web.Request) -> web.Response: try: payload = metrics_endpoint_bytes() return web.Response(body=payload, headers={"Content-Type": metrics_content_type()}) except Exception as e: logger.error(f"metrics_view error: {e}") # העדף emit_event שהוחלף במודול זה (monkeypatch), ואם לא – פתור דינמית את observability.emit_event try: chosen_emit = emit_event # type: ignore try: import sys as _sys obs = _sys.modules.get("observability") obs_emit = getattr(obs, "emit_event", None) if obs is not None else None except Exception: obs_emit = None # אם emit_event במודול הזה אינו מגיע מ-observability (סימן ל-monkeypatch) – העדף אותו # אחרת, אפשר להשתמש ב-observability.emit_event בזמן ריצה כדי לכבד monkeypatch שבוצע לאחר import if not callable(chosen_emit) or getattr(chosen_emit, "__module__", "") == "observability": if callable(obs_emit): chosen_emit = obs_emit # type: ignore chosen_emit( "metrics_view_error", severity="error", error_code="E_METRICS_VIEW", error=str(e), ) # type: ignore except Exception: pass try: if errors_total is not None: errors_total.labels(code="E_METRICS_VIEW").inc() except Exception: pass return web.Response(status=500, text="metrics error") async def alerts_view(request: web.Request) -> web.Response: """Alertmanager webhook endpoint: forwards alerts and logs them. Expected payload schema: {"alerts": [...]} or a single alert object. 
""" try: raw = await request.text() data = json.loads(raw) if raw else {} except Exception as e: try: # פנה ל-emit_event שהוחדר למודול זה (מאפשר monkeypatch בטסטים) emit_event("alerts_parse_error", severity="warn", error_code="E_ALERTS_PARSE", error=str(e)) # type: ignore except Exception: pass try: if errors_total is not None: errors_total.labels(code="E_ALERTS_PARSE").inc() except Exception: pass return web.Response(status=400, text="invalid json") # Normalize to list of alerts alerts = [] if isinstance(data, dict) and "alerts" in data and isinstance(data["alerts"], list): alerts = data["alerts"] elif isinstance(data, dict) and data: alerts = [data] # Forward via helper (Slack/Telegram) and emit events try: # Log only a lightweight counter at info level here; per-alert severities # are handled inside alert_forwarder.forward_alerts. emit_event("alert_received", severity="info", count=int(len(alerts))) except Exception: pass try: from alert_forwarder import forward_alerts # type: ignore forward_alerts(alerts) except Exception as e: # Soft-fail; דווח כאנומליה מטופלת try: emit_event( "alerts_forward_failed", severity="anomaly", operation="alerts_forward", handled=True, error=str(e), ) except Exception: logging.getLogger(__name__).warning( "alerts_forward_failed", extra={"operation": "alerts_forward", "handled": True} ) return web.json_response({"status": "ok", "forwarded": len(alerts)}) async def alerts_get_view(request: web.Request) -> web.Response: """Return recent internal alerts as JSON for ChatOps and dashboards. 
Query params: - limit: int (default 20) """ try: limit = int(request.query.get("limit", "20")) except Exception: limit = 20 try: from internal_alerts import get_recent_alerts # type: ignore items = get_recent_alerts(limit=max(1, min(200, limit))) or [] except Exception: items = [] return web.json_response({"alerts": items}) async def sentry_webhook_view(request: web.Request) -> web.Response: """Sentry webhook endpoint: converts Sentry alerts into internal alerts. - Emits internal_alerts.emit_internal_alert(...) so Telegram forwarding works. - Persists to Mongo via monitoring.alerts_storage through internal_alerts (best-effort). """ try: body = await request.read() except Exception: body = b"" if not _verify_sentry_webhook(request, body): try: emit_event("sentry_webhook_unauthorized", severity="warn", handled=True) # type: ignore except Exception: pass return web.json_response({"ok": False, "error": "unauthorized"}, status=401) try: raw_text = body.decode("utf-8", errors="replace") if body else "" payload = json.loads(raw_text) if raw_text else {} except Exception as e: try: emit_event( "sentry_webhook_parse_error", severity="warn", handled=True, error=str(e), ) # type: ignore except Exception: pass return web.json_response({"ok": False, "error": "invalid_json"}, status=400) alert = _extract_sentry_alert(payload) if alert is None: try: emit_event("sentry_webhook_ignored", severity="info", handled=True, reason="empty_payload") # type: ignore except Exception: pass return web.json_response({"ok": True, "ignored": True}) should_emit = _should_emit_sentry_alert(alert.dedup_key) try: emit_event( "sentry_webhook_received", severity="info", handled=True, dedup=not should_emit, sentry_short_id=str(alert.details.get("sentry_short_id") or ""), project=str(alert.details.get("project") or ""), level=str(alert.details.get("level") or ""), action=str(alert.details.get("action") or ""), ) # type: ignore except Exception: pass if should_emit: try: from internal_alerts import 
emit_internal_alert # type: ignore emit_internal_alert( name=alert.name, severity=str(alert.severity), summary=str(alert.summary), **(alert.details or {}), ) except Exception as e: try: emit_event( "sentry_webhook_emit_failed", severity="anomaly", handled=True, error=str(e), ) # type: ignore except Exception: pass return web.json_response({"ok": False, "error": "emit_failed"}, status=500) return web.json_response({"ok": True, "deduped": (not should_emit)}) async def incidents_get_view(request: web.Request) -> web.Response: """Return incident history as JSON. Query params: - limit: int (default 20) """ try: limit = int(request.query.get("limit", "20")) except Exception: limit = 20 try: from remediation_manager import get_incidents # type: ignore items = get_incidents(limit=max(1, min(200, limit))) or [] except Exception: items = [] return web.json_response({"incidents": items}) async def ai_explain_view(request: web.Request) -> web.Response: start = time.perf_counter() req_id = request.headers.get("X-Request-ID") or "" if _AI_ROUTE_TOKEN: auth_header = request.headers.get("Authorization", "").strip() expected_header = f"Bearer {_AI_ROUTE_TOKEN}" try: valid_token = secrets.compare_digest(auth_header, expected_header) except Exception: valid_token = False if not valid_token: return web.json_response( { "error": "unauthorized", "message": "missing or invalid bearer token", }, status=401, ) try: payload = await request.json() except json.JSONDecodeError: return web.json_response({"error": "bad_request", "message": "invalid json"}, status=400) except Exception: return web.json_response({"error": "bad_request", "message": "invalid body"}, status=400) context = payload.get("context") expected_sections = payload.get("expected_sections") if not isinstance(context, dict): return web.json_response( {"error": "invalid_context", "message": "context must be an object"}, status=400, ) if expected_sections is not None and not isinstance(expected_sections, list): expected_sections = 
None alert_uid = str(context.get("alert_uid") or "") try: explanation = await asyncio.wait_for( ai_explain_service.generate_ai_explanation( context, expected_sections=expected_sections, request_id=req_id, ), timeout=_AI_REQUEST_TIMEOUT, ) except asyncio.TimeoutError: duration = time.perf_counter() - start try: emit_event( "ai_explain_request_failure", severity="error", alert_uid=alert_uid, duration_ms=int(duration * 1000), error_code="handler_timeout", handled=True, ) except Exception: pass return web.json_response( { "error": "timeout", "message": "פניית ה-AI חרגה מחלון הזמן", }, status=504, ) except ai_explain_service.AiExplainError as exc: duration = time.perf_counter() - start error_code = str(exc) or "provider_error" if error_code == "invalid_context": status = 400 message = "מבנה ההקשר אינו תקין" elif error_code == "anthropic_api_key_missing": status = 503 message = "השירות לא הוגדר (חסר מפתח Anthropic)" elif error_code in {"service_unavailable", "ai_explain_service_unavailable"}: status = 503 message = "שירות ההסבר אינו זמין" else: status = 502 message = "ספק ה-AI לא הצליח להחזיר תשובה" try: emit_event( "ai_explain_request_failure", severity="error", alert_uid=alert_uid, duration_ms=int(duration * 1000), error_code=error_code, handled=status < 500, ) except Exception: pass return web.json_response({"error": error_code, "message": message}, status=status) except Exception as exc: duration = time.perf_counter() - start try: emit_event( "ai_explain_request_failure", severity="error", alert_uid=alert_uid, duration_ms=int(duration * 1000), error=str(exc), handled=False, ) except Exception: pass logger.exception("ai_explain_handler_failed") return web.json_response( {"error": "internal_error", "message": "שגיאה בשירות ההסבר"}, status=500, ) duration = time.perf_counter() - start try: emit_event( "ai_explain_request_success", severity="info", alert_uid=alert_uid, duration_ms=int(duration * 1000), provider=explanation.get("provider"), ) except Exception: pass return 
web.json_response(explanation) async def share_view(request: web.Request) -> web.Response: share_id = request.match_info.get("share_id", "") try: # 해결 תלויות בזמן ריצה כדי לאפשר monkeypatch ב-tests: try: import importlib integ = importlib.import_module("integrations") _code_sharing = getattr(integ, "code_sharing") except Exception: from integrations import code_sharing as _code_sharing # type: ignore data = _code_sharing.get_internal_share(share_id) except Exception as e: logger.error(f"share_view error: {e}") try: # דווח אירוע מובנה על שגיאה בהצגת שיתוף # הערה: משתמש ב-emit_event של המודול כדי לאפשר monkeypatch בטסטים emit_event("share_view_error", severity="error", error_code="E_SHARE_VIEW", share_id=str(share_id), error=str(e)) except Exception: pass try: if errors_total is not None: errors_total.labels(code="E_SHARE_VIEW").inc() except Exception: pass data = None if not data: # החזר 404 וגם דווח אירוע מובנה לצורכי ניטור try: # הערה: משתמש ב-emit_event של המודול כדי לאפשר monkeypatch בטסטים emit_event("share_view_not_found", severity="warn", share_id=str(share_id)) except Exception: pass return web.Response(status=404, text="Share not found or expired") # החזר HTML פשוט לצפייה נוחה code = data.get("code", "") file_name = data.get("file_name", "snippet.txt") language = data.get("language", "text") try: emit_event("share_view_success", severity="info", share_id=str(share_id), file_name=str(file_name), language=str(language)) except Exception: pass html = f""" <!DOCTYPE html> <html lang="he"> <head> <meta charset="utf-8" /> <meta name="viewport" content="width=device-width, initial-scale=1" /> <title>Share: {file_name}</title> <style> body {{ font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace; margin: 24px; }} pre {{ white-space: pre-wrap; word-wrap: break-word; background: #0d1117; color: #c9d1d9; padding: 16px; border-radius: 8px; overflow: auto; }} h1 {{ font-size: 18px; }} .meta {{ color: #57606a; 
margin-bottom: 8px; }} a {{ color: #58a6ff; }} </style> </head> <body> <h1>📄 {file_name}</h1> <div class="meta">שפה: {language}</div> <pre>{html_escape(code)}</pre> </body> </html> """ return web.Response(text=html, content_type="text/html") async def db_health_pool_view(request: web.Request) -> web.Response: """GET /api/db/pool - מצב Connection Pool.""" try: # await לקבלת ה-service (יכול להיות async init) svc = await get_db_health_service() # await לקריאה ל-MongoDB (Motor או thread pool) pool = await svc.get_pool_status() return web.json_response(pool.to_dict()) except Exception as e: logger.error(f"db_health_pool error: {e}") return web.json_response({"error": "failed", "message": "internal_error"}, status=500) async def db_health_ops_view(request: web.Request) -> web.Response: """GET /api/db/ops - פעולות איטיות פעילות.""" try: threshold = int(request.query.get("threshold_ms", "1000")) include_system = request.query.get("include_system", "").lower() == "true" svc = await get_db_health_service() # await חובה! - הקריאה ל-MongoDB היא אסינכרונית ops = await svc.get_current_operations( threshold_ms=threshold, include_system=include_system, ) return web.json_response( { "count": len(ops), "threshold_ms": threshold, "operations": [op.to_dict() for op in ops], } ) except Exception as e: logger.error(f"db_health_ops error: {e}") return web.json_response({"error": "failed", "message": "internal_error"}, status=500) async def db_health_collections_view(request: web.Request) -> web.Response: """GET /api/db/collections - סטטיסטיקות collections.""" try: collection = request.query.get("collection") svc = await get_db_health_service() # await חובה! 
- collStats יכול לקחת זמן stats = await svc.get_collection_stats(collection_name=collection) return web.json_response( { "count": len(stats), "collections": [s.to_dict() for s in stats], } ) except Exception as e: logger.error(f"db_health_collections error: {e}") return web.json_response({"error": "failed", "message": "internal_error"}, status=500) async def db_collection_documents_view(request: web.Request) -> web.Response: """GET /api/db/{collection}/documents - שליפת מסמכים מ-collection. Query Parameters: skip: מספר מסמכים לדלג (ברירת מחדל: 0) limit: מספר מסמכים להחזיר (ברירת מחדל: 20, מקסימום: 100) Returns: JSON עם documents, total, skip, limit, has_more Status Codes: 200: הצלחה 400: פרמטרים לא תקינים / שם collection לא תקין 403: גישה ל-collection חסומה 500: שגיאת שרת """ try: collection_name = request.match_info.get("collection", "") # פרסור פרמטרים עם ברירות מחדל try: skip = int(request.query.get("skip", "0")) limit = int(request.query.get("limit", "20")) except ValueError: return web.json_response( {"error": "invalid_params", "message": "skip and limit must be integers"}, status=400, ) # וולידציה בסיסית if skip < 0 or limit < 1: return web.json_response( {"error": "invalid_params", "message": "skip >= 0, limit >= 1"}, status=400, ) if skip > MAX_SKIP: return web.json_response( {"error": "invalid_params", "message": f"skip cannot exceed {MAX_SKIP}"}, status=400, ) filters: Dict[str, Any] = {} user_id_raw = clean_db_health_filter_value(request.query.get("userId") or request.query.get("user_id"), 40) status_raw = clean_db_health_filter_value(request.query.get("status"), 40) file_id_raw = clean_db_health_filter_value(request.query.get("fileId") or request.query.get("file_id"), 120) sort_raw = clean_db_health_filter_value(request.query.get("sort"), 20) sort_value = sort_raw or None if user_id_raw: try: filters["user_id"] = int(user_id_raw) except Exception: filters["user_id"] = user_id_raw if status_raw: filters["status"] = status_raw if file_id_raw: 
filters["file_id"] = file_id_raw svc = await get_db_health_service() result = await svc.get_documents( collection_name=collection_name, skip=skip, limit=limit, filters=filters or None, sort=sort_value, ) return web.json_response(result) except InvalidCollectionNameError as e: # שם collection לא תקין → 400 Bad Request return web.json_response( {"error": "invalid_collection_name", "message": str(e)}, status=400, ) except CollectionAccessDeniedError as e: # גישה חסומה → 403 Forbidden return web.json_response( {"error": "access_denied", "message": str(e)}, status=403, ) except Exception as e: logger.error(f"db_collection_documents error: {e}") return web.json_response( {"error": "internal_error", "message": "An unexpected error occurred"}, status=500, ) async def db_health_summary_view(request: web.Request) -> web.Response: """GET /api/db/health - סיכום בריאות כללי.""" try: svc = await get_db_health_service() # await חובה! summary = await svc.get_health_summary() return web.json_response(summary) except Exception as e: logger.error(f"db_health_summary error: {e}") return web.json_response({"error": "failed", "message": "internal_error"}, status=500) async def maintenance_cleanup_view(request: web.Request) -> web.Response: """GET /api/debug/maintenance_cleanup Endpoint תחזוקה חד-פעמי (DB): - איפוס קולקציות לוגים: slow_queries_log, service_metrics - יצירת TTL לקולקציות לוגים (מחיקה אוטומטית בעתיד) - ניקוי אינדקסים ב-code_snippets: השארה של אינדקסים קריטיים בלבד. ⚠️ מוגן ע"י db_health_auth_middleware (Bearer token). 
""" from services.db_provider import get_db def _run_cleanup() -> dict: preview = str(request.query.get("preview", "") or "").lower() in {"1", "true", "yes", "on"} db = get_db() # --- collections to purge --- try: slow_queries_coll = db.slow_queries_log except Exception: slow_queries_coll = None try: service_metrics_coll = db.service_metrics except Exception: service_metrics_coll = None try: code_snippets_coll = db.code_snippets except Exception: code_snippets_coll = None # Detect fail-open/noop db if not getattr(slow_queries_coll, "delete_many", None) or not getattr(service_metrics_coll, "delete_many", None): raise RuntimeError("db_unavailable_or_no_delete_many") if not getattr(code_snippets_coll, "index_information", None) or not getattr(code_snippets_coll, "drop_index", None): raise RuntimeError("db_unavailable_or_no_index_management") deleted_slow = 0 deleted_metrics = 0 if not preview: slow_res = slow_queries_coll.delete_many({}) metrics_res = service_metrics_coll.delete_many({}) deleted_slow = int(getattr(slow_res, "deleted_count", 0) or 0) deleted_metrics = int(getattr(metrics_res, "deleted_count", 0) or 0) # --- TTL indexes (permanent cleanup) --- def _ensure_ttl_index(coll: Any, *, field: str, expire_seconds: int, index_name: str) -> dict: """Ensure TTL index exists with requested expireAfterSeconds (best-effort).""" info_before = {} try: info_before = coll.index_information() or {} except Exception: info_before = {} existing_meta = info_before.get(index_name) if isinstance(info_before, dict) else None if isinstance(existing_meta, dict): try: existing_expire = existing_meta.get("expireAfterSeconds") existing_key = existing_meta.get("key") if ( existing_expire == int(expire_seconds) and existing_key == [(field, 1)] ): return { "name": index_name, "field": field, "expireAfterSeconds": int(expire_seconds), "status": "exists", } except Exception: pass # Try drop conflicting TTL index with the same name if not preview: try: coll.drop_index(index_name) except 
Exception: pass created_name = None if not preview: try: created_name = coll.create_index( [(field, 1)], name=index_name, expireAfterSeconds=int(expire_seconds), background=True, ) except Exception as e: # Best-effort: report error and continue return { "name": index_name, "field": field, "expireAfterSeconds": int(expire_seconds), "status": "error", "error": str(e), } return { "name": str(created_name or index_name), "field": field, "expireAfterSeconds": int(expire_seconds), "status": "planned" if preview else "created", } # Explicitly drop legacy TTL index that may conflict (IndexOptionsConflict) service_metrics_pre_drop: dict[str, Any] if preview: service_metrics_pre_drop = {"planned_drop": ["metrics_ttl"]} else: dropped_pre: list[str] = [] try: service_metrics_coll.drop_index("metrics_ttl") dropped_pre.append("metrics_ttl") except Exception: pass service_metrics_pre_drop = {"dropped": dropped_pre} ttl_results: dict[str, Any] = { "slow_queries_log": _ensure_ttl_index( slow_queries_coll, field="timestamp", expire_seconds=604800, # 7 days index_name="ttl_cleanup", ), # service_metrics uses "ts" in code, but we'll also create a "timestamp" TTL for safety/backward-compat "service_metrics_ts": _ensure_ttl_index( service_metrics_coll, field="ts", expire_seconds=86400, # 24 hours index_name="ttl_cleanup_ts", ), "service_metrics_timestamp": _ensure_ttl_index( service_metrics_coll, field="timestamp", expire_seconds=86400, # 24 hours index_name="ttl_cleanup", ), } ttl_results["service_metrics_pre_drop"] = service_metrics_pre_drop # --- index cleanup (code_snippets) --- try: idx_info = code_snippets_coll.index_information() or {} except Exception: idx_info = {} indexes_before = sorted([str(k) for k in idx_info.keys()]) indexes_before_details: dict[str, dict] = {} for k, meta in (idx_info or {}).items(): if not isinstance(meta, dict): continue name = str(k) # keep response compact but useful for review (especially text indexes) indexes_before_details[name] = { "key": 
meta.get("key"), "unique": bool(meta.get("unique")) if "unique" in meta else False, "expireAfterSeconds": meta.get("expireAfterSeconds"), "weights": meta.get("weights"), "default_language": meta.get("default_language"), } dropped: list[str] = [] kept: list[str] = [] drop_errors: dict[str, str] = {} def _should_keep_code_snippets_index(index_name: str, meta: Any) -> bool: # Keep by explicit name if index_name in {"_id_", "search_text_idx", "unique_file_name", "user_id", "user_updated_at"}: return True if not isinstance(meta, dict): return False key = meta.get("key") # Keep single-field user_id index (name may be user_id_1 / user_id_idx etc.) if key in ([("user_id", 1)], [("user_id", -1)]): return True # Keep default UI sort index (user_id + updated_at desc) if key in ( [("user_id", 1), ("updated_at", -1)], [("updated_at", -1), ("user_id", 1)], [("user_id", -1), ("updated_at", -1)], [("updated_at", -1), ("user_id", -1)], ): return True # Keep a unique index enforcing unique file name per user (compound user_id + file_name) try: if bool(meta.get("unique")) and key in ( [("user_id", 1), ("file_name", 1)], [("file_name", 1), ("user_id", 1)], [("user_id", 1), ("file_name", -1)], [("file_name", -1), ("user_id", 1)], [("user_id", -1), ("file_name", 1)], [("file_name", 1), ("user_id", -1)], [("user_id", -1), ("file_name", -1)], [("file_name", -1), ("user_id", -1)], ): return True except Exception: pass return False for name in sorted(idx_info.keys()): idx_name = str(name) meta = idx_info.get(name) if _should_keep_code_snippets_index(idx_name, meta): kept.append(idx_name) continue try: if preview: dropped.append(idx_name) # planned else: code_snippets_coll.drop_index(idx_name) dropped.append(idx_name) except Exception as e: # Best-effort: אם אינדקס לא קיים/לא ניתן למחיקה, נשמור שגיאה ונתקדם. 
drop_errors[idx_name] = str(e) # Ensure critical UI sort index exists (user_id + updated_at desc) ensured: dict[str, Any] = {"name": "user_updated_at", "key": [("user_id", 1), ("updated_at", -1)]} if preview: ensured["status"] = "planned" else: try: code_snippets_coll.create_index( [("user_id", 1), ("updated_at", -1)], name="user_updated_at", background=True, ) ensured["status"] = "created_or_exists" except Exception as e: ensured["status"] = "error" ensured["error"] = str(e) try: idx_info_after = code_snippets_coll.index_information() or {} except Exception: idx_info_after = {} indexes_after = sorted([str(k) for k in idx_info_after.keys()]) return { "ok": True, "preview": preview, "deleted_documents": { "slow_queries_log": deleted_slow, "service_metrics": deleted_metrics, "total": deleted_slow + deleted_metrics, }, "ttl": ttl_results, "indexes": { "collection": "code_snippets", "before": indexes_before, "before_details": indexes_before_details, "after": indexes_after, "dropped": dropped, "kept": kept, "drop_errors": drop_errors, "ensured": ensured, }, } try: result = await asyncio.to_thread(_run_cleanup) try: emit_event( "maintenance_cleanup_done", severity="info", deleted_total=int((result.get("deleted_documents") or {}).get("total") or 0), dropped_indexes_count=int(len(((result.get("indexes") or {}).get("dropped")) or [])), ) except Exception: pass return web.json_response(result) except Exception as e: logger.exception("maintenance_cleanup_failed") try: emit_event("maintenance_cleanup_failed", severity="error", handled=True, error=str(e)) except Exception: pass return web.json_response({"ok": False, "error": "failed", "message": "internal_error"}, status=500) app.router.add_get("/health", health) # Always expose /healthz alias for platform probes try: app.router.add_get("/healthz", health) except Exception as e: # Ignore if already registered – אך תעד כאנומליה מטופלת try: emit_event( "healthz_route_register_failed", severity="anomaly", operation="startup", 
handled=True, error=str(e), ) except Exception: logging.getLogger(__name__).warning( "healthz_route_register_failed", extra={"operation": "startup", "handled": True} ) app.router.add_get("/metrics", metrics_view) app.router.add_post("/alerts", alerts_view) app.router.add_get("/alerts", alerts_get_view) app.router.add_post("/webhooks/sentry", sentry_webhook_view) app.router.add_get("/incidents", incidents_get_view) app.router.add_post("/api/ai/explain", ai_explain_view) app.router.add_get("/share/{share_id}", share_view) app.router.add_get("/api/debug/maintenance_cleanup", maintenance_cleanup_view) app.router.add_get("/api/db/pool", db_health_pool_view) app.router.add_get("/api/db/ops", db_health_ops_view) app.router.add_get("/api/db/collections", db_health_collections_view) app.router.add_get("/api/db/{collection}/documents", db_collection_documents_view) app.router.add_get("/api/db/health", db_health_summary_view) # Jobs Monitor routes try: register_jobs_routes(app) except Exception: pass return app
async def get_jobs_list(request: web.Request) -> web.Response:
    """GET /api/jobs - return the catalog of all registered jobs.

    Each entry carries the job's static metadata plus its current enabled
    state and whether it can be triggered manually.
    """
    from services.job_registry import JobRegistry

    registry = JobRegistry()
    payload = [
        {
            "job_id": job.job_id,
            "name": job.name,
            "description": job.description,
            "category": job.category.value,
            "type": job.job_type.value,
            "interval_seconds": job.interval_seconds,
            "enabled": registry.is_enabled(job.job_id),
            "env_toggle": job.env_toggle,
            # can_trigger: manual triggering is possible only when a callback is configured
            "can_trigger": bool(job.callback_name),
        }
        for job in registry.list_all()
    ]
    return web.json_response({"jobs": payload})
async def get_job_detail(request: web.Request) -> web.Response:
    """GET /api/jobs/{job_id} - details for a single job.

    Returns the job's metadata together with its currently active runs and
    the 20 most recent history entries; responds 404 for an unknown job_id.
    """
    from services.job_registry import JobRegistry
    from services.job_tracker import get_job_tracker

    job_id = request.match_info.get("job_id")
    registry = JobRegistry()
    tracker = get_job_tracker()

    job = registry.get(job_id)
    if not job:
        return web.json_response({"error": "Job not found"}, status=404)

    history = tracker.get_job_history(job_id, limit=20)
    active_runs = [run for run in tracker.get_active_runs() if run.job_id == job_id]

    return web.json_response(
        {
            "job": {
                "job_id": job.job_id,
                "name": job.name,
                "description": job.description,
                "category": job.category.value,
                "type": job.job_type.value,
                "interval_seconds": job.interval_seconds,
                "enabled": registry.is_enabled(job.job_id),
                "source_file": job.source_file,
            },
            "active_runs": [_run_to_dict(run) for run in active_runs],
            "history": [_run_to_dict(run) for run in history],
        }
    )
async def get_run_detail(request: web.Request) -> web.Response:
    """GET /api/jobs/runs/{run_id} - details for one run, including its logs."""
    from services.job_tracker import get_job_tracker

    run_id = request.match_info.get("run_id")
    run = get_job_tracker().get_run(run_id)
    if not run:
        return web.json_response({"error": "Run not found"}, status=404)
    return web.json_response({"run": _run_to_dict(run, include_logs=True)})
async def get_active_runs(request: web.Request) -> web.Response:
    """GET /api/jobs/active - all runs that are currently executing."""
    from services.job_tracker import get_job_tracker

    active = get_job_tracker().get_active_runs()
    return web.json_response({"active_runs": [_run_to_dict(run) for run in active]})
async def trigger_job(request: web.Request) -> web.Response:
    """POST /api/jobs/{job_id}/trigger - manually trigger a job.

    Looks the job up in the registry, then schedules an immediate one-off
    run of its already-scheduled JobQueue callback. The original job's
    data/chat_id/user_id are forwarded when present so the manual run
    behaves like a scheduled one.

    Status codes: 404 (unknown job / not scheduled), 503 (no JobQueue),
    500 (callback missing or scheduling failed), 200 on success.
    """
    from services.job_registry import JobRegistry
    job_id = request.match_info.get("job_id")
    registry = JobRegistry()
    job = registry.get(job_id)
    if not job:
        return web.json_response({"error": "Job not found"}, status=404)
    # Trigger via Telegram JobQueue (same process as bot)
    tg_app = None
    try:
        tg_app = request.app.get("telegram_application")
    except Exception:
        tg_app = None
    if tg_app is None:
        return web.json_response({"error": "job_queue_unavailable"}, status=503)
    # Duck-typed probe: only proceed if the app exposes a JobQueue-like object.
    jq = getattr(tg_app, "job_queue", None)
    if jq is None or not hasattr(jq, "get_jobs_by_name"):
        return web.json_response({"error": "job_queue_unavailable"}, status=503)
    try:
        jobs = jq.get_jobs_by_name(job_id)
    except Exception:
        jobs = []
    if not jobs:
        return web.json_response({"error": "job_not_scheduled"}, status=404)
    job_obj = jobs[0]
    callback = getattr(job_obj, "callback", None)
    if not callable(callback):
        return web.json_response({"error": "job_callback_unavailable"}, status=500)
    # Schedule immediate one-off run
    try:
        # Timestamp suffix keeps the manual run's name unique per trigger.
        suffix = str(int(time.time()))
    except Exception:
        suffix = "now"
    try:
        # Copy the scheduled job's context so the manual run matches it;
        # omit kwargs that are unset to stay compatible with run_once signatures.
        data = getattr(job_obj, "data", None)
        chat_id = getattr(job_obj, "chat_id", None)
        user_id = getattr(job_obj, "user_id", None)
        kwargs = {"when": 0, "name": f"{job_id}_manual_{suffix}"}
        if data is not None:
            kwargs["data"] = data
        if chat_id is not None:
            kwargs["chat_id"] = chat_id
        if user_id is not None:
            kwargs["user_id"] = user_id
        jq.run_once(callback, **kwargs)
    except Exception:
        try:
            # Fallback for older signatures
            jq.run_once(callback, when=0)
        except Exception:
            logger.exception("jobs_trigger_failed job_id=%s", job_id)
            return web.json_response(
                {"error": "trigger_failed", "message": "Failed to trigger job"},
                status=500,
            )
    return web.json_response({"message": f"Job {job_id} triggered", "job_id": job_id})
def _run_to_dict(run, include_logs: bool = False) -> dict: """המרת JobRun ל-dict""" d = { "run_id": run.run_id, "job_id": run.job_id, "started_at": run.started_at.isoformat() if run.started_at else None, "ended_at": run.ended_at.isoformat() if run.ended_at else None, "status": run.status.value, "progress": run.progress, "total_items": run.total_items, "processed_items": run.processed_items, "error_message": run.error_message, "trigger": run.trigger, "user_id": run.user_id, "duration_seconds": ( (run.ended_at - run.started_at).total_seconds() if run.ended_at and run.started_at else None ), } if include_logs: d["logs"] = [ { "timestamp": log.timestamp.isoformat(), "level": log.level, "message": log.message, } for log in run.logs ] return d
def register_jobs_routes(app: web.Application):
    """Register the Jobs Monitor HTTP routes on *app*.

    Note: the static "/api/jobs/active" path is registered before the
    dynamic "/api/jobs/{job_id}" so it is matched first.
    """
    # add_get/add_post are used (rather than add_route) to keep aiohttp's
    # default behavior of also answering HEAD for GET routes.
    adders = {"GET": app.router.add_get, "POST": app.router.add_post}
    routes = (
        ("GET", "/api/jobs", get_jobs_list),
        ("GET", "/api/jobs/active", get_active_runs),
        ("GET", "/api/jobs/{job_id}", get_job_detail),
        ("GET", "/api/jobs/runs/{run_id}", get_run_detail),
        ("POST", "/api/jobs/{job_id}/trigger", trigger_job),
    )
    for method, path, handler in routes:
        adders[method](path, handler)
def run(host: str = "0.0.0.0", port: int = 10000) -> None:
    """Start the aiohttp web service (blocking).

    Telemetry setup and the deployment marker are best-effort: failures
    are swallowed so they never prevent the server from starting.
    """
    # OpenTelemetry initialization - fail-open on any error
    try:
        from observability_otel import setup_telemetry  # type: ignore
        setup_telemetry(
            service_name=str(os.getenv("OTEL_SERVICE_NAME") or "codebot-aiohttp"),
            service_version=os.getenv("SERVICE_VERSION") or os.getenv("RENDER_GIT_COMMIT") or None,
            environment=os.getenv("ENVIRONMENT") or os.getenv("ENV") or None,
            flask_app=None,
        )
    except Exception:
        pass
    # Best-effort deployment notification
    try:
        note_deployment_started("aiohttp service starting up")
    except Exception:
        pass
    web.run_app(create_app(), host=host, port=port)
if __name__ == "__main__":  # pragma: no cover - used by Render/CLI entrypoint
    # Resolve bind address/port from the environment, falling back to defaults.
    bind_host = os.getenv("WEB_HOST") or os.getenv("HOST") or "0.0.0.0"
    raw_port = os.getenv("PORT") or os.getenv("WEB_PORT") or "10000"
    try:
        bind_port = int(raw_port)
    except (TypeError, ValueError):
        bind_port = 10000
    run(host=bind_host, port=bind_port)