Conversation Handlers & States
סקירה
מסמך זה מרכז את הזרימות העיקריות של ה‑ConversationHandlers וה‑states.
רשימת States (מבחר בפועל)
# handlers/states.py
GET_CODE, GET_FILENAME, GET_NOTE, EDIT_CODE, EDIT_NAME, WAIT_ADD_CODE_MODE, LONG_COLLECT = range(7)
# ספריית סניפטים – זרימת הגשה
CL_COLLECT_TITLE, CL_COLLECT_DESCRIPTION, CL_COLLECT_URL, CL_COLLECT_LOGO = range(7, 11)
SN_COLLECT_TITLE, SN_COLLECT_DESCRIPTION, SN_COLLECT_CODE, SN_COLLECT_LANGUAGE = range(11, 15)
SN_REJECT_REASON = 15
SN_LONG_COLLECT = 16
# github_menu_handler.py (שיחות העלאה)
REPO_SELECT, FILE_UPLOAD, FOLDER_SELECT = range(3)
Save Flow (תרשים מצבים)
stateDiagram-v2
[*] --> WAITING_FOR_FILENAME: /save
WAITING_FOR_FILENAME --> WAITING_FOR_CODE: שם קובץ התקבל
WAITING_FOR_CODE --> WAITING_FOR_NOTE: קוד התקבל
WAITING_FOR_NOTE --> [*]: הערה התקבלה/דילוג
WAITING_FOR_FILENAME --> [*]: /cancel
WAITING_FOR_CODE --> [*]: /cancel
WAITING_FOR_NOTE --> [*]: /cancel
GitHub Flow (תרשים מצבים)
ספריית סניפטים – זרימת הגשה (Bot)
שלב |
תיאור |
|---|---|
בחירת מצב |
”🧩 קוד רגיל“ או ”✍️ קוד ארוך“ (מצב איסוף רב־חלקי, סיום ב־/done) |
SN_COLLECT_TITLE |
קבלת כותרת (3–180) |
SN_COLLECT_DESCRIPTION |
קבלת תיאור קצר (עד 1000) |
SN_COLLECT_CODE / SN_LONG_COLLECT |
קבלת קוד (חד־חלקי או רב־חלקי) |
SN_COLLECT_LANGUAGE |
קבלת שפה (python/js/…) |
ביטול הוגן
כפתור
❌ ביטולמסיים את הזרימה ומנקהcontext.user_data.לחיצה על כל כפתור אחר בזמן הזרימה מבטלת אוטומטית את ההקשר (אין ”הידבקות“ של כפתורים לטקסט הקלט).
התראות אדמין והודעת משתמש
אדמין מקבל Inline Keyboard לאישור/דחייה + כפתור ”👁️ הצג סניפט“ (ווב־אפ).
המשתמש מקבל הודעה ידידותית בעת אישור/דחייה (כולל סיבת דחייה אם צוינה).
stateDiagram-v2
[*] --> GITHUB_MENU: /github
GITHUB_MENU --> GITHUB_REPO_SELECT: "בחר ריפו"
GITHUB_REPO_SELECT --> GITHUB_FILE_BROWSE: ריפו נבחר
GITHUB_FILE_BROWSE --> GITHUB_FILE_BROWSE: ניווט בתיקיות
GITHUB_FILE_BROWSE --> [*]: קובץ נשמר
GITHUB_MENU --> [*]: חזרה
GITHUB_REPO_SELECT --> GITHUB_MENU: חזרה
GITHUB_FILE_BROWSE --> GITHUB_REPO_SELECT: חזרה
דוגמת Handler תמציתית
save_conversation = ConversationHandler(
entry_points=[CommandHandler('save', start_save)],
states={
States.WAITING_FOR_FILENAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, receive_filename)],
States.WAITING_FOR_CODE: [MessageHandler(filters.TEXT & ~filters.COMMAND, receive_code)],
States.WAITING_FOR_NOTE: [
MessageHandler(filters.TEXT & ~filters.COMMAND, receive_note),
CallbackQueryHandler(skip_note, pattern='^skip_note$')
],
},
fallbacks=[CommandHandler('cancel', cancel)],
name="save_conversation",
persistent=True,
)
טבלת זרימות מלאה
פקודה / טריגר |
Entry Point |
States מרכזיים |
Handlers עיקריים |
קישורי קוד |
|---|---|---|---|---|
|
|
|
|
save_flow.py (start_save_flow) · get_code · get_filename · get_note · save_file_final |
|
|
|
|
|
|
|
— (דיאלוג כפתורים באמצעות |
|
|
|
|
— (דיאלוג כפתורים באמצעות |
|
הערות וקישורי API רלוונטיים:
שירותים:
services.code_service,services.github_service,services.backup_service,services.google_drive_serviceשיחת שמירה מרכזית: conversation_handlers.get_save_conversation_handler
פקודת /save (זרימה מקוצרת): main.py: save_command
קישורים
דוגמאות קוד תמציתיות (ממשיות)
Save – שלבים עיקריים
[
InlineKeyboardButton("📚 היסטוריה", callback_data=f"versions_file_{filename}"),
InlineKeyboardButton("📥 הורד", callback_data=f"download_direct_{filename}"),
],
[
InlineKeyboardButton("🗑️ מחק", callback_data=f"delete_direct_{filename}"),
],
[
InlineKeyboardButton("🔗 שתף קוד", callback_data=f"share_menu_id:{fid}") if fid else InlineKeyboardButton("🔗 שתף קוד", callback_data="share_menu_id:"),
],
[
InlineKeyboardButton("🔙 לרשימה", callback_data="files"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
'user_id': update.effective_user.id,
},
name=jid,
job_kwargs={
'id': jid,
'replace_existing': True,
}
)
context.user_data['long_collect_job'] = job
except Exception as e:
try:
emit_event(
"long_collect_schedule_timeout_failed",
severity="anomaly",
operation="save_flow.schedule_timeout",
handled=True,
user_id=getattr(getattr(update, 'effective_user', None), 'id', None),
error=str(e),
)
except Exception:
pass
logger.warning(f"Failed scheduling timeout: {e}")
async def long_collect_timeout_job(context: ContextTypes.DEFAULT_TYPE):
"""קריאת טיימאאוט: מסכם ומתקדם לפי חלקים שנאספו."""
chat_id = data.get('chat_id')
user_id = data.get('user_id')
# שליפת נתוני המשתמש
parts = context.user_data.get('long_collect_parts') or []
if not parts:
await context.bot.send_message(
chat_id=chat_id,
text="⏳ מצב איסוף הסתיים אוטומטית לאחר 15 דקות ללא פעילות.\nלא נאספו חלקים, ולכן המצב נסגר."
)
context.user_data.pop('long_collect_active', None)
_cancel_long_collect_timeout(context)
return
# סמן נעילה כדי למנוע הוספה נוספת
context.user_data['long_collect_locked'] = True
total_bytes = _get_total_bytes(parts)
await context.bot.send_message(
chat_id=chat_id,
text=(
f"⏳ מצב איסוף נסגר לאחר חוסר פעילות.\n"
f"✅ נאספו {len(parts)} חלקים (סה""כ ~{total_bytes // 1024}KB).\n"
f"שלח/י /done לאיחוד לקובץ אחד או /cancel לביטול."
)
)
# נשארים בסטייט, אך נעולים להוספה נוספת עד /done או /cancel
except Exception as e:
try:
emit_event(
"long_collect_timeout_job_failed",
severity="anomaly",
operation="save_flow.timeout_job",
handled=True,
error=str(e),
pass
logger.warning(f"Timeout job failed: {e}")
async def start_save_flow(update, context: ContextTypes.DEFAULT_TYPE) -> int:
cancel_markup = InlineKeyboardMarkup([[InlineKeyboardButton("❌ ביטול", callback_data="cancel")]])
# תמיכה גם בקריאה מתוך callback וגם מתוך הודעת טקסט
target_msg = getattr(update, "message", None)
if target_msg is None and getattr(update, "callback_query", None) is not None:
target_msg = update.callback_query.message
"📝 שלח לי את קטע הקוד המבריק שלך.\n"
"💡 אני אזהה את השפה אוטומטית ואארגן הכל!",
reply_markup=cancel_markup,
parse_mode='Markdown',
)
return GET_CODE
async def start_add_code_menu(update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""תפריט בחירת מצב הוספת קוד: רגיל או איסוף ארוך"""
keyboard = [
[InlineKeyboardButton("🧩 קוד רגיל", callback_data="add_code_regular")],
[InlineKeyboardButton("✍️ איסוף קוד ארוך", callback_data="add_code_long")],
[InlineKeyboardButton("❌ ביטול", callback_data="cancel")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"איך תרצו להוסיף קוד?",
reply_markup=reply_markup
)
return WAIT_ADD_CODE_MODE
async def start_long_collect(update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""כניסה למצב איסוף קוד ארוך"""
# איפוס/אתחול רשימת החלקים
context.user_data['long_collect_parts'] = []
context.user_data['long_collect_active'] = True
context.user_data['long_collect_locked'] = False
await update.callback_query.answer()
await update.callback_query.message.reply_text(
"נכנסתי למצב איסוף קוד ✍️\n"
"שלח/י את חלקי הקוד בהודעות נפרדות.\n"
"כשתסיים/י, שלח/י /done כדי לאחד את הכל לקובץ אחד.\n"
"אפשר גם /cancel לביטול."
)
_schedule_long_collect_timeout(update, context)
return LONG_COLLECT
async def long_collect_receive(update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""קבלת חלק קוד נוסף במצב איסוף"""
# אם מצב נעול בעקבות טיימאאוט, למנוע הוספה
if context.user_data.get('long_collect_locked'):
await update.message.reply_text("מצב האיסוף נעול לאחר חוסר פעילות. שלח/י /done או /cancel.")
return LONG_COLLECT
# התעלמות מתכנים שאינם טקסט או מסמכי טקסט
if update.message.document:
doc = update.message.document
GitHub – תפריט ושיחת העלאה
idx_map[idx] = safe_path
return f"{prefix}_i:{idx}"
def _get_path_from_cb(self, context: ContextTypes.DEFAULT_TYPE, data: str, prefix: str) -> str:
"""שחזור נתיב מתוך callback_data רגיל או ממופה (_i:)."""
try:
if data.startswith(prefix + ":"):
return data.split(":", 1)[1]
if data.startswith(prefix + "_i:"):
idx = data.split(":", 1)[1]
m = context.user_data.get("browse_idx_map") or {}
return m.get(idx, "")
except Exception:
return ""
return ""
async def _render_file_view(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""מציג דף תצוגה חלקית של קובץ עם כפתורי 'הצג עוד', 'הורד', 'חזרה'."""
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
repo_name = session.get("selected_repo") or "repo"
path = context.user_data.get("view_file_path") or ""
data = context.user_data.get("view_file_text") or ""
page = int(context.user_data.get("view_page_index", 0))
# חישוב חלוקה לשורות
lines = data.splitlines()
total_lines = len(lines)
start = page * VIEW_LINES_PER_PAGE
end = min(start + VIEW_LINES_PER_PAGE, total_lines)
chunk = "\n".join(lines[start:end])
# טקסט לתצוגה + גודל ושפה מזוהה
size_bytes = int(context.user_data.get("view_file_size", 0) or 0)
lang = context.user_data.get("view_detected_language") or "text"
header = (
f"📄 תצוגת קובץ\n"
f"📁 <code>{safe_html_escape(repo_name)}</code>\n"
f"📄 <code>{safe_html_escape(path)}</code>\n"
f"🔤 שפה: <code>{safe_html_escape(lang)}</code> | 💾 גודל: <code>{format_bytes(size_bytes)}</code>\n"
f"שורות {start+1}-{end} מתוך {total_lines}\n\n"
)
# בניית מקלדת
rows = [[InlineKeyboardButton("🔙 חזרה", callback_data="view_back")],
[InlineKeyboardButton("⬇️ הורד", callback_data=self._mk_cb(context, "browse_select_download", path))]]
# כפתור שיתוף קישור לקובץ – רק במסך התצוגה (לא ברשימה)
rows.append([InlineKeyboardButton("🔗 שתף קישור", callback_data=self._mk_cb(context, "share_selected_links_single", path))])
if end < total_lines:
rows.append([InlineKeyboardButton("הצג עוד ⤵️", callback_data="view_more")])
try:
# הדגשת תחביר קיימת במודול code_processor.highlight_code; נשתמש בה ואז ננקה ל-Telegram
try:
from services import code_service as code_processor
# פורמט שמירת שורות כברירת מחדל
lower_path = (path or '').lower()
# אם YAML – נסה צביעה ישירה, אחרת כללי
if lower_path.endswith('.yml') or lower_path.endswith('.yaml'):
try:
highlighted_html = code_processor.highlight_code(chunk, 'yaml')
body = highlighted_html or f"<pre>{safe_html_escape(chunk)}</pre>"
except Exception:
body = f"<pre>{safe_html_escape(chunk)}</pre>"
else:
# שמירת שורות בכוח עבור סוגי קבצים רגישים לעיצוב
force_pre_exts = ('.md', '.markdown', '.py')
if lower_path.endswith(force_pre_exts):
body = f"<pre>{safe_html_escape(chunk)}</pre>"
else:
# נסה היילייט; אם יוצרת בלגן, fallback ל-pre
try:
highlighted_html = code_processor.highlight_code(chunk, lang)
if not highlighted_html or '\n' not in chunk:
body = f"<pre>{safe_html_escape(chunk)}</pre>"
else:
body = highlighted_html
except Exception:
body = f"<pre>{safe_html_escape(chunk)}</pre>"
except Exception:
body = f"<pre>{safe_html_escape(chunk)}</pre>"
await query.edit_message_text(header + body, parse_mode="HTML", reply_markup=InlineKeyboardMarkup(rows))
except BadRequest as br:
if "message is not modified" not in str(br).lower():
raise
async def show_import_branch_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""מציג בחירת ענף לייבוא ריפו (עימוד)."""
query = update.callback_query
user_id = query.from_user.id
session = self.get_user_session(user_id)
token = self.get_user_token(user_id)
repo_full = session.get("selected_repo") or ""
if not (token and repo_full):
await query.edit_message_text("❌ חסר טוקן או ריפו נבחר")
return
# הודעת טעינה בזמן שליפת הענפים
try:
await TelegramUtils.safe_edit_message_text(query, "⏳ טוען רשימת ענפים…")
emit_internal_alert = None # type: ignore
if emit_internal_alert is None:
return False
# שומרים את הטקסט בתור summary; פרטים נוספים (כמו רשימת אדמינים) רק להקשר.
# NOTE: לא מעבירים token/chat_id וכד' כדי לא להדליף מידע רגיש.
admin_ids = get_admin_ids()
emit_internal_alert(
"admin_notification",
severity="info",
summary=str(text or ""),
source="main.notify_admins",
admin_ids=admin_ids,
)
return True
except Exception:
return False
async def _send_direct_admins(context: ContextTypes.DEFAULT_TYPE, text: str) -> bool:
Backup – תפריט
text = build_button_text(prim_use, version_text, rating_text)
if len(text) <= MAX_LEN:
return text
# 1) קצר עוד את השם העיקרי
for limit in (24, 20, 16, 12, 10, 8):
prim_use = _truncate_middle(primary, limit)
text = build_button_text(prim_use, version_text, rating_text)
if len(text) <= MAX_LEN:
return text
# 2) נסה ללא דירוג
text = build_button_text(prim_use, version_text, "")
if len(text) <= MAX_LEN:
return text
# 3) נפילה סופית: שם קצר מאוד עם גרסה
return build_button_text(_truncate_middle(primary, 4), version_text, "")
class BackupMenuHandler:
"""תפריט גיבוי ושחזור מלא + נקודות שמירה בגיט"""
def __init__(self):
self.user_sessions: Dict[int, Dict[str, Any]] = {}
def _get_session(self, user_id: int) -> Dict[str, Any]:
if user_id not in self.user_sessions:
self.user_sessions[user_id] = {}
return self.user_sessions[user_id]
def _get_cached_backup(self, context: ContextTypes.DEFAULT_TYPE, user_id: int, backup_id: str) -> Optional[SimpleNamespace]:
try:
cache = context.user_data.get("_recent_backups", {})
except Exception:
return None
data = cache.get(backup_id)
if not data:
return None
# ייתכן שהקובץ המקומי עדיין לא נוצר (בייחוד כשהאחסון הוא GridFS והעותק המקומי נשלף לפי דרישה).
# אל תכשיל תצוגת פרטי הגיבוי — נוודא קיום קובץ רק בעת הורדה בפועל.
file_path = data.get("file_path") or ""
created_at_raw = data.get("created_at")
created_at_dt: datetime
if isinstance(created_at_raw, datetime):
created_at_dt = created_at_raw if created_at_raw.tzinfo else created_at_raw.replace(tzinfo=timezone.utc)
elif isinstance(created_at_raw, str):
try:
created_at_dt = datetime.fromisoformat(created_at_raw)
if created_at_dt.tzinfo is None:
created_at_dt = created_at_dt.replace(tzinfo=timezone.utc)
except Exception:
created_at_dt = datetime.now(timezone.utc)
else:
created_at_dt = datetime.now(timezone.utc)
try:
total_size = int(data.get("total_size") or 0)
except Exception:
total_size = 0
try:
file_count = int(data.get("file_count") or 0)
except Exception:
file_count = 0
return SimpleNamespace(
backup_id=backup_id,
user_id=user_id,
created_at=created_at_dt,
file_count=file_count,
total_size=total_size,
backup_type=data.get("backup_type", "github_repo_zip"),
Drive – תפריט
await ctx.bot.send_message(
chat_id=uid,
text="❌ הגיבוי האוטומטי נכשל — נדרש להתחבר מחדש ל‑Google Drive.",
reply_markup=InlineKeyboardMarkup(kb)
)
try:
emit_event("drive_scheduled_backup_auth_required", severity="warn", user_id=int(uid))
except Exception:
pass
except Exception:
pass
# Update next-run prefs (same behavior as legacy flow, but kept inside tracking)
try:
now_dt = datetime.now(timezone.utc)
next_dt = now_dt + timedelta(seconds=seconds)
update_prefs = {"last_backup_at": now_dt.isoformat(), "schedule_next_at": next_dt.isoformat()}
if ok:
update_prefs["last_full_backup_at"] = now_dt.isoformat()
try:
from src.infrastructure.composition import get_files_facade # type: ignore
get_files_facade().save_drive_prefs(uid, update_prefs)
except Exception:
pass
# עדכן גם על ה-Job עצמו עבור תצוגת סטטוס
try:
setattr(ctx.job, "next_t", next_dt)
except Exception:
pass
try:
emit_event(
"drive_scheduled_backup_update_prefs",
severity="info",
user_id=int(uid),
next_at=str(update_prefs.get("schedule_next_at")),
last_at=str(update_prefs.get("last_backup_at")),
last_full_at=str(update_prefs.get("last_full_backup_at") or "")
)
except Exception:
pass
except Exception as e:
logger.exception("drive_scheduled_backup_update_prefs_failed")
try:
emit_event("drive_scheduled_backup_update_prefs_failed", severity="error", user_id=int(uid), error=str(e))
except Exception:
pass
# Mark failure via context manager (do NOT call tracker.fail_run here)
if not ok:
raise RuntimeError("drive_scheduled_backup_failed")
except JobAlreadyRunningError:
try:
tracker.record_skipped(
job_id=job_id,
trigger="scheduled",
user_id=int(uid),
reason="already_running",
)
except Exception:
pass
return
except RuntimeError as e:
# Normal failure path (ok=False) was already recorded by the tracker.
if str(e) == "drive_scheduled_backup_failed":
return
raise
except Exception as e:
# Unexpected error: keep best-effort legacy event for debugging
try:
מוקשים נפוצים ופתרונות
query.answer()
# ❌ טעות נפוצה: חסר await
query.answer()
# ✅ נכון
await query.answer()
עריכת הודעות בבטחה – ”Message is not modified“
השתמשו ב־TelegramUtils.safe_edit_message_text / safe_edit_message_reply_markup כדי לבלוע את החריגה הספציפית בלבד.
from utils import TelegramUtils
await TelegramUtils.safe_edit_message_text(query, "טקסט", reply_markup=kb)
Filters
במכונות מצבים: המנעו מללכוד פקודות בתור טקסט חופשי.
MessageHandler(filters.TEXT & ~filters.COMMAND, get_code)
Persistent data (context.user_data)
שמרו דגלים זמניים ובנקודות מעבר (למשל ב־GitHub paste/upload):
context.user_data["waiting_for_paste_content"] = True
context.user_data.pop("waiting_for_paste_filename", None)
דוגמאות טסטים קצרות
Save – התחלה וזרימה בסיסית
import pytest
@pytest.mark.asyncio
async def test_save_flow_happy_path(mock_update, mock_context):
# Arrange – התחלת זרימה
from handlers import save_flow
state = await save_flow.start_save_flow(mock_update, mock_context)
assert state == save_flow.GET_CODE
# Act – קבלת קוד ושם
mock_update.message.text = "print('hi')\n"
state = await save_flow.get_code(mock_update, mock_context)
assert state == save_flow.GET_FILENAME
mock_update.message.text = "script.py"
state = await save_flow.get_filename(mock_update, mock_context)
assert state == save_flow.GET_NOTE
mock_update.message.text = "תיאור"
state = await save_flow.get_note(mock_update, mock_context)
# Assert – השיחה מסתיימת לאחר שמירה מוצלחת
from telegram.ext import ConversationHandler
assert state in (ConversationHandler.END,)
GitHub – upload_conv_handler
@pytest.mark.asyncio
async def test_github_upload_conversation(mock_update_cb, mock_context):
# סימולציית כניסה למסך העלאה ובדיקת רישום ה-handlers
from main import create_application
app = create_application("dummy")
# קיימים states FILE_UPLOAD/REPO_SELECT/FOLDER_SELECT – אין צורך להריץ בוט אמיתי בטסט זה
assert any("ConversationHandler" in type(h).__name__ for h in app.handlers)