"""
מעבד Batch לעיבוד מרובה קבצים
Batch Processor for Multiple Files
"""
import asyncio
import logging
import time
from typing import Dict, List, Optional, Callable, Any
import re
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
from database import db
from services import code_service as code_processor
from cache_manager import cache
from html import escape as html_escape
import tempfile
import subprocess
import os
logger = logging.getLogger(__name__)
[תיעוד]
@dataclass
class BatchJob:
"""עבודת batch"""
job_id: str
user_id: int
operation: str
files: List[str]
status: str = "pending" # pending, running, completed, failed
progress: int = 0
total: int = 0
results: Dict[str, Any] = None
error_message: str = ""
start_time: float = 0
end_time: float = 0
def __post_init__(self):
if self.results is None:
self.results = {}
if self.total == 0:
self.total = len(self.files)
[תיעוד]
class BatchProcessor:
"""מעבד batch לפעולות על מרובה קבצים"""
[תיעוד]
def __init__(self):
self.max_workers = 3 # מספר threads מקסימלי כדי למנוע "סיום מיידי" לא ריאלי
self.max_concurrent_jobs = 3 # מספר עבודות batch בו-זמנית
self.active_jobs: Dict[str, BatchJob] = {}
self.job_counter = 0
[תיעוד]
def create_job(self, user_id: int, operation: str, files: List[str]) -> str:
"""יצירת עבודת batch חדשה"""
self.job_counter += 1
job_id = f"batch_{user_id}_{self.job_counter}_{int(time.time())}"
job = BatchJob(
job_id=job_id,
user_id=user_id,
operation=operation,
files=files
)
self.active_jobs[job_id] = job
logger.info(f"נוצרה עבודת batch: {job_id} עם {len(files)} קבצים")
return job_id
[תיעוד]
async def process_files_batch(self, job_id: str, operation_func: Callable, **kwargs) -> BatchJob:
"""עיבוד batch של קבצים"""
if job_id not in self.active_jobs:
raise ValueError(f"עבודת batch {job_id} לא נמצאה")
job = self.active_jobs[job_id]
job.status = "running"
job.start_time = time.time()
try:
# עיבוד בparallel עם ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# יצירת futures לכל קובץ
future_to_file = {
executor.submit(operation_func, job.user_id, file_name, **kwargs): file_name
for file_name in job.files
}
# עיבוד התוצאות כשהן מוכנות
for future in as_completed(future_to_file):
file_name = future_to_file[future]
try:
result = future.result()
# קבע הצלחה לוגית לפי תוצאת הפונקציה (למשל is_valid)
success_flag = True
if isinstance(result, dict) and ('is_valid' in result):
success_flag = bool(result.get('is_valid'))
job.results[file_name] = {
'success': success_flag,
'result': result
}
except Exception as e:
job.results[file_name] = {
'success': False,
'error': str(e)
}
logger.error(f"שגיאה בעיבוד {file_name}: {e}")
# עדכון progress
job.progress += 1
logger.debug(f"Job {job_id}: {job.progress}/{job.total} completed")
# הוספת דיליי קטן כדי לאפשר חוויית התקדמות אמיתית (ולא "סיים בשנייה")
# נמוך מספיק כדי לא לעכב משמעותית, אך יוצר תחושה ריאלית ב-UI
try:
time.sleep(0.05)
except Exception:
pass
job.status = "completed"
job.end_time = time.time()
# חישוב סטטיסטיקות
successful = sum(1 for r in job.results.values() if r['success'])
failed = job.total - successful
logger.info(f"עבודת batch {job_id} הושלמה: {successful} הצליחו, {failed} נכשלו")
except Exception as e:
job.status = "failed"
job.error_message = str(e)
job.end_time = time.time()
logger.error(f"עבודת batch {job_id} נכשלה: {e}")
return job
[תיעוד]
async def analyze_files_batch(self, user_id: int, file_names: List[str]) -> str:
"""ניתוח batch של קבצים"""
job_id = self.create_job(user_id, "analyze", file_names)
def analyze_single_file(user_id: int, file_name: str) -> Dict[str, Any]:
"""ניתוח קובץ יחיד"""
try:
file_data = db.get_latest_version(user_id, file_name)
if not file_data:
return {'error': 'קובץ לא נמצא'}
code = file_data['code']
language = file_data['programming_language']
# ניתוח הקוד (עמוק יותר עבור קבצים גדולים)
analysis = code_processor.analyze_code(code, language)
# סימולציית זמן עיבוד ריאלי: 1ms לכל 500 תווים, עד 150ms
try:
delay = min(max(len(code) / 500_000.0, 0.02), 0.15)
time.sleep(delay)
except Exception:
pass
return {
'lines': len(code.split('\n')),
'chars': len(code),
'language': language,
'analysis': analysis
}
except Exception as e:
return {'error': str(e)}
# הרצה ברקע כדי לא לחסום את הלולאה הראשית
asyncio.create_task(self.process_files_batch(job_id, analyze_single_file))
return job_id
[תיעוד]
async def validate_files_batch(self, user_id: int, file_names: List[str]) -> str:
"""בדיקת תקינות batch של קבצים"""
job_id = self.create_job(user_id, "validate", file_names)
def _run_local_cmd(args_list, cwd: str, timeout_sec: int = 20) -> Dict[str, Any]:
try:
completed = subprocess.run(
args_list,
cwd=cwd,
capture_output=True,
text=True,
timeout=timeout_sec
)
output = (completed.stdout or "") + (completed.stderr or "")
return {"returncode": completed.returncode, "output": output.strip()}
except subprocess.TimeoutExpired as e:
return {"returncode": 124, "output": "Timeout"}
except FileNotFoundError:
return {"returncode": 127, "output": "Tool not installed"}
except Exception as e:
return {"returncode": 1, "output": str(e)}
def _advanced_python_checks(temp_dir: str, filename: str) -> Dict[str, Any]:
results: Dict[str, Any] = {}
# Prefer project configs if present in temp_dir (copied beforehand)
results["flake8"] = _run_local_cmd(["flake8", filename], temp_dir)
results["mypy"] = _run_local_cmd(["mypy", filename], temp_dir)
results["bandit"] = _run_local_cmd(["bandit", "-q", "-r", filename], temp_dir)
results["black"] = _run_local_cmd(["black", "--check", filename], temp_dir)
return results
def _copy_lint_configs(temp_dir: str) -> None:
"""Copy lint/type/security config files into temp_dir if they exist in project root."""
project_root = os.getcwd()
configs = [
(os.path.join(project_root, ".flake8"), os.path.join(temp_dir, ".flake8")),
(os.path.join(project_root, "pyproject.toml"), os.path.join(temp_dir, "pyproject.toml")),
(os.path.join(project_root, "mypy.ini"), os.path.join(temp_dir, "mypy.ini")),
(os.path.join(project_root, "bandit.yaml"), os.path.join(temp_dir, "bandit.yaml")),
# JS/TS linters/formatters
(os.path.join(project_root, ".eslintrc.json"), os.path.join(temp_dir, ".eslintrc.json")),
(os.path.join(project_root, ".eslintrc.js"), os.path.join(temp_dir, ".eslintrc.js")),
(os.path.join(project_root, "package.json"), os.path.join(temp_dir, "package.json")),
(os.path.join(project_root, ".prettierrc"), os.path.join(temp_dir, ".prettierrc")),
(os.path.join(project_root, "prettier.config.js"), os.path.join(temp_dir, "prettier.config.js")),
(os.path.join(project_root, "tsconfig.json"), os.path.join(temp_dir, "tsconfig.json")),
# YAML/Docker/Semgrep
(os.path.join(project_root, ".yamllint"), os.path.join(temp_dir, ".yamllint")),
(os.path.join(project_root, ".hadolint.yaml"), os.path.join(temp_dir, ".hadolint.yaml")),
(os.path.join(project_root, ".semgrep.yml"), os.path.join(temp_dir, ".semgrep.yml")),
(os.path.join(project_root, ".semgrep.yaml"), os.path.join(temp_dir, ".semgrep.yaml")),
]
for src, dst in configs:
try:
if os.path.isfile(src):
with open(src, "rb") as fsrc, open(dst, "wb") as fdst:
fdst.write(fsrc.read())
except Exception:
continue
def _guess_extension(file_name: str, language: str) -> str:
name = (file_name or "").lower()
lang = (language or "").lower()
# Prefer existing extension if present
if "." in name:
return name.rsplit(".", 1)[-1]
if lang in ("python", "py"):
return "py"
if lang in ("javascript", "js"):
return "js"
if lang in ("typescript", "ts"):
return "ts"
if lang in ("bash", "shell", "sh"):
return "sh"
if lang in ("yaml", "yml"):
return "yml"
if lang in ("json",):
return "json"
if "dockerfile" in name:
return "dockerfile"
return "txt"
def _secrets_scan(code: str) -> Dict[str, Any]:
"""Very lightweight secret detection with regex patterns."""
patterns = {
"aws_access_key": r"AKIA[0-9A-Z]{16}",
"aws_secret_key": r"(?i)aws(.{0,20})?(secret|access)[=:].{0,3}([A-Za-z0-9/+=]{40})",
"github_token": r"ghp_[A-Za-z0-9]{36}",
"google_api_key": r"AIza[0-9A-Za-z\-_]{35}",
"private_key": r"-----BEGIN (RSA|EC|DSA|OPENSSH) PRIVATE KEY-----",
}
findings = []
for name, pat in patterns.items():
try:
if re.search(pat, code):
findings.append(name)
except re.error:
continue
return {
"returncode": 0 if not findings else 1,
"output": ", ".join(findings) if findings else "no-secrets-found",
}
def validate_single_file(user_id: int, file_name: str) -> Dict[str, Any]:
"""בדיקת תקינות קובץ יחיד"""
try:
file_data = db.get_latest_version(user_id, file_name)
if not file_data:
return {'error': 'קובץ לא נמצא'}
code = file_data['code']
language = file_data['programming_language']
# בדיקת תקינות
is_valid, cleaned_code, error_msg = code_processor.validate_code_input(code, file_name, user_id)
result: Dict[str, Any] = {
'is_valid': is_valid,
'error_message': error_msg,
'cleaned_code': cleaned_code,
'original_length': len(code) if isinstance(code, str) else 0,
'cleaned_length': len(cleaned_code) if isinstance(cleaned_code, str) else 0,
'language': language,
'advanced_checks': {}
}
# בדיקות מתקדמות לפי שפה
with tempfile.TemporaryDirectory(prefix="validate_") as temp_dir:
# קבע שם קובץ זמני לפי השפה/הסיומת
ext = _guess_extension(file_name or "file", language or "")
base_name = (file_name or f"file.{ext}").replace("/", "_")
if not base_name.lower().endswith(f".{ext}") and ext not in ("dockerfile"):
base_name = f"{base_name}.{ext}"
temp_file = os.path.join(temp_dir, base_name)
os.makedirs(os.path.dirname(temp_file), exist_ok=True)
with open(temp_file, 'w', encoding='utf-8') as f:
f.write(code)
_copy_lint_configs(temp_dir)
adv: Dict[str, Any] = {}
# Python specific
if (language or "").lower() == 'python' or temp_file.endswith('.py'):
adv.update(_advanced_python_checks(temp_dir, os.path.basename(temp_file)))
# pylint, isort, radon (optional)
adv["pylint"] = _run_local_cmd(["pylint", "-sn", os.path.basename(temp_file)], temp_dir)
adv["isort"] = _run_local_cmd(["isort", "--check-only", os.path.basename(temp_file)], temp_dir)
adv["radon_cc"] = _run_local_cmd(["radon", "cc", "-s", "-a", os.path.basename(temp_file)], temp_dir)
adv["radon_mi"] = _run_local_cmd(["radon", "mi", os.path.basename(temp_file)], temp_dir)
# JavaScript / TypeScript
if temp_file.endswith('.js') or temp_file.endswith('.ts'):
adv["eslint"] = _run_local_cmd(["eslint", "-f", "stylish", os.path.basename(temp_file)], temp_dir)
if temp_file.endswith('.ts'):
adv["tsc"] = _run_local_cmd(["tsc", "--noEmit", os.path.basename(temp_file)], temp_dir)
adv["prettier"] = _run_local_cmd(["prettier", "--check", os.path.basename(temp_file)], temp_dir)
# Shell scripts
if temp_file.endswith('.sh'):
adv["shellcheck"] = _run_local_cmd(["shellcheck", os.path.basename(temp_file)], temp_dir)
# YAML
if temp_file.endswith('.yml') or temp_file.endswith('.yaml'):
adv["yamllint"] = _run_local_cmd(["yamllint", os.path.basename(temp_file)], temp_dir)
# Dockerfile
if ext == 'dockerfile' or os.path.basename(temp_file).lower() == 'dockerfile':
adv["hadolint"] = _run_local_cmd(["hadolint", os.path.basename(temp_file)], temp_dir)
# JSON
if temp_file.endswith('.json'):
adv["jq"] = _run_local_cmd(["jq", "-e", ".", os.path.basename(temp_file)], temp_dir)
# Semgrep (generic SAST) - optional and quiet
adv["semgrep"] = _run_local_cmd(["semgrep", "--quiet", "--config", "auto", os.path.basename(temp_file)], temp_dir)
# Internal secrets scan (always available)
adv["secrets_scan"] = _secrets_scan(code)
result['advanced_checks'] = adv
# סימולציית זמן עיבוד ריאלי
try:
delay = min(max(len(code) / 400_000.0, 0.03), 0.2)
time.sleep(delay)
except Exception:
pass
return result
except Exception as e:
return {'error': str(e)}
# הרצה ברקע כדי לא לחסום את הלולאה הראשית
asyncio.create_task(self.process_files_batch(job_id, validate_single_file))
return job_id
[תיעוד]
async def export_files_batch(self, user_id: int, file_names: List[str], export_format: str = "zip") -> str:
"""ייצוא batch של קבצים"""
job_id = self.create_job(user_id, "export", file_names)
def export_single_file(user_id: int, file_name: str, format: str = "zip") -> Dict[str, Any]:
"""ייצוא קובץ יחיד"""
try:
file_data = db.get_latest_version(user_id, file_name)
if not file_data:
return {'error': 'קובץ לא נמצא'}
# הכנת הקובץ לייצוא
return {
'file_name': file_name,
'content': file_data['code'],
'language': file_data['programming_language'],
'size': len(file_data['code'])
}
except Exception as e:
return {'error': str(e)}
# הרצה ברקע כדי לא לחסום את הלולאה הראשית
asyncio.create_task(self.process_files_batch(job_id, export_single_file, format=export_format))
return job_id
[תיעוד]
def get_job_status(self, job_id: str) -> Optional[BatchJob]:
"""קבלת סטטוס עבודת batch"""
return self.active_jobs.get(job_id)
[תיעוד]
def cleanup_old_jobs(self, max_age_hours: int = 24):
"""ניקוי עבודות ישנות"""
current_time = time.time()
old_jobs = []
for job_id, job in self.active_jobs.items():
if job.end_time > 0: # עבודה שהסתיימה
age_hours = (current_time - job.end_time) / 3600
if age_hours > max_age_hours:
old_jobs.append(job_id)
for job_id in old_jobs:
del self.active_jobs[job_id]
if old_jobs:
logger.info(f"נוקו {len(old_jobs)} עבודות batch ישנות")
# יצירת instance גלובלי
batch_processor = BatchProcessor()