import logging
import os
from types import SimpleNamespace
from datetime import timezone
from typing import Any, Dict, List, Optional, Tuple
from pymongo import MongoClient, IndexModel, ASCENDING, DESCENDING, TEXT
from config import config
logger = logging.getLogger(__name__)
[תיעוד]
class DatabaseManager:
"""אחראי על חיבור MongoDB והגדרת אינדקסים."""
[תיעוד]
def __init__(self):
    """Reset every connection/collection handle to ``None``, then call :meth:`connect`."""
    # Handles start out unset; connect() is what assigns real (or no-op) values.
    for attr_name in (
        "client",
        "db",
        "collection",
        "large_files_collection",
        "backup_ratings_collection",
        "internal_shares_collection",
        "_repo",
    ):
        setattr(self, attr_name, None)
    self.connect()
[תיעוד]
def connect(self):
    """Connect to MongoDB, bind the collection handles and create indexes.

    If ``DISABLE_DB`` or ``SPHINX_MOCK_IMPORTS`` is set to ``1``/``true``/``yes``
    (case-insensitive), no connection is attempted and no-op collections are
    installed instead, so docs builds / CI do not need a database.

    Raises:
        Exception: any error raised while creating the client, pinging the
            server or binding handles is logged and re-raised. The client
            created by this call is closed first so that its connection pool
            and background threads are not leaked.
    """
    enabled_values = {"1", "true", "yes"}
    disable_db = (
        str(os.getenv("DISABLE_DB", "")).lower() in enabled_values
        or str(os.getenv("SPHINX_MOCK_IMPORTS", "")).lower() in enabled_values
    )

    def _init_noop_collections():
        """Install stand-in collections whose operations do nothing."""

        class NoOpCollection:
            """Collection stub: writes report nothing changed, reads are empty."""

            def insert_one(self, *args, **kwargs):
                return SimpleNamespace(inserted_id=None)

            def update_one(self, *args, **kwargs):
                return SimpleNamespace(acknowledged=True, modified_count=0)

            def find_one(self, *args, **kwargs):
                return None

            def aggregate(self, *args, **kwargs):
                return []

            def create_indexes(self, *args, **kwargs):
                return None

            def list_indexes(self, *args, **kwargs):
                return []

            def drop_index(self, *args, **kwargs):
                return None

            def find(self, *args, **kwargs):
                return []

        self.client = None
        self.db = SimpleNamespace(users=NoOpCollection())
        self.collection = NoOpCollection()
        self.large_files_collection = NoOpCollection()
        self.backup_ratings_collection = NoOpCollection()
        logger.info("DB disabled (docs/CI mode) — using no-op collections")

    if disable_db:
        _init_noop_collections()
        return

    # Track the client created by *this* call separately from self.client, so a
    # failure never closes a client that an earlier successful call installed.
    client = None
    try:
        client = MongoClient(
            config.MONGODB_URL,
            maxPoolSize=50,
            minPoolSize=5,
            maxIdleTimeMS=30000,
            waitQueueTimeoutMS=5000,
            serverSelectionTimeoutMS=3000,
            socketTimeoutMS=20000,
            connectTimeoutMS=10000,
            retryWrites=True,
            retryReads=True,
            tz_aware=True,
            tzinfo=timezone.utc,
        )
        self.client = client
        self.db = client[config.DATABASE_NAME]
        self.collection = self.db.code_snippets
        self.large_files_collection = self.db.large_files
        self.backup_ratings_collection = self.db.backup_ratings
        self.internal_shares_collection = self.db.internal_shares
        # Fail fast if the server is unreachable.
        client.admin.command('ping')
        self._create_indexes()
        logger.info("התחברות למסד הנתונים הצליחה עם Connection Pooling מתקדם")
    except Exception as e:
        if client is not None:
            try:
                client.close()
            except Exception:
                logger.debug("Failed to close MongoDB client after a connection error", exc_info=True)
            if self.client is client:
                self.client = None
        logger.error("שגיאה בהתחברות למסד הנתונים: %s", e)
        raise
