"""Handlers עבור מסמכים וקבצים הנשלחים לבוט."""
from __future__ import annotations
import base64
import json
import logging
import os
import re
import tempfile
import time
import zipfile
from datetime import datetime, timezone
from io import BytesIO
from typing import Any, Awaitable, Callable, Iterable, List, Optional, Protocol, Sequence
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.constants import ParseMode
from telegram.ext import ContextTypes
# Use FilesFacade via the Composition Root to avoid a direct dependency on the DB
from typing import Optional as _Optional # local alias to avoid collision
from file_manager import backup_manager
from html import escape as html_escape
logger = logging.getLogger(__name__)
class _ReporterProto(Protocol):
"""Protocol for activity reporter."""
def report_activity(self, user_id: int) -> None: ...
class _MetricProto(Protocol):
"""Protocol for Prometheus Counter metric."""
def labels(self, **labelkwargs) -> _MetricProto: ...
def inc(self, amount: float = 1) -> None: ...
# [docs]
class DocumentHandler:
    """Handles every route taken by files that are sent to the bot."""
# [docs]
def __init__(
self,
notify_admins: Callable[[ContextTypes.DEFAULT_TYPE, str], Awaitable[None]],
get_reporter: Callable[[], Optional[_ReporterProto]],
log_user_activity: Callable[[Update, ContextTypes.DEFAULT_TYPE], Awaitable[None]],
encodings_to_try: Sequence[str] | Callable[[], Sequence[str]],
emit_event: Callable[..., object] | None,
errors_total: Optional[_MetricProto],
) -> None:
self._notify_admins = notify_admins
self._get_reporter = get_reporter
self._log_user_activity = log_user_activity
self._encodings_provider: Callable[[], Sequence[str]] | None = (
encodings_to_try if callable(encodings_to_try) else None
)
if callable(encodings_to_try):
try:
initial_encodings: Sequence[str] = encodings_to_try()
except Exception as exc:
logger.debug("Dynamic encoding provider failed during init: %s", exc)
initial_encodings = ()
else:
initial_encodings = encodings_to_try
self._encodings_to_try = self._normalize_encodings(initial_encodings)
if not self._encodings_to_try:
# מנע מצב שבו אין קידודים בכלל
self._encodings_to_try = ("utf-8",)
self._last_encodings_attempted = self._encodings_to_try
self._emit_event = emit_event
self._errors_total = errors_total
self._files_facade: Optional[Any] = None
self._files_facade_initialized = False
@staticmethod
def _normalize_encodings(values: Sequence[str] | Iterable[str]) -> tuple[str, ...]:
cleaned: list[str] = []
for value in values:
if not value:
continue
try:
text = str(value).strip()
except Exception:
continue
if not text:
continue
cleaned.append(text)
return tuple(cleaned)
def _current_encodings(self) -> tuple[str, ...]:
provider = self._encodings_provider
if provider is not None:
try:
resolved = self._normalize_encodings(provider())
except Exception as exc:
logger.debug("Dynamic encoding provider failed: %s", exc)
else:
if resolved:
self._encodings_to_try = resolved
return resolved
return self._encodings_to_try
def _resolve_files_facade(self) -> Optional[Any]:
if self._files_facade_initialized:
return self._files_facade
self._files_facade_initialized = True
try:
from src.infrastructure.composition import get_files_facade # type: ignore
except Exception as exc:
logger.debug("Files facade unavailable: %s", exc)
self._files_facade = None
return None
try:
self._files_facade = get_files_facade()
except Exception as exc:
logger.warning("Failed to initialize files facade: %s", exc)
self._files_facade = None
return self._files_facade
def _save_code_snippet(
self,
*,
user_id: int,
file_name: str,
language: str,
content: str,
description: str = "",
tags: Optional[List[str]] = None,
) -> bool:
if self._save_code_snippet_via_facade(
user_id=user_id,
file_name=file_name,
language=language,
content=content,
description=description,
tags=tags,
):
return True
return False
def _save_code_snippet_via_facade(
self,
*,
user_id: int,
file_name: str,
language: str,
content: str,
description: str = "",
tags: Optional[List[str]] = None,
) -> bool:
facade = self._resolve_files_facade()
if facade is None:
return False
try:
return bool(
facade.save_code_snippet( # type: ignore[attr-defined]
user_id=user_id,
file_name=file_name,
code=content,
programming_language=language,
description=description,
tags=tags,
)
)
except Exception as exc:
logger.warning("FilesFacade save_code_snippet failed: %s", exc)
return False
def _save_large_file(
self,
*,
user_id: int,
file_name: str,
language: str,
content: str,
file_size: int,
lines_count: int,
) -> bool:
if self._save_large_file_via_facade(
user_id=user_id,
file_name=file_name,
language=language,
content=content,
file_size=file_size,
lines_count=lines_count,
):
return True
return False
def _save_large_file_via_facade(
self,
*,
user_id: int,
file_name: str,
language: str,
content: str,
file_size: int,
lines_count: int,
) -> bool:
facade = self._resolve_files_facade()
if facade is None:
return False
try:
return bool(
facade.save_large_file( # type: ignore[attr-defined]
user_id=user_id,
file_name=file_name,
content=content,
programming_language=language,
file_size=file_size,
lines_count=lines_count,
)
)
except Exception as exc:
logger.warning("FilesFacade save_large_file failed: %s", exc)
return False
def _get_latest_version_entry(self, user_id: int, file_name: str) -> Optional[dict]:
return self._get_latest_version_via_facade(user_id, file_name)
def _get_large_file_entry(self, user_id: int, file_name: str) -> Optional[dict]:
return self._get_large_file_via_facade(user_id, file_name)
def _get_latest_version_via_facade(self, user_id: int, file_name: str) -> Optional[dict]:
facade = self._resolve_files_facade()
if facade is None:
return None
try:
doc = facade.get_latest_version(user_id, file_name) # type: ignore[attr-defined]
return doc or None
except Exception:
return None
def _get_large_file_via_facade(self, user_id: int, file_name: str) -> Optional[dict]:
facade = self._resolve_files_facade()
if facade is None:
return None
try:
doc = facade.get_large_file(user_id, file_name) # type: ignore[attr-defined]
return doc or None
except Exception:
return None
def _save_selected_repo(self, user_id: int, repo_full: str) -> bool:
return self._save_selected_repo_via_facade(user_id, repo_full)
def _save_selected_repo_via_facade(self, user_id: int, repo_full: str) -> bool:
facade = self._resolve_files_facade()
if facade is None:
return False
try:
logger.debug("Trying to save selected repo via FilesFacade for %s", repo_full)
return bool(facade.save_selected_repo(user_id, repo_full)) # type: ignore[attr-defined]
except Exception as exc:
logger.warning("FilesFacade save_selected_repo failed: %s", exc)
return False
# [docs]
async def handle_document(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Main entry point for a file that was sent to the bot.

    Routes by ``context.user_data["upload_mode"]`` and by the
    ``waiting_for_github_upload`` flag. The two GitHub ZIP modes take
    precedence over the direct GitHub upload, which in turn takes
    precedence over the local ZIP import/create modes. With no special
    mode active, user activity is logged and the document is handled as a
    textual file.
    """
    user_data = context.user_data
    logger.info("DEBUG: upload_mode = %s", user_data.get("upload_mode"))
    logger.info("DEBUG: waiting_for_github_upload = %s", user_data.get("waiting_for_github_upload"))
    upload_mode = user_data.get("upload_mode")

    # Pick the route first; the order of these checks is significant.
    if upload_mode == "github_restore_zip_to_repo":
        route = self._handle_github_restore_zip_to_repo
    elif upload_mode == "github_create_repo_from_zip":
        route = self._handle_github_create_repo_from_zip
    elif user_data.get("waiting_for_github_upload") or upload_mode == "github":
        route = self._handle_github_direct_upload
    elif upload_mode == "zip_import":
        route = self._handle_zip_import
    elif upload_mode == "zip_create":
        route = self._handle_zip_create
    else:
        route = None

    if route is None:
        # Default path: only plain uploads are recorded as user activity here.
        await self._log_user_activity(update, context)
        route = self._handle_textual_file
    await route(update, context)
async def _handle_github_restore_zip_to_repo(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Restore the contents of an uploaded ZIP into a GitHub repository.

    Steps:
      1. Download the document into memory and validate it as a ZIP.
      2. Collect the archive's files, skipping ``__MACOSX/`` entries and
         ``._*`` files, and stripping a single shared top-level folder.
      3. Resolve the target repo: the "locked" target stored under
         ``zip_restore_expected_repo_full`` in ``context.user_data`` when
         present, otherwise the session's ``selected_repo``. If the locked
         target is not accessible, the current repo is tried only when it
         has the same owner.
      4. Create blobs, a tree and one commit on the repo's default branch,
         then move the branch ref to it. With ``github_restore_zip_purge``
         set, the tree is created without the base tree.

    Failures are logged and reported to the user. The upload mode and the
    restore-related ``user_data`` keys are always cleared on exit.
    """
    try:
        document = update.message.document
        user_id = update.effective_user.id
        logger.info(
            "GitHub restore-to-repo ZIP received: file_name=%s, size=%s",
            document.file_name,
            document.file_size,
        )
        await update.message.reply_text("⏳ מוריד קובץ ZIP...")
        tg_file = await context.bot.get_file(document.file_id)

        files: List[tuple[str, bytes]] = []
        # The buffer and the archive are closed deterministically.
        with BytesIO() as buf:
            await tg_file.download_to_memory(buf)
            buf.seek(0)
            if not zipfile.is_zipfile(buf):
                await update.message.reply_text("❌ הקובץ שהועלה אינו ZIP תקין.")
                return
            with zipfile.ZipFile(buf, "r") as zf:
                all_entries = zf.namelist()
                members = [
                    n
                    for n in all_entries
                    if not n.endswith("/")
                    and not (n.startswith("__MACOSX/") or n.split("/")[-1].startswith("._"))
                ]
                top_levels = {
                    n.split("/", 1)[0]
                    for n in all_entries
                    if "/" in n and not n.startswith("__MACOSX/")
                }
                # A single shared top-level folder is treated as a wrapper and stripped.
                common_root = next(iter(top_levels)) if len(top_levels) == 1 else None
                root_prefix = f"{common_root}/" if common_root else ""
                for name in members:
                    raw = zf.read(name)
                    if root_prefix and name.startswith(root_prefix):
                        clean = name[len(root_prefix):]
                    else:
                        clean = name
                    if clean:
                        files.append((clean, raw))
        if not files:
            await update.message.reply_text("❌ לא נמצאו קבצים בתוך ה-ZIP")
            return

        from github import Github
        from github.InputGitTreeElement import InputGitTreeElement

        github_handler = context.bot_data.get("github_handler")
        if github_handler is None:
            # Without the handler there is neither a token nor a selected repo.
            await update.message.reply_text("❌ אין טוקן או ריפו נבחר")
            return
        session = github_handler.get_user_session(user_id)
        token = github_handler.get_user_token(user_id)
        repo_full = session.get("selected_repo")
        if not (token and repo_full):
            await update.message.reply_text("❌ אין טוקן או ריפו נבחר")
            return

        expected_repo_full = context.user_data.get("zip_restore_expected_repo_full")
        repo_full_effective = expected_repo_full or repo_full
        if expected_repo_full and expected_repo_full != repo_full:
            logger.warning(
                "[restore_zip] Target mismatch: expected=%s, got=%s. Proceeding with expected (locked) target.",
                expected_repo_full,
                repo_full,
            )
            try:
                await update.message.reply_text(
                    f"⚠️ נמצא פער בין היעד הנוכחי ({repo_full}) ליעד הנעול. נשתמש ביעד הנעול: {expected_repo_full}"
                )
            except Exception:
                pass
        if not expected_repo_full:
            try:
                context.user_data["zip_restore_expected_repo_full"] = repo_full
            except Exception:
                pass

        g = Github(token)
        repo = None  # stays None until a target repo has been opened
        try:
            repo = g.get_repo(repo_full_effective)
        except Exception as err:
            logger.exception("[restore_zip] Locked target not accessible: %s", err)
            if repo_full and repo_full != repo_full_effective:
                try:
                    expected_owner = (expected_repo_full or repo_full_effective).split("/")[0]
                    current_owner = repo_full.split("/")[0]
                except Exception:
                    expected_owner = None
                    current_owner = None
                # Fall back to the current repo only when the owner is the same.
                if expected_owner and current_owner and current_owner == expected_owner:
                    try:
                        await update.message.reply_text(
                            f"⚠️ היעד הנעול {repo_full_effective} לא נגיש. מנסה להשתמש ביעד הנוכחי {repo_full} (אותו בעלים)."
                        )
                    except Exception:
                        pass
                    try:
                        repo = g.get_repo(repo_full)
                        repo_full_effective = repo_full
                    except Exception as err2:
                        logger.exception("[restore_zip] Fallback to current repo failed: %s", err2)
            if repo is None:
                await update.message.reply_text(
                    f"❌ היעד {repo_full_effective} לא נגיש ואין נפילה בטוחה. עצירה. אנא בחרו ריפו מחדש."
                )
                # Re-raise the original lookup error; the outer handler reports it.
                raise

        target_branch = repo.default_branch or "main"
        purge_first = bool(context.user_data.get("github_restore_zip_purge"))
        await update.message.reply_text(
            ("🧹 מנקה קבצים קיימים...\n" if purge_first else "")
            + f"📤 מעלה {len(files)} קבצים לריפו {repo_full_effective} (branch: {target_branch})..."
        )
        base_ref = repo.get_git_ref(f"heads/{target_branch}")
        base_commit = repo.get_git_commit(base_ref.object.sha)
        base_tree = base_commit.tree
        new_tree_elements: List[InputGitTreeElement] = []
        text_exts = (
            ".md",
            ".txt",
            ".json",
            ".yml",
            ".yaml",
            ".xml",
            ".py",
            ".js",
            ".ts",
            ".tsx",
            ".css",
            ".scss",
            ".html",
            ".sh",
            ".gitignore",
        )
        for path, raw in files:
            is_text = path.lower().endswith(text_exts)
            try:
                if is_text:
                    text = raw.decode("utf-8")
                    blob = repo.create_git_blob(text, "utf-8")
                else:
                    b64 = base64.b64encode(raw).decode("ascii")
                    blob = repo.create_git_blob(b64, "base64")
            except Exception:
                # E.g. a "text" file that is not valid UTF-8: upload it as base64.
                b64 = base64.b64encode(raw).decode("ascii")
                blob = repo.create_git_blob(b64, "base64")
            elem = InputGitTreeElement(path=path, mode="100644", type="blob", sha=blob.sha)
            new_tree_elements.append(elem)

        if purge_first:
            new_tree = repo.create_git_tree(new_tree_elements)
        else:
            new_tree = repo.create_git_tree(new_tree_elements, base_tree)
        commit_message = "Restore from ZIP via bot: replace " + ("with purge" if purge_first else "update only")
        new_commit = repo.create_git_commit(commit_message, new_tree, [base_commit])
        base_ref.edit(new_commit.sha)
        logger.info(
            "[restore_zip] Restore commit created: %s, files_added=%s, purge=%s",
            new_commit.sha,
            len(new_tree_elements),
            purge_first,
        )
        await update.message.reply_text("✅ השחזור הועלה לריפו בהצלחה")
    except Exception as err:
        logger.exception("GitHub restore-to-repo failed: %s", err)
        await update.message.reply_text(f"❌ שגיאה בשחזור לריפו: {err}")
        await self._maybe_alert_oom(context, err, "בשחזור ZIP לריפו")
    finally:
        context.user_data["upload_mode"] = None
        context.user_data.pop("github_restore_zip_purge", None)
        context.user_data.pop("zip_restore_expected_repo_full", None)
async def _handle_github_create_repo_from_zip(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Create a new GitHub repository and import an uploaded ZIP into it.

    The archive is downloaded, validated and fully read *before* the
    repository is created, so a corrupt ZIP does not leave an empty repo
    behind. ``__MACOSX/`` entries and ``._*`` files are skipped and a
    single shared top-level folder is stripped from the paths.

    The repo name comes from ``context.user_data["new_repo_name"]``;
    otherwise it is derived from the shared top-level folder, the uploaded
    file name, or a timestamp, and then sanitized. After creation the
    in-memory GitHub session and the stored selection are pointed at the
    new repo and stale upload state is cleared from ``user_data``.

    If the default branch has no ref yet, files are created one by one
    with ``create_file``; otherwise a single commit is built from blobs and
    a tree. Failures are logged and reported to the user; the upload mode
    and the ``new_repo_*`` keys are always cleared on exit.
    """
    try:
        document = update.message.document
        user_id = update.effective_user.id
        logger.info(
            "GitHub create-repo-from-zip received: file_name=%s, size=%s",
            document.file_name,
            document.file_size,
        )
        await update.message.reply_text("⏳ מוריד קובץ ZIP...")
        tg_file = await context.bot.get_file(document.file_id)

        files: List[tuple[str, bytes]] = []
        with BytesIO() as buf:
            await tg_file.download_to_memory(buf)
            buf.seek(0)
            if not zipfile.is_zipfile(buf):
                await update.message.reply_text("❌ הקובץ שהועלה אינו ZIP תקין.")
                return
            with zipfile.ZipFile(buf, "r") as zf:
                names_all = zf.namelist()
                top_levels = {
                    n.split("/", 1)[0]
                    for n in names_all
                    if "/" in n and not n.startswith("__MACOSX/")
                }
                # A single shared top-level folder is treated as a wrapper and stripped.
                common_root = next(iter(top_levels)) if len(top_levels) == 1 else None
                root_prefix = f"{common_root}/" if common_root else ""
                for name in names_all:
                    if (
                        name.endswith("/")
                        or name.startswith("__MACOSX/")
                        or name.split("/")[-1].startswith("._")
                    ):
                        continue
                    data = zf.read(name)
                    if root_prefix and name.startswith(root_prefix):
                        clean = name[len(root_prefix):]
                    else:
                        clean = name
                    if clean:
                        files.append((clean, data))
        if not files:
            await update.message.reply_text("❌ ה‑ZIP ריק.")
            return

        repo_name = context.user_data.get("new_repo_name")
        if not repo_name:
            base_guess = None
            if common_root:
                base_guess = common_root
            elif document.file_name:
                base_guess = os.path.splitext(os.path.basename(document.file_name))[0]
            if not base_guess:
                base_guess = f"repo-{int(time.time())}"
            repo_name = re.sub(r"\s+", "-", base_guess)
            repo_name = re.sub(r"[^A-Za-z0-9._-]", "-", repo_name).strip(".-_") or f"repo-{int(time.time())}"

        github_handler = context.bot_data.get("github_handler")
        token = github_handler.get_user_token(user_id) if github_handler else None
        if not token:
            await update.message.reply_text("❌ אין טוקן GitHub. שלח /github כדי להתחבר.")
            return

        # A name taken from user_data is not sanitized, so escape it for HTML.
        await update.message.reply_text(
            f"📦 יוצר ריפו חדש: <code>{html_escape(str(repo_name))}</code>", parse_mode=ParseMode.HTML
        )
        from github import Github

        g = Github(token)
        user = g.get_user()
        repo = user.create_repo(
            name=repo_name,
            private=bool(context.user_data.get("new_repo_private", True)),
            auto_init=False,
        )
        repo_full = repo.full_name
        repo_full_html = html_escape(str(repo_full))

        # Always update the in-memory session (the repo now exists on GitHub)
        # and reset dependent state, like _apply_repo_selection does.
        try:
            sess = github_handler.get_user_session(user_id)
            sess["selected_repo"] = repo_full
            sess["selected_folder"] = None
        except Exception as err:
            logger.warning("Failed updating github session after repo creation: %s", err)

        # Clear stale context state so that later actions use the new repo.
        for _key in (
            "upload_target_folder", "upload_target_branch",
            "waiting_for_manual_repo", "zip_restore_expected_repo_full",
            "github_restore_zip_purge", "pending_repo_restore_zip_path",
            "repos", "repos_cache_time",
        ):
            context.user_data.pop(_key, None)

        # Persist to the database (repo + reset of the target folder).
        # _save_selected_repo swallows exceptions and returns False on
        # failure, so the return value is checked instead of try/except.
        if self._save_selected_repo(user_id, repo_full):
            try:
                facade = self._resolve_files_facade()
                if facade is not None and hasattr(facade, "save_selected_folder"):
                    facade.save_selected_folder(user_id, None)
            except Exception as err:
                logger.warning("Failed saving selected folder to DB: %s", err)
        else:
            logger.warning("Failed saving selected repo to DB for user %s", user_id)

        await update.message.reply_text("📤 מעלה את קבצי ה‑ZIP לריפו החדש...")

        from github.GithubException import GithubException

        target_branch = repo.default_branch or "main"
        base_ref = None
        base_commit = None
        base_tree = None
        try:
            base_ref = repo.get_git_ref(f"heads/{target_branch}")
            base_commit = repo.get_git_commit(base_ref.object.sha)
            base_tree = base_commit.tree
        except GithubException as exc:
            logger.info("No base ref found for new repo (expected for empty repo): %s", exc)

        if base_commit is None:
            # Empty repository: there is no commit to build on, so files
            # are created one at a time.
            created_count = 0
            for path, raw in files:
                try:
                    try:
                        text = raw.decode("utf-8")
                        repo.create_file(
                            path=path,
                            message="Initial import from ZIP via bot",
                            content=text,
                            branch=target_branch,
                        )
                    except UnicodeDecodeError:
                        repo.create_file(
                            path=path,
                            message="Initial import from ZIP via bot (binary)",
                            content=raw,
                            branch=target_branch,
                        )
                    created_count += 1
                except Exception as err:
                    logger.warning("[create_repo_from_zip] Failed to create file %s: %s", path, err)
            await update.message.reply_text(
                f"✅ נוצר ריפו חדש והוזנו {created_count} קבצים\n🔗 <a href=\"https://github.com/{repo_full_html}\">{repo_full_html}</a>",
                parse_mode=ParseMode.HTML,
            )
            return

        from github.InputGitTreeElement import InputGitTreeElement

        text_exts = (
            ".md",
            ".txt",
            ".json",
            ".yml",
            ".yaml",
            ".xml",
            ".py",
            ".js",
            ".ts",
            ".tsx",
            ".css",
            ".scss",
            ".html",
            ".sh",
            ".gitignore",
        )
        new_tree_elems: List[InputGitTreeElement] = []
        for path, raw in files:
            try:
                if path.lower().endswith(text_exts):
                    blob = repo.create_git_blob(raw.decode("utf-8"), "utf-8")
                else:
                    blob = repo.create_git_blob(base64.b64encode(raw).decode("ascii"), "base64")
            except Exception:
                # E.g. a "text" file that is not valid UTF-8: upload it as base64.
                blob = repo.create_git_blob(base64.b64encode(raw).decode("ascii"), "base64")
            new_tree_elems.append(InputGitTreeElement(path=path, mode="100644", type="blob", sha=blob.sha))
        new_tree = repo.create_git_tree(new_tree_elems, base_tree)
        commit_message = "Initial import from ZIP via bot"
        parents = [base_commit]
        new_commit = repo.create_git_commit(commit_message, new_tree, parents)
        base_ref.edit(new_commit.sha)
        await update.message.reply_text(
            f"✅ נוצר ריפו חדש והוזנו {len(new_tree_elems)} קבצים\n🔗 <a href=\"https://github.com/{repo_full_html}\">{repo_full_html}</a>",
            parse_mode=ParseMode.HTML,
        )
    except Exception as err:
        logger.exception("Create new repo from ZIP failed: %s", err)
        await update.message.reply_text(f"❌ שגיאה ביצירת ריפו מ‑ZIP: {err}")
        await self._maybe_alert_oom(context, err, "ביצירת ריפו מ‑ZIP")
    finally:
        context.user_data["upload_mode"] = None
        for key in ("new_repo_name", "new_repo_private"):
            context.user_data.pop(key, None)
async def _handle_github_direct_upload(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
github_handler = context.bot_data.get("github_handler")
if github_handler:
await github_handler.handle_file_upload(update, context)
async def _handle_zip_import(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Import an uploaded ZIP into the user's files via ``backup_manager``.

    The archive is written to a private temporary directory that is
    removed afterwards, under the base name of the uploaded file only, so
    a crafted file name cannot escape the directory and concurrent uploads
    cannot overwrite each other. A ``repo:<owner>/<name>`` tag is taken
    from ``metadata.json`` inside the archive when present; otherwise it is
    guessed from the single top-level folder or from the file name.
    Restoring uses ``overwrite=True`` and ``purge=False``.

    Failures are logged and reported to the user; the upload mode is
    always cleared on exit.
    """

    def _parse_repo_full_from_label(label: str) -> str:
        """Guess ``owner/repo`` from a label such as ``owner-repo-main``."""
        if not isinstance(label, str) or not label:
            return ""
        base = label.strip().strip("/").strip()
        base = re.sub(r"\.zip$", "", base, flags=re.IGNORECASE)
        parts = base.split("-")
        if len(parts) < 2:
            return ""
        owner = parts[0]
        tail = parts[1:]
        # Drop trailing commit-SHA or branch-name segments.
        while tail:
            last = tail[-1]
            is_sha = bool(re.fullmatch(r"[0-9a-fA-F]{7,40}", last))
            is_branch_hint = last.lower() in {"main", "master", "develop", "dev", "release"}
            if is_sha or is_branch_hint:
                tail = tail[:-1]
            else:
                break
        if not tail:
            return ""
        repo_name = "-".join(tail)
        if not owner or not repo_name:
            return ""
        return f"{owner}/{repo_name}"

    try:
        document = update.message.document
        user_id = update.effective_user.id
        logger.info(
            "ZIP import received: file_name=%s, mime_type=%s, size=%s",
            document.file_name,
            document.mime_type,
            document.file_size,
        )
        await update.message.reply_text("⏳ מוריד קובץ ZIP...")
        tg_file = await context.bot.get_file(document.file_id)
        buf = BytesIO()
        await tg_file.download_to_memory(buf)
        buf.seek(0)

        # Keep only the final path component of the client-supplied name.
        safe_name = os.path.basename((document.file_name or "").replace("\\", "/")) or "repo.zip"
        if not safe_name.lower().endswith(".zip"):
            safe_name += ".zip"

        # NOTE(review): the temp file is removed once restore_from_backup
        # returns; this assumes the restore does not need the file later.
        with tempfile.TemporaryDirectory(prefix="zip_import_") as tmp_dir:
            tmp_path = os.path.join(tmp_dir, safe_name)
            with open(tmp_path, "wb") as fh:
                fh.write(buf.getvalue())
            if not zipfile.is_zipfile(tmp_path):
                logger.warning("Uploaded file is not a valid ZIP: %s", tmp_path)
                await update.message.reply_text("❌ הקובץ שהועלה אינו ZIP תקין.")
                return

            repo_tag: List[str] = []
            try:
                with zipfile.ZipFile(tmp_path, "r") as zf:
                    md = json.loads(zf.read("metadata.json"))
                if md.get("repo"):
                    repo_tag = [f"repo:{md['repo']}"]
            except Exception:
                repo_tag = []

            if not repo_tag:
                try:
                    guessed_full = ""
                    with zipfile.ZipFile(tmp_path, "r") as zf:
                        all_names = zf.namelist()
                    top_levels = {
                        n.split("/", 1)[0]
                        for n in all_names
                        if "/" in n and not n.startswith("__MACOSX/")
                    }
                    common_root = next(iter(top_levels)) if len(top_levels) == 1 else None
                    if common_root:
                        guessed_full = _parse_repo_full_from_label(common_root)
                    if not guessed_full and safe_name:
                        name_wo_ext = os.path.splitext(os.path.basename(safe_name))[0]
                        guessed_full = _parse_repo_full_from_label(name_wo_ext)
                    if guessed_full:
                        repo_tag = [f"repo:{guessed_full}"]
                except Exception:
                    repo_tag = []

            results = backup_manager.restore_from_backup(
                user_id=user_id,
                backup_path=tmp_path,
                overwrite=True,
                purge=False,
                extra_tags=repo_tag,
            )

        restored = results.get("restored_files", 0)
        errors = results.get("errors", [])
        if errors:
            preview = "\n".join([str(err) for err in errors[:3]])
            msg = (
                f"⚠️ הייבוא הושלם חלקית: {restored} קבצים נשמרו\n"
                f"שגיאות: {len(errors)}\n"
                f"דוגמאות:\n{preview}"
            )
        else:
            msg = f"✅ יובאו {restored} קבצים בהצלחה"
        await update.message.reply_text(msg)
    except Exception as err:
        logger.exception("ZIP import failed: %s", err)
        await update.message.reply_text(f"❌ שגיאה בייבוא ZIP: {err}")
    finally:
        context.user_data["upload_mode"] = None
async def _handle_zip_create(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Collect an uploaded document into the ZIP bundle being assembled.

    The file is downloaded into memory and appended as
    ``{"filename", "bytes"}`` to ``context.user_data["zip_create_items"]``
    (created on first use). The user is then told the file name and the
    running item count. Failures are logged and reported to the user.
    """
    try:
        document = update.message.document
        logger.info(
            "ZIP create mode: received file for bundle: %s (%s bytes)",
            document.file_name,
            document.file_size,
        )
        tg_file = await context.bot.get_file(document.file_id)
        buf = BytesIO()
        await tg_file.download_to_memory(buf)
        raw = buf.getvalue()

        items = context.user_data.get("zip_create_items")
        if items is None:
            items = []
            context.user_data["zip_create_items"] = items

        fallback_name = f"file_{len(items) + 1}"
        safe_name = (document.file_name or fallback_name).strip() or fallback_name
        items.append({"filename": safe_name, "bytes": raw})
        # Single-quoted so the double quote in the Hebrew abbreviation does
        # not end the literal; the whole message must be one f-string for
        # the item count to be interpolated.
        await update.message.reply_text(
            f'✅ נוסף: <code>{html_escape(safe_name)}</code> (סה"כ {len(items)} קבצים)',
            parse_mode=ParseMode.HTML,
        )
    except Exception as err:
        logger.exception("zip_create collect failed: %s", err)
        await update.message.reply_text(f"❌ שגיאה בהוספת הקובץ ל‑ZIP: {err}")
async def _handle_textual_file(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Download a document, decode it as text and store it.

    Documents over 20MB are rejected. A real ZIP archive is handed to
    ``_maybe_store_zip_copy`` and is not processed further. Otherwise the
    bytes are decoded with the configured encodings; if none fits, an
    event and an error metric are emitted and the user is told which
    encodings were tried. Decoded content longer than 4096 characters is
    stored as a large file, anything shorter as a regular snippet. The
    activity reporter is notified afterwards.

    Any unexpected failure is logged with its traceback, counted, and
    reported to the user with a generic message.
    """
    try:
        document = update.message.document
        user_id = update.effective_user.id
        # file_size is optional in the Bot API; a missing size must not
        # turn this comparison into a TypeError.
        if (document.file_size or 0) > 20 * 1024 * 1024:
            await update.message.reply_text(
                "❌ הקובץ גדול מדי!\n📏 הגודל המקסימלי המותר הוא 20MB"
            )
            return
        await update.message.reply_text("⏳ מוריד את הקובץ...")
        tg_file = await context.bot.get_file(document.file_id)
        file_bytes = BytesIO()
        await tg_file.download_to_memory(file_bytes)
        file_bytes.seek(0)
        raw_bytes = file_bytes.read()

        handled_zip = await self._maybe_store_zip_copy(update, context, document, raw_bytes)
        if handled_zip:
            return

        content, detected_encoding = self._decode_bytes(raw_bytes)
        if content is None:
            attempted = getattr(self, "_last_encodings_attempted", self._current_encodings())
            attempted_display = [str(enc) for enc in attempted if enc]
            logger.error("❌ לא ניתן לקרוא את הקובץ באף קידוד: %s", attempted_display)
            if self._emit_event is not None:
                try:
                    self._emit_event(
                        "file_read_unreadable",
                        severity="error",
                        attempted_encodings=",".join(attempted_display),
                    )
                except Exception:
                    pass
            if self._errors_total is not None:
                try:
                    self._errors_total.labels(code="E_FILE_UNREADABLE").inc()
                except Exception:
                    pass
            await update.message.reply_text(
                "❌ לא ניתן לקרוא את הקובץ!\n"
                + f"📝 ניסיתי את הקידודים: {', '.join(attempted_display)}\n"
                + "💡 אנא ודא שזהו קובץ טקסט/קוד ולא קובץ בינארי"
            )
            return

        file_name = document.file_name or "untitled.txt"
        # Language detection must see the content too (e.g. block.md that
        # clearly contains Python code).
        try:
            from services import code_service  # type: ignore

            language = code_service.detect_language(content or "", file_name)
        except Exception:
            from utils import detect_language_from_filename

            language = detect_language_from_filename(file_name)

        if len(content) > 4096:
            await self._store_large_file(update, context, user_id, file_name, language, content, detected_encoding)
        else:
            await self._store_regular_file(update, context, user_id, file_name, language, content, detected_encoding)

        reporter = self._get_reporter()
        if reporter is not None:
            try:
                reporter.report_activity(user_id)
            except Exception:
                pass
    except Exception as err:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("שגיאה בטיפול בקובץ: %s", err)
        if self._emit_event is not None:
            try:
                self._emit_event("file_process_error", severity="error", error=str(err))
            except Exception:
                pass
        if self._errors_total is not None:
            try:
                self._errors_total.labels(code="E_FILE_PROCESS").inc()
            except Exception:
                pass
        await update.message.reply_text("❌ שגיאה בעיבוד הקובץ")
def _decode_bytes(self, raw_bytes: bytes) -> tuple[Optional[str], Optional[str]]:
encodings = self._current_encodings()
self._last_encodings_attempted = encodings
for encoding in encodings:
try:
content = raw_bytes.decode(encoding)
except UnicodeDecodeError:
continue
else:
logger.info("✅ הקובץ נקרא בהצלחה בקידוד: %s", encoding)
if self._emit_event is not None:
try:
self._emit_event("file_read_success", severity="info", encoding=str(encoding))
except Exception:
pass
return content, encoding
return None, None
async def _maybe_store_zip_copy(
    self,
    update: Update,
    context: ContextTypes.DEFAULT_TYPE,
    document,
    raw_bytes: bytes,
) -> bool:
    """Store an uploaded ZIP as a saved backup, if the upload is a ZIP.

    The upload counts as a ZIP only when both the hint (mime type or
    ``.zip`` file name) and the actual content say so. If the archive has
    no ``metadata.json``, it is rebuilt with one added. The bytes go to
    ``backup_manager.save_backup_bytes``; if that raises, they are written
    directly to ``<backup_dir>/<backup_id>.zip``.

    The metadata is built once, so the copy embedded in the archive and
    the one handed to the backup manager carry the same ``created_at``.

    Returns ``True`` when the ZIP was stored and the user was notified,
    ``False`` otherwise; it never raises.
    """
    try:
        is_zip_hint = ((document.mime_type or "").lower() == "application/zip") or (
            (document.file_name or "").lower().endswith(".zip")
        )
        try:
            is_zip_actual = zipfile.is_zipfile(BytesIO(raw_bytes))
        except Exception:
            is_zip_actual = False
        if not (is_zip_hint and is_zip_actual):
            return False

        backup_id = f"upload_{update.effective_user.id}_{int(datetime.now(timezone.utc).timestamp())}"
        target_path = backup_manager.backup_dir / f"{backup_id}.zip"
        metadata = {
            "backup_id": backup_id,
            "backup_type": "generic_zip",
            "user_id": update.effective_user.id,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "original_filename": document.file_name,
            "source": "uploaded_document",
        }
        try:
            try:
                with zipfile.ZipFile(BytesIO(raw_bytes), "r") as ztest:
                    try:
                        ztest.getinfo("metadata.json")
                        md_bytes = raw_bytes
                    except KeyError:
                        # No metadata.json: rebuild the archive with one added.
                        out_buf = BytesIO()
                        with zipfile.ZipFile(out_buf, "w", compression=zipfile.ZIP_DEFLATED) as zout:
                            for name in ztest.namelist():
                                zout.writestr(name, ztest.read(name))
                            zout.writestr("metadata.json", json.dumps(metadata, indent=2))
                        md_bytes = out_buf.getvalue()
            except Exception:
                # Rebuilding failed: keep the upload exactly as received.
                md_bytes = raw_bytes

            try:
                backup_manager.save_backup_bytes(md_bytes, metadata)
            except Exception:
                # Fall back to writing the archive directly to the backup dir.
                with open(target_path, "wb") as fzip:
                    fzip.write(md_bytes)

            await update.message.reply_text(
                "✅ קובץ ZIP נשמר בהצלחה לרשימת ה‑ZIP השמורים.\n"
                "📦 ניתן למצוא אותו תחת: '📚' > '📦 קבצי ZIP' או ב‑Batch/GitHub."
            )
            return True
        except Exception as err:
            logger.warning("Failed to persist uploaded ZIP: %s", err)
    except Exception:
        # Best effort by design, but leave a trace for debugging.
        logger.debug("ZIP detection/storage aborted", exc_info=True)
    return False
async def _store_large_file(
    self,
    update: Update,
    context: ContextTypes.DEFAULT_TYPE,
    user_id: int,
    file_name: str,
    language: str,
    content: str,
    detected_encoding: Optional[str],
) -> None:
    """Save ``content`` as a large file and send the success message.

    On failure the user gets an error message and nothing else happens. On
    success a ``file_saved`` event is emitted (``large=True``), a summary
    with an inline keyboard is sent, and
    ``context.user_data["last_save_success"]`` is updated.

    The ``file_saved`` event is emitted only after the save succeeded.
    """
    size_bytes = len(content.encode("utf-8"))
    lines_count = len(content.split("\n"))
    success = self._save_large_file(
        user_id=user_id,
        file_name=file_name,
        language=language,
        content=content,
        file_size=size_bytes,
        lines_count=lines_count,
    )
    if not success:
        await update.message.reply_text("❌ שגיאה בשמירת הקובץ")
        return

    if self._emit_event is not None:
        try:
            self._emit_event(
                "file_saved",
                severity="info",
                user_id=int(user_id),
                language=str(language),
                size_bytes=int(size_bytes),
                large=True,
            )
        except Exception:
            pass

    from utils import get_language_emoji

    emoji = get_language_emoji(language)
    # Prefer id-based callbacks; fall back to name-based ones without an id.
    try:
        saved_large = self._get_large_file_entry(user_id, file_name) or {}
        fid = str(saved_large.get("_id") or "")
    except Exception:
        fid = ""

    keyboard = [
        [
            InlineKeyboardButton(
                "👁️ הצג קוד",
                callback_data=f"view_direct_id:{fid}" if fid else f"view_direct_{file_name}",
            ),
            InlineKeyboardButton("📚 הצג קבצים גדולים", callback_data="show_large_files"),
        ],
        [
            InlineKeyboardButton(
                "🔗 שתף קוד",
                callback_data=f"share_menu_id:{fid}" if fid else "share_menu_id:",
            )
        ],
        [InlineKeyboardButton("🏠 תפריט ראשי", callback_data="main")],
    ]
    reply_markup = InlineKeyboardMarkup(keyboard)
    await update.message.reply_text(
        "✅ **הקובץ נשמר בהצלחה!**\n\n"
        + f"📄 **שם:** `{file_name}`\n"
        + f"{emoji} **שפה:** {language}\n"
        + f"🔤 **קידוד:** {detected_encoding}\n"
        + f"💾 **גודל:** {len(content):,} תווים\n"
        + f"📏 **שורות:** {lines_count:,}\n\n"
        + "🎮 בחר פעולה מהכפתורים החכמים:",
        reply_markup=reply_markup,
        parse_mode="Markdown",
    )
    try:
        context.user_data["last_save_success"] = {
            "file_name": file_name,
            "language": language,
            "note": "",
            "file_id": fid,
        }
    except Exception:
        pass
async def _store_regular_file(
    self,
    update: Update,
    context: ContextTypes.DEFAULT_TYPE,
    user_id: int,
    file_name: str,
    language: str,
    content: str,
    detected_encoding: Optional[str],
) -> None:
    """Save ``content`` as a regular snippet and send the success message.

    On failure the user gets an error message and nothing else happens. On
    success a ``file_saved`` event is emitted (``large=False``), a summary
    with an inline keyboard is sent, and
    ``context.user_data["last_save_success"]`` is updated.

    The ``file_saved`` event is emitted only after the save succeeded.
    """
    size_bytes = len(content.encode("utf-8"))
    success = self._save_code_snippet(
        user_id=user_id,
        file_name=file_name,
        language=language,
        content=content,
    )
    if not success:
        await update.message.reply_text("❌ שגיאה בשמירת הקובץ")
        return

    if self._emit_event is not None:
        try:
            self._emit_event(
                "file_saved",
                severity="info",
                user_id=int(user_id),
                language=str(language),
                size_bytes=int(size_bytes),
                large=False,
            )
        except Exception:
            pass

    from utils import get_language_emoji

    emoji = get_language_emoji(language)
    # Prefer id-based callbacks; fall back to name-based ones without an id.
    try:
        saved_doc = self._get_latest_version_entry(user_id, file_name) or {}
        fid = str(saved_doc.get("_id") or "")
    except Exception:
        fid = ""

    keyboard = [
        [
            InlineKeyboardButton(
                "👁️ הצג קוד",
                callback_data=f"view_direct_id:{fid}" if fid else f"view_direct_{file_name}",
            ),
            InlineKeyboardButton(
                "✏️ ערוך",
                callback_data=f"edit_code_direct_{file_name}",
            ),
        ],
        [
            InlineKeyboardButton(
                "📥 הורד",
                callback_data=f"download_direct_{file_name}",
            ),
            InlineKeyboardButton(
                "📚 היסטוריה",
                callback_data=f"versions_file_{file_name}",
            ),
        ],
        [
            InlineKeyboardButton(
                "🔗 שתף קוד",
                callback_data=f"share_menu_id:{fid}" if fid else "share_menu_id:",
            )
        ],
        [InlineKeyboardButton("📚 הצג את כל הקבצים", callback_data="files")],
        [InlineKeyboardButton("🏠 תפריט ראשי", callback_data="main")],
    ]
    reply_markup = InlineKeyboardMarkup(keyboard)
    await update.message.reply_text(
        "✅ **הקובץ נשמר בהצלחה!**\n\n"
        + f"📄 **שם:** `{file_name}`\n"
        + f"{emoji} **שפה:** {language}\n"
        + f"🔤 **קידוד:** {detected_encoding}\n"
        + f"💾 **גודל:** {len(content)} תווים\n\n"
        + "🎮 בחר פעולה מהכפתורים החכמים:",
        reply_markup=reply_markup,
        parse_mode="Markdown",
    )
    try:
        context.user_data["last_save_success"] = {
            "file_name": file_name,
            "language": language,
            "note": "",
            "file_id": fid,
        }
    except Exception:
        pass
async def _maybe_alert_oom(self, context: ContextTypes.DEFAULT_TYPE, err: Exception, suffix: str) -> None:
try:
msg = str(err)
if isinstance(err, MemoryError) or "Ran out of memory" in msg or "out of memory" in msg.lower():
try:
notifier = self._notify_admins
if notifier is not None:
await notifier(context, f"🚨 OOM {suffix}: {msg}")
except Exception:
pass
except Exception:
pass