"""
Predictive Health Engine (Observability v7)
- Maintains sliding window for metrics: error_rate_percent, latency_seconds, memory_usage_percent
- Exponential smoothing (EW-Regression) for trend estimation, fallback to linear regression
- Adaptive Feedback Loop: compare predictions vs actuals and adjust smoothing reactiveness
- If forecast crosses adaptive threshold within horizon (default 15 minutes), logs a predictive incident
- Predictive incidents are appended to data/predictions_log.json (JSONL)
- Optionally triggers preemptive actions (cache clear, GC, controlled restart) and logs PREDICTIVE_ACTION_TRIGGERED
- Auto-cleanup for predictions older than 24h
This module is intentionally best-effort and fail-open. It should never raise.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Deque, Dict, List, Optional, Tuple, Protocol, runtime_checkable, cast
from collections import deque
from datetime import datetime, timezone, timedelta
import json
import os
import time
import math
# psutil is optional; without it, memory sampling degrades to 0.0.
try:  # pragma: no cover - optional
    import psutil  # type: ignore
except Exception:  # pragma: no cover
    psutil = None  # type: ignore
# Lazy event emitter (avoid hard import at module import time).
# Fix: the except branch contained a stray non-code placeholder line that
# raised NameError at import time; removed.
try:
    from observability import emit_event  # type: ignore
except Exception:  # pragma: no cover
    def emit_event(event: str, severity: str = "info", **fields):  # type: ignore
        """No-op fallback used when the observability module is unavailable."""
        return None
# Prometheus counters/gauges are optional
try: # pragma: no cover - metrics optional in some environments
from metrics import business_events_total # noqa: F401
from metrics import errors_total # noqa: F401
from metrics import Counter # type: ignore
from metrics import prediction_accuracy_percent as _accuracy_gauge # type: ignore
from metrics import prevented_incidents_total as _prevented_ctr # type: ignore
except Exception: # pragma: no cover
Counter = None # type: ignore
_accuracy_gauge = None # type: ignore
_prevented_ctr = None # type: ignore
try: # expose counters if prometheus_client available via metrics module
from metrics import ( # type: ignore
predicted_incidents_total as _predicted_ctr,
actual_incidents_total as _actual_ctr,
)
except Exception:
_predicted_ctr = None # type: ignore
_actual_ctr = None # type: ignore
@runtime_checkable
class _CacheLike(Protocol):
def clear_stale(self, max_scan: int = 1000, ttl_seconds_threshold: int = 60) -> int: ...
def clear_all(self) -> int: ...
# Optional cache manager for preemptive actions (typed via Protocol)
try:
from cache_manager import cache as _cache_instance
_cache: Optional[_CacheLike] = cast("Optional[_CacheLike]", _cache_instance)
except Exception: # pragma: no cover
_cache = None
_DATA_DIR = os.path.join("data")
_PREDICTIONS_FILE = os.path.join(_DATA_DIR, "predictions_log.json")
_INCIDENTS_FILE = os.path.join(_DATA_DIR, "incidents_log.json")
# Sliding windows store up to 4 hours to give more context but filter by horizon
_MAX_WINDOW_SEC = 4 * 60 * 60
_RECOMPUTE_MIN_INTERVAL_SEC = 60 # avoid excessive recomputation
_DEFAULT_HORIZON_SEC = int(os.getenv("PREDICTIVE_HORIZON_SECONDS", str(15 * 60)) or 900)
_MEMORY_THRESHOLD_PCT = float(os.getenv("MEMORY_USAGE_THRESHOLD_PERCENT", "85") or 85.0)
# In-memory buffers: (ts_seconds, value)
_values_error_rate: Deque[Tuple[float, float]] = deque(maxlen=240) # roughly 1/min for 4h
_values_latency: Deque[Tuple[float, float]] = deque(maxlen=240)
_values_memory: Deque[Tuple[float, float]] = deque(maxlen=240)
_last_recompute_ts: float = 0.0
# v7 – model refinement & feedback configuration
_MODEL_MODE = os.getenv("PREDICTIVE_MODEL", "exp_smoothing").lower()
_HALFLIFE_MINUTES: float = float(os.getenv("PREDICTIVE_HALFLIFE_MINUTES", "30") or 30.0)
_HALFLIFE_MIN_MIN = 5.0
_HALFLIFE_MIN_MAX = 120.0
_FEEDBACK_MIN_INTERVAL_SEC = int(os.getenv("PREDICTIVE_FEEDBACK_INTERVAL_SEC", "300") or 300)
_CLEANUP_INTERVAL_SEC = int(os.getenv("PREDICTIVE_CLEANUP_INTERVAL_SEC", "3600") or 3600)
_MAX_PREDICTION_AGE_SEC = int(os.getenv("PREDICTION_MAX_AGE_SECONDS", "86400") or 86400)
_last_feedback_ts: float = 0.0
_last_cleanup_ts: float = 0.0
_counted_prevented_ids: set[str] = set()
# Fix: removed a stray non-code placeholder line that raised NameError at import.
@dataclass
class Trend:
    """Trend estimate for one metric plus its predicted threshold crossing."""

    metric: str                           # metric name, e.g. "error_rate_percent"
    slope_per_minute: float               # estimated rate of change per minute
    intercept: float                      # regression intercept (minute axis at first sample)
    current_value: float                  # latest observed value
    threshold: float                      # adaptive threshold (0.0 means disabled)
    predicted_cross_ts: Optional[float]   # epoch seconds of predicted crossing, or None
def _ensure_dirs() -> None:
try:
os.makedirs(_DATA_DIR, exist_ok=True)
except Exception:
pass
def _now() -> float:
return time.time()
def _linear_regression(points: List[Tuple[float, float]]) -> Tuple[float, float]:
"""Return (slope_per_minute, intercept) over time expressed in minutes.
points: list of (ts_seconds, value)
"""
try:
if not points:
return 0.0, 0.0
if len(points) == 1:
return 0.0, float(points[0][1])
# Convert to minutes relative to first point to improve conditioning
t0 = float(points[0][0])
xs = [((p[0] - t0) / 60.0) for p in points]
ys = [float(p[1]) for p in points]
n = float(len(xs))
sum_x = sum(xs)
sum_y = sum(ys)
sum_xx = sum(x * x for x in xs)
sum_xy = sum(x * y for x, y in zip(xs, ys))
denom = (n * sum_xx - sum_x * sum_x)
if denom == 0.0:
return 0.0, ys[-1]
slope = (n * sum_xy - sum_x * sum_y) / denom
# intercept with respect to minute-axis origin at t0
intercept = (sum_y - slope * sum_x) / n
# Convert intercept back to absolute time origin: y = slope*(t_minute - 0) + intercept
return float(slope), float(intercept)
except Exception:
return 0.0, 0.0
def _exp_weighted_regression(points: List[Tuple[float, float]], halflife_minutes: float) -> Tuple[float, float]:
"""Exponentially-weighted linear regression.
Returns (slope_per_minute, intercept) like _linear_regression but uses
exponential weights so that recent observations matter more.
halflife_minutes controls weight decay (~0.5 weight every halflife).
"""
try:
if not points:
return 0.0, 0.0
if len(points) == 1:
return 0.0, float(points[0][1])
t0 = float(points[0][0])
xs = [((p[0] - t0) / 60.0) for p in points]
ys = [float(p[1]) for p in points]
# Compute weights with exponential decay by age (in minutes)
halflife = max(_HALFLIFE_MIN_MIN, min(_HALFLIFE_MIN_MAX, float(halflife_minutes or 30.0)))
if halflife <= 0.0:
halflife = 30.0
# w = 0.5 ** (age / halflife)
ages = [float(xs[-1] - x) for x in xs]
ws = [pow(0.5, (age / halflife)) for age in ages]
Sw = sum(ws)
if Sw <= 0.0:
return 0.0, ys[-1]
Sx = sum(w * x for w, x in zip(ws, xs))
Sy = sum(w * y for w, y in zip(ws, ys))
Sxx = sum(w * x * x for w, x in zip(ws, xs))
Sxy = sum(w * x * y for w, x, y in zip(ws, xs, ys))
denom = (Sw * Sxx - Sx * Sx)
if denom == 0.0:
return 0.0, ys[-1]
slope = (Sw * Sxy - Sx * Sy) / denom
intercept = (Sy - slope * Sx) / Sw
return float(slope), float(intercept)
except Exception:
return 0.0, 0.0
def _evict_old(points: Deque[Tuple[float, float]], min_ts: float) -> None:
try:
while points and points[0][0] < min_ts:
points.popleft()
except Exception:
return
def _get_thresholds() -> Dict[str, float]:
"""Fetch adaptive thresholds for error_rate_percent and latency_seconds.
Returns zeros when unavailable (treated as disabled).
"""
try:
from alert_manager import get_thresholds_snapshot # type: ignore
snap = get_thresholds_snapshot() or {}
return {
"error_rate_percent": float(snap.get("error_rate_percent", {}).get("threshold", 0.0) or 0.0),
"latency_seconds": float(snap.get("latency_seconds", {}).get("threshold", 0.0) or 0.0),
}
except Exception:
return {"error_rate_percent": 0.0, "latency_seconds": 0.0}
def _get_current_values() -> Tuple[float, float, float]:
"""Return (error_rate_percent, latency_seconds, memory_usage_percent)."""
err = lat = mem = 0.0
try:
from alert_manager import ( # type: ignore
get_current_error_rate_percent,
get_current_avg_latency_seconds,
)
# חשוב: פעולות Predictive (כולל ריסטארט וורקר) צריכות להתבסס על בריאות פנימית בלבד.
# שגיאות שמקורן בשירותים חיצוניים / Circuit Breaker יכולות להקפיץ את שיעור השגיאות,
# אבל לא מעידות שהוורקר עצמו "תקול".
err = float(get_current_error_rate_percent(window_sec=5 * 60, source="internal"))
lat = float(get_current_avg_latency_seconds(window_sec=5 * 60, source="internal"))
except Exception:
err, lat = 0.0, 0.0
try:
if psutil is not None:
mem = float(psutil.virtual_memory().percent)
else:
mem = 0.0
except Exception:
mem = 0.0
return err, lat, mem
# Fix: removed a stray non-code placeholder line that raised NameError at import.
def note_observation(
    *,
    error_rate_percent: Optional[float] = None,
    latency_seconds: Optional[float] = None,
    memory_usage_percent: Optional[float] = None,
    ts: Optional[float] = None,
) -> None:
    """Record a single observation into the sliding windows.

    Any value left as None is filled from the current snapshot
    (alert_manager / psutil). Best-effort: never raises.
    """
    try:
        t = float(ts if ts is not None else _now())
        if error_rate_percent is None or latency_seconds is None or memory_usage_percent is None:
            cur_err, cur_lat, cur_mem = _get_current_values()
            if error_rate_percent is None:
                error_rate_percent = cur_err
            if latency_seconds is None:
                latency_seconds = cur_lat
            if memory_usage_percent is None:
                memory_usage_percent = cur_mem
        _values_error_rate.append((t, float(error_rate_percent)))
        _values_latency.append((t, float(latency_seconds)))
        _values_memory.append((t, float(memory_usage_percent)))
        # Trim samples that fell out of the 4h retention window.
        min_ts = t - _MAX_WINDOW_SEC
        for buf in (_values_error_rate, _values_latency, _values_memory):
            _evict_old(buf, min_ts)
    except Exception:
        return
def _predict_cross(
    metric: str,
    points: Deque[Tuple[float, float]],
    current_value: float,
    threshold: float,
    now_ts: float,
    horizon_sec: int,
) -> Trend:
    """Estimate one metric's trend and, if rising, when it crosses threshold.

    Trend estimation uses only samples inside the prediction horizon (falling
    back to the whole buffer when that filter empties it) so stale history does
    not mask the current direction. predicted_cross_ts is None unless the
    threshold is enabled (> 0) and a crossing is expected within horizon_sec
    or has already happened.
    """
    try:
        cutoff = now_ts - float(horizon_sec)
        recent = [p for p in list(points) if p[0] >= cutoff] or list(points)
        if _MODEL_MODE == "exp_smoothing":
            slope_min, intercept = _exp_weighted_regression(recent, _HALFLIFE_MINUTES)
        else:
            slope_min, intercept = _linear_regression(recent)
        predicted_ts: Optional[float] = None
        if threshold > 0.0 and current_value >= threshold:
            # Already past the threshold: the crossing is "now".
            predicted_ts = now_ts
        elif slope_min > 0.0 and threshold > 0.0:
            # Only rising trends matter for preemptive actions/predictions.
            # y = slope * minutes_since_t0 + intercept; solve y == threshold.
            minutes_to_cross = (threshold - intercept) / slope_min
            t0 = float(recent[0][0]) if recent else now_ts
            crossing = t0 + minutes_to_cross * 60.0
            if crossing >= now_ts and (crossing - now_ts) <= float(horizon_sec):
                predicted_ts = crossing
        return Trend(
            metric=metric,
            slope_per_minute=float(slope_min),
            intercept=float(intercept),
            current_value=float(current_value),
            threshold=float(threshold),
            predicted_cross_ts=predicted_ts,
        )
    except Exception:
        return Trend(
            metric=metric,
            slope_per_minute=0.0,
            intercept=0.0,
            current_value=0.0,
            threshold=float(threshold or 0.0),
            predicted_cross_ts=None,
        )
# Fix: removed a stray non-code placeholder line that raised NameError at import.
def evaluate_predictions(now_ts: Optional[float] = None, horizon_seconds: Optional[int] = None) -> List[Trend]:
    """Compute predictions for all tracked metrics.

    Args:
        now_ts: evaluation time in epoch seconds; defaults to the current time.
        horizon_seconds: prediction horizon; defaults to the configured value.
    Returns:
        One Trend per metric, or [] on any internal failure (fail-open).
    """
    t = float(now_ts if now_ts is not None else _now())
    horizon = int(horizon_seconds if horizon_seconds is not None else _DEFAULT_HORIZON_SEC)
    try:
        thrs = _get_thresholds()
        err_thr = float(thrs.get("error_rate_percent", 0.0) or 0.0)
        lat_thr = float(thrs.get("latency_seconds", 0.0) or 0.0)
        # Use buffer tails as "current" values to avoid re-querying the sources.
        cur_err = float(_values_error_rate[-1][1]) if _values_error_rate else 0.0
        cur_lat = float(_values_latency[-1][1]) if _values_latency else 0.0
        cur_mem = float(_values_memory[-1][1]) if _values_memory else 0.0
        return [
            _predict_cross("error_rate_percent", _values_error_rate, cur_err, err_thr, t, horizon),
            _predict_cross("latency_seconds", _values_latency, cur_lat, lat_thr, t, horizon),
            _predict_cross("memory_usage_percent", _values_memory, cur_mem, _MEMORY_THRESHOLD_PCT, t, horizon),
        ]
    except Exception:
        return []
def _append_prediction_record(record: Dict[str, Any]) -> None:
try:
_ensure_dirs()
line = json.dumps(record, ensure_ascii=False)
with open(_PREDICTIONS_FILE, "a", encoding="utf-8") as f:
f.write(line + "\n")
except Exception:
return
def _cleanup_old_predictions(now_ts: Optional[float] = None) -> None:
"""Remove predictions older than configured max age from JSONL file.
Best-effort: rewrites the file in place under data/ only.
"""
try:
t = float(now_ts if now_ts is not None else _now())
global _last_cleanup_ts
if (t - _last_cleanup_ts) < float(_CLEANUP_INTERVAL_SEC):
return
_last_cleanup_ts = t
if not os.path.exists(_PREDICTIONS_FILE):
return
cutoff_iso = datetime.fromtimestamp(t - float(_MAX_PREDICTION_AGE_SEC), timezone.utc)
# Read all lines and keep only recent ones
kept: List[str] = []
with open(_PREDICTIONS_FILE, "r", encoding="utf-8") as f:
for line in f:
s = (line or "").strip()
if not s:
continue
try:
obj = json.loads(s)
ts_str = obj.get("ts")
if not ts_str:
continue
ts_dt = datetime.fromisoformat(str(ts_str))
if ts_dt >= cutoff_iso:
kept.append(json.dumps(obj, ensure_ascii=False))
except Exception:
continue
# Write back safely to the same file under data/
_ensure_dirs()
with open(_PREDICTIONS_FILE, "w", encoding="utf-8") as out:
for k in kept:
out.write(k + "\n")
except Exception:
return
def _log_prediction(trend: Trend) -> None:
try:
rec = {
"prediction_id": _safe_uuid(),
"ts": datetime.now(timezone.utc).isoformat(),
"metric": trend.metric,
"current_value": trend.current_value,
"threshold": trend.threshold,
"slope_per_minute": trend.slope_per_minute,
"predicted_cross_ts": datetime.fromtimestamp(trend.predicted_cross_ts, timezone.utc).isoformat() if trend.predicted_cross_ts else None,
"horizon_seconds": _DEFAULT_HORIZON_SEC,
}
_append_prediction_record(rec)
try:
emit_event("PREDICTIVE_INCIDENT_DETECTED", severity="warn", metric=trend.metric)
except Exception:
pass
try:
if _predicted_ctr is not None:
_predicted_ctr.labels(metric=str(trend.metric)).inc() # type: ignore[attr-defined]
except Exception:
pass
except Exception:
return
def _safe_uuid() -> str:
try:
import uuid
return str(uuid.uuid4())
except Exception:
return ""
# Fix: removed a stray non-code placeholder line that raised NameError at import.
def reset_state_for_tests() -> None:
    """Clear in-memory state for unit tests (not for production use)."""
    try:
        global _last_recompute_ts, _last_feedback_ts, _last_cleanup_ts, _HALFLIFE_MINUTES
        _values_error_rate.clear()
        _values_latency.clear()
        _values_memory.clear()
        _last_recompute_ts = 0.0
        _last_feedback_ts = 0.0
        _last_cleanup_ts = 0.0
        # Re-read halflife from the environment so tests can override it.
        _HALFLIFE_MINUTES = float(os.getenv("PREDICTIVE_HALFLIFE_MINUTES", "30") or 30.0)
        _counted_prevented_ids.clear()
    except Exception:
        return
# Fixes: removed a stray non-code placeholder line that raised NameError at
# import; removed the redundant second `_last_recompute_ts = t` assignment
# (the marker was already advanced before evaluation, so the try/else
# duplicate had no effect).
def maybe_recompute_and_preempt(now_ts: Optional[float] = None) -> List[Trend]:
    """Throttled evaluation plus preemptive actions.

    At most once per _RECOMPUTE_MIN_INTERVAL_SEC: evaluates all metric trends,
    logs a predictive incident and triggers a preemptive action for each trend
    expected to breach within the horizon, then runs maintenance (prediction
    log cleanup and the adaptive feedback loop). Returns the trends, or []
    when throttled or on any internal failure (fail-open).
    """
    global _last_recompute_ts
    t = float(now_ts if now_ts is not None else _now())
    try:
        if (t - _last_recompute_ts) < _RECOMPUTE_MIN_INTERVAL_SEC:
            # Throttled: skip side effects and heavy computation alike.
            return []
        # Advance the throttle marker up front so a failing evaluation cannot
        # cause a tight retry loop on the next call.
        _last_recompute_ts = t
        trends = evaluate_predictions(now_ts=t)
        for tr in trends:
            if tr.predicted_cross_ts is None:
                continue
            _log_prediction(tr)
            _trigger_preemptive_action(tr)
        # Maintenance: cleanup old predictions and run the feedback loop.
        _cleanup_old_predictions(now_ts=t)
        _feedback_loop(now_ts=t)
        return trends
    except Exception:
        return []
def _trigger_preemptive_action(tr: Trend) -> None:
try:
action = None
# דילוג מוחלט על פעולות מנע אם SAFE_MODE/דגל ביטול פעיל
if str(os.getenv("SAFE_MODE", "")).lower() in ("1", "true", "yes", "y", "on") or \
str(os.getenv("DISABLE_PREEMPTIVE_ACTIONS", "")).lower() in ("1", "true", "yes", "y", "on"):
try:
emit_event("PREDICTIVE_ACTION_SKIPPED", severity="info", metric=tr.metric, reason="safe_mode")
except Exception:
pass
return
if tr.metric == "latency_seconds":
# Clear stale cache (fallback to clear_all when unknown)
try:
# Prefer clear_stale when available
if _cache is not None and hasattr(_cache, "clear_stale"):
deleted = int(_cache.clear_stale() or 0)
elif _cache is not None and hasattr(_cache, "clear_all"):
deleted = int(_cache.clear_all() or 0)
else:
deleted = 0
except Exception:
deleted = 0
action = f"cache_clear({deleted})"
elif tr.metric == "memory_usage_percent":
try:
import gc
gc.collect()
except Exception:
pass
action = "gc.collect()"
elif tr.metric == "error_rate_percent":
# Controlled restart of a single worker (log-only in dev)
action = "restart_worker:1"
try:
emit_event(
"service_restart_attempt",
severity="anomaly",
service="worker-1",
reason="predictive",
handled=True,
)
except Exception:
pass
emit_event("PREDICTIVE_ACTION_TRIGGERED", severity="warn", metric=tr.metric, action=action or "none")
except Exception:
return
# Fixes: removed a stray non-code placeholder line that raised NameError at
# import; parse into a bounded deque(maxlen=limit) instead of materializing
# every record just to slice the tail.
def get_recent_predictions(limit: int = 5) -> List[Dict[str, Any]]:
    """Return up to `limit` most recent prediction records from the JSONL log.

    Returns [] for non-positive limits, a missing file, or any read error;
    malformed lines are skipped.
    """
    try:
        if limit <= 0:
            return []
        if not os.path.exists(_PREDICTIONS_FILE):
            return []
        tail: Deque[Dict[str, Any]] = deque(maxlen=limit)
        with open(_PREDICTIONS_FILE, "r", encoding="utf-8") as fh:
            for raw in fh:
                text = (raw or "").strip()
                if not text:
                    continue
                try:
                    tail.append(json.loads(text))
                except Exception:
                    continue
        return list(tail)
    except Exception:
        return []
def _load_actual_incidents(limit: int = 200) -> List[Dict[str, Any]]:
try:
if not os.path.exists(_INCIDENTS_FILE):
return []
items: List[Dict[str, Any]] = []
with open(_INCIDENTS_FILE, "r", encoding="utf-8") as f:
for line in f:
s = (line or "").strip()
if not s:
continue
try:
items.append(json.loads(s))
except Exception:
continue
return items[-limit:]
except Exception:
return []
def _feedback_loop(now_ts: Optional[float] = None) -> None:
"""Adaptive feedback: compare recent predictions to actual incidents.
- Compute precision over recent window (~24h) and export prediction_accuracy_percent gauge.
- If many false positives observed, increase halflife (less reactive). If many misses (actuals without prior predictions),
decrease halflife (more reactive). Bounds are enforced.
- Update prevented_incidents_total based on correlation between predictions and lack of subsequent incidents.
"""
try:
t = float(now_ts if now_ts is not None else _now())
global _last_feedback_ts, _HALFLIFE_MINUTES
if (t - _last_feedback_ts) < float(_FEEDBACK_MIN_INTERVAL_SEC):
return
_last_feedback_ts = t
# Load recent horizon of predictions and incidents (24h)
preds = get_recent_predictions(limit=1000)
incidents = _load_actual_incidents(limit=1000)
if not preds:
# No predictions; set accuracy to 100 if also no incidents, else 0
if _accuracy_gauge is not None:
acc = 100.0 if not incidents else 0.0
try:
_accuracy_gauge.set(acc)
except Exception:
pass
return
cutoff = datetime.fromtimestamp(t - float(_MAX_PREDICTION_AGE_SEC), timezone.utc)
recent_preds = [p for p in preds if _safe_parse_dt(p.get("ts")) and _safe_parse_dt(p.get("ts")) >= cutoff]
recent_incs = [i for i in incidents if _safe_parse_dt(i.get("ts")) and _safe_parse_dt(i.get("ts")) >= cutoff]
# Match predictions to incidents by metric and time window (prediction within 30m before incident)
matched_pred_ids: set[str] = set()
matched_inc_ids: set[str] = set()
for inc in recent_incs:
inc_ts = _safe_parse_dt(inc.get("ts"))
inc_metric = str(inc.get("metric") or "")
if not inc_ts or not inc_metric:
continue
for p in recent_preds:
if p.get("prediction_id") in matched_pred_ids:
continue
if str(p.get("metric") or "") != inc_metric:
continue
# If prediction timestamp is within 30m before incident or immediate
p_ts = _safe_parse_dt(p.get("ts"))
if not p_ts:
continue
delta = (inc_ts - p_ts).total_seconds()
if 0.0 <= delta <= 1800.0:
matched_pred_ids.add(str(p.get("prediction_id")))
matched_inc_ids.add(str(inc.get("incident_id", "")))
break
true_positives = len(matched_pred_ids)
total_predictions = len(recent_preds)
precision = (true_positives / total_predictions) * 100.0 if total_predictions > 0 else 0.0
if _accuracy_gauge is not None:
try:
_accuracy_gauge.set(round(max(0.0, min(100.0, precision)), 2))
except Exception:
pass
# Adaptive halflife tuning
# If precision < 50% and predictions are many -> too sensitive, increase halflife by 20%
# If incidents exist without matches (misses) and predictions few -> not sensitive enough, decrease by 20%
pred_many = total_predictions >= 10
misses = max(0, len(recent_incs) - len(matched_inc_ids))
adjust = None
if pred_many and precision < 50.0:
adjust = 1.2
elif misses >= 3 and total_predictions <= 5:
adjust = 1.0 / 1.2
if adjust is not None:
try:
_HALFLIFE_MINUTES = max(_HALFLIFE_MIN_MIN, min(_HALFLIFE_MIN_MAX, _HALFLIFE_MINUTES * float(adjust)))
except Exception:
pass
# Prevented incidents heuristic: predictions for which no incident occurred within 45m afterwards
for p in recent_preds:
pid = str(p.get("prediction_id") or "")
if not pid or pid in _counted_prevented_ids:
continue
p_ts = _safe_parse_dt(p.get("ts"))
metric = str(p.get("metric") or "")
if not p_ts or not metric:
continue
horizon_after = p_ts + timedelta(minutes=45)
occurred = False
for inc in recent_incs:
inc_ts = _safe_parse_dt(inc.get("ts"))
if not inc_ts:
continue
if str(inc.get("metric") or "") != metric:
continue
if p_ts <= inc_ts <= horizon_after:
occurred = True
break
if not occurred and _prevented_ctr is not None:
try:
_prevented_ctr.labels(metric=metric).inc()
_counted_prevented_ids.add(pid)
except Exception:
pass
except Exception:
return
def _safe_parse_dt(dt_str: Optional[str]) -> Optional[datetime]:
try:
if not dt_str:
return None
return datetime.fromisoformat(str(dt_str))
except Exception:
return None
# Fix: removed a stray non-code placeholder line that raised NameError at import.
def get_trend_snapshot() -> Dict[str, Dict[str, Any]]:
    """Return a per-metric snapshot with direction marker and slope for /predict.

    Keys are metric names; values carry a direction emoji, the rounded slope,
    current value, threshold, and the predicted crossing timestamp (epoch
    seconds or None). Returns {} on any internal failure (fail-open).
    """
    try:
        snapshot: Dict[str, Dict[str, Any]] = {}
        for tr in evaluate_predictions():
            if tr.slope_per_minute > 1e-6:
                direction = "🔴"  # rising
            elif tr.slope_per_minute < -1e-6:
                direction = "🟢"  # decreasing
            else:
                direction = "⚪"  # flat
            snapshot[tr.metric] = {
                "direction": direction,
                "slope_per_minute": round(tr.slope_per_minute, 6),
                "current_value": round(tr.current_value, 6),
                "threshold": round(tr.threshold, 6),
                "predicted_cross_at": tr.predicted_cross_ts,
            }
        return snapshot
    except Exception:
        return {}