# FIXED: Changed from Markdown to HTML parsing (2025-01-10)
# This fixes Telegram parsing errors with special characters in suggestions
import asyncio
import json
import logging
import os
import re
import time
import zipfile
from datetime import datetime, timezone
import tempfile
import shutil
from html import escape
from io import BytesIO
from typing import Any, Dict, Optional
from github import Github, GithubException
from github.InputGitTreeElement import InputGitTreeElement
from telegram import (
InlineKeyboardButton,
InlineKeyboardMarkup,
InlineQueryResultArticle,
InputTextMessageContent,
Update,
)
from telegram.error import BadRequest
from telegram.ext import (
ContextTypes,
ConversationHandler,
)
from repo_analyzer import RepoAnalyzer
from config import config
from file_manager import backup_manager
from utils import TelegramUtils
# הגדרת לוגר
logger = logging.getLogger(__name__)
# מצבי שיחה
REPO_SELECT, FILE_UPLOAD, FOLDER_SELECT = range(3)
# מגבלות קבצים גדולים
MAX_INLINE_FILE_BYTES = 5 * 1024 * 1024 # 5MB לשליחה ישירה בבוט
MAX_ZIP_TOTAL_BYTES = 50 * 1024 * 1024 # 50MB לקובץ ZIP אחד
MAX_ZIP_FILES = 500 # מקסימום קבצים ב-ZIP אחד
# מגבלות ייבוא ריפו (ייבוא תוכן, לא גיבוי)
IMPORT_MAX_FILE_BYTES = 1 * 1024 * 1024 # 1MB לקובץ יחיד
IMPORT_MAX_TOTAL_BYTES = 20 * 1024 * 1024 # 20MB לכל הייבוא
IMPORT_MAX_FILES = 2000 # הגבלה סבירה למספר קבצים
IMPORT_SKIP_DIRS = {".git", ".github", "__pycache__", "node_modules", "dist", "build"}
# מגבלות עזר לשליפת תאריכי ענפים למיון
MAX_BRANCH_DATE_FETCH = 120 # אם יש יותר מזה — נוותר על מיון לפי תאריך (למעט ברירת המחדל)
# תצוגת קובץ חלקית
VIEW_LINES_PER_PAGE = 80
def _safe_rmtree_tmp(target_path: str) -> None:
"""מחיקה בטוחה של תיקייה תחת /tmp בלבד, עם סורגי בטיחות.
יזרוק חריגה אם הנתיב אינו תחת /tmp או שגוי.
"""
try:
if not target_path:
return
rp_target = os.path.realpath(target_path)
rp_base = os.path.realpath("/tmp")
if not rp_target.startswith(rp_base + os.sep):
raise RuntimeError(f"Refusing to delete non-tmp path: {rp_target}")
if rp_target in {"/", os.path.expanduser("~"), os.getcwd()}:
raise RuntimeError(f"Refusing to delete unsafe path: {rp_target}")
shutil.rmtree(rp_target, ignore_errors=True)
except Exception:
# לא מפסיק את הזרימה במקרה של שגיאה בניקוי
pass
[תיעוד]
def safe_html_escape(text):
"""Escape text for Telegram HTML; preserves \n/\r/\t and keeps existing HTML entities."""
if text is None:
return ""
s = escape(str(text))
# נקה תווים בלתי נראים
s = re.sub(r"[\u200b\u200c\u200d\ufeff]", "", s)
# נקה תווי בקרה אך השאר \n, \r, \t
s = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]", "", s)
return s
[תיעוד]
class GitHubMenuHandler:
[תיעוד]
def __init__(self):
self.user_sessions: Dict[int, Dict[str, Any]] = {}
self.last_api_call: Dict[int, float] = {}
[תיעוד]
def get_user_session(self, user_id: int) -> Dict[str, Any]:
"""מחזיר או יוצר סשן משתמש בזיכרון"""
if user_id not in self.user_sessions:
# נסה לטעון ריפו מועדף מהמסד
from database import db
selected_repo = db.get_selected_repo(user_id)
self.user_sessions[user_id] = {
"selected_repo": selected_repo, # טען מהמסד נתונים
"selected_folder": None, # None = root של הריפו
"github_token": None,
}
return self.user_sessions[user_id]
[תיעוד]
async def show_browse_ref_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""תפריט בחירת ref (ענף/תג) עם עימוד וטאבים."""
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_full = session.get("selected_repo")
if not (token and repo_full):
await query.edit_message_text("❌ חסר טוקן או ריפו נבחר")
return
g = Github(token)
repo = g.get_repo(repo_full)
current_ref = context.user_data.get("browse_ref") or (getattr(repo, "default_branch", None) or "main")
tab = context.user_data.get("browse_ref_tab") or "branches"
kb = []
# טאבים
tabs = [
InlineKeyboardButton("🌿 ענפים", callback_data="browse_refs_branches_page_0"),
InlineKeyboardButton("🏷 תגיות", callback_data="browse_refs_tags_page_0"),
]
kb.append(tabs)
if tab == "branches":
page = int(context.user_data.get("browse_refs_branches_page", 0))
try:
items = list(repo.get_branches())
except Exception:
items = []
page_size = 10
start = page * page_size
end = min(start + page_size, len(items))
for br in items[start:end]:
label = "✅ " + br.name if br.name == current_ref else br.name
kb.append([InlineKeyboardButton(label, callback_data=f"browse_select_ref:{br.name}")])
# עימוד
nav = []
if page > 0:
nav.append(InlineKeyboardButton("⬅️ הקודם", callback_data=f"browse_refs_branches_page_{page-1}"))
if end < len(items):
nav.append(InlineKeyboardButton("הבא ➡️", callback_data=f"browse_refs_branches_page_{page+1}"))
if nav:
kb.append(nav)
else:
page = int(context.user_data.get("browse_refs_tags_page", 0))
try:
items = list(repo.get_tags())
except Exception:
items = []
page_size = 10
start = page * page_size
end = min(start + page_size, len(items))
for tg in items[start:end]:
name = getattr(tg, "name", "")
label = "✅ " + name if name == current_ref else name
kb.append([InlineKeyboardButton(label, callback_data=f"browse_select_ref:{name}")])
nav = []
if page > 0:
nav.append(InlineKeyboardButton("⬅️ הקודם", callback_data=f"browse_refs_tags_page_{page-1}"))
if end < len(items):
nav.append(InlineKeyboardButton("הבא ➡️", callback_data=f"browse_refs_tags_page_{page+1}"))
if nav:
kb.append(nav)
# תחתית
kb.append([InlineKeyboardButton("🔙 חזרה", callback_data="github_menu")])
await query.edit_message_text(
f"בחר/י ref לדפדוף (נוכחי: <code>{safe_html_escape(current_ref)}</code>)",
reply_markup=InlineKeyboardMarkup(kb),
parse_mode="HTML",
)
[תיעוד]
async def show_browse_search_results(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""חיפוש לפי שם קובץ (prefix/contains) עם עימוד ותוצאות לפתיחה."""
# שימוש ב-Contents API: אין חיפוש שמות ישיר; נשתמש ב-Search API code:in:path/name
query = update.callback_query if hasattr(update, "callback_query") else None
user_id = (query.from_user.id if query else update.message.from_user.id)
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_full = session.get("selected_repo")
q = (context.user_data.get("browse_search_query") or "").strip()
page = int(context.user_data.get("browse_search_page", 1))
if not (token and repo_full and q):
if query:
await query.edit_message_text("❌ חסרים נתונים לחיפוש")
else:
await update.message.reply_text("❌ חסרים נתונים לחיפוש")
return
g = Github(token)
# הפורמט: repo:owner/name in:path <query>
try:
owner, name = repo_full.split("/", 1)
except ValueError:
owner, name = repo_full, ""
# בניית שאילתה: נחפש במחרוזת הנתיב בלבד (in:name לא נתמך ב-code search)
q_safe = (q or "").replace('"', ' ').strip()
term = f'"{q_safe}"' if (" " in q_safe) else q_safe
gh_query = f"repo:{owner}/{name} in:path {term}"
try:
# PyGithub מחזיר PaginatedList; נהפוך לרשימה בטוחה עם הגבלה כדי למנוע 403/timeout
results = list(g.search_code(query=gh_query, order="desc"))
except BadRequest as br:
# ננהל את טלגרם "message is not modified" בעדינות
if "message is not modified" in str(br).lower():
try:
await query.answer("אין שינוי בתוצאה")
except Exception:
pass
return
raise
except Exception as e:
try:
if hasattr(update, "callback_query") and update.callback_query:
await update.callback_query.answer(f"שגיאה בחיפוש: {str(e)}", show_alert=True)
else:
await update.message.reply_text(f"❌ שגיאה בחיפוש: {str(e)}")
except Exception:
pass
return
# עימוד ידני
per_page = 10
items = results # כבר רשימה
if not items:
msg = f"🔎 אין תוצאות עבור <code>{safe_html_escape(q)}</code> ב-<code>{safe_html_escape(repo_full)}</code>"
if query:
await query.edit_message_text(msg, parse_mode="HTML", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🔙 חזרה", callback_data="github_menu")]]))
else:
await update.message.reply_text(msg, parse_mode="HTML")
return
total = len(items)
start = (page - 1) * per_page
end = min(start + per_page, total)
shown = items[start:end]
# אפס מיפוי אינדקסים למסך זה (ל-callback קצרים)
context.user_data["browse_idx_map"] = {}
# סימון מצב: תצוגת תוצאות חיפוש פעילה (לצורך חזרה אחורה מתצוגת קובץ)
context.user_data["last_results_were_search"] = True
kb = []
for it in shown:
try:
path = getattr(it, "path", None) or getattr(it, "name", "")
if not path:
continue
view_cb = self._mk_cb(context, "browse_select_view", path)
# כפתור יחיד: "path 👁️" לצפייה בקובץ
kb.append([
InlineKeyboardButton(f"{path} 👁️", callback_data=view_cb)
])
except Exception:
continue
nav = []
total_pages = max(1, (total + per_page - 1) // per_page)
if page > 1:
nav.append(InlineKeyboardButton("⬅️ הקודם", callback_data=f"browse_search_page:{page-1}"))
if page < total_pages:
nav.append(InlineKeyboardButton("הבא ➡️", callback_data=f"browse_search_page:{page+1}"))
if nav:
kb.append(nav)
kb.append([InlineKeyboardButton("🔙 חזרה", callback_data="github_menu")])
text = f"🔎 תוצאות חיפוש עבור <code>{safe_html_escape(q)}</code> — מציג {len(shown)} מתוך {total}"
if query:
await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(kb), parse_mode="HTML")
else:
await update.message.reply_text(text, reply_markup=InlineKeyboardMarkup(kb), parse_mode="HTML")
[תיעוד]
async def check_rate_limit(self, github_client: Github, update_or_query) -> bool:
"""בודק את מגבלת ה-API של GitHub"""
try:
rate_limit = github_client.get_rate_limit()
core_limit = rate_limit.core
if core_limit.remaining < 10:
reset_time = core_limit.reset
minutes_until_reset = max(1, int((reset_time - time.time()) / 60))
error_message = (
f"⏳ חריגה ממגבלת GitHub API\n"
f"נותרו רק {core_limit.remaining} בקשות\n"
f"המגבלה תתאפס בעוד {minutes_until_reset} דקות\n\n"
f"💡 נסה שוב מאוחר יותר"
)
# בדוק אם זה callback query או update רגיל
if hasattr(update_or_query, "answer"):
# זה callback query
await update_or_query.answer(error_message, show_alert=True)
else:
# זה update רגיל
await update_or_query.message.reply_text(error_message)
return False
return True
except Exception as e:
logger.error(f"Error checking rate limit: {e}")
return True # במקרה של שגיאה, נמשיך בכל זאת
[תיעוד]
async def apply_rate_limit_delay(self, user_id: int):
"""מוסיף השהייה בין בקשות API"""
current_time = time.time()
last_call = self.last_api_call.get(user_id, 0)
# אם עברו פחות מ-2 שניות מהבקשה האחרונה, נחכה
time_since_last = current_time - last_call
if time_since_last < 2:
await asyncio.sleep(2 - time_since_last)
self.last_api_call[user_id] = time.time()
[תיעוד]
def get_user_token(self, user_id: int) -> Optional[str]:
"""מקבל טוקן של משתמש - מהסשן או מהמסד נתונים"""
session = self.get_user_session(user_id)
# נסה מהסשן
token = session.get("github_token")
if token:
return token
# נסה מהמסד נתונים
from database import db
token = db.get_github_token(user_id)
if token:
# שמור בסשן לשימוש מהיר
session["github_token"] = token
return token
# --- Helpers to keep Telegram callback_data <= 64 bytes ---
def _mk_cb(self, context: ContextTypes.DEFAULT_TYPE, prefix: str, path: str) -> str:
"""יוצר callback_data בטוח. אם ארוך מדי, משתמש באינדקס זמני במפה ב-context.user_data."""
safe_path = path or ""
data = f"{prefix}:{safe_path}"
try:
if len(data.encode('utf-8')) <= 64:
return data
except Exception:
if len(data) <= 64:
return data
idx_map = context.user_data.get("browse_idx_map")
if not isinstance(idx_map, dict):
idx_map = {}
context.user_data["browse_idx_map"] = idx_map
idx = str(len(idx_map) + 1)
idx_map[idx] = safe_path
return f"{prefix}_i:{idx}"
def _get_path_from_cb(self, context: ContextTypes.DEFAULT_TYPE, data: str, prefix: str) -> str:
"""שחזור נתיב מתוך callback_data רגיל או ממופה (_i:)."""
try:
if data.startswith(prefix + ":"):
return data.split(":", 1)[1]
if data.startswith(prefix + "_i:"):
idx = data.split(":", 1)[1]
m = context.user_data.get("browse_idx_map") or {}
return m.get(idx, "")
except Exception:
return ""
return ""
async def _render_file_view(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""מציג דף תצוגה חלקית של קובץ עם כפתורי 'הצג עוד', 'הורד', 'חזרה'."""
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
repo_name = session.get("selected_repo") or "repo"
path = context.user_data.get("view_file_path") or ""
data = context.user_data.get("view_file_text") or ""
page = int(context.user_data.get("view_page_index", 0))
# חישוב חלוקה לשורות
lines = data.splitlines()
total_lines = len(lines)
start = page * VIEW_LINES_PER_PAGE
end = min(start + VIEW_LINES_PER_PAGE, total_lines)
chunk = "\n".join(lines[start:end])
# טקסט לתצוגה + גודל ושפה מזוהה
size_bytes = int(context.user_data.get("view_file_size", 0) or 0)
lang = context.user_data.get("view_detected_language") or "text"
header = (
f"📄 תצוגת קובץ\n"
f"📁 <code>{safe_html_escape(repo_name)}</code>\n"
f"📄 <code>{safe_html_escape(path)}</code>\n"
f"🔤 שפה: <code>{safe_html_escape(lang)}</code> | 💾 גודל: <code>{format_bytes(size_bytes)}</code>\n"
f"שורות {start+1}-{end} מתוך {total_lines}\n\n"
)
# בניית מקלדת
rows = [[InlineKeyboardButton("🔙 חזרה", callback_data="view_back")],
[InlineKeyboardButton("⬇️ הורד", callback_data=self._mk_cb(context, "browse_select_download", path))]]
# כפתור שיתוף קישור לקובץ – רק במסך התצוגה (לא ברשימה)
rows.append([InlineKeyboardButton("🔗 שתף קישור", callback_data=self._mk_cb(context, "share_selected_links_single", path))])
if end < total_lines:
rows.append([InlineKeyboardButton("הצג עוד ⤵️", callback_data="view_more")])
try:
# הדגשת תחביר קיימת במודול code_processor.highlight_code; נשתמש בה ואז ננקה ל-Telegram
try:
from services import code_service as code_processor
# פורמט שמירת שורות כברירת מחדל
lower_path = (path or '').lower()
# אם YAML – נסה צביעה ישירה, אחרת כללי
if lower_path.endswith('.yml') or lower_path.endswith('.yaml'):
try:
highlighted_html = code_processor.highlight_code(chunk, 'yaml', 'html')
body = highlighted_html or f"<pre>{safe_html_escape(chunk)}</pre>"
except Exception:
body = f"<pre>{safe_html_escape(chunk)}</pre>"
else:
# שמירת שורות בכוח עבור סוגי קבצים רגישים לעיצוב
force_pre_exts = ('.md', '.markdown', '.py')
if lower_path.endswith(force_pre_exts):
body = f"<pre>{safe_html_escape(chunk)}</pre>"
else:
# נסה היילייט; אם יוצרת בלגן, fallback ל-pre
try:
highlighted_html = code_processor.highlight_code(chunk, lang, 'html')
if not highlighted_html or '\n' not in chunk:
body = f"<pre>{safe_html_escape(chunk)}</pre>"
else:
body = highlighted_html
except Exception:
body = f"<pre>{safe_html_escape(chunk)}</pre>"
except Exception:
body = f"<pre>{safe_html_escape(chunk)}</pre>"
await query.edit_message_text(header + body, parse_mode="HTML", reply_markup=InlineKeyboardMarkup(rows))
except BadRequest as br:
if "message is not modified" not in str(br).lower():
raise
[תיעוד]
async def show_import_branch_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג בחירת ענף לייבוא ריפו (עימוד)."""
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_full = session.get("selected_repo") or ""
if not (token and repo_full):
await query.edit_message_text("❌ חסר טוקן או ריפו נבחר")
return
# הודעת טעינה בזמן שליפת הענפים
try:
await TelegramUtils.safe_edit_message_text(query, "⏳ טוען רשימת ענפים…")
except Exception:
pass
g = Github(token)
try:
repo = g.get_repo(repo_full)
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בטעינת ריפו: {e}")
return
try:
branches = list(repo.get_branches())
# מיין: main ראשון; אחריו לפי עדכון commit אחרון (חדש→ישן)
def _branch_sort_key(br):
try:
# commit.last_modified לא קיים תמיד; ניקח commit.commit.author.date
return br.commit.commit.author.date
except Exception:
return datetime.min.replace(tzinfo=timezone.utc)
# רשימת ענפים מלאה
if len(branches) <= MAX_BRANCH_DATE_FETCH:
try:
branches_sorted = sorted(branches, key=_branch_sort_key, reverse=True)
except Exception:
branches_sorted = branches
else:
branches_sorted = branches
# הוצא main לראש (אם קיים)
main_idx = next((i for i, b in enumerate(branches_sorted) if (b.name == 'main' or b.name == 'master')), None)
if main_idx is not None:
main_br = branches_sorted.pop(main_idx)
branches_sorted.insert(0, main_br)
branches = branches_sorted
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בשליפת ענפים: {e}")
return
page = int(context.user_data.get("import_branches_page", 0))
page_size = 8
total_pages = max(1, (len(branches) + page_size - 1) // page_size)
start = page * page_size
end = min(start + page_size, len(branches))
keyboard = []
# מיפוי אסימונים קצרים לשמות ענפים כדי לעמוד במגבלת 64 בתים של Telegram
token_map = context.user_data.setdefault("import_branch_token_map", {})
# תצוגה אחידה: main ראשון (כבר מוקפץ למעלה במיון) ואז כל הענפים – ממוינים מהחדש לישן
for idx, br in enumerate(branches[start:end]):
token = f"i{start + idx}"
token_map[token] = br.name
label = f"🌿 {br.name}"
keyboard.append([InlineKeyboardButton(label, callback_data=f"import_repo_select_branch:{token}")])
nav = []
if page > 0:
nav.append(InlineKeyboardButton("⬅️ הקודם", callback_data=f"import_repo_branches_page_{page-1}"))
nav.append(InlineKeyboardButton(f"📄 {page+1}/{total_pages}", callback_data="noop"))
if page < total_pages - 1:
nav.append(InlineKeyboardButton("הבא ➡️", callback_data=f"import_repo_branches_page_{page+1}"))
if nav:
keyboard.append(nav)
keyboard.append([InlineKeyboardButton("🔙 חזור", callback_data="github_menu")])
await query.edit_message_text(
"⬇️ בחר/י ענף לייבוא קבצים מהריפו:", reply_markup=InlineKeyboardMarkup(keyboard)
)
async def _confirm_import_repo(self, update: Update, context: ContextTypes.DEFAULT_TYPE, branch: str):
"""מסך אישור לייבוא עם הסבר קצר כדי למנוע בלבול עם גיבויים."""
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
repo_full = session.get("selected_repo") or ""
text = (
f"⬇️ ייבוא ריפו מ-GitHub\n\n"
f"זהו <b>ייבוא קבצים</b> ולא יצירת גיבוי ZIP.\n"
f"נוריד ZIP רשמי, נחלץ ל-/tmp, נקלט לקבצים במסד עם תגיות:\n"
f"<code>repo:{repo_full}</code>, <code>source:github</code>\n\n"
f"נכבד מגבלות גודל/כמות, נדלג על בינאריים ו-<code>.git</code> ותיקיות מיותרות.\n"
f"ענף: <code>{branch}</code>\n\n"
f"להמשיך?"
)
kb = [
[InlineKeyboardButton("✅ כן, ייבא", callback_data="import_repo_start")],
[InlineKeyboardButton("❌ ביטול", callback_data="import_repo_cancel")],
]
await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(kb), parse_mode="HTML")
[תיעוד]
async def import_repo_from_zip(self, update: Update, context: ContextTypes.DEFAULT_TYPE, repo_full: str, branch: str):
"""מוריד ZIP רשמי של GitHub (zipball) לענף, מחלץ ל-tmp, ומקליט קבצים ל-DB עם תגיות repo/source.
שמירה: CodeSnippet לקבצים טקסטואליים קטנים (עד IMPORT_MAX_FILE_BYTES) עד סך IMPORT_MAX_TOTAL_BYTES ומקס' IMPORT_MAX_FILES.
מדלג על בינאריים, קבצי ענק, ותיקיות מיותרות. מנקה tmp בסוף.
"""
query = update.callback_query
user_id = query.from_user.id
token = self.get_user_token(user_id)
if not token:
await query.edit_message_text("❌ חסר טוקן GitHub")
return
g = Github(token)
try:
repo = g.get_repo(repo_full)
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בטעינת ריפו: {e}")
return
await query.edit_message_text("⏳ מוריד ZIP רשמי ומייבא קבצים… זה עשוי לקחת עד דקה.")
import requests
import zipfile as _zip
tmp_dir = None
zip_path = None
extracted_dir = None
saved = 0
updated = 0
total_bytes = 0
skipped = 0
try:
# קבלת קישור zipball עבור branch
try:
url = repo.get_archive_link("zipball", ref=branch)
except TypeError:
# גרסאות PyGithub ישנות לא מקבלות ref; ננסה ללא ref
url = repo.get_archive_link("zipball")
resp = requests.get(url, timeout=60)
resp.raise_for_status()
# עבודה ב-/tmp בלבד
tmp_dir = tempfile.mkdtemp(prefix="codebot-gh-import-")
zip_path = os.path.join(tmp_dir, "repo.zip")
with open(zip_path, "wb") as f:
f.write(resp.content)
# חליצה לתת-תיקייה ייעודית
extracted_dir = os.path.join(tmp_dir, "repo")
os.makedirs(extracted_dir, exist_ok=True)
with _zip.ZipFile(zip_path, 'r') as zf:
zf.extractall(extracted_dir)
# מצא שורש (github zip מוסיף תיקיית prefix)
# נבחר תיקייה הראשונה מתחת extracted_dir
roots = [os.path.join(extracted_dir, d) for d in os.listdir(extracted_dir)]
root = None
for p in roots:
if os.path.isdir(p):
root = p
break
if not root:
await query.edit_message_text("❌ לא נמצאו קבצים לאחר חליצה")
return
from database import db
from utils import detect_language_from_filename
repo_tag = f"repo:{repo_full}"
source_tag = "source:github"
# מעבר על קבצים
for cur_dir, dirnames, filenames in os.walk(root):
# סינון תיקיות מיותרות
dirnames[:] = [d for d in dirnames if d not in IMPORT_SKIP_DIRS]
for name in filenames:
# דלג על קבצי ZIP עצמם או קבצים מוסתרים ענקיים
if name.endswith('.zip'):
skipped += 1
continue
file_path = os.path.join(cur_dir, name)
rel_path = os.path.relpath(file_path, root)
# דלג על נתיבים חשודים
if rel_path.startswith('.'):
skipped += 1
continue
try:
# קרא כ-bytes ובדוק בינארי/גודל
with open(file_path, 'rb') as fh:
raw = fh.read(IMPORT_MAX_FILE_BYTES + 1)
if len(raw) > IMPORT_MAX_FILE_BYTES:
skipped += 1
continue
# heuristic: אם יש אפס-בייטים רבים → כנראה בינארי
if b"\x00" in raw:
skipped += 1
continue
try:
text = raw.decode('utf-8')
except Exception:
try:
text = raw.decode('latin-1')
except Exception:
skipped += 1
continue
if total_bytes + len(raw) > IMPORT_MAX_TOTAL_BYTES:
continue
if saved >= IMPORT_MAX_FILES:
continue
lang = detect_language_from_filename(rel_path)
# בדוק אם קיים כבר עבור אותו ריפו (לפי תגית repo:)
prev_doc = db.get_latest_version(user_id, rel_path)
prev_tags = (prev_doc.get('tags') or []) if isinstance(prev_doc, dict) else []
existed_for_repo = any((isinstance(t, str) and t == repo_tag) for t in prev_tags)
ok = db.save_file(user_id=user_id, file_name=rel_path, code=text, programming_language=lang, extra_tags=[repo_tag, source_tag])
if ok:
if existed_for_repo:
updated += 1
else:
saved += 1
total_bytes += len(raw)
else:
skipped += 1
except Exception:
skipped += 1
await query.edit_message_text(
f"✅ ייבוא הושלם: {saved} חדשים, {updated} עודכנו, {skipped} דילוגים.\n"
f"🔖 תיוג: <code>{repo_tag}</code> (ו-<code>{source_tag}</code>)\n\n"
f"ℹ️ זהו ייבוא תוכן — לא נוצר גיבוי ZIP.\n"
f"תוכל למצוא את הקבצים ב׳🗂 לפי ריפו׳.",
parse_mode="HTML",
)
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בייבוא: {e}")
finally:
# ניקוי בטוח של tmp ושל קובץ ה-ZIP
try:
if zip_path and os.path.exists(zip_path):
os.remove(zip_path)
except Exception:
pass
_safe_rmtree_tmp(extracted_dir or "")
_safe_rmtree_tmp(tmp_dir or "")
[תיעוד]
async def github_menu_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג תפריט GitHub"""
user_id = update.effective_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
# נקה דגלי זרימת "הדבק קוד" אם היו פעילים, כדי למנוע תקיעה בזרימה
try:
context.user_data.pop("waiting_for_paste_content", None)
context.user_data.pop("waiting_for_paste_filename", None)
context.user_data.pop("paste_content", None)
except Exception:
pass
# בנה הודעת סטטוס
status_msg = "<b>🔧 תפריט GitHub</b>\n\n"
if token:
status_msg += "🔑 <b>מחובר ל-GitHub</b>\n"
else:
status_msg += "🔒 <b>לא מחובר</b>\n"
if session.get("selected_repo"):
status_msg += f"📁 ריפו נבחר: <code>{session['selected_repo']}</code>\n"
if session.get("selected_folder"):
status_msg += f"📂 תיקיית יעד: <code>{session['selected_folder']}</code>\n"
keyboard = []
# כפתור הגדרת טוקן
if not token:
keyboard.append(
[InlineKeyboardButton("🔑 הגדר טוקן GitHub", callback_data="set_token")]
)
# כפתור בחירת ריפו - זמין רק עם טוקן
if token:
keyboard.append([InlineKeyboardButton("📁 בחר ריפו", callback_data="select_repo")])
# יצירת ריפו חדש מ-ZIP גם ללא ריפו נבחר
keyboard.append([InlineKeyboardButton("🆕 צור ריפו חדש מּZIP", callback_data="github_create_repo_from_zip")])
# כפתורי העלאה - מוצגים רק אם יש ריפו נבחר
if token and session.get("selected_repo"):
# העבר את "בחר תיקיית יעד" למעלה, ישירות אחרי "בחר ריפו"
keyboard.append([InlineKeyboardButton("📂 בחר תיקיית יעד", callback_data="set_folder")])
# ניווט בריפו
keyboard.append([InlineKeyboardButton("🗃️ עיין בריפו", callback_data="browse_repo")])
# כפתור העלאה
keyboard.append([InlineKeyboardButton("📤 העלה קובץ חדש", callback_data="upload_file")])
# פעולות נוספות בטוחות
keyboard.append(
[InlineKeyboardButton("📥 הורד קובץ מהריפו", callback_data="download_file_menu")]
)
# כפתור ייבוא ריפו (ZIP רשמי → ייבוא קבצים ל-DB)
keyboard.append(
[InlineKeyboardButton("⬇️ הורד ריפו", callback_data="github_import_repo")]
)
# ריכוז פעולות מחיקה בתפריט משנה
keyboard.append(
[InlineKeyboardButton("🧨 מחק קובץ/ריפו שלם", callback_data="danger_delete_menu")]
)
# התראות חכמות
keyboard.append(
[InlineKeyboardButton("🔔 התראות חכמות", callback_data="notifications_menu")]
)
# תפריט גיבוי/שחזור מרוכז
keyboard.append(
[InlineKeyboardButton("🧰 גיבוי ושחזור", callback_data="github_backup_menu")]
)
# כפתור ניתוח ריפו - תמיד מוצג אם יש טוקן
if token:
keyboard.append([InlineKeyboardButton("🔍 נתח ריפו", callback_data="analyze_repo")])
keyboard.append([InlineKeyboardButton("✅ בדוק תקינות ריפו", callback_data="validate_repo")])
# כפתור יציאה (מחיקת טוקן) כאשר יש טוקן
keyboard.append(
[InlineKeyboardButton("🚪 התנתק מגיטהאב", callback_data="logout_github")]
)
# כפתור הצגת הגדרות
keyboard.append(
[InlineKeyboardButton("📋 הצג הגדרות נוכחיות", callback_data="show_current")]
)
# כפתור סגירה
keyboard.append([InlineKeyboardButton("❌ סגור", callback_data="close_menu")])
reply_markup = InlineKeyboardMarkup(keyboard)
if update.callback_query:
await update.callback_query.edit_message_text(
status_msg, reply_markup=reply_markup, parse_mode="HTML"
)
else:
await update.message.reply_text(
status_msg, reply_markup=reply_markup, parse_mode="HTML"
)
[תיעוד]
async def handle_menu_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle menu button clicks"""
query = update.callback_query
logger.info(
f"📱 GitHub handler received callback: {query.data} from user {query.from_user.id}"
)
await query.answer()
user_id = query.from_user.id
session = self.get_user_session(user_id)
if query.data == "select_repo":
await self.show_repo_selection(query, context)
elif query.data == "browse_repo":
# מצב עיון בריפו עם תצוגת קבצים
context.user_data["browse_action"] = "view"
context.user_data["browse_path"] = ""
context.user_data["browse_page"] = 0
context.user_data["multi_mode"] = False
context.user_data["multi_selection"] = []
await self.show_repo_browser(update, context)
elif query.data == "upload_file":
if not session.get("selected_repo"):
await query.edit_message_text("❌ קודם בחר ריפו!\nשלח /github ובחר 'בחר ריפו'")
else:
folder_display = session.get("selected_folder") or "root"
keyboard = [
[InlineKeyboardButton("✍️ הדבק קוד", callback_data="upload_paste_code")],
[InlineKeyboardButton("🗂 לפי ריפו", callback_data="gh_upload_cat:repos")],
[InlineKeyboardButton("📦 קבצי ZIP", callback_data="gh_upload_cat:zips")],
[InlineKeyboardButton("📂 קבצים גדולים", callback_data="gh_upload_cat:large")],
[InlineKeyboardButton("📁 שאר הקבצים", callback_data="gh_upload_cat:other")],
[InlineKeyboardButton("🔙 חזור", callback_data="github_menu")],
]
await query.edit_message_text(
f"📤 <b>העלאת קובץ לריפו</b>\n"
f"ריפו: <code>{session['selected_repo']}</code>\n"
f"📂 תיקייה: <code>{folder_display}</code>\n\n"
f"בחר מקור להעלאה:",
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode="HTML",
)
return
elif query.data == "cancel_paste_flow":
# ביטול מפורש של זרימת "הדבק קוד": נקה דגלים וחזור לתפריט העלאה
try:
context.user_data.pop("waiting_for_paste_content", None)
context.user_data.pop("waiting_for_paste_filename", None)
context.user_data.pop("paste_content", None)
except Exception:
pass
# נווט חזרה למסך "העלה קובץ חדש"
# על ידי קריאה עצמית לסעיף upload_file
if not session.get("selected_repo"):
await query.edit_message_text("❌ קודם בחר ריפו!\nשלח /github ובחר 'בחר ריפו'")
else:
folder_display = session.get("selected_folder") or "root"
keyboard = [
[InlineKeyboardButton("✍️ הדבק קוד", callback_data="upload_paste_code")],
[InlineKeyboardButton("🗂 לפי ריפו", callback_data="gh_upload_cat:repos")],
[InlineKeyboardButton("📦 קבצי ZIP", callback_data="gh_upload_cat:zips")],
[InlineKeyboardButton("📂 קבצים גדולים", callback_data="gh_upload_cat:large")],
[InlineKeyboardButton("📁 שאר הקבצים", callback_data="gh_upload_cat:other")],
[InlineKeyboardButton("🔙 חזור", callback_data="github_menu")],
]
await query.edit_message_text(
f"📤 <b>העלאת קובץ לריפו</b>\n"
f"ריפו: <code>{session['selected_repo']}</code>\n"
f"📂 תיקייה: <code>{folder_display}</code>\n\n"
f"בחר מקור להעלאה:",
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode="HTML",
)
return
elif query.data == "upload_paste_code":
# התחלת זרימת "הדבק קוד"
# נקה דגלים ישנים
try:
context.user_data.pop("waiting_for_paste_content", None)
context.user_data.pop("waiting_for_paste_filename", None)
context.user_data.pop("paste_content", None)
except Exception:
pass
context.user_data["waiting_for_paste_content"] = True
await query.edit_message_text(
"✍️ שלח/י כאן את הקוד להעלאה כטקסט.\n\n"
"לאחר מכן אבקש את שם הקובץ (כולל סיומת).",
reply_markup=InlineKeyboardMarkup([
[
InlineKeyboardButton("🔙 חזור", callback_data="upload_file"),
InlineKeyboardButton("❌ ביטול", callback_data="cancel_paste_flow"),
]
]),
)
return
elif query.data == "gh_upload_cat:repos":
await self.show_upload_repos(update, context)
elif query.data == "gh_upload_cat:zips":
# הצג את כל קבצי ה‑ZIP ששמורים בבוט, ללא סינון לפי ריפו
try:
context.user_data['zip_back_to'] = 'github_upload'
context.user_data.pop('github_backup_context_repo', None)
except Exception:
pass
backup_handler = context.bot_data.get('backup_handler')
if backup_handler is None:
try:
from backup_menu_handler import BackupMenuHandler # טעינה עצלה למניעת תלות מעגלית
backup_handler = BackupMenuHandler()
context.bot_data['backup_handler'] = backup_handler
except Exception as e:
await query.edit_message_text(f"❌ רכיב גיבוי לא זמין: {e}")
return
try:
await backup_handler._show_backups_list(update, context, page=1)
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בטעינת קבצי ZIP: {e}")
elif query.data.startswith("gh_upload_zip_browse:"):
# עיון בקובץ ZIP שמור ובחירת קובץ מתוכו להעלאה לריפו
backup_id = query.data.split(":", 1)[1]
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.edit_message_text("❌ חסרים נתונים (בחר ריפו עם /github)")
return
try:
infos = backup_manager.list_backups(user_id)
match = next((b for b in infos if getattr(b, 'backup_id', '') == backup_id), None)
if not match or not match.file_path or not os.path.exists(match.file_path):
await query.edit_message_text("❌ הגיבוי לא נמצא בדיסק")
return
# קרא שמות קבצים מתוך ה‑ZIP (ללא תיקיות ו-metadata.json)
import zipfile as _zip
names: list[str] = []
with _zip.ZipFile(match.file_path, 'r') as zf:
for n in zf.namelist():
if n.endswith('/'):
continue
if n == 'metadata.json':
continue
names.append(n)
if not names:
await query.edit_message_text("ℹ️ אין קבצים ב‑ZIP")
return
# עימוד בסיסי
page = int(context.user_data.get('gh_zip_browse_page', 1))
PAGE = 10
total = len(names)
total_pages = (total + PAGE - 1) // PAGE
if page < 1:
page = 1
if page > total_pages:
page = total_pages
start = (page - 1) * PAGE
end = min(start + PAGE, total)
slice_names = names[start:end]
# שמירת מיפוי שמות בקאש הסשן כדי להימנע מ-callback ארוך מדי
try:
cache = context.user_data.setdefault('gh_zip_cache', {})
cache[backup_id] = {'names': names}
except Exception:
pass
# בנה כפתורים לבחירת קובץ להעלאה + עימוד + חזרה (לפי אינדקס)
kb = []
for idx, n in enumerate(slice_names, start=start):
safe_label = n if len(n) <= 64 else (n[:30] + '…' + n[-30:])
kb.append([InlineKeyboardButton(safe_label, callback_data=f"gh_upload_zip_select_idx:{backup_id}:{idx}")])
# עימוד
nav = []
if page > 1:
nav.append(InlineKeyboardButton("⬅️ הקודם", callback_data=f"gh_upload_zip_page:{backup_id}:{page-1}"))
if page < total_pages:
nav.append(InlineKeyboardButton("הבא ➡️", callback_data=f"gh_upload_zip_page:{backup_id}:{page+1}"))
if nav:
kb.append(nav)
# חזור לרשימת ה‑ZIPים של העלאה
kb.append([InlineKeyboardButton("🔙 חזור", callback_data="gh_upload_cat:zips")])
await query.edit_message_text(
f"בחר קובץ מתוך ZIP להעלאה לריפו:\n<code>{backup_id}</code>\nעמוד {page}/{total_pages}",
reply_markup=InlineKeyboardMarkup(kb),
parse_mode="HTML",
)
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בקריאת ZIP: {e}")
elif query.data.startswith("gh_upload_zip_page:"):
# ניווט עמודים בעיון ה‑ZIP
try:
_, backup_id, page_str = query.data.split(":", 2)
context.user_data['gh_zip_browse_page'] = max(1, int(page_str))
# הביא מחדש את אותו מסך
await self.handle_menu_callback(update, context)
# החלף את ה-callback ל-browse כדי להפעיל את העדכון
update.callback_query.data = f"gh_upload_zip_browse:{backup_id}"
await self.handle_menu_callback(update, context)
except Exception:
await query.answer("שגיאת עימוד", show_alert=True)
elif query.data.startswith("gh_upload_zip_select_idx:"):
# בחירת קובץ מתוך ZIP לפי אינדקס כדי לעמוד במגבלת 64 בתים של callback_data
try:
_, backup_id, idx_str = query.data.split(":", 2)
idx = int(idx_str)
except Exception:
await query.answer("בקשה לא תקפה", show_alert=True)
return
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.edit_message_text("❌ חסרים נתונים (בחר ריפו עם /github)")
return
# מצא את הנתיב הפנימי לפי הקאש; אם לא קיים — טען מחדש מה‑ZIP
inner_path = None
try:
cache = context.user_data.get('gh_zip_cache', {})
names = (cache.get(backup_id) or {}).get('names') or []
if 0 <= idx < len(names):
inner_path = names[idx]
except Exception:
inner_path = None
if not inner_path:
try:
infos = backup_manager.list_backups(user_id)
match = next((b for b in infos if getattr(b, 'backup_id', '') == backup_id), None)
if not match or not match.file_path or not os.path.exists(match.file_path):
await query.edit_message_text("❌ הגיבוי לא נמצא בדיסק")
return
import zipfile as _zip
with _zip.ZipFile(match.file_path, 'r') as zf:
all_names = [n for n in zf.namelist() if not n.endswith('/') and n != 'metadata.json']
if 0 <= idx < len(all_names):
inner_path = all_names[idx]
else:
await query.edit_message_text("❌ פריט לא קיים ב‑ZIP")
return
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בקריאת ZIP: {e}")
return
# המשך זרימת העלאה זהה לבחירה לפי מחרוזת
try:
infos = backup_manager.list_backups(user_id)
match = next((b for b in infos if getattr(b, 'backup_id', '') == backup_id), None)
if not match or not match.file_path or not os.path.exists(match.file_path):
await query.edit_message_text("❌ הגיבוי לא נמצא בדיסק")
return
import zipfile as _zip
with _zip.ZipFile(match.file_path, 'r') as zf:
try:
raw = zf.read(inner_path)
except Exception:
await query.edit_message_text("❌ קובץ לא נמצא בתוך ה‑ZIP")
return
try:
content_text = raw.decode('utf-8')
except Exception:
try:
content_text = raw.decode('latin-1')
except Exception as e:
await query.edit_message_text(f"❌ לא ניתן לפענח את הקובץ: {e}")
return
target_folder = (context.user_data.get("upload_target_folder") or session.get("selected_folder") or "").strip("/")
target_path = f"{target_folder}/{inner_path}" if target_folder else inner_path
import re as _re
target_path = _re.sub(r"/+", "/", target_path.strip("/"))
g = Github(token)
repo = g.get_repo(repo_name)
branch = context.user_data.get("upload_target_branch") or repo.default_branch or "main"
try:
existing = repo.get_contents(target_path, ref=branch)
result = repo.update_file(
path=target_path,
message=f"Update {inner_path} via Telegram bot",
content=content_text,
sha=existing.sha,
branch=branch,
)
await query.edit_message_text(f"✅ הקובץ עודכן בהצלחה ל-<code>{target_path}</code>", parse_mode="HTML")
except Exception:
result = repo.create_file(
path=target_path,
message=f"Upload {inner_path} via Telegram bot",
content=content_text,
branch=branch,
)
await query.edit_message_text(f"✅ הקובץ הועלה בהצלחה ל-<code>{target_path}</code>", parse_mode="HTML")
kb = [
[InlineKeyboardButton("➕ העלה קובץ נוסף מה‑ZIP", callback_data=f"gh_upload_zip_browse:{backup_id}")],
[InlineKeyboardButton("🔙 חזור", callback_data="gh_upload_cat:zips")],
]
await query.message.reply_text("🎯 בחר פעולה:", reply_markup=InlineKeyboardMarkup(kb))
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בהעלאה: {e}")
elif query.data.startswith("gh_upload_zip_select:"):
# בחירת קובץ ספציפי מתוך ZIP להעלאה לריפו
try:
_, backup_id, inner_path = query.data.split(":", 2)
except Exception:
await query.answer("בקשה לא תקפה", show_alert=True)
return
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.edit_message_text("❌ חסרים נתונים (בחר ריפו עם /github)")
return
# בדוק את ה‑ZIP והוצא את התוכן של הקובץ
try:
infos = backup_manager.list_backups(user_id)
match = next((b for b in infos if getattr(b, 'backup_id', '') == backup_id), None)
if not match or not match.file_path or not os.path.exists(match.file_path):
await query.edit_message_text("❌ הגיבוי לא נמצא בדיסק")
return
import zipfile as _zip
with _zip.ZipFile(match.file_path, 'r') as zf:
try:
raw = zf.read(inner_path)
except Exception:
await query.edit_message_text("❌ קובץ לא נמצא בתוך ה‑ZIP")
return
# המרת תוכן לטקסט (utf-8 או latin-1)
try:
content_text = raw.decode('utf-8')
except Exception:
try:
content_text = raw.decode('latin-1')
except Exception as e:
await query.edit_message_text(f"❌ לא ניתן לפענח את הקובץ: {e}")
return
# יעד: נתיב התיקייה שנבחרה + שם הקובץ המקורי מה‑ZIP
target_folder = (context.user_data.get("upload_target_folder") or session.get("selected_folder") or "").strip("/")
target_path = f"{target_folder}/{inner_path}" if target_folder else inner_path
# ודא שימוש בנתיב נקי ללא כפילויות '/'
import re as _re
target_path = _re.sub(r"/+", "/", target_path.strip("/"))
# בצע יצירה/עדכון
g = Github(token)
repo = g.get_repo(repo_name)
branch = context.user_data.get("upload_target_branch") or repo.default_branch or "main"
try:
existing = repo.get_contents(target_path, ref=branch)
result = repo.update_file(
path=target_path,
message=f"Update {inner_path} via Telegram bot",
content=content_text,
sha=existing.sha,
branch=branch,
)
await query.edit_message_text(f"✅ הקובץ עודכן בהצלחה ל-<code>{target_path}</code>", parse_mode="HTML")
except Exception:
result = repo.create_file(
path=target_path,
message=f"Upload {inner_path} via Telegram bot",
content=content_text,
branch=branch,
)
await query.edit_message_text(f"✅ הקובץ הועלה בהצלחה ל-<code>{target_path}</code>", parse_mode="HTML")
# הצע פעולות המשך: בחר קובץ נוסף מה‑ZIP או חזור
kb = [
[InlineKeyboardButton("➕ העלה קובץ נוסף מה‑ZIP", callback_data=f"gh_upload_zip_browse:{backup_id}")],
[InlineKeyboardButton("🔙 חזור", callback_data="gh_upload_cat:zips")],
]
await query.message.reply_text("🎯 בחר פעולה:", reply_markup=InlineKeyboardMarkup(kb))
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בהעלאה: {e}")
elif query.data == "gh_upload_cat:large":
await self.upload_large_files_menu(update, context)
elif query.data == "gh_upload_cat:other":
await self.show_upload_other_files(update, context)
# --- Create new repository from ZIP ---
elif query.data == "github_create_repo_from_zip":
# הכנה לזרימת יצירת ריפו חדש מתוך ZIP
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
if not token:
await query.edit_message_text("❌ אין חיבור ל-GitHub. שלח /github כדי להגדיר טוקן")
return
# נקה דגלים ישנים כדי למנוע בלבול בקליטת המסמכים
context.user_data["waiting_for_github_upload"] = False
context.user_data["upload_mode"] = "github_create_repo_from_zip"
# ברירת מחדל: ריפו פרטי
context.user_data["new_repo_private"] = True
vis_text = "פרטי" if context.user_data.get("new_repo_private", True) else "ציבורי"
kb = [
[InlineKeyboardButton("✍️ הקלד שם ריפו", callback_data="github_new_repo_name")],
[
InlineKeyboardButton(
"🔒 פרטי ✅" if context.user_data.get("new_repo_private", True) else "🔒 פרטי",
callback_data="github_set_new_repo_visibility:1"
),
InlineKeyboardButton(
"🌐 ציבורי ✅" if not context.user_data.get("new_repo_private", True) else "🌐 ציבורי",
callback_data="github_set_new_repo_visibility:0"
),
],
[InlineKeyboardButton("🔙 חזור", callback_data="github_menu")],
]
help_txt = (
"🆕 <b>יצירת ריפו חדש מּZIP</b>\n\n"
"1) ניתן להקליד שם לריפו (ללא רווחים)\n"
"2) בחר אם הריפו יהיה <b>פרטי</b> או <b>ציבורי</b>\n"
"3) שלח עכשיו קובץ ZIP עם כל הקבצים\n\n"
"אם לא תוקלד שם, ננסה לחלץ שם מתיקיית-הבסיס בּZIP או משם הקובץ.\n"
"ברירת מחדל: <code>repo-<timestamp></code>\n\n"
f"נראות נוכחית: <b>{vis_text}</b>\n"
"לאחר השליחה, ניצור ריפו לפי בחירתך ונפרוס את התוכן ב-commit אחד."
)
await query.edit_message_text(help_txt, parse_mode="HTML", reply_markup=InlineKeyboardMarkup(kb))
return
elif query.data == "github_new_repo_name":
# בקשת שם לריפו החדש
context.user_data["waiting_for_new_repo_name"] = True
await query.edit_message_text(
"✏️ הקלד שם לריפו החדש (מותר אותיות, מספרים, נקודות, מקפים וקו תחתון).\nשלח טקסט עכשיו.",
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🔙 חזור", callback_data="github_create_repo_from_zip")]])
)
return
elif query.data.startswith("github_set_new_repo_visibility:"):
# בחירת נראות (פרטי/ציבורי) לריפו החדש
flag = query.data.split(":", 1)[1]
is_private = flag == "1"
context.user_data["new_repo_private"] = is_private
vis_text = "פרטי" if is_private else "ציבורי"
kb = [
[InlineKeyboardButton("✍️ הקלד שם ריפו", callback_data="github_new_repo_name")],
[
InlineKeyboardButton(
"🔒 פרטי ✅" if is_private else "🔒 פרטי",
callback_data="github_set_new_repo_visibility:1"
),
InlineKeyboardButton(
"🌐 ציבורי ✅" if not is_private else "🌐 ציבורי",
callback_data="github_set_new_repo_visibility:0"
),
],
[InlineKeyboardButton("🔙 חזור", callback_data="github_menu")],
]
help_txt = (
"🆕 <b>יצירת ריפו חדש מּZIP</b>\n\n"
"1) ניתן להקליד שם לריפו (ללא רווחים)\n"
"2) בחר אם הריפו יהיה <b>פרטי</b> או <b>ציבורי</b>\n"
"3) שלח עכשיו קובץ ZIP עם כל הקבצים\n\n"
"אם לא תוקלד שם, ננסה לחלץ שם מתיקיית-הבסיס בּZIP או משם הקובץ.\n"
"ברירת מחדל: <code>repo-<timestamp></code>\n\n"
f"נראות נוכחית: <b>{vis_text}</b>\n"
"לאחר השליחה, ניצור ריפו לפי בחירתך ונפרוס את התוכן ב-commit אחד."
)
try:
await query.edit_message_text(help_txt, parse_mode="HTML", reply_markup=InlineKeyboardMarkup(kb))
except BadRequest as br:
if "message is not modified" not in str(br).lower():
raise
try:
await query.answer("עודכנה הנראות", show_alert=False)
except Exception:
pass
return
elif query.data.startswith("gh_upload_repo:"):
tag = query.data.split(":", 1)[1]
await self.show_upload_repo_files(update, context, tag)
elif query.data.startswith("other_files_page_"):
try:
p = int(query.data.split("_")[-1])
except Exception:
p = 1
# שמירת עמוד, כדי שלא נקפוץ לעמוד הראשון אחרי פעולה
context.user_data["other_files_page"] = max(1, p)
await self.show_upload_other_files(update, context)
elif query.data.startswith("repo_files_page:"):
# פורמט: repo_files_page:<repo_tag>:<page>
try:
_, repo_tag, page_s = query.data.split(":", 2)
page = int(page_s)
except Exception:
repo_tag, page = None, 1
if repo_tag:
# שמור עמוד נוכחי לכל תגית
d = context.user_data.get("repo_files_page") or {}
d[repo_tag] = page
context.user_data["repo_files_page"] = d
await self.show_upload_repo_files(update, context, repo_tag)
elif query.data.startswith("gh_upload_large:"):
file_id = query.data.split(":", 1)[1]
await self.handle_large_file_upload(update, context, file_id)
# הוסר: "upload_saved" — הזרימה כלולה ב"העלה קובץ חדש"
elif query.data.startswith("repos_page_"):
page = int(query.data.split("_")[2])
await self.show_repos(update, context, page)
elif query.data.startswith("upload_saved_"):
file_id = query.data.split("_")[2]
# Show pre-upload check screen before actual upload
context.user_data["pending_saved_file_id"] = file_id
await self.show_pre_upload_check(update, context)
elif query.data == "choose_upload_branch":
await self.show_upload_branch_menu(update, context)
elif query.data.startswith("upload_branches_page_"):
try:
p = int(query.data.split("_")[-1])
except Exception:
p = 0
context.user_data["upload_branches_page"] = max(0, p)
await self.show_upload_branch_menu(update, context)
elif query.data.startswith("upload_select_branch:"):
br = query.data.split(":", 1)[1]
context.user_data["upload_target_branch"] = br
await self.show_pre_upload_check(update, context)
elif query.data == "choose_upload_folder":
await self.show_upload_folder_menu(update, context)
elif query.data.startswith("upload_select_folder:"):
# בחירת תיקייה מתוך דפדפן הריפו
folder_path = query.data.split(":", 1)[1]
# normalize to no leading/trailing slashes
folder_norm = (folder_path or "").strip("/")
context.user_data["upload_target_folder"] = folder_norm
await self.show_pre_upload_check(update, context)
elif query.data == "upload_folder_root":
context.user_data["upload_target_folder"] = ""
await self.show_pre_upload_check(update, context)
elif query.data == "upload_folder_current":
context.user_data["upload_target_folder"] = (session.get("selected_folder") or "")
await self.show_pre_upload_check(update, context)
elif query.data == "upload_folder_custom":
await self.ask_upload_folder(update, context)
elif query.data == "upload_folder_create":
if hasattr(self, "create_upload_folder"):
await self.create_upload_folder(update, context)
else:
await query.answer("אין פעולה זמינה ליצירת תיקייה", show_alert=True)
elif query.data == "confirm_saved_upload":
file_id = context.user_data.get("pending_saved_file_id")
if not file_id:
await query.edit_message_text("❌ לא נמצא קובץ ממתין להעלאה")
else:
await self.handle_saved_file_upload(update, context, file_id)
elif query.data == "refresh_saved_checks":
await self.show_pre_upload_check(update, context)
elif query.data == "back_to_menu":
await self.github_menu_command(update, context)
elif query.data == "folder_select_done":
# סיום בחירת תיקייה דרך הדפדפן והצגת מצב
context.user_data.pop("folder_select_mode", None)
await self.github_menu_command(update, context)
elif query.data.startswith("folder_set_session:"):
folder_path = query.data.split(":", 1)[1]
session["selected_folder"] = (folder_path or "").strip("/") or None
await query.answer(f"✅ תיקיית יעד עודכנה ל-{session['selected_folder'] or 'root'}", show_alert=False)
# יציאה ממסך בחירת תיקייה וחזרה לתפריט הראשי כדי למנוע שגיאת "Message is not modified"
context.user_data.pop("folder_select_mode", None)
await self.github_menu_command(update, context)
elif query.data == "noop":
await query.answer(cache_time=0) # לא עושה כלום, רק לכפתור התצוגה
# --- New: logout GitHub token from menu ---
elif query.data == "logout_github":
from database import db
removed = db.delete_github_token(user_id)
try:
session["github_token"] = None
# נקה גם בחירות קודמות כאשר מתנתקים
session["selected_repo"] = None
session["selected_folder"] = None
except Exception:
pass
# נקה קאש ריפוזיטוריז
context.user_data.pop("repos", None)
context.user_data.pop("repos_cache_time", None)
if removed:
await query.edit_message_text("🔐 התנתקת מ-GitHub והטוקן נמחק.⏳ מרענן תפריט...")
else:
await query.edit_message_text("ℹ️ לא נמצא טוקן או שאירעה שגיאה.⏳ מרענן תפריט...")
# refresh the menu after logout
await self.github_menu_command(update, context)
return
elif query.data == "github_import_repo":
# פתיחת זרימת ייבוא ריפו (בחירת ענף → ייבוא)
repo_full = session.get("selected_repo")
if not repo_full:
await query.edit_message_text("❌ קודם בחר ריפו!\nשלח /github ובחר 'בחר ריפו'")
return
await self.show_import_branch_menu(update, context)
return
elif query.data.startswith("import_repo_branches_page_"):
try:
p = int(query.data.rsplit("_", 1)[-1])
except Exception:
p = 0
context.user_data["import_branches_page"] = max(0, p)
await self.show_import_branch_menu(update, context)
return
elif query.data.startswith("import_repo_select_branch:"):
token = query.data.split(":", 1)[1]
token_map = context.user_data.get("import_branch_token_map", {})
branch = token_map.get(token, token)
context.user_data["import_repo_branch"] = branch
await self._confirm_import_repo(update, context, branch)
return
elif query.data == "import_repo_start":
# התחלת ייבוא בפועל
repo_full = session.get("selected_repo") or ""
branch = context.user_data.get("import_repo_branch")
if not repo_full or not branch:
await query.edit_message_text("❌ חסרים נתונים ליבוא. בחר ריפו וענף מחדש.")
return
await self.import_repo_from_zip(update, context, repo_full, branch)
return
elif query.data == "import_repo_cancel":
await self.github_menu_command(update, context)
return
elif query.data == "analyze_repo":
logger.info(f"🔍 User {query.from_user.id} clicked 'analyze_repo' button")
await self.show_analyze_repo_menu(update, context)
elif query.data == "analyze_current_repo":
# נתח את הריפו הנבחר
logger.info(f"📊 User {query.from_user.id} analyzing current repo")
session = self.get_user_session(query.from_user.id)
repo_url = f"https://github.com/{session['selected_repo']}"
await self.analyze_repository(update, context, repo_url)
elif query.data == "back_to_github_menu":
await self.github_menu_command(update, context)
elif query.data == "analyze_other_repo":
logger.info(f"🔄 User {query.from_user.id} wants to analyze another repo")
await self.analyze_another_repo(update, context)
elif query.data == "show_suggestions":
await self.show_improvement_suggestions(update, context)
elif query.data == "show_full_analysis":
await self.show_full_analysis(update, context)
elif query.data == "download_analysis_json":
await self.download_analysis_json(update, context)
elif query.data == "github_backup_menu":
await self.show_github_backup_menu(update, context)
elif query.data == "github_backup_db_list":
# מעבר לרשימת "גיבויי DB אחרונים" מתוך תפריט GitHub, עם חזרה ל-GitHub
try:
backup_handler = context.bot_data.get('backup_handler')
if backup_handler is None:
from backup_menu_handler import BackupMenuHandler
backup_handler = BackupMenuHandler()
context.bot_data['backup_handler'] = backup_handler
# קבע הקשר חזרה ל-GitHub והסר סינון לפי ריפו לרשימה זו
context.user_data['zip_back_to'] = 'github'
try:
context.user_data.pop('github_backup_context_repo')
except Exception:
pass
await backup_handler._show_backups_list(update, context, page=1)
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בטעינת גיבויים: {e}")
elif query.data == "github_restore_zip_to_repo":
# התחלת שחזור ZIP ידני לריפו: הגדר מצב העלאה ובקש בחירת purge
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_full = session.get("selected_repo")
if not (token and repo_full):
try:
await query.edit_message_text("❌ חסר טוקן או ריפו נבחר")
except BadRequest as br:
if "message is not modified" not in str(br).lower():
raise
try:
await query.answer("❌ חסר טוקן או ריפו נבחר", show_alert=True)
except Exception:
pass
return
# ודא שניקינו דגלים ישנים של העלאה רגילה כדי למנוע בלבול
context.user_data["waiting_for_github_upload"] = False
context.user_data["upload_mode"] = "github_restore_zip_to_repo"
# נעל את יעד הריפו הצפוי לשחזור (חגורת בטיחות נגד ריפו אחר)
try:
context.user_data["zip_restore_expected_repo_full"] = repo_full
except Exception:
# לא קריטי אם נכשלת שמירת סטייט - נאתר בהמשך
pass
kb = [
[InlineKeyboardButton("🧹 מחיקה מלאה לפני העלאה", callback_data="github_restore_zip_setpurge:1")],
[InlineKeyboardButton("🚫 אל תמחק, רק עדכן", callback_data="github_restore_zip_setpurge:0")],
[InlineKeyboardButton("❌ ביטול", callback_data="github_backup_menu")],
]
try:
await query.edit_message_text(
"בחר מצב שחזור ZIP לריפו, ואז שלח קובץ ZIP עכשיו:",
reply_markup=InlineKeyboardMarkup(kb),
)
except BadRequest as br:
if "message is not modified" not in str(br).lower():
await query.message.reply_text(
"בחר מצב שחזור ZIP לריפו, ואז שלח קובץ ZIP עכשיו:",
reply_markup=InlineKeyboardMarkup(kb),
)
else:
try:
await query.answer("אין שינוי בתצוגה", show_alert=False)
except Exception:
pass
elif query.data.startswith("github_restore_zip_setpurge:"):
# טיפול בבחירת מצב מחיקה/עדכון לפני העלאה
purge_flag = query.data.split(":", 1)[1] == "1"
# ודא שניקינו דגלים ישנים של העלאה רגילה כדי למנוע בלבול
context.user_data["waiting_for_github_upload"] = False
context.user_data["upload_mode"] = "github_restore_zip_to_repo"
context.user_data["github_restore_zip_purge"] = purge_flag
# השאר את היעד הצפוי אם כבר נקבע קודם
if not context.user_data.get("zip_restore_expected_repo_full"):
try:
context.user_data["zip_restore_expected_repo_full"] = session.get("selected_repo")
except Exception:
pass
await query.edit_message_text(
("🧹 יבוצע ניקוי לפני העלאה. " if purge_flag else "🔁 ללא מחיקה. ") +
"שלח עכשיו קובץ ZIP לשחזור לריפו."
)
return
elif query.data == "github_restore_zip_list":
# הצג רשימת גיבויים (ZIP) של הריפו הנוכחי לצורך שחזור לריפו
user_id = query.from_user.id
session = self.get_user_session(user_id)
repo_full = session.get("selected_repo")
if not repo_full:
await query.edit_message_text("❌ קודם בחר ריפו!")
return
backups = backup_manager.list_backups(user_id)
# סנן רק גיבויים עם metadata של אותו ריפו
backups = [b for b in backups if getattr(b, 'repo', None) == repo_full]
if not backups:
await query.edit_message_text(
f"ℹ️ אין גיבויי ZIP שמורים עבור הריפו:\n<code>{repo_full}</code>",
parse_mode="HTML",
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🔙 חזור", callback_data="github_backup_menu")]])
)
return
# הצג עד 10 אחרונים
items = backups[:10]
lines = [f"בחר גיבוי לשחזור לריפו:\n<code>{repo_full}</code>\n"]
kb = []
for b in items:
lines.append(f"• {b.backup_id} — {b.created_at.strftime('%d/%m/%Y %H:%M')} — {int(b.total_size/1024)}KB")
kb.append([InlineKeyboardButton("♻️ שחזר גיבוי זה לריפו", callback_data=f"github_restore_zip_from_backup:{b.backup_id}")])
kb.append([InlineKeyboardButton("🔙 חזור", callback_data="github_backup_menu")])
await query.edit_message_text("\n".join(lines), reply_markup=InlineKeyboardMarkup(kb), parse_mode="HTML")
return
elif query.data.startswith("github_restore_zip_from_backup:"):
# קבל backup_id ואז פתח את תהליך השחזור-לריפו עם קובץ ה-ZIP הזה
backup_id = query.data.split(":", 1)[1]
user_id = query.from_user.id
info_list = backup_manager.list_backups(user_id)
match = next((b for b in info_list if b.backup_id == backup_id), None)
if not match or not match.file_path or not os.path.exists(match.file_path):
await query.edit_message_text("❌ הגיבוי לא נמצא בדיסק")
return
# הגדר purge? בקש בחירה
context.user_data["pending_repo_restore_zip_path"] = match.file_path
# נעל את יעד הריפו הצפוי עבור השחזור מתוך גיבוי
try:
context.user_data["zip_restore_expected_repo_full"] = self.get_user_session(user_id).get("selected_repo")
except Exception:
pass
await query.edit_message_text(
"האם למחוק קודם את התוכן בריפו לפני העלאה?",
reply_markup=InlineKeyboardMarkup([
[InlineKeyboardButton("🧹 מחיקה מלאה לפני העלאה", callback_data="github_repo_restore_backup_setpurge:1")],
[InlineKeyboardButton("🚫 אל תמחק, רק עדכן", callback_data="github_repo_restore_backup_setpurge:0")],
[InlineKeyboardButton("❌ ביטול", callback_data="github_backup_menu")],
])
)
return
elif query.data == "github_backup_help":
help_text = (
"<b>הסבר על הכפתורים:</b>\n\n"
"📦 <b>הורד גיבוי ZIP של הריפו</b>: יוצר ומוריד ZIP של כל התוכן (או תיקייה נוכחית), וגם שומר כגיבוי לשימוש עתידי.\n\n"
"♻️ <b>שחזר ZIP לריפו (פריסה והחלפה)</b>: שלח ZIP מהמחשב, והבוט יפרוס אותו לריפו בקומיט אחד. ניתן לבחור מחיקה מלאה לפני או עדכון בלבד.\n\n"
"📂 <b>שחזר מגיבוי שמור לריפו</b>: בחר ZIP ששמור בבוט עבור הריפו הזה, והבוט יפרוס אותו לריפו (מחיקה/עדכון לפי בחירה).\n\n"
"🏷 <b>נקודת שמירה בגיט</b>: יוצר תגית/ענף נקודת שמירה של הריפו הנוכחי כדי שתוכל לחזור אליה.\n\n"
"↩️ <b>חזרה לנקודת שמירה</b>: פעולות לשחזור מצב מהרפרנס של נקודת שמירה (תגית/ענף) — כולל יצירת ענף/PR לשחזור.\n\n"
"🗂 <b>גיבויי DB אחרונים</b>: מציג גיבויים של קבצים בבוט עצמו (לא קשור ל‑GitHub).\n\n"
"♻️ <b>שחזור מגיבוי (ZIP)</b>: שחזור מלא לקבצים בבוט עצמו מקובץ ZIP. מוחק את כל הקבצים בבוט ואז משחזר.\n\n"
"🔙 <b>חזור</b>: חזרה לתפריט הראשי של GitHub."
)
try:
await query.edit_message_text(help_text, parse_mode="HTML", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🔙 חזור", callback_data="github_backup_menu")]]))
except BadRequest as br:
if "message is not modified" not in str(br).lower():
raise
return
elif query.data == "backup_menu":
# האצלת תצוגת תפריט הגיבוי/שחזור של DB ל-BackupMenuHandler
backup_handler = context.bot_data.get('backup_handler')
if backup_handler:
await backup_handler.show_backup_menu(update, context)
else:
await query.edit_message_text("❌ רכיב גיבוי לא זמין")
elif query.data == "back_to_analysis":
await self.show_full_analysis(update, context)
elif query.data == "back_to_analysis_menu":
await self.show_analyze_results_menu(update, context)
elif query.data == "back_to_summary":
await self.show_analyze_results_menu(update, context)
elif query.data == "choose_my_repo":
await self.show_repos(update, context)
elif query.data == "enter_repo_url":
await self.request_repo_url(update, context)
elif query.data.startswith("suggestion_"):
suggestion_index = int(query.data.split("_")[1])
await self.show_suggestion_details(update, context, suggestion_index)
elif query.data == "show_current":
current_repo = session.get("selected_repo", "לא נבחר")
current_folder = session.get("selected_folder") or "root"
has_token = "✅" if self.get_user_token(user_id) else "❌"
keyboard = [[InlineKeyboardButton("🔙 חזרה לתפריט", callback_data="github_menu")]]
await query.edit_message_text(
f"📊 <b>הגדרות נוכחיות:</b>\n\n"
f"📁 ריפו: <code>{current_repo}</code>\n"
f"📂 תיקייה: <code>{current_folder}</code>\n"
f"🔑 טוקן מוגדר: {has_token}\n\n"
f"💡 טיפ: השתמש ב-'בחר תיקיית יעד' כדי לשנות את מיקום ההעלאה",
parse_mode="HTML",
reply_markup=InlineKeyboardMarkup(keyboard),
)
elif query.data == "set_token":
await query.edit_message_text(
"🔑 שלח לי את הטוקן של GitHub:\n\n"
"הטוקן יישמר בצורה מאובטחת לחשבון שלך לצורך שימוש עתידי.\n"
"תוכל להסיר אותו בכל עת עם הפקודה /github_logout.\n\n"
"💡 טיפ: צור טוקן ב:\n"
"https://github.com/settings/tokens"
)
return REPO_SELECT
elif query.data == "set_folder":
# פתח דפדפן ריפו לבחירת תיקיה אמיתית מתוך הריפו
# סימון מצב בחירת תיקיה עבור session
context.user_data["folder_select_mode"] = "session"
# אתחל מצב דפדוף
current = (session.get("selected_folder") or "").strip("/")
context.user_data["browse_action"] = "download"
context.user_data["browse_path"] = current
context.user_data["browse_page"] = 0
context.user_data["multi_mode"] = False
context.user_data["multi_selection"] = []
await self.show_repo_browser(update, context)
elif query.data.startswith("folder_"):
folder = query.data.replace("folder_", "")
if folder == "custom":
# בקש קלט לתיקייה מותאמת אישית
context.user_data["waiting_for_selected_folder"] = True
await query.edit_message_text(
"✏️ הקלד שם תיקייה (לדוגמה: src/images)\n"
"השאר ריק או הקלד / כדי לבחור root"
)
return FOLDER_SELECT
elif folder == "root":
session["selected_folder"] = None
await query.answer("✅ תיקייה עודכנה ל-root", show_alert=False)
await self.github_menu_command(update, context)
else:
session["selected_folder"] = folder.replace("_", "/")
await query.answer(f"✅ תיקייה עודכנה ל-{session['selected_folder']}", show_alert=False)
await self.github_menu_command(update, context)
elif query.data in ("create_folder", "upload_folder_create"):
# בקש מהמשתמש נתיב תיקייה חדשה ליצירה (ניצור .gitkeep בתוך התיקייה)
return_to_pre = (query.data == "upload_folder_create")
context.user_data["waiting_for_new_folder_path"] = True
context.user_data["return_to_pre_upload"] = return_to_pre
keyboard = [[InlineKeyboardButton("🔙 חזור", callback_data="create_folder_back"), InlineKeyboardButton("❌ ביטול", callback_data="create_folder_cancel")]]
await query.edit_message_text(
"➕ יצירת תיקייה חדשה\n\n"
"✏️ כתוב נתיב תיקייה חדשה (לדוגמה: src/new/section).\n"
"ניצור קובץ .gitkeep בתוך התיקייה כדי ש‑Git ישמור אותה.",
reply_markup=InlineKeyboardMarkup(keyboard)
)
return REPO_SELECT
elif query.data == "create_folder_back":
# חזרה למסך "תיקיית יעד"
context.user_data["waiting_for_new_folder_path"] = False
await self.show_upload_folder_menu(update, context)
return REPO_SELECT
elif query.data == "create_folder_cancel":
# ביטול יצירת תיקייה וחזרה לתפריט GitHub
context.user_data["waiting_for_new_folder_path"] = False
context.user_data["return_to_pre_upload"] = False
await self.github_menu_command(update, context)
return REPO_SELECT
elif query.data == "github_menu":
# חזרה לתפריט הראשי של GitHub
context.user_data["waiting_for_github_upload"] = False
context.user_data["in_github_menu"] = False
context.user_data.pop("folder_select_mode", None)
# נקה דגל סינון גיבויים לפי ריפו, אם קיים
# נקה דגלים זמניים של יצירת ריפו חדש
try:
context.user_data.pop("waiting_for_new_repo_name", None)
context.user_data.pop("new_repo_name", None)
if context.user_data.get("upload_mode") == "github_create_repo_from_zip":
context.user_data.pop("upload_mode", None)
context.user_data.pop("new_repo_private", None)
except Exception:
pass
try:
context.user_data.pop("github_backup_context_repo", None)
except Exception:
pass
await self.github_menu_command(update, context)
return ConversationHandler.END
elif query.data == "git_checkpoint":
await self.git_checkpoint(update, context)
elif query.data.startswith("git_checkpoint_doc:"):
parts = query.data.split(":", 2)
kind = parts[1] if len(parts) > 1 else ""
name = parts[2] if len(parts) > 2 else ""
await self.create_checkpoint_doc(update, context, kind, name)
elif query.data == "git_checkpoint_doc_skip":
kb = [[InlineKeyboardButton("🔙 חזור", callback_data="back_to_menu")]]
await query.edit_message_text(
"✅ נקודת שמירה נוצרה. ניתן לחזור לתפריט או להעלות קבצים שמורים.",
reply_markup=InlineKeyboardMarkup(kb),
)
elif query.data == "restore_checkpoint_menu":
await self.show_restore_checkpoint_menu(update, context)
elif query.data.startswith("restore_tags_page_"):
try:
p = int(query.data.split("_")[-1])
except Exception:
p = 0
context.user_data["restore_tags_page"] = max(0, p)
await self.show_restore_checkpoint_menu(update, context)
elif query.data.startswith("restore_select_tag:"):
tag_name = query.data.split(":", 1)[1]
await self.show_restore_tag_actions(update, context, tag_name)
elif query.data.startswith("restore_branch_from_tag:"):
tag_name = query.data.split(":", 1)[1]
await self.create_branch_from_tag(update, context, tag_name)
elif query.data.startswith("open_pr_from_branch:"):
branch_name = query.data.split(":", 1)[1]
await self.open_pr_from_branch(update, context, branch_name)
elif query.data.startswith("restore_revert_pr_from_tag:"):
tag_name = query.data.split(":", 1)[1]
await self.create_revert_pr_from_tag(update, context, tag_name)
elif query.data == "close_menu":
await query.edit_message_text("👋 התפריט נסגר")
elif query.data.startswith("repo_"):
if query.data == "repo_manual":
await query.edit_message_text(
"✏️ הקלד שם ריפו בפורמט:\n"
"<code>owner/repository</code>\n\n"
"לדוגמה: <code>amirbiron/CodeBot</code>",
parse_mode="HTML",
)
return REPO_SELECT
else:
repo_name = query.data.replace("repo_", "")
session["selected_repo"] = repo_name
# איפוס תיקיות יעד ישנות בעת בחירת ריפו חדש
session["selected_folder"] = None
context.user_data.pop("upload_target_folder", None)
context.user_data.pop("upload_target_branch", None)
# נקה סטייטים זמניים של זרם שחזור/גיבוי כדי למנוע נעילה לריפו קודם
try:
context.user_data.pop("zip_restore_expected_repo_full", None)
context.user_data.pop("github_restore_zip_purge", None)
context.user_data.pop("pending_repo_restore_zip_path", None)
context.user_data.pop("upload_mode", None)
except Exception:
pass
# שמור במסד נתונים
from database import db
db.save_selected_repo(user_id, repo_name)
# הצג את התפריט המלא אחרי בחירת הריפו
await self.github_menu_command(update, context)
return
elif query.data == "danger_delete_menu":
await self.show_danger_delete_menu(update, context)
elif query.data == "delete_file_menu":
await self.show_delete_file_menu(update, context)
elif query.data == "delete_repo_menu":
await self.show_delete_repo_menu(update, context)
elif query.data == "confirm_delete_file":
await self.confirm_delete_file(update, context)
elif query.data == "confirm_delete_repo_step1":
await self.confirm_delete_repo_step1(update, context)
elif query.data == "confirm_delete_repo":
await self.confirm_delete_repo(update, context)
elif query.data == "download_file_menu":
await self.show_download_file_menu(update, context)
elif query.data.startswith("browse_open:") or query.data.startswith("browse_open_i:"):
path = self._get_path_from_cb(context, query.data, "browse_open")
context.user_data["browse_path"] = path
context.user_data["browse_page"] = 0
# מצב מרובה ומחיקה בטוחה לאיפוס
context.user_data["multi_selection"] = []
await self.show_repo_browser(update, context)
elif query.data == "browse_ref_menu":
await self.show_browse_ref_menu(update, context)
elif query.data.startswith("browse_refs_branches_page_"):
try:
p = int(query.data.rsplit('_', 1)[1])
except Exception:
p = 0
context.user_data["browse_refs_branches_page"] = max(0, p)
context.user_data["browse_ref_tab"] = "branches"
await self.show_browse_ref_menu(update, context)
elif query.data.startswith("browse_refs_tags_page_"):
try:
p = int(query.data.rsplit('_', 1)[1])
except Exception:
p = 0
context.user_data["browse_refs_tags_page"] = max(0, p)
context.user_data["browse_ref_tab"] = "tags"
await self.show_browse_ref_menu(update, context)
elif query.data.startswith("browse_select_ref:"):
# עדכון ref נוכחי והחזרה לדפדפן
ref = query.data.split(":", 1)[1]
context.user_data["browse_ref"] = ref
context.user_data["browse_page"] = 0
await self.show_repo_browser(update, context)
elif query.data == "browse_search":
# בקש מהמשתמש להזין מחרוזת חיפוש לשמות קבצים
context.user_data["browse_search_mode"] = True
try:
await query.answer("הקלד עכשיו את השם לחיפוש (למשל: README)")
except Exception:
pass
try:
await query.edit_message_text(
"🔎 הזן/י מחרוזת לחיפוש בשם קובץ (לדוגמה: README או app.py)",
)
except BadRequest as br:
if "message is not modified" not in str(br).lower():
raise
elif query.data.startswith("browse_search_page:"):
try:
page = int(query.data.split(":", 1)[1])
except Exception:
page = 1
context.user_data["browse_search_page"] = max(1, page)
await self.show_browse_search_results(update, context)
elif query.data.startswith("browse_select_download:") or query.data.startswith("browse_select_download_i:"):
path = self._get_path_from_cb(context, query.data, "browse_select_download")
# שמור על browse_action=download כדי שלא ייחשפו כפתורי מחיקה לאחר ההורדה
context.user_data.pop("waiting_for_download_file_path", None)
# אל תאפס את browse_action; נשמור אותו כ-download
try:
if context.user_data.get("browse_action") != "download":
context.user_data["browse_action"] = "download"
except Exception:
context.user_data["browse_action"] = "download"
# שמור את הנתיב האחרון כדי שהדפדפן יישאר באותו מיקום
try:
context.user_data["browse_path"] = context.user_data.get("browse_path") or "/".join((path or "").split("/")[:-1])
except Exception:
pass
# הורדה מיידית
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not token or not repo_name:
await query.edit_message_text("❌ חסר טוקן או ריפו נבחר")
return
g = Github(token)
repo = g.get_repo(repo_name)
contents = repo.get_contents(path)
# אם הקובץ גדול מדי, שלח קישור להורדה במקום תוכן מלא
size = getattr(contents, "size", 0) or 0
if size and size > MAX_INLINE_FILE_BYTES:
download_url = getattr(contents, "download_url", None)
if download_url:
await query.message.reply_text(
f'⚠️ הקובץ גדול ({format_bytes(size)}). להורדה: <a href="{download_url}">קישור ישיר</a>',
parse_mode="HTML",
)
else:
await query.message.reply_text(
f"⚠️ הקובץ גדול ({format_bytes(size)}) ולא ניתן להורידו ישירות כרגע."
)
else:
data = contents.decoded_content
base = __import__('os').path
filename = base.basename(contents.path) or "downloaded_file"
await query.message.reply_document(document=BytesIO(data), filename=filename)
# הישאר בדפדפן במצב הורדה בלבד, עדכן מקלדת בלי להחליף טקסט
await self.show_repo_browser(update, context, only_keyboard=True)
elif query.data.startswith("browse_select_view:") or query.data.startswith("browse_select_view_i:"):
# מצב תצוגת קובץ חלקית עם "הצג עוד"
path = self._get_path_from_cb(context, query.data, "browse_select_view")
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.edit_message_text("❌ חסרים נתונים (בחר ריפו עם /github)")
return
g = Github(token)
repo = g.get_repo(repo_name)
# כבדוק ref נוכחי
try:
current_ref = (context.user_data.get("browse_ref") or getattr(repo, "default_branch", None) or "main")
except Exception:
current_ref = getattr(repo, "default_branch", None) or "main"
try:
contents = repo.get_contents(path, ref=current_ref)
data = contents.decoded_content.decode("utf-8", errors="replace")
# שמירת נתוני עזר: גודל ושפה מזוהה
try:
from utils import detect_language_from_filename
detected_lang = detect_language_from_filename(path)
except Exception:
detected_lang = "text"
context.user_data["view_file_size"] = int(getattr(contents, "size", 0) or 0)
context.user_data["view_detected_language"] = detected_lang
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בטעינת קובץ: {safe_html_escape(str(e))}", parse_mode="HTML")
return
# שימור טקסט בזיכרון קצר (user_data) + אינדקס עמוד
context.user_data["view_file_path"] = path
context.user_data["view_file_text"] = data
context.user_data["view_page_index"] = 0
await self._render_file_view(update, context)
elif query.data == "view_more":
# הצג עוד עמוד; נגן מפני None/מחרוזת
try:
current_index = int(context.user_data.get("view_page_index", 0) or 0)
except Exception:
current_index = 0
context.user_data["view_page_index"] = current_index + 1
await self._render_file_view(update, context)
elif query.data == "view_back":
# אם הגענו מתוצאות חיפוש – חזרה לעמוד החיפוש האחרון
if context.user_data.get("last_results_were_search"):
try:
await self.show_browse_search_results(update, context)
finally:
# ננקה את הדגל רק אחרי שחזרנו למסך החיפוש
context.user_data.pop("last_results_were_search", None)
else:
# חזרה לעץ הריפו (שומר path)
context.user_data["browse_action"] = "view"
if context.user_data.get("browse_page") is None:
context.user_data["browse_page"] = 0
await self.show_repo_browser(update, context)
elif query.data.startswith("browse_select_delete:") or query.data.startswith("browse_select_delete_i:"):
path = self._get_path_from_cb(context, query.data, "browse_select_delete")
# דרוש אישור לפני מחיקה
context.user_data["pending_delete_file_path"] = path
keyboard = [
[InlineKeyboardButton("✅ אישור מחיקה", callback_data="confirm_delete_file")],
[InlineKeyboardButton("🔙 חזור", callback_data="github_menu")],
]
await query.edit_message_text(
"האם אתה בטוח שברצונך למחוק את הקובץ הבא?\n\n" f"<code>{path}</code>",
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode="HTML",
)
elif query.data.startswith("download_zip:") or query.data.startswith("download_zip_i:"):
# הורדת התיקייה הנוכחית כקובץ ZIP
current_path = self._get_path_from_cb(context, query.data, "download_zip")
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.edit_message_text("❌ חסרים נתונים")
return
try:
await query.answer(
"מוריד תיקייה כ־ZIP, התהליך עשוי להימשך 1–2 דקות.", show_alert=True
)
g = Github(token)
repo = g.get_repo(repo_name)
# Fast path: הורדת ZIP מלא של הריפו דרך zipball
if not current_path:
try:
import requests
import zipfile as _zip
from datetime import datetime as _dt, timezone as _tz
url = repo.get_archive_link("zipball")
r = requests.get(url, timeout=60)
r.raise_for_status()
# בנה ZIP חדש עם metadata.json משולב כדי לאפשר רישום בגיבויים
src_buf = BytesIO(r.content)
with _zip.ZipFile(src_buf, "r") as zin:
# ספר קבצים (דלג על תיקיות)
file_names = [n for n in zin.namelist() if not n.endswith("/")]
file_count = len(file_names)
total_bytes = len(r.content)
# צור ZIP חדש עם metadata
out_buf = BytesIO()
with _zip.ZipFile(out_buf, "w", compression=_zip.ZIP_DEFLATED) as zout:
metadata = {
"backup_id": f"backup_{user_id}_{int(_dt.now(_tz.utc).timestamp())}",
"user_id": user_id,
"created_at": _dt.now(_tz.utc).isoformat(),
"backup_type": "github_repo_zip",
"include_versions": False,
"file_count": file_count,
"created_by": "Code Keeper Bot",
"repo": repo.full_name,
"path": current_path or ""
}
zout.writestr("metadata.json", json.dumps(metadata, indent=2))
for name in file_names:
zout.writestr(name, zin.read(name))
out_buf.seek(0)
# שמור גיבוי (Mongo/FS בהתאם לקונפיג)
backup_manager.save_backup_bytes(out_buf.getvalue(), metadata)
# שלח למשתמש
# השתמש בשם ידידותי: BKP zip <repo> vN - DD/MM/YY
try:
infos = backup_manager.list_backups(user_id)
vcount = len([b for b in infos if getattr(b, 'repo', None) == repo.full_name])
except Exception:
vcount = 1
date_str = _dt.now(_tz.utc).strftime('%d-%m-%y %H.%M')
filename = f"BKP zip {repo.name} v{vcount} - {date_str}.zip"
out_buf.name = filename
caption = f"📦 ריפו מלא — {format_bytes(total_bytes)}.\n💾 נשמר ברשימת הגיבויים."
await query.message.reply_document(
document=out_buf, filename=filename, caption=caption
)
# הצג שורת סיכום בסגנון המבוקש ואז בקש תיוג
try:
backup_id = metadata.get("backup_id")
date_str = _dt.now(_tz.utc).strftime('%d/%m/%y %H:%M')
try:
# חשב גרסת גיבוי (מספר רצים לאותו ריפו)
infos = backup_manager.list_backups(user_id)
vcount = len([b for b in infos if getattr(b, 'repo', None) == repo.full_name])
v_text = f"(v{vcount}) " if vcount else ""
except Exception:
v_text = ""
summary_line = f"⬇️ backup zip {repo.name} – {date_str} – {v_text}{format_bytes(total_bytes)}"
try:
from database import db as _db
existing_note = _db.get_backup_note(user_id, backup_id) or ""
except Exception:
existing_note = ""
note_btn_text = "📝 ערוך הערה" if existing_note else "📝 הוסף הערה"
kb = [
[InlineKeyboardButton("🏆 מצוין", callback_data=f"backup_rate:{backup_id}:excellent")],
[InlineKeyboardButton("👍 טוב", callback_data=f"backup_rate:{backup_id}:good")],
[InlineKeyboardButton("🤷 סביר", callback_data=f"backup_rate:{backup_id}:ok")],
[InlineKeyboardButton(note_btn_text, callback_data=f"backup_add_note:{backup_id}")],
]
msg = await query.message.reply_text(summary_line, reply_markup=InlineKeyboardMarkup(kb))
try:
s = context.user_data.setdefault("backup_summaries", {})
s[backup_id] = {"chat_id": msg.chat.id, "message_id": msg.message_id, "text": summary_line}
except Exception:
pass
# Rating buttons already attached above; no need to call external handler
except Exception:
pass
except Exception as e:
logger.error(f"Error fetching repo zipball: {e}")
try:
await query.edit_message_text(f"❌ שגיאה בהורדת ZIP של הריפו: {e}")
except BadRequest as br:
if "message is not modified" not in str(br).lower():
raise
# לאחר יצירת והורדת ה‑ZIP, הצג את רשימת הגיבויים עבור הריפו הנוכחי
try:
backup_handler = context.bot_data.get('backup_handler')
if backup_handler is None:
from backup_menu_handler import BackupMenuHandler
backup_handler = BackupMenuHandler()
context.bot_data['backup_handler'] = backup_handler
# הגדר הקשר חזרה לסאב־תפריט GitHub וגבילת הרשימה לריפו הנוכחי
try:
context.user_data['zip_back_to'] = 'github'
context.user_data['github_backup_context_repo'] = repo.full_name
context.user_data['backup_highlight_id'] = metadata.get('backup_id')
except Exception:
pass
await backup_handler._show_backups_list(update, context, page=1)
except Exception as br:
try:
await self.show_github_backup_menu(update, context)
except BadRequest as br2:
if "message is not modified" not in str(br2).lower():
raise
return
zip_buffer = BytesIO()
total_bytes = 0
total_files = 0
skipped_large = 0
with zipfile.ZipFile(zip_buffer, "w", compression=zipfile.ZIP_DEFLATED) as zipf:
# קבע שם תיקיית השורש בתוך ה-ZIP
zip_root = repo.name if not current_path else current_path.split("/")[-1]
async def add_path_to_zip(path: str, rel_prefix: str):
# קבל את התוכן עבור הנתיב
contents = repo.get_contents(path or "")
if not isinstance(contents, list):
contents = [contents]
for item in contents:
if item.type == "dir":
await self.apply_rate_limit_delay(user_id)
await add_path_to_zip(item.path, f"{rel_prefix}{item.name}/")
elif item.type == "file":
await self.apply_rate_limit_delay(user_id)
file_obj = repo.get_contents(item.path)
file_size = getattr(file_obj, "size", 0) or 0
nonlocal total_bytes, total_files, skipped_large
if file_size > MAX_INLINE_FILE_BYTES:
skipped_large += 1
continue
if total_files >= MAX_ZIP_FILES:
continue
if total_bytes + file_size > MAX_ZIP_TOTAL_BYTES:
continue
data = file_obj.decoded_content
arcname = f"{zip_root}/{rel_prefix}{item.name}"
zipf.writestr(arcname, data)
total_bytes += len(data)
total_files += 1
await add_path_to_zip(current_path, "")
# הוסף metadata.json
metadata = {
"backup_id": f"backup_{user_id}_{int(datetime.now(timezone.utc).timestamp())}",
"user_id": user_id,
"created_at": datetime.now(timezone.utc).isoformat(),
"backup_type": "github_repo_zip",
"include_versions": False,
"file_count": total_files,
"created_by": "Code Keeper Bot",
"repo": repo.full_name,
"path": current_path or ""
}
with zipfile.ZipFile(zip_buffer, 'a', compression=zipfile.ZIP_DEFLATED) as zipf:
zipf.writestr("metadata.json", json.dumps(metadata, indent=2))
zip_buffer.seek(0)
# שם ידידותי ל-folder/repo
try:
infos = backup_manager.list_backups(user_id)
vcount = len([b for b in infos if getattr(b, 'repo', None) == repo.full_name])
except Exception:
vcount = 1
date_str = datetime.now(timezone.utc).strftime('%d-%m-%y %H.%M')
name_part = repo.name if not current_path else current_path.split('/')[-1]
filename = f"BKP zip {name_part} v{vcount} - {date_str}.zip"
zip_buffer.name = filename
caption = (
f"📦 קובץ ZIP לתיקייה: /{current_path or ''}\n"
f"מכיל {total_files} קבצים, {format_bytes(total_bytes)}.\n"
f"💾 נשמר ברשימת הגיבויים."
)
if skipped_large:
caption += f"\n⚠️ דילג על {skipped_large} קבצים גדולים (> {format_bytes(MAX_INLINE_FILE_BYTES)})."
# שמור גיבוי (Mongo/FS בהתאם לקונפיג)
try:
backup_manager.save_backup_bytes(zip_buffer.getvalue(), metadata)
except Exception as e:
logger.warning(f"Failed to persist GitHub ZIP: {e}")
await query.message.reply_document(
document=zip_buffer, filename=filename, caption=caption
)
# הצג שורת סיכום בסגנון המבוקש ואז בקש תיוג
try:
backup_id = metadata.get("backup_id")
date_str = datetime.now(timezone.utc).strftime('%d/%m/%y %H:%M')
try:
infos = backup_manager.list_backups(user_id)
vcount = len([b for b in infos if getattr(b, 'repo', None) == repo.full_name])
v_text = f"(v{vcount}) " if vcount else ""
except Exception:
v_text = ""
summary_line = f"⬇️ backup zip {repo.name} – {date_str} – {v_text}{format_bytes(total_bytes)}"
try:
from database import db as _db
existing_note = _db.get_backup_note(user_id, backup_id) or ""
except Exception:
existing_note = ""
note_btn_text = "📝 ערוך הערה" if existing_note else "📝 הוסף הערה"
kb = [
[InlineKeyboardButton("🏆 מצוין", callback_data=f"backup_rate:{backup_id}:excellent")],
[InlineKeyboardButton("👍 טוב", callback_data=f"backup_rate:{backup_id}:good")],
[InlineKeyboardButton("🤷 סביר", callback_data=f"backup_rate:{backup_id}:ok")],
[InlineKeyboardButton(note_btn_text, callback_data=f"backup_add_note:{backup_id}")],
]
msg = await query.message.reply_text(summary_line, reply_markup=InlineKeyboardMarkup(kb))
try:
s = context.user_data.setdefault("backup_summaries", {})
s[backup_id] = {"chat_id": msg.chat.id, "message_id": msg.message_id, "text": summary_line}
except Exception:
pass
# Rating buttons already attached above; no need to call external handler
except Exception:
pass
except Exception as e:
logger.error(f"Error creating ZIP: {e}")
try:
await query.edit_message_text(f"❌ שגיאה בהכנת ZIP: {e}")
except BadRequest as br:
if "message is not modified" not in str(br).lower():
raise
return
# החזר לדפדפן באותו מקום
# לאחר יצירת והורדת ה‑ZIP, הצג את רשימת הגיבויים עבור הריפו הנוכחי
try:
backup_handler = context.bot_data.get('backup_handler')
if backup_handler is None:
from backup_menu_handler import BackupMenuHandler
backup_handler = BackupMenuHandler()
context.bot_data['backup_handler'] = backup_handler
try:
context.user_data['zip_back_to'] = 'github'
context.user_data['github_backup_context_repo'] = repo.full_name
context.user_data['backup_highlight_id'] = metadata.get('backup_id')
except Exception:
pass
await backup_handler._show_backups_list(update, context, page=1)
except Exception as br:
try:
await self.show_repo_browser(update, context)
except BadRequest as br2:
if "message is not modified" not in str(br2).lower():
raise
elif query.data.startswith("inline_download_file:"):
# הורדת קובץ שנבחר דרך אינליין
path = query.data.split(":", 1)[1]
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.edit_message_text("❌ חסרים נתונים (בחר ריפו עם /github)")
return
try:
g = Github(token)
repo = g.get_repo(repo_name)
contents = repo.get_contents(path)
size = getattr(contents, "size", 0) or 0
if size and size > MAX_INLINE_FILE_BYTES:
download_url = getattr(contents, "download_url", None)
if download_url:
await query.message.reply_text(
f'⚠️ הקובץ גדול ({format_bytes(size)}). להורדה: <a href="{download_url}">קישור ישיר</a>',
parse_mode="HTML",
)
else:
await query.message.reply_text(
f"⚠️ הקובץ גדול ({format_bytes(size)}) ולא ניתן להורידו ישירות כרגע."
)
else:
data = contents.decoded_content
filename = os.path.basename(contents.path) or "downloaded_file"
await query.message.reply_document(document=BytesIO(data), filename=filename)
except Exception as e:
logger.error(f"Inline download error: {e}")
await query.message.reply_text(f"❌ שגיאה בהורדה: {e}")
return
elif query.data.startswith("browse_page:"):
# מעבר עמודים בדפדפן הריפו
try:
page_index = int(query.data.split(":", 1)[1])
except ValueError:
page_index = 0
context.user_data["browse_page"] = max(0, page_index)
await self.show_repo_browser(update, context, only_keyboard=True)
elif query.data == "multi_toggle":
# הפעל/בטל מצב בחירה מרובה
current = context.user_data.get("multi_mode", False)
context.user_data["multi_mode"] = not current
if not context.user_data["multi_mode"]:
context.user_data["multi_selection"] = []
try:
await query.answer("מצב בחירה מרובה בוטל", show_alert=False)
except Exception:
pass
else:
try:
await query.answer("מצב בחירה מרובה הופעל — סמן קבצים מהרשימה", show_alert=False)
except Exception:
pass
context.user_data["browse_page"] = 0
await self.show_repo_browser(update, context, only_keyboard=True)
elif query.data.startswith("browse_toggle_select:"):
# הוסף/הסר בחירה של קובץ
path = query.data.split(":", 1)[1]
selection = set(context.user_data.get("multi_selection", []))
if path in selection:
selection.remove(path)
else:
selection.add(path)
context.user_data["multi_selection"] = list(selection)
await self.show_repo_browser(update, context, only_keyboard=True)
elif query.data == "multi_clear":
# נקה בחירות
context.user_data["multi_selection"] = []
await self.show_repo_browser(update, context, only_keyboard=True)
elif query.data == "safe_toggle":
# החלף מצב מחיקה בטוחה
new_state = not context.user_data.get("safe_delete", True)
context.user_data["safe_delete"] = new_state
try:
await query.answer("מחיקה בטוחה " + ("פעילה (PR)" if new_state else "כבויה — מוחק ישירות"), show_alert=False)
except Exception:
pass
await self.show_repo_browser(update, context, only_keyboard=True)
elif query.data == "multi_execute":
# בצע פעולה על הבחירה (ZIP בהורדה | מחיקה במצב מחיקה)
selection = list(dict.fromkeys(context.user_data.get("multi_selection", [])))
if not selection:
await query.answer("לא נבחרו קבצים", show_alert=True)
return
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.edit_message_text("❌ חסרים נתונים")
return
g = Github(token)
repo = g.get_repo(repo_name)
action = context.user_data.get("browse_action")
if action == "download":
# ארוז את הבחירה ל-ZIP
try:
zip_buffer = BytesIO()
total_bytes = 0
total_files = 0
skipped_large = 0
with zipfile.ZipFile(zip_buffer, "w", compression=zipfile.ZIP_DEFLATED) as zipf:
for path in selection:
await self.apply_rate_limit_delay(user_id)
try:
file_obj = repo.get_contents(path)
if getattr(file_obj, "type", "file") != "file":
continue
file_size = getattr(file_obj, "size", 0) or 0
if file_size > MAX_INLINE_FILE_BYTES:
skipped_large += 1
continue
if total_files >= MAX_ZIP_FILES:
continue
if total_bytes + file_size > MAX_ZIP_TOTAL_BYTES:
continue
data = file_obj.decoded_content
arcname = file_obj.path # שמור מבנה נתיב
zipf.writestr(arcname, data)
total_bytes += len(data)
total_files += 1
except Exception:
continue
if total_files == 0:
await query.answer("אין קבצים מתאימים לאריזה", show_alert=True)
else:
zip_buffer.seek(0)
filename = f"{repo.name}-selected.zip"
caption = f"📦 ZIP לקבצים נבחרים — {total_files} קבצים, {format_bytes(total_bytes)}."
if skipped_large:
caption += f"\n⚠️ דילג על {skipped_large} קבצים גדולים (> {format_bytes(MAX_INLINE_FILE_BYTES)})."
await query.message.reply_document(
document=zip_buffer, filename=filename, caption=caption
)
except Exception as e:
logger.error(f"Multi ZIP error: {e}")
await query.edit_message_text(f"❌ שגיאה באריזת ZIP: {e}")
return
finally:
# לאחר פעולה, שמור בדפדפן
pass
# השאר בדפדפן
await self.show_repo_browser(update, context)
else:
# מחיקה של נבחרים
safe_delete = context.user_data.get("safe_delete", True)
default_branch = repo.default_branch or "main"
successes = 0
failures = 0
pr_url = None
if safe_delete:
# צור סניף חדש ומחוק בו, ואז פתח PR
try:
base_ref = repo.get_git_ref(f"heads/{default_branch}")
new_branch = f"delete-bot-{int(time.time())}"
repo.create_git_ref(ref=f"refs/heads/{new_branch}", sha=base_ref.object.sha)
for path in selection:
await self.apply_rate_limit_delay(user_id)
try:
contents = repo.get_contents(path, ref=new_branch)
repo.delete_file(
contents.path,
f"Delete via bot: {path}",
contents.sha,
branch=new_branch,
)
successes += 1
except Exception:
failures += 1
pr = repo.create_pull(
title=f"Delete {successes} files via bot",
body="Automated deletion",
base=default_branch,
head=new_branch,
)
pr_url = pr.html_url
except Exception as e:
logger.error(f"Safe delete failed: {e}")
await query.edit_message_text(f"❌ שגיאה במחיקה בטוחה: {e}")
return
else:
# מחיקה ישירה בבראנץ' ברירת המחדל
for path in selection:
await self.apply_rate_limit_delay(user_id)
try:
contents = repo.get_contents(path)
repo.delete_file(
contents.path,
f"Delete via bot: {path}",
contents.sha,
branch=default_branch,
)
successes += 1
except Exception as e:
logger.error(f"Delete file failed: {e}")
failures += 1
# סכם והצג
summary = f"✅ נמחקו {successes} | ❌ נכשלו {failures}"
if pr_url:
summary += f'\n🔗 נפתח PR: <a href="{pr_url}">קישור</a>'
try:
await query.message.reply_text(summary, parse_mode="HTML")
except Exception:
pass
# אפס מצב מרובה וחזור לתפריט הדפדפן
context.user_data["multi_mode"] = False
context.user_data["multi_selection"] = []
await self.show_repo_browser(update, context)
elif query.data.startswith("share_folder_link:"):
# שיתוף קישור לתיקייה
path = query.data.split(":", 1)[1]
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.answer("❌ חסרים נתונים", show_alert=True)
return
g = Github(token)
repo = g.get_repo(repo_name)
branch = repo.default_branch or "main"
clean_path = (path or "").strip("/")
url = (
f"https://github.com/{repo.full_name}/tree/{branch}/{clean_path}"
if clean_path
else f"https://github.com/{repo.full_name}/tree/{branch}"
)
try:
await query.message.reply_text(f"🔗 קישור לתיקייה:\n{url}")
except Exception:
await query.answer("הקישור נשלח בהודעה חדשה")
# הישאר בדפדפן
await self.show_repo_browser(update, context)
elif query.data == "share_selected_links":
# שיתוף קישורים לקבצים נבחרים
selection = list(dict.fromkeys(context.user_data.get("multi_selection", [])))
if not selection:
await query.answer("לא נבחרו קבצים", show_alert=True)
return
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.answer("❌ חסרים נתונים", show_alert=True)
return
g = Github(token)
repo = g.get_repo(repo_name)
branch = repo.default_branch or "main"
lines = []
for p in selection[:50]:
# guard: ensure string before strip
clean = str(p).strip("/")
url = f"https://github.com/{repo.full_name}/blob/{branch}/{clean}"
lines.append(f"• {clean}: {url}")
text = "🔗 קישורים לקבצים נבחרים:\n" + "\n".join(lines)
try:
await query.message.reply_text(text)
except Exception as e:
logger.error(f"share_selected_links error: {e}")
await query.answer("שגיאה בשיתוף קישורים", show_alert=True)
# השאר בדפדפן
await self.show_repo_browser(update, context)
elif query.data.startswith("share_selected_links_single:"):
# שיתוף קישור לקובץ יחיד מתצוגה רגילה
path = query.data.split(":", 1)[1]
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.answer("❌ חסרים נתונים", show_alert=True)
return
g = Github(token)
repo = g.get_repo(repo_name)
branch = repo.default_branch or "main"
clean = str(path).strip("/")
url = f"https://github.com/{repo.full_name}/blob/{branch}/{clean}"
try:
await query.message.reply_text(f"🔗 קישור לקובץ:\n{url}")
except Exception as e:
logger.error(f"share_single_link error: {e}")
await query.answer("שגיאה בשיתוף קישור", show_alert=True)
await self.show_repo_browser(update, context, only_keyboard=True)
elif query.data == "notifications_menu":
await self.show_notifications_menu(update, context)
elif query.data == "notifications_toggle":
await self.toggle_notifications(update, context)
elif query.data == "notifications_toggle_pr":
await self.toggle_notifications_pr(update, context)
elif query.data == "notifications_toggle_issues":
await self.toggle_notifications_issues(update, context)
elif query.data.startswith("notifications_interval_"):
await self.set_notifications_interval(update, context)
elif query.data == "notifications_check_now":
await self.notifications_check_now(update, context)
elif query.data == "pr_menu":
await self.show_pr_menu(update, context)
elif query.data == "create_pr_menu":
context.user_data["pr_branches_page"] = 0
await self.show_create_pr_menu(update, context)
elif query.data.startswith("branches_page_"):
try:
p = int(query.data.split("_")[-1])
except Exception:
p = 0
context.user_data["pr_branches_page"] = max(0, p)
await self.show_create_pr_menu(update, context)
elif query.data.startswith("pr_select_head:"):
head = query.data.split(":", 1)[1]
context.user_data["pr_head"] = head
await self.show_confirm_create_pr(update, context)
elif query.data == "confirm_create_pr":
await self.confirm_create_pr(update, context)
elif query.data == "merge_pr_menu":
context.user_data["pr_list_page"] = 0
await self.show_merge_pr_menu(update, context)
elif query.data.startswith("prs_page_"):
try:
p = int(query.data.split("_")[-1])
except Exception:
p = 0
context.user_data["pr_list_page"] = max(0, p)
await self.show_merge_pr_menu(update, context)
elif query.data.startswith("merge_pr:"):
pr_number = int(query.data.split(":", 1)[1])
context.user_data["pr_to_merge"] = pr_number
await self.show_confirm_merge_pr(update, context)
elif query.data == "refresh_merge_pr":
await self.show_confirm_merge_pr(update, context)
elif query.data == "confirm_merge_pr":
await self.confirm_merge_pr(update, context)
elif query.data == "validate_repo":
status_message = None
done_event = asyncio.Event()
progress_task = None
try:
status_message = await query.edit_message_text("⏳ בודק תקינות הריפו... 0%")
async def _progress_updater():
percent = 0
try:
while not done_event.is_set():
percent = min(percent + 7, 90)
try:
await status_message.edit_text(f"⏳ בודק תקינות הריפו... {percent}%")
except Exception:
pass
await asyncio.sleep(1.0)
except Exception:
pass
progress_task = asyncio.create_task(_progress_updater())
import tempfile, requests, zipfile
token_opt = self.get_user_token(user_id)
g = Github(login_or_token=(token_opt or ""))
repo_full = session.get("selected_repo")
if not repo_full:
done_event.set()
if progress_task:
try:
await progress_task
except Exception:
pass
await (status_message.edit_text("❌ קודם בחר ריפו!") if status_message else query.edit_message_text("❌ קודם בחר ריפו!"))
return
def do_validate():
repo = g.get_repo(repo_full)
url = repo.get_archive_link("zipball")
with tempfile.TemporaryDirectory(prefix="repo_val_") as tmp:
zip_path = os.path.join(tmp, "repo.zip")
r = requests.get(url, timeout=60)
r.raise_for_status()
with open(zip_path, "wb") as f:
f.write(r.content)
extract_dir = os.path.join(tmp, "repo")
os.makedirs(extract_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as zf:
zf.extractall(extract_dir)
# GitHub zip יוצר תיקיית-שורש יחידה
entries = [os.path.join(extract_dir, d) for d in os.listdir(extract_dir)]
root = next((p for p in entries if os.path.isdir(p)), extract_dir)
# העתק קבצי קונפיג אם יש
try:
for name in (".flake8", "pyproject.toml", "mypy.ini", "bandit.yaml"):
src = os.path.join(os.getcwd(), name)
dst = os.path.join(root, name)
if os.path.isfile(src) and not os.path.isfile(dst):
with open(src, "rb") as s, open(dst, "wb") as d:
d.write(s.read())
except Exception:
pass
# הרצת כלים על כל הריפו
def _run(cmd, timeout=60):
import subprocess
try:
cp = subprocess.run(cmd, cwd=root, capture_output=True, text=True, timeout=timeout)
out = (cp.stdout or "") + (cp.stderr or "")
return cp.returncode, out.strip()
except subprocess.TimeoutExpired:
return 124, "Timeout"
except FileNotFoundError:
return 127, "Tool not installed"
except Exception as e:
return 1, str(e)
# העדפת כלים מה-venv המקומי אם קיים
venv_bin = os.path.join(os.getcwd(), ".venv", "bin")
venv_python = os.path.join(venv_bin, "python")
def _resolve_tool_candidates(tool_name):
candidates = []
if os.path.isdir(venv_bin):
candidates.append(os.path.join(venv_bin, tool_name))
if os.path.isfile(venv_python):
candidates.append([venv_python, "-m", tool_name])
candidates.append(tool_name)
return candidates
def _run_any(tool_name, base_args, timeout=60):
for candidate in _resolve_tool_candidates(tool_name):
cmd = (candidate if isinstance(candidate, list) else [candidate]) + base_args
rc, out = _run(cmd, timeout=timeout)
# אם הכלי לא נמצא, נסה מועמד הבא
if rc == 127:
continue
return rc, out
return 127, "Tool not installed"
results = {}
results["flake8"] = _run_any("flake8", ["."])
results["mypy"] = _run_any("mypy", ["."])
results["bandit"] = _run_any("bandit", ["-q", "-r", "."])
results["black"] = _run_any("black", ["--check", "."])
return results, repo_full
# הריץ ברקע כדי לא לחסום את לולאת האירועים
results, repo_name_for_msg = await asyncio.to_thread(do_validate)
done_event.set()
if progress_task:
try:
await progress_task
except Exception:
pass
# פורמט תוצאות מעוצב
def status_label(rc):
return "OK" if rc == 0 else ("MISSING" if rc == 127 else ("TIMEOUT" if rc == 124 else "FAIL"))
def status_emoji(rc):
return "✅" if rc == 0 else ("⛔" if rc == 127 else ("⏱️" if rc == 124 else "❌"))
# תרגום סטטוסים לעברית להצגה
he_label = {"OK": "תקין", "FAIL": "נכשל", "TIMEOUT": "פג זמן", "MISSING": "לא מותקן"}
counts = {"OK": 0, "FAIL": 0, "TIMEOUT": 0, "MISSING": 0}
max_tool_len = max((len(t) for t in results.keys()), default=0)
rows = []
for tool, (rc, output) in results.items():
label = status_label(rc)
counts[label] += 1
first_line = (output.splitlines() or [""])[0][:120]
suffix = f" — {escape(first_line)}" if label != "OK" and first_line else ""
rows.append(f"{tool.ljust(max_tool_len)} | {status_emoji(rc)} {he_label.get(label, label)}{suffix}")
header = f"🧪 בדיקות מתקדמות לריפו <code>{safe_html_escape(repo_name_for_msg)}</code>\n"
summary = f"סיכום: ✅ {counts['OK']} ❌ {counts['FAIL']} ⏱️ {counts['TIMEOUT']} ⛔ {counts['MISSING']}"
body = "\n".join(rows)
# יצירת הצעות ממוקדות
suggestions: list[str] = []
# flake8 – הצעה להסרת ייבוא שלא בשימוש
rc_flake8, out_flake8 = results.get("flake8", (0, ""))
if rc_flake8 != 0 and out_flake8:
import re as _re
m = _re.search(r"^(?P<file>[^:\n]+):(?P<line>\d+):\d+:\s*F401\s+'([^']+)'\s+imported but unused", out_flake8, _re.M)
if m:
file_p = safe_html_escape(m.group("file"))
line_p = safe_html_escape(m.group("line"))
# לא תמיד אפשר לשלוף את השם בבטחה בטלגרם – משאירים כללי
suggestions.append(f"<b>flake8</b>: הסר ייבוא שלא בשימוש בשורה {line_p} בקובץ <code>{file_p}</code>")
# mypy – הצעה ל-Optional כאשר ברירת מחדל None לסוג לא-Optional
rc_mypy, out_mypy = results.get("mypy", (0, ""))
if rc_mypy != 0 and out_mypy:
import re as _re
m = _re.search(r"Incompatible default for argument \"(?P<arg>[^\"]+)\" \(default has type \"None\", argument has type \"(?P<typ>[^\"]+)\"", out_mypy)
if m:
arg_p = safe_html_escape(m.group("arg"))
typ_p = safe_html_escape(m.group("typ"))
suggestions.append(f"<b>mypy</b>: הגדר Optional[{typ_p}] לפרמטר <code>{arg_p}</code> או שנה את ברירת המחדל מ-None")
# black – הצעה להריץ black על קבצים ספציפיים
rc_black, out_black = results.get("black", (0, ""))
if rc_black != 0 and out_black:
import re as _re
files = _re.findall(r"would reformat\s+(.+)", out_black)
if files:
raw_path = files[0]
# נסה לקצר מסלול זמני של zip לנתיב יחסי בתוך הריפו
try:
_m = _re.search(r".*/repo/[^/]+/(.+)$", raw_path)
short_path = _m.group(1) if _m else raw_path
except Exception:
short_path = raw_path
file1 = safe_html_escape(short_path)
suggestions.append(f"<b>black</b>: הרץ black על <code>{file1}</code> או על הפרויקט כולו ליישור פורמט")
# bandit – הצעות כלליות בהתאם לדפוסים נפוצים
rc_bandit, out_bandit = results.get("bandit", (0, ""))
if rc_bandit != 0 and out_bandit:
if "eval(" in out_bandit or "B307" in out_bandit:
suggestions.append("<b>bandit</b>: החלף שימוש ב-eval בפתרון בטוח יותר (למשל ast.literal_eval)")
elif "exec(" in out_bandit or "B102" in out_bandit:
suggestions.append("<b>bandit</b>: הימנע מ-exec והשתמש באלטרנטיבות בטוחות")
message = f"{header}{summary}\n<pre>{body}</pre>"
if suggestions:
# שימור תגיות HTML בתוך ההצעות תוך בריחה של תוכן דינמי נעשה כבר בשלב בניית ההצעות
sug_text = "\n".join(f"• {s}" for s in suggestions[:4])
message += f"\n\n💡 הצעות ממוקדות:\n{sug_text}"
# הוסף כפתור חזרה לתפריט GitHub
kb = [[InlineKeyboardButton("🔙 חזרה לתפריט GitHub", callback_data="github_menu")]]
await query.edit_message_text(message, parse_mode="HTML", reply_markup=InlineKeyboardMarkup(kb))
except Exception as e:
# ודא סגירת עדכון התקדמות גם בשגיאה
try:
done_event.set()
if progress_task:
try:
await progress_task
except Exception:
pass
except Exception:
pass
logger.exception("Repo validation failed")
await query.edit_message_text(f"❌ שגיאה בבדיקת הריפו: {safe_html_escape(e)}", parse_mode="HTML")
[תיעוד]
async def show_repo_selection(self, query, context: ContextTypes.DEFAULT_TYPE):
"""Show repository selection menu"""
await self.show_repos(query.message, context, query=query)
[תיעוד]
async def show_repos(
self, update: Update, context: ContextTypes.DEFAULT_TYPE, page: int = 0, query=None
):
"""מציג רשימת ריפוזיטוריז עם pagination"""
if query:
user_id = query.from_user.id
else:
user_id = update.effective_user.id
session = self.user_sessions.get(user_id, {})
if not self.get_user_token(user_id):
if query:
await query.answer("❌ נא להגדיר טוקן קודם")
else:
await update.reply_text("❌ נא להגדיר טוקן קודם")
return
try:
# בדוק אם יש repos ב-context.user_data ואם הם עדיין תקפים
cache_time = context.user_data.get("repos_cache_time", 0)
current_time = time.time()
cache_age = current_time - cache_time
cache_max_age = 3600 # שעה אחת
needs_refresh = "repos" not in context.user_data or cache_age > cache_max_age
if needs_refresh:
logger.info(
f"[GitHub API] Fetching repos for user {user_id} (cache age: {int(cache_age)}s)"
)
# אם אין cache או שהוא ישן, בצע בקשה ל-API
_tok = self.get_user_token(user_id)
g = Github(login_or_token=(_tok or ""))
# בדוק rate limit לפני הבקשה
rate = g.get_rate_limit()
logger.info(
f"[GitHub API] Rate limit - Remaining: {rate.core.remaining}/{rate.core.limit}"
)
if rate.core.remaining < 100:
logger.warning(
f"[GitHub API] Low on API calls! Only {rate.core.remaining} remaining"
)
if rate.core.remaining < 10:
# אם יש cache ישן, השתמש בו במקום לחסום
if "repos" in context.user_data:
logger.warning(f"[GitHub API] Using stale cache due to rate limit")
all_repos = context.user_data["repos"]
else:
if query:
await query.answer(
f"⏳ מגבלת API נמוכה! נותרו רק {rate.core.remaining} בקשות",
show_alert=True,
)
return
else:
# הוסף delay בין בקשות
await self.apply_rate_limit_delay(user_id)
user = g.get_user()
logger.info(f"[GitHub API] Getting repos for user: {user.login}")
# קבל את כל הריפוזיטוריז - טען רק פעם אחת!
context.user_data["repos"] = list(user.get_repos())
context.user_data["repos_cache_time"] = current_time
logger.info(
f"[GitHub API] Loaded {len(context.user_data['repos'])} repos into cache"
)
all_repos = context.user_data["repos"]
else:
logger.info(
f"[Cache] Using cached repos for user {user_id} - {len(context.user_data.get('repos', []))} repos (age: {int(cache_age)}s)"
)
all_repos = context.user_data["repos"]
# הגדרות pagination
repos_per_page = 8
total_repos = len(all_repos)
total_pages = (total_repos + repos_per_page - 1) // repos_per_page
# חשב אינדקסים
start_idx = page * repos_per_page
end_idx = min(start_idx + repos_per_page, total_repos)
# ריפוזיטוריז לעמוד הנוכחי
page_repos = all_repos[start_idx:end_idx]
keyboard = []
# הוסף ריפוזיטוריז
for repo in page_repos:
keyboard.append(
[
InlineKeyboardButton(
f"📁 {repo.name}", callback_data=f"repo_{repo.full_name}"
)
]
)
# כפתורי ניווט
nav_buttons = []
if page > 0:
nav_buttons.append(
InlineKeyboardButton("⬅️ הקודם", callback_data=f"repos_page_{page-1}")
)
nav_buttons.append(
InlineKeyboardButton(f"📄 {page+1}/{total_pages}", callback_data="noop")
)
if page < total_pages - 1:
nav_buttons.append(
InlineKeyboardButton("➡️ הבא", callback_data=f"repos_page_{page+1}")
)
if nav_buttons:
keyboard.append(nav_buttons)
# כפתורים נוספים
keyboard.append(
[InlineKeyboardButton("✍️ הקלד שם ריפו ידנית", callback_data="repo_manual")]
)
keyboard.append([InlineKeyboardButton("🔙 חזור", callback_data="back_to_menu")])
reply_markup = InlineKeyboardMarkup(keyboard)
if query:
await query.edit_message_text(
f"בחר ריפוזיטורי (עמוד {page+1} מתוך {total_pages}):", reply_markup=reply_markup
)
else:
try:
await update.callback_query.edit_message_text(
f"בחר ריפוזיטורי (עמוד {page+1} מתוך {total_pages}):", reply_markup=reply_markup
)
except Exception:
await update.message.reply_text(
f"בחר ריפוזיטורי (עמוד {page+1} מתוך {total_pages}):",
reply_markup=reply_markup,
)
except Exception as e:
error_msg = str(e)
# בדוק אם זו שגיאת rate limit
if "rate limit" in error_msg.lower() or "403" in error_msg:
error_msg = "⏳ חריגה ממגבלת GitHub API\n" "נסה שוב בעוד כמה דקות"
else:
error_msg = f"❌ שגיאה: {error_msg}"
if query:
await query.answer(error_msg, show_alert=True)
else:
try:
await update.callback_query.answer(error_msg, show_alert=True)
except Exception:
await update.message.reply_text(error_msg)
[תיעוד]
async def show_upload_other_files(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג רק קבצים שאינם מתויגים repo: ואינם קבצים גדולים, עם עימוד ואימוג'י לפי שפה."""
user_id = update.effective_user.id
from database import db
query = update.callback_query
try:
# קריאת נתונים
all_files = db.get_user_files(user_id, limit=1000)
large_files, _ = db.get_user_large_files(user_id, page=1, per_page=10000)
large_names = {lf.get('file_name') for lf in large_files if lf.get('file_name')}
other_files = []
for f in all_files:
name = f.get('file_name')
tags = f.get('tags') or []
if name and name not in large_names and not any(isinstance(t, str) and t.startswith('repo:') for t in tags):
other_files.append(f)
if not other_files:
await query.edit_message_text("ℹ️ אין 'שאר קבצים' להצגה (לא מתויגים כריפו ואינם גדולים)")
return
# מצב עמוד ובחירה
try:
page = int(context.user_data.get("other_files_page", 1))
except Exception:
page = 1
per_page = 20
total = len(other_files)
pages = max(1, (total + per_page - 1) // per_page)
if page > pages:
page = pages
context.user_data["other_files_page"] = page
start = (page - 1) * per_page
end = start + per_page
page_items = other_files[start:end]
# בניית מקלדת לבחירת קובץ יחיד להעלאה
keyboard = []
from utils import get_language_emoji, detect_language_from_filename
for f in page_items:
fid = str(f.get('_id'))
name = f.get('file_name', 'ללא שם')
lang = detect_language_from_filename(name)
emoji = get_language_emoji(lang)
keyboard.append([InlineKeyboardButton(f"{emoji} {name}", callback_data=f"upload_saved_{fid}")])
# ניווט עמודים
nav = []
if page > 1:
nav.append(InlineKeyboardButton("⬅️ הקודם", callback_data=f"other_files_page_{page-1}"))
if page < pages:
nav.append(InlineKeyboardButton("➡️ הבא", callback_data=f"other_files_page_{page+1}"))
if nav:
keyboard.append(nav)
keyboard.append([InlineKeyboardButton("🔙 חזור", callback_data="upload_file")])
await query.edit_message_text(
f"בחר/י קובץ להעלאה (שאר הקבצים) — עמוד {page}/{pages}:",
reply_markup=InlineKeyboardMarkup(keyboard)
)
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בטעינת 'שאר הקבצים': {e}")
[תיעוד]
async def show_upload_repos(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג תפריט ריפואים לבחירת קבצים שמורים עם תגית repo: להעלאה"""
user_id = update.effective_user.id
from database import db
query = update.callback_query
try:
files = db.get_user_files(user_id, limit=1000)
repo_to_count = {}
for f in files:
for t in f.get('tags', []) or []:
if isinstance(t, str) and t.startswith('repo:'):
repo_to_count[t] = repo_to_count.get(t, 0) + 1
if not repo_to_count:
await query.edit_message_text("ℹ️ אין קבצים עם תגית ריפו (repo:owner/name)")
return
keyboard = []
for tag, cnt in sorted(repo_to_count.items(), key=lambda x: x[0])[:50]:
keyboard.append([InlineKeyboardButton(f"{tag} ({cnt})", callback_data=f"gh_upload_repo:{tag}")])
keyboard.append([InlineKeyboardButton("🔙 חזור", callback_data="upload_file")])
await query.edit_message_text("בחר/י ריפו (מתוך תגיות הקבצים השמורים):", reply_markup=InlineKeyboardMarkup(keyboard))
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בטעינת רשימת ריפואים: {e}")
[תיעוד]
async def show_upload_repo_files(self, update: Update, context: ContextTypes.DEFAULT_TYPE,_repo_tag: str):
"""מציג קבצים שמורים תחת תגית ריפו שנבחרה ומאפשר להעלותם עם עימוד"""
user_id = update.effective_user.id
from database import db
query = update.callback_query
try:
repo_tag = _repo_tag
# עימוד: קרא מה-context או התחל בעמוד 1
try:
page = int((context.user_data.get("repo_files_page") or {}).get(repo_tag, 1))
except Exception:
page = 1
per_page = 50
files, total = db.get_user_files_by_repo(user_id, repo_tag, page=page, per_page=per_page)
if not files:
await query.edit_message_text("ℹ️ אין קבצים תחת התגית הזו")
return
pages = max(1, (total + per_page - 1) // per_page)
# בניית כפתורים
keyboard = []
for f in files:
fid = str(f.get('_id'))
name = f.get('file_name', 'ללא שם')
keyboard.append([InlineKeyboardButton(f"📄 {name}", callback_data=f"upload_saved_{fid}")])
nav = []
if page > 1:
nav.append(InlineKeyboardButton("⬅️ הקודם", callback_data=f"repo_files_page:{repo_tag}:{page-1}"))
if page < pages:
nav.append(InlineKeyboardButton("➡️ הבא", callback_data=f"repo_files_page:{repo_tag}:{page+1}"))
if nav:
keyboard.append(nav)
keyboard.append([InlineKeyboardButton("🔙 חזור", callback_data="gh_upload_cat:repos")])
await query.edit_message_text(
f"בחר/י קובץ להעלאה מהתגית {repo_tag} (עמוד {page}/{pages}, סך הכל {total}):",
reply_markup=InlineKeyboardMarkup(keyboard)
)
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בטעינת קבצים: {e}")
[תיעוד]
async def upload_large_files_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג רשימת קבצים גדולים להעלאה לריפו הנבחר"""
user_id = update.effective_user.id
from database import db
query = update.callback_query
try:
large_files, total = db.get_user_large_files(user_id, page=1, per_page=50)
if not large_files:
await query.edit_message_text("ℹ️ אין קבצים גדולים שמורים")
return
keyboard = []
for lf in large_files:
fid = str(lf.get('_id'))
name = lf.get('file_name', 'ללא שם')
size_kb = (lf.get('file_size', 0) or 0) / 1024
keyboard.append([InlineKeyboardButton(f"📄 {name} ({size_kb:.0f}KB)", callback_data=f"gh_upload_large:{fid}")])
keyboard.append([InlineKeyboardButton("🔙 חזור", callback_data="upload_file")])
await query.edit_message_text("בחר/י קובץ גדול להעלאה:", reply_markup=InlineKeyboardMarkup(keyboard))
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בטעינת קבצים גדולים: {e}")
[תיעוד]
async def handle_large_file_upload(self, update: Update, context: ContextTypes.DEFAULT_TYPE, file_id: str):
"""מעלה קובץ גדול שנבחר לגיטהאב (עם אותן בדיקות כמו קובץ שמור רגיל)"""
user_id = update.effective_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
query = update.callback_query
if not (session.get("selected_repo") and token):
await query.edit_message_text("❌ קודם בחר ריפו/טוקן בגיטהאב")
return
# שלוף את תוכן הקובץ הגדול
from database import db
from bson import ObjectId
doc = db.large_files_collection.find_one({"_id": ObjectId(file_id), "user_id": user_id})
if not doc:
await query.edit_message_text("❌ קובץ גדול לא נמצא")
return
# מאחדים עם זרימת show_pre_upload_check: נשתמש ב-pending_saved_file_id אחרי יצירת מסמך זמני
try:
# צור מסמך זמני בקולקשן הרגיל כדי למחזר את מסך הבדיקות
temp = {
"user_id": user_id,
"file_name": doc.get("file_name") or "large_file.txt",
"content": doc.get("content") or "",
}
res = db.collection.insert_one(temp)
context.user_data["pending_saved_file_id"] = str(res.inserted_id)
await self.show_pre_upload_check(update, context)
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בהכנת קובץ גדול להעלאה: {e}")
[תיעוד]
async def handle_saved_file_upload(
self, update: Update, context: ContextTypes.DEFAULT_TYPE, file_id: str
):
"""מטפל בהעלאת קובץ שמור ל-GitHub"""
user_id = update.effective_user.id
session = self.get_user_session(user_id)
if not session.get("selected_repo"):
await update.callback_query.answer("❌ נא לבחור ריפו קודם")
return
try:
from bson import ObjectId
from database import db
# קבל את הקובץ מהמסד
file_data = db.collection.find_one({"_id": ObjectId(file_id), "user_id": user_id})
if not file_data:
await update.callback_query.answer("❌ קובץ לא נמצא", show_alert=True)
return
await update.callback_query.edit_message_text("⏳ מעלה קובץ ל-GitHub...")
# לוג פרטי הקובץ
logger.info(f"📄 מעלה קובץ שמור: {file_data['file_name']}")
# קבל את התוכן מהקובץ השמור
# בדוק כמה אפשרויות לשדה content
content = (
file_data.get("content")
or file_data.get("code")
or file_data.get("data")
or file_data.get("file_content", "")
)
if not content:
await update.callback_query.edit_message_text("❌ תוכן הקובץ ריק או לא נמצא")
return
# PyGithub מקודד אוטומטית ל-base64, אז רק נוודא שהתוכן הוא string
if isinstance(content, bytes):
content = content.decode("utf-8")
logger.info(f"✅ תוכן מוכן להעלאה, גודל: {len(content)} chars")
# התחבר ל-GitHub
token_opt = self.get_user_token(user_id)
g = Github(token_opt) if token_opt else Github(None)
token_opt = self.get_user_token(user_id)
g = Github(token_opt) if token_opt else Github(None)
token_opt = self.get_user_token(user_id)
g = Github(token_opt) if token_opt else Github(None)
# בדוק rate limit לפני הבקשה
logger.info(f"[GitHub API] Checking rate limit before uploading file")
rate = g.get_rate_limit()
logger.info(
f"[GitHub API] Rate limit - Remaining: {rate.core.remaining}/{rate.core.limit}"
)
if rate.core.remaining < 100:
logger.warning(
f"[GitHub API] Low on API calls! Only {rate.core.remaining} remaining"
)
if rate.core.remaining < 10:
await update.callback_query.answer(
f"⏳ מגבלת API נמוכה מדי! נותרו רק {rate.core.remaining} בקשות", show_alert=True
)
return
# הוסף delay בין בקשות
await self.apply_rate_limit_delay(user_id)
logger.info(f"[GitHub API] Getting repo: {session['selected_repo']}")
repo = g.get_repo(session["selected_repo"])
# Resolve target branch and folder
branch = context.user_data.get("upload_target_branch") or repo.default_branch or "main"
folder = context.user_data.get("upload_target_folder") or session.get("selected_folder")
if folder and folder.strip():
folder = folder.strip("/")
file_path = f"{folder}/{file_data['file_name']}"
else:
file_path = file_data["file_name"]
logger.info(f"📁 נתיב יעד: {file_path} (branch: {branch})")
# נסה להעלות או לעדכן את הקובץ
try:
logger.info(f"[GitHub API] Checking if file exists: {file_path} @ {branch}")
existing = repo.get_contents(file_path, ref=branch)
logger.info(f"[GitHub API] File exists, updating: {file_path}")
result = repo.update_file(
path=file_path,
message=f"Update {file_data['file_name']} via Telegram bot",
content=content, # PyGithub יקודד אוטומטית
sha=existing.sha,
branch=branch,
)
action = "עודכן"
logger.info(f"✅ קובץ עודכן בהצלחה")
except:
logger.info(f"[GitHub API] File doesn't exist, creating: {file_path}")
result = repo.create_file(
path=file_path,
message=f"Upload {file_data['file_name']} via Telegram bot",
content=content, # PyGithub יקודד אוטומטית
branch=branch,
)
action = "הועלה"
logger.info(f"[GitHub API] File created successfully: {file_path}")
raw_url = (
f"https://raw.githubusercontent.com/{session['selected_repo']}/{branch}/{file_path}"
)
await update.callback_query.edit_message_text(
f"✅ הקובץ {action} בהצלחה!\n\n"
f"📁 ריפו: <code>{session['selected_repo']}</code>\n"
f"📂 מיקום: <code>{file_path}</code>\n"
f"🔗 קישור ישיר:\n{raw_url}\n\n"
f"שלח /github כדי לחזור לתפריט.",
parse_mode="HTML",
)
except Exception as e:
logger.error(f"❌ שגיאה בהעלאת קובץ שמור: {str(e)}", exc_info=True)
error_msg = str(e)
# בדוק אם זו שגיאת rate limit
if "rate limit" in error_msg.lower() or "403" in error_msg:
error_msg = (
"⏳ חריגה ממגבלת GitHub API\n"
"נסה שוב בעוד כמה דקות\n\n"
"💡 טיפ: המתן מספר דקות לפני ניסיון נוסף"
)
else:
error_msg = f"❌ שגיאה בהעלאה:\n{error_msg}\n\nפרטים נוספים נשמרו בלוג."
await update.callback_query.edit_message_text(error_msg)
[תיעוד]
async def handle_file_upload(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle file upload"""
user_id = update.message.from_user.id
session = self.get_user_session(user_id)
# בדוק אם אנחנו במצב העלאה לגיטהאב (תמיכה בשני המשתנים)
if context.user_data.get("waiting_for_upload_folder"):
# Capture folder path from user text and return to pre-upload check
folder_text = (update.message.text or "").strip()
# normalize: remove leading/trailing slashes
folder_norm = folder_text.strip("/")
context.user_data["upload_target_folder"] = folder_norm
context.user_data["waiting_for_upload_folder"] = False
await update.message.reply_text("✅ תיקיית יעד עודכנה. חוזר לבדיקות...")
await self.show_pre_upload_check(update, context)
return True
if (
context.user_data.get("waiting_for_github_upload")
or context.user_data.get("upload_mode") == "github"
):
# העלאה לגיטהאב
repo_name = context.user_data.get("target_repo") or session.get("selected_repo")
if not repo_name:
await update.message.reply_text("❌ קודם בחר ריפו!\nשלח /github")
return ConversationHandler.END
if update.message.document:
await update.message.reply_text("⏳ מעלה קובץ לגיטהאב...")
try:
file = await context.bot.get_file(update.message.document.file_id)
file_data = await file.download_as_bytearray()
filename = update.message.document.file_name
# לוג גודל וסוג הקובץ
file_size = len(file_data)
logger.info(f"📄 מעלה קובץ: {filename}, גודל: {file_size} bytes")
# PyGithub מקודד אוטומטית ל-base64, אז נמיר ל-string אם צריך
if isinstance(file_data, (bytes, bytearray)):
content = file_data.decode("utf-8")
else:
content = str(file_data)
logger.info(f"✅ תוכן מוכן להעלאה, גודל: {len(content)} chars")
token = self.get_user_token(user_id) or os.environ.get("GITHUB_TOKEN")
g = Github(login_or_token=(token or ""))
# בדוק rate limit לפני הבקשה
logger.info(f"[GitHub API] Checking rate limit before file upload")
rate = g.get_rate_limit()
logger.info(
f"[GitHub API] Rate limit - Remaining: {rate.core.remaining}/{rate.core.limit}"
)
if rate.core.remaining < 100:
logger.warning(
f"[GitHub API] Low on API calls! Only {rate.core.remaining} remaining"
)
if rate.core.remaining < 10:
await update.message.reply_text(
f"⏳ מגבלת API נמוכה מדי!\n"
f"נותרו רק {rate.core.remaining} בקשות\n"
f"נסה שוב מאוחר יותר"
)
return ConversationHandler.END
# הוסף delay בין בקשות
await self.apply_rate_limit_delay(user_id)
logger.info(f"[GitHub API] Getting repo: {repo_name}")
repo = g.get_repo(repo_name)
# בניית נתיב הקובץ
folder = (
context.user_data.get("upload_target_folder")
or context.user_data.get("target_folder")
or session.get("selected_folder")
)
if folder and folder.strip() and folder != "root":
# הסר / מיותרים
folder = folder.strip("/")
file_path = f"{folder}/{filename}"
else:
# העלה ל-root
file_path = filename
logger.info(f"📁 נתיב יעד: {file_path}")
try:
existing = repo.get_contents(file_path)
result = repo.update_file(
path=file_path,
message=f"Update {filename} via Telegram bot",
content=content, # PyGithub יקודד אוטומטית
sha=existing.sha,
)
action = "עודכן"
logger.info(f"✅ קובץ עודכן בהצלחה")
except:
result = repo.create_file(
path=file_path,
message=f"Upload {filename} via Telegram bot",
content=content, # PyGithub יקודד אוטומטית
)
action = "הועלה"
logger.info(f"✅ קובץ נוצר בהצלחה")
raw_url = f"https://raw.githubusercontent.com/{repo_name}/main/{file_path}"
await update.message.reply_text(
f"✅ הקובץ {action} בהצלחה לגיטהאב!\n\n"
f"📁 ריפו: <code>{repo_name}</code>\n"
f"📂 מיקום: <code>{file_path}</code>\n"
f"🔗 קישור ישיר:\n{raw_url}\n\n"
f"שלח /github כדי לחזור לתפריט.",
parse_mode="HTML",
)
# נקה את הסטטוס
context.user_data["waiting_for_github_upload"] = False
context.user_data["upload_mode"] = None
except Exception as e:
logger.error(f"❌ שגיאה בהעלאה: {str(e)}", exc_info=True)
error_msg = str(e)
# בדוק אם זו שגיאת rate limit
if "rate limit" in error_msg.lower() or "403" in error_msg:
error_msg = (
"⏳ חריגה ממגבלת GitHub API\n"
"נסה שוב בעוד כמה דקות\n\n"
"💡 טיפ: המתן מספר דקות לפני ניסיון נוסף"
)
else:
error_msg = f"❌ שגיאה בהעלאה:\n{error_msg}\n\nפרטים נוספים נשמרו בלוג."
await update.message.reply_text(error_msg)
else:
await update.message.reply_text("⚠️ שלח קובץ להעלאה")
return ConversationHandler.END
else:
# אם לא במצב העלאה לגיטהאב, תן למטפל הרגיל לטפל בזה
return ConversationHandler.END
[תיעוד]
async def handle_text_input(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle text input for various states"""
user_id = update.message.from_user.id
session = self.get_user_session(user_id)
text = update.message.text
logger.info(
f"📝 GitHub text input handler: user={user_id}, waiting_for_repo={context.user_data.get('waiting_for_repo_url')}"
)
# הנתיבים למחיקה/הורדה עוברים דרך דפדפן הכפתורים כעת, לכן אין צורך לטפל כאן
# הזן/בחר ריפו לניתוח
if context.user_data.get("waiting_for_repo_url"):
context.user_data["waiting_for_repo_url"] = False
await self.analyze_repository(update, context, text)
return True
# הזנת שם ריפו חדש לזרימת יצירה מּZIP
if context.user_data.get("waiting_for_new_repo_name"):
# נקה את מצב ההמתנה
context.user_data["waiting_for_new_repo_name"] = False
name_raw = (text or "").strip()
# סניטיזציה פשוטה: המרת רווחים למקף ואישור תווים מותרים
safe = re.sub(r"\s+", "-", name_raw)
safe = re.sub(r"[^A-Za-z0-9._-]", "-", safe)
safe = safe.strip(".-_")
if not safe:
await update.message.reply_text("❌ שם ריפו לא תקין. נסה שוב עם אותיות/מספרים/.-_ בלבד.")
context.user_data["waiting_for_new_repo_name"] = True
return True
# שמור את השם לבחירת יצירה
context.user_data["new_repo_name"] = safe
await update.message.reply_text(
f"✅ שם הריפו נקבע: <code>{safe}</code>\nשלח עכשיו קובץ ZIP לפריסה.",
parse_mode="HTML"
)
return True
# זרימת הדבקת קוד: שלב 1 - קבלת תוכן
if context.user_data.get("waiting_for_paste_content"):
context.user_data["waiting_for_paste_content"] = False
code_text = text or ""
if not code_text.strip():
context.user_data["waiting_for_paste_content"] = True
await update.message.reply_text(
"⚠️ קיבלתי תוכן ריק. הדבק/י את הקוד שוב.",
reply_markup=InlineKeyboardMarkup([
[
InlineKeyboardButton("🔙 חזור", callback_data="upload_file"),
InlineKeyboardButton("❌ ביטול", callback_data="cancel_paste_flow"),
]
])
)
return True
context.user_data["paste_content"] = code_text
context.user_data["waiting_for_paste_filename"] = True
await update.message.reply_text(
"📄 איך לקרוא לקובץ?\nהקלד/י שם כולל סיומת (לדוגמה: app.py או index.ts).",
reply_markup=InlineKeyboardMarkup([
[
InlineKeyboardButton("🔙 חזור", callback_data="upload_file"),
InlineKeyboardButton("❌ ביטול", callback_data="cancel_paste_flow"),
]
])
)
return True
# זרימת הדבקת קוד: שלב 2 - קבלת שם קובץ ופתיחת מסך הבדיקות
if context.user_data.get("waiting_for_paste_filename"):
context.user_data["waiting_for_paste_filename"] = False
raw_name = (text or "").strip()
# ולידציה בסיסית לשם קובץ
safe_name = raw_name.replace("\\", "/").split("/")[-1]
safe_name = re.sub(r"\s+", "_", safe_name)
safe_name = safe_name.strip()
if not safe_name or "." not in safe_name:
context.user_data["waiting_for_paste_filename"] = True
await update.message.reply_text(
"⚠️ שם קובץ לא תקין. ודא שם + סיומת, לדוגמה: main.py",
reply_markup=InlineKeyboardMarkup([
[
InlineKeyboardButton("🔙 חזור", callback_data="upload_file"),
InlineKeyboardButton("❌ ביטול", callback_data="cancel_paste_flow"),
]
])
)
return True
if not session.get("selected_repo"):
await update.message.reply_text("❌ קודם בחר/י ריפו. שלח/י /github")
return True
content = context.user_data.get("paste_content") or ""
try:
from database import db
from datetime import datetime
doc = {
"user_id": user_id,
"file_name": safe_name,
"content": content,
"created_at": datetime.utcnow(),
"tags": ["pasted"],
}
res = db.collection.insert_one(doc)
context.user_data["pending_saved_file_id"] = str(res.inserted_id)
# נקה תוכן זמני
context.user_data.pop("paste_content", None)
await self.show_pre_upload_check(update, context)
except Exception as e:
await update.message.reply_text(f"❌ שגיאה בשמירת הקובץ הזמני: {safe_html_escape(str(e))}", parse_mode="HTML")
return True
# חיפוש בשם קובץ מתוך דפדפן הריפו
if context.user_data.get("browse_search_mode"):
context.user_data["browse_search_mode"] = False
query = (text or "").strip()
if not query:
await update.message.reply_text("❌ שאילתת חיפוש ריקה. נסה שוב דרך הכפתור.")
return True
context.user_data["browse_search_query"] = query
context.user_data["browse_search_page"] = 1
await self.show_browse_search_results(update, context)
return True
# בחירת תיקייה (מתוך "בחר תיקיית יעד" הכללי)
if context.user_data.get("waiting_for_selected_folder"):
context.user_data["waiting_for_selected_folder"] = False
folder_raw = (text or "").strip()
# Normalize: allow '/' or empty for root
if folder_raw in {"", "/"}:
session["selected_folder"] = None
await update.message.reply_text("✅ תיקיית יעד עודכנה ל-root")
else:
# clean slashes and collapse duplicates
folder_clean = re.sub(r"/+", "/", folder_raw.strip("/"))
session["selected_folder"] = folder_clean
await update.message.reply_text(
f"✅ תיקיית יעד עודכנה ל-<code>{safe_html_escape(folder_clean)}</code>",
parse_mode="HTML",
)
# חזרה לתפריט GitHub
await self.github_menu_command(update, context)
return True
# יצירת תיקייה חדשה (גם מהתפריט וגם מתוך בדיקות לפני העלאה)
if context.user_data.get("waiting_for_new_folder_path"):
context.user_data["waiting_for_new_folder_path"] = False
folder_raw = (text or "").strip()
if folder_raw in {"", "/"}:
await update.message.reply_text("❌ יש להזין נתיב תיקייה תקין (לדוגמה: src/new)")
return True
folder_clean = re.sub(r"/+", "/", folder_raw.strip("/"))
# צור קובץ .gitkeep בתיקייה החדשה כדי ליצור אותה בגיט
token = self.get_user_token(user_id)
repo_full = session.get("selected_repo")
if not (token and repo_full):
await update.message.reply_text("❌ חסר טוקן או ריפו לא נבחר")
return True
try:
g = Github(token)
repo = g.get_repo(repo_full)
target_branch = context.user_data.get("upload_target_branch") or getattr(repo, "default_branch", None) or "main"
file_path = f"{folder_clean}/.gitkeep"
content = "placeholder to keep directory"
# נסה ליצור, ואם קיים נעדכן
try:
existing = repo.get_contents(file_path, ref=target_branch)
repo.update_file(
path=file_path,
message=f"Update .gitkeep via bot in {folder_clean}",
content=content,
sha=existing.sha,
branch=target_branch,
)
except Exception:
repo.create_file(
path=file_path,
message=f"Create folder {folder_clean} via bot",
content=content,
branch=target_branch,
)
# אם נוצר מתוך זרימת ה-pre-upload, עדכן את תיקיית היעד וחזור לבדיקה
if context.user_data.get("return_to_pre_upload"):
context.user_data["return_to_pre_upload"] = False
context.user_data["upload_target_folder"] = folder_clean
await update.message.reply_text(
f"✅ התיקייה נוצרה: <code>{safe_html_escape(folder_clean)}</code>\nחוזר למסך הבדיקות…",
parse_mode="HTML",
)
await self.show_pre_upload_check(update, context)
else:
# אחרת, עדכן גם את התיקייה הנבחרת לשימוש עתידי וחזור לתפריט
session["selected_folder"] = folder_clean
await update.message.reply_text(
f"✅ התיקייה נוצרה ונבחרה: <code>{safe_html_escape(folder_clean)}</code>",
parse_mode="HTML",
)
await self.github_menu_command(update, context)
except Exception as e:
logger.error(f"Failed to create folder {folder_clean}: {e}", exc_info=True)
await update.message.reply_text(
f"❌ יצירת תיקייה נכשלה: {safe_html_escape(str(e))}",
parse_mode="HTML",
)
return True
# ברירת מחדל: סיים
return ConversationHandler.END
[תיעוד]
async def show_analyze_repo_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג תפריט לניתוח ריפו"""
logger.info("📋 Starting show_analyze_repo_menu function")
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
logger.info(
f"📊 Session data: selected_repo={session.get('selected_repo')}, has_token={bool(self.get_user_token(user_id))}"
)
# בדוק אם יש ריפו נבחר
if session.get("selected_repo"):
# אם יש ריפו נבחר, הצע לנתח אותו או לבחור אחר
keyboard = [
[
InlineKeyboardButton(
f"📊 נתח את {session['selected_repo']}",
callback_data="analyze_current_repo",
)
],
[InlineKeyboardButton("🔍 נתח ריפו אחר", callback_data="analyze_other_repo")],
[InlineKeyboardButton("🔙 חזור לתפריט", callback_data="github_menu")],
]
await query.edit_message_text(
"🔍 <b>ניתוח ריפוזיטורי</b>\n\n" "בחר אפשרות:",
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode="HTML",
)
else:
# אם אין ריפו נבחר, בקש URL
await self.request_repo_url(update, context)
[תיעוד]
async def request_repo_url(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מבקש URL של ריפו לניתוח"""
logger.info("📝 Requesting repository URL from user")
query = update.callback_query if update.callback_query else None
keyboard = [[InlineKeyboardButton("❌ ביטול", callback_data="github_menu")]]
message_text = (
"🔍 <b>ניתוח ריפוזיטורי</b>\n\n"
"שלח URL של ריפו ציבורי ב-GitHub:\n"
"לדוגמה: <code>https://github.com/owner/repo</code>\n\n"
"💡 הריפו חייב להיות ציבורי או שיש לך גישה אליו עם הטוקן"
)
if query:
await query.edit_message_text(
message_text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="HTML"
)
else:
await update.message.reply_text(
message_text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="HTML"
)
# סמן שאנחנו מחכים ל-URL
context.user_data["waiting_for_repo_url"] = True
[תיעוד]
async def analyze_another_repo(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג תפריט בחירה לניתוח ריפו אחר"""
query = update.callback_query
await query.answer()
# הצג כפתורים לבחירה
keyboard = [
[InlineKeyboardButton("📁 בחר מהריפוזיטורים שלי", callback_data="choose_my_repo")],
[InlineKeyboardButton("🔗 הכנס URL של ריפו ציבורי", callback_data="enter_repo_url")],
[InlineKeyboardButton("🔙 חזור", callback_data="back_to_analysis_menu")],
]
await query.edit_message_text(
"איך תרצה לבחור ריפו לניתוח?", reply_markup=InlineKeyboardMarkup(keyboard)
)
[תיעוד]
async def analyze_repository(
self, update: Update, context: ContextTypes.DEFAULT_TYPE, repo_url: str
):
"""מנתח ריפוזיטורי ומציג תוצאות"""
logger.info(f"🎯 Starting repository analysis for URL: {repo_url}")
query = update.callback_query if update.callback_query else None
user_id = update.effective_user.id
session = self.get_user_session(user_id)
logger.info(f"👤 User {user_id} analyzing repo: {repo_url}")
# הצג הודעת המתנה
status_message = await self._send_or_edit_message(
update, "🔍 מנתח את הריפו...\nזה עשוי לקחת מספר שניות..."
)
try:
# צור מנתח עם הטוקן
analyzer = RepoAnalyzer(github_token=self.get_user_token(user_id))
# נתח את הריפו
analysis = await analyzer.fetch_and_analyze_repo(repo_url)
# שמור את הניתוח ב-session
session["last_analysis"] = analysis
session["last_analyzed_repo"] = repo_url
# צור סיכום
summary = self._create_analysis_summary(analysis)
# צור כפתורים
keyboard = [
[InlineKeyboardButton("🎯 הצג הצעות לשיפור", callback_data="show_suggestions")],
[InlineKeyboardButton("📥 הורד דוח JSON", callback_data="download_analysis_json")],
[InlineKeyboardButton("🔍 נתח ריפו אחר", callback_data="analyze_other_repo")],
[InlineKeyboardButton("🔙 חזור לתפריט", callback_data="github_menu")],
]
# עדכן את ההודעה עם התוצאות
await status_message.edit_text(
summary, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="HTML"
)
except Exception as e:
logger.error(f"Error analyzing repository: {e}")
error_message = f"❌ שגיאה בניתוח הריפו:\n{str(e)}"
keyboard = [
[InlineKeyboardButton("🔍 נסה ריפו אחר", callback_data="analyze_other_repo")],
[InlineKeyboardButton("🔙 חזור לתפריט", callback_data="github_menu")],
]
await status_message.edit_text(
error_message, reply_markup=InlineKeyboardMarkup(keyboard)
)
def _create_analysis_summary(self, analysis: Dict[str, Any]) -> str:
"""יוצר סיכום של הניתוח"""
# Escape HTML special characters
repo_name = safe_html_escape(analysis["repo_name"])
language = (
safe_html_escape(analysis.get("language", "")) if analysis.get("language") else None
)
summary = f"📊 <b>ניתוח הריפו {repo_name}</b>\n\n"
# סטטוס קבצים בסיסיים
summary += "<b>קבצים בסיסיים:</b>\n"
summary += "✅ README\n" if analysis["has_readme"] else "❌ חסר README\n"
summary += "✅ LICENSE\n" if analysis["has_license"] else "❌ חסר LICENSE\n"
summary += "✅ .gitignore\n" if analysis["has_gitignore"] else "❌ חסר .gitignore\n"
# מידע על הפרויקט
summary += f"\n<b>מידע כללי:</b>\n"
if language:
summary += f"🔤 שפה עיקרית: {language}\n"
summary += f"📁 {analysis['file_count']} קבצי קוד\n"
# קבצים לפי סוג
if analysis["files_by_type"]:
top_types = sorted(analysis["files_by_type"].items(), key=lambda x: x[1], reverse=True)[
:3
]
for ext, count in top_types:
ext_escaped = safe_html_escape(ext)
summary += f" • {count} קבצי {ext_escaped}\n"
# תלויות
if analysis["dependencies"]:
summary += f"📦 {len(analysis['dependencies'])} תלויות\n"
# בעיות פוטנציאליות
if analysis["large_files"]:
summary += f"⚠️ {len(analysis['large_files'])} קבצים גדולים\n"
if analysis["long_functions"]:
summary += f"⚠️ {len(analysis['long_functions'])} פונקציות ארוכות\n"
# ציון איכות
quality_score = analysis.get("quality_score", 0)
if quality_score >= 80:
emoji = "🌟"
text = "מצוין"
elif quality_score >= 60:
emoji = "✨"
text = "טוב"
elif quality_score >= 40:
emoji = "⭐"
text = "בינוני"
else:
emoji = "💫"
text = "דורש שיפור"
summary += f"\n<b>ציון איכות: {emoji} {quality_score}/100 ({text})</b>"
return summary
[תיעוד]
async def show_improvement_suggestions(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
):
"""מציג הצעות לשיפור"""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
session = self.get_user_session(user_id)
if not session.get("last_analysis"):
await query.edit_message_text(
"❌ לא נמצא ניתוח. נתח ריפו קודם.",
reply_markup=InlineKeyboardMarkup(
[
[InlineKeyboardButton("🔍 נתח ריפו", callback_data="analyze_repo")],
[InlineKeyboardButton("🔙 חזור לתפריט", callback_data="github_menu")],
]
),
)
return
# צור הצעות לשיפור
analyzer = RepoAnalyzer()
suggestions = analyzer.generate_improvement_suggestions(session["last_analysis"])
if not suggestions:
await query.edit_message_text(
"🎉 מעולה! לא נמצאו הצעות לשיפור משמעותיות.\n" "הפרויקט נראה מצוין!",
reply_markup=InlineKeyboardMarkup(
[
[InlineKeyboardButton("🔙 חזור לסיכום", callback_data="back_to_analysis")],
[InlineKeyboardButton("🏠 תפריט ראשי", callback_data="github_menu")],
]
),
)
return
# שמור הצעות ב-session
session["suggestions"] = suggestions
# צור כפתורים להצעות (מקסימום 8 הצעות)
keyboard = []
for i, suggestion in enumerate(suggestions[:8]):
impact_emoji = (
"🔴"
if suggestion["impact"] == "high"
else "🟡" if suggestion["impact"] == "medium" else "🟢"
)
button_text = f"{impact_emoji} {suggestion['title']}"
keyboard.append([InlineKeyboardButton(button_text, callback_data=f"suggestion_{i}")])
keyboard.append([InlineKeyboardButton("🔙 חזור לסיכום", callback_data="back_to_analysis")])
# Escape HTML special characters
repo_name = safe_html_escape(session["last_analysis"]["repo_name"])
message = f"💡 <b>הצעות לשיפור לריפו {repo_name}</b>\n\n"
message += f"נמצאו {len(suggestions)} הצעות לשיפור.\n"
message += "בחר הצעה לפרטים נוספים:\n\n"
message += "🔴 = השפעה גבוהה | 🟡 = בינונית | 🟢 = נמוכה"
await query.edit_message_text(
message, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="HTML"
)
[תיעוד]
async def show_suggestion_details(
self, update: Update, context: ContextTypes.DEFAULT_TYPE, suggestion_index: int
):
"""מציג פרטי הצעה ספציפית"""
query = update.callback_query
await query.answer()
try:
user_id = query.from_user.id
session = self.get_user_session(user_id)
suggestions = session.get("suggestions", [])
if suggestion_index >= len(suggestions):
await query.answer("❌ הצעה לא נמצאה", show_alert=True)
return
suggestion = suggestions[suggestion_index]
# מיפוי השפעה ומאמץ לעברית
impact_map = {"high": "גבוהה", "medium": "בינונית", "low": "נמוכה"}
effort_map = {"high": "גבוה", "medium": "בינוני", "low": "נמוך"}
# Use safe HTML escaping to prevent parsing errors
title = safe_html_escape(suggestion.get("title", "הצעה"))
why = safe_html_escape(suggestion.get("why", "לא צוין"))
how = safe_html_escape(suggestion.get("how", "לא צוין"))
impact = safe_html_escape(impact_map.get(suggestion.get("impact", "medium"), "בינונית"))
effort = safe_html_escape(effort_map.get(suggestion.get("effort", "medium"), "בינוני"))
# בנה הודעה בטוחה
message = f"<b>{title}</b>\n\n"
message += f"❓ <b>למה:</b> {why}\n\n"
message += f"💡 <b>איך:</b> {how}\n\n"
message += f"📊 <b>השפעה:</b> {impact}\n"
message += f"⚡ <b>מאמץ:</b> {effort}\n"
keyboard = []
# הוסף כפתור למידע נוסף בהתאם לקטגוריה
suggestion_id = suggestion.get("id", "")
if suggestion_id == "add_license":
keyboard.append(
[InlineKeyboardButton("📚 מידע על רישיונות", url="https://choosealicense.com/")]
)
elif suggestion_id == "add_gitignore":
keyboard.append(
[InlineKeyboardButton("📚 יצירת .gitignore", url="https://gitignore.io/")]
)
elif suggestion_id == "add_ci_cd":
keyboard.append(
[
InlineKeyboardButton(
"📚 GitHub Actions", url="https://docs.github.com/en/actions"
)
]
)
keyboard.append(
[InlineKeyboardButton("🔙 חזור להצעות", callback_data="show_suggestions")]
)
await query.edit_message_text(
message, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="HTML"
)
except Exception as e:
logger.error(f"Error in show_suggestion_details: {e}")
# Fallback to simple text without HTML
try:
simple_text = f"הצעה #{suggestion_index + 1}\n\n"
if "suggestion" in locals():
simple_text += f"{suggestion.get('title', 'הצעה')}\n\n"
simple_text += f"למה: {suggestion.get('why', 'לא צוין')}\n"
simple_text += f"איך: {suggestion.get('how', 'לא צוין')}\n"
else:
simple_text += "לא ניתן להציג את פרטי ההצעה"
await query.edit_message_text(
simple_text,
reply_markup=InlineKeyboardMarkup(
[[InlineKeyboardButton("🔙 חזור", callback_data="show_suggestions")]]
),
)
except Exception as fallback_error:
logger.error(f"Fallback also failed: {fallback_error}")
await query.answer("❌ שגיאה בהצגת ההצעה", show_alert=True)
[תיעוד]
async def show_full_analysis(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג ניתוח מלא"""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
session = self.get_user_session(user_id)
analysis = session.get("last_analysis")
if not analysis:
await query.answer("❌ לא נמצא ניתוח", show_alert=True)
return
# צור דוח מפורט - Escape HTML special characters
repo_name = safe_html_escape(analysis["repo_name"])
repo_url = safe_html_escape(analysis["repo_url"])
description = (
safe_html_escape(analysis.get("description", ""))
if analysis.get("description")
else None
)
language = safe_html_escape(analysis.get("language", "לא זוהתה"))
report = f"📊 <b>דוח מלא - {repo_name}</b>\n\n"
# מידע בסיסי
report += "<b>📌 מידע כללי:</b>\n"
report += f"• URL: {repo_url}\n"
if description:
report += f"• תיאור: {description}\n"
report += f"• שפה: {language}\n"
report += f"• כוכבים: ⭐ {analysis.get('stars', 0)}\n"
report += f"• Forks: 🍴 {analysis.get('forks', 0)}\n"
# קבצים
report += f"\n<b>📁 קבצים:</b>\n"
report += f"• סה״כ קבצי קוד: {analysis['file_count']}\n"
if analysis["files_by_type"]:
report += "• לפי סוג:\n"
for ext, count in sorted(
analysis["files_by_type"].items(), key=lambda x: x[1], reverse=True
):
report += f" - {ext}: {count}\n"
# בעיות
if analysis["large_files"] or analysis["long_functions"]:
report += f"\n<b>⚠️ בעיות פוטנציאליות:</b>\n"
if analysis["large_files"]:
report += f"• {len(analysis['large_files'])} קבצים גדולים (500+ שורות)\n"
if analysis["long_functions"]:
report += f"• {len(analysis['long_functions'])} פונקציות ארוכות (50+ שורות)\n"
# תלויות
if analysis["dependencies"]:
report += f"\n<b>📦 תלויות ({len(analysis['dependencies'])}):</b>\n"
# הצג רק 10 הראשונות
for dep in analysis["dependencies"][:10]:
dep_name = safe_html_escape(dep["name"])
dep_type = safe_html_escape(dep["type"])
report += f"• {dep_name} ({dep_type})\n"
if len(analysis["dependencies"]) > 10:
report += f"• ... ועוד {len(analysis['dependencies']) - 10}\n"
keyboard = [
[InlineKeyboardButton("🔙 חזור לסיכום", callback_data="back_to_analysis")],
[InlineKeyboardButton("🏠 תפריט ראשי", callback_data="github_menu")],
]
# חלק את ההודעה אם היא ארוכה מדי
if len(report) > 4000:
report = report[:3900] + "\n\n... (קוצר לצורך תצוגה)"
await query.edit_message_text(
report, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="HTML"
)
[תיעוד]
async def download_analysis_json(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""שולח קובץ JSON עם הניתוח המלא"""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
session = self.get_user_session(user_id)
analysis = session.get("last_analysis")
if not analysis:
await query.answer("❌ לא נמצא ניתוח", show_alert=True)
return
# הוסף גם את ההצעות לדוח
analyzer = RepoAnalyzer()
suggestions = analyzer.generate_improvement_suggestions(analysis)
full_report = {
"analysis": analysis,
"suggestions": suggestions,
"generated_at": datetime.now(timezone.utc).isoformat(),
}
# צור קובץ JSON
json_content = json.dumps(full_report, ensure_ascii=False, indent=2)
# שלח כקובץ
import io
file = io.BytesIO(json_content.encode("utf-8"))
file.name = f"repo_analysis_{analysis['repo_name']}.json"
await query.message.reply_document(
document=file,
filename=file.name,
caption=f"📊 דוח ניתוח מלא עבור {analysis['repo_name']}",
)
# חזור לתפריט
await self.show_analyze_results_menu(update, context)
[תיעוד]
async def show_analyze_results_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג מחדש את תפריט התוצאות"""
user_id = update.effective_user.id
session = self.get_user_session(user_id)
analysis = session.get("last_analysis")
if not analysis:
return
summary = self._create_analysis_summary(analysis)
keyboard = [
[InlineKeyboardButton("🎯 הצג הצעות לשיפור", callback_data="show_suggestions")],
[InlineKeyboardButton("📥 הורד דוח JSON", callback_data="download_analysis_json")],
[InlineKeyboardButton("🔍 נתח ריפו אחר", callback_data="analyze_other_repo")],
[InlineKeyboardButton("🔙 חזור לתפריט", callback_data="github_menu")],
]
if update.callback_query:
await update.callback_query.edit_message_text(
summary, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="HTML"
)
else:
await update.message.reply_text(
summary, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="HTML"
)
async def _send_or_edit_message(self, update: Update, text: str, **kwargs):
"""שולח או עורך הודעה בהתאם לסוג ה-update"""
if update.callback_query:
return await update.callback_query.edit_message_text(text, **kwargs)
else:
return await update.message.reply_text(text, **kwargs)
[תיעוד]
async def show_delete_file_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג תפריט מחיקת קובץ מהריפו (דפדוף בכפתורים)"""
query = update.callback_query
session = self.get_user_session(query.from_user.id)
repo = session.get("selected_repo")
if not repo:
await query.edit_message_text("❌ לא נבחר ריפו")
return
context.user_data["browse_action"] = "delete"
context.user_data["browse_path"] = ""
context.user_data["browse_page"] = 0
# מצב מרובה ומחיקה בטוחה לאיפוס
context.user_data["multi_mode"] = False
context.user_data["multi_selection"] = []
context.user_data["safe_delete"] = True
await self.show_repo_browser(update, context)
[תיעוד]
async def show_delete_repo_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג תפריט מחיקת ריפו שלם עם אזהרות"""
query = update.callback_query
session = self.get_user_session(query.from_user.id)
repo = session.get("selected_repo")
if not repo:
await query.edit_message_text("❌ לא נבחר ריפו")
return
keyboard = [
[
InlineKeyboardButton(
"✅ אני מבין/ה ומאשר/ת מחיקה", callback_data="confirm_delete_repo_step1"
)
],
[InlineKeyboardButton("🔙 חזור", callback_data="github_menu")],
]
await query.edit_message_text(
"⚠️ מחיקת ריפו שלם הינה פעולה בלתי הפיכה!\n\n"
"- יימחקו כל הקבצים, ה-Issues, ה-PRs וה-Settings\n"
"- לא ניתן לשחזר לאחר המחיקה\n\n"
f"ריפו למחיקה: <code>{repo}</code>\n\n"
"אם ברצונך להמשיך, לחץ על האישור ואז תתבקש לאשר שוב.",
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode="HTML",
)
[תיעוד]
async def confirm_delete_file(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מבצע מחיקת קובץ לאחר אישור"""
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
file_path = context.user_data.get("pending_delete_file_path")
if not (token and repo_name and file_path):
await query.edit_message_text("❌ נתונים חסרים למחיקה")
return
try:
g = Github(token)
repo = g.get_repo(repo_name)
# בדוק אם הקובץ קיים וקבל sha לצורך מחיקה
contents = repo.get_contents(file_path)
default_branch = repo.default_branch or "main"
repo.delete_file(
contents.path, f"Delete via bot: {file_path}", contents.sha, branch=default_branch
)
await query.edit_message_text(
f"✅ הקובץ נמחק בהצלחה: <code>{file_path}</code>", parse_mode="HTML"
)
except Exception as e:
logger.error(f"Error deleting file: {e}")
await query.edit_message_text(f"❌ שגיאה במחיקת קובץ: {e}")
finally:
context.user_data.pop("pending_delete_file_path", None)
await self.github_menu_command(update, context)
[תיעוד]
async def confirm_delete_repo_step1(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מסך אישור סופי לפני מחיקת ריפו, מפנה ללחצן מחיקה סופי"""
query = update.callback_query
session = self.get_user_session(query.from_user.id)
repo = session.get("selected_repo")
if not repo:
await query.edit_message_text("❌ לא נבחר ריפו")
return
keyboard = [
[InlineKeyboardButton("🧨 כן, מחק לצמיתות", callback_data="confirm_delete_repo")],
[InlineKeyboardButton("🔙 ביטול", callback_data="github_menu")],
]
await query.edit_message_text(
f"⚠️ אישור סופי למחיקת <code>{repo}</code>\n\n"
"פעולה זו תמחק לצמיתות את הריפו וכל התוכן המשויך אליו.\n"
"אין דרך לשחזר לאחר מכן.",
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode="HTML",
)
[תיעוד]
async def confirm_delete_repo(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מבצע מחיקת ריפו שלם לאחר אישור"""
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.edit_message_text("❌ נתונים חסרים למחיקה")
return
try:
g = Github(token)
repo = g.get_repo(repo_name)
owner = g.get_user()
# ודא שלמשתמש יש הרשאה למחוק
if repo.owner.login != owner.login:
await query.edit_message_text("❌ ניתן למחוק רק ריפו שאתה בעליו")
return
repo.delete()
# נקה קאש ריפוזיטוריז כדי שהרשימה תרוענן ולא תציג פריטים שנמחקו
context.user_data.pop("repos", None)
context.user_data.pop("repos_cache_time", None)
await query.edit_message_text(
f"✅ הריפו נמחק בהצלחה: <code>{repo_name}</code>", parse_mode="HTML"
)
# נקה בחירה לאחר מחיקה
session["selected_repo"] = None
except Exception as e:
logger.error(f"Error deleting repository: {e}")
await query.edit_message_text(f"❌ שגיאה במחיקת ריפו: {e}")
finally:
# לאחר מחיקה, ודא שקאש הרשימות אינו משאיר את הריפו הישן
context.user_data.pop("repos", None)
context.user_data.pop("repos_cache_time", None)
await self.github_menu_command(update, context)
[תיעוד]
async def show_danger_delete_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג תפריט מחיקות מסוכן"""
query = update.callback_query
session = self.get_user_session(query.from_user.id)
repo = session.get("selected_repo")
if not repo:
await query.edit_message_text("❌ לא נבחר ריפו")
return
keyboard = [
[InlineKeyboardButton("🗑️ מחק קובץ מהריפו", callback_data="delete_file_menu")],
[InlineKeyboardButton("⚠️ מחק ריפו שלם (מתקדם)", callback_data="delete_repo_menu")],
[InlineKeyboardButton("🔙 חזור", callback_data="github_menu")],
]
await query.edit_message_text(
f"🧨 פעולות מחיקה ב-<code>{repo}</code>\n\nבחר פעולה:",
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode="HTML",
)
[תיעוד]
async def show_download_file_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג תפריט הורדת קובץ מהריפו (דפדוף בכפתורים)"""
query = update.callback_query
session = self.get_user_session(query.from_user.id)
repo = session.get("selected_repo")
if not repo:
await query.edit_message_text("❌ לא נבחר ריפו")
return
# התחל בדפדוף מה-root במצב הורדה בלבד
context.user_data["browse_action"] = "download"
context.user_data["browse_path"] = ""
context.user_data["browse_page"] = 0
# אפס מצב מחיקה אם הופעל קודם
context.user_data["multi_mode"] = False
context.user_data["multi_selection"] = []
context.user_data["safe_delete"] = True
await self.show_repo_browser(update, context)
[תיעוד]
async def show_repo_browser(self, update: Update, context: ContextTypes.DEFAULT_TYPE, only_keyboard: bool = False):
"""מציג דפדפן ריפו לפי נתיב ושימוש (view/download/delete), כולל breadcrumbs ועימוד."""
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.edit_message_text("❌ חסרים נתונים")
return
g = Github(token)
repo = g.get_repo(repo_name)
path = context.user_data.get("browse_path", "")
# קביעת ref נוכחי לניווט (ענף/תג)
try:
current_ref = context.user_data.get("browse_ref") or (getattr(repo, "default_branch", None) or "main")
except Exception:
current_ref = getattr(repo, "default_branch", None) or "main"
# קבלת תוכן התיקייה
try:
contents = repo.get_contents(path or "", ref=current_ref)
except Exception:
contents = repo.get_contents(path or "")
if not isinstance(contents, list):
# אם זה קובץ יחיד, הפוך לרשימה לצורך תצוגה
contents = [contents]
# בניית פריטים (תיקיות קודם, אחר כך קבצים)
folders = [c for c in contents if c.type == "dir"]
files = [c for c in contents if c.type == "file"]
# במצב בחירת תיקייה, לא נציג קבצים כלל
folder_selecting = bool(context.user_data.get("folder_select_mode"))
entry_rows = []
# Breadcrumbs
crumbs_row = []
crumbs_row.append(InlineKeyboardButton("🏠 root", callback_data=self._mk_cb(context, "browse_open", "")))
if path:
parts = path.split("/")
accum = []
for part in parts:
accum.append(part)
crumbs_row.append(InlineKeyboardButton(part, callback_data=self._mk_cb(context, "browse_open", '/'.join(accum))))
if crumbs_row:
entry_rows.append(crumbs_row)
# שורת כלים: חיפוש ובחירת ref
tools_row = [
InlineKeyboardButton("🔎 חפש בשם קובץ", callback_data="browse_search"),
InlineKeyboardButton(f"🌿 ref: {current_ref}", callback_data="browse_ref_menu"),
]
entry_rows.append(tools_row)
for folder in folders:
# תמיד מציגים פתיחת תיקייה; אין צורך בכפתור "בחר כיעד" (הוסרה דרישתך)
row = [InlineKeyboardButton(
f"📂 {folder.name}", callback_data=self._mk_cb(context, "browse_open", folder.path)
)]
entry_rows.append(row)
multi_mode = context.user_data.get("multi_mode", False)
selection = set(context.user_data.get("multi_selection", []))
if not folder_selecting:
for f in files:
if multi_mode:
checked = "☑️" if f.path in selection else "⬜️"
entry_rows.append(
[
InlineKeyboardButton(
f"{checked} {f.name}", callback_data=f"browse_toggle_select:{f.path}"
)
]
)
else:
mode = context.user_data.get("browse_action")
if mode == "download":
size_val = getattr(f, "size", 0) or 0
large_flag = " ⚠️" if size_val and size_val > MAX_INLINE_FILE_BYTES else ""
entry_rows.append(
[
InlineKeyboardButton(
f"⬇️ {f.name}{large_flag}",
callback_data=self._mk_cb(context, "browse_select_download", f.path),
)
]
)
elif mode == "view":
# הסר כפתור "שתף קישור" מרשימה; נשאיר רק במסך התצוגה
entry_rows.append(
[
InlineKeyboardButton(
f"👁️ {f.name}", callback_data=self._mk_cb(context, "browse_select_view", f.path)
)
]
)
else:
# במצב שאינו download ואינו view — זה מצב delete בלבד
entry_rows.append(
[
InlineKeyboardButton(
f"🗑️ {f.name}", callback_data=self._mk_cb(context, "browse_select_delete", f.path)
)
]
)
# ודא דגלים ברירת מחדל כדי למנוע שגיאות בניווט
if context.user_data.get("browse_page") is None:
context.user_data["browse_page"] = 0
if context.user_data.get("multi_mode") is None:
context.user_data["multi_mode"] = False
# עימוד
page_size = 10
# ודא ששורת הכלים (חיפוש/בחירת ref) תמיד נשמרת בראש כל עמוד
# נבנה את המטריצה כך שהשורה הראשונה תהיה תמיד הכלים, ולא תיספר לעימוד
# מצא אינדקס תחילת הפריטים לעימוד אחרי breadcrumbs ושורת כלים
# breadcrumbs נמצאת ב-entry_rows[0] (אם קיימת), ושורת כלים ב-entry_rows[1]
start_items_index = 0
if entry_rows:
# אם יש breadcrumbs, הם באינדקס 0
start_items_index = 1
# אם יש גם שורת כלים, היא באינדקס 1
if len(entry_rows) > 1 and any(
isinstance(btn, InlineKeyboardButton) and getattr(btn, 'callback_data', '') == 'browse_search'
for btn in entry_rows[1]
):
start_items_index = 2
paginable_rows = entry_rows[start_items_index:]
total_items = len(paginable_rows)
total_pages = max(1, (total_items + page_size - 1) // page_size)
current_page = min(max(0, context.user_data.get("browse_page", 0)), total_pages - 1)
start_index = current_page * page_size
end_index = start_index + page_size
# בנה מקלדת: breadcrumbs (אם קיימת) + שורת כלים + פריטי העמוד
keyboard = []
if entry_rows and start_items_index >= 1:
keyboard.append(entry_rows[0]) # breadcrumbs
if entry_rows and start_items_index >= 2:
keyboard.append(entry_rows[1]) # tools row (כולל חיפוש)
keyboard.extend(paginable_rows[start_index:end_index])
# ניווט עמודים
if total_pages > 1:
nav_row = []
if current_page > 0:
nav_row.append(
InlineKeyboardButton("⬅️ הקודם", callback_data=f"browse_page:{current_page - 1}")
)
nav_row.append(
InlineKeyboardButton(f"עמוד {current_page + 1}/{total_pages}", callback_data="noop")
)
if current_page < total_pages - 1:
nav_row.append(
InlineKeyboardButton("הבא ➡️", callback_data=f"browse_page:{current_page + 1}")
)
keyboard.append(nav_row)
# שורה תחתונה
bottom = []
if path:
# חזרה למעלה
parent = "/".join(path.split("/")[:-1])
bottom.append(InlineKeyboardButton("⬆️ למעלה", callback_data=self._mk_cb(context, "browse_open", parent)))
# כפתור חזרה/סיום לבחירת תיקייה
if context.user_data.get("folder_select_mode") == "session":
bottom.append(InlineKeyboardButton("✅ סיום בחירה", callback_data="folder_select_done"))
bottom.append(InlineKeyboardButton("🔙 ביטול", callback_data="github_menu"))
# הוסף כפתור יצירת תיקייה חדשה במצב בחירת תיקייה
keyboard.append([InlineKeyboardButton("➕ צור תיקייה חדשה", callback_data="create_folder")])
# סדר כפתורים לשורות כדי למנוע צפיפות
row = []
if (not folder_selecting) and context.user_data.get("browse_action") == "download":
row.append(InlineKeyboardButton("📦 הורד תיקייה כ־ZIP", callback_data=self._mk_cb(context, "download_zip", path or "")))
if len(row) >= 1:
keyboard.append(row)
row = []
if (not folder_selecting) and context.user_data.get("browse_action") == "download":
row.append(InlineKeyboardButton("🔗 שתף קישור לתיקייה", callback_data=self._mk_cb(context, "share_folder_link", path or "")))
if not folder_selecting:
# במצב הורדה לא מציגים כלל כפתורי מחיקה/בחירה מרובה למחיקה
if context.user_data.get("browse_action") == "download":
if multi_mode:
keyboard.append(row)
row = []
row.append(InlineKeyboardButton("📦 הורד נבחרים כ־ZIP", callback_data="multi_execute"))
row.append(InlineKeyboardButton("🔗 שתף קישורים לנבחרים", callback_data="share_selected_links"))
keyboard.append(row)
row = [InlineKeyboardButton("♻️ נקה בחירה", callback_data="multi_clear"), InlineKeyboardButton("🚫 בטל מצב מרובה", callback_data="multi_toggle")]
keyboard.append(row)
else:
row.append(InlineKeyboardButton("✅ בחר מרובים", callback_data="multi_toggle"))
keyboard.append(row)
else:
# מצב delete/view – התנהגות קיימת
if not multi_mode:
row.append(InlineKeyboardButton("✅ בחר מרובים", callback_data="multi_toggle"))
keyboard.append(row)
else:
keyboard.append(row)
row = []
safe_label = (
"מצב מחיקה בטוח: פעיל" if context.user_data.get("safe_delete", True) else "מצב מחיקה בטוח: כבוי"
)
row.append(InlineKeyboardButton(safe_label, callback_data="safe_toggle"))
keyboard.append(row)
row = [InlineKeyboardButton("🗑️ מחק נבחרים", callback_data="multi_execute"), InlineKeyboardButton("🔗 שתף קישורים לנבחרים", callback_data="share_selected_links")]
keyboard.append(row)
row = [InlineKeyboardButton("♻️ נקה בחירה", callback_data="multi_clear"), InlineKeyboardButton("🚫 בטל מצב מרובה", callback_data="multi_toggle")]
keyboard.append(row)
keyboard.append([InlineKeyboardButton("🔙 חזרה", callback_data="github_menu")])
if bottom:
keyboard.append(bottom)
# טקסט
_mode = context.user_data.get("browse_action")
action = "תצוגה" if _mode == "view" else ("הורדה" if _mode == "download" else "מחיקה")
if only_keyboard:
try:
await TelegramUtils.safe_edit_message_reply_markup(query, reply_markup=InlineKeyboardMarkup(keyboard))
return
except Exception:
pass
if folder_selecting:
await TelegramUtils.safe_edit_message_text(
query,
f"📁 דפדוף ריפו: <code>{repo_name}</code>\n"
f"🔀 ref: <code>{current_ref}</code>\n"
f"📂 נתיב: <code>/{path or ''}</code>\n\n"
f"בחר תיקייה יעד או פתח תיקייה (מציג {min(page_size, max(0, total_items - start_index))} מתוך {total_items}):",
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode="HTML",
)
else:
await TelegramUtils.safe_edit_message_text(
query,
f"📁 דפדוף ריפו: <code>{repo_name}</code>\n"
f"🔀 ref: <code>{current_ref}</code>\n"
f"📂 נתיב: <code>/{path or ''}</code>\n\n"
f"בחר קובץ ל{action} או פתח תיקייה (מציג {min(page_size, max(0, total_items - start_index))} מתוך {total_items}):",
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode="HTML",
)
else:
if folder_selecting:
await query.edit_message_text(
f"📁 דפדוף ריפו: <code>{repo_name}</code>\n"
f"🔀 ref: <code>{current_ref}</code>\n"
f"📂 נתיב: <code>/{path or ''}</code>\n\n"
f"בחר תיקייה יעד או פתח תיקייה (מציג {min(page_size, max(0, total_items - start_index))} מתוך {total_items}):",
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode="HTML",
)
else:
try:
await query.edit_message_text(
f"📁 דפדוף ריפו: <code>{repo_name}</code>\n"
f"🔀 ref: <code>{current_ref}</code>\n"
f"📂 נתיב: <code>/{path or ''}</code>\n\n"
f"בחר קובץ ל{action} או פתח תיקייה (מציג {min(page_size, max(0, total_items - start_index))} מתוך {total_items}):",
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode="HTML",
)
except BadRequest as br:
if "message is not modified" not in str(br).lower():
raise
[תיעוד]
async def handle_inline_query(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Inline mode: חיפוש/ביצוע פעולות ישירות מכל צ'אט"""
inline_query = update.inline_query
user_id = inline_query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
q = (inline_query.query or "").strip()
results = []
if not (token and repo_name):
# בקש מהמשתמש לבחור ריפו
results.append(
InlineQueryResultArticle(
id=f"help-no-repo",
title="בחר/התחבר לריפו לפני שימוש באינליין",
description="שלח /github לבחירת ריפו ו/או התחברות",
input_message_content=InputTextMessageContent(
"🔧 שלח /github לבחירת ריפו ולהתחברות ל-GitHub"
),
)
)
await inline_query.answer(results, cache_time=1, is_personal=True)
return
g = Github(token)
repo = g.get_repo(repo_name)
# ללא קלט: הצג עזרה קצרה
if not q:
results = [
InlineQueryResultArticle(
id="help-1",
title="zip <path> — הורד תיקייה כ־ZIP",
description="לדוגמה: zip src/components",
input_message_content=InputTextMessageContent("בחר תיקייה להורדה כ־ZIP"),
reply_markup=InlineKeyboardMarkup(
[[InlineKeyboardButton("פתח /github", callback_data="github_menu")]]
),
),
InlineQueryResultArticle(
id="help-2",
title="file <path> — הורד קובץ בודד",
description="לדוגמה: file README.md או src/app.py",
input_message_content=InputTextMessageContent("בחר קובץ להורדה"),
reply_markup=InlineKeyboardMarkup(
[[InlineKeyboardButton("פתח /github", callback_data="github_menu")]]
),
),
InlineQueryResultArticle(
id="help-3",
title=f"ריפו נוכחי: {repo_name}",
description="הקלד נתיב מלא לרשימה/קובץ",
input_message_content=InputTextMessageContent(f"ריפו: {repo_name}"),
),
]
await inline_query.answer(results, cache_time=1, is_personal=True)
return
# פרסור פשוט: zip <path> / file <path> או נתיב ישיר
is_zip = False
is_file = False
path = q
if q.lower().startswith("zip "):
is_zip = True
path = q[4:].strip()
elif q.lower().startswith("file "):
is_file = True
path = q[5:].strip()
path = path.lstrip("/")
try:
contents = repo.get_contents(path)
# תיקייה
if isinstance(contents, list):
# תוצאה ל־ZIP
results.append(
InlineQueryResultArticle(
id=f"zip-{path or 'root'}",
title=f"📦 ZIP לתיקייה: /{path or ''}",
description=f"{repo_name} — אריזת תיקייה והורדה",
input_message_content=InputTextMessageContent(
f"ZIP לתיקייה: /{path or ''}"
),
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
"📦 הורד ZIP", callback_data=f"download_zip:{path}"
)
]
]
),
)
)
# הצג כמה קבצים ראשונים בתיקייה להורדה מהירה
shown = 0
for item in contents:
if getattr(item, "type", "") == "file":
size_str = format_bytes(getattr(item, "size", 0) or 0)
results.append(
InlineQueryResultArticle(
id=f"file-{item.path}",
title=f"⬇️ {item.name} ({size_str})",
description=f"/{item.path}",
input_message_content=InputTextMessageContent(
f"קובץ: /{item.path}"
),
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
"⬇️ הורד",
callback_data=f"inline_download_file:{item.path}",
)
]
]
),
)
)
shown += 1
if shown >= 10:
break
else:
# קובץ בודד
size_str = format_bytes(getattr(contents, "size", 0) or 0)
results.append(
InlineQueryResultArticle(
id=f"file-{path}",
title=f"⬇️ הורד: {os.path.basename(contents.path)} ({size_str})",
description=f"/{path}",
input_message_content=InputTextMessageContent(f"קובץ: /{path}"),
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
"⬇️ הורד", callback_data=f"inline_download_file:{path}"
)
]
]
),
)
)
except Exception:
# אם לצורך zip/file מפורש, החזר כפתור גם אם לא קיים (ייתכן נתיב שגוי)
if is_zip and path:
results.append(
InlineQueryResultArticle(
id=f"zip-maybe-{path}",
title=f"📦 ZIP: /{path}",
description="ניסיון אריזה לתיקייה (אם קיימת)",
input_message_content=InputTextMessageContent(f"ZIP לתיקייה: /{path}"),
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
"📦 הורד ZIP", callback_data=f"download_zip:{path}"
)
]
]
),
)
)
elif is_file and path:
results.append(
InlineQueryResultArticle(
id=f"file-maybe-{path}",
title=f"⬇️ קובץ: /{path}",
description="ניסיון הורדה לקובץ (אם קיים)",
input_message_content=InputTextMessageContent(f"קובץ: /{path}"),
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
"⬇️ הורד", callback_data=f"inline_download_file:{path}"
)
]
]
),
)
)
else:
results.append(
InlineQueryResultArticle(
id="not-found",
title="לא נמצאה התאמה",
description="הקלד: zip <path> או file <path> או נתיב מלא",
input_message_content=InputTextMessageContent("לא נמצאה התאמה לשאילתה"),
)
)
await inline_query.answer(results[:50], cache_time=1, is_personal=True)
[תיעוד]
async def show_notifications_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
if not session.get("selected_repo"):
await query.edit_message_text("❌ בחר ריפו קודם (/github)")
return
settings = context.user_data.get("notifications", {})
enabled = settings.get("enabled", False)
pr_on = settings.get("pr", True)
issues_on = settings.get("issues", True)
interval = settings.get("interval", 300)
keyboard = [
[InlineKeyboardButton("🔙 חזור", callback_data="github_menu")],
[
InlineKeyboardButton(
"הפעל" if not enabled else "כבה", callback_data="notifications_toggle"
)
],
[
InlineKeyboardButton(
f"PRs: {'פעיל' if pr_on else 'כבוי'}", callback_data="notifications_toggle_pr"
)
],
[
InlineKeyboardButton(
f"Issues: {'פעיל' if issues_on else 'כבוי'}",
callback_data="notifications_toggle_issues",
)
],
[
InlineKeyboardButton("תדירות: 2ד׳", callback_data="notifications_interval_120"),
InlineKeyboardButton("5ד׳", callback_data="notifications_interval_300"),
InlineKeyboardButton("15ד׳", callback_data="notifications_interval_900"),
],
[InlineKeyboardButton("בדוק עכשיו", callback_data="notifications_check_now")],
]
text = (
f"🔔 התראות לריפו: <code>{session['selected_repo']}</code>\n"
f"מצב: {'פעיל' if enabled else 'כבוי'} | תדירות: {int(interval/60)} ד׳\n"
f"התראות = בדיקת PRs/Issues חדשים/שעודכנו ושיגור הודעה אליך."
)
try:
await query.edit_message_text(
text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode="HTML"
)
except BadRequest as e:
# התעלם אם התוכן לא השתנה
if "Message is not modified" not in str(e):
raise
[תיעוד]
async def toggle_notifications(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
user_id = query.from_user.id
settings = context.user_data.setdefault(
"notifications", {"enabled": False, "pr": True, "issues": True, "interval": 300}
)
settings["enabled"] = not settings.get("enabled", False)
# ניהול job
name = f"notif_{user_id}"
jq = getattr(context, "job_queue", None) or getattr(context.application, "job_queue", None)
if jq:
for job in jq.get_jobs_by_name(name) or []:
job.schedule_removal()
if settings["enabled"]:
jq.run_repeating(
self._notifications_job,
interval=settings.get("interval", 300),
first=5,
name=name,
data={"user_id": user_id},
)
else:
await query.answer("אזהרה: JobQueue לא זמין — התראות לא ירוצו ברקע", show_alert=True)
await self.show_notifications_menu(update, context)
[תיעוד]
async def toggle_notifications_pr(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
settings = context.user_data.setdefault(
"notifications", {"enabled": False, "pr": True, "issues": True, "interval": 300}
)
settings["pr"] = not settings.get("pr", True)
await self.show_notifications_menu(update, context)
[תיעוד]
async def toggle_notifications_issues(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
settings = context.user_data.setdefault(
"notifications", {"enabled": False, "pr": True, "issues": True, "interval": 300}
)
settings["issues"] = not settings.get("issues", True)
await self.show_notifications_menu(update, context)
[תיעוד]
async def set_notifications_interval(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
user_id = query.from_user.id
settings = context.user_data.setdefault(
"notifications", {"enabled": False, "pr": True, "issues": True, "interval": 300}
)
try:
interval = int(query.data.rsplit("_", 1)[1])
except Exception:
interval = 300
settings["interval"] = interval
# עדכן job אם קיים
name = f"notif_{user_id}"
jq = getattr(context, "job_queue", None) or getattr(context.application, "job_queue", None)
if jq:
for job in jq.get_jobs_by_name(name) or []:
job.schedule_removal()
if settings.get("enabled"):
jq.run_repeating(
self._notifications_job,
interval=interval,
first=5,
name=name,
data={"user_id": user_id},
)
else:
await query.answer("אזהרה: JobQueue לא זמין — התראות לא ירוצו ברקע", show_alert=True)
await self.show_notifications_menu(update, context)
[תיעוד]
async def notifications_check_now(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
try:
await query.answer("בודק עכשיו...", show_alert=False)
except Exception:
pass
await self._notifications_job(context, user_id=query.from_user.id, force=True)
try:
await self.show_notifications_menu(update, context)
except BadRequest as e:
if "Message is not modified" not in str(e):
raise
async def _notifications_job(
self, context: ContextTypes.DEFAULT_TYPE, user_id: Optional[int] = None, force: bool = False
):
try:
if user_id is None:
job = getattr(context, "job", None)
if job and getattr(job, "data", None):
user_id = job.data.get("user_id")
if not user_id:
return
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
settings = (
context.application.user_data.get(user_id, {}).get("notifications")
if hasattr(context.application, "user_data")
else None
)
if settings is None:
settings = context.user_data.get("notifications", {})
if not (token and repo_name):
return
if not force and not (settings and settings.get("enabled")):
return
g = Github(token)
repo = g.get_repo(repo_name)
# נהל זיכרון "נבדק לאחרונה"
last = session.get("notifications_last", {"pr": None, "issues": None})
messages = []
# PRs
if settings.get("pr", True):
last_pr_check_time = last.get("pr")
# If this is the first run (no baseline), set a baseline without sending backlog
if last_pr_check_time is None:
session["notifications_last"] = session.get("notifications_last", {})
session["notifications_last"]["pr"] = datetime.now(timezone.utc)
else:
pulls = repo.get_pulls(state="all", sort="updated", direction="desc")
for pr in pulls[:10]:
updated = pr.updated_at
if updated <= last_pr_check_time:
break
status = (
"נפתח"
if pr.state == "open" and pr.created_at == pr.updated_at
else ("מוזג" if pr.merged else ("נסגר" if pr.state == "closed" else "עודכן"))
)
messages.append(
f'🔔 PR {status}: <a href="{pr.html_url}">{safe_html_escape(pr.title)}</a>'
)
session["notifications_last"] = session.get("notifications_last", {})
session["notifications_last"]["pr"] = datetime.now(timezone.utc)
# Issues
if settings.get("issues", True):
last_issues_check_time = last.get("issues")
if last_issues_check_time is None:
session["notifications_last"] = session.get("notifications_last", {})
session["notifications_last"]["issues"] = datetime.now(timezone.utc)
else:
issues = repo.get_issues(state="all", sort="updated", direction="desc")
count = 0
for issue in issues:
if issue.pull_request is not None:
continue
updated = issue.updated_at
if updated <= last_issues_check_time:
break
status = (
"נפתח"
if issue.state == "open" and issue.created_at == issue.updated_at
else ("נסגר" if issue.state == "closed" else "עודכן")
)
messages.append(
f'🔔 Issue {status}: <a href="{issue.html_url}">{safe_html_escape(issue.title)}</a>'
)
count += 1
if count >= 10:
break
session["notifications_last"] = session.get("notifications_last", {})
session["notifications_last"]["issues"] = datetime.now(timezone.utc)
# שלח הודעה אם יש
if messages:
text = "\n".join(messages)
await context.bot.send_message(
chat_id=user_id, text=text, parse_mode="HTML", disable_web_page_preview=True
)
except Exception as e:
logger.error(f"notifications job error: {e}")
[תיעוד]
async def show_pr_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
if not session.get("selected_repo"):
await query.edit_message_text("❌ בחר ריפו קודם (/github)")
return
keyboard = [
[InlineKeyboardButton("🆕 צור PR מסניף", callback_data="create_pr_menu")],
[InlineKeyboardButton("🔀 מזג PR פתוח", callback_data="merge_pr_menu")],
[InlineKeyboardButton("🔙 חזור", callback_data="github_menu")],
]
await query.edit_message_text(
f"🔀 פעולות Pull Request עבור <code>{session['selected_repo']}</code>",
reply_markup=InlineKeyboardMarkup(keyboard),
parse_mode="HTML",
)
[תיעוד]
async def show_create_pr_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.edit_message_text("❌ חסרים נתונים")
return
g = Github(token)
repo = g.get_repo(repo_name)
branches = list(repo.get_branches())
page = context.user_data.get("pr_branches_page", 0)
page_size = 10
total_pages = max(1, (len(branches) + page_size - 1) // page_size)
page = min(max(0, page), total_pages - 1)
start = page * page_size
end = start + page_size
keyboard = []
for br in branches[start:end]:
keyboard.append(
[InlineKeyboardButton(f"🌿 {br.name}", callback_data=f"pr_select_head:{br.name}")]
)
nav = []
if page > 0:
nav.append(InlineKeyboardButton("⬅️ הקודם", callback_data=f"branches_page_{page-1}"))
nav.append(InlineKeyboardButton(f"עמוד {page+1}/{total_pages}", callback_data="noop"))
if page < total_pages - 1:
nav.append(InlineKeyboardButton("הבא ➡️", callback_data=f"branches_page_{page+1}"))
if nav:
keyboard.append(nav)
keyboard.append([InlineKeyboardButton("🔙 חזור", callback_data="pr_menu")])
await query.edit_message_text(
f"🆕 צור PR — בחר סניף head (base יהיה ברירת המחדל של הריפו)",
reply_markup=InlineKeyboardMarkup(keyboard),
)
[תיעוד]
async def show_confirm_create_pr(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.edit_message_text("❌ חסרים נתונים")
return
head = context.user_data.get("pr_head")
g = Github(token)
repo = g.get_repo(repo_name)
base = repo.default_branch or "main"
txt = (
f"תיצור PR חדש?\n"
f"ריפו: <code>{repo_name}</code>\n"
f"base: <code>{base}</code> ← head: <code>{head}</code>\n\n"
f"כותרת: <code>PR: {head} → {base}</code>"
)
kb = [
[InlineKeyboardButton("✅ אשר יצירה", callback_data="confirm_create_pr")],
[InlineKeyboardButton("🔙 חזור", callback_data="create_pr_menu")],
]
await query.edit_message_text(txt, reply_markup=InlineKeyboardMarkup(kb), parse_mode="HTML")
[תיעוד]
async def confirm_create_pr(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.edit_message_text("❌ חסרים נתונים")
return
head = context.user_data.get("pr_head")
try:
g = Github(token)
repo = g.get_repo(repo_name)
base = repo.default_branch or "main"
title = f"PR: {head} → {base} (via bot)"
body = "נוצר אוטומטית על ידי הבוט"
pr = repo.create_pull(title=title, body=body, base=base, head=head)
await query.edit_message_text(
f'✅ נוצר PR: <a href="{pr.html_url}">{safe_html_escape(pr.title)}</a>',
parse_mode="HTML",
)
except Exception as e:
await query.edit_message_text(f"❌ שגיאה ביצירת PR: {e}")
return
await self.show_pr_menu(update, context)
[תיעוד]
async def show_merge_pr_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.edit_message_text("❌ חסרים נתונים")
return
g = Github(token)
repo = g.get_repo(repo_name)
pulls = list(repo.get_pulls(state="open", sort="created", direction="desc"))
page = context.user_data.get("pr_list_page", 0)
page_size = 10
total_pages = max(1, (len(pulls) + page_size - 1) // page_size)
page = min(max(0, page), total_pages - 1)
start = page * page_size
end = start + page_size
keyboard = []
for pr in pulls[start:end]:
title = safe_html_escape(pr.title)
keyboard.append(
[
InlineKeyboardButton(
f"#{pr.number} {title}", callback_data=f"merge_pr:{pr.number}"
)
]
)
nav = []
if page > 0:
nav.append(InlineKeyboardButton("⬅️ הקודם", callback_data=f"prs_page_{page-1}"))
nav.append(InlineKeyboardButton(f"עמוד {page+1}/{total_pages}", callback_data="noop"))
if page < total_pages - 1:
nav.append(InlineKeyboardButton("הבא ➡️", callback_data=f"prs_page_{page+1}"))
if nav:
keyboard.append(nav)
keyboard.append([InlineKeyboardButton("🔙 חזור", callback_data="pr_menu")])
await query.edit_message_text(
f"🔀 בחר PR למיזוג (פתוחים בלבד)", reply_markup=InlineKeyboardMarkup(keyboard)
)
[תיעוד]
async def show_confirm_merge_pr(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
pr_number = context.user_data.get("pr_to_merge")
if not (token and repo_name and pr_number):
await query.edit_message_text("❌ חסרים נתונים")
return
g = Github(token)
repo = g.get_repo(repo_name)
pr = repo.get_pull(pr_number)
try:
pr.update()
except Exception:
pass
checks = []
can_merge = True
try:
# Try to read permissions from repo API result
perms = repo.raw_data.get("permissions") if hasattr(repo, "raw_data") else None
if isinstance(perms, dict):
push_allowed = bool(perms.get("push"))
else:
push_allowed = True
checks.append(f"הרשאת push: {'כן' if push_allowed else 'לא'}")
if not push_allowed:
can_merge = False
except Exception:
pass
mergeable = pr.mergeable
mergeable_state = getattr(pr, "mergeable_state", None)
if mergeable is False:
can_merge = False
checks.append(f"מצב mergeable: {mergeable_state or ('כן' if mergeable else 'לא ידוע')}")
try:
statuses = list(repo.get_commit(pr.head.sha).get_statuses())
if statuses:
latest_state = statuses[0].state
checks.append(f"סטטוסים: {latest_state}")
except Exception:
pass
if getattr(pr, "draft", False):
checks.append("Draft: כן")
can_merge = False
else:
checks.append("Draft: לא")
try:
reviews = list(pr.get_reviews())
need_changes = any(r.state == 'CHANGES_REQUESTED' for r in reviews)
if need_changes:
checks.append("בקשות שינוי פתוחות: כן")
can_merge = False
except Exception:
pass
txt = (
f"למזג PR?\n"
f"#{pr.number}: <b>{safe_html_escape(pr.title)}</b>\n"
f"{pr.html_url}\n\n"
f"בדיקות לפני מיזוג:\n" + "\n".join(f"• {c}" for c in checks)
)
kb = []
kb.append([InlineKeyboardButton("🔄 רענן בדיקות", callback_data="refresh_merge_pr")])
if can_merge:
kb.append([InlineKeyboardButton("✅ אשר מיזוג", callback_data="confirm_merge_pr")])
kb.append([InlineKeyboardButton("🔙 חזור", callback_data="merge_pr_menu")])
await query.edit_message_text(
txt,
reply_markup=InlineKeyboardMarkup(kb),
parse_mode="HTML",
disable_web_page_preview=True,
)
[תיעוד]
async def confirm_merge_pr(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
pr_number = context.user_data.get("pr_to_merge")
if not (token and repo_name and pr_number):
await query.edit_message_text("❌ חסרים נתונים")
return
try:
g = Github(token)
repo = g.get_repo(repo_name)
pr = repo.get_pull(pr_number)
result = pr.merge(merge_method="merge")
if result.merged:
await query.edit_message_text(
f"✅ PR מוזג בהצלחה: <a href=\"{pr.html_url}\">#{pr.number}</a>",
parse_mode="HTML",
)
else:
await query.edit_message_text(f"❌ מיזוג נכשל: {result.message}")
except Exception as e:
await query.edit_message_text(f"❌ שגיאה במיזוג PR: {e}")
return
await self.show_pr_menu(update, context)
[תיעוד]
async def git_checkpoint(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
repo_full = session.get("selected_repo")
token = self.get_user_token(user_id)
if not token or not repo_full:
await query.edit_message_text("❌ חסר טוקן או ריפו נבחר")
return
# Acknowledge the callback early to avoid Telegram timeout spinner
try:
await query.answer("יוצר נקודת שמירה...", show_alert=False)
except Exception:
pass
try:
import datetime
g = Github(login_or_token=token)
repo = g.get_repo(repo_full)
branch_obj = repo.get_branch(repo.default_branch)
default_branch = branch_obj.name
sha = branch_obj.commit.sha
ts = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d-%H%M%S")
prefix = (config.GIT_CHECKPOINT_PREFIX or "checkpoint").strip()
# שמור על תווים חוקיים לשמות refs בסיסיים
prefix = re.sub(r"[^A-Za-z0-9._/-]+", "-", prefix)
base_name = f"{prefix}-{ts}"
tag_name = base_name
# Create lightweight tag by creating a ref refs/tags/<tag>
try:
repo.create_git_ref(ref=f"refs/tags/{tag_name}", sha=sha)
except GithubException as ge:
status = getattr(ge, 'status', None)
# נסה פעם נוספת עם סיומת SHA במקרה של התנגשויות בשם
if status == 422:
try:
tag_name = f"{base_name}-{sha[:7]}"
repo.create_git_ref(ref=f"refs/tags/{tag_name}", sha=sha)
except GithubException as ge2:
# fallback ל-branch
branch_name = base_name
try:
repo.create_git_ref(ref=f"refs/heads/{branch_name}", sha=sha)
except GithubException as gbe:
if getattr(gbe, 'status', None) == 422:
branch_name = f"{base_name}-{sha[:7]}"
repo.create_git_ref(ref=f"refs/heads/{branch_name}", sha=sha)
else:
raise ge # שמור על הודעת השגיאה המקורית של ה-tag
# הצלחת גיבוי לענף
text = (
f"✅ נוצר branch (Fallback): <code>{branch_name}</code> על <code>{default_branch}</code>\n"
f"סיבה: tag נחסם (HTTP {status or 'N/A'})\n"
f"SHA: <code>{sha[:7]}</code>\n"
f"שחזור מהיר: <code>git checkout {branch_name}</code>\n\n"
f"רוצה שאיצור עבורך קובץ הוראות לשחזור?"
)
kb = [
[InlineKeyboardButton("📝 צור קובץ הוראות", callback_data=f"git_checkpoint_doc:branch:{branch_name}")],
[InlineKeyboardButton("לא תודה", callback_data="git_checkpoint_doc_skip")],
]
await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(kb), parse_mode="HTML")
return
else:
# לא 422: עבור ישירות לגיבוי לענף
branch_name = base_name
try:
repo.create_git_ref(ref=f"refs/heads/{branch_name}", sha=sha)
except GithubException as gbe:
if getattr(gbe, 'status', None) == 422:
branch_name = f"{base_name}-{sha[:7]}"
repo.create_git_ref(ref=f"refs/heads/{branch_name}", sha=sha)
else:
raise ge
text = (
f"✅ נוצר branch (Fallback): <code>{branch_name}</code> על <code>{default_branch}</code>\n"
f"סיבה: יצירת tag נכשלה (HTTP {status or 'N/A'})\n"
f"SHA: <code>{sha[:7]}</code>\n"
f"שחזור מהיר: <code>git checkout {branch_name}</code>\n\n"
f"רוצה שאיצור עבורך קובץ הוראות לשחזור?"
)
kb = [
[InlineKeyboardButton("📝 צור קובץ הוראות", callback_data=f"git_checkpoint_doc:branch:{branch_name}")],
[InlineKeyboardButton("לא תודה", callback_data="git_checkpoint_doc_skip")],
]
await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(kb), parse_mode="HTML")
return
# הצלחת יצירת tag
text = (
f"✅ נוצר tag: <code>{tag_name}</code> על <code>{default_branch}</code>\n"
f"SHA: <code>{sha[:7]}</code>\n"
f"שחזור מהיר: <code>git checkout tags/{tag_name}</code>\n\n"
f"רוצה שאיצור עבורך קובץ הוראות לשחזור?"
)
kb = [
[InlineKeyboardButton("📝 צור קובץ הוראות", callback_data=f"git_checkpoint_doc:tag:{tag_name}")],
[InlineKeyboardButton("לא תודה", callback_data="git_checkpoint_doc_skip")],
]
await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(kb), parse_mode="HTML")
except GithubException as e:
status = getattr(e, 'status', None)
gh_message = ''
try:
gh_message = (e.data or {}).get('message') # type: ignore[attr-defined]
except Exception:
gh_message = str(e)
help_lines = [
"בדוק את הרשאות ה-Token שלך:",
"• לטוקן קלאסי: <b>repo</b> (גישה מלאה) או לכל הפחות <b>public_repo</b> לריפו ציבורי.",
"• לטוקן מסוג Fine-grained: תחת Repository permissions, תן <b>Contents: Read and write</b> ו-<b>Metadata: Read-only</b> לריפו.",
"• ודא שיש לך גישת כתיבה לריפו (לא רק לקריאה/פורק).",
"• בארגונים, ייתכן שנדרש לאשר את האפליקציה/הטוקן בארגון.",
]
extra = ""
if status in (403, 404):
extra = "\nייתכן שאין הרשאת כתיבה או שהטוקן מוגבל."
await query.edit_message_text(
f"❌ יצירת נקודת שמירה בגיט נכשלה (HTTP {status or 'N/A'}): <b>{safe_html_escape(gh_message)}</b>{extra}\n\n" +
"\n".join(help_lines),
parse_mode="HTML",
)
except Exception as e:
logger.error(f"Failed to create git checkpoint: {e}")
await query.edit_message_text(f"❌ יצירת נקודת שמירה בגיט נכשלה: {safe_html_escape(e)}", parse_mode="HTML")
[תיעוד]
async def show_pre_upload_check(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג בדיקות לפני העלאת קובץ שמור (הרשאות/קיום קובץ/ענף/תיקייה)."""
query = update.callback_query if hasattr(update, "callback_query") else None
user_id = (query.from_user.id if query else update.effective_user.id)
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
file_id = context.user_data.get("pending_saved_file_id")
if not (token and repo_name and file_id):
if query:
await query.edit_message_text("❌ חסרים נתונים (טוקן/ריפו/קובץ)")
else:
await update.message.reply_text("❌ חסרים נתונים (טוקן/ריפו/קובץ)")
return
from database import db
try:
from bson import ObjectId
file_data = db.collection.find_one({"_id": ObjectId(file_id), "user_id": user_id})
if not file_data:
if query:
await query.edit_message_text("❌ קובץ לא נמצא")
else:
await update.message.reply_text("❌ קובץ לא נמצא")
return
filename = file_data.get("file_name") or "file"
# Resolve target folder/branch (overrides take precedence)
override_folder = (context.user_data.get("upload_target_folder") or "").strip()
target_folder = override_folder if override_folder != "" else (session.get("selected_folder") or "")
g = Github(token)
repo = g.get_repo(repo_name)
override_branch = context.user_data.get("upload_target_branch")
default_branch = repo.default_branch or "main"
target_branch = override_branch or default_branch
# Build file path
if target_folder:
folder_clean = target_folder.strip("/")
file_path = f"{folder_clean}/{filename}"
else:
folder_clean = ""
file_path = filename
# Basic repo flags
archived = getattr(repo, "archived", False)
perms = repo.raw_data.get("permissions") if hasattr(repo, "raw_data") else None
push_allowed = True if not isinstance(perms, dict) else bool(perms.get("push"))
# Check if file exists on target branch
exists = False
try:
repo.get_contents(file_path, ref=target_branch)
exists = True
except Exception:
exists = False
# Build summary text
checks = []
checks.append(f"ענף יעד: {target_branch}")
checks.append(f"תיקייה: {folder_clean or 'root'}")
checks.append(f"הרשאת push: {'כן' if push_allowed else 'לא'}")
checks.append(f"Archived: {'כן' if archived else 'לא'}")
checks.append(f"הקובץ קיים כבר: {'כן (יעודכן)' if exists else 'לא (ייווצר חדש)'}")
txt = (
"בדיקות לפני העלאה:\n"
f"ריפו: <code>{repo_name}</code>\n"
f"קובץ: <code>{file_path}</code>\n\n"
+ "\n".join(f"• {c}" for c in checks)
)
# Build keyboard
kb = []
kb.append([InlineKeyboardButton("🌿 בחר ענף יעד", callback_data="choose_upload_branch")])
kb.append([InlineKeyboardButton("📂 בחר תיקיית יעד", callback_data="choose_upload_folder")])
kb.append([InlineKeyboardButton("➕ צור תיקייה חדשה", callback_data="upload_folder_create")])
kb.append([InlineKeyboardButton("🔄 רענן בדיקות", callback_data="refresh_saved_checks")])
if push_allowed and not archived:
kb.append([InlineKeyboardButton("✅ אשר והעלה", callback_data="confirm_saved_upload")])
kb.append([InlineKeyboardButton("🔙 חזור", callback_data="back_to_menu")])
if query:
await query.edit_message_text(txt, reply_markup=InlineKeyboardMarkup(kb), parse_mode="HTML")
else:
await update.message.reply_text(txt, reply_markup=InlineKeyboardMarkup(kb), parse_mode="HTML")
except Exception as e:
msg = f"❌ שגיאה בבדיקות לפני העלאה: {safe_html_escape(str(e))}"
if query:
await query.edit_message_text(msg, parse_mode="HTML")
else:
await update.message.reply_text(msg, parse_mode="HTML")
[תיעוד]
async def confirm_saved_upload(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
# Proceed with the actual upload of the saved file after checks
file_id = context.user_data.get("pending_saved_file_id")
if not file_id:
await update.edit_message_text("❌ לא נמצא קובץ ממתין להעלאה")
else:
await self.handle_saved_file_upload(update, context, file_id)
[תיעוד]
async def refresh_saved_checks(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
await self.show_pre_upload_check(update, context)
[תיעוד]
async def show_upload_branch_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_name = session.get("selected_repo")
if not (token and repo_name):
await query.edit_message_text("❌ חסרים נתונים")
return
g = Github(token)
repo = g.get_repo(repo_name)
branches = list(repo.get_branches())
page = context.user_data.get("upload_branches_page", 0)
page_size = 10
total_pages = max(1, (len(branches) + page_size - 1) // page_size)
page = min(max(0, page), total_pages - 1)
start = page * page_size
end = start + page_size
keyboard = []
for br in branches[start:end]:
keyboard.append([InlineKeyboardButton(f"🌿 {br.name}", callback_data=f"upload_select_branch:{br.name}")])
nav = []
if page > 0:
nav.append(InlineKeyboardButton("⬅️ הקודם", callback_data=f"upload_branches_page_{page-1}"))
nav.append(InlineKeyboardButton(f"עמוד {page+1}/{total_pages}", callback_data="noop"))
if page < total_pages - 1:
nav.append(InlineKeyboardButton("הבא ➡️", callback_data=f"upload_branches_page_{page+1}"))
if nav:
keyboard.append(nav)
keyboard.append([InlineKeyboardButton("🔙 חזור", callback_data="refresh_saved_checks")])
await query.edit_message_text("בחר ענף יעד להעלאה:", reply_markup=InlineKeyboardMarkup(keyboard))
[תיעוד]
async def show_upload_folder_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
# הצג את התיקייה הפעילה הנוכחית: עדיפות ל-override זמני מזרימת ההעלאה, אחרת התיקייה שנבחרה במפגש, אחרת root
current = (context.user_data.get("upload_target_folder") or session.get("selected_folder") or "root")
kb = [
[InlineKeyboardButton("📁 root (ראשי)", callback_data="upload_folder_root")],
[InlineKeyboardButton(f"📂 השתמש בתיקייה שנבחרה: {current}", callback_data="upload_folder_current")],
[InlineKeyboardButton("✏️ הזן נתיב ידנית", callback_data="upload_folder_custom")],
[InlineKeyboardButton("➕ צור תיקייה חדשה", callback_data="upload_folder_create")],
[InlineKeyboardButton("🔙 חזור", callback_data="refresh_saved_checks")],
]
await query.edit_message_text("בחר תיקיית יעד:", reply_markup=InlineKeyboardMarkup(kb))
[תיעוד]
async def ask_upload_folder(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
context.user_data["waiting_for_upload_folder"] = True
await query.edit_message_text(
"✏️ הקלד נתיב תיקייה יעד (למשל: src/utils או ריק ל-root).\nשלח טקסט חופשי עכשיו.")
[תיעוד]
async def create_checkpoint_doc(self, update: Update, context: ContextTypes.DEFAULT_TYPE, kind: str, name: str):
"""יוצר קובץ הוראות שחזור לנקודת שמירה ושולח ל-flow של העלאה"""
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
repo_full = session.get("selected_repo") or ""
from datetime import datetime
# בנה תוכן Markdown
is_tag = (kind == "tag")
title = "# 🏷️ נקודת שמירה בגיט\n\n"
what = (f"נוצר tag בשם `{name}`" if is_tag else f"נוצר branch בשם `{name}`")
repo_line = f"בריפו: `{repo_full}`\n\n" if repo_full else "\n"
intro = (
f"{what}.\n{repo_line}"
"כך ניתן לשחזר לאותה נקודה במחשב המקומי:\n\n"
)
if is_tag:
commands = (
"1. עדכן תגיות מהריפו:\n\n"
"```bash\n"
"git fetch --tags\n"
"```\n\n"
"2. מעבר לקריאה בלבד ל-tag (מצב detached):\n\n"
f"```bash\n"
f"git checkout tags/{name}\n"
"```\n\n"
"3. לחזרה לענף הראשי לאחר מכן:\n\n"
"```bash\n"
"git checkout -\n"
"```\n"
)
else:
commands = (
"1. עדכן רפרנסים מהריפו:\n\n"
"```bash\n"
"git fetch origin\n"
"```\n\n"
"2. מעבר לענף שנוצר:\n\n"
f"```bash\n"
f"git checkout {name}\n"
"```\n"
)
notes = (
"\n> הערות:\n"
"> - נקודת שמירה היא רפרנס ל-commit (tag או branch).\n"
"> - ניתן למחוק את הקובץ הזה לאחר השחזור.\n"
)
content = title + intro + commands + notes
file_name = f"RESTORE_{name}.md"
# שמירה במסד והמשך ל-flow של העלאה
from database import db
doc = {
"user_id": user_id,
"file_name": file_name,
"content": content,
"programming_language": "markdown",
"description": "הוראות שחזור לנקודת שמירה",
"tags": ["checkpoint", "instructions"],
"version": 1,
"created_at": datetime.now(timezone.utc),
"updated_at": datetime.now(timezone.utc),
"is_active": True,
}
try:
res = db.collection.insert_one(doc)
context.user_data["pending_saved_file_id"] = str(res.inserted_id)
# פתח את בדיקות ההעלאה (בחירת ענף/תיקייה ואישור)
await self.show_pre_upload_check(update, context)
except Exception as e:
await query.edit_message_text(f"❌ נכשל ביצירת קובץ הוראות: {safe_html_escape(str(e))}")
[תיעוד]
async def show_restore_checkpoint_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג רשימת תגיות נקודות שמירה לבחירה לשחזור"""
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_full = session.get("selected_repo")
if not (token and repo_full):
try:
await query.edit_message_text("❌ חסר טוקן או ריפו נבחר")
except BadRequest as br:
if "message is not modified" not in str(br).lower():
raise
try:
await query.answer("❌ חסר טוקן או ריפו נבחר", show_alert=True)
except Exception:
pass
return
try:
g = Github(token)
repo = g.get_repo(repo_full)
# משוך תגיות (נחתוך לכמות סבירה, למשל 100)
tags = list(repo.get_tags())[:100]
prefix = (config.GIT_CHECKPOINT_PREFIX or "checkpoint").strip()
# שמות חוקיים
prefix = re.sub(r"[^A-Za-z0-9._/-]+", "-", prefix)
checkpoint_tags = [t for t in tags if (t.name or "").startswith(prefix + "-")]
if not checkpoint_tags:
try:
await query.edit_message_text("ℹ️ לא נמצאו תגיות נקודת שמירה בריפו.")
except BadRequest as br:
if "message is not modified" not in str(br).lower():
raise
try:
await query.answer("ℹ️ לא נמצאו תגיות נקודת שמירה", show_alert=False)
except Exception:
pass
return
# עימוד
page = int(context.user_data.get("restore_tags_page", 0) or 0)
per_page = 10
total = len(checkpoint_tags)
total_pages = max(1, (total + per_page - 1) // per_page)
page = min(max(0, page), total_pages - 1)
start = page * per_page
end = start + per_page
page_tags = checkpoint_tags[start:end]
# בנה מקלדת
keyboard = []
for t in page_tags:
keyboard.append([InlineKeyboardButton(f"🏷 {t.name}", callback_data=f"restore_select_tag:{t.name}")])
nav = []
if page > 0:
nav.append(InlineKeyboardButton("⬅️ הקודם", callback_data=f"restore_tags_page_{page-1}"))
nav.append(InlineKeyboardButton(f"📄 {page+1}/{total_pages}", callback_data="noop"))
if page < total_pages - 1:
nav.append(InlineKeyboardButton("➡️ הבא", callback_data=f"restore_tags_page_{page+1}"))
if nav:
keyboard.append(nav)
keyboard.append([InlineKeyboardButton("🔙 חזור", callback_data="github_menu")])
try:
await query.edit_message_text(
"בחר תגית נקודת שמירה לשחזור:", reply_markup=InlineKeyboardMarkup(keyboard)
)
except BadRequest as br:
if "message is not modified" not in str(br).lower():
# פרסום הודעה חדשה כגיבוי
await query.message.reply_text(
"בחר תגית נקודת שמירה לשחזור:", reply_markup=InlineKeyboardMarkup(keyboard)
)
else:
try:
await query.answer("אין שינוי בתצוגה", show_alert=False)
except Exception:
pass
except Exception as e:
try:
await query.edit_message_text(f"❌ שגיאה בטעינת תגיות: {safe_html_escape(str(e))}")
except BadRequest as br:
if "message is not modified" not in str(br).lower():
raise
try:
await query.answer(f"❌ שגיאה בטעינת תגיות: {safe_html_escape(str(e))}", show_alert=True)
except Exception:
pass
[תיעוד]
async def show_restore_tag_actions(self, update: Update, context: ContextTypes.DEFAULT_TYPE, tag_name: str):
"""מציג פעולות אפשריות לשחזור מתגית נתונה"""
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
repo_full = session.get("selected_repo")
if not repo_full:
await query.edit_message_text("❌ לא נבחר ריפו")
return
# הצג אפשרויות: צור קובץ הוראות / צור ענף מהתגית
text = (
f"🏷 תגית נבחרה: <code>{tag_name}</code>\n\n"
f"בחר פעולה לשחזור:"
)
kb = [
[InlineKeyboardButton("📝 צור קובץ הוראות", callback_data=f"git_checkpoint_doc:tag:{tag_name}")],
[InlineKeyboardButton("🌿 צור ענף מהתגית", callback_data=f"restore_branch_from_tag:{tag_name}")],
[InlineKeyboardButton("🔁 צור PR לשחזור (Revert)", callback_data=f"restore_revert_pr_from_tag:{tag_name}")],
[InlineKeyboardButton("🔙 חזור", callback_data="restore_checkpoint_menu")],
]
await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(kb), parse_mode="HTML")
[תיעוד]
async def create_branch_from_tag(self, update: Update, context: ContextTypes.DEFAULT_TYPE, tag_name: str):
"""יוצר ענף חדש שמצביע ל-commit של התגית לשחזור נוח"""
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_full = session.get("selected_repo")
if not (token and repo_full):
await query.edit_message_text("❌ חסר טוקן או ריפו נבחר")
return
try:
g = Github(token)
repo = g.get_repo(repo_full)
sha = None
# נסה להשיג SHA מה-ref של התגית
try:
ref = repo.get_git_ref(f"tags/{tag_name}")
sha = ref.object.sha
except GithubException:
# נפילה חזרה לחיפוש ברשימת תגיות
for t in repo.get_tags():
if t.name == tag_name:
sha = t.commit.sha
break
if not sha:
await query.edit_message_text("❌ לא נמצאה התגית המבוקשת")
return
# שם ברירת מחדל לענף שחזור
base_branch = re.sub(r"[^A-Za-z0-9._/-]+", "-", f"restore-{tag_name}")
branch_name = base_branch
# צור את ה-ref, עם ניסיון לשמור על ייחודיות
try:
repo.create_git_ref(ref=f"refs/heads/{branch_name}", sha=sha)
except GithubException as gbe:
if getattr(gbe, 'status', None) == 422:
branch_name = f"{base_branch}-{sha[:7]}"
repo.create_git_ref(ref=f"refs/heads/{branch_name}", sha=sha)
else:
raise
kb = [
[InlineKeyboardButton("🔀 פתח PR מהענף", callback_data=f"open_pr_from_branch:{branch_name}")],
[InlineKeyboardButton("🔙 חזור", callback_data="restore_checkpoint_menu")],
]
await query.edit_message_text(
f"✅ נוצר ענף שחזור: <code>{branch_name}</code> מתוך <code>{tag_name}</code>\n\n"
f"שחזור מקומי מהיר:\n"
f"<code>git fetch origin && git checkout {branch_name}</code>",
reply_markup=InlineKeyboardMarkup(kb),
parse_mode="HTML",
)
except Exception as e:
# הצג אפשרות להמשיך ליצירת PR לשחזור למרות הכישלון ביצירת ענף
try:
kb = [
[InlineKeyboardButton("🔁 צור PR לשחזור (Revert)", callback_data=f"restore_revert_pr_from_tag:{tag_name}")],
[InlineKeyboardButton("🔙 חזור", callback_data="restore_checkpoint_menu")],
]
await query.edit_message_text(
f"❌ שגיאה ביצירת ענף שחזור: {safe_html_escape(str(e))}\n\n"
f"תוכל עדיין ליצור PR לשחזור ישירות מהתגית <code>{tag_name}</code>.",
reply_markup=InlineKeyboardMarkup(kb),
parse_mode="HTML",
)
except Exception:
await query.edit_message_text(f"❌ שגיאה ביצירת ענף שחזור: {safe_html_escape(str(e))}")
[תיעוד]
async def open_pr_from_branch(self, update: Update, context: ContextTypes.DEFAULT_TYPE, branch_name: str):
"""פותח Pull Request מהענף שנוצר אל הענף הראשי של הריפו"""
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_full = session.get("selected_repo")
if not (token and repo_full):
await query.edit_message_text("❌ חסר טוקן או ריפו נבחר")
return
try:
g = Github(token)
repo = g.get_repo(repo_full)
base_branch = repo.default_branch or "main"
owner_login = repo.owner.login if getattr(repo, "owner", None) else repo_full.split("/")[0]
# 1) אם כבר קיים PR פתוח מהענף הזה לבסיס – הצג אותו במקום ליצור חדש
try:
existing_prs = list(
repo.get_pulls(state="open", base=base_branch, head=f"{owner_login}:{branch_name}")
)
if existing_prs:
pr = existing_prs[0]
kb = [[InlineKeyboardButton("🔙 חזור", callback_data="github_menu")]]
await query.edit_message_text(
f"ℹ️ כבר קיים PR פתוח מהענף <code>{branch_name}</code> ל-<code>{base_branch}</code>: "
f"<a href=\"{pr.html_url}\">#{pr.number}</a>",
parse_mode="HTML",
reply_markup=InlineKeyboardMarkup(kb),
)
return
except Exception:
# נמשיך לנסות ליצור PR אם לא הצלחנו לבדוק קיום
pass
# 2) בדוק שיש הבדלים בין HEAD ל-base (אחרת GitHub יחזיר Validation Failed)
try:
cmp = repo.compare(base_branch, branch_name)
if getattr(cmp, "ahead_by", 0) == 0 and getattr(cmp, "behind_by", 0) == 0:
kb = [
[InlineKeyboardButton("↩️ בחר תגית אחרת", callback_data="restore_checkpoint_menu")],
[InlineKeyboardButton("🔙 חזור", callback_data="github_menu")],
]
await query.edit_message_text(
(
"❌ לא ניתן לפתוח PR: אין שינויים בין הענף "
f"<code>{branch_name}</code> ל- <code>{base_branch}</code>\n\n"
"נסה לבחור תגית אחרת לשחזור, או בצע שינוי/commit בענף לפני פתיחת PR."
),
parse_mode="HTML",
reply_markup=InlineKeyboardMarkup(kb),
)
return
except Exception:
# אם ההשוואה נכשלה, ננסה בכל זאת ליצור PR – ייתכן שהענף חדש מאוד
pass
# 3) צור PR
title = f"Restore from checkpoint: {branch_name}"
body = (
f"Automated PR to restore state from branch `{branch_name}`.\n\n"
f"Created via Telegram bot."
)
pr = repo.create_pull(title=title, body=body, head=branch_name, base=base_branch)
kb = [[InlineKeyboardButton("🔙 חזור", callback_data="github_menu")]]
await query.edit_message_text(
f"✅ נפתח PR: <a href=\"{pr.html_url}\">#{pr.number}</a> ← <code>{base_branch}</code> ← <code>{branch_name}</code>",
parse_mode="HTML",
reply_markup=InlineKeyboardMarkup(kb),
)
except GithubException as ge:
# פרשנות מפורטת יותר לשגיאות Validation Failed
message_text = "Validation Failed"
try:
data = ge.data or {}
if isinstance(data, dict):
# הודעת על
if data.get("message"):
message_text = data["message"]
# בדוק פירוט שגיאות נפוצות
errors = data.get("errors") or []
if isinstance(errors, list) and errors:
details = []
for err in errors:
# err יכול להיות dict עם מפתחות code/message
code = err.get("code") if isinstance(err, dict) else None
msg = err.get("message") if isinstance(err, dict) else None
if code == "custom" and msg:
details.append(msg)
elif msg:
details.append(msg)
if details:
message_text += ": " + "; ".join(details)
except Exception:
pass
# נסה לזהות במפורש "No commits between" או PR קיים ולהציע פתרון
lower_msg = (message_text or "").lower()
kb = [[InlineKeyboardButton("🔙 חזור", callback_data="github_menu")]]
if "no commits between" in lower_msg or "no commits" in lower_msg:
kb.insert(0, [InlineKeyboardButton("↩️ בחר תגית אחרת", callback_data="restore_checkpoint_menu")])
await query.edit_message_text(
(
"❌ שגיאה בפתיחת PR: אין שינויים בין הענפים.\n\n"
f"ענף: <code>{branch_name}</code> → בסיס: <code>{base_branch}</code>\n\n"
"בחר נקודת שמירה מוקדמת יותר או בצע שינוי/commit בענף ואז נסה שוב."
),
parse_mode="HTML",
reply_markup=InlineKeyboardMarkup(kb),
)
return
if "already exists" in lower_msg or "a pull request already exists" in lower_msg:
# נסה למצוא את ה-PR הקיים ולהציג קישור
try:
prs = list(repo.get_pulls(state="open", base=base_branch, head=f"{owner_login}:{branch_name}"))
if prs:
pr = prs[0]
await query.edit_message_text(
f"ℹ️ כבר קיים PR פתוח: <a href=\"{pr.html_url}\">#{pr.number}</a>",
parse_mode="HTML",
reply_markup=InlineKeyboardMarkup(kb),
)
return
except Exception:
pass
await query.edit_message_text(f"❌ שגיאה בפתיחת PR: {safe_html_escape(message_text)}", parse_mode="HTML")
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בפתיחת PR: {safe_html_escape(str(e))}")
[תיעוד]
async def create_revert_pr_from_tag(self, update: Update, context: ContextTypes.DEFAULT_TYPE, tag_name: str):
"""יוצר PR שמשחזר את מצב הריפו לתגית ע"י יצירת commit חדש עם עץ התגית על גבי base.
כך תמיד יהיה diff וה-PR ייפתח בהצלחה.
"""
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_full = session.get("selected_repo")
if not (token and repo_full):
await query.edit_message_text("❌ חסר טוקן או ריפו נבחר")
return
try:
g = Github(token)
repo = g.get_repo(repo_full)
base_branch = repo.default_branch or "main"
logger.info("[create_revert_pr_from_tag] repo=%s base=%s tag=%s user=%s", repo_full, base_branch, tag_name, user_id)
# מצא את ה-SHA של עץ התגית (מתמודד גם עם תגיות מוכללות)
tag_tree_sha = None
try:
ref = repo.get_git_ref(f"tags/{tag_name}")
ref_obj = getattr(ref, "object", None)
ref_type = getattr(ref_obj, "type", None)
ref_sha = getattr(ref_obj, "sha", None)
logger.info("[create_revert_pr_from_tag] ref_type=%s ref_sha=%s", ref_type, ref_sha)
if ref_type == "commit" and ref_sha:
commit = repo.get_commit(ref_sha)
tag_tree_sha = commit.commit.tree.sha
elif ref_type == "tag" and ref_sha:
# תגית מוכללת — נפרק לאובייקט היעד
tag_obj = repo.get_git_tag(ref_sha)
logger.info("[create_revert_pr_from_tag] annotated tag sha=%s", ref_sha)
while getattr(getattr(tag_obj, "object", None), "type", None) == "tag":
logger.info("[create_revert_pr_from_tag] peeling nested tag sha=%s", tag_obj.object.sha)
tag_obj = repo.get_git_tag(tag_obj.object.sha)
target_type = getattr(tag_obj.object, "type", None)
target_sha = getattr(tag_obj.object, "sha", None)
logger.info("[create_revert_pr_from_tag] tag target_type=%s target_sha=%s", target_type, target_sha)
if target_type == "commit" and target_sha:
commit = repo.get_commit(target_sha)
tag_tree_sha = commit.commit.tree.sha
elif target_type == "tree" and target_sha:
tag_tree_sha = target_sha
elif ref_type == "tree" and ref_sha:
tag_tree_sha = ref_sha
except GithubException as ge:
logger.warning("[create_revert_pr_from_tag] get_git_ref failed: %s", getattr(ge, 'data', None) or str(ge))
pass
# נפילה ל-backup: מעבר על get_tags (עובד לרוב על תגיות קלילות)
if not tag_tree_sha:
logger.info("[create_revert_pr_from_tag] fallback to repo.get_tags() for %s", tag_name)
for t in repo.get_tags():
if t.name == tag_name:
try:
commit = repo.get_commit(t.commit.sha)
tag_tree_sha = commit.commit.tree.sha
logger.info("[create_revert_pr_from_tag] fallback resolved tree=%s via commit=%s", tag_tree_sha, t.commit.sha)
except Exception as inner_e:
logger.exception("[create_revert_pr_from_tag] fallback resolving tag failed: %s", inner_e)
break
if not tag_tree_sha:
await query.edit_message_text("❌ לא נמצאה התגית המבוקשת")
return
# צור ענף עבודה חדש משם ברור
safe_branch = re.sub(r"[^A-Za-z0-9._/-]+", "-", f"restore-from-{tag_name}")
work_branch = safe_branch
try:
base_sha = repo.get_branch(base_branch).commit.sha
logger.info("[create_revert_pr_from_tag] creating work branch=%s from base_sha=%s", work_branch, base_sha)
repo.create_git_ref(ref=f"refs/heads/{work_branch}", sha=base_sha)
except GithubException as gbe:
if getattr(gbe, 'status', None) == 422:
work_branch = f"{safe_branch}-{int(time.time())}"
base_sha = repo.get_branch(base_branch).commit.sha
logger.info("[create_revert_pr_from_tag] branch exists, retry with %s", work_branch)
repo.create_git_ref(ref=f"refs/heads/{work_branch}", sha=base_sha)
else:
raise
# צור commit חדש בעבודה עם tree של התגית והורה מה-base
base_head = repo.get_branch(base_branch).commit.sha
parent = repo.get_git_commit(base_head)
new_tree = repo.get_git_tree(tag_tree_sha)
new_commit_message = f"Restore repository state from tag {tag_name}"
logger.info("[create_revert_pr_from_tag] creating git commit on %s with tree=%s parent=%s", work_branch, tag_tree_sha, base_head)
new_commit = repo.create_git_commit(new_commit_message, new_tree, [parent])
# עדכן את ה-ref של הענף החדש ל-commit החדש
repo.get_git_ref(f"heads/{work_branch}").edit(new_commit.sha, force=True)
logger.info("[create_revert_pr_from_tag] updated ref heads/%s -> %s", work_branch, new_commit.sha)
# פתח PR
title = f"Restore to checkpoint: {tag_name}"
body = (
f"This PR restores the repository state to tag `{tag_name}` by creating a new commit with the tag's tree on top of `{base_branch}`.\n\n"
f"Created via Telegram bot."
)
pr = repo.create_pull(title=title, body=body, head=work_branch, base=base_branch)
kb = [[InlineKeyboardButton("🔙 חזור", callback_data="github_menu")]]
await query.edit_message_text(
f"✅ נפתח PR: <a href=\"{pr.html_url}\">#{pr.number}</a> ← <code>{base_branch}</code> ← <code>{work_branch}</code>",
parse_mode="HTML",
reply_markup=InlineKeyboardMarkup(kb),
)
except GithubException as ge:
msg = "Validation Failed"
details = None
try:
data = ge.data or {}
if isinstance(data, dict) and data.get('message'):
msg = data['message']
details = json.dumps(data, ensure_ascii=False)
except Exception:
pass
logger.error("[create_revert_pr_from_tag] GithubException: %s data=%s", msg, details)
await query.edit_message_text(f"❌ שגיאה ביצירת PR לשחזור: {safe_html_escape(msg)}")
except Exception as e:
logger.exception("[create_revert_pr_from_tag] Unexpected error: %s", e)
await query.edit_message_text(f"❌ שגיאה ביצירת PR לשחזור: {safe_html_escape(str(e))}")
[תיעוד]
async def show_github_backup_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג תפריט גיבוי/שחזור עבור הריפו הנבחר"""
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_full = session.get("selected_repo")
if not (token and repo_full):
try:
await query.edit_message_text("❌ חסר טוקן או ריפו נבחר")
except BadRequest as br:
if "message is not modified" not in str(br).lower():
raise
try:
await query.answer("❌ חסר טוקן או ריפו נבחר", show_alert=True)
except Exception:
pass
return
# כניסה לתפריט גיבוי/שחזור מתחילה זרם חדש – נקה נעילות/סטייטים קודמים
try:
context.user_data.pop("zip_restore_expected_repo_full", None)
context.user_data.pop("github_restore_zip_purge", None)
context.user_data.pop("pending_repo_restore_zip_path", None)
except Exception:
pass
# סמן הקשר כדי לאפשר סינון גיבויים לפי הריפו הנוכחי
context.user_data["github_backup_context_repo"] = repo_full
kb = [
[InlineKeyboardButton("📦 הורד גיבוי ZIP של הריפו", callback_data="download_zip:")],
[InlineKeyboardButton("♻️ שחזר ZIP לריפו (פריסה והחלפה)", callback_data="github_restore_zip_to_repo")],
[InlineKeyboardButton("📂 שחזר מגיבוי שמור לריפו", callback_data="github_restore_zip_list")],
[InlineKeyboardButton("🏷 נקודת שמירה בגיט", callback_data="git_checkpoint")],
[InlineKeyboardButton("↩️ חזרה לנקודת שמירה", callback_data="restore_checkpoint_menu")],
[InlineKeyboardButton("🗂 גיבויי DB אחרונים", callback_data="github_backup_db_list")],
[InlineKeyboardButton("♻️ שחזור מגיבוי (ZIP)", callback_data="backup_restore_full_start")],
[InlineKeyboardButton("ℹ️ הסבר על הכפתורים", callback_data="github_backup_help")],
[InlineKeyboardButton("🔙 חזור", callback_data="github_menu")],
]
try:
await query.edit_message_text(
f"🧰 תפריט גיבוי ושחזור לריפו:\n<code>{repo_full}</code>",
reply_markup=InlineKeyboardMarkup(kb),
parse_mode="HTML",
)
except BadRequest as br:
if "message is not modified" not in str(br).lower():
# פרסום הודעה חדשה כגיבוי
await query.message.reply_text(
f"🧰 תפריט גיבוי ושחזור לריפו:\n<code>{repo_full}</code>",
reply_markup=InlineKeyboardMarkup(kb),
parse_mode="HTML",
)
else:
try:
await query.answer("אין שינוי בתצוגה", show_alert=False)
except Exception:
pass
return
# Unreachable guard to satisfy linters if parser confuses block ends
if False and (query and query.data == "github_restore_zip_to_repo"):
pass
elif query.data.startswith("github_restore_zip_setpurge:"):
purge_flag = query.data.split(":", 1)[1] == "1"
# ודא שניקינו דגלים ישנים של העלאה רגילה כדי למנוע בלבול
context.user_data["waiting_for_github_upload"] = False
context.user_data["upload_mode"] = "github_restore_zip_to_repo"
context.user_data["github_restore_zip_purge"] = purge_flag
await query.edit_message_text(
("🧹 יבוצע ניקוי לפני העלאה. " if purge_flag else "🔁 ללא מחיקה. ") +
"שלח עכשיו קובץ ZIP לשחזור לריפו."
)
return
elif query.data == "github_restore_zip_list":
# הצג רשימת גיבויים (ZIP) של הריפו הנוכחי לצורך שחזור לריפו
user_id = query.from_user.id
session = self.get_user_session(user_id)
repo_full = session.get("selected_repo")
if not repo_full:
await query.edit_message_text("❌ קודם בחר ריפו!")
return
from file_manager import backup_manager
backups = backup_manager.list_backups(user_id)
# סנן רק גיבויים עם metadata של אותו ריפו
backups = [b for b in backups if getattr(b, 'repo', None) == repo_full]
if not backups:
await query.edit_message_text(
f"ℹ️ אין גיבויי ZIP שמורים עבור הריפו:\n<code>{repo_full}</code>",
parse_mode="HTML",
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🔙 חזור", callback_data="github_backup_menu")]])
)
return
# הצג עד 10 אחרונים
items = backups[:10]
lines = [f"בחר גיבוי לשחזור לריפו:\n<code>{repo_full}</code>\n"]
kb = []
for b in items:
lines.append(f"• {b.backup_id} — {b.created_at.strftime('%d/%m/%Y %H:%M')} — {int(b.total_size/1024)}KB")
kb.append([InlineKeyboardButton("♻️ שחזר גיבוי זה לריפו", callback_data=f"github_restore_zip_from_backup:{b.backup_id}")])
kb.append([InlineKeyboardButton("🔙 חזור", callback_data="github_backup_menu")])
await query.edit_message_text("\n".join(lines), reply_markup=InlineKeyboardMarkup(kb), parse_mode="HTML")
return
elif query.data.startswith("github_restore_zip_from_backup:"):
# קבל backup_id ואז פתח את תהליך השחזור-לריפו עם קובץ ה-ZIP הזה
backup_id = query.data.split(":", 1)[1]
user_id = query.from_user.id
from file_manager import backup_manager
info_list = backup_manager.list_backups(user_id)
match = next((b for b in info_list if b.backup_id == backup_id), None)
if not match or not match.file_path or not os.path.exists(match.file_path):
await query.edit_message_text("❌ הגיבוי לא נמצא בדיסק")
return
# הגדר purge? בקש בחירה
context.user_data["pending_repo_restore_zip_path"] = match.file_path
await query.edit_message_text(
"האם למחוק קודם את התוכן בריפו לפני העלאה?",
reply_markup=InlineKeyboardMarkup([
[InlineKeyboardButton("🧹 מחיקה מלאה לפני העלאה", callback_data="github_repo_restore_backup_setpurge:1")],
[InlineKeyboardButton("🚫 אל תמחק, רק עדכן", callback_data="github_repo_restore_backup_setpurge:0")],
[InlineKeyboardButton("❌ ביטול", callback_data="github_backup_menu")],
])
)
return
elif query.data.startswith("github_repo_restore_backup_setpurge:"):
# בצע את ההעלאה לריפו מתוך קובץ ה-ZIP שמור בדיסק
purge_flag = query.data.split(":", 1)[1] == "1"
zip_path = context.user_data.get("pending_repo_restore_zip_path")
if not zip_path or not os.path.exists(zip_path):
await query.edit_message_text("❌ קובץ ZIP לא נמצא")
return
# הפעל ריסטור לריפו דרך פונקציה חיצונית פשוטה שמתממשקת עם main.handle_document logic
try:
await query.edit_message_text("⏳ משחזר לריפו מגיבוי נבחר...")
# נשתמש בלוגיקה פשוטה: נקרא לפונקציה פנימית שתבצע את אותו זרם של שחזור לריפו
await self.restore_zip_file_to_repo(update, context, zip_path, purge_flag)
await query.edit_message_text("✅ השחזור הועלה לריפו בהצלחה")
except Exception as e:
await query.edit_message_text(f"❌ שגיאה בשחזור לריפו: {e}")
finally:
context.user_data.pop("pending_repo_restore_zip_path", None)
return
[תיעוד]
async def restore_zip_file_to_repo(self, update: Update, context: ContextTypes.DEFAULT_TYPE, zip_path: str, purge_first: bool) -> None:
"""שחזור קבצים מ-ZIP מקומי לריפו הנוכחי באמצעות Trees API (commit אחד)"""
user_id = update.effective_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_full = session.get("selected_repo")
if not (token and repo_full):
raise RuntimeError("חסר טוקן או ריפו")
# חגורת בטיחות: אשר שהיעד תואם את היעד שננעל בתחילת ה-flow
expected = context.user_data.get("zip_restore_expected_repo_full")
if expected and expected != repo_full:
logger.critical(f"[restore_zip_from_backup] Target mismatch: expected={expected}, got={repo_full}. Aborting.")
raise ValueError(f"Target mismatch: expected {expected}, got {repo_full}")
if not expected:
try:
context.user_data["zip_restore_expected_repo_full"] = repo_full
except Exception:
pass
import zipfile
if not os.path.exists(zip_path) or not zipfile.is_zipfile(zip_path):
raise RuntimeError("ZIP לא תקין")
with zipfile.ZipFile(zip_path, 'r') as zf:
# סינון קבצי מערכת לא רלוונטיים
all_names = [n for n in zf.namelist() if not n.endswith('/')]
members = [n for n in all_names if not (n.startswith('__MACOSX/') or n.split('/')[-1].startswith('._'))]
# זיהוי תיקיית-שורש משותפת
top_levels = set()
for n in zf.namelist():
if '/' in n and not n.startswith('__MACOSX/'):
top_levels.add(n.split('/', 1)[0])
common_root = list(top_levels)[0] if len(top_levels) == 1 else None
logger.info(f"[restore_zip_from_backup] Detected common_root={common_root!r}, files_in_zip={len(members)}")
def strip_root(path: str) -> str:
if common_root and path.startswith(common_root + '/'):
return path[len(common_root) + 1:]
return path
files = []
for name in members:
raw = zf.read(name)
clean = strip_root(name)
if not clean:
continue
files.append((clean, raw))
g = Github(token)
repo = g.get_repo(repo_full)
target_branch = repo.default_branch or 'main'
base_ref = repo.get_git_ref(f"heads/{target_branch}")
base_commit = repo.get_git_commit(base_ref.object.sha)
base_tree = base_commit.tree
elements = []
for path, raw in files:
# כתוב blob מתאים: טקסט כ-utf-8, בינארי כ-base64
import base64
is_text = any(path.lower().endswith(ext) for ext in (
'.md', '.txt', '.json', '.yml', '.yaml', '.xml', '.gitignore', '.py', '.js', '.ts', '.tsx', '.css', '.scss', '.html', '.sh'
))
try:
if is_text:
content = raw.decode('utf-8')
blob = repo.create_git_blob(content, 'utf-8')
else:
b64 = base64.b64encode(raw).decode('ascii')
blob = repo.create_git_blob(b64, 'base64')
except Exception:
b64 = base64.b64encode(raw).decode('ascii')
blob = repo.create_git_blob(b64, 'base64')
elements.append(InputGitTreeElement(path=path, mode='100644', type='blob', sha=blob.sha))
if purge_first:
# Soft purge: יצירת עץ חדש ללא בסיס (מוחק קבצים שאינם ב-ZIP)
new_tree = repo.create_git_tree(elements)
else:
new_tree = repo.create_git_tree(elements, base_tree)
commit_message = f"Restore from ZIP via bot: replace {'with purge' if purge_first else 'update only'}"
new_commit = repo.create_git_commit(commit_message, new_tree, [base_commit])
base_ref.edit(new_commit.sha)
logger.info(f"[restore_zip_from_backup] Restore commit created: {new_commit.sha}, files_added={len(elements)}, purge={purge_first}")
# ניקוי סטייט הגנה אחרי הצלחה
try:
context.user_data.pop("zip_restore_expected_repo_full", None)
except Exception:
pass