# --- Lazy repository accessor to avoid circular imports ---
def _get_repo(self):
    """Return the cached ``Repository`` for this manager, building it on first use."""
    if self._repo is not None:
        return self._repo
    # Imported here rather than at module level to avoid a circular dependency.
    from .repository import Repository
    self._repo = Repository(self)
    return self._repo
def _create_indexes(self):
    """Create the indexes for all collections, recovering from name/option conflicts.

    Index creation is best-effort: failures are logged as warnings and never
    raised. When the server reports an index conflict (``IndexOptionsConflict``
    or "already exists with a different name"), the conflicting indexes on the
    main collection are dropped and every index set is created again —
    including the internal-shares indexes, so their TTL index is not lost on
    the recovery path.
    """
    indexes = [
        IndexModel([("user_id", ASCENDING)]),
        IndexModel([("file_name", ASCENDING)]),
        IndexModel([("programming_language", ASCENDING)]),
        IndexModel([("tags", ASCENDING)]),
        IndexModel([("created_at", DESCENDING)]),
        IndexModel([("user_id", ASCENDING), ("file_name", ASCENDING), ("version", DESCENDING)]),
        # Improved index supporting a sort on file_name, version after a match on user_id, is_active
        IndexModel([
            ("user_id", ASCENDING),
            ("is_active", ASCENDING),
            ("file_name", ASCENDING),
            ("version", DESCENDING),
        ], name="user_active_file_latest_idx"),
        IndexModel([
            ("user_id", ASCENDING),
            ("programming_language", ASCENDING),
            ("created_at", DESCENDING),
        ], name="user_lang_date_idx"),
        IndexModel([
            ("user_id", ASCENDING),
            ("tags", ASCENDING),
            ("updated_at", DESCENDING),
        ], name="user_tags_updated_idx"),
        IndexModel([
            ("user_id", ASCENDING),
            ("is_active", ASCENDING),
            ("programming_language", ASCENDING),
        ], name="user_active_lang_idx"),
        IndexModel([
            ("user_id", ASCENDING),
            ("is_active", ASCENDING),
            ("updated_at", DESCENDING),
        ], name="user_active_recent_idx"),
        IndexModel([
            ("programming_language", ASCENDING),
            ("tags", ASCENDING),
            ("created_at", DESCENDING),
        ], name="lang_tags_date_idx"),
        IndexModel([("code", TEXT), ("description", TEXT), ("file_name", TEXT)], name="full_text_search_idx"),
    ]
    large_files_indexes = [
        IndexModel([("user_id", ASCENDING)]),
        IndexModel([("file_name", ASCENDING)]),
        IndexModel([("programming_language", ASCENDING)]),
        IndexModel([("created_at", DESCENDING)]),
        IndexModel([("file_size", ASCENDING)]),
        IndexModel([("lines_count", ASCENDING)]),
        IndexModel([("user_id", ASCENDING), ("file_name", ASCENDING)]),
        IndexModel([
            ("user_id", ASCENDING),
            ("programming_language", ASCENDING),
            ("file_size", ASCENDING),
        ], name="user_lang_size_idx"),
        IndexModel([
            ("user_id", ASCENDING),
            ("is_active", ASCENDING),
            ("created_at", DESCENDING),
        ], name="user_active_date_large_idx"),
        IndexModel([
            ("programming_language", ASCENDING),
            ("file_size", ASCENDING),
            ("lines_count", ASCENDING),
        ], name="lang_size_lines_idx"),
        IndexModel([
            ("user_id", ASCENDING),
            ("tags", ASCENDING),
            ("file_size", DESCENDING),
        ], name="user_tags_size_idx"),
    ]
    # backup_ratings indexes
    backup_ratings_indexes = [
        IndexModel([("user_id", ASCENDING), ("backup_id", ASCENDING)], name="user_backup_unique", unique=True),
        IndexModel([("created_at", DESCENDING)], name="created_at_desc"),
    ]
    # Internal-shares indexes: TTL on expires_at plus lookup indexes.
    internal_shares_indexes = [
        IndexModel([("share_id", ASCENDING)], name="share_id_unique", unique=True),
        IndexModel([("created_at", DESCENDING)], name="created_at_desc"),
        # TTL on expires_at (Date). It will not work if the value is stored as
        # an ISO string — the writer side must store a real Date.
        IndexModel([("expires_at", ASCENDING)], name="expires_ttl", expireAfterSeconds=0),
    ]
    # Named indexes on the main collection that may be dropped and rebuilt on conflict.
    replaceable_names = {
        'user_lang_date_idx',
        'user_tags_updated_idx',
        'user_active_lang_idx',
        'user_active_recent_idx',
        'lang_tags_date_idx',
        'full_text_search_idx',
    }

    def _ensure_all():
        """Create every index set; shared by the first attempt and the retry."""
        self.collection.create_indexes(indexes)
        self.large_files_collection.create_indexes(large_files_indexes)
        if self.backup_ratings_collection is not None:
            self.backup_ratings_collection.create_indexes(backup_ratings_indexes)
        if self.internal_shares_collection is not None:
            self.internal_shares_collection.create_indexes(internal_shares_indexes)

    def _drop_quietly(index):
        """Drop one index from the main collection; a failed drop is not fatal."""
        try:
            self.collection.drop_index(index)
        except Exception:
            logger.debug("Could not drop index %r; continuing", index, exc_info=True)

    try:
        _ensure_all()
    except Exception as e:
        msg = str(e)
        if 'IndexOptionsConflict' not in msg and 'already exists with a different name' not in msg:
            logger.warning("שגיאה ביצירת אינדקסים: %s", e)
            return
        try:
            for idx in list(self.collection.list_indexes()):
                name = idx.get('name', '')
                is_text_index = ('textIndexVersion' in idx) or name.endswith('_text')
                if (
                    is_text_index
                    or name in replaceable_names
                    or name.startswith(('user_id_', 'file_name_'))
                ):
                    _drop_quietly(name)
            # Also try dropping the text index by key spec, in case it exists under another name.
            _drop_quietly([('code', 'text'), ('description', 'text'), ('file_name', 'text')])
            _ensure_all()
        except Exception:
            logger.warning("נכשל עדכון אינדקסים לאחר קונפליקט", exc_info=True)
