הראה קוד מקור ל file_manager

from typing import Any, BinaryIO, Dict, List, Optional, Tuple, Set
import os
import tempfile
import zipfile
import json
from datetime import datetime, timezone
from pathlib import Path
import logging
from contextlib import suppress
import io
import re
import shutil
import time

try:
    import gridfs  # from pymongo
except Exception:  # pragma: no cover
    gridfs = None  # type: ignore[assignment]

logger = logging.getLogger(__name__)

# Transitional: access persistence only via facade (avoid direct database imports)
try:
    from src.infrastructure.composition import get_files_facade  # type: ignore
except Exception:  # pragma: no cover
    get_files_facade = None  # type: ignore

[תיעוד] class BackupInfo: """מידע על גיבוי"""
[תיעוד] def __init__(self, backup_id: str, user_id: int, created_at: datetime, file_count: int, total_size: int, backup_type: str, status: str, file_path: str, repo: Optional[str], path: Optional[str], metadata: Optional[Dict[str, Any]]): self.backup_id = backup_id self.user_id = user_id self.created_at = created_at self.file_count = file_count self.total_size = total_size self.backup_type = backup_type self.status = status self.file_path = file_path self.repo = repo self.path = path self.metadata = metadata
[תיעוד] class BackupManager: """מנהל גיבויים"""
[תיעוד] def __init__(self): # מצב אחסון: mongo (GridFS) או fs (קבצים) self.storage_mode = os.getenv("BACKUPS_STORAGE", "mongo").strip().lower() # העדף תיקייה מתמשכת עבור גיבויים (נשמרת בין דיפלויים אם קיימת) # נסה לפי סדר: BACKUPS_DIR מהסביבה → /app/backups → /data/backups → /var/lib/code_keeper/backups persistent_candidates = [ os.getenv("BACKUPS_DIR"), "/app/backups", "/data/backups", "/var/lib/code_keeper/backups", ] chosen_dir: Optional[Path] = None for cand in persistent_candidates: if not cand: continue try: p = Path(cand) p.mkdir(parents=True, exist_ok=True) # וידוא שניתן לכתוב test_file = p / ".write_test" try: with open(test_file, "w") as tf: tf.write("ok") test_file.unlink(missing_ok=True) # type: ignore[arg-type] except Exception: # אם אי אפשר לכתוב – נסה מועמד הבא continue chosen_dir = p break except Exception: continue if chosen_dir is None: # נפילה לתיקיית temp אם אין נתיב מתמשך זמין chosen_dir = Path(tempfile.gettempdir()) / "code_keeper_backups" chosen_dir.mkdir(exist_ok=True) self.backup_dir = chosen_dir # תיקיית legacy: תמיכה בקריאה גם מהמיקום הישן (אם השתמש בעבר ב-temp) # נשמור גם על תמיכה ב-"/app/backups" כנתיב חיפוש נוסף אם לא נבחר כבר legacy_candidates: List[Path] = [] try: legacy_candidates.append(Path(tempfile.gettempdir()) / "code_keeper_backups") except Exception: pass try: app_backups = Path("/app/backups") if app_backups != self.backup_dir: legacy_candidates.append(app_backups) except Exception: pass # שמור נתיב legacy ראשי למטרות תאימות (ישומש בחיפוש) self.legacy_backup_dir = legacy_candidates[0] if legacy_candidates else None self.max_backup_size = 100 * 1024 * 1024 # 100MB # מנגנון התרעה פשוט למניעת הצפה self._last_disk_warn_ts: float = 0.0
def _maybe_alert_low_space(self) -> None: """בדיקת מקום פנוי בדיסק לפני כתיבה והתרעה לאדמין אם נמוך. - סף ברירת מחדל: 200MB (ניתן לשינוי ב‑BACKUPS_DISK_MIN_FREE_BYTES) - Rate limit: התרעה אחת לכל 10 דקות כדי להימנע מספאם """ try: # חישוב מקום פנוי על הפיילסיסטם של תיקיית הגיבויים usage = shutil.disk_usage(self.backup_dir) free_bytes = int(getattr(usage, 'free', 0) or 0) try: # תמיכה נכונה במשתנה סביבה ריק: אם ה‑ENV קיים אך ריק → השתמש בברירת המחדל _env_val = os.getenv("BACKUPS_DISK_MIN_FREE_BYTES") if _env_val is None or not str(_env_val).strip(): threshold = 200 * 1024 * 1024 else: threshold = int(_env_val) except Exception: threshold = 200 * 1024 * 1024 limit = max(1, threshold) if free_bytes <= 0 or free_bytes < limit: now = time.time() if now - self._last_disk_warn_ts < 600: # 10 דקות return self._last_disk_warn_ts = now # נסה לפלוט אירוע מובנה + התראה פנימית try: from observability import emit_event # type: ignore except Exception: emit_event = None # type: ignore try: from internal_alerts import emit_internal_alert # type: ignore except Exception: emit_internal_alert = None # type: ignore msg = "⚠️ הדיסק כמעט מלא – מומלץ לנקות גיבויים או להגדיל נפח" details = { "path": str(self.backup_dir), "free_bytes": int(free_bytes), "threshold_bytes": int(threshold), } try: if emit_event is not None: emit_event("disk_low_space", severity="warn", **details) except Exception: pass try: if emit_internal_alert is not None: # סיכום קצר + פרטים טכניים לצפייה ב‑ChatOps/Telegram emit_internal_alert("disk_low_space", severity="warn", summary=msg, **details) except Exception: pass # NOTE: בעבר נשלחה כאן הודעת DM ישירה לאדמינים דרך BOT_TOKEN/ADMIN_USER_IDS. # היום כל ההתראות אמורות לעבור דרך internal_alerts.emit_internal_alert כדי שמנוע הכללים # (כולל suppress) יהיה מקור האמת, ולא תהיה "עקיפה" של הפייפליין. except Exception: # לא לשבור זרימה בגיבוי — התרעה היא best‑effort return def _is_safe_path(self, target: Path, allow_under: Path) -> bool: """בדיקת בטיחות למסלול לפני מחיקה/ניקוי. - מונע מחיקה של נתיבים מסוכנים ("/", HOME, או ה־PWD הנוכחי) - מאשר רק נתיבים שנמצאים תחת allow_under """ try: rp_target = target.resolve() rp_base = allow_under.resolve() if str(rp_target) == "/" or str(rp_target) == str(Path.home()) or str(rp_target) == str(Path.cwd()): return False # דרוש שהנתיב יהיה מתחת ל־allow_under return str(rp_target).startswith(str(rp_base) + "/") or (str(rp_target) == str(rp_base)) except Exception: return False
[תיעוד] def cleanup_expired_backups( self, retention_days: int | None = None, *, max_per_user: int | None = None, budget_seconds: float | None = None, ) -> dict: """ניקוי גיבויים ישנים ממערכת הקבצים ומ‑GridFS באופן מבוקר. פרמטרים: - retention_days: ימים לשמירת גיבוי לפני מחיקה (ברירת מחדל: BACKUPS_RETENTION_DAYS או 30) - max_per_user: כמות מקסימלית של גיבויים לשמירה לכל משתמש (ברירת מחדל: BACKUPS_MAX_PER_USER או None) - budget_seconds: תקציב זמן לניקוי כדי לא לחסום את ה־worker (ברירת מחדל: BACKUPS_CLEANUP_BUDGET_SECONDS או 3) החזרה: dict עם counters לסריקה/מחיקות ושגיאות. """ from contextlib import suppress import time from datetime import datetime, timedelta, timezone import os # דגלי בטיחות: אפשר לכבות ניקוי רקע לחלוטין if str(os.getenv("DISABLE_BACKGROUND_CLEANUP", "")).lower() in ("1", "true", "yes", "on"): return {"skipped": True, "reason": "disabled_by_env"} # SAFE_MODE → אל תמחק מהדיסק (נחזיר skipped כדי להימנע מהפתעות בסביבת טסטים) if str(os.getenv("SAFE_MODE", "")).lower() in ("1", "true", "yes", "on"): return {"skipped": True, "reason": "safe_mode"} try: if retention_days is None: retention_days = int(os.getenv("BACKUPS_RETENTION_DAYS", "30") or 30) except Exception: retention_days = 30 try: if max_per_user is None: val = os.getenv("BACKUPS_MAX_PER_USER", "") max_per_user = int(val) if val not in (None, "") else None except Exception: max_per_user = None try: if budget_seconds is None: budget_seconds = float(os.getenv("BACKUPS_CLEANUP_BUDGET_SECONDS", "3") or 3) except Exception: budget_seconds = 3.0 now = datetime.now(timezone.utc) cutoff = now - timedelta(days=max(1, int(retention_days or 30))) deadline = time.time() + max(0.1, float(budget_seconds or 3.0)) summary: dict = { "fs_scanned": 0, "fs_deleted": 0, "gridfs_scanned": 0, "gridfs_deleted": 0, "errors": [], "retention_days": int(retention_days or 30), "max_per_user": (int(max_per_user) if isinstance(max_per_user, int) else None), } # --- ניקוי במערכת הקבצים --- try: search_dirs: list[Path] = [self.backup_dir] # הוסף נתיבי legacy בטוחים בלבד with suppress(Exception): if getattr(self, "legacy_backup_dir", None) and isinstance(self.legacy_backup_dir, Path): search_dirs.append(self.legacy_backup_dir) with suppress(Exception): app_backups = Path("/app/backups") if app_backups != self.backup_dir: search_dirs.append(app_backups) # קיבוץ לפי משתמש by_user: dict[int | str, list[tuple[Path, datetime]]] = {} for base_dir in search_dirs: try: for p in base_dir.glob("*.zip"): if time.time() > deadline: break summary["fs_scanned"] += 1 # חילוץ owner ו‑created_at מתוך ה‑ZIP (best‑effort) owner: int | str | None = None created_at: datetime | None = None with suppress(Exception): with zipfile.ZipFile(p, 'r') as zf: with suppress(Exception): md_raw = zf.read('metadata.json') md = json.loads(md_raw) if md_raw else {} uid_val = md.get("user_id") if isinstance(uid_val, int): owner = uid_val elif isinstance(uid_val, str) and uid_val.isdigit(): owner = int(uid_val) cat = md.get("created_at") if isinstance(cat, str): try: created_at = datetime.fromisoformat(cat) if created_at.tzinfo is None: created_at = created_at.replace(tzinfo=timezone.utc) except Exception: created_at = None # fallback if created_at is None: with suppress(Exception): created_at = datetime.fromtimestamp(p.stat().st_mtime, tz=timezone.utc) if owner is None: # נסה לחלץ מ‑backup_<user>_* name = p.stem with suppress(Exception): import re m = re.match(r"^backup_(\d+)_", name) if m: owner = int(m.group(1)) key = owner if owner is not None else "unknown" if created_at is None: created_at = now by_user.setdefault(key, []).append((p, created_at)) except Exception as e: summary["errors"].append(f"fs_scan:{base_dir}:{e}") # קבע מועמדים למחיקה לפי retention ו‑max_per_user candidates: list[Path] = [] candidate_set: set[str] = set() def _add_candidate(p: Path) -> None: # הוספה O(1) עם סט למניעת כפילויות במקום 'p not in list' try: rp = str(p.resolve()) except Exception: rp = str(p) if rp in candidate_set: return candidate_set.add(rp) candidates.append(p) for key, items in by_user.items(): # מיון מהחדש לישן items.sort(key=lambda t: t[1], reverse=True) # חריגה ממכסה if isinstance(max_per_user, int) and max_per_user > 0 and len(items) > max_per_user: for p, _dt in items[max_per_user:]: _add_candidate(p) # ישנים מעבר ל‑cutoff for p, dt in items: if dt < cutoff: _add_candidate(p) # מחיקה מבוקרת for p in candidates: if time.time() > deadline: break # ודא שהקובץ באמת נמצא תחת אחד מהנתיבים המותרים allowed_ok = any(self._is_safe_path(p, base) for base in search_dirs if isinstance(base, Path)) if not allowed_ok: summary["errors"].append(f"unsafe_path:{p}") continue try: if p.exists(): p.unlink() summary["fs_deleted"] += 1 except Exception as e: summary["errors"].append(f"fs_delete:{p}:{e}") except Exception as e: summary["errors"].append(f"fs_cleanup:{e}") # --- ניקוי ב‑GridFS (אם קיים) --- try: fs = self._get_gridfs() except Exception: fs = None if fs is not None and time.time() <= deadline: try: by_user_g: dict[int | str, list[tuple[object, datetime]]] = {} # איטרציה עצלה כדי לכבד תקציב זמן; הימנע מ-materialize מלא try: cursor = fs.find() except Exception: cursor = [] for fdoc in cursor: if time.time() > deadline: break summary["gridfs_scanned"] += 1 owner: int | str | None = None created_at: datetime | None = None md = getattr(fdoc, 'metadata', None) or {} with suppress(Exception): uid_val = md.get('user_id') if isinstance(uid_val, int): owner = uid_val elif isinstance(uid_val, str) and uid_val.isdigit(): owner = int(uid_val) with suppress(Exception): created_at = getattr(fdoc, 'uploadDate', None) if created_at and getattr(created_at, 'tzinfo', None) is None: created_at = created_at.replace(tzinfo=timezone.utc) if created_at is None: with suppress(Exception): cat = md.get('created_at') if isinstance(cat, str): created_at = datetime.fromisoformat(cat) if created_at and created_at.tzinfo is None: created_at = created_at.replace(tzinfo=timezone.utc) if created_at is None: created_at = now key = owner if owner is not None else "unknown" by_user_g.setdefault(key, []).append((fdoc, created_at)) # מועמדים למחיקה cand_ids: list[object] = [] cand_id_set: set[object] = set() for key, items in by_user_g.items(): items.sort(key=lambda t: t[1], reverse=True) if isinstance(max_per_user, int) and max_per_user > 0 and len(items) > max_per_user: for fdoc, _dt in items[max_per_user:]: fid = getattr(fdoc, '_id', None) if fid not in cand_id_set: cand_id_set.add(fid) cand_ids.append(fid) for fdoc, dt in items: if dt < cutoff: fid = getattr(fdoc, '_id', None) if fid not in cand_id_set: cand_id_set.add(fid) cand_ids.append(fid) for fid in cand_ids: if time.time() > deadline: break try: if fid is not None: fs.delete(fid) summary["gridfs_deleted"] += 1 except Exception as e: summary["errors"].append(f"gridfs_delete:{e}") except Exception as e: summary["errors"].append(f"gridfs_cleanup:{e}") return summary
# ============================= # GridFS helpers (Mongo storage) # ============================= def _get_gridfs(self): if self.storage_mode != "mongo": return None if gridfs is None: return None try: # שימוש במסד הנתונים הגלובלי הקיים (דרך facade) mongo_db = None if get_files_facade is not None: mongo_db = get_files_facade().get_mongo_db() if not mongo_db: return None # אוסף ייעודי "backups" return gridfs.GridFS(mongo_db, collection="backups") except Exception: return None
[תיעוד] def save_backup_bytes(self, data: bytes, metadata: Dict[str, Any]) -> Optional[str]: """שומר ZIP של גיבוי בהתאם למצב האחסון ומחזיר backup_id או None במקרה כשל. אם storage==mongo: שומר ל-GridFS עם המטאדטה. אם storage==fs: שומר לקובץ תחת backup_dir. """ try: # בדיקת מקום פנוי לפני כתיבה self._maybe_alert_low_space() backup_id = metadata.get("backup_id") or f"backup_{int(datetime.now(timezone.utc).timestamp())}" # נסה להטמיע/לעדכן metadata.json בתוך ה-ZIP כך שיכלול לפחות backup_id ו-user_id אם סופק try: merged_bytes = data with zipfile.ZipFile(io.BytesIO(data), 'r') as zin: # קרא מטאדטה קיימת אם יש existing_md: Dict[str, Any] = {} with suppress(Exception): raw = zin.read('metadata.json') try: existing_md = json.loads(raw) except Exception: try: text = raw.decode('utf-8', errors='ignore') # פענוח מינימלי existing_md = {} bid_m = re.search(r'"backup_id"\s*:\s*"([^"]+)"', text) uid_m = re.search(r'"user_id"\s*:\s*(\d+)', text) if bid_m: existing_md['backup_id'] = bid_m.group(1) if uid_m: existing_md['user_id'] = int(uid_m.group(1)) except Exception: existing_md = {} # מטאדטה הסופית — metadata הנכנסת גוברת final_md = dict(existing_md) final_md.update(metadata or {}) # ודא ש-backup_id קיים בתוצאה final_md['backup_id'] = final_md.get('backup_id') or backup_id # בנה ZIP חדש עם metadata.json מעודכן out = io.BytesIO() with zipfile.ZipFile(out, 'w', compression=zipfile.ZIP_DEFLATED) as zout: for name in zin.namelist(): if name == 'metadata.json' or name.endswith('/'): continue try: zout.writestr(name, zin.read(name)) except Exception: continue # כתוב metadata.json מעודכן try: zout.writestr('metadata.json', json.dumps(final_md, indent=2)) except Exception: # fallback בלי indent zout.writestr('metadata.json', json.dumps(final_md)) merged_bytes = out.getvalue() data = merged_bytes # עדכן backup_id אם הוכנס ב-final_md backup_id = (final_md.get('backup_id') or backup_id) except Exception: # אם לא הצלחנו לטפל — המשך עם הנתונים המקוריים pass # הבטח זיהוי בקובץ filename = f"{backup_id}.zip" if self.storage_mode == "mongo": fs = self._get_gridfs() if fs is None: # נפילה לאחסון קבצים target_path = self.backup_dir / filename with open(target_path, "wb") as f: f.write(data) return backup_id # שמור ל-GridFS # אם כבר קיים אותו backup_id – מחק ישן with suppress(Exception): for fdoc in fs.find({"filename": filename}): fs.delete(fdoc._id) fs.put(data, filename=filename, metadata=metadata) return backup_id # ברירת מחדל: קבצים target_path = self.backup_dir / filename with open(target_path, "wb") as f: f.write(data) return backup_id except Exception as e: logger.warning(f"save_backup_bytes failed: {e}") return None
[תיעוד] def save_backup_file(self, file_path: str) -> Optional[str]: """שומר קובץ ZIP קיים לאחסון היעד (Mongo/FS) ומחזיר backup_id אם הצליח.""" try: # בדיקת מקום פנוי לפני כתיבה/העתקה self._maybe_alert_low_space() # 1) קרא מטאדטה מתוך ה‑ZIP (ללא קריאה של כל הקובץ לזיכרון) metadata: Dict[str, Any] = {} try: with zipfile.ZipFile(file_path, 'r') as zf: with suppress(Exception): md_raw = zf.read('metadata.json') metadata = json.loads(md_raw) if md_raw else {} except Exception: metadata = {} # הפק backup_id אם חסר backup_id = metadata.get("backup_id") or os.path.splitext(os.path.basename(file_path))[0] metadata["backup_id"] = backup_id filename = f"{backup_id}.zip" # 2) שמירה לפי מצב אחסון — הימנע מקריאה מלאה של הקובץ לזיכרון if self.storage_mode == "mongo": fs = self._get_gridfs() if fs is None: # נפילה לאחסון קבצים target_path = self.backup_dir / filename try: # העתקה חסכונית בזיכרון import shutil shutil.copyfile(file_path, target_path) except Exception: # fallback לכתיבה בבלוקים אם shutil נכשל with open(file_path, 'rb') as src, open(target_path, 'wb') as dst: while True: chunk = src.read(1024 * 1024) if not chunk: break dst.write(chunk) return backup_id # GridFS – מחיקה של עותק קודם (בשם זהה) ושמירה בזרימה with suppress(Exception): for fdoc in fs.find({"filename": filename}): fs.delete(fdoc._id) with open(file_path, 'rb') as fobj: fs.put(fobj, filename=filename, metadata=metadata) # type: ignore[arg-type] return backup_id # ברירת מחדל: קבצים target_path = self.backup_dir / filename try: import shutil shutil.copyfile(file_path, target_path) except Exception: with open(file_path, 'rb') as src, open(target_path, 'wb') as dst: while True: chunk = src.read(1024 * 1024) if not chunk: break dst.write(chunk) return backup_id except Exception as e: logger.warning(f"save_backup_file failed: {e}") return None
[תיעוד] def list_backups(self, user_id: int) -> List[BackupInfo]: """מחזירה רשימת קבצי ZIP ששייכים למשתמש המבקש בלבד. כל פריט חייב להיות מסווג כשייך ל-user_id דרך אחד מהבאים: - metadata.json בתוך ה-ZIP עם שדה user_id תואם - דפוס מזהה בשם: backup_<user_id>_* ZIPים ללא שיוך ברור למשתמש לא ייכללו כדי למנוע זליגת מידע. """ backups: List[BackupInfo] = [] try: # עבור על כל קובצי ה‑ZIP בכל התיקיות הרלוונטיות (ראשית + legacy/migration) search_dirs: List[Path] = [self.backup_dir] # הוסף נתיבי legacy נוספים אם זמינים extra_legacy: List[Path] = [] try: if getattr(self, "legacy_backup_dir", None): if isinstance(self.legacy_backup_dir, Path) and self.legacy_backup_dir.exists(): extra_legacy.append(self.legacy_backup_dir) except Exception: pass # ודא ש-/app/backups ייסרק גם אם הוא אינו ה-backup_dir try: app_backups = Path("/app/backups") if app_backups.exists() and app_backups != self.backup_dir: extra_legacy.append(app_backups) except Exception: pass for d in extra_legacy: if d not in search_dirs: search_dirs.append(d) seen_paths: Set[str] = set() # דה-דופ אזהרות עבור קבצי ZIP לא תקינים כדי למנוע הצפת לוגים try: _invalid_warned: Set[str] = getattr(self, "_warned_invalid_zips") # type: ignore[assignment] except Exception: _invalid_warned = set() # קבצים בדיסק — מציגים רק קבצים ששייכים למשתמש for _dir in search_dirs: for backup_file in _dir.glob("*.zip"): try: resolved_path = str(backup_file.resolve()) except Exception: resolved_path = str(backup_file) if resolved_path in seen_paths: continue seen_paths.add(resolved_path) # דלג על קבצים ריקים או קבצים שאינם ZIP, עם לוג עדין פעם אחת לכל נתיב try: st = backup_file.stat() if int(getattr(st, 'st_size', 0) or 0) == 0: continue except Exception: pass try: # ערכי ברירת מחדל metadata: Optional[Dict[str, Any]] = None backup_id: str = os.path.splitext(os.path.basename(backup_file))[0] owner_user_id: Optional[int] = None created_at: Optional[datetime] = None file_count: int = 0 backup_type: str = "unknown" repo: Optional[str] = None path: Optional[str] = None zip_entries: Optional[List[str]] = None with zipfile.ZipFile(backup_file, 'r') as zf: # נסה לקרוא metadata.json, אם קיים try: metadata_content = zf.read("metadata.json") try: # json.loads תומך ב-bytes; אם ייכשל ננסה דקדוק חלופי metadata = json.loads(metadata_content) except Exception: # fallback: ננסה לפענח כמחרוזת ולחלץ שדות בסיסיים ב-regex try: text = metadata_content.decode('utf-8', errors='ignore') except Exception: text = str(metadata_content) metadata = {} # backup_id try: m_bid = re.search(r'"backup_id"\s*:\s*"([^"]+)"', text) if m_bid: metadata["backup_id"] = m_bid.group(1) except Exception: pass # user_id try: m_uid = re.search(r'"user_id"\s*:\s*(\d+)', text) if m_uid: metadata["user_id"] = int(m_uid.group(1)) except Exception: pass # created_at try: m_cat = re.search(r'"created_at"\s*:\s*"([^"]+)"', text) if m_cat: metadata["created_at"] = m_cat.group(1) except Exception: pass except Exception: metadata = None # קבע בעלים של ה-ZIP מתוך metadata אם קיים if metadata is not None: try: uid_val = metadata.get("user_id") if isinstance(uid_val, str) and uid_val.isdigit(): owner_user_id = int(uid_val) elif isinstance(uid_val, int): owner_user_id = uid_val except Exception: owner_user_id = None # שלוף נתונים מהמטאדטה אם קיימת if metadata is not None: backup_id = metadata.get("backup_id") or backup_id created_at_str = metadata.get("created_at") if created_at_str: try: created_at = datetime.fromisoformat(created_at_str) # נרמל ל-aware TZ אם חסר if created_at.tzinfo is None: created_at = created_at.replace(tzinfo=timezone.utc) except Exception: created_at = None fc_meta = metadata.get("file_count") if isinstance(fc_meta, int): file_count = fc_meta backup_type = metadata.get("backup_type", "unknown") repo = metadata.get("repo") path = metadata.get("path") else: # ZIP כללי ללא מטאדטה backup_type = "generic_zip" # אם אין owner במטאדטה — נסה להסיק משם הקובץ: backup_<user>_* if owner_user_id is None: try: m = re.match(r"^backup_(\d+)_", backup_id) if m: owner_user_id = int(m.group(1)) except Exception: owner_user_id = None # סינון: הצג רק אם שייך למשתמש המבקש if owner_user_id is None or owner_user_id != user_id: # לא שייך למשתמש — דלג continue # אם אין created_at – נפל ל‑mtime של הקובץ if not created_at: try: created_at = datetime.fromtimestamp(os.path.getmtime(resolved_path), tz=timezone.utc) except Exception: created_at = datetime.now(timezone.utc) # אם אין file_count – מנה את הקבצים שאינם תיקיות if file_count == 0: try: if zip_entries is None: zip_entries = zf.namelist() non_dirs = [n for n in zip_entries if not n.endswith('/')] file_count = len(non_dirs) except Exception: file_count = 0 backup_info = BackupInfo( backup_id=backup_id, user_id=owner_user_id if owner_user_id is not None else user_id, created_at=created_at, file_count=file_count, total_size=os.path.getsize(resolved_path), backup_type=backup_type, status="completed", file_path=resolved_path, repo=repo, path=path, metadata=metadata, ) backups.append(backup_info) except zipfile.BadZipFile: try: if resolved_path not in _invalid_warned: logger.info(f"דלג על קובץ גיבוי שאינו ZIP: {resolved_path}") _invalid_warned.add(resolved_path) setattr(self, "_warned_invalid_zips", _invalid_warned) except Exception: logger.info(f"דלג על קובץ גיבוי שאינו ZIP: {resolved_path}") continue except Exception as e: # הורדת רמת הלוג למניעת הצפה; דה-דופ לפי נתיב try: if resolved_path not in _invalid_warned: logger.info(f"דלג על גיבוי לא תקין {backup_file}: {e}") _invalid_warned.add(resolved_path) setattr(self, "_warned_invalid_zips", _invalid_warned) except Exception: logger.info(f"דלג על גיבוי לא תקין {backup_file}: {e}") continue # קבצים ב-GridFS (Mongo) – נטען רק של המשתמש try: fs = self._get_gridfs() if fs is not None: # טען את כל הפריטים ובדוק בעלות בקוד כדי לכלול גם legacy ללא metadata.user_id cursor = fs.find() for fdoc in cursor: try: md = getattr(fdoc, 'metadata', None) or {} # קבע backup_id מוקדם לשימושים שונים backup_id = md.get("backup_id") or os.path.splitext(fdoc.filename or "")[0] or str(getattr(fdoc, "_id", "")) if not backup_id: continue if any(b.backup_id == backup_id for b in backups): # כבר קיים מתוך הדיסק continue total_size = int(getattr(fdoc, 'length', 0) or 0) # זיהוי בעלות: metadata.user_id → דפוס בשם → metadata.json מתוך ה-ZIP owner_user_id = None try: uid_val = md.get("user_id") if isinstance(uid_val, str) and uid_val.isdigit(): owner_user_id = int(uid_val) elif isinstance(uid_val, int): owner_user_id = uid_val except Exception: owner_user_id = None if owner_user_id is None: try: m = re.match(r"^backup_(\d+)_", backup_id) if m: owner_user_id = int(m.group(1)) except Exception: owner_user_id = None # אם עדיין לא ידוע — קרא metadata.json מתוך ה-ZIP המקומי local_path = self.backup_dir / f"{backup_id}.zip" if owner_user_id is None: try: if not local_path.exists() or (total_size and local_path.stat().st_size != total_size): grid_out = fs.get(fdoc._id) with open(local_path, 'wb') as lf: lf.write(grid_out.read()) with zipfile.ZipFile(local_path, 'r') as zf: with suppress(Exception): raw = zf.read('metadata.json') md2 = json.loads(raw) if raw else {} u2 = md2.get('user_id') if isinstance(u2, int): owner_user_id = u2 elif isinstance(u2, str) and u2.isdigit(): owner_user_id = int(u2) except Exception: pass # חסום פריטים שאינם שייכים למשתמש if owner_user_id != user_id: continue # מטא נוספים created_at = None created_at_str = md.get("created_at") if created_at_str: with suppress(Exception): created_at = datetime.fromisoformat(created_at_str) if created_at and created_at.tzinfo is None: created_at = created_at.replace(tzinfo=timezone.utc) if not created_at: with suppress(Exception): created_at = getattr(fdoc, 'uploadDate', None) file_count = int(md.get("file_count") or 0) backup_type = md.get("backup_type", "unknown") repo = md.get("repo") path = md.get("path") # ודא עותק מקומי זמני קיים לשימוש בהורדה/שחזור if not local_path.exists() or (total_size and local_path.stat().st_size != total_size): try: grid_out = fs.get(fdoc._id) with open(local_path, 'wb') as lf: lf.write(grid_out.read()) except Exception: # אם נכשל יצירת עותק – דלג והמשך (לא נציג פריט לא שמיש) continue backup_info = BackupInfo( backup_id=backup_id, user_id=owner_user_id if owner_user_id is not None else user_id, created_at=created_at or datetime.now(timezone.utc), file_count=file_count, total_size=total_size or (local_path.stat().st_size if local_path.exists() else 0), backup_type=backup_type, status="completed", file_path=str(local_path), repo=repo, path=path, metadata=md, ) backups.append(backup_info) except Exception: continue except Exception: pass # אם לא נמצאו כלל גיבויים למשתמש — אפשר מצב תאימות רכה עבור אדמין/דגל ENV try: allow_legacy = False # אדמין? (לפי ENV ADMIN_USER_IDS) try: admins_raw = os.getenv("ADMIN_USER_IDS", "") if admins_raw: admin_ids = {int(x.strip()) for x in admins_raw.split(',') if x.strip().isdigit()} if int(user_id) in admin_ids: allow_legacy = True except Exception: pass # או דגל תאימות ייעודי if not allow_legacy: flag = str(os.getenv("BACKUPS_SHOW_ALL_IF_EMPTY", "")).strip().lower() allow_legacy = flag in ("1", "true", "yes", "on") except Exception: allow_legacy = False if allow_legacy and not backups: try: # בטעינת תאימות: הצג את כל קבצי ה‑ZIP מכל הנתיבים הידועים, גם ללא user_id מפורש. # מיפוי נתיבים ייחודי למניעת כפילויות seen_paths_all: Set[str] = set() # build search_dirs כמו קודם search_dirs_legacy: List[Path] = [self.backup_dir] with suppress(Exception): if getattr(self, "legacy_backup_dir", None) and isinstance(self.legacy_backup_dir, Path) and self.legacy_backup_dir.exists(): search_dirs_legacy.append(self.legacy_backup_dir) with suppress(Exception): app_backups = Path("/app/backups") if app_backups != self.backup_dir and app_backups.exists(): search_dirs_legacy.append(app_backups) for d in search_dirs_legacy: with suppress(Exception): for p in d.glob("*.zip"): try: rp = str(p.resolve()) except Exception: rp = str(p) if rp in seen_paths_all: continue seen_paths_all.add(rp) # נסה לקרוא metadata.json — אך אל תדרוש user_id metadata: Optional[Dict[str, Any]] = None backup_id = os.path.splitext(os.path.basename(p))[0] created_at = None file_count = 0 backup_type = "unknown" repo = None path = None with suppress(Exception): with zipfile.ZipFile(p, 'r') as zf: with suppress(Exception): md_raw = zf.read('metadata.json') metadata = json.loads(md_raw) if md_raw else None # שדות עזר אם קיימים if metadata: with suppress(Exception): bid = metadata.get('backup_id') if isinstance(bid, str) and bid: backup_id = bid with suppress(Exception): cat = metadata.get('created_at') if isinstance(cat, str) and cat: created_at = datetime.fromisoformat(cat) if created_at.tzinfo is None: created_at = created_at.replace(tzinfo=timezone.utc) with suppress(Exception): fc = metadata.get('file_count') if isinstance(fc, int): file_count = fc with suppress(Exception): backup_type = metadata.get('backup_type', backup_type) with suppress(Exception): repo = metadata.get('repo') with suppress(Exception): path = metadata.get('path') # אם לא קיים file_count — חשב if file_count == 0: try: non_dirs = [n for n in zf.namelist() if not n.endswith('/')] file_count = len(non_dirs) except Exception: file_count = 0 if not created_at: with suppress(Exception): created_at = datetime.fromtimestamp(p.stat().st_mtime, tz=timezone.utc) try: total_size = p.stat().st_size except Exception: total_size = 0 # ודא שייכות למשתמש גם במצב תאימות: מטאדטה או דפוס שם המכיל את המזהה owner_ok = False try: if metadata is not None: u = metadata.get('user_id') if isinstance(u, int) and int(u) == int(user_id): owner_ok = True elif isinstance(u, str) and u.isdigit() and int(u) == int(user_id): owner_ok = True except Exception: owner_ok = False if not owner_ok: try: base = os.path.splitext(os.path.basename(p))[0] # חפש את user_id כמילה/חלק מופרד בקו תחתון m = re.search(rf"(?:^|_)({int(user_id)})($|_|\b)", base) if m: owner_ok = True except Exception: owner_ok = False if not owner_ok: # ללא הוכחת בעלות — אל תציג כדי למנוע זליגה continue backups.append(BackupInfo( backup_id=backup_id, user_id=int(user_id), created_at=created_at or datetime.now(timezone.utc), file_count=file_count, total_size=total_size, backup_type=backup_type, status="completed", file_path=rp, repo=repo, path=path, metadata=metadata, )) except Exception: # תאימות לאחור היא best-effort בלבד pass # מיון לפי תאריך יצירה backups.sort(key=lambda x: x.created_at, reverse=True) except Exception as e: logger.error(f"שגיאה ברשימת גיבויים: {e}") return backups
[תיעוד] def restore_from_backup(self, user_id: int, backup_path: str, overwrite: bool = True, purge: bool = False, extra_tags: Optional[List[str]] = None) -> Dict[str, Any]: """משחזר קבצים מ-ZIP למסד הנתונים. - purge=True: מסמן את כל הקבצים הקיימים של המשתמש כלא פעילים לפני השחזור - overwrite=True: שמירה תמיד כגרסה חדשה עבור אותו שם (כברירת מחדל) החזרה: dict עם restored_files ו-errors """ results: Dict[str, Any] = {"restored_files": 0, "errors": []} try: import zipfile from utils import detect_language_from_filename facade = get_files_facade() if get_files_facade is not None else None if facade is None: results["errors"].append("DB unavailable") return results # פרה-תנאי if not os.path.exists(backup_path): results["errors"].append(f"backup file not found: {backup_path}") return results if purge: try: existing = facade.get_user_files(user_id, limit=1000, projection={"file_name": 1}) or [] for doc in existing: try: fname = doc.get('file_name') if fname: facade.delete_file(user_id, fname) except Exception as e: results["errors"].append(f"purge failed for {doc.get('file_name')}: {e}") except Exception as e: results["errors"].append(f"purge listing failed: {e}") with zipfile.ZipFile(backup_path, 'r') as zf: names = [n for n in zf.namelist() if not n.endswith('/') and n != 'metadata.json'] for name in names: try: raw = zf.read(name) text: str try: text = raw.decode('utf-8') except Exception: try: text = raw.decode('latin-1') except Exception as e: results["errors"].append(f"decode failed for {name}: {e}") continue lang = detect_language_from_filename(name) # אם יש תגית repo:* — הוסף אותה רק עבור קבצים שנמצאים בשורש הריפו או תחת נתיב התואם לריפו filtered_extra = list(extra_tags or []) try: repo_tags = [t for t in filtered_extra if isinstance(t, str) and t.strip().lower().startswith('repo:')] # כלל זהיר: שמור את תג ה-repo רק אם הנתיב אינו כולל סלאשים רבים/או שהקובץ בשם שאינו גורף (index.html יכול להופיע בכל מקום) — # בפשטות: תמיד נאפשר, אבל נוסיף רק את תג ה-repo האחרון (אם קיים) והיתר נסנן למעלה בשכבת repo.save_file if repo_tags: filtered_extra = [repo_tags[-1]] + [t for t in filtered_extra if not (isinstance(t, str) and t.strip().lower().startswith('repo:'))] except Exception: pass ok = facade.save_file(user_id=user_id, file_name=name, code=text, programming_language=lang, extra_tags=filtered_extra) if ok: results["restored_files"] += 1 else: results["errors"].append(f"save failed for {name}") except Exception as e: results["errors"].append(f"restore failed for {name}: {e}") except Exception as e: results["errors"].append(str(e)) return results
[תיעוד] def delete_backups(self, user_id: int, backup_ids: List[str]) -> Dict[str, Any]: """מוחק מספר גיבויי ZIP לפי backup_id ממערכת הקבצים ומ-GridFS (אם בשימוש). החזרה: {"deleted": int, "errors": [str, ...]} """ results: Dict[str, Any] = {"deleted": 0, "errors": []} try: if not backup_ids: return results filenames = [f"{bid}.zip" for bid in backup_ids] # מחיקה ממערכת הקבצים (כולל נתיבי legacy) search_dirs: List[Path] = [self.backup_dir] try: if getattr(self, "legacy_backup_dir", None): if isinstance(self.legacy_backup_dir, Path): search_dirs.append(self.legacy_backup_dir) except Exception: pass try: app_backups = Path("/app/backups") if app_backups != self.backup_dir: search_dirs.append(app_backups) except Exception: pass deleted_fs = 0 for d in search_dirs: for fn in filenames: try: p = d / fn if p.exists(): # בדוק שיוך משתמש אם יש metadata.json try: with zipfile.ZipFile(p, 'r') as zf: md = None with suppress(Exception): raw = zf.read('metadata.json') md = json.loads(raw) if raw else None if md and md.get('user_id') is not None and md.get('user_id') != user_id: # שייך למשתמש אחר — דלג continue except Exception: pass p.unlink() deleted_fs += 1 except Exception as e: results["errors"].append(f"fs:{fn}:{e}") # מחיקה מ-GridFS (אם קיים) fs = None try: fs = self._get_gridfs() except Exception: fs = None if fs is not None: for bid, fn in zip(backup_ids, filenames): try: # שלוף מועמדים לפי filename/backup_id ללא סינון על user_id כדי לא לפספס legacy candidates = [] with suppress(Exception): candidates.extend(list(fs.find({"filename": fn}))) with suppress(Exception): candidates.extend(list(fs.find({"metadata.backup_id": bid}))) seen = set() for fdoc in candidates: try: if getattr(fdoc, '_id', None) in seen: continue seen.add(getattr(fdoc, '_id', None)) md = getattr(fdoc, 'metadata', None) or {} owner_ok = False # 1) metadata.user_id כמספר או מחרוזת uid_val = md.get('user_id') if isinstance(uid_val, int) and uid_val == user_id: owner_ok = True elif isinstance(uid_val, str) and uid_val.isdigit() and int(uid_val) == user_id: owner_ok = True # 2) גיבוי לפי דפוס backup_<user>_* בשם הקובץ if not owner_ok: try: base = os.path.splitext(str(getattr(fdoc, 'filename', '') or ''))[0] m = re.match(r"^backup_(\d+)_", base) if m and int(m.group(1)) == user_id: owner_ok = True except Exception: pass # 3) כמוצא אחרון: אם יש עותק מקומי — פתח וקרא metadata.json לאימות if not owner_ok: try: local_path = self.backup_dir / f"{bid}.zip" if not local_path.exists(): grid_out = fs.get(fdoc._id) with open(local_path, 'wb') as lf: lf.write(grid_out.read()) with zipfile.ZipFile(local_path, 'r') as zf: with suppress(Exception): raw = zf.read('metadata.json') md2 = json.loads(raw) if raw else {} u2 = md2.get('user_id') if (isinstance(u2, int) and u2 == user_id) or (isinstance(u2, str) and u2.isdigit() and int(u2) == user_id): owner_ok = True except Exception: pass if owner_ok: fs.delete(fdoc._id) results["deleted"] += 1 except Exception: continue except Exception as e: results["errors"].append(f"gridfs:{fn}:{e}") # אם אין GridFS — ספר מחיקות FS if fs is None: results["deleted"] += deleted_fs except Exception as e: results["errors"].append(str(e)) return results
[תיעוד] def delete_backup(self, backup_id: str, user_id: int) -> bool: """מחיקת גיבוי""" try: # חפש את הגיבוי בשתי התיקיות (ברירת מחדל + legacy) candidate_files: List[Path] = [] try: candidate_files.extend(list(self.backup_dir.glob(f"{backup_id}.zip"))) except Exception: pass try: if getattr(self, 'legacy_backup_dir', None) and self.legacy_backup_dir.exists(): candidate_files.extend(list(self.legacy_backup_dir.glob(f"{backup_id}.zip"))) except Exception: pass for backup_file in candidate_files: # וידוא שהגיבוי שייך למשתמש אם קיימת מטאדטה try: with zipfile.ZipFile(backup_file, 'r') as zip_file: try: metadata_content = zip_file.read("metadata.json") metadata = json.loads(metadata_content) if metadata.get("user_id") == user_id: backup_file.unlink() logger.info(f"נמחק גיבוי: {backup_id}") return True except Exception: # אין מטאדטה – דלג continue except Exception: continue logger.warning(f"גיבוי לא נמצא או לא שייך למשתמש: {backup_id}") return False except Exception as e: logger.error(f"שגיאה במחיקת גיבוי: {e}") return False
backup_manager = BackupManager()