הראה קוד מקור ל file_manager

from typing import Any, BinaryIO, Dict, List, Optional, Tuple, Set
import os
import tempfile
import zipfile
import json
from datetime import datetime, timezone
from pathlib import Path
import logging
from contextlib import suppress
import io
import re

try:
    import gridfs  # from pymongo
except Exception:  # pragma: no cover
    gridfs = None  # type: ignore[assignment]

logger = logging.getLogger(__name__)

[תיעוד] class BackupInfo: """מידע על גיבוי"""
[תיעוד] def __init__(self, backup_id: str, user_id: int, created_at: datetime, file_count: int, total_size: int, backup_type: str, status: str, file_path: str, repo: Optional[str], path: Optional[str], metadata: Optional[Dict[str, Any]]): self.backup_id = backup_id self.user_id = user_id self.created_at = created_at self.file_count = file_count self.total_size = total_size self.backup_type = backup_type self.status = status self.file_path = file_path self.repo = repo self.path = path self.metadata = metadata
[תיעוד] class BackupManager: """מנהל גיבויים"""
[תיעוד] def __init__(self): # מצב אחסון: mongo (GridFS) או fs (קבצים) self.storage_mode = os.getenv("BACKUPS_STORAGE", "mongo").strip().lower() # העדף תיקייה מתמשכת עבור גיבויים (נשמרת בין דיפלויים אם קיימת) # נסה לפי סדר: BACKUPS_DIR מהסביבה → /app/backups → /data/backups → /var/lib/code_keeper/backups persistent_candidates = [ os.getenv("BACKUPS_DIR"), "/app/backups", "/data/backups", "/var/lib/code_keeper/backups", ] chosen_dir: Optional[Path] = None for cand in persistent_candidates: if not cand: continue try: p = Path(cand) p.mkdir(parents=True, exist_ok=True) # וידוא שניתן לכתוב test_file = p / ".write_test" try: with open(test_file, "w") as tf: tf.write("ok") test_file.unlink(missing_ok=True) # type: ignore[arg-type] except Exception: # אם אי אפשר לכתוב – נסה מועמד הבא continue chosen_dir = p break except Exception: continue if chosen_dir is None: # נפילה לתיקיית temp אם אין נתיב מתמשך זמין chosen_dir = Path(tempfile.gettempdir()) / "code_keeper_backups" chosen_dir.mkdir(exist_ok=True) self.backup_dir = chosen_dir # תיקיית legacy: תמיכה בקריאה גם מהמיקום הישן (אם השתמש בעבר ב-temp) # נשמור גם על תמיכה ב-"/app/backups" כנתיב חיפוש נוסף אם לא נבחר כבר legacy_candidates: List[Path] = [] try: legacy_candidates.append(Path(tempfile.gettempdir()) / "code_keeper_backups") except Exception: pass try: app_backups = Path("/app/backups") if app_backups != self.backup_dir: legacy_candidates.append(app_backups) except Exception: pass # שמור נתיב legacy ראשי למטרות תאימות (ישומש בחיפוש) self.legacy_backup_dir = legacy_candidates[0] if legacy_candidates else None self.max_backup_size = 100 * 1024 * 1024 # 100MB
# ============================= # GridFS helpers (Mongo storage) # ============================= def _get_gridfs(self): if self.storage_mode != "mongo": return None if gridfs is None: return None try: # שימוש במסד הנתונים הגלובלי הקיים from database import db as global_db mongo_db = getattr(global_db, "db", None) if not mongo_db: return None # אוסף ייעודי "backups" return gridfs.GridFS(mongo_db, collection="backups") except Exception: return None
[תיעוד] def save_backup_bytes(self, data: bytes, metadata: Dict[str, Any]) -> Optional[str]: """שומר ZIP של גיבוי בהתאם למצב האחסון ומחזיר backup_id או None במקרה כשל. אם storage==mongo: שומר ל-GridFS עם המטאדטה. אם storage==fs: שומר לקובץ תחת backup_dir. """ try: backup_id = metadata.get("backup_id") or f"backup_{int(datetime.now(timezone.utc).timestamp())}" # נסה להטמיע/לעדכן metadata.json בתוך ה-ZIP כך שיכלול לפחות backup_id ו-user_id אם סופק try: merged_bytes = data with zipfile.ZipFile(io.BytesIO(data), 'r') as zin: # קרא מטאדטה קיימת אם יש existing_md: Dict[str, Any] = {} with suppress(Exception): raw = zin.read('metadata.json') try: existing_md = json.loads(raw) except Exception: try: text = raw.decode('utf-8', errors='ignore') # פענוח מינימלי existing_md = {} bid_m = re.search(r'"backup_id"\s*:\s*"([^"]+)"', text) uid_m = re.search(r'"user_id"\s*:\s*(\d+)', text) if bid_m: existing_md['backup_id'] = bid_m.group(1) if uid_m: existing_md['user_id'] = int(uid_m.group(1)) except Exception: existing_md = {} # מטאדטה הסופית — metadata הנכנסת גוברת final_md = dict(existing_md) final_md.update(metadata or {}) # ודא ש-backup_id קיים בתוצאה final_md['backup_id'] = final_md.get('backup_id') or backup_id # בנה ZIP חדש עם metadata.json מעודכן out = io.BytesIO() with zipfile.ZipFile(out, 'w', compression=zipfile.ZIP_DEFLATED) as zout: for name in zin.namelist(): if name == 'metadata.json' or name.endswith('/'): continue try: zout.writestr(name, zin.read(name)) except Exception: continue # כתוב metadata.json מעודכן try: zout.writestr('metadata.json', json.dumps(final_md, indent=2)) except Exception: # fallback בלי indent zout.writestr('metadata.json', json.dumps(final_md)) merged_bytes = out.getvalue() data = merged_bytes # עדכן backup_id אם הוכנס ב-final_md backup_id = (final_md.get('backup_id') or backup_id) except Exception: # אם לא הצלחנו לטפל — המשך עם הנתונים המקוריים pass # הבטח זיהוי בקובץ filename = f"{backup_id}.zip" if self.storage_mode == "mongo": fs = self._get_gridfs() if fs is None: # נפילה לאחסון קבצים target_path = self.backup_dir / filename with open(target_path, "wb") as f: f.write(data) return backup_id # שמור ל-GridFS # אם כבר קיים אותו backup_id – מחק ישן with suppress(Exception): for fdoc in fs.find({"filename": filename}): fs.delete(fdoc._id) fs.put(data, filename=filename, metadata=metadata) return backup_id # ברירת מחדל: קבצים target_path = self.backup_dir / filename with open(target_path, "wb") as f: f.write(data) return backup_id except Exception as e: logger.warning(f"save_backup_bytes failed: {e}") return None
[תיעוד] def save_backup_file(self, file_path: str) -> Optional[str]: """שומר קובץ ZIP קיים לאחסון היעד (Mongo/FS) ומחזיר backup_id אם הצליח.""" try: # נסה לקרוא metadata.json מתוך ה-ZIP metadata: Dict[str, Any] = {} try: with zipfile.ZipFile(file_path, 'r') as zf: with suppress(Exception): md_raw = zf.read('metadata.json') metadata = json.loads(md_raw) if md_raw else {} except Exception: metadata = {} if "backup_id" not in metadata: # הפק מזהה מגיבוי metadata["backup_id"] = os.path.splitext(os.path.basename(file_path))[0] with open(file_path, 'rb') as f: data = f.read() return self.save_backup_bytes(data, metadata) except Exception as e: logger.warning(f"save_backup_file failed: {e}") return None
[תיעוד] def list_backups(self, user_id: int) -> List[BackupInfo]: """מחזירה רשימת קבצי ZIP ששייכים למשתמש המבקש בלבד. כל פריט חייב להיות מסווג כשייך ל-user_id דרך אחד מהבאים: - metadata.json בתוך ה-ZIP עם שדה user_id תואם - דפוס מזהה בשם: backup_<user_id>_* ZIPים ללא שיוך ברור למשתמש לא ייכללו כדי למנוע זליגת מידע. """ backups: List[BackupInfo] = [] try: # עבור על כל קובצי ה‑ZIP בכל התיקיות הרלוונטיות (ראשית + legacy/migration) search_dirs: List[Path] = [self.backup_dir] # הוסף נתיבי legacy נוספים אם זמינים extra_legacy: List[Path] = [] try: if getattr(self, "legacy_backup_dir", None): if isinstance(self.legacy_backup_dir, Path) and self.legacy_backup_dir.exists(): extra_legacy.append(self.legacy_backup_dir) except Exception: pass # ודא ש-/app/backups ייסרק גם אם הוא אינו ה-backup_dir try: app_backups = Path("/app/backups") if app_backups.exists() and app_backups != self.backup_dir: extra_legacy.append(app_backups) except Exception: pass for d in extra_legacy: if d not in search_dirs: search_dirs.append(d) seen_paths: Set[str] = set() # קבצים בדיסק — מציגים רק קבצים ששייכים למשתמש for _dir in search_dirs: for backup_file in _dir.glob("*.zip"): try: resolved_path = str(backup_file.resolve()) except Exception: resolved_path = str(backup_file) if resolved_path in seen_paths: continue seen_paths.add(resolved_path) try: # ערכי ברירת מחדל metadata: Optional[Dict[str, Any]] = None backup_id: str = os.path.splitext(os.path.basename(backup_file))[0] owner_user_id: Optional[int] = None created_at: Optional[datetime] = None file_count: int = 0 backup_type: str = "unknown" repo: Optional[str] = None path: Optional[str] = None with zipfile.ZipFile(backup_file, 'r') as zf: # נסה לקרוא metadata.json, אם קיים try: metadata_content = zf.read("metadata.json") try: # json.loads תומך ב-bytes; אם ייכשל ננסה דקדוק חלופי metadata = json.loads(metadata_content) except Exception: # fallback: ננסה לפענח כמחרוזת ולחלץ שדות בסיסיים ב-regex try: text = metadata_content.decode('utf-8', errors='ignore') except Exception: text = str(metadata_content) metadata = {} # backup_id try: m_bid = re.search(r'"backup_id"\s*:\s*"([^"]+)"', text) if m_bid: metadata["backup_id"] = m_bid.group(1) except Exception: pass # user_id try: m_uid = re.search(r'"user_id"\s*:\s*(\d+)', text) if m_uid: metadata["user_id"] = int(m_uid.group(1)) except Exception: pass # created_at try: m_cat = re.search(r'"created_at"\s*:\s*"([^"]+)"', text) if m_cat: metadata["created_at"] = m_cat.group(1) except Exception: pass except Exception: metadata = None # קבע בעלים של ה-ZIP מתוך metadata אם קיים if metadata is not None: try: uid_val = metadata.get("user_id") if isinstance(uid_val, str) and uid_val.isdigit(): owner_user_id = int(uid_val) elif isinstance(uid_val, int): owner_user_id = uid_val except Exception: owner_user_id = None # שלוף נתונים מהמטאדטה אם קיימת if metadata is not None: backup_id = metadata.get("backup_id") or backup_id created_at_str = metadata.get("created_at") if created_at_str: try: created_at = datetime.fromisoformat(created_at_str) # נרמל ל-aware TZ אם חסר if created_at.tzinfo is None: created_at = created_at.replace(tzinfo=timezone.utc) except Exception: created_at = None fc_meta = metadata.get("file_count") if isinstance(fc_meta, int): file_count = fc_meta backup_type = metadata.get("backup_type", "unknown") repo = metadata.get("repo") path = metadata.get("path") else: # ZIP כללי ללא מטאדטה backup_type = "generic_zip" # אם אין owner במטאדטה — נסה להסיק משם הקובץ: backup_<user>_* if owner_user_id is None: try: m = re.match(r"^backup_(\d+)_", backup_id) if m: owner_user_id = int(m.group(1)) except Exception: owner_user_id = None # סינון: הצג רק אם שייך למשתמש המבקש if owner_user_id is None or owner_user_id != user_id: # לא שייך למשתמש — דלג continue # אם אין created_at – נפל ל‑mtime של הקובץ if not created_at: try: created_at = datetime.fromtimestamp(os.path.getmtime(resolved_path), tz=timezone.utc) except Exception: created_at = datetime.now(timezone.utc) # אם אין file_count – מנה את הקבצים שאינם תיקיות if file_count == 0: try: with zipfile.ZipFile(resolved_path, 'r') as _zf_count: non_dirs = [n for n in _zf_count.namelist() if not n.endswith('/')] file_count = len(non_dirs) except Exception: file_count = 0 backup_info = BackupInfo( backup_id=backup_id, user_id=owner_user_id if owner_user_id is not None else user_id, created_at=created_at, file_count=file_count, total_size=os.path.getsize(resolved_path), backup_type=backup_type, status="completed", file_path=resolved_path, repo=repo, path=path, metadata=metadata, ) backups.append(backup_info) except Exception as e: logger.warning(f"שגיאה בקריאת גיבוי {backup_file}: {e}") continue # קבצים ב-GridFS (Mongo) – נטען רק של המשתמש try: fs = self._get_gridfs() if fs is not None: # טען את כל הפריטים ובדוק בעלות בקוד כדי לכלול גם legacy ללא metadata.user_id cursor = fs.find() for fdoc in cursor: try: md = getattr(fdoc, 'metadata', None) or {} # קבע backup_id מוקדם לשימושים שונים backup_id = md.get("backup_id") or os.path.splitext(fdoc.filename or "")[0] or str(getattr(fdoc, "_id", "")) if not backup_id: continue if any(b.backup_id == backup_id for b in backups): # כבר קיים מתוך הדיסק continue total_size = int(getattr(fdoc, 'length', 0) or 0) # זיהוי בעלות: metadata.user_id → דפוס בשם → metadata.json מתוך ה-ZIP owner_user_id = None try: uid_val = md.get("user_id") if isinstance(uid_val, str) and uid_val.isdigit(): owner_user_id = int(uid_val) elif isinstance(uid_val, int): owner_user_id = uid_val except Exception: owner_user_id = None if owner_user_id is None: try: m = re.match(r"^backup_(\d+)_", backup_id) if m: owner_user_id = int(m.group(1)) except Exception: owner_user_id = None # אם עדיין לא ידוע — קרא metadata.json מתוך ה-ZIP המקומי local_path = self.backup_dir / f"{backup_id}.zip" if owner_user_id is None: try: if not local_path.exists() or (total_size and local_path.stat().st_size != total_size): grid_out = fs.get(fdoc._id) with open(local_path, 'wb') as lf: lf.write(grid_out.read()) with zipfile.ZipFile(local_path, 'r') as zf: with suppress(Exception): raw = zf.read('metadata.json') md2 = json.loads(raw) if raw else {} u2 = md2.get('user_id') if isinstance(u2, int): owner_user_id = u2 elif isinstance(u2, str) and u2.isdigit(): owner_user_id = int(u2) except Exception: pass # חסום פריטים שאינם שייכים למשתמש if owner_user_id != user_id: continue # מטא נוספים created_at = None created_at_str = md.get("created_at") if created_at_str: with suppress(Exception): created_at = datetime.fromisoformat(created_at_str) if created_at and created_at.tzinfo is None: created_at = created_at.replace(tzinfo=timezone.utc) if not created_at: with suppress(Exception): created_at = getattr(fdoc, 'uploadDate', None) file_count = int(md.get("file_count") or 0) backup_type = md.get("backup_type", "unknown") repo = md.get("repo") path = md.get("path") # ודא עותק מקומי זמני קיים לשימוש בהורדה/שחזור if not local_path.exists() or (total_size and local_path.stat().st_size != total_size): try: grid_out = fs.get(fdoc._id) with open(local_path, 'wb') as lf: lf.write(grid_out.read()) except Exception: # אם נכשל יצירת עותק – דלג והמשך (לא נציג פריט לא שמיש) continue backup_info = BackupInfo( backup_id=backup_id, user_id=owner_user_id if owner_user_id is not None else user_id, created_at=created_at or datetime.now(timezone.utc), file_count=file_count, total_size=total_size or (local_path.stat().st_size if local_path.exists() else 0), backup_type=backup_type, status="completed", file_path=str(local_path), repo=repo, path=path, metadata=md, ) backups.append(backup_info) except Exception: continue except Exception: pass # מיון לפי תאריך יצירה backups.sort(key=lambda x: x.created_at, reverse=True) except Exception as e: logger.error(f"שגיאה ברשימת גיבויים: {e}") return backups
[תיעוד] def restore_from_backup(self, user_id: int, backup_path: str, overwrite: bool = True, purge: bool = False, extra_tags: Optional[List[str]] = None) -> Dict[str, Any]: """משחזר קבצים מ-ZIP למסד הנתונים. - purge=True: מסמן את כל הקבצים הקיימים של המשתמש כלא פעילים לפני השחזור - overwrite=True: שמירה תמיד כגרסה חדשה עבור אותו שם (כברירת מחדל) החזרה: dict עם restored_files ו-errors """ results: Dict[str, Any] = {"restored_files": 0, "errors": []} try: import zipfile from database import db from utils import detect_language_from_filename # פרה-תנאי if not os.path.exists(backup_path): results["errors"].append(f"backup file not found: {backup_path}") return results if purge: try: existing = db.get_user_files(user_id, limit=10000) or [] for doc in existing: try: fname = doc.get('file_name') if fname: db.delete_file(user_id, fname) except Exception as e: results["errors"].append(f"purge failed for {doc.get('file_name')}: {e}") except Exception as e: results["errors"].append(f"purge listing failed: {e}") with zipfile.ZipFile(backup_path, 'r') as zf: names = [n for n in zf.namelist() if not n.endswith('/') and n != 'metadata.json'] for name in names: try: raw = zf.read(name) text: str try: text = raw.decode('utf-8') except Exception: try: text = raw.decode('latin-1') except Exception as e: results["errors"].append(f"decode failed for {name}: {e}") continue lang = detect_language_from_filename(name) # אם יש תגית repo:* — הוסף אותה רק עבור קבצים שנמצאים בשורש הריפו או תחת נתיב התואם לריפו filtered_extra = list(extra_tags or []) try: repo_tags = [t for t in filtered_extra if isinstance(t, str) and t.strip().lower().startswith('repo:')] # כלל זהיר: שמור את תג ה-repo רק אם הנתיב אינו כולל סלאשים רבים/או שהקובץ בשם שאינו גורף (index.html יכול להופיע בכל מקום) — # בפשטות: תמיד נאפשר, אבל נוסיף רק את תג ה-repo האחרון (אם קיים) והיתר נסנן למעלה בשכבת repo.save_file if repo_tags: filtered_extra = [repo_tags[-1]] + [t for t in filtered_extra if not (isinstance(t, str) and t.strip().lower().startswith('repo:'))] except Exception: pass ok = db.save_file(user_id=user_id, file_name=name, code=text, programming_language=lang, extra_tags=filtered_extra) if ok: results["restored_files"] += 1 else: results["errors"].append(f"save failed for {name}") except Exception as e: results["errors"].append(f"restore failed for {name}: {e}") except Exception as e: results["errors"].append(str(e)) return results
[תיעוד] def delete_backups(self, user_id: int, backup_ids: List[str]) -> Dict[str, Any]: """מוחק מספר גיבויי ZIP לפי backup_id ממערכת הקבצים ומ-GridFS (אם בשימוש). החזרה: {"deleted": int, "errors": [str, ...]} """ results: Dict[str, Any] = {"deleted": 0, "errors": []} try: if not backup_ids: return results filenames = [f"{bid}.zip" for bid in backup_ids] # מחיקה ממערכת הקבצים (כולל נתיבי legacy) search_dirs: List[Path] = [self.backup_dir] try: if getattr(self, "legacy_backup_dir", None): if isinstance(self.legacy_backup_dir, Path): search_dirs.append(self.legacy_backup_dir) except Exception: pass try: app_backups = Path("/app/backups") if app_backups != self.backup_dir: search_dirs.append(app_backups) except Exception: pass deleted_fs = 0 for d in search_dirs: for fn in filenames: try: p = d / fn if p.exists(): # בדוק שיוך משתמש אם יש metadata.json try: with zipfile.ZipFile(p, 'r') as zf: md = None with suppress(Exception): raw = zf.read('metadata.json') md = json.loads(raw) if raw else None if md and md.get('user_id') is not None and md.get('user_id') != user_id: # שייך למשתמש אחר — דלג continue except Exception: pass p.unlink() deleted_fs += 1 except Exception as e: results["errors"].append(f"fs:{fn}:{e}") # מחיקה מ-GridFS (אם קיים) fs = None try: fs = self._get_gridfs() except Exception: fs = None if fs is not None: for bid, fn in zip(backup_ids, filenames): try: # שלוף מועמדים לפי filename/backup_id ללא סינון על user_id כדי לא לפספס legacy candidates = [] with suppress(Exception): candidates.extend(list(fs.find({"filename": fn}))) with suppress(Exception): candidates.extend(list(fs.find({"metadata.backup_id": bid}))) seen = set() for fdoc in candidates: try: if getattr(fdoc, '_id', None) in seen: continue seen.add(getattr(fdoc, '_id', None)) md = getattr(fdoc, 'metadata', None) or {} owner_ok = False # 1) metadata.user_id כמספר או מחרוזת uid_val = md.get('user_id') if isinstance(uid_val, int) and uid_val == user_id: owner_ok = True elif isinstance(uid_val, str) and uid_val.isdigit() and int(uid_val) == user_id: owner_ok = True # 2) גיבוי לפי דפוס backup_<user>_* בשם הקובץ if not owner_ok: try: base = os.path.splitext(str(getattr(fdoc, 'filename', '') or ''))[0] m = re.match(r"^backup_(\d+)_", base) if m and int(m.group(1)) == user_id: owner_ok = True except Exception: pass # 3) כמוצא אחרון: אם יש עותק מקומי — פתח וקרא metadata.json לאימות if not owner_ok: try: local_path = self.backup_dir / f"{bid}.zip" if not local_path.exists(): grid_out = fs.get(fdoc._id) with open(local_path, 'wb') as lf: lf.write(grid_out.read()) with zipfile.ZipFile(local_path, 'r') as zf: with suppress(Exception): raw = zf.read('metadata.json') md2 = json.loads(raw) if raw else {} u2 = md2.get('user_id') if (isinstance(u2, int) and u2 == user_id) or (isinstance(u2, str) and u2.isdigit() and int(u2) == user_id): owner_ok = True except Exception: pass if owner_ok: fs.delete(fdoc._id) results["deleted"] += 1 except Exception: continue except Exception as e: results["errors"].append(f"gridfs:{fn}:{e}") # אם אין GridFS — ספר מחיקות FS if fs is None: results["deleted"] += deleted_fs except Exception as e: results["errors"].append(str(e)) return results
[תיעוד] def delete_backup(self, backup_id: str, user_id: int) -> bool: """מחיקת גיבוי""" try: # חפש את הגיבוי בשתי התיקיות (ברירת מחדל + legacy) candidate_files: List[Path] = [] try: candidate_files.extend(list(self.backup_dir.glob(f"{backup_id}.zip"))) except Exception: pass try: if getattr(self, 'legacy_backup_dir', None) and self.legacy_backup_dir.exists(): candidate_files.extend(list(self.legacy_backup_dir.glob(f"{backup_id}.zip"))) except Exception: pass for backup_file in candidate_files: # וידוא שהגיבוי שייך למשתמש אם קיימת מטאדטה try: with zipfile.ZipFile(backup_file, 'r') as zip_file: try: metadata_content = zip_file.read("metadata.json") metadata = json.loads(metadata_content) if metadata.get("user_id") == user_id: backup_file.unlink() logger.info(f"נמחק גיבוי: {backup_id}") return True except Exception: # אין מטאדטה – דלג continue except Exception: continue logger.warning(f"גיבוי לא נמצא או לא שייך למשתמש: {backup_id}") return False except Exception as e: logger.error(f"שגיאה במחיקת גיבוי: {e}") return False
backup_manager = BackupManager()