[תיעוד]
def close(self):
    """Close the MongoDB client if one is set; otherwise do nothing."""
    client = self.client
    if not client:
        return
    client.close()
[תיעוד]
def close_connection(self):
    """Alternative name for :meth:`close`; simply calls it."""
    self.close()
# --- Backward-compatible CRUD API delegating to Repository ---
# התאמות שמיות כדי להתאים לדוקס הישנים: שמרנו שמות מתודות היסטוריים
# שממפות למימושים בפועל ב-Repository.
# --- Aliases for "snippet" nomenclature ---
[תיעוד]
def save_snippet(self, snippet) -> bool:
    """Legacy "snippet" alias: forward ``snippet`` to the repository's ``save_code_snippet``."""
    repo = self._get_repo()
    return repo.save_code_snippet(snippet)
[תיעוד]
def search_snippets(self, user_id: int, search_term: str = "", programming_language: str = None, tags: List[str] = None, limit: int = 20) -> List[Dict]:
    """Legacy "snippet" alias: forward to the repository's ``search_code``.

    All arguments are passed through positionally and unchanged, in the
    order ``user_id, search_term, programming_language, tags, limit``.
    """
    repo = self._get_repo()
    forwarded = (user_id, search_term, programming_language, tags, limit)
    return repo.search_code(*forwarded)
[תיעוד]
def get_snippet(self, user_id: int, file_name: str) -> Optional[Dict]:
    """Legacy "snippet" alias: forward to the repository's ``get_file``."""
    repo = self._get_repo()
    return repo.get_file(user_id, file_name)
[תיעוד]
def get_user_snippets(self, user_id: int, limit: int = 50) -> List[Dict]:
    """Legacy "snippet" alias: forward to the repository's ``get_user_files``."""
    repo = self._get_repo()
    return repo.get_user_files(user_id, limit)
[תיעוד]
def delete_snippet(self, user_id: int, file_name: str) -> bool:
    """Legacy "snippet" alias: forward to the repository's ``delete_file``."""
    repo = self._get_repo()
    return repo.delete_file(user_id, file_name)
