דוגמאות שימוש
דף זה מכיל דוגמאות קוד לשימוש ב-API של Code Keeper Bot.
שימוש בסיסי
יצירת אפליקציית בוט
from main import CodeKeeperBot
from config import config
# יצירת מופע של הבוט
bot = CodeKeeperBot()
# הפעלת הבוט
bot.run()
שמירת קוד חדש
from database import db, CodeSnippet
# יצירת snippet חדש
snippet = CodeSnippet(
user_id=123456789,
file_name="example.py",
code="print('Hello, World!')",
programming_language="python",
note="דוגמה ראשונה"
)
# שמירה במסד הנתונים
result = await db.save_snippet(snippet)
print(f"נשמר עם ID: {result.inserted_id}")
חיפוש קוד
from database import db
# חיפוש לפי טקסט
results = await db.search_snippets(
user_id=123456789,
search_term="Hello"
)
for snippet in results:
print(f"נמצא: {snippet['file_name']}")
שימוש ב-Handlers
יצירת Command Handler
from telegram.ext import CommandHandler
async def my_command(update, context):
"""פקודה מותאמת אישית"""
await update.message.reply_text(
"שלום! זו פקודה מותאמת אישית"
)
# הוספת הפקודה לבוט
handler = CommandHandler("mycommand", my_command)
application.add_handler(handler)
יצירת Conversation Handler
from telegram.ext import ConversationHandler, MessageHandler, filters
# מצבי שיחה
WAITING_FOR_CODE = 1
WAITING_FOR_NAME = 2
async def start_save(update, context):
await update.message.reply_text("שלח לי את הקוד:")
return WAITING_FOR_CODE
async def receive_code(update, context):
context.user_data['code'] = update.message.text
await update.message.reply_text("מה שם הקובץ?")
return WAITING_FOR_NAME
async def receive_name(update, context):
context.user_data['filename'] = update.message.text
# שמור את הקוד...
await update.message.reply_text("נשמר בהצלחה!")
return ConversationHandler.END
# יצירת ה-handler
conv_handler = ConversationHandler(
entry_points=[CommandHandler('save', start_save)],
states={
WAITING_FOR_CODE: [MessageHandler(filters.TEXT, receive_code)],
WAITING_FOR_NAME: [MessageHandler(filters.TEXT, receive_name)],
},
fallbacks=[]
)
שימוש ב-Services
זיהוי שפת תכנות
from services import code_service
code = '''
def hello():
print("Hello, World!")
'''
language = code_service.detect_language(code, "test.py")
print(f"השפה שזוהתה: {language}") # python
ניתוח קוד
from services import code_service
analysis = code_service.analyze_code(code, "python")
print(f"מספר שורות: {analysis['lines']}")
print(f"מורכבות: {analysis['complexity']}")
אינטגרציה עם GitHub
העלאת קוד ל-Gist
from github_menu_handler import GitHubMenuHandler
github = GitHubMenuHandler()
# יצירת Gist
gist_url = await github.create_gist(
filename="example.py",
content="print('Hello from Gist!')",
description="דוגמה לקוד Python",
public=True
)
print(f"Gist נוצר: {gist_url}")
שליפת Gists של משתמש
gists = await github.get_user_gists(username="octocat")
for gist in gists:
print(f"- {gist['description']}: {gist['html_url']}")
עבודה עם מסד הנתונים
ביצוע שאילתות מורכבות
from database import db
from datetime import datetime, timedelta
# חיפוש קבצים מהשבוע האחרון
week_ago = datetime.now() - timedelta(days=7)
recent_files = db.db.code_snippets.find({
"user_id": 123456789,
"created_at": {"$gte": week_ago}
}).sort("created_at", -1)
async for file in recent_files:
print(f"{file['file_name']} - {file['created_at']}")
עדכון קבצים
# עדכון הערה לקובץ
result = db.db.code_snippets.update_one(
{"_id": file_id},
{"$set": {"note": "הערה מעודכנת"}}
)
if result.modified_count > 0:
print("עודכן בהצלחה")
סטטיסטיקות
from database import db
# קבלת סטטיסטיקות משתמש
stats = await db.get_user_statistics(user_id=123456789)
print(f"סה״כ קבצים: {stats['total_files']}")
print(f"שפה פופולרית: {stats['most_used_language']}")
print(f"גודל כולל: {stats['total_size']} bytes")
טיפול בשגיאות
טיפול בשגיאות בסיסי
from telegram.error import TelegramError
import logging
logger = logging.getLogger(__name__)
async def safe_handler(update, context):
try:
# קוד שעלול להיכשל
result = await risky_operation()
await update.message.reply_text(f"הצלחה: {result}")
except TelegramError as e:
logger.error(f"Telegram error: {e}")
await update.message.reply_text("אירעה שגיאה, נסה שוב")
except Exception as e:
logger.exception("Unexpected error")
await update.message.reply_text("משהו השתבש 😕")
Retry Logic
import asyncio
from typing import Optional
async def with_retry(func, max_attempts=3, delay=1):
"""ביצוע פונקציה עם ניסיונות חוזרים"""
for attempt in range(max_attempts):
try:
return await func()
except Exception as e:
if attempt == max_attempts - 1:
raise
await asyncio.sleep(delay * (attempt + 1))
# שימוש
result = await with_retry(
lambda: db.save_snippet(snippet),
max_attempts=3
)
בדיקות
בדיקת יחידה
import pytest
from services import code_service
def test_language_detection():
"""בדיקת זיהוי שפה"""
test_cases = [
("print('hello')", "test.py", "python"),
("console.log('hi')", "test.js", "javascript"),
("SELECT * FROM users", "query.sql", "sql"),
]
for code, filename, expected in test_cases:
result = code_service.detect_language(code, filename)
assert result == expected
בדיקת אינטגרציה
import pytest
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_save_command():
"""בדיקת פקודת השמירה"""
# יצירת mock objects
update = AsyncMock()
context = AsyncMock()
# הגדרת התנהגות
update.message.text = "/save"
update.effective_user.id = 123456789
# הפעלת הפקודה
from bot_handlers import save_command
result = await save_command(update, context)
# בדיקת תוצאה
assert update.message.reply_text.called
assert "שלח לי" in update.message.reply_text.call_args[0][0]
דוגמאות מתקדמות
עיבוד באצווה
from batch_processor import BatchProcessor
processor = BatchProcessor()
# הוספת משימות
files = ["file1.py", "file2.js", "file3.go"]
for filename in files:
processor.add_task(
process_file,
filename=filename
)
# ביצוע באצווה
results = await processor.execute_all()
for result in results:
if result.success:
print(f"✓ {result.filename}")
else:
print(f"✗ {result.filename}: {result.error}")
קאשינג
from cache_manager import CacheManager
cache = CacheManager()
# שמירה בקאש
await cache.set(
key="user_stats_123",
value={"files": 42, "size": 1024},
ttl=3600 # שעה
)
# קריאה מקאש
stats = await cache.get("user_stats_123")
if stats:
print(f"מהקאש: {stats}")
else:
# חשב מחדש
stats = await calculate_stats()
await cache.set("user_stats_123", stats)