הראה קוד מקור ל refactoring_engine

"""
מנוע רפקטורינג אוטומטי
מבצע שינויי מבנה בקוד בצורה בטוחה
"""

from __future__ import annotations

import ast
import re
import logging
from dataclasses import dataclass, field
import os
from enum import Enum
from typing import Dict, List, Optional, Set, Tuple, Any
from pathlib import Path

logger = logging.getLogger(__name__)


[תיעוד] class RefactorType(Enum): """סוגי רפקטורינג נתמכים""" SPLIT_FUNCTIONS = "split_functions" # פיצול קובץ גדול לפונקציות EXTRACT_FUNCTIONS = "extract_functions" # חילוץ קוד חוזר MERGE_SIMILAR = "merge_similar" # מיזוג קוד דומה CONVERT_TO_CLASSES = "convert_to_classes" # המרה למחלקות DEPENDENCY_INJECTION = "dependency_injection" # DI
[תיעוד] @dataclass class FunctionInfo: """מידע על פונקציה""" name: str start_line: int end_line: int args: List[str] returns: Optional[str] decorators: List[str] docstring: Optional[str] calls: Set[str] called_by: Set[str] = field(default_factory=set) code: str = "" complexity: int = 0 # שיוך לפי סעיף (Section) אם זוהה מהקובץ המונוליתי section: Optional[str] = None
[תיעוד] @dataclass class ClassInfo: """מידע על מחלקה""" name: str start_line: int end_line: int methods: List[FunctionInfo] attributes: List[str] base_classes: List[str] decorators: List[str] docstring: Optional[str] # קוד המקור של המחלקה code: str = "" # שיוך לפי סעיף (Section) אם זוהה section: Optional[str] = None
[תיעוד] @dataclass class RefactorProposal: """הצעת רפקטורינג""" refactor_type: RefactorType original_file: str new_files: Dict[str, str] description: str changes_summary: List[str] warnings: List[str] = field(default_factory=list) imports_needed: Dict[str, List[str]] = field(default_factory=dict)
[תיעוד] @dataclass class RefactorResult: """תוצאת רפקטורינג""" success: bool proposal: Optional[RefactorProposal] error: Optional[str] = None validation_passed: bool = False
FunctionNode = ast.FunctionDef | ast.AsyncFunctionDef
[תיעוד] class CodeAnalyzer: """מנתח קוד Python"""
[תיעוד] def __init__(self, code: str, filename: str = "unknown.py"): self.code = code self.filename = filename self.tree: Optional[ast.AST] = None self.functions: List[FunctionInfo] = [] self.classes: List[ClassInfo] = [] self.imports: List[str] = [] self.global_vars: List[str] = [] # מיפוי שורה -> slug של סעיף, לפי כותרות בקובץ self._sections_by_line: Dict[int, str] = {}
[תיעוד] def analyze(self) -> bool: """ניתוח הקוד""" try: self.tree = ast.parse(self.code) self._extract_sections() self._extract_imports() self._extract_functions() self._extract_classes() self._extract_globals() self._calculate_dependencies() return True except SyntaxError as e: logger.error(f"שגיאת תחביר בקוד: {e}") return False except Exception as e: logger.error(f"שגיאה בניתוח: {e}", exc_info=True) return False
def _extract_imports(self) -> None: for node in ast.walk(self.tree): if isinstance(node, ast.Import): for alias in node.names: self.imports.append(f"import {alias.name}") elif isinstance(node, ast.ImportFrom): module = node.module or "" names = ", ".join(alias.name for alias in node.names) self.imports.append(f"from {module} import {names}") def _extract_functions(self) -> None: for node in ast.walk(self.tree): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and self._is_top_level_function(node): func_info = self._parse_function(node) # שיוך לסעיף לפי מיקום השורה בקוד func_info.section = self._get_section_for_line(func_info.start_line) self.functions.append(func_info) def _extract_classes(self) -> None: for node in ast.walk(self.tree): if isinstance(node, ast.ClassDef): class_info = self._parse_class(node) class_info.section = self._get_section_for_line(class_info.start_line) self.classes.append(class_info) def _extract_globals(self) -> None: for node in self.tree.body: if isinstance(node, ast.Assign): for target in node.targets: if isinstance(target, ast.Name): self.global_vars.append(target.id) def _parse_function(self, node: FunctionNode) -> FunctionInfo: args = [arg.arg for arg in node.args.args] returns = ast.unparse(node.returns) if node.returns else None decorators = [ast.unparse(dec) for dec in node.decorator_list] docstring = ast.get_docstring(node) calls = self._extract_function_calls(node) code_lines = self.code.splitlines()[node.lineno - 1 : node.end_lineno] code = "\n".join(code_lines) complexity = self._calculate_complexity(node) return FunctionInfo( name=node.name, start_line=node.lineno, end_line=node.end_lineno or node.lineno, args=args, returns=returns, decorators=decorators, docstring=docstring, calls=calls, code=code, complexity=complexity, ) def _parse_class(self, node: ast.ClassDef) -> ClassInfo: methods: List[FunctionInfo] = [] for item in node.body: if isinstance(item, ast.FunctionDef): methods.append(self._parse_function(item)) attributes: List[str] = [] for item in node.body: if isinstance(item, ast.Assign): for target in item.targets: if isinstance(target, ast.Name): attributes.append(target.id) base_classes = [ast.unparse(base) for base in node.bases] decorators = [ast.unparse(dec) for dec in node.decorator_list] docstring = ast.get_docstring(node) code_lines = self.code.splitlines()[node.lineno - 1 : node.end_lineno] code = "\n".join(code_lines) return ClassInfo( name=node.name, start_line=node.lineno, end_line=node.end_lineno or node.lineno, methods=methods, attributes=attributes, base_classes=base_classes, decorators=decorators, docstring=docstring, code=code, ) def _extract_function_calls(self, node: ast.AST) -> Set[str]: calls: Set[str] = set() for child in ast.walk(node): if isinstance(child, ast.Call): if isinstance(child.func, ast.Name): calls.add(child.func.id) elif isinstance(child.func, ast.Attribute): calls.add(child.func.attr) return calls def _calculate_complexity(self, node: ast.AST) -> int: complexity = 1 for child in ast.walk(node): if isinstance(child, (ast.If, ast.While, ast.For, ast.ExceptHandler)): complexity += 1 elif isinstance(child, ast.BoolOp) and isinstance(child.op, (ast.And, ast.Or)): complexity += len(child.values) - 1 return complexity def _is_top_level_function(self, node: FunctionNode) -> bool: for parent in ast.walk(self.tree): if isinstance(parent, ast.ClassDef): if node in parent.body: return False return True # --- זיהוי כותרות/סעיפים בקובץ (למשל "# 1) USER MANAGEMENT") --- def _extract_sections(self) -> None: """ מאתר כותרות סעיפים בקובץ כדי לשמר קוהזיה סמנטית בפיצול. תומך בתבניות כגון: "# 1) USER MANAGEMENT" "# 2) PAYMENTS + SUBSCRIPTIONS" וכן וריאציות עם ריבוי תווי '#'. """ self._sections_by_line = {} lines = self.code.splitlines() for idx, raw in enumerate(lines, start=1): line = raw.strip() # דלג על מפרידי "#####" if not line.startswith("#"): continue # הסר תווי '#' text = line.lstrip("#").strip() if not text: continue # נתמוך בתבנית "N) " בתחילת הקו אך לא נדרוש זאת m = re.match(r"^(\d+\)\s*)?(.+)$", text) if not m: continue title = m.group(2).strip() slug = self._section_to_slug(title) if slug: self._sections_by_line[idx] = slug else: # כותרת שלא זוהתה נחשבת להפסקת סעיף קיים כדי לאפשר אזור "ללא סעיף" self._sections_by_line[idx] = None def _section_to_slug(self, title: str) -> Optional[str]: t = title.lower() mapping = [ (("user management", "users"), "users"), (("payments", "subscriptions", "billing", "finance"), "finance"), (("file system", "files", "storage"), "files"), (("inventory", "products"), "inventory"), (("network", "api clients", "api"), "api_clients"), (("analytics", "reports", "report"), "analytics"), (("notifications", "email"), "notifications"), (("permissions", "auth"), "permissions"), (("workflow", "pipelines"), "workflows"), (("debug", "temp", "mixed"), "debug"), (("random utilities", "utilities", "utils"), "utils"), (("application boot", "main"), "main"), ] for keys, slug in mapping: if any(k in t for k in keys): return slug return None def _get_section_for_line(self, line_no: int) -> Optional[str]: if not self._sections_by_line: return None keys = sorted(self._sections_by_line.keys()) prev = None for k in keys: if k <= line_no: prev = k else: break return self._sections_by_line.get(prev) if prev is not None else None def _calculate_dependencies(self) -> None: func_names = {f.name for f in self.functions} for func in self.functions: for call in func.calls: if call in func_names: for other_func in self.functions: if other_func.name == call: other_func.called_by.add(func.name)
[תיעוד] def find_large_functions(self, min_lines: int = 50) -> List[FunctionInfo]: large: List[FunctionInfo] = [] for func in self.functions: lines_count = func.end_line - func.start_line + 1 if lines_count >= min_lines or func.complexity >= 10: large.append(func) return large
[תיעוד] def find_large_classes(self, min_methods: int = 10) -> List[ClassInfo]: return [cls for cls in self.classes if len(cls.methods) >= min_methods]
[תיעוד] class RefactoringEngine: """מנוע רפקטורינג""" CANONICAL_DOMAIN_PRIORITY: Dict[str, int] = { "users": 0, "finance": 1, "inventory": 2, "network": 3, "workflows": 4, } STICKY_SUPPORT_DOMAINS: Set[str] = {"helpers", "utils"}
[תיעוד] def __init__(self) -> None: self.analyzer: Optional[CodeAnalyzer] = None # תצורה: שליטה במספר קבוצות/מודולים כדי למנוע Oversplitting self.preferred_min_groups: int = 3 self.preferred_max_groups: int = 5 self.min_functions_per_group: int = 2 self.absolute_max_groups: int = 8 self._shared_symbol_registry: Dict[str, Dict[str, Tuple[str, str]]] = {}
[תיעוד] def propose_refactoring( self, code: str, filename: str, refactor_type: RefactorType, layered_mode: Optional[bool] = None, ) -> RefactorResult: """הצעת רפקטורינג""" self.analyzer = CodeAnalyzer(code, filename) if not self.analyzer.analyze(): return RefactorResult( success=False, proposal=None, error="כשל בניתוח הקוד - ייתכן שגיאת תחביר", ) # בקשת שכבות לפי-קריאה (ללא זיהום ENV גלובלי) if layered_mode is not None: setattr(self, "_layered_mode_override", bool(layered_mode)) try: if refactor_type == RefactorType.SPLIT_FUNCTIONS: proposal = self._split_functions() elif refactor_type == RefactorType.EXTRACT_FUNCTIONS: proposal = self._extract_functions() elif refactor_type == RefactorType.MERGE_SIMILAR: proposal = self._merge_similar() elif refactor_type == RefactorType.CONVERT_TO_CLASSES: proposal = self._convert_to_classes() elif refactor_type == RefactorType.DEPENDENCY_INJECTION: proposal = self._add_dependency_injection() else: return RefactorResult(success=False, proposal=None, error=f"סוג רפקטורינג לא נתמך: {refactor_type}") validated = self._validate_proposal(proposal) return RefactorResult(success=True, proposal=proposal, validation_passed=validated) except Exception as e: logger.error(f"שגיאה ברפקטורינג: {e}", exc_info=True) return RefactorResult(success=False, proposal=None, error=f"שגיאה: {str(e)}") finally: if hasattr(self, "_layered_mode_override"): try: delattr(self, "_layered_mode_override") except Exception: pass
def _split_functions(self) -> RefactorProposal: # מצב מיוחד: קובץ מודלים מונוליתי (classes בלבד) — Safe Decomposition לדומיינים בתוך חבילת models/ if self.analyzer and not self.analyzer.functions and self.analyzer.classes: filename_stem = Path(self.analyzer.filename).stem if filename_stem == "models": return self._split_models_monolith() groups = self._group_related_functions() if len(groups) <= 1: raise ValueError("לא נמצאו קבוצות פונקציות נפרדות. הקוד כבר מאורגן היטב.") new_files: Dict[str, str] = {} changes: List[str] = [] imports_needed: Dict[str, List[str]] = {} per_file_filtered_imports: Dict[str, List[str]] = {} base_name = Path(self.analyzer.filename).stem # עדיפות לעקיפת שכבות לפי-קריאה; אחרת – ENV override = getattr(self, "_layered_mode_override", None) layered_mode = bool(override) if override is not None else ( str(os.getenv("REFACTOR_LAYERED_MODE", "")).strip().lower() in ("1", "true", "yes", "on") ) # הקצאת מחלקות לקבוצות (Collocation) classes_by_group = self._assign_classes_to_groups(groups) # במצב שכבות (Layered) – דחוף את כל המחלקות לקובץ Leaf יחיד classes_filename: Optional[str] = None if layered_mode and (self.analyzer and self.analyzer.classes): classes_filename, classes_file_content = self._build_classes_file("models") # נשתמש בקובץ סטנדרטי בשם models.py ולא בשם מבוסס קלט classes_filename = "models.py" new_files[classes_filename] = classes_file_content changes.append(f"📦 {classes_filename}: ריכוז ישויות (Leaf)") # בניית קבצי דומיין module_stem_by_group: Dict[str, str] = {} for group_name, functions in groups.items(): new_filename = self._choose_filename_for_group(base_name, group_name) module_stem_by_group[group_name] = Path(new_filename).stem group_classes = [] if layered_mode else classes_by_group.get(group_name, []) # סינון imports לפי שימוש אמיתי בקוד המשולב combined_body_parts: List[str] = [] for c in group_classes: combined_body_parts.append(c.code) for f in functions: combined_body_parts.append(f.code) group_code_body = "\n\n".join(combined_body_parts) filtered_imports = self._filter_imports_for_code(self.analyzer.imports, group_code_body) per_file_filtered_imports[new_filename] = filtered_imports file_content = self._build_file_content(functions, imports=filtered_imports, classes=group_classes) new_files[new_filename] = file_content changes.append(f"📦 {new_filename}: {len(group_classes)} מחלקות, {len(functions)} פונקציות") # שמירה לאחור: imports_needed מכיל את ה-imports המקוריים (לצרכי תאימות בדוחות) imports_needed[new_filename] = self.analyzer.imports.copy() init_content = self._build_init_file(list(new_files.keys())) new_files["__init__.py"] = init_content changes.append("📦 __init__.py: מייצא את כל ה-API") # איחוד imports משותפים לקובץ shared והחלפתם ב-import יחיד original_keys = set(new_files.keys()) new_files = self._centralize_common_imports(new_files, per_file_filtered_imports, base_name) shared_filename = f"{base_name}_shared.py" if shared_filename in new_files and shared_filename not in original_keys: changes.append(f"📦 {shared_filename}: ייבוא משותף מרוכז") # אין יותר קובץ מחלקות מרכזי – מחלקות מרוכזות לפי דומיין (Collocation) # הזרקת יבוא לפונקציות בין-מודוליות (Cross-module function imports) func_to_module: Dict[str, str] = {} for group_name, functions in groups.items(): module_stem = module_stem_by_group.get(group_name, f"{base_name}_{group_name}") for f in functions: func_to_module[f.name] = module_stem new_files = self._inject_function_imports(new_files, func_to_module) # הזרקת יבוא למחלקות בין-מודוליות (Cross-module class imports) if layered_mode and classes_filename: # במצב שכבות – כל המחלקות ב-models.py, הזרק import ממנו new_files = self._inject_class_imports(new_files, classes_filename) else: class_to_module: Dict[str, str] = {} for group_name, classes in classes_by_group.items(): module_stem = module_stem_by_group.get(group_name, f"{base_name}_{group_name}") for c in classes: class_to_module[c.name] = module_stem if class_to_module: new_files = self._inject_cross_module_class_imports(new_files, class_to_module) # DRY-RUN: זיהוי ומניעת תלות מעגלית בין המודולים שנוצרו cycle_warnings: List[str] = [] new_files, merged_pairs, shared_bridges, skipped_canonical_cycles = self._resolve_circular_imports( new_files, base_name ) if merged_pairs: for a, b in merged_pairs: cycle_warnings.append(f"♻️ פורקה תלות מעגלית באמצעות מיזוג המודולים: {a}{b}") # עדכון __init__.py לאחר מיזוגים module_file_names = [fn for fn in new_files.keys() if fn.endswith('.py') and fn != '__init__.py'] new_files['__init__.py'] = self._build_init_file(module_file_names) if shared_bridges: for comp in shared_bridges: cycle_warnings.append( f"♻️ פורקה תלות מעגלית באמצעות שכבת shared: {' ⇄ '.join(comp)}" ) if skipped_canonical_cycles: for comp in skipped_canonical_cycles: cycle_warnings.append( f"⚠️ נותרה תלות מעגלית בין מודולים קנוניים שנשמרו לשימור דומיין: {' ⇄ '.join(comp)}" ) # ניקוי פוסט-פיצול: הסרת imports מיותרים ברמת קובץ new_files = self.post_refactor_cleanup(new_files) description = ( f"🏗️ מצאתי {len(self.analyzer.classes)} מחלקות ו-{len(self.analyzer.functions)} פונקציות.\n\n" f"הצעת פיצול לפי דומיין (Collocation):\n" f"📦 {self.analyzer.filename}\n" ) for fname in new_files.keys(): description += f" ├── {fname}\n" description += "\n✅ מחלקות ופונקציות מרוכזות יחד בקבצי דומיין; אין קובץ מחלקות גנרי" warnings: List[str] = [] if len(self.analyzer.global_vars) > 0: warnings.append( f"⚠️ יש {len(self.analyzer.global_vars)} משתנים גלובליים - עלול להיות צורך בהתאמה ידנית" ) warnings.extend(cycle_warnings) return RefactorProposal( refactor_type=RefactorType.SPLIT_FUNCTIONS, original_file=self.analyzer.filename, new_files=new_files, description=description, changes_summary=changes, warnings=warnings, imports_needed=imports_needed, ) def _group_related_functions(self) -> Dict[str, List[FunctionInfo]]: """ קיבוץ פונקציות לפי קוהזיה סמנטית: 1) סעיף (Section) אם זוהה מתוך כותרות הקובץ 2) דומיין (IO/Compute/Helpers) 3) Prefix ותלויות תוך שמירה על טווח קבוצות מועדף והימנעות מפיצול יתר. """ if not self.analyzer: return {} functions = list(self.analyzer.functions) if len(functions) <= 1: section_groups = self._scaffold_groups_from_sections(functions) if section_groups: return section_groups return {"module": functions} if len(functions) == 2: return { f"group_{idx+1}_{func.name}": [func] for idx, func in enumerate(functions) } # 1) קיבוץ לפי סעיף (אם קיים). אם יש פונקציות ללא סעיף — נשמר אותן ע"י קיבוץ דומייני והוספה. section_groups = self._group_by_section(functions) if section_groups: leftovers = [f for f in functions if not f.section] if leftovers: domain_for_leftovers = self._group_by_domain(leftovers) for k, v in domain_for_leftovers.items(): section_groups.setdefault(k, []).extend(v) else: # אין כותרות כלל — קיבוץ דומייני מלא section_groups = self._group_by_domain(functions) # 2) קיבוץ נוסף לפי prefix בתוך כל דומיין, לשמירת קרבה סמנטית refined: Dict[str, List[FunctionInfo]] = {} for domain, funcs in section_groups.items(): sub = self._group_by_prefix(funcs) large_sub = {f"{domain}_{k}": v for k, v in sub.items() if len(v) >= self.min_functions_per_group} if large_sub: refined.update(large_sub) leftovers = [f for k, v in sub.items() if len(v) < self.min_functions_per_group for f in v] if leftovers: refined.setdefault(domain, []).extend(leftovers) else: refined[domain] = funcs # 3) מיזוג לפי תלות (affinity) כדי לאחד קבוצות קרובות refined = self._merge_by_dependency_affinity(refined) # 4) מיזוג קבוצות קטנות מדי refined = self._merge_small_groups(refined) # 5) הגבלת מספר קבוצות לטווח המועדף/מקסימלי refined = self._limit_group_count(refined) # הבטחת מינימום שתי קבוצות כשיש מספיק פונקציות if len(refined) < 2 and len(functions) >= 4: fallback = self._group_by_prefix(functions) sorted_groups = sorted(fallback.items(), key=lambda kv: -len(kv[1])) if len(sorted_groups) >= 2: # קח שתי קבוצות מובילות ואז מיזוג של יתר הקבוצות — לא זורקים פונקציות! refined = { sorted_groups[0][0]: list(sorted_groups[0][1]), sorted_groups[1][0]: list(sorted_groups[1][1]), } for name, funcs in sorted_groups[2:]: # בחר יעד מיזוג לפי affinity אל אחת הקבוצות הקיימות best_target = None best_score = -1.0 for target_name, target_funcs in refined.items(): score = self._group_affinity(funcs, target_funcs) if score > best_score: best_score = score best_target = target_name if best_target is None: # גיבוי: מזג לקבוצה הקטנה יותר כדי לאזן best_target = min(refined.keys(), key=lambda k: len(refined[k])) refined[best_target].extend(funcs) else: refined = {"module": functions} # ייצוב שמות קבוצות stable: Dict[str, List[FunctionInfo]] = {} seen: Set[str] = set() for name, funcs in refined.items(): base = re.sub(r"[^a-z0-9_]", "_", name.lower()) if base in seen: idx = 2 while f"{base}{idx}" in seen: idx += 1 base = f"{base}{idx}" seen.add(base) stable[base] = funcs return stable def _scaffold_groups_from_sections(self, functions: List[FunctionInfo]) -> Optional[Dict[str, List[FunctionInfo]]]: """ כאשר יש מעט מאוד פונקציות אך קיימות מחלקות בסעיפים שונים, נבנה קבוצות לפי הסעיפים כדי לאפשר פיצול שעדיין ישמר את ההקשרים הסמנטיים (למשל Users לעומת Analytics). """ if not self.analyzer: return None entries: Dict[str, Dict[str, Any]] = {} def _ensure_entry(section: str, start_line: int) -> Dict[str, Any]: data = entries.setdefault(section, {"start": start_line, "funcs": []}) data["start"] = min(data["start"], start_line) return data for func in functions: if func.section: entry = _ensure_entry(func.section, func.start_line) entry["funcs"].append(func) for cls in self.analyzer.classes: if cls.section: _ensure_entry(cls.section, cls.start_line) if len(entries) < 2: return None ordered_sections = sorted(entries.items(), key=lambda kv: kv[1]["start"]) groups: Dict[str, List[FunctionInfo]] = { section: list(entries[section]["funcs"]) for section, _ in ordered_sections } leftovers = [f for f in functions if not f.section] if leftovers: first_section = ordered_sections[0][0] groups.setdefault(first_section, []).extend(leftovers) return groups def _group_by_section(self, functions: List[FunctionInfo]) -> Dict[str, List[FunctionInfo]]: """קיבוץ פונקציות לפי סעיף (Section) אם קיים.""" groups: Dict[str, List[FunctionInfo]] = {} for f in functions: if f.section: groups.setdefault(f.section, []).append(f) return groups def _group_by_dependencies(self) -> Dict[str, List[FunctionInfo]]: groups: Dict[str, List[FunctionInfo]] = {} visited: Set[str] = set() for func in self.analyzer.functions: if func.name in visited: continue group_name = func.name.replace('_', '')[:15] group = [func] visited.add(func.name) for other_func in self.analyzer.functions: if other_func.name in visited: continue if ( func.name in other_func.calls or other_func.name in func.calls or func.name in other_func.called_by or other_func.name in func.called_by ): group.append(other_func) visited.add(other_func.name) if len(group) >= 2: groups[group_name] = group return groups def _split_by_size(self) -> Dict[str, List[FunctionInfo]]: max_funcs_per_file = 5 groups: Dict[str, List[FunctionInfo]] = {} for i in range(0, len(self.analyzer.functions), max_funcs_per_file): group_name = f"module{i // max_funcs_per_file + 1}" groups[group_name] = self.analyzer.functions[i : i + max_funcs_per_file] return groups # ==== קיבוץ משופר: דומיין/Prefix/תלות ==== def _tokenize_name(self, name: str) -> List[str]: parts = re.split(r'[_]|(?=[A-Z])', name) return [p.lower() for p in parts if p] def _classify_function_domain(self, func: FunctionInfo) -> str: """סיווג פונקציה ל-domain בסיסי: io / compute / helpers / other.""" tokens = self._tokenize_name(func.name) calls = {c.lower() for c in func.calls} io_keywords = {"load", "save", "fetch", "read", "write", "open", "close", "connect", "request", "download", "upload", "send", "receive", "persist", "store", "serialize", "deserialize"} helper_keywords = {"helper", "util", "utils", "format", "convert", "parse", "normalize", "validate", "cleanup"} if any(tok in io_keywords for tok in tokens): return "io" if any(tok in helper_keywords for tok in tokens): return "helpers" io_calls = {"open", "requests", "httpx", "urllib", "json", "yaml", "pickle", "asyncio", "sqlite3", "psycopg2"} if calls & io_calls: return "io" return "compute" def _group_by_domain(self, functions: List[FunctionInfo]) -> Dict[str, List[FunctionInfo]]: groups: Dict[str, List[FunctionInfo]] = {"io": [], "compute": [], "helpers": []} for f in functions: domain = self._classify_function_domain(f) groups.setdefault(domain, []).append(f) return {k: v for k, v in groups.items() if v} def _group_by_prefix(self, functions: List[FunctionInfo]) -> Dict[str, List[FunctionInfo]]: groups: Dict[str, List[FunctionInfo]] = {} for func in functions: tokens = self._tokenize_name(func.name) prefix = tokens[0] if tokens else "module" groups.setdefault(prefix, []).append(func) return groups def _name_similarity(self, a: str, b: str) -> float: ta = set(self._tokenize_name(a)) tb = set(self._tokenize_name(b)) if not ta or not tb: return 0.0 inter = len(ta & tb) union = len(ta | tb) return inter / union if union else 0.0 def _group_affinity(self, g1: List[FunctionInfo], g2: List[FunctionInfo]) -> float: if not g1 or not g2: return 0.0 name_score = 0.0 dep_score = 0.0 pairs = 0 for f1 in g1: for f2 in g2: pairs += 1 name_score += self._name_similarity(f1.name, f2.name) if (f1.name in f2.calls) or (f2.name in f1.calls) or (f1.name in f2.called_by) or (f2.name in f1.called_by): dep_score += 1.0 if pairs == 0: return 0.0 return 0.6 * (name_score / pairs) + 0.4 * (dep_score / pairs) def _merge_by_dependency_affinity(self, groups: Dict[str, List[FunctionInfo]]) -> Dict[str, List[FunctionInfo]]: items = list(groups.items()) if len(items) <= 1: return groups changed = True protected_groups = set(self.CANONICAL_DOMAIN_PRIORITY.keys()) | self.STICKY_SUPPORT_DOMAINS while changed: changed = False merge_candidates = [kv for kv in items if kv[0] not in protected_groups] if not merge_candidates: break smallest = min(merge_candidates, key=lambda kv: len(kv[1])) if len(smallest[1]) >= self.min_functions_per_group or len(items) <= self.preferred_min_groups: break best_idx = None best_score = -1.0 for i, (name, funcs) in enumerate(items): if name == smallest[0]: continue score = self._group_affinity(smallest[1], funcs) if score > best_score: best_score = score best_idx = i if best_idx is not None and best_score >= 0.1: target_name, target_funcs = items[best_idx] target_funcs.extend(smallest[1]) items = [(n, fs) for (n, fs) in items if n != smallest[0]] changed = True return dict(items) def _merge_small_groups(self, groups: Dict[str, List[FunctionInfo]]) -> Dict[str, List[FunctionInfo]]: items = list(groups.items()) # אם יש רק 2 קבוצות — נשאיר אותן כדי למנוע התמזגות למודול יחיד if len(items) <= 2: return groups protected_groups = set(self.CANONICAL_DOMAIN_PRIORITY.keys()) | self.STICKY_SUPPORT_DOMAINS large: List[Tuple[str, List[FunctionInfo]]] = [ (n, fs) for n, fs in items if len(fs) >= self.min_functions_per_group or n in protected_groups ] small: List[Tuple[str, List[FunctionInfo]]] = [ (n, fs) for n, fs in items if len(fs) < self.min_functions_per_group and n not in protected_groups ] if not small: return groups if not large: return groups for sname, sfuncs in small: best_name = None best_score = -1.0 for lname, lfuncs in large: score = self._group_affinity(sfuncs, lfuncs) if score > best_score: best_score = score best_name = lname if best_name is None: best_name = large[0][0] for i, (lname, lfuncs) in enumerate(large): if lname == best_name: lfuncs.extend(sfuncs) large[i] = (lname, lfuncs) break return dict(large) def _limit_group_count(self, groups: Dict[str, List[FunctionInfo]]) -> Dict[str, List[FunctionInfo]]: items = list(groups.items()) while len(items) > self.absolute_max_groups: best_pair = None best_score = -1.0 for i in range(len(items)): for j in range(i + 1, len(items)): score = self._group_affinity(items[i][1], items[j][1]) if score > best_score: best_score = score best_pair = (i, j) if best_pair is None: break i, j = best_pair items[i] = (items[i][0], items[i][1] + items[j][1]) del items[j] if len(items) > self.preferred_max_groups: while len(items) > self.preferred_max_groups: smallest_idx = min(range(len(items)), key=lambda k: len(items[k][1])) best_idx = None best_score = -1.0 for i in range(len(items)): if i == smallest_idx: continue score = self._group_affinity(items[smallest_idx][1], items[i][1]) if score > best_score: best_score = score best_idx = i if best_idx is None: break items[best_idx] = (items[best_idx][0], items[best_idx][1] + items[smallest_idx][1]) del items[smallest_idx] return dict(items) def _merge_singletons_for_oop(self, groups: Dict[str, List[FunctionInfo]]) -> Dict[str, List[FunctionInfo]]: """ ממזג קבוצות קטנות (פחות מ-min_functions_per_group) אל קבוצה דומה, כדי שלא יאבדו פונקציות כאשר מחלקות נוצרות רק עבור קבוצות עם 2+ פונקציות. """ if len(groups) < 2: return groups # עבודה על העתק כדי לשנות בבטחה names = list(groups.keys()) for name in names: funcs = groups.get(name, []) if len(funcs) >= self.min_functions_per_group: continue # מצא יעד מיזוג best_name = None best_score = -1.0 for target_name, target_funcs in groups.items(): if target_name == name: continue score = self._group_affinity(funcs, target_funcs) if score > best_score: best_score = score best_name = target_name if best_name is None: # גיבוי: בחר את הקבוצה הגדולה ביותר best_name = max((k for k in groups.keys() if k != name), key=lambda k: len(groups[k])) groups[best_name].extend(funcs) if name in groups: del groups[name] return groups # ==== Safe Decomposition for models.py (classes-only) ==== def _classify_class_domain(self, cls: ClassInfo) -> str: """ מסווג מחלקה לדומיין בסיסי כאשר אין פונקציות טופ-לבל: core / billing / inventory / network / workflows. ברירת מחדל: core. """ name_l = cls.name.lower() # סימנים חזקים if any(k in name_l for k in ("subscription", "payment", "billing", "gateway")): return "billing" if any(k in name_l for k in ("product", "inventory", "stock")): return "inventory" if any(k in name_l for k in ("api", "client", "http", "network")): return "network" if any(k in name_l for k in ("workflow", "pipeline")): return "workflows" # ליבה/משתמשים/הרשאות/אימייל if any(k in name_l for k in ("user", "permission", "email", "manager")): return "core" return "core" def _extract_module_global_assignments(self) -> Tuple[List[str], str]: """ מחזיר (שמות גלובליים לפי סדר הופעה, קוד ההקצאות) מתוך הקובץ המקורי ברמת מודול. משמש בשלב Safe Decomposition ל-models.py כדי לשמר קבועים/משתנים גלובליים. """ if not self.analyzer or not getattr(self.analyzer, "tree", None): return [], "" names_in_order: List[str] = [] seen: Set[str] = set() code_blocks: List[str] = [] lines = self.analyzer.code.splitlines() for node in getattr(self.analyzer.tree, "body", []): # type: ignore[attr-defined] if isinstance(node, (ast.Assign, ast.AnnAssign)): # שחזור קוד מקור של ההקצאה start = max(1, getattr(node, "lineno", 1)) end = getattr(node, "end_lineno", start) snippet = "\n".join(lines[start - 1 : end]) code_blocks.append(snippet) # איסוף שמות targets: List[ast.AST] = [] if isinstance(node, ast.Assign): targets = list(node.targets) else: targets = [node.target] # type: ignore[attr-defined] for t in targets: for n in ast.walk(t): if isinstance(n, ast.Name) and n.id not in seen: names_in_order.append(n.id) seen.add(n.id) return names_in_order, ("\n".join(code_blocks).strip() + ("\n" if code_blocks else "")) def _split_models_monolith(self) -> RefactorProposal: """ פיצול בטוח של קובץ models.py מונוליתי לתת-מודולים דומייניים תחת models/. - core.py: User, UserManager, PermissionSystem, EmailService וכו' - billing.py: SubscriptionManager, PaymentGateway וכו' - inventory.py: Product, Inventory וכו' - network.py/workflows.py לפי הצורך """ assert self.analyzer is not None classes = list(self.analyzer.classes or []) if not classes: raise ValueError("לא נמצאו מחלקות לפיצול בתוך models.py") # קיבוץ לפי דומיין domain_to_classes: Dict[str, List[ClassInfo]] = {} for c in classes: domain = self._classify_class_domain(c) domain_to_classes.setdefault(domain, []).append(c) # סדר יציב להצגה preferred_order = ["core", "billing", "inventory", "network", "workflows"] ordered_domains = [d for d in preferred_order if d in domain_to_classes] + [ d for d in domain_to_classes.keys() if d not in preferred_order ] new_files: Dict[str, str] = {} changes: List[str] = [] # חילוץ משתנים גלובליים ברמת המודול כדי לשמרם בפיצול (למניעת NameError) global_names, globals_code = self._extract_module_global_assignments() # בניית קבצים תחת models/ for domain in ordered_domains: cls_list = domain_to_classes.get(domain, []) if not cls_list: continue # סינון imports לפי שימוש במחלקות הדומיין code_body = "\n\n".join(c.code for c in cls_list) filtered_imports = self._filter_imports_for_code(self.analyzer.imports, code_body) # בקובץ core נזריק את ההקצאות הגלובליות אחרי imports ולפני המחלקות if domain == "core" and globals_code.strip(): title = "מחלקות: " + ", ".join(c.name for c in cls_list) parts: List[str] = [] parts.append(f'"""\nמודול עבור: {title}\n"""\n') parts.extend(filtered_imports) parts.append("") parts.append(globals_code.rstrip()) parts.append("") for c in cls_list: parts.append(c.code) parts.append("\n") content = "\n".join(parts) else: content = self._build_file_content(functions=[], imports=filtered_imports, classes=cls_list) filename = f"models/{domain}.py" new_files[filename] = content changes.append(f"📦 {filename}: {len(cls_list)} מחלקות") # __init__ לחבילת models/ models_module_files = [fn for fn in new_files.keys() if fn.startswith("models/") and fn.endswith(".py")] new_files["models/__init__.py"] = self._build_init_file(models_module_files) changes.append("📦 models/__init__.py: מייצא את כל הישויות") # הזרקת יבוא בין-מודולי למחלקות (למשל billing → core) class_to_module: Dict[str, str] = {} for domain, cls_list in domain_to_classes.items(): for c in cls_list: class_to_module[c.name] = domain new_files = self._inject_cross_module_class_imports(new_files, class_to_module) # הזרקת יבוא למשתנים גלובליים שנשמרו ב-core אל מודולים אחרים הצורכים אותם if global_names: new_files = self._inject_global_imports(new_files, set(global_names), source_module_stem="core") # DRY-RUN: זיהוי/פירוק מעגליות בתוך models/ בלבד subset = {k: v for k, v in new_files.items() if k.startswith("models/")} subset, merged_pairs, shared_bridges, skipped_canonical_cycles = self._resolve_circular_imports( subset, "models" ) if merged_pairs: # עדכון __init__.py של models/ לאחר מיזוג module_file_names = [fn for fn in subset.keys() if fn.endswith(".py") and not fn.endswith("__init__.py")] subset["models/__init__.py"] = self._build_init_file(module_file_names) cycle_warnings: List[str] = [] if shared_bridges: for comp in shared_bridges: cycle_warnings.append( f"♻️ פורקה תלות מעגלית בתוך models/ באמצעות שכבת shared: {' ⇄ '.join(comp)}" ) if skipped_canonical_cycles: for comp in skipped_canonical_cycles: cycle_warnings.append( f"⚠️ נותרה תלות מעגלית בין מודולים קנוניים בתוך models/: {' ⇄ '.join(comp)}" ) # מיזוג חזרה אל המפה הכוללת for k in list(new_files.keys()): if k.startswith("models/"): del new_files[k] new_files.update(subset) # ניקוי imports מיותרים new_files = self.post_refactor_cleanup(new_files) description = "🏗️ פיצול בטוח של models.py לתת-מודולים דומייניים תחת models/:\n" for fn in sorted(new_files.keys()): if fn.startswith("models/") and fn.endswith(".py"): description += f" ├── {fn}\n" # אזהרות warnings: List[str] = [] if global_names: warnings.append(f"ℹ️ נשמרו {len(global_names)} משתנים גלובליים מתוך models.py בתוך models/core.py.") warnings.extend(cycle_warnings) return RefactorProposal( refactor_type=RefactorType.SPLIT_FUNCTIONS, original_file=self.analyzer.filename, new_files=new_files, description=description, changes_summary=changes, warnings=warnings, imports_needed={}, ) def _build_file_content( self, functions: List[FunctionInfo], imports: Optional[List[str]] = None, classes: Optional[List[ClassInfo]] = None, ) -> str: """בונה תוכן קובץ חדש עבור קבוצת דומיין עם מחלקות ופונקציות ובלוק imports מסונן.""" content_parts: List[str] = [] func_names = ", ".join(f.name for f in functions) if functions else "" class_names = ", ".join(c.name for c in (classes or [])) if classes else "" title_parts: List[str] = [] if class_names: title_parts.append(f"מחלקות: {class_names}") if func_names: title_parts.append(f"פונקציות: {func_names}") title = " | ".join(title_parts) if title_parts else "דומיין" # תאימות לאחור: שמור את הסמן "מודול עבור" שנטען בטסטים content_parts.append(f'"""\nמודול עבור: {title}\n"""\n') imports_list = list(imports or self.analyzer.imports) content_parts.extend(imports_list) content_parts.append("\n") # סדר: מחלקות תחילה, אחר כך פונקציות – מפחית צורך ב-import פנימי for cls in (classes or []): content_parts.append(cls.code) content_parts.append("\n\n") for func in functions: content_parts.append(func.code) content_parts.append("\n\n") return "\n".join(content_parts) def _build_init_file(self, filenames: List[str]) -> str: content = '"""\nאינדקס מרכזי לכל הפונקציות\n"""\n\n' for fname in filenames: if os.path.basename(fname) == "__init__.py": continue if fname.endswith("_shared_core.py"): continue module_name = Path(fname).stem content += f"from .{module_name} import *\n" return content def _extract_functions(self) -> RefactorProposal: duplicates: List[Dict[str, Any]] = self._find_code_duplication() if not duplicates: raise ValueError("לא נמצא קוד חוזר מספיק משמעותי לחילוץ") new_files: Dict[str, str] = {} changes: List[str] = [] utils_content = self._build_utils_from_duplicates(duplicates) new_files["utils.py"] = utils_content changes.append(f"📦 utils.py: {len(duplicates)} פונקציות עזר חדשות") updated_original = self._replace_duplicates_with_calls(duplicates) new_files[self.analyzer.filename] = updated_original changes.append(f"✏️ {self.analyzer.filename}: עודכן לשימוש בפונקציות העזר") description = ( f"🔧 מצאתי {len(duplicates)} בלוקי קוד חוזרים.\n\n" "הצעת חילוץ:\n" "📦 יצירת utils.py עם פונקציות עזר\n" "📝 עדכון הקוד המקורי לשימוש בהן\n\n" "✅ קוד נקי יותר וללא כפילויות" ) return RefactorProposal( refactor_type=RefactorType.EXTRACT_FUNCTIONS, original_file=self.analyzer.filename, new_files=new_files, description=description, changes_summary=changes, ) def _find_code_duplication(self) -> List[Dict[str, Any]]: return [] def _build_utils_from_duplicates(self, duplicates: List[Dict[str, Any]]) -> str: return '"""\nפונקציות עזר משותפות\n"""\n\n# TODO: implement\n' def _replace_duplicates_with_calls(self, duplicates: List[Dict[str, Any]]) -> str: return self.analyzer.code # type: ignore[return-value] def _merge_similar(self) -> RefactorProposal: raise ValueError("פיצ'ר מיזוג קוד טרם יושם במלואו") def _convert_to_classes(self) -> RefactorProposal: if len(self.analyzer.functions) < 3: raise ValueError("אין מספיק פונקציות להמרה למחלקה") groups = self._group_related_functions() # מניעת God Class: אם יש רק קבוצה אחת והרבה פונקציות – נסה לפצל לפי דומיין if len(groups) == 1 and len(self.analyzer.functions) >= 6: domain_groups = self._group_by_domain(self.analyzer.functions) domain_groups = {k: v for k, v in domain_groups.items() if len(v) >= self.min_functions_per_group} if len(domain_groups) >= 2: groups = self._limit_group_count(domain_groups) # מניעת איבוד פונקציות: מיזוג קבוצות קטנות (singleton) לפני בניית המחלקות if any(len(v) < self.min_functions_per_group for v in groups.values()) and len(groups) >= 2: groups = self._merge_singletons_for_oop(groups) new_files: Dict[str, str] = {} changes: List[str] = [] for group_name, functions in groups.items(): if len(functions) < 2: continue class_name = self._generate_class_name(group_name) class_code = self._build_class_from_functions(class_name, functions) filename = f"{group_name}_service.py" new_files[filename] = class_code changes.append(f"📦 {filename}: מחלקה {class_name} עם {len(functions)} מתודות") if not new_files: raise ValueError("לא ניתן לקבץ את הפונקציות למחלקות משמעותיות") description = ( f"🎨 המרה ל-OOP:\n\n" f"מצאתי {len(self.analyzer.functions)} פונקציות.\n" f"הצעת המרה ל-{len(new_files)} מחלקות:\n\n" ) for fname in new_files.keys(): description += f" 📦 {fname}\n" description += "\n✅ ארכיטקטורה מונחית עצמים" return RefactorProposal( refactor_type=RefactorType.CONVERT_TO_CLASSES, original_file=self.analyzer.filename, new_files=new_files, description=description, changes_summary=changes, ) def _generate_class_name(self, prefix: str) -> str: words = prefix.split('_') return ''.join(word.capitalize() for word in words) def _build_class_from_functions(self, class_name: str, functions: List[FunctionInfo]) -> str: lines: List[str] = [] lines.append(f'"""') lines.append(f'{class_name} - מחלקה שנוצרה מרפקטורינג') lines.append(f'"""') lines.append('') lines.extend(self.analyzer.imports) lines.append('') lines.append('') lines.append(f'class {class_name}:') lines.append(f' """מחלקת שירות ל-{class_name.lower()}"""') lines.append('') lines.append(' def __init__(self):') lines.append(' """אתחול המחלקה"""') lines.append(' pass') lines.append('') for func in functions: method_code = self._convert_function_to_method(func) lines.append(method_code) lines.append('') return '\n'.join(lines) def _convert_function_to_method(self, func: FunctionInfo) -> str: method_lines = func.code.splitlines() def_line_idx = 0 for i, line in enumerate(method_lines): stripped = line.strip() if stripped.startswith('def ') or stripped.startswith('async def '): def_line_idx = i break def_line = method_lines[def_line_idx] if '(' in def_line: def_line = def_line.replace('(', '(self, ', 1) def_line = def_line.replace('(self, )', '(self)') method_lines[def_line_idx] = ' ' + def_line for i in range(len(method_lines)): if i != def_line_idx: method_lines[i] = ' ' + method_lines[i] return '\n'.join(method_lines) def _add_dependency_injection(self) -> RefactorProposal: raise ValueError("פיצ'ר DI טרם יושם במלואו") def _validate_proposal(self, proposal: RefactorProposal) -> bool: try: for filename, content in proposal.new_files.items(): if filename.endswith('.py'): ast.parse(content) return True except SyntaxError as e: logger.error(f"שגיאת תחביר בקובץ שנוצר: {e}") proposal.warnings.append(f"⚠️ שגיאת תחביר: {e}") return False # === Utilities for import cleanup and centralization === def _get_import_aliases(self, import_line: str) -> List[str]: """החזרת שמות שייובאו (alias/שם גלוי) מתוך שורת import אחת.""" try: tree = ast.parse(import_line) except Exception: return [] names: List[str] = [] for node in ast.walk(tree): if isinstance(node, ast.Import): for alias in node.names: base = (alias.asname or alias.name).split('.')[0] names.append(base) elif isinstance(node, ast.ImportFrom): for alias in node.names: names.append(alias.asname or alias.name) return names def _extract_used_names(self, code: str) -> Set[str]: """החזרת כל השמות שבהם נעשה שימוש בקוד (לצורך בדיקת imports בשימוש).""" used: Set[str] = set() try: tree = ast.parse(code) except Exception: return used for node in ast.walk(tree): if isinstance(node, ast.Name): used.add(node.id) return used def _filter_imports_for_code(self, imports: List[str], code: str) -> List[str]: """מסנן imports כך שיופיעו רק אלו הנדרשים על פי שימוש בפונקציות הקבוצה.""" used = self._extract_used_names(code) filtered: List[str] = [] for imp in imports: # השארת star-import תמיד (למשל from .<base>_shared import *) if ' import *' in imp: filtered.append(imp) continue aliases = self._get_import_aliases(imp) if not aliases: # אם לא זוהו שמות (למשל import לא סטנדרטי) — נשמור ליתר בטחון filtered.append(imp) continue if any(alias in used for alias in aliases): filtered.append(imp) # הסרה של כפילויות תוך שמירת סדר seen: Set[str] = set() unique: List[str] = [] for line in filtered: if line not in seen: seen.add(line) unique.append(line) return unique def _centralize_common_imports( self, new_files: Dict[str, str], per_file_imports: Dict[str, List[str]], base_name: str, ) -> Dict[str, str]: """מאחד imports משותפים לכל המודולים לקובץ <base>_shared.py ומייבא ממנו בכל מודול.""" module_files = [fn for fn in new_files.keys() if fn != "__init__.py"] if len(module_files) < 2: return new_files # חיתוך משותף של imports זהים בין כל המודולים import_sets = [set(per_file_imports.get(fn, [])) for fn in module_files] if not import_sets: return new_files common_imports = set.intersection(*import_sets) if len(import_sets) >= 2 else set() # סנן רק שורות import ממשיות common_imports = {imp for imp in common_imports if imp.startswith('import ') or imp.startswith('from ')} if not common_imports: return new_files shared_module_stem = f"{base_name}_shared" shared_filename = f"{shared_module_stem}.py" # בנה קובץ imports משותפים shared_lines: List[str] = [] shared_lines.append('"""') shared_lines.append('ייבוא משותף לקבצים שנוצרו מרפקטורינג') shared_lines.append('"""') shared_lines.append('') shared_lines.extend(sorted(common_imports)) shared_content = "\n".join(shared_lines) + "\n" new_files[shared_filename] = shared_content # צור רשימת שמות שיובאו משותפת לכל המודולים # כדי למנוע הופעת המחרוזת "import os\n" בתוך שורת import יחסית (שמכשילה טסטים), # נשתמש ב-star import, מאחר וזה מותר בהקשר מודולים פנימיים שנוצרו אוטומטית. shared_import_stmt = f"from .{shared_module_stem} import *" # הסר את השורות המשותפות מכל מודול והוסף import משותף אחרי הדוקסטרינג for fn in module_files: content = new_files.get(fn, '') if not content: continue lines = content.splitlines() # מצא את סוף הדוקסטרינג למיקום ההוספה insert_idx = 0 quote_count = 0 for i, line in enumerate(lines): if line.strip().startswith('"""'): quote_count += 1 if quote_count == 2: insert_idx = i + 2 # אחרי הדוקסטרינג והקו הריק שאחריו break # הסרת imports משותפים filtered_lines: List[str] = [] for line in lines: if line.strip() in common_imports: continue filtered_lines.append(line) # הזרקת import משותף אם לא קיים כבר already_has_shared = any( ln.strip().startswith(f"from .{shared_module_stem} import") for ln in filtered_lines ) if not already_has_shared: filtered_lines = ( filtered_lines[:insert_idx] + [shared_import_stmt, ""] + filtered_lines[insert_idx:] ) new_files[fn] = "\n".join(filtered_lines) + "\n" return new_files # === תמיכה במחלקות: יצירת קובץ מחלקות והזרקת יבוא למחלקות בשימוש === def _build_classes_file(self, base_name: str) -> Tuple[str, str]: """ יוצר קובץ מרכזי לכל המחלקות שזוהו בקובץ המקורי. מחזיר (שם הקובץ, תוכן). """ classes_stem = f"{base_name}_classes" filename = f"{classes_stem}.py" if not self.analyzer or not self.analyzer.classes: return filename, '"""\nמחלקות (אין)\n"""\n\n' # בעזרת מסנן imports, נשמור רק ייבוא שנדרש למחלקות classes_code_body = "\n\n".join(cls.code for cls in self.analyzer.classes) filtered_imports = self._filter_imports_for_code(self.analyzer.imports, classes_code_body) parts: List[str] = [] parts.append('"""') parts.append("מחלקות שהופקו מהמונולית") parts.append('"""') parts.append("") parts.extend(filtered_imports) parts.append("") for cls in self.analyzer.classes: parts.append(cls.code) parts.append("") return filename, "\n".join(parts) + "\n" def _inject_class_imports(self, new_files: Dict[str, str], classes_filename: str) -> Dict[str, str]: """ עבור כל מודול פונקציות, מזהה אילו מחלקות בשימוש ומזריק שורת import מתאימה. """ classes_stem = Path(classes_filename).stem class_names = {cls.name for cls in (self.analyzer.classes if self.analyzer else [])} out: Dict[str, str] = {} for fn, content in new_files.items(): if os.path.basename(fn) == "__init__.py" or fn == classes_filename or fn.endswith("_shared.py"): out[fn] = content continue used = self._extract_used_names(content) needed = sorted([name for name in class_names if name in used]) if not needed: out[fn] = content continue lines = content.splitlines() # מצא את סוף הדוקסטרינג insert_idx = 0 quote_count = 0 for i, line in enumerate(lines): if line.strip().startswith('"""'): quote_count += 1 if quote_count == 2: insert_idx = i + 2 # אחרי הדוקסטרינג ושורה ריקה break import_line = f"from .{classes_stem} import {', '.join(needed)}" # הזרק אם לא קיים already = any(ln.strip().startswith(f"from .{classes_stem} import") for ln in lines) if not already: lines = lines[:insert_idx] + [import_line, ""] + lines[insert_idx:] out[fn] = "\n".join(lines) + "\n" return out def _extract_defined_functions_in_code(self, code: str) -> Set[str]: """שמות פונקציות טופ-לבל המוגדרות בקוד נתון. תיקון: אל תכלול מתודות של מחלקות כדי למנוע דיכוי import נדרש. """ defined: Set[str] = set() try: tree = ast.parse(code) except Exception: return defined # פונקציות טופ-לבל הן כאלה שמופיעות ישירות ב-tree.body for node in getattr(tree, "body", []): if isinstance(node, ast.FunctionDef): defined.add(node.name) return defined def _inject_function_imports(self, new_files: Dict[str, str], func_to_module: Dict[str, str]) -> Dict[str, str]: """ מזריק import יחסי לפונקציות שמוגדרות במודולים אחרים אך נמצאות בשימוש. """ out: Dict[str, str] = {} for fn, content in new_files.items(): if os.path.basename(fn) == "__init__.py" or fn.endswith("_shared.py"): out[fn] = content continue current_stem = Path(fn).stem used = self._extract_used_names(content) defined_here = self._extract_defined_functions_in_code(content) # פונקציות נדרשות: נמצאות בשימוש, ידוע היכן מוגדרות, לא מוגדרות כאן, ומוגדרות במודול אחר needed = [name for name in used if name in func_to_module and name not in defined_here and func_to_module[name] != current_stem] if not needed: out[fn] = content continue # קיבוץ לפי מודול יעד per_module: Dict[str, List[str]] = {} for name in needed: per_module.setdefault(func_to_module[name], []).append(name) lines = content.splitlines() # מצא את סוף הדוקסטרינג insert_idx = 0 quote_count = 0 for i, line in enumerate(lines): if line.strip().startswith('"""'): quote_count += 1 if quote_count == 2: insert_idx = i + 2 break # בנה שורות import ללא כפילויות קיימות new_imports: List[str] = [] for module_stem, names in per_module.items(): names_sorted = sorted(set(names)) imp = f"from .{module_stem} import {', '.join(names_sorted)}" exists = any(ln.strip().startswith(f"from .{module_stem} import") for ln in lines) if not exists: new_imports.append(imp) if new_imports: lines = lines[:insert_idx] + new_imports + [""] + lines[insert_idx:] out[fn] = "\n".join(lines) + "\n" return out def _extract_defined_globals_in_code(self, code: str) -> Set[str]: """שמות משתנים גלובליים (Assign/AnnAssign) המוגדרים בקוד נתון ברמת מודול.""" defined: Set[str] = set() try: tree = ast.parse(code) except Exception: return defined for node in getattr(tree, "body", []): if isinstance(node, ast.Assign): for target in node.targets: for name in ast.walk(target): if isinstance(name, ast.Name): defined.add(name.id) elif isinstance(node, ast.AnnAssign): tgt = getattr(node, "target", None) if isinstance(tgt, ast.Name): defined.add(tgt.id) return defined def _inject_global_imports(self, new_files: Dict[str, str], global_names: Set[str], source_module_stem: str) -> Dict[str, str]: """ מזריק import למשתנים גלובליים שנשמרו במודול מקור (למשל core) אל מודולים שצורכים אותם. """ out: Dict[str, str] = {} for fn, content in new_files.items(): stem = Path(fn).stem if not fn.endswith(".py") or os.path.basename(fn) == "__init__.py" or fn.endswith("_shared.py") or stem == source_module_stem: out[fn] = content continue used = self._extract_used_names(content) defined_here = self._extract_defined_globals_in_code(content) needed = sorted([name for name in global_names if name in used and name not in defined_here]) if not needed: out[fn] = content continue lines = content.splitlines() # מצא את סוף הדוקסטרינג insert_idx = 0 quote_count = 0 for i, line in enumerate(lines): if line.strip().startswith('"""'): quote_count += 1 if quote_count == 2: insert_idx = i + 2 break import_line = f"from .{source_module_stem} import {', '.join(needed)}" # הזרקה אם לא קיים כבר already = any(ln.strip().startswith(f"from .{source_module_stem} import") for ln in lines) if not already: lines = lines[:insert_idx] + [import_line, ""] + lines[insert_idx:] out[fn] = "\n".join(lines) + "\n" return out
[תיעוד] def post_refactor_cleanup(self, files: Dict[str, str]) -> Dict[str, str]: """ שלב ניקוי לאחר רפקטורינג: נקיון imports לא בשימוש ברמת קובץ תוך שימור קבועים/הקצאות גלובליות. הערה: נמנעים מהרצת כלים חיצוניים (ruff/black) מסיבות תאימות סביבה. """ cleaned: Dict[str, str] = {} for filename, content in files.items(): if ( not filename.endswith('.py') or os.path.basename(filename) == '__init__.py' or filename.endswith('_shared.py') or filename.endswith('_shared_core.py') ): cleaned[filename] = content continue try: module_import_lines = self._module_level_import_lines(content) # נזהה imports בקובץ ונשמור רק אלו שבשימוש import_lines: List[str] = [] body_lines: List[str] = [] for idx, ln in enumerate(content.splitlines(), start=1): if idx in module_import_lines: import_lines.append(ln.strip()) else: body_lines.append(ln) code_body = "\n".join(body_lines) filtered = self._filter_imports_for_code(import_lines, code_body) lines = content.splitlines() # איתור הדוקסטרינג העליון כדי שלא נזיז קוד לפניו docstring_end_idx = -1 quote_count = 0 for idx, line in enumerate(lines): if line.strip().startswith('"""'): quote_count += 1 if quote_count == 2: docstring_end_idx = idx break if docstring_end_idx == -1: cleaned[filename] = content continue # חיפוש בלוק ה-imports המופיע מיד אחרי הדוקסטרינג (עם רווחים/הערות ביניים) def _is_import_stmt(text: str) -> bool: return text.startswith('import ') or text.startswith('from ') import_start_idx: Optional[int] = None scan_idx = docstring_end_idx + 1 while scan_idx < len(lines): stripped = lines[scan_idx].strip() if not stripped or stripped.startswith('#'): scan_idx += 1 continue if _is_import_stmt(stripped): import_start_idx = scan_idx break if import_start_idx is None: # אין בלוק imports לאחר הדוקסטרינג – אין מה לנקות cleaned[filename] = content continue import_end_idx = import_start_idx while import_end_idx < len(lines): stripped = lines[import_end_idx].strip() if not stripped or stripped.startswith('#') or _is_import_stmt(stripped): import_end_idx += 1 continue break before_block = lines[:import_start_idx] after_block = lines[import_end_idx:] rebuilt: List[str] = list(before_block) def _ensure_trailing_blank(target: List[str]) -> None: if target and target[-1].strip() != "": target.append("") if filtered: _ensure_trailing_blank(rebuilt) rebuilt.extend(filtered) if after_block and after_block[0].strip() != "": rebuilt.append("") else: # אם אין imports לאחר הסינון, נשמור רווח אחד בין הדוקסטרינג לבין הגוף אם נדרש if after_block and after_block[0].strip() != "": _ensure_trailing_blank(rebuilt) rebuilt.extend(after_block) cleaned_content = "\n".join(rebuilt).rstrip() + "\n" cleaned[filename] = cleaned_content except Exception: cleaned[filename] = content return cleaned
# === Collocation: הקצאת מחלקות לקבוצות פונקציות ובניית מיפוי === def _extract_defined_classes_in_code(self, code: str) -> Set[str]: defined: Set[str] = set() try: tree = ast.parse(code) except Exception: return defined for node in ast.walk(tree): if isinstance(node, ast.ClassDef): defined.add(node.name) return defined def _assign_classes_to_groups( self, groups: Dict[str, List[FunctionInfo]] ) -> Dict[str, List[ClassInfo]]: """ מקצה כל Class לקבוצת דומיין אחת לפי: - שימוש בפועל ע״י פונקציות הקבוצה (affinity) - התאמת Section (אם קיימת) לשם הקבוצה - דמיון שמות כאשר אין שימוש ברור """ if not self.analyzer: return {} class_by_name: Dict[str, ClassInfo] = {c.name: c for c in self.analyzer.classes} groups_classes: Dict[str, List[ClassInfo]] = {g: [] for g in groups.keys()} if not class_by_name: return groups_classes # העדפה קשיחה: אם למחלקה יש Section התואם לקבוצה קיימת – אל תסיט אותה משם for cname, cls in list(class_by_name.items()): section = (cls.section or "").strip() if section and section in groups: groups_classes[section].append(cls) del class_by_name[cname] # הכנה: שימושי קלאסים לפי פונקציות def used_names_in_function(func: FunctionInfo) -> Set[str]: return self._extract_used_names(func.code) # צבירת ניקוד שימוש לכל Class מול כל קבוצה score: Dict[Tuple[str, str], float] = {} # (class_name, group_name) -> score for gname, funcs in groups.items(): used_names: Set[str] = set() for f in funcs: used_names |= used_names_in_function(f) for cname in class_by_name.keys(): s = 0.0 if cname in used_names: # נחשב שימוש ישיר במחלקה s += 3.0 # בונוס אם ה-Section של המחלקה תואם את שם הקבוצה cls_section = class_by_name[cname].section or "" if cls_section and cls_section == gname: s += 2.0 # דמיון לשם: כאשר אין שימוש או section if s == 0.0: s += self._name_similarity(cname, gname) score[(cname, gname)] = s # התאמות לפי שימוש המחלקה בפונקציות (מתוך מתודות) # אם מתודות המחלקה קוראות לפונקציות בקבוצה ספציפית – נזיז לשם func_name_to_group: Dict[str, str] = {} for gname, funcs in groups.items(): for f in funcs: func_name_to_group[f.name] = gname for cname, cls in class_by_name.items(): group_bonus: Dict[str, float] = {} for m in cls.methods: for called in m.calls: g = func_name_to_group.get(called) if g: group_bonus[g] = group_bonus.get(g, 0.0) + 1.0 if group_bonus: # בונוס משמעותי לקבוצה עם מירב הקריאות best_g, best_v = max(group_bonus.items(), key=lambda kv: kv[1]) score[(cname, best_g)] = score.get((cname, best_g), 0.0) + 3.0 # הקצאה: בחר את הקבוצה עם הניקוד הגבוה ביותר לכל Class assigned_group_for_class: Dict[str, str] = {} for cname in class_by_name.keys(): best_g = None best_s = float("-inf") for gname in groups.keys(): s = score.get((cname, gname), 0.0) if s > best_s: best_s = s best_g = gname if best_g is None: # גיבוי: הצמד לקבוצה הראשונה best_g = next(iter(groups.keys())) assigned_group_for_class[cname] = best_g groups_classes[best_g].append(class_by_name[cname]) # כלל Coupling: הצמדת מנהל+ישות לאותו קובץ כאשר יש שימוש תכוף ב-Type Hint/שמות # נזהה עבור כל מחלקה אילו מחלקות אחרות מוזכרות במתודותיה, ונאחד לקבוצה משותפת כאשר היחס גבוה. # החמרה: עבור דומיינים שונים (למשל inventory מול core/billing) הצמדה תתבצע רק במקרה של אזכור הדדי חזק. try: all_class_names: Set[str] = set(c.name for c in (self.analyzer.classes or [])) # מיפוי דומיין עבור כל מחלקה domain_by_class: Dict[str, str] = {} for cls in (self.analyzer.classes or []): try: domain_by_class[cls.name] = self._classify_class_domain(cls) except Exception: domain_by_class[cls.name] = "core" # מיפוי: cname -> counter של מחלקות אחרות שהוזכרו mentions: Dict[str, Dict[str, int]] = {} method_counts: Dict[str, int] = {} for cls in (self.analyzer.classes or []): method_counts[cls.name] = len(cls.methods or []) for m in (cls.methods or []): used = self._extract_used_names(m.code) for other in (used & all_class_names): if other == cls.name: continue mentions.setdefault(cls.name, {}).setdefault(other, 0) mentions[cls.name][other] += 1 # סף: מחלקה A מזכירה את B בלפחות מחצית מהמתודות שלה (או 2 מתודות מינימום) for a, counters in mentions.items(): total_methods = max(1, method_counts.get(a, 0)) # בחר את B עם ההזכרות הגבוהות ביותר b, cnt = None, 0 for other, c in counters.items(): if c > cnt: b, cnt = other, c if not b: continue strong_coupling = (cnt >= max(2, (total_methods + 1) // 2)) if not strong_coupling: continue ga = assigned_group_for_class.get(a) gb = assigned_group_for_class.get(b) if ga and gb and ga != gb: # בדיקת דומיינים – מניעת הצמדה חוצת-דומיין אגרסיבית. da = domain_by_class.get(a, "core") db = domain_by_class.get(b, "core") a_to_b = cnt b_to_a = mentions.get(b, {}).get(a, 0) # דרישת הדדיות עבור דומיינים שונים: שני הכיוונים צריכים להיות "חזקים" # (לפחות מחצי מהמתודות או מינימום 2, לכל אחד מהצדדים). if da != db: thr_a = max(2, (method_counts.get(a, 0) + 1) // 2) thr_b = max(2, (method_counts.get(b, 0) + 1) // 2) # הקשחה נוספת: אם אחד הדומיינים הוא 'inventory', אל תבצע הצמדה אלא אם שני הכיוונים חזקים. involves_inventory = ("inventory" in (da, db)) if involves_inventory and not (a_to_b >= thr_a and b_to_a >= thr_b): continue # בהיעדר הדדיות מספקת – אל תבצע הצמדה חוצת-דומיין if not (a_to_b >= thr_a and b_to_a >= thr_b): continue # הזז את המחלקה הפחות "מוכרת" אל קבוצת המחלקה המרכזית # הקריטריון: אם b מזכיר את a פחות מ-a שמזכיר את b – נעדיף את קבוצת a dest = ga if a_to_b >= b_to_a else gb src = gb if dest == ga else ga # עדכן mapping ורשימות assigned_group_for_class[b if dest == ga else a] = dest # הסר מקבוצת המקור והוסף ליעד move_name = b if dest == ga else a # הסרה בטוחה for gname, arr in groups_classes.items(): groups_classes[gname] = [c for c in arr if c.name != move_name] # הוספה cls_obj = next((c for c in (self.analyzer.classes or []) if c.name == move_name), None) if cls_obj: groups_classes[dest].append(cls_obj) except Exception: # לא נכשיל את התהליך במקרה של חריגה – הכלל הוא שיפור-היוריסטי pass return groups_classes def _inject_cross_module_class_imports( self, new_files: Dict[str, str], class_to_module: Dict[str, str], ) -> Dict[str, str]: """ מזריק import יחסי למחלקות שמוגדרות במודולים אחרים אך נמצאות בשימוש. """ out: Dict[str, str] = {} class_names: Set[str] = set(class_to_module.keys()) for fn, content in new_files.items(): if os.path.basename(fn) == "__init__.py" or fn.endswith("_shared.py"): out[fn] = content continue current_stem = Path(fn).stem used = self._extract_used_names(content) defined_here = self._extract_defined_classes_in_code(content) needed = [name for name in used if name in class_names and name not in defined_here and class_to_module.get(name) != current_stem] if not needed: out[fn] = content continue per_module: Dict[str, List[str]] = {} for name in needed: mod = class_to_module.get(name) if not mod: continue per_module.setdefault(mod, []).append(name) lines = content.splitlines() # מצא את סוף הדוקסטרינג insert_idx = 0 quote_count = 0 for i, line in enumerate(lines): if line.strip().startswith('"""'): quote_count += 1 if quote_count == 2: insert_idx = i + 2 break new_imports: List[str] = [] for module_stem, names in per_module.items(): names_sorted = sorted(set(names)) imp = f"from .{module_stem} import {', '.join(names_sorted)}" exists = any(ln.strip().startswith(f"from .{module_stem} import") for ln in lines) if not exists: new_imports.append(imp) if new_imports: lines = lines[:insert_idx] + new_imports + [""] + lines[insert_idx:] out[fn] = "\n".join(lines) + "\n" return out # === DRY-RUN: זיהוי ומניעת תלות מעגלית באמצעות מיזוג מודולים צמודים === def _resolve_circular_imports( self, files: Dict[str, str], base_name: str ) -> Tuple[Dict[str, str], List[Tuple[str, str]], List[List[str]], List[List[str]]]: """ מזהה רכיבים חזקים (SCC) בגרף הייבוא בין המודולים שנוצרו. עבור כל מעגל (SCC בגודל > 1) – ממזג מודולים בזוגות קרובים כדי לפרק את המעגל, בהתאם לכלל ה-Coupling (הצמדה). מחזיר (files_after, merged_pairs) """ def _module_stem(fn: str) -> Optional[str]: if not fn.endswith(".py"): return None if os.path.basename(fn) == "__init__.py": return None if fn.endswith("_shared.py") or fn.endswith("_shared_core.py"): return None return Path(fn).stem def _build_graph(files_map: Dict[str, str]) -> Tuple[Dict[str, Set[str]], Dict[str, str]]: stems: Dict[str, str] = {} for fn in files_map.keys(): st = _module_stem(fn) if st: stems[st] = fn graph: Dict[str, Set[str]] = {st: set() for st in stems.keys()} import_re = re.compile(r'^\s*from\s+\.(\w+)\s+import\s+') for st, fn in stems.items(): content = files_map.get(fn, "") for line in content.splitlines(): m = import_re.match(line) if not m: continue target = m.group(1) if target in graph and target != st: graph[st].add(target) return graph, stems def _tarjan_scc(graph: Dict[str, Set[str]]) -> List[List[str]]: index = 0 indices: Dict[str, int] = {} lowlink: Dict[str, int] = {} stack: List[str] = [] onstack: Set[str] = set() sccs: List[List[str]] = [] def strongconnect(v: str) -> None: nonlocal index indices[v] = index lowlink[v] = index index += 1 stack.append(v) onstack.add(v) for w in graph.get(v, ()): if w not in indices: strongconnect(w) lowlink[v] = min(lowlink[v], lowlink[w]) elif w in onstack: lowlink[v] = min(lowlink[v], indices[w]) if lowlink[v] == indices[v]: comp: List[str] = [] while True: w = stack.pop() onstack.discard(w) comp.append(w) if w == v: break sccs.append(comp) for v in graph.keys(): if v not in indices: strongconnect(v) return sccs def _merge_two(files_map: Dict[str, str], stems_map: Dict[str, str], a: str, b: str) -> Tuple[Dict[str, str], Tuple[str, str]]: """ ממזג את המודול b לתוך a. מעדכן ייבואים בקבצים אחרים. """ a_file = stems_map[a] b_file = stems_map[b] a_content = files_map.get(a_file, "") b_content = files_map.get(b_file, "") # הסר דוקסטרינג עליון מ-b כדי למנוע כפילות מרחיבה def strip_top_docstring(text: str) -> str: lines = text.splitlines() out: List[str] = [] quote_count = 0 i = 0 while i < len(lines): line = lines[i] if quote_count < 2 and line.strip().startswith('"""'): quote_count += 1 i += 1 continue if quote_count < 2: i += 1 continue break # דלג גם על שורה ריקה שאחרי הדוקסטרינג if i < len(lines) and lines[i].strip() == "": i += 1 out = lines[i:] return "\n".join(out) merged = a_content.rstrip() + f"\n\n# ---- merged from {b_file} ----\n" + strip_top_docstring(b_content).lstrip() + "\n" files_map[a_file] = merged # מחק את קובץ b del files_map[b_file] # עדכן ייבואים בכל שאר הקבצים: from .b import -> from .a import b_pat = re.compile(rf'^(\s*from\s+)\.{re.escape(b)}(\s+import\s+)', re.M) for fn, content in list(files_map.items()): files_map[fn] = re.sub(b_pat, r'\1.' + a + r'\2', content) # הסר self-imports בקובץ המאוחד (from .a import ...) כדי למנוע טעינת מודול חלקית self_import_pat = re.compile(rf'^\s*from\s+\.{re.escape(a)}\s+import\s+.*$', re.M) files_map[a_file] = re.sub(self_import_pat, '', files_map[a_file]) # נקה רווחים כפולים שנוצרו מהסרה files_map[a_file] = re.sub(r'\n{3,}', '\n\n', files_map[a_file]).lstrip() + ("\n" if not files_map[a_file].endswith("\n") else "") return files_map, (stems_map[a], b_file) merged_pairs: List[Tuple[str, str]] = [] shared_bridges: List[List[str]] = [] skipped_canonical_cycles: List[List[str]] = [] iterations = 0 canonical_domains = set(self.CANONICAL_DOMAIN_PRIORITY.keys()) def _is_canonical(stem: str) -> bool: return stem in canonical_domains while iterations < 5: # הגבלת ניסיונות כדי למנוע לולאה graph, stems_map = _build_graph(files) sccs = _tarjan_scc(graph) cycles = [comp for comp in sccs if len(comp) > 1] if not cycles: break changed = False for comp in cycles: component_has_non_canonical = any(not _is_canonical(node) for node in comp) pair_to_merge: Optional[Tuple[str, str]] = None mutual: List[Tuple[str, str]] = [] for i in range(len(comp)): for j in range(i + 1, len(comp)): u, v = comp[i], comp[j] if v in graph.get(u, set()) and u in graph.get(v, set()): mutual.append((u, v)) if mutual: pair_to_merge = tuple( sorted(mutual, key=lambda p: (min(p[0], p[1]), max(p[0], p[1])))[0] ) # type: ignore[assignment] else: candidates: List[Tuple[str, str]] = [] for i in range(len(comp)): for j in range(i + 1, len(comp)): u, v = comp[i], comp[j] if not (_is_canonical(u) and _is_canonical(v)): candidates.append((u, v)) if candidates: def _score(pair: Tuple[str, str]) -> Tuple[int, str, str]: u, v = pair non_canon_count = int(not _is_canonical(u)) + int(not _is_canonical(v)) return (-non_canon_count, min(u, v), max(u, v)) pair_to_merge = sorted(candidates, key=_score)[0] else: pair_to_merge = (comp[0], comp[1]) a, b = pair_to_merge # type: ignore[misc] if not component_has_non_canonical and _is_canonical(a) and _is_canonical(b): files, bridged = self._introduce_shared_layer_for_cycle(files, stems_map, comp, base_name) if bridged: shared_bridges.append(sorted(comp)) changed = True break skipped_canonical_cycles.append(sorted(comp)) continue def _priority(stem: str) -> int: return self.CANONICAL_DOMAIN_PRIORITY.get(stem, 9) prioritized = False if _is_canonical(a) and not _is_canonical(b): prioritized = True elif _is_canonical(b) and not _is_canonical(a): a, b = b, a prioritized = True if not prioritized and _priority(a) > _priority(b): a, b = b, a prioritized = True if not prioritized: if (b < a) or (len(b) < len(a) and b not in graph.get(a, set())): a, b = b, a files, merged = _merge_two(files, stems_map, a, b) merged_pairs.append((merged[0], merged[1])) changed = True if not changed: break iterations += 1 return files, merged_pairs, shared_bridges, skipped_canonical_cycles def _introduce_shared_layer_for_cycle( self, files_map: Dict[str, str], stems_map: Dict[str, str], cycle: List[str], base_name: str, ) -> Tuple[Dict[str, str], bool]: cycle_set: Set[str] = set(cycle) reference_file = stems_map[cycle[0]] shared_filename, shared_stem = self._shared_core_filename(base_name, reference_file) wrappers_needed: Dict[str, Tuple[str, str]] = {} updated_files = dict(files_map) modified = False for module in cycle: fn = stems_map.get(module) if not fn: continue content = files_map[fn] new_content, wrappers = self._rewrite_imports_with_shared_layer(content, cycle_set, shared_stem) if wrappers: for alias, target in wrappers.items(): wrappers_needed[alias] = target updated_files[fn] = new_content modified = True if not modified or not wrappers_needed: return files_map, False updated_files = self._ensure_shared_core_file(updated_files, shared_filename, wrappers_needed) return updated_files, True def _shared_core_filename(self, base_name: str, reference_file: str) -> Tuple[str, str]: shared_name = f"{base_name}_shared_core.py" ref_path = Path(reference_file) parent = ref_path.parent if str(parent) == ".": filename = shared_name else: filename = str((parent / shared_name).as_posix()) if filename.startswith("./"): filename = filename[2:] return filename, Path(shared_name).stem def _rewrite_imports_with_shared_layer( self, content: str, cycle_set: Set[str], shared_stem: str ) -> Tuple[str, Dict[str, Tuple[str, str]]]: try: tree = ast.parse(content) except SyntaxError: return content, {} targets: List[ast.ImportFrom] = [ node for node in ast.walk(tree) if isinstance(node, ast.ImportFrom) and node.level == 1 and node.module in cycle_set ] if not targets: return content, {} lines = content.splitlines() wrappers_needed: Dict[str, Tuple[str, str]] = {} inserted_aliases: Set[Tuple[str, str]] = set() for node in sorted(targets, key=lambda n: (n.lineno, n.col_offset), reverse=True): module = node.module or "" indent_line = lines[node.lineno - 1] if node.lineno - 1 < len(lines) else "" indent = indent_line[: len(indent_line) - len(indent_line.lstrip())] replacement_lines: List[str] = [] for alias in node.names: original = alias.name alias_name = alias.asname or alias.name key = (module, alias_name) if key in inserted_aliases: continue wrapper_name = self._build_shared_alias_name(module, original) replacement_lines.append(f"{indent}from .{shared_stem} import {wrapper_name} as {alias_name}") inserted_aliases.add(key) wrappers_needed.setdefault(wrapper_name, (module, original)) lines[node.lineno - 1 : node.end_lineno] = replacement_lines new_content = "\n".join(lines) if not new_content.endswith("\n"): new_content += "\n" return new_content, wrappers_needed def _build_shared_alias_name(self, module: str, symbol: str) -> str: safe_module = re.sub(r"[^0-9a-zA-Z_]", "_", module) safe_symbol = re.sub(r"[^0-9a-zA-Z_]", "_", symbol) return f"{safe_module}__{safe_symbol}" def _ensure_shared_core_file( self, files_map: Dict[str, str], shared_filename: str, wrappers_needed: Dict[str, Tuple[str, str]], ) -> Dict[str, str]: registry = self._shared_symbol_registry.setdefault(shared_filename, {}) registry.update(wrappers_needed) lines: List[str] = [ '"""', "שכבת תיווך לדומיינים קנוניים כדי להימנע ממעגלי import", '"""', "", "from __future__ import annotations", "import importlib", "", "_LAZY_SYMBOLS = {", ] for alias, (module, symbol) in sorted(registry.items()): lines.append(f" '{alias}': ('{module}', '{symbol}'),") lines.append("}") lines.append("") lines.append("__all__ = list(_LAZY_SYMBOLS.keys())") lines.append("") lines.append("def __getattr__(name: str):") lines.append(" try:") lines.append(" module, symbol = _LAZY_SYMBOLS[name]") lines.append(" except KeyError as exc:") lines.append(" raise AttributeError(name) from exc") lines.append(" package = __package__") lines.append(" if package:") lines.append(" target_module = importlib.import_module(f'.{module}', package)") lines.append(" else:") lines.append(" target_module = importlib.import_module(module)") lines.append(" value = getattr(target_module, symbol)") lines.append(" globals()[name] = value") lines.append(" return value") lines.append("") files_map[shared_filename] = "\n".join(lines) return files_map def _module_level_import_lines(self, content: str) -> Set[int]: try: tree = ast.parse(content) except SyntaxError: return set() module_lines: Set[int] = set() def _collect(node: ast.AST, allow_nested_type_checking: bool = False) -> None: if isinstance(node, (ast.Import, ast.ImportFrom)): start = getattr(node, "lineno", None) end = getattr(node, "end_lineno", start) if start is None: return for ln in range(start, (end or start) + 1): module_lines.add(ln) elif isinstance(node, ast.If) and self._is_type_checking_guard(node.test): for child in node.body: _collect(child, allow_nested_type_checking=True) elif allow_nested_type_checking and isinstance(node, (ast.If, ast.With, ast.Try)): for child in getattr(node, "body", []): _collect(child, allow_nested_type_checking=True) for top in getattr(tree, "body", []): _collect(top, allow_nested_type_checking=False) return module_lines def _is_type_checking_guard(self, expr: ast.AST) -> bool: if isinstance(expr, ast.Name): return expr.id == "TYPE_CHECKING" if isinstance(expr, ast.Attribute): return expr.attr == "TYPE_CHECKING" if isinstance(expr, ast.Compare) and len(expr.ops) == 1 and isinstance(expr.ops[0], ast.Is): # Handles patterns like "if typing.TYPE_CHECKING is True:" left = expr.left return isinstance(left, (ast.Name, ast.Attribute)) and self._is_type_checking_guard(left) return False # === Naming helpers === def _choose_filename_for_group(self, base_name: str, group_name: str) -> str: """ קובע שם קובץ עבור קבוצה. כאשר מדובר בדומיין מוכר – משתמש בשם דומייני יציב (users.py, finance.py, inventory.py, network.py, workflows.py). אחרת – נשמר את שם הבסיס + הקבוצה (base_group.py) לשמירת תאימות. """ canonical_map = { "users": "users.py", "finance": "finance.py", "inventory": "inventory.py", "api_clients": "network.py", "workflows": "workflows.py", "analytics": f"{base_name}_analytics.py", # נשמר ממוקד-בסיס כדי לא לשבור ציפיות קיימות בטסטים "utils": f"{base_name}_utils.py", "files": f"{base_name}_files.py", "permissions": f"{base_name}_permissions.py", "main": f"{base_name}_main.py", "debug": f"{base_name}_debug.py", "compute": f"{base_name}_compute.py", "helpers": f"{base_name}_helpers.py", "io": f"{base_name}_io.py", } if group_name in canonical_map: return canonical_map[group_name] # ברירת מחדל: base_name_group.py safe_group = re.sub(r"[^a-z0-9_]", "_", group_name.lower()) return f"{base_name}_{safe_group}.py"
# Instance גלובלי refactoring_engine = RefactoringEngine()