[תיעוד]
def delete_all_user_snippets(self, user_id: int) -> int:
    """Soft-delete the user's files and return how many the repository reports.

    Implemented on top of the repository: the file names returned by
    ``get_user_files(user_id, limit=1000)`` are passed to
    ``soft_delete_files_by_names``. Only that first batch of up to 1000
    requested documents is processed per call.

    Entries that are not dicts, or whose ``file_name`` is missing or empty, are
    skipped so that ``None`` is never forwarded as a name to the bulk delete.

    Returns:
        The integer reported by the repository, or ``0`` when there is nothing
        to delete or when any error occurs (the error is logged, not raised).
    """
    try:
        repo = self._get_repo()
        docs = repo.get_user_files(user_id, limit=1000) or []
        names = [
            doc["file_name"]
            for doc in docs
            if isinstance(doc, dict) and doc.get("file_name")
        ]
        if not names:
            return 0
        return int(repo.soft_delete_files_by_names(user_id, names) or 0)
    except Exception:
        # Keep the historical "never raise" contract, but leave a trace of the failure.
        logger.exception("delete_all_user_snippets failed for user_id=%s", user_id)
        return 0
[תיעוד]
def get_user_statistics(self, user_id: int) -> Dict[str, Any]:
    """Legacy alias: forward to the repository's ``get_user_stats``."""
    repo = self._get_repo()
    return repo.get_user_stats(user_id)
[תיעוד]
def get_global_statistics(self) -> Dict[str, Any]:
    """Aggregate basic statistics over all documents with ``is_active: True``.

    Returns:
        A dict with ``total_files`` (count of matched documents) and
        ``languages`` (distinct ``programming_language`` values). When there
        is no collection, no matching document, or the aggregation fails, the
        result is ``{"total_files": 0, "languages": []}``; failures are logged
        and never raised.
    """
    collection = self.collection
    # Compare with None explicitly: PyMongo Collection objects raise
    # NotImplementedError on truth-testing, which previously made this method
    # always fall into the error path and report zeros.
    if collection is None:
        return {"total_files": 0, "languages": []}
    pipeline = [
        {"$match": {"is_active": True}},
        {"$group": {
            "_id": None,
            "total_files": {"$sum": 1},
            "languages": {"$addToSet": "$programming_language"},
        }},
    ]
    try:
        res = list(collection.aggregate(pipeline, allowDiskUse=True))
        if res:
            out = dict(res[0])
            # The group key is always None here and carries no information.
            out.pop('_id', None)
            return out
    except Exception:
        logger.warning("get_global_statistics aggregation failed", exc_info=True)
    return {"total_files": 0, "languages": []}
[תיעוד]
def save_code_snippet(self, snippet) -> bool:
    """Forward ``snippet`` to the repository's ``save_code_snippet``."""
    repo = self._get_repo()
    return repo.save_code_snippet(snippet)
[תיעוד]
def save_file(self, user_id: int, file_name: str, code: str, programming_language: str, extra_tags: List[str] = None) -> bool:
    """Forward to the repository's ``save_file`` with all arguments unchanged."""
    repo = self._get_repo()
    forwarded = (user_id, file_name, code, programming_language, extra_tags)
    return repo.save_file(*forwarded)
[תיעוד]
def get_latest_version(self, user_id: int, file_name: str) -> Optional[Dict]:
    """Forward to the repository's ``get_latest_version``."""
    repo = self._get_repo()
    return repo.get_latest_version(user_id, file_name)
[תיעוד]
def get_file(self, user_id: int, file_name: str) -> Optional[Dict]:
    """Forward to the repository's ``get_file``."""
    repo = self._get_repo()
    return repo.get_file(user_id, file_name)
[תיעוד]
def get_all_versions(self, user_id: int, file_name: str) -> List[Dict]:
    """Forward to the repository's ``get_all_versions``."""
    repo = self._get_repo()
    return repo.get_all_versions(user_id, file_name)
[תיעוד]
def get_version(self, user_id: int, file_name: str, version: int) -> Optional[Dict]:
    """Forward to the repository's ``get_version``."""
    repo = self._get_repo()
    return repo.get_version(user_id, file_name, version)
