"""
פונקציות עזר כלליות לבוט שומר קבצי קוד
General Utility Functions for Code Keeper Bot
"""
import asyncio
import hashlib
import json
import logging
import mimetypes
import os
import re
import secrets
import shutil
import sys
import tempfile
import time
import zipfile
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from functools import wraps
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import telegram.error
try:
import aiofiles # type: ignore
except Exception: # optional for tests
aiofiles = None # type: ignore[assignment]
try:
import aiohttp # type: ignore
except Exception: # optional for tests
aiohttp = None # type: ignore[assignment]
try:
from telegram import Message, Update, User # type: ignore
from telegram.constants import ChatAction, ParseMode # type: ignore
from telegram.ext import ContextTypes # type: ignore
except Exception: # lightweight stubs for test env
class Message: # type: ignore[no-redef]
pass
class Update: # type: ignore[no-redef]
pass
class User: # type: ignore[no-redef]
pass
ChatAction = None # type: ignore[assignment]
ParseMode = None # type: ignore[assignment]
class _ContextTypes:
DEFAULT_TYPE = object
ContextTypes = _ContextTypes # type: ignore[assignment]
logger = logging.getLogger(__name__)
[תיעוד]
class CodeErrorLogger:
"""מערכת לוגים ייעודית לשגיאות עיבוד קוד"""
[תיעוד]
def __init__(self):
self.logger = logging.getLogger('code_error_system')
self._setup_logger()
def _setup_logger(self):
"""הגדרת הלוגר עם קבצי יומן נפרדים"""
if not self.logger.handlers:
# לוגר לשגיאות כלליות - שימוש ב-StreamHandler לסביבת פרודקשן
error_handler = logging.StreamHandler()
error_handler.setLevel(logging.ERROR)
error_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
error_handler.setFormatter(error_formatter)
# לוגר לסטטיסטיקות ופעילות - שימוש ב-StreamHandler לסביבת פרודקשן
activity_handler = logging.StreamHandler()
activity_handler.setLevel(logging.INFO)
activity_formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
activity_handler.setFormatter(activity_formatter)
self.logger.addHandler(error_handler)
self.logger.addHandler(activity_handler)
self.logger.setLevel(logging.INFO)
[תיעוד]
def log_code_processing_error(self, user_id: int, error_type: str, error_message: str,
context: Dict[str, Any] = None):
"""רישום שגיאות עיבוד קוד"""
context = context or {}
log_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"user_id": user_id,
"error_type": error_type,
"message": error_message,
"context": context
}
self.logger.error(f"CODE_ERROR: {json.dumps(log_entry, ensure_ascii=False)}")
[תיעוד]
def log_code_activity(self, user_id: int, activity_type: str, details: Dict[str, Any] = None):
"""רישום פעילות עיבוד קוד"""
details = details or {}
log_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"user_id": user_id,
"activity": activity_type,
"details": details
}
self.logger.info(f"CODE_ACTIVITY: {json.dumps(log_entry, ensure_ascii=False)}")
[תיעוד]
def log_validation_failure(self, user_id: int, code_length: int, error_reason: str):
"""רישום כשל באימות קוד"""
self.log_code_processing_error(
user_id,
"validation_failure",
error_reason,
{"code_length": code_length}
)
[תיעוד]
def log_sanitization_success(self, user_id: int, original_length: int, cleaned_length: int):
"""רישום הצלחה בסניטציה"""
self.log_code_activity(
user_id,
"code_sanitized",
{
"original_length": original_length,
"cleaned_length": cleaned_length,
"reduction": original_length - cleaned_length
}
)
# יצירת אינסטנס גלובלי של הלוגר
code_error_logger = CodeErrorLogger()
[תיעוד]
class TimeUtils:
"""כלים לעבודה עם זמן ותאריכים"""
[תיעוד]
@staticmethod
def parse_date_string(date_str: str) -> Optional[datetime]:
"""פרסור מחרוזת תאריך לאובייקט datetime"""
formats = [
"%Y-%m-%d",
"%d/%m/%Y",
"%d.%m.%Y",
"%Y-%m-%d %H:%M:%S",
"%d/%m/%Y %H:%M",
"%Y-%m-%dT%H:%M:%S"
]
for fmt in formats:
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
# ניסיון לפרסור יחסי
date_str_lower = date_str.lower()
if date_str_lower in ['today', 'היום']:
return datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
elif date_str_lower in ['yesterday', 'אתמול']:
return (datetime.now(timezone.utc) - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
elif date_str_lower in ['week', 'שבוע']:
return datetime.now(timezone.utc) - timedelta(weeks=1)
elif date_str_lower in ['month', 'חודש']:
return datetime.now(timezone.utc) - timedelta(days=30)
return None
[תיעוד]
@staticmethod
def get_time_ranges(period: str) -> Tuple[datetime, datetime]:
"""קבלת טווח זמנים לפי תקופה"""
now = datetime.now(timezone.utc)
if period == 'today':
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
end = start + timedelta(days=1)
elif period == 'week':
start = now - timedelta(days=now.weekday())
start = start.replace(hour=0, minute=0, second=0, microsecond=0)
end = start + timedelta(weeks=1)
elif period == 'month':
start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
if start.month == 12:
end = start.replace(year=start.year + 1, month=1)
else:
end = start.replace(month=start.month + 1)
elif period == 'year':
start = now.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
end = start.replace(year=start.year + 1)
else:
# ברירת מחדל - יום אחרון
start = now - timedelta(days=1)
end = now
return start, end
[תיעוד]
class TextUtils:
"""כלים לעבודה עם טקסט"""
[תיעוד]
@staticmethod
def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
"""קיצור טקסט עם סיומת"""
if len(text) <= max_length:
return text
return text[:max_length - len(suffix)] + suffix
[תיעוד]
@staticmethod
def escape_markdown(text: str, version: int = 2) -> str:
"""הגנה על תווים מיוחדים ב-Markdown"""
if version == 2:
# Markdown V2: כל התווים שיש לאסקייפ לפי Telegram MarkdownV2
special_chars = set("_*[]()~`>#+-=|{}.!\\")
return "".join(("\\" + ch) if ch in special_chars else ch for ch in text)
else:
# Markdown V1: נשתמש בקבוצה מצומצמת אך גם נסמן סוגריים כדי להימנע מתקלות כלליות
special_chars = set("_*`[()\\")
return "".join(("\\" + ch) if ch in special_chars else ch for ch in text)
[תיעוד]
@staticmethod
def clean_filename(filename: str) -> str:
"""ניקוי שם קובץ מתווים לא חוקיים"""
# הסרת תווים לא חוקיים
cleaned = re.sub(r'[<>:"/\\|?*]', '_', filename)
# הסרת רווחים מיותרים
cleaned = re.sub(r'\s+', '_', cleaned)
# הסרת נקודות מיותרות
cleaned = re.sub(r'\.+', '.', cleaned)
# הגבלת אורך
if len(cleaned) > 100:
name, ext = os.path.splitext(cleaned)
cleaned = name[:100-len(ext)] + ext
return cleaned.strip('._')
[תיעוד]
@staticmethod
def highlight_text(text: str, query: str, tag: str = "**") -> str:
"""הדגשת מילות חיפוש בטקסט"""
if not query:
return text
# הדגשה בלי תלות ברישיות
pattern = re.compile(re.escape(query), re.IGNORECASE)
return pattern.sub(f"{tag}\\g<0>{tag}", text)
[תיעוד]
@staticmethod
def format_file_size(size_bytes: int) -> str:
"""פורמט גודל קובץ (bytes -> KB/MB/GB)"""
if size_bytes < 1024:
return f"{size_bytes} B"
elif size_bytes < 1024 ** 2:
return f"{size_bytes / 1024:.1f} KB"
elif size_bytes < 1024 ** 3:
return f"{size_bytes / (1024 ** 2):.1f} MB"
else:
return f"{size_bytes / (1024 ** 3):.1f} GB"
[תיעוד]
@staticmethod
def pluralize_hebrew(count: int, singular: str, plural: str) -> str:
"""צורת רבים עבריות"""
if count == 1:
return f"{count} {singular}"
elif count == 2:
return f"2 {plural}"
else:
return f"{count} {plural}"
[תיעוד]
class SecurityUtils:
"""כלים אמינות ובטיחות"""
[תיעוד]
@staticmethod
def generate_secure_token(length: int = 32) -> str:
"""יצירת טוקן מאובטח"""
return secrets.token_urlsafe(length)
[תיעוד]
@staticmethod
def hash_content(content: str, algorithm: str = 'sha256') -> str:
"""יצירת hash לתוכן"""
if algorithm == 'md5':
return hashlib.md5(content.encode()).hexdigest()
elif algorithm == 'sha1':
return hashlib.sha1(content.encode()).hexdigest()
elif algorithm == 'sha256':
return hashlib.sha256(content.encode()).hexdigest()
else:
raise ValueError(f"אלגוריתם לא נתמך: {algorithm}")
[תיעוד]
@staticmethod
def sanitize_code(code: str) -> str:
"""ניקוי קוד מתוכן מסוכן (בסיסי)"""
# דפוסי קוד מסוכנים בסיסיים
dangerous_patterns = [
r'exec\s*\(',
r'eval\s*\(',
r'__import__\s*\(',
r'open\s*\(',
r'file\s*\(',
r'input\s*\(',
r'raw_input\s*\(',
]
# החלפת דפוסים מסוכנים
cleaned = code
for pattern in dangerous_patterns:
cleaned = re.sub(pattern, '[REMOVED]', cleaned, flags=re.IGNORECASE)
return cleaned
[תיעוד]
class TelegramUtils:
"""כלים לעבודה עם Telegram"""
[תיעוד]
@staticmethod
async def send_typing_action(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""שליחת אקשן 'כותב...'"""
await context.bot.send_chat_action(
chat_id=update.effective_chat.id,
action=ChatAction.TYPING
)
[תיעוד]
@staticmethod
async def send_document_action(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""שליחת אקשן 'שולח מסמך...'"""
await context.bot.send_chat_action(
chat_id=update.effective_chat.id,
action=ChatAction.UPLOAD_DOCUMENT
)
[תיעוד]
@staticmethod
def get_user_mention(user: User) -> str:
"""קבלת מנשן למשתמש"""
if user.username:
return f"@{user.username}"
else:
return f"[{user.first_name}](tg://user?id={user.id})"
[תיעוד]
@staticmethod
def split_long_message(text: str, max_length: int = 4096) -> List[str]:
"""חלוקת הודעה ארוכה לחלקים"""
if len(text) <= max_length:
return [text]
parts = []
current_part = ""
for line in text.split('\n'):
if len(current_part) + len(line) + 1 <= max_length:
current_part += line + '\n'
else:
if current_part:
parts.append(current_part.rstrip())
current_part = line + '\n'
if current_part:
parts.append(current_part.rstrip())
return parts
[תיעוד]
@staticmethod
async def safe_edit_message_text(query, text: str, reply_markup=None, parse_mode: Optional[str] = None) -> None:
"""עריכת טקסט הודעה בבטיחות: מתעלם משגיאת 'Message is not modified'."""
try:
if parse_mode is None:
await query.edit_message_text(text=text, reply_markup=reply_markup)
else:
await query.edit_message_text(text=text, reply_markup=reply_markup, parse_mode=parse_mode)
except telegram.error.BadRequest as e:
if "message is not modified" in str(e).lower():
return
raise
[תיעוד]
@staticmethod
async def safe_edit_message_reply_markup(query, reply_markup=None) -> None:
"""עריכת מקלדת הודעה בבטיחות: מתעלם משגיאת 'Message is not modified'."""
try:
await query.edit_message_reply_markup(reply_markup=reply_markup)
except telegram.error.BadRequest as e:
if "message is not modified" in str(e).lower():
return
raise
[תיעוד]
class CallbackQueryGuard:
"""Guard גורף ללחיצות כפולות על כפתורי CallbackQuery.
מבוסס על טביעת אצבע של המשתמש/הודעה/הנתון (callback_data) כדי לחסום
את אותה פעולה בחלון זמן קצר, בלי לחסום פעולות שונות.
"""
DEFAULT_WINDOW_SECONDS: float = 1.2
# נעילות פר-משתמש כדי למנוע מרוץ בין לחיצות מקבילות של אותו משתמש
_user_locks: Dict[int, asyncio.Lock] = {}
@staticmethod
def _fingerprint(update: Update) -> str:
try:
q = getattr(update, "callback_query", None)
user_id = int(getattr(update.effective_user, "id", 0) or 0)
chat_id = int(getattr(update.effective_chat, "id", 0) or 0)
msg_id = int(getattr(getattr(q, "message", None), "message_id", 0) or 0)
data = str(getattr(q, "data", "") or "")
return f"{user_id}:{chat_id}:{msg_id}:{data}"
except Exception:
return "unknown"
[תיעוד]
@staticmethod
def should_block(update: Update, context: ContextTypes.DEFAULT_TYPE, window_seconds: Optional[float] = None) -> bool:
"""בודק האם יש לחסום את העדכון כלחיצה כפולה.
אם זו אותה טביעת אצבע בתוך חלון הזמן – נחסום; אחרת נסמן ונאפשר.
"""
try:
win = float(window_seconds if window_seconds is not None else CallbackQueryGuard.DEFAULT_WINDOW_SECONDS)
except Exception:
win = CallbackQueryGuard.DEFAULT_WINDOW_SECONDS
try:
fp = CallbackQueryGuard._fingerprint(update)
now_ts = time.time()
last_fp = context.user_data.get("_last_cb_fp") if hasattr(context, "user_data") else None
busy_until = float(context.user_data.get("_cb_guard_until", 0.0) or 0.0) if hasattr(context, "user_data") else 0.0
if last_fp == fp and now_ts < busy_until:
return True
# סמנו את הפעולה הנוכחית לחלון קצר
if hasattr(context, "user_data"):
context.user_data["_last_cb_fp"] = fp
context.user_data["_cb_guard_until"] = now_ts + win
return False
except Exception:
# אל תחסום אם guard נכשל
return False
[תיעוד]
@staticmethod
async def should_block_async(update: Update, context: ContextTypes.DEFAULT_TYPE, window_seconds: Optional[float] = None) -> bool:
"""בודק בצורה אטומית (עם נעילה) אם לחסום לחיצה כפולה של אותו משתמש.
חסימה מבוססת חלון זמן פר-משתמש, ללא תלות ב-message_id/data, כדי למנוע מרוץ.
"""
try:
try:
win = float(window_seconds if window_seconds is not None else CallbackQueryGuard.DEFAULT_WINDOW_SECONDS)
except Exception:
win = CallbackQueryGuard.DEFAULT_WINDOW_SECONDS
user_id = int(getattr(getattr(update, 'effective_user', None), 'id', 0) or 0)
# אם אין זיהוי משתמש, fallback להתנהגות הישנה ללא חסימה
if user_id <= 0:
return CallbackQueryGuard.should_block(update, context, window_seconds=win)
# קבל/צור נעילה למשתמש
lock = CallbackQueryGuard._user_locks.get(user_id)
if lock is None:
lock = asyncio.Lock()
CallbackQueryGuard._user_locks[user_id] = lock
async with lock:
now_ts = time.time()
# השתמש באותו שדה זמן גלובלי שהיה בשימוש, אך ללא טביעת אצבע
busy_until = float(context.user_data.get("_cb_guard_until", 0.0) or 0.0) if hasattr(context, "user_data") else 0.0
if now_ts < busy_until:
return True
# סמנו חלון זמן חסימה חדש
if hasattr(context, "user_data"):
context.user_data["_cb_guard_until"] = now_ts + win
return False
except Exception:
# אל תחסום אם guard נכשל
return False
[תיעוד]
class AsyncUtils:
"""כלים לעבודה אסינכרונית"""
[תיעוד]
@staticmethod
async def run_with_timeout(coro, timeout: float = 30.0):
"""הרצת פונקציה אסינכרונית עם timeout"""
try:
return await asyncio.wait_for(coro, timeout=timeout)
except asyncio.TimeoutError:
logger.warning(f"פעולה הופסקה עקב timeout ({timeout}s)")
return None
[תיעוד]
@staticmethod
async def batch_process(items: List[Any], process_func: Callable,
batch_size: int = 10, delay: float = 0.1) -> List[Any]:
"""עיבוד פריטים בקבוצות"""
results = []
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
# עיבוד הקבוצה
batch_tasks = [process_func(item) for item in batch]
batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
results.extend(batch_results)
# המתנה בין קבוצות
if delay > 0 and i + batch_size < len(items):
await asyncio.sleep(delay)
return results
[תיעוד]
class ValidationUtils:
"""כלים לוולידציה"""
[תיעוד]
@staticmethod
def is_valid_filename(filename: str) -> bool:
"""בדיקת תקינות שם קובץ"""
if not filename or len(filename) > 255:
return False
# תווים לא חוקיים
invalid_chars = '<>:"/\\|?*'
if any(char in filename for char in invalid_chars):
return False
# שמות שמורים ב-Windows
reserved_names = [
'CON', 'PRN', 'AUX', 'NUL',
'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9'
]
name_without_ext = os.path.splitext(filename)[0].upper()
if name_without_ext in reserved_names:
return False
return True
[תיעוד]
@staticmethod
def is_safe_code(code: str, programming_language: str) -> Tuple[bool, List[str]]:
"""בדיקה בסיסית של בטיחות קוד"""
warnings = []
# דפוסים מסוכנים
dangerous_patterns = {
'python': [
r'exec\s*\(',
r'eval\s*\(',
r'__import__\s*\(',
r'open\s*\([^)]*["\']w', # כתיבה לקובץ
r'subprocess\.',
r'os\.system\s*\(',
r'os\.popen\s*\(',
],
'javascript': [
r'eval\s*\(',
r'Function\s*\(',
r'document\.write\s*\(',
r'innerHTML\s*=',
r'outerHTML\s*=',
],
'bash': [
r'rm\s+-rf',
r'rm\s+/',
r'dd\s+if=',
r'mkfs\.',
r'fdisk\s+',
]
}
if programming_language in dangerous_patterns:
for pattern in dangerous_patterns[programming_language]:
if re.search(pattern, code, re.IGNORECASE):
warnings.append(f"דפוס מסוכן אפשרי: {pattern}")
# בדיקות כלליות
if 'password' in code.lower() or 'secret' in code.lower():
warnings.append("הקוד מכיל מילות סיסמה או סוד")
if re.search(r'https?://\S+', code):
warnings.append("הקוד מכיל URLים")
is_safe = len(warnings) == 0
return is_safe, warnings
[תיעוד]
class FileUtils:
"""כלים לעבודה עם קבצים"""
[תיעוד]
@staticmethod
async def download_file(url: str, max_size: int = 10 * 1024 * 1024) -> Optional[bytes]:
"""הורדת קובץ מ-URL"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status != 200:
logger.error(f"שגיאה בהורדת קובץ: {response.status}")
return None
# בדיקת גודל
content_length = response.headers.get('content-length')
if content_length and int(content_length) > max_size:
logger.error(f"קובץ גדול מדי: {content_length} bytes")
return None
# הורדת התוכן
content = b""
async for chunk in response.content.iter_chunked(8192):
content += chunk
if len(content) > max_size:
logger.error("קובץ גדול מדי")
return None
return content
except Exception as e:
logger.error(f"שגיאה בהורדת קובץ: {e}")
return None
[תיעוד]
@staticmethod
def get_file_extension(filename: str) -> str:
"""קבלת סיומת קובץ"""
return os.path.splitext(filename)[1].lower()
[תיעוד]
@staticmethod
def get_mime_type(filename: str) -> str:
"""קבלת MIME type של קובץ"""
mime_type, _ = mimetypes.guess_type(filename)
return mime_type or 'application/octet-stream'
[תיעוד]
@staticmethod
async def create_temp_file(content: Union[str, bytes],
suffix: str = "") -> str:
"""יצירת קובץ זמני"""
with tempfile.NamedTemporaryFile(mode='wb', suffix=suffix, delete=False) as temp_file:
if isinstance(content, str):
content = content.encode('utf-8')
temp_file.write(content)
return temp_file.name
[תיעוד]
class ConfigUtils:
"""כלים לקונפיגורציה"""
[תיעוד]
@staticmethod
def load_json_config(file_path: str, default: Dict = None) -> Dict:
"""טעינת קונפיגורציה מקובץ JSON"""
if default is None:
default = {}
try:
if os.path.exists(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
else:
logger.warning(f"קובץ קונפיגורציה לא נמצא: {file_path}")
return default
except Exception as e:
logger.error(f"שגיאה בטעינת קונפיגורציה: {e}")
return default
[תיעוד]
@staticmethod
def save_json_config(file_path: str, config: Dict) -> bool:
"""שמירת קונפיגורציה לקובץ JSON"""
try:
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(config, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
logger.error(f"שגיאה בשמירת קונפיגורציה: {e}")
return False
[תיעוד]
class CacheUtils:
"""כלים לקאש זמני"""
_cache = {}
_cache_times = {}
[תיעוד]
@classmethod
def set(cls, key: str, value: Any, ttl: int = 300):
"""שמירה בקאש עם TTL (שניות)"""
cls._cache[key] = value
cls._cache_times[key] = time.time() + ttl
[תיעוד]
@classmethod
def get(cls, key: str, default: Any = None) -> Any:
"""קבלה מהקאש"""
if key not in cls._cache:
return default
# בדיקת תפוגה
if time.time() > cls._cache_times.get(key, 0):
cls.delete(key)
return default
return cls._cache[key]
[תיעוד]
@classmethod
def delete(cls, key: str):
"""מחיקה מהקאש"""
cls._cache.pop(key, None)
cls._cache_times.pop(key, None)
[תיעוד]
@classmethod
def clear(cls):
"""ניקוי כל הקאש"""
cls._cache.clear()
cls._cache_times.clear()
# פונקציות עזר גלובליות
[תיעוד]
def get_memory_usage() -> Dict[str, float]:
"""קבלת נתוני זיכרון"""
try:
import psutil
process = psutil.Process()
memory_info = process.memory_info()
return {
"rss_mb": memory_info.rss / 1024 / 1024,
"vms_mb": memory_info.vms / 1024 / 1024,
"percent": process.memory_percent()
}
except ImportError:
return {"error": "psutil לא מותקן"}
[תיעוד]
def setup_logging(level: str = "INFO", log_file: str = None) -> logging.Logger:
"""הגדרת לוגים"""
# הגדרת רמת לוג
log_level = getattr(logging, level.upper(), logging.INFO)
# הגדרת פורמט
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# הגדרת handlers
handlers = [logging.StreamHandler(sys.stdout)]
if log_file:
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setFormatter(formatter)
handlers.append(file_handler)
# קונפיגורציה
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=handlers
)
return logging.getLogger(__name__)
[תיעוד]
def generate_summary_stats(files_data: List[Dict]) -> Dict[str, Any]:
"""יצירת סיכום סטטיסטיקות"""
if not files_data:
return {}
total_files = len(files_data)
total_size = sum(len(f.get('code', '')) for f in files_data)
languages = [f.get('language', 'unknown') for f in files_data]
language_counts = {lang: languages.count(lang) for lang in set(languages)}
all_tags = []
for f in files_data:
all_tags.extend(f.get('tags', []))
tag_counts = {tag: all_tags.count(tag) for tag in set(all_tags)}
return {
"total_files": total_files,
"total_size": total_size,
"total_size_formatted": TextUtils.format_file_size(total_size),
"languages": language_counts,
"most_used_language": max(language_counts, key=language_counts.get) if language_counts else None,
"tags": tag_counts,
"most_used_tag": max(tag_counts, key=tag_counts.get) if tag_counts else None,
"average_file_size": total_size // total_files if total_files > 0 else 0
}
[תיעוד]
def detect_language_from_filename(filename: str) -> str:
"""זיהוי שפת תכנות לפי סיומת הקובץ"""
# מיפוי סיומות לשפות
extensions_map = {
# Python
'.py': 'python',
'.pyw': 'python',
'.pyx': 'python',
'.pyi': 'python',
# JavaScript/TypeScript
'.js': 'javascript',
'.jsx': 'javascript',
'.ts': 'typescript',
'.tsx': 'typescript',
'.mjs': 'javascript',
# Web
'.html': 'html',
'.htm': 'html',
'.css': 'css',
'.scss': 'scss',
'.sass': 'sass',
'.less': 'less',
# Java/Kotlin
'.java': 'java',
'.kt': 'kotlin',
'.kts': 'kotlin',
# C/C++
'.c': 'c',
'.h': 'c',
'.cpp': 'cpp',
'.cc': 'cpp',
'.cxx': 'cpp',
'.hpp': 'cpp',
'.hxx': 'cpp',
# C#
'.cs': 'csharp',
# Go
'.go': 'go',
# Rust
'.rs': 'rust',
# Ruby
'.rb': 'ruby',
'.rake': 'ruby',
# PHP
'.php': 'php',
'.phtml': 'php',
# Swift
'.swift': 'swift',
# Shell
'.sh': 'bash',
'.bash': 'bash',
'.zsh': 'bash',
'.fish': 'bash',
# SQL
'.sql': 'sql',
# JSON/YAML/XML
'.json': 'json',
'.yaml': 'yaml',
'.yml': 'yaml',
'.xml': 'xml',
# Markdown
'.md': 'markdown',
'.markdown': 'markdown',
# Other
'.r': 'r',
'.lua': 'lua',
'.vim': 'vim',
'.dockerfile': 'dockerfile',
'Dockerfile': 'dockerfile',
'.makefile': 'makefile',
'Makefile': 'makefile',
'.cmake': 'cmake',
'.gradle': 'gradle',
'.properties': 'properties',
'.ini': 'ini',
'.toml': 'toml',
'.env': 'env',
'.gitignore': 'gitignore',
'.dockerignore': 'dockerignore'
}
# בדיקת סיומת
filename_lower = filename.lower()
for ext, lang in extensions_map.items():
if filename_lower.endswith(ext.lower()) or filename_lower == ext.lower():
return lang
# אם לא נמצאה התאמה, נחזיר 'text'
return 'text'
[תיעוד]
def get_language_emoji(language: str) -> str:
"""מחזיר אימוג'י מתאים לשפת התכנות"""
emoji_map = {
'python': '🐍',
'javascript': '🟨',
'typescript': '🔷',
'java': '☕',
'kotlin': '🟣',
'c': '🔵',
'cpp': '🔷',
'csharp': '🟦',
'go': '🐹',
'rust': '🦀',
'ruby': '💎',
'php': '🐘',
'swift': '🦉',
'bash': '🐚',
'sql': '🗄️',
'html': '🌐',
'css': '🎨',
'scss': '🎨',
'sass': '🎨',
'less': '🎨',
'json': '📋',
'yaml': '📄',
'xml': '📰',
'markdown': '📝',
'r': '📊',
'lua': '🌙',
'vim': '📝',
'dockerfile': '🐳',
'makefile': '🔧',
'cmake': '🔨',
'gradle': '🐘',
'properties': '⚙️',
'ini': '⚙️',
'toml': '⚙️',
'env': '🔐',
'gitignore': '🚫',
'dockerignore': '🚫',
'text': '📄'
}
return emoji_map.get(language.lower(), '📄')
[תיעוד]
class SensitiveDataFilter(logging.Filter):
"""מסנן שמטשטש טוקנים ונתונים רגישים בלוגים."""
[תיעוד]
def filter(self, record: logging.LogRecord) -> bool:
try:
msg = str(record.getMessage())
# זיהוי בסיסי של טוקנים: ghp_..., github_pat_..., Bearer ...
patterns = [
(r"ghp_[A-Za-z0-9]{20,}", "ghp_***REDACTED***"),
(r"github_pat_[A-Za-z0-9_]{20,}", "github_pat_***REDACTED***"),
(r"Bearer\s+[A-Za-z0-9\-_.=:/+]{10,}", "Bearer ***REDACTED***"),
]
redacted = msg
import re as _re
for pat, repl in patterns:
redacted = _re.sub(pat, repl, redacted)
# עדכן רק את message הפורמטי
record.msg = redacted
# חשוב: נקה ארגומנטים כדי למנוע ניסיון פורמט חוזר (%s) שיוביל ל-TypeError
record.args = ()
except Exception:
pass
return True
[תיעוד]
def install_sensitive_filter():
"""התקנת המסנן על כל ה-handlers הקיימים."""
root = logging.getLogger()
f = SensitiveDataFilter()
for h in root.handlers:
h.addFilter(f)