"""
מנהל Cache מתקדם עם Redis
Advanced Cache Manager with Redis
"""
import json
import logging
import os
import time
import re
import hashlib
from functools import wraps
from typing import Any, Dict, List, Optional, Union, Callable, TypeVar, ParamSpec, Coroutine, cast, Tuple
import copy
import random
import threading
try:
import redis
except Exception: # redis אינו חובה – נריץ במצב מושבת אם חסר
redis = None
import asyncio
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
# מטריקות Prometheus (best-effort) — רישום חסין כפילויות
try: # pragma: no cover
from prometheus_client import Counter, Histogram, REGISTRY
except Exception: # pragma: no cover
Counter = Histogram = REGISTRY = None
def _ensure_metric(name: str, create_fn):
"""יוצר או מחזיר מטריקה קיימת באותו שם מ-REGISTRY.
המטרה: למנוע ValueError על רישום כפול כאשר המודול נטען מחדש (importlib.reload).
"""
# אם prometheus_client לא זמין, נחזיר None כדי שהקוד יקצר דרך best-effort
if REGISTRY is None:
try:
return create_fn() # ייתכן שמחזיר None ממילא
except Exception:
return None
try:
# שימוש ב-API פנימי אך יציב יחסית כדי לאתר קולקטור קיים
existing = getattr(REGISTRY, "_names_to_collectors", {}).get(name)
if existing is not None:
return existing
except Exception:
# נפילה שקטה — ננסה ליצור ונחזור אם תתרחש כפילות
pass
try:
return create_fn()
except Exception:
# במקרה של ValueError: Duplicated timeseries... נחפש ונחזיר את הקיים
try:
return getattr(REGISTRY, "_names_to_collectors", {}).get(name)
except Exception:
return None
# Cache metrics (labels kept minimal) — נרשמות באופן אידמפוטנטי
cache_hits_total = _ensure_metric(
"cache_hits_total", lambda: Counter("cache_hits_total", "Total cache hits", ["backend"]) if Counter else None
)
cache_misses_total = _ensure_metric(
"cache_misses_total", lambda: Counter("cache_misses_total", "Total cache misses", ["backend"]) if Counter else None
)
cache_op_duration_seconds = _ensure_metric(
"cache_op_duration_seconds",
lambda: Histogram(
"cache_op_duration_seconds",
"Cache operation duration in seconds",
["operation", "backend"],
) if Histogram else None,
)
# ===================== Dynamic TTL utilities =====================
[תיעוד]
class DynamicTTL:
"""ניהול TTL דינמי לפי סוג תוכן וקונטקסט.
הערכים כאן מייצגים TTL בסיסי בשניות עבור סוגי תוכן שכיחים.
"""
BASE_TTL: Dict[str, int] = {
"user_stats": 600, # 10 דקות
"file_content": 3600, # שעה
"file_list": 300, # 5 דקות
"markdown_render": 1800, # 30 דקות
"search_results": 180, # 3 דקות
"public_stats": 600, # 10 דקות
"bookmarks": 120, # 2 דקות
"tags": 300, # 5 דקות
"settings": 60, # דקה
"sticky_summary": 60, # דקה (מרגיע polling)
# Collections (My Collections) – TTL מומלץ לפי המדריך
"collections_list": 60,
"collections_detail": 30,
"collections_items": 180,
}
[תיעוד]
@classmethod
def calculate_ttl(cls, content_type: str, context: Dict[str, Any] | None = None) -> int:
"""חשב TTL בסיסי מוכוון קונטקסט.
מבטיח גבולות בטוחים: מינימום 60 שניות, מקסימום 7200 (שעתיים).
"""
ctx: Dict[str, Any] = context or {}
base_ttl: int = int(cls.BASE_TTL.get(content_type, 300))
# התאמות לפי קונטקסט
if bool(ctx.get("is_favorite")):
base_ttl = int(base_ttl * 1.5)
try:
last_mod_hours = float(ctx.get("last_modified_hours_ago", 24))
except Exception:
last_mod_hours = 24.0
if last_mod_hours < 1.0:
base_ttl = int(base_ttl * 0.5)
if str(ctx.get("access_frequency", "low")).lower() == "high":
base_ttl = int(base_ttl * 2)
if str(ctx.get("user_tier", "regular")).lower() == "premium":
# משתמשי פרימיום יעדיפו עדכונים מהירים
base_ttl = int(base_ttl * 0.7)
return max(60, min(base_ttl, 7200))
[תיעוד]
class ActivityBasedTTL:
"""התאמת TTL לפי שעות פעילות (best-effort)."""
@staticmethod
def _now_hour() -> int:
try:
from datetime import datetime
return int(datetime.now().hour)
except Exception:
return 12
[תיעוד]
@classmethod
def get_activity_multiplier(cls) -> float:
hour = cls._now_hour()
if 9 <= hour < 18: # שעות שיא – קצר יותר
return 0.7
if 18 <= hour < 23: # ערב – בינוני
return 1.0
return 1.5 # לילה – ארוך יותר
[תיעוד]
@classmethod
def adjust_ttl(cls, base_ttl: int) -> int:
try:
mult = float(cls.get_activity_multiplier())
except Exception:
mult = 1.0
# הוסף jitter קטן למניעת thundering herd
ttl = int(max(1, base_ttl) * mult)
jitter = int(max(1, ttl // 10)) # עד ±10%
try:
ttl = ttl + random.randint(-jitter, jitter)
except Exception:
pass
return max(60, min(int(ttl), 7200))
[תיעוד]
def build_cache_key(*parts: Any) -> str:
"""בניית מפתח cache יעיל ומובנה מהחלקים הנתונים.
- מסנן חלקים ריקים
- ממיר לתווים בטוחים (רווחים/סלאשים)
- מגביל אורך ומוסיף hash קצר במידת הצורך
"""
from hashlib import sha256
clean_parts: List[str] = [str(p) for p in parts if p not in (None, "")]
key: str = ":".join(clean_parts)
key = key.replace(" ", "_").replace("/", "-")
if len(key) > 200:
key_hash = sha256(key.encode("utf-8")).hexdigest()[:8]
key = f"{key[:150]}:{key_hash}"
return key
[תיעוד]
class CacheManager:
"""מנהל Cache מתקדם עם Redis"""
[תיעוד]
def __init__(self):
# Debug זמני לניתוח Hit/Miss/Set (ברירת מחדל: כבוי)
# ניתן להפעיל ידנית דרך enable_debug_for(seconds)
self.debug_until: float = 0.0
self.redis_client = None
self.is_enabled = False
self.connect()
[תיעוד]
def enable_debug_for(self, seconds: int) -> float:
"""הפעל/הארך חלון דיבאג זמני ללוגים של HIT/MISS/SET.
- אם seconds <= 0: מכבה דיבאג (debug_until=0)
- אחרת: מאריך (לא מקצר) את החלון כך שיסתיים לפחות בעוד seconds שניות מהעכשיו
מחזיר את timestamp החדש של debug_until.
"""
try:
sec = int(seconds)
except Exception:
sec = 0
if sec <= 0:
self.debug_until = 0.0
return float(self.debug_until)
now = float(time.time())
new_until = float(now + sec)
try:
self.debug_until = float(max(float(getattr(self, "debug_until", 0.0) or 0.0), new_until))
except Exception:
self.debug_until = float(new_until)
return float(self.debug_until)
def _debug_active(self) -> bool:
try:
return float(time.time()) < float(getattr(self, "debug_until", 0.0) or 0.0)
except Exception:
return False
def _debug_log(self, action: str, key: str, **extra: Any) -> None:
"""לוג דיבאג מבוקר־זמן עבור פעולות קאש."""
if not self._debug_active():
return
try:
safe_key = str(key)
if len(safe_key) > 300:
safe_key = safe_key[:300] + "…"
if extra:
logger.info("cache %s key=%s extra=%s", str(action), safe_key, extra)
else:
logger.info("cache %s key=%s", str(action), safe_key)
except Exception:
# דיבאג לא אמור להפיל זרימה
return
[תיעוד]
def connect(self):
"""התחברות ל-Redis"""
try:
if redis is None:
self.is_enabled = False
logger.info("חבילת redis לא מותקנת – Cache מושבת")
return
# קונפיג דרך pydantic אם זמין, אחרת ENV ישיר – לשמירת תאימות
try:
from config import config as _cfg
except Exception:
_cfg = None
# קדימות ל-ENV: אם המשתנה הוגדר (גם אם ריק) זה אות להשבתה מפורשת בטסטים/CI
env_url = os.getenv('REDIS_URL')
if env_url is not None:
redis_url = env_url
else:
redis_url = getattr(_cfg, 'REDIS_URL', None) if _cfg is not None else None
if not redis_url or str(redis_url).strip() == "" or str(redis_url).startswith("disabled"):
self.is_enabled = False
logger.info("Redis אינו מוגדר - Cache מושבת")
return
# כיבוד timeouts מה-ENV, עם ברירות מחדל שמרניות ב-SAFE_MODE
safe_mode = str(os.getenv("SAFE_MODE", "")).lower() in ("1", "true", "yes", "y", "on")
# כבד קונפיג מפורש גם אם הערך 0.0, אל תשתמש ב-or שמבטל 0
connect_timeout_cfg = (getattr(_cfg, 'REDIS_CONNECT_TIMEOUT', None) if _cfg is not None else None)
socket_timeout_cfg = (getattr(_cfg, 'REDIS_SOCKET_TIMEOUT', None) if _cfg is not None else None)
connect_timeout_env = os.getenv("REDIS_CONNECT_TIMEOUT")
socket_timeout_env = os.getenv("REDIS_SOCKET_TIMEOUT")
if connect_timeout_cfg is not None:
socket_connect_timeout = float(connect_timeout_cfg)
elif connect_timeout_env is not None:
socket_connect_timeout = float(connect_timeout_env)
else:
socket_connect_timeout = float("1" if safe_mode else "5")
if socket_timeout_cfg is not None:
socket_timeout = float(socket_timeout_cfg)
elif socket_timeout_env is not None:
socket_timeout = float(socket_timeout_env)
else:
socket_timeout = float("1" if safe_mode else "5")
try:
max_conns_env = (
(str(getattr(_cfg, 'REDIS_MAX_CONNECTIONS', '') or '') if _cfg is not None else '')
or os.getenv("REDIS_MAX_CONNECTIONS")
)
max_connections = int(max_conns_env) if max_conns_env not in (None, "") else int(getattr(_cfg, 'REDIS_MAX_CONNECTIONS', 50) if _cfg is not None else 50)
except Exception:
max_connections = int(getattr(_cfg, 'REDIS_MAX_CONNECTIONS', 50) if _cfg is not None else 50)
self.redis_client = redis.from_url(
redis_url,
decode_responses=True,
socket_connect_timeout=socket_connect_timeout,
socket_timeout=socket_timeout,
retry_on_timeout=True,
health_check_interval=30,
max_connections=max_connections,
)
# בדיקת חיבור
self.redis_client.ping()
self.is_enabled = True
logger.info("התחברות ל-Redis הצליחה - Cache מופעל")
except Exception as e:
logger.warning(f"לא ניתן להתחבר ל-Redis: {e} - Cache מושבת")
self.is_enabled = False
def _make_key(self, prefix: str, *args, **kwargs) -> str:
"""יוצר מפתח cache ייחודי"""
def _clean_repr(s: str) -> str:
# מנקה כתובות זיכרון נפוצות ב-repr ברירת מחדל: "0x7f...."
return re.sub(r"0x[0-9a-fA-F]+", "0x", str(s or ""))
def _stable_scalar(v: Any) -> Optional[str]:
"""החזר ייצוג מחרוזתי לטיפוסים פשוטים בלבד; אחרת None.
חשוב: מחרוזת ריקה היא ערך לגיטימי ולכן אינה משמשת כסנטינל.
"""
if v is None:
return "None"
if isinstance(v, (str, int, float, bool)):
# str("") חייב להישמר כ-"" ולא להיחשב "לא scalar"
return str(v)
if isinstance(v, (bytes, bytearray)):
h = hashlib.sha256(bytes(v)).hexdigest()[:16]
return f"bytes:{h}"
return None
def _stable_part(v: Any, *, _depth: int = 0) -> str:
# 1) טיפוסים פשוטים
scalar = _stable_scalar(v)
if scalar is not None:
return scalar
# 2) מבנים מובנים עם סדר דטרמיניסטי
if isinstance(v, (list, tuple)):
if _depth >= 3:
try:
return f"[len={len(v)}]"
except Exception:
return "[len=?]"
return "[" + ",".join(_stable_part(x, _depth=_depth + 1) for x in v) + "]"
if isinstance(v, set):
if _depth >= 3:
try:
return f"{{len={len(v)}}}"
except Exception:
return "{len=?}"
return "{" + ",".join(sorted(_stable_part(x, _depth=_depth + 1) for x in v)) + "}"
if isinstance(v, dict):
if _depth >= 3:
try:
return f"{{len={len(v)}}}"
except Exception:
return "{len=?}"
try:
items = sorted(v.items(), key=lambda kv: str(kv[0]))
except Exception:
items = list(v.items())
# הגבלת גודל כדי למנוע מפתחות ענקיים
limited = items[:30]
return "{" + ",".join(
f"{_stable_part(k, _depth=_depth + 1)}={_stable_part(val, _depth=_depth + 1)}"
for k, val in limited
) + ("…" if len(items) > 30 else "") + "}"
# 3) אובייקטים עם מזהה מוכר: נשתמש רק במזהה (כדי למנוע כתובות זיכרון במפתח)
for attr in ("user_id", "id", "pk", "uuid"):
try:
if hasattr(v, attr):
ident = getattr(v, attr)
ident_scalar = _stable_scalar(ident)
if ident_scalar is not None:
return ident_scalar
return _clean_repr(str(ident))
except Exception:
continue
# 4) אובייקטים "רגילים" עם __dict__: נבנה fingerprint דטרמיניסטי מהתוכן (מוגבל עומק/גודל)
try:
cls = v.__class__
cls_name = f"{getattr(cls, '__module__', '')}.{getattr(cls, '__qualname__', getattr(cls, '__name__', 'object'))}"
except Exception:
cls_name = "object"
try:
if _depth < 3 and hasattr(v, "__dict__") and isinstance(getattr(v, "__dict__", None), dict):
d = cast(dict, getattr(v, "__dict__", {}) or {})
if d:
try:
items = sorted(d.items(), key=lambda kv: str(kv[0]))
except Exception:
items = list(d.items())
# דוגמים רק חלק מהשדות כדי להימנע מהעמסה/דליפה של נתונים כבדים
sampled: dict[str, str] = {}
for k, val in items[:25]:
try:
sampled[str(k)] = _stable_part(val, _depth=_depth + 1)
except Exception:
sampled[str(k)] = "<?>"
material = json.dumps(sampled, sort_keys=True, ensure_ascii=False)
h = hashlib.sha256(f"{cls_name}:{material}".encode("utf-8", errors="ignore")).hexdigest()[:16]
return f"{cls_name}:{h}"
except Exception:
# ניפול ל-fallback הבא
pass
# 5) fallback אחרון: hash דטרמיניסטי על בסיס class + repr נקי
try:
raw = _clean_repr(str(v))
except Exception:
raw = ""
material = f"{cls_name}:{raw}"
h = hashlib.sha256(material.encode("utf-8", errors="ignore")).hexdigest()[:16]
return f"{cls_name}:{h}"
key_parts: List[str] = [str(prefix)]
key_parts.extend(_stable_part(arg) for arg in args)
if kwargs:
try:
sorted_kwargs = sorted(kwargs.items(), key=lambda kv: str(kv[0]))
except Exception:
sorted_kwargs = list(kwargs.items())
key_parts.extend(f"{str(k)}:{_stable_part(v)}" for k, v in sorted_kwargs)
return ":".join(key_parts)
[תיעוד]
def get(self, key: str) -> Optional[Any]:
"""קבלת ערך מה-cache"""
if not self.is_enabled:
return None
backend = "redis"
timer_ctx = cache_op_duration_seconds.labels(operation="get", backend=backend).time() if cache_op_duration_seconds else None
try:
value = self.redis_client.get(key)
if value:
if cache_hits_total is not None:
cache_hits_total.labels(backend=backend).inc()
self._debug_log("HIT", key)
return json.loads(value)
if cache_misses_total is not None:
cache_misses_total.labels(backend=backend).inc()
self._debug_log("MISS", key)
except Exception as e:
logger.error(f"שגיאה בקריאה מ-cache: {e}")
finally:
try:
if timer_ctx:
timer_ctx() # stop timer
except Exception:
pass
return None
[תיעוד]
def set(self, key: str, value: Any, expire_seconds: int = 300) -> bool:
"""שמירת ערך ב-cache"""
if not self.is_enabled:
return False
backend = "redis"
timer_ctx = cache_op_duration_seconds.labels(operation="set", backend=backend).time() if cache_op_duration_seconds else None
try:
serialized = json.dumps(value, default=str, ensure_ascii=False)
# תמיכה בלקוחות ללא setex: ננסה set(ex=) או set+expire
client = self.redis_client
if hasattr(client, 'setex'):
ok = bool(client.setex(key, expire_seconds, serialized))
self._debug_log("SET", key, ok=ok, ttl=int(expire_seconds))
return ok
# חלק מהלקוחות תומכים ב-ex ב-set
try:
ok = bool(client.set(key, serialized, ex=expire_seconds))
self._debug_log("SET", key, ok=ok, ttl=int(expire_seconds))
return ok
except Exception:
pass
# נסה set ואז expire
ok = bool(client.set(key, serialized))
try:
_ = client.expire(key, int(expire_seconds))
except Exception:
pass
self._debug_log("SET", key, ok=ok, ttl=int(expire_seconds))
return ok
except Exception as e:
logger.error(f"שגיאה בכתיבה ל-cache: {e}")
return False
finally:
try:
if timer_ctx:
timer_ctx()
except Exception:
pass
# ===================== Dynamic TTL helpers =====================
[תיעוד]
def set_dynamic(
self,
key: str,
value: Any,
content_type: str,
context: Optional[Dict[str, Any]] = None,
) -> bool:
"""שמירה ב-cache עם TTL דינמי ותיעוד מינימלי במטריקות/לוגים."""
# חישוב TTL דינמי וביצוע התאמות פעילות + jitter
base_ttl = DynamicTTL.calculate_ttl(content_type, context or {})
adjusted_ttl = ActivityBasedTTL.adjust_ttl(base_ttl)
try:
logger.debug(
"cache_set_dynamic",
extra={
"key": str(key)[:120],
"ttl": int(adjusted_ttl),
"content_type": str(content_type),
},
)
except Exception:
pass
return self.set(key, value, int(adjusted_ttl))
[תיעוד]
def get_with_refresh(
self,
key: str,
refresh_func: Callable[[], Any],
*,
content_type: str,
context: Optional[Dict[str, Any]] = None,
) -> Any:
"""קריאה מ-cache; אם חסר – מחשב, שומר דינמית ומחזיר."""
cached_value = self.get(key)
if cached_value is not None:
return cached_value
fresh_value = refresh_func()
if fresh_value is not None:
try:
self.set_dynamic(key, fresh_value, content_type, context)
except Exception:
# Fail-open: אם שמירה נכשלה לא נשבור זרימה
pass
return fresh_value
[תיעוד]
def delete(self, key: str) -> bool:
"""מחיקת ערך מה-cache"""
if not self.is_enabled:
return False
backend = "redis"
timer_ctx = cache_op_duration_seconds.labels(operation="delete", backend=backend).time() if cache_op_duration_seconds else None
try:
return bool(self.redis_client.delete(key))
except Exception as e:
logger.error(f"שגיאה במחיקה מ-cache: {e}")
return False
finally:
try:
if timer_ctx:
timer_ctx()
except Exception:
pass
[תיעוד]
def delete_pattern(self, pattern: str) -> int:
"""מחיקת כל המפתחות שמתאימים לתבנית"""
if not self.is_enabled:
return 0
backend = "redis"
timer_ctx = cache_op_duration_seconds.labels(operation="delete_pattern", backend=backend).time() if cache_op_duration_seconds else None
try:
client = self.redis_client
deleted = 0
# תואם Redis MATCH pattern (במקרים של FakeRedis/scan_iter שלא מכבד match)
import fnmatch
# תקציב זמן כדי להימנע מחסימת תהליך במאגרים גדולים
budget_seconds = float(
os.getenv("CACHE_DELETE_PATTERN_BUDGET_SECONDS", os.getenv("CACHE_CLEAR_BUDGET_SECONDS", "5"))
)
deadline = time.time() + max(0.0, budget_seconds)
# שימוש בטוח ב-SCAN (אל תשתמש ב-KEYS!)
batch: List[str] = []
batch_size = 200
if hasattr(client, "scan_iter"):
iterator = client.scan_iter(match=pattern, count=500)
elif hasattr(client, "scan"):
# fallback ידני ל-SCAN אם scan_iter לא קיים (עדיין ללא KEYS)
def _scan_fallback(): # type: ignore[no-untyped-def]
cursor = 0
while True:
cursor, keys = client.scan(cursor=cursor, match=pattern, count=500)
for k in keys or []:
yield k
if int(cursor) == 0:
break
iterator = _scan_fallback()
elif hasattr(client, "keys"):
# fallback שמיועד *רק* ללקוחות Fake בטסטים.
# חשוב: ב-Redis אמיתי scan_iter קיים ולכן לא נגיע לכאן.
mod = str(getattr(getattr(client, "__class__", object), "__module__", "") or "")
if mod.startswith("redis"):
# ב-Redis אמיתי לא נרשה שימוש ב-KEYS
return 0
keys = client.keys(pattern)
if keys:
try:
return int(client.delete(*keys) or 0)
except Exception:
return 0
return 0
else:
# אין יכולת סריקה בטוחה -> אל תמחוק
return 0
for k in iterator:
if time.time() > deadline:
break
# הגנה נוספת: חלק מלקוחות Fake לא מכבדים match בפרמטרים של scan_iter
try:
if not fnmatch.fnmatch(str(k), str(pattern)):
continue
except Exception:
# אם לא ניתן להשוות, נמשיך (Fail-open עבור מחיקה מבוקרת)
continue
batch.append(k)
if len(batch) >= batch_size:
try:
deleted += int(client.delete(*batch) or 0)
except Exception:
pass
batch.clear()
if batch and time.time() <= deadline:
try:
deleted += int(client.delete(*batch) or 0)
except Exception:
pass
return int(deleted)
except Exception as e:
logger.error(f"שגיאה במחיקת תבנית מ-cache: {e}")
return 0
finally:
try:
if timer_ctx:
timer_ctx()
except Exception:
pass
[תיעוד]
def invalidate_user_cache(self, user_id: int) -> int:
"""מחיקת כל ה-cache של משתמש ספציפי"""
# התאמה רחבה יותר למפתחות כפי שהם נוצרים כיום ב-_make_key
# המפתחות נראים כך: "<prefix>:<func_name>:<self>:<user_id>:..."
# לכן נמחק לפי prefixes הרלוונטיים ולפי user_id גולמי.
total_deleted = 0
try:
patterns = [
f"*:user:{user_id}:*", # תמיכה לאחור אם יתווסף prefix "user:" בעתיד
f"user_files:*:{user_id}:*", # רשימת קבצי משתמש
f"latest_version:*:{user_id}:*", # גרסה אחרונה לקובץ
f"search_code:*:{user_id}:*", # תוצאות חיפוש למשתמש
f"user_stats:*:{user_id}", # סטטיסטיקות משתמש — מסתיים ב-:<user_id>
f"user_stats:*:{user_id}:*", # גיבוי: אם יתווספו פרמטרים/סופיות בעתיד
f"*:{user_id}:*", # נפילה לאחור: כל מפתח שמכיל את המזהה
f"*:{user_id}", # נפילה לאחור: מפתחות שמסתיימים במזהה
]
for p in patterns:
total_deleted += int(self.delete_pattern(p) or 0)
except Exception as e:
logger.warning(f"invalidate_user_cache failed for user {user_id}: {e}")
# חשוב: זהו מספר המחיקות בפועל כפי ש-Redis החזיר מהפקודת DEL (לא רק מספר דפוסים).
logger.info(f"invalidate_user_cache: נמחקו בפועל {total_deleted} מפתחות מ-Redis עבור משתמש {user_id}")
return total_deleted
[תיעוד]
def clear_all(self) -> int:
"""ניקוי כל המטמון באופן מבוקר.
- אם Redis מושבת – מחזיר 0.
- אם Redis פעיל – מוחק את כל המפתחות באמצעות SCAN+DEL (best-effort).
"""
if not self.is_enabled:
return 0
deleted = 0
try:
client = self.redis_client
# תקציב זמן לניקוי כדי לא לחסום worker אם Redis איטי
budget_seconds = float(os.getenv("CACHE_CLEAR_BUDGET_SECONDS", "5"))
deadline = time.time() + max(0.0, budget_seconds)
if hasattr(client, 'scan_iter'):
for k in client.scan_iter(match='*', count=500):
if time.time() > deadline:
break
try:
deleted += int(client.delete(k) or 0)
except Exception:
pass
if time.time() > deadline:
break
else:
# Fallback: keys + delete
keys = client.keys('*')
if keys:
deleted = int(client.delete(*keys) or 0)
except Exception as e:
logger.warning(f"clear_all failed: {e}")
logger.info(f"ניקוי cache מלא: {deleted} מפתחות נמחקו")
return deleted
# ===================== Invalidation helpers (tag/pattern-based) =====================
[תיעוד]
def clear_stale(self, max_scan: int = 1000, ttl_seconds_threshold: int = 60) -> int:
"""מחיקת מפתחות שכבר עומדים לפוג ("stale") בצורה עדינה.
היגיון:
- אם Redis מושבת – החזר 0.
- סריקה מדורגת (SCAN) של עד max_scan מפתחות.
- מחיקה רק למפתחות עם TTL חיובי קטן מ-ttl_seconds_threshold, או TTL שלילי המציין שאינו קיים.
- לא מוחקים מפתחות ללא TTL (ttl == -1) כדי להימנע מפגיעה בקאש ארוך-חיים.
"""
if not self.is_enabled:
return 0
# דילוג בטוח במצב SAFE_MODE או אם ביקשו לבטל תחזוקת קאש
if str(os.getenv("SAFE_MODE", "")).lower() in ("1", "true", "yes", "y", "on") or \
str(os.getenv("DISABLE_CACHE_MAINTENANCE", "")).lower() in ("1", "true", "yes", "y", "on"):
logger.info("SAFE_MODE/disable flag פעיל — דילוג על clear_stale")
return 0
deleted = 0
scanned = 0
try:
client = self.redis_client
# בדיקת חיות מהירה כדי להיכשל מוקדם
try:
_ = client.ping()
except Exception:
logger.warning("clear_stale: Redis לא מגיב — דילוג על הניקוי")
return 0
# תקציב זמן לניקוי כדי לא לחסום worker אם Redis איטי
budget_seconds = float(os.getenv("CACHE_CLEAR_BUDGET_SECONDS", "5"))
deadline = time.time() + max(0.0, budget_seconds)
# עדיפות ל-scan_iter כדי להימנע מ-blocking
if hasattr(client, 'scan_iter') and hasattr(client, 'ttl'):
for k in client.scan_iter(match='*', count=500):
if time.time() > deadline:
break
scanned += 1
try:
ttl = int(client.ttl(k))
except Exception:
ttl = -2 # התייחסות כמפתח לא קיים/פג
# מחיקה רק אם TTL קצר (<= threshold) או לא קיים (-2)
if ttl == -2 or (ttl >= 0 and ttl <= int(ttl_seconds_threshold)):
try:
deleted += int(client.delete(k) or 0)
except Exception:
pass
# הפסקה כשעברנו את מכסת הסריקות או התקציב
if scanned >= int(max_scan) or time.time() > deadline:
break
else:
# Fallback זהיר: אל תמחק גורף אם אין יכולות TTL/SCAN
return 0
except Exception as e:
logger.warning(f"clear_stale failed: {e}")
logger.info(f"ניקוי cache עדין (stale): נסרקו {scanned} / נמחקו {deleted}")
return deleted
[תיעוד]
def get_stats(self) -> Dict[str, Any]:
"""סטטיסטיקות cache"""
if not self.is_enabled:
return {"enabled": False}
try:
info = self.redis_client.info()
return {
"enabled": True,
"used_memory": info.get('used_memory_human', 'N/A'),
"connected_clients": info.get('connected_clients', 0),
"keyspace_hits": info.get('keyspace_hits', 0),
"keyspace_misses": info.get('keyspace_misses', 0),
"hit_rate": round(
info.get('keyspace_hits', 0) /
max(info.get('keyspace_hits', 0) + info.get('keyspace_misses', 0), 1) * 100,
2
)
}
except Exception as e:
logger.error(f"שגיאה בקבלת סטטיסטיקות cache: {e}")
return {"enabled": True, "error": "unavailable"}
# ===================== Flask dynamic cache decorator =====================
P = ParamSpec("P")
R = TypeVar("R")
[תיעוד]
def dynamic_cache(content_type: str, key_prefix: Optional[str] = None) -> Callable[[Callable[P, R]], Callable[P, R]]:
"""דקורטור ל-caching דינמי ל-Flask endpoints.
- בונה מפתח קאש יציב הכולל משתמש/נתיב/פרמטרים
- שומר רק טיפוסים serializable; עבור Response עם JSON שומר את ה-data בלבד
- Fail-open: לעולם לא מפיל endpoint על בעיות קאש
"""
def decorator(func: Callable[P, R]) -> Callable[P, R]:
@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
try:
# ייבוא מאוחר כדי להימנע מתלות פלצ'ית בזמן import מודולרי/טסטים
try:
from flask import request, session, jsonify
except Exception: # pragma: no cover
request = None
session = {}
def jsonify(x):
return x
# זיהוי משתמש וקונטקסט בסיסי
uid = None
try:
uid = session.get('user_id') if hasattr(session, 'get') else None
except Exception:
uid = None
try:
user_tier = (session.get('user_tier') or 'regular') if hasattr(session, 'get') else 'regular'
except Exception:
user_tier = 'regular'
# מפתח קאש: prefix/שם פונקציה + user + path + query
prefix = key_prefix if key_prefix else getattr(func, "__name__", "endpoint")
req_path = getattr(request, 'path', '') if request is not None else ''
try:
q = request.query_string.decode(errors='ignore') if request is not None else ''
except Exception:
q = ''
cache_key = build_cache_key(prefix, str(uid or 'anonymous'), req_path, q)
# ניסיון שליפה מהקאש
cached_value = cache.get(cache_key)
if cached_value is not None:
if isinstance(cached_value, dict):
return cast(R, jsonify(cached_value))
return cast(R, cached_value)
# חישוב התוצאה
result = func(*args, **kwargs)
# אם זו תגובת Flask עם JSON — שמור רק את ה-data
try:
if hasattr(result, 'get_json'):
data = result.get_json(silent=True)
if data is not None:
cache.set_dynamic(cache_key, data, content_type, {
'user_id': uid,
'user_tier': user_tier,
'endpoint': getattr(func, '__name__', ''),
})
return result
except Exception:
pass
# שמירה של טיפוסים serializable נפוצים
if isinstance(result, (dict, list, str, int, float, bool)):
try:
cache.set_dynamic(cache_key, result, content_type, {
'user_id': uid,
'user_tier': user_tier,
'endpoint': getattr(func, '__name__', ''),
})
except Exception:
pass
return result
except Exception:
# Fail-open על כל תקלה במנגנון הקאש
return func(*args, **kwargs)
return wrapper
return decorator
# יצירת instance גלובלי
cache = CacheManager()
# Fallback in-process cache store (used when Redis disabled or on failures)
_local_cache_store: Dict[str, Tuple[float, Any]] = {}
_local_cache_lock = threading.Lock()
_local_cache_last_cleanup_ts: float = 0.0
def _get_local_cache_max_entries() -> int:
"""מגבלת גודל לפולבק בזיכרון מקומי כדי למנוע זליגת זיכרון.
הערה: הפולבק נועד למקרי Redis מושבת/נופל ולכן חייב להיות מוגבל.
"""
try:
v = int(os.getenv("LOCAL_CACHE_MAX_ENTRIES", "2000"))
except Exception:
v = 2000
return max(0, v)
def _cleanup_local_cache(*, now: Optional[float] = None, force: bool = False) -> None:
"""ניקוי עדין של הפולבק בזיכרון: מחיקת פגי-תוקף + פינוי לפי גודל.
קריטי: בלי ניקוי, `_local_cache_store` גדל ללא גבול כי TTL נבדק רק בקריאה.
"""
global _local_cache_last_cleanup_ts
max_entries = _get_local_cache_max_entries()
if max_entries <= 0:
# אם הגדירו 0/שלילי — כבה לגמרי את הפולבק כדי להעדיף Redis/חישוב חוזר.
with _local_cache_lock:
_local_cache_store.clear()
return
ts = float(time.time() if now is None else now)
# אל תעשה סריקה מלאה בכל בקשה; מספיק כל ~30 שניות או אם עברנו את המגבלה.
if not force and (ts - float(_local_cache_last_cleanup_ts or 0.0)) < 30.0:
with _local_cache_lock:
if len(_local_cache_store) <= max_entries:
return
with _local_cache_lock:
_local_cache_last_cleanup_ts = ts
if not _local_cache_store:
return
# 1) מחיקת ערכים שפג תוקפם
expired_keys: List[str] = []
for k, entry in _local_cache_store.items():
try:
expires_at = float(entry[0])
except Exception:
expires_at = 0.0
if expires_at <= ts:
expired_keys.append(k)
for k in expired_keys:
_local_cache_store.pop(k, None)
# 2) אם עדיין גדול מדי — פנה לפי סדר הכנסה (dict שומר order ב-Python 3.7+)
if len(_local_cache_store) > max_entries:
to_evict = len(_local_cache_store) - max_entries
try:
for k in list(_local_cache_store.keys())[:to_evict]:
_local_cache_store.pop(k, None)
except Exception:
# fallback בטוח: מחיקה אגרסיבית אם משהו השתבש
_local_cache_store.clear()
[תיעוד]
def cached(expire_seconds: int = 300, key_prefix: str = "default"):
"""דקורטור לcaching פונקציות"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# יצירת מפתח cache
cache_key = cache._make_key(key_prefix, func.__name__, *args, **kwargs)
# בדיקה ב-cache (Redis/remote)
result = cache.get(cache_key)
if result is not None:
logger.debug(f"Cache hit: {cache_key}")
return result
# בדיקת Fallback בזיכרון מקומי
try:
now = time.time()
with _local_cache_lock:
entry = _local_cache_store.get(cache_key)
if entry is not None:
# אחורה תאימות: ערך יכול להיות או (expires, value) או (expires, ('json'|'obj', payload))
expires_at = float(entry[0])
if expires_at <= now:
# חשוב: למחוק ערכים פגי-תוקף כדי למנוע גדילה אינסופית
_local_cache_store.pop(cache_key, None)
entry = None
if entry is not None:
stored_value = entry[1]
try:
# אם נשמר מחרוזת JSON – פרסר יחזיר עותק חדש
if isinstance(stored_value, tuple) and len(stored_value) == 2:
kind, payload = stored_value
if kind == 'json' and isinstance(payload, str):
logger.debug(f"Local cache hit(json): {cache_key}")
return json.loads(payload)
if kind == 'obj':
logger.debug(f"Local cache hit(obj): {cache_key}")
return copy.deepcopy(payload)
# תמיכה בנתונים ישנים: החזר deep copy כדי לשמר איסולציה
if isinstance(stored_value, str):
return json.loads(stored_value)
return copy.deepcopy(stored_value)
except Exception:
return stored_value
except Exception:
# לא חוסם זרימה במקרה של שגיאה בפולבק
pass
# הפעלת הפונקציה ושמירה ב-cache
result = func(*args, **kwargs)
# אל תאחסן תוצאות חסרות (None) – מונע קיבוע שגיאות/חוסרים ומפחית פלייקיות בטסטים
if result is None:
return result
wrote_remote = False
try:
wrote_remote = bool(cache.set(cache_key, result, expire_seconds))
except Exception:
wrote_remote = False
# אם נכשל כתיבה לרימוט — שמור בזיכרון מקומי עם TTL
if not wrote_remote:
try:
# אחסן כמחרוזת JSON לשמירה על סמאנטיקת עותק כמו Redis; fallback ל-deepcopy
try:
serialized = json.dumps(result, default=str, ensure_ascii=False)
payload = ('json', serialized)
except Exception:
payload = ('obj', copy.deepcopy(result))
with _local_cache_lock:
_local_cache_store[cache_key] = (time.time() + float(expire_seconds), payload)
_cleanup_local_cache()
except Exception:
pass
logger.debug(f"Cache miss, stored: {cache_key}")
return result
return wrapper
return decorator
[תיעוד]
def async_cached(expire_seconds: int = 300, key_prefix: str = "default"):
"""דקורטור לcaching פונקציות async"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# יצירת מפתח cache
cache_key = cache._make_key(key_prefix, func.__name__, *args, **kwargs)
# בדיקה ב-cache (Redis/remote)
result = cache.get(cache_key)
if result is not None:
logger.debug(f"Cache hit: {cache_key}")
return result
# בדיקת Fallback בזיכרון מקומי
try:
now = time.time()
with _local_cache_lock:
entry = _local_cache_store.get(cache_key)
if entry is not None:
expires_at = float(entry[0])
if expires_at <= now:
_local_cache_store.pop(cache_key, None)
entry = None
if entry is not None:
stored_value = entry[1]
try:
if isinstance(stored_value, tuple) and len(stored_value) == 2:
kind, payload = stored_value
if kind == 'json' and isinstance(payload, str):
logger.debug(f"Local cache hit(json): {cache_key}")
return json.loads(payload)
if kind == 'obj':
logger.debug(f"Local cache hit(obj): {cache_key}")
return copy.deepcopy(payload)
if isinstance(stored_value, str):
return json.loads(stored_value)
return copy.deepcopy(stored_value)
except Exception:
return stored_value
except Exception:
pass
# הפעלת הפונקציה ושמירה ב-cache
result = await func(*args, **kwargs)
# אל תאחסן תוצאות חסרות (None)
if result is None:
return result
wrote_remote = False
try:
wrote_remote = bool(cache.set(cache_key, result, expire_seconds))
except Exception:
wrote_remote = False
if not wrote_remote:
try:
try:
serialized = json.dumps(result, default=str, ensure_ascii=False)
payload = ('json', serialized)
except Exception:
payload = ('obj', copy.deepcopy(result))
with _local_cache_lock:
_local_cache_store[cache_key] = (time.time() + float(expire_seconds), payload)
_cleanup_local_cache()
except Exception:
pass
logger.debug(f"Cache miss, stored: {cache_key}")
return result
return wrapper
return decorator