[תיעוד]
def get_user_files(self, user_id: int, limit: int = 50) -> List[Dict]:
    """Forward to the repository's ``get_user_files``."""
    repo = self._get_repo()
    return repo.get_user_files(user_id, limit)
[תיעוד]
def search_code(self, user_id: int, query: str, programming_language: str = None, tags: List[str] = None, limit: int = 20) -> List[Dict]:
    """Forward to the repository's ``search_code`` with all arguments unchanged."""
    repo = self._get_repo()
    forwarded = (user_id, query, programming_language, tags, limit)
    return repo.search_code(*forwarded)
[תיעוד]
def get_user_files_by_repo(self, user_id: int, repo_tag: str, page: int = 1, per_page: int = 50) -> Tuple[List[Dict], int]:
    """Forward to the repository's ``get_user_files_by_repo`` with all arguments unchanged."""
    repository = self._get_repo()
    return repository.get_user_files_by_repo(user_id, repo_tag, page, per_page)
[תיעוד]
def delete_file(self, user_id: int, file_name: str) -> bool:
    """Forward to the repository's ``delete_file``."""
    repo = self._get_repo()
    return repo.delete_file(user_id, file_name)
[תיעוד]
def soft_delete_files_by_names(self, user_id: int, file_names: List[str]) -> int:
    """Forward to the repository's ``soft_delete_files_by_names``."""
    repo = self._get_repo()
    return repo.soft_delete_files_by_names(user_id, file_names)
[תיעוד]
def delete_file_by_id(self, file_id: str) -> int:
    """Forward to the repository's ``delete_file_by_id``."""
    repo = self._get_repo()
    return repo.delete_file_by_id(file_id)
[תיעוד]
def get_file_by_id(self, file_id: str) -> Optional[Dict]:
    """Forward to the repository's ``get_file_by_id``."""
    repo = self._get_repo()
    return repo.get_file_by_id(file_id)
[תיעוד]
def get_user_stats(self, user_id: int) -> Dict[str, Any]:
    """Forward to the repository's ``get_user_stats``."""
    repo = self._get_repo()
    return repo.get_user_stats(user_id)
[תיעוד]
def rename_file(self, user_id: int, old_name: str, new_name: str) -> bool:
    """Forward to the repository's ``rename_file``."""
    repo = self._get_repo()
    return repo.rename_file(user_id, old_name, new_name)
# Large files API
[תיעוד]
def save_large_file(self, large_file) -> bool:
    """Forward ``large_file`` to the repository's ``save_large_file``."""
    repo = self._get_repo()
    return repo.save_large_file(large_file)
[תיעוד]
def get_large_file(self, user_id: int, file_name: str) -> Optional[Dict]:
    """Forward to the repository's ``get_large_file``."""
    repo = self._get_repo()
    return repo.get_large_file(user_id, file_name)
[תיעוד]
def get_large_file_by_id(self, file_id: str) -> Optional[Dict]:
    """Forward to the repository's ``get_large_file_by_id``."""
    repo = self._get_repo()
    return repo.get_large_file_by_id(file_id)
[תיעוד]
def get_user_large_files(self, user_id: int, page: int = 1, per_page: int = 8) -> Tuple[List[Dict], int]:
    """Forward to the repository's ``get_user_large_files`` with all arguments unchanged."""
    repo = self._get_repo()
    return repo.get_user_large_files(user_id, page, per_page)
[תיעוד]
def delete_large_file(self, user_id: int, file_name: str) -> bool:
    """Forward to the repository's ``delete_large_file``."""
    repo = self._get_repo()
    return repo.delete_large_file(user_id, file_name)
[תיעוד]
def delete_large_file_by_id(self, file_id: str) -> bool:
    """Forward to the repository's ``delete_large_file_by_id``."""
    repo = self._get_repo()
    return repo.delete_large_file_by_id(file_id)
[תיעוד]
def get_all_user_files_combined(self, user_id: int) -> Dict[str, List[Dict]]:
    """Forward to the repository's ``get_all_user_files_combined``."""
    repo = self._get_repo()
    return repo.get_all_user_files_combined(user_id)
# Backup ratings API
[תיעוד]
def save_backup_rating(self, user_id: int, backup_id: str, rating: str) -> bool:
    """Forward to the repository's ``save_backup_rating``."""
    repo = self._get_repo()
    return repo.save_backup_rating(user_id, backup_id, rating)
[תיעוד]
def get_backup_rating(self, user_id: int, backup_id: str) -> Optional[str]:
    """Forward to the repository's ``get_backup_rating``."""
    repo = self._get_repo()
    return repo.get_backup_rating(user_id, backup_id)
[תיעוד]
def delete_backup_ratings(self, user_id: int, backup_ids: List[str]) -> int:
    """Forward to the repository's ``delete_backup_ratings``."""
    repo = self._get_repo()
    return repo.delete_backup_ratings(user_id, backup_ids)
# Backup notes API (מאוחסן יחד עם דירוגים באותה קולקציה)
[תיעוד]
def save_backup_note(self, user_id: int, backup_id: str, note: str) -> bool:
    """Forward to the repository's ``save_backup_note``."""
    repo = self._get_repo()
    return repo.save_backup_note(user_id, backup_id, note)
[תיעוד]
def get_backup_note(self, user_id: int, backup_id: str) -> Optional[str]:
    """Forward to the repository's ``get_backup_note``."""
    repo = self._get_repo()
    return repo.get_backup_note(user_id, backup_id)
# Users and tokens
[תיעוד]
def save_github_token(self, user_id: int, token: str) -> bool:
    """Forward to the repository's ``save_github_token``."""
    repo = self._get_repo()
    return repo.save_github_token(user_id, token)
[תיעוד]
def get_github_token(self, user_id: int) -> Optional[str]:
    """Forward to the repository's ``get_github_token``."""
    repo = self._get_repo()
    return repo.get_github_token(user_id)
[תיעוד]
def delete_github_token(self, user_id: int) -> bool:
    """Forward to the repository's ``delete_github_token``."""
    repo = self._get_repo()
    return repo.delete_github_token(user_id)
[תיעוד]
def save_selected_repo(self, user_id: int, repo_name: str) -> bool:
    """Forward to the repository's ``save_selected_repo``."""
    repository = self._get_repo()
    return repository.save_selected_repo(user_id, repo_name)
[תיעוד]
def get_selected_repo(self, user_id: int) -> Optional[str]:
    """Forward to the repository's ``get_selected_repo``."""
    repository = self._get_repo()
    return repository.get_selected_repo(user_id)
[תיעוד]
def save_user(self, user_id: int, username: str = None) -> bool:
    """Forward to the repository's ``save_user``; ``username`` defaults to ``None``."""
    repo = self._get_repo()
    return repo.save_user(user_id, username)
# Google Drive tokens & preferences
[תיעוד]
def save_drive_tokens(self, user_id: int, token_data: Dict[str, Any]) -> bool:
    """Forward to the repository's ``save_drive_tokens``."""
    repo = self._get_repo()
    return repo.save_drive_tokens(user_id, token_data)
[תיעוד]
def get_drive_tokens(self, user_id: int) -> Optional[Dict[str, Any]]:
    """Forward to the repository's ``get_drive_tokens``."""
    repo = self._get_repo()
    return repo.get_drive_tokens(user_id)
[תיעוד]
def delete_drive_tokens(self, user_id: int) -> bool:
    """Forward to the repository's ``delete_drive_tokens``."""
    repo = self._get_repo()
    return repo.delete_drive_tokens(user_id)
[תיעוד]
def save_drive_prefs(self, user_id: int, prefs: Dict[str, Any]) -> bool:
    """Forward to the repository's ``save_drive_prefs``."""
    repo = self._get_repo()
    return repo.save_drive_prefs(user_id, prefs)
[תיעוד]
def get_drive_prefs(self, user_id: int) -> Optional[Dict[str, Any]]:
    """Forward to the repository's ``get_drive_prefs``."""
    repo = self._get_repo()
    return repo.get_drive_prefs(user_id)