הראה קוד מקור ל repo_analyzer

import os
import json
import logging
from typing import Dict, List, Any, Optional
try:
    from github import Github, GithubException
except ModuleNotFoundError:  # pragma: no cover - optional dependency for tests
    class GithubException(Exception):  # type: ignore
        """Placeholder when PyGithub is unavailable."""

    class Github:  # type: ignore
        def __init__(self, *args, **kwargs):
            raise RuntimeError(
                "PyGithub is required for repo analysis. "
                "Install via 'pip install PyGithub' to enable this feature."
            )
import base64
import re
from datetime import datetime, timedelta
from urllib.parse import urlparse

# Structured logging + metrics (safe fallbacks for tests)
try:
    from observability import emit_event
except Exception:  # pragma: no cover
[תיעוד] def emit_event(event: str, severity: str = "info", **fields): return None
try: from metrics import track_performance except Exception: # pragma: no cover from contextlib import contextmanager @contextmanager def track_performance(operation: str, labels: Optional[Dict[str, str]] = None): yield logger = logging.getLogger(__name__)
[תיעוד] class RepoAnalyzer: """מנתח ריפוזיטורי GitHub ומציע שיפורים""" # הגדרות וקבועים MAX_FILE_SIZE = 100 * 1024 # 100KB MAX_FILES = 50 LARGE_FILE_LINES = 500 LONG_FUNCTION_LINES = 50 # סוגי קבצים לניתוח CODE_EXTENSIONS = { '.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.cpp', '.c', '.cs', '.go', '.rs', '.php', '.rb', '.swift', '.kt', '.scala', '.r', '.m' } CONFIG_FILES = { 'requirements.txt', 'package.json', 'pyproject.toml', 'Pipfile', 'Cargo.toml', 'go.mod', 'pom.xml', 'build.gradle', 'composer.json', 'Gemfile', 'Package.swift' } IMPORTANT_FILES = { 'README.md', 'README.rst', 'README.txt', 'README', 'LICENSE', 'LICENSE.md', 'LICENSE.txt', '.gitignore', '.dockerignore', 'Dockerfile', 'docker-compose.yml', 'docker-compose.yaml', '.github/workflows', '.gitlab-ci.yml', 'azure-pipelines.yml' }
[תיעוד] def __init__(self, github_token: Optional[str] = None): """אתחול המנתח""" self.github_token = github_token self.github_client = Github(github_token) if github_token else None
[תיעוד] def parse_github_url(self, url: str) -> tuple[str, str]: """מחלץ owner ו-repo מ-URL של GitHub""" try: parsed = urlparse(url) path_parts = parsed.path.strip('/').split('/') if len(path_parts) >= 2: owner = path_parts[0] repo = path_parts[1].replace('.git', '') return owner, repo else: raise ValueError("Invalid GitHub URL format") except Exception as e: # שמירה כאירוע מובנה לניתוח על-ידי AI/דשבורד try: emit_event("repo_parse_url_error", severity="warn", error=str(e)) except Exception: pass logger.error(f"Error parsing GitHub URL: {e}") raise ValueError(f"לא הצלחתי לנתח את ה-URL: {url}") from e
[תיעוד] async def fetch_and_analyze_repo(self, repo_url: str) -> Dict[str, Any]: """שולף ומנתח ריפוזיטורי מ-GitHub""" try: emit_event("repo_analysis_start", severity="info", repo_url=str(repo_url)) except Exception: pass try: owner, repo_name = self.parse_github_url(repo_url) try: emit_event("repo_analysis_parsed", severity="info", owner=str(owner), repo=str(repo_name)) except Exception: pass if not self.github_client: raise ValueError("נדרש GitHub token לניתוח ריפוזיטורי") # קבל את הריפו repo_full = f"{owner}/{repo_name}" repo = self.github_client.get_repo(repo_full) analysis = { 'repo_name': repo.name, 'repo_url': repo.html_url, 'description': repo.description, 'stars': repo.stargazers_count, 'forks': repo.forks_count, 'language': repo.language, 'created_at': repo.created_at.isoformat() if repo.created_at else None, 'updated_at': repo.updated_at.isoformat() if repo.updated_at else None, 'has_readme': False, 'has_license': False, 'has_gitignore': False, 'has_ci_cd': False, 'file_count': 0, 'files_by_type': {}, 'dependencies': [], 'large_files': [], 'long_functions': [], 'directory_structure': {}, 'test_coverage': False, 'documentation_quality': 'none', 'issues_count': repo.open_issues_count, 'pull_requests_count': 0 } # מדידת ביצועים כוללת לניתוח הריפו with track_performance("github_repo_analyze", labels={"repo": repo_full}): # בדוק אם יש LICENSE try: license_info = repo.get_license() if license_info: analysis['has_license'] = True analysis['license_type'] = license_info.license.name except Exception: pass # סרוק קבצים בריפו (עומק 1) contents = repo.get_contents("") files_analyzed = 0 while contents and files_analyzed < self.MAX_FILES: file_content = contents.pop(0) if file_content.type == "dir": # הוסף תיקייה למבנה analysis['directory_structure'][file_content.path] = { 'type': 'directory', 'name': file_content.name } # בדוק תיקיות מיוחדות if file_content.name in ['tests', 'test', 'spec', '__tests__']: analysis['test_coverage'] = True elif file_content.name == '.github': # בדוק אם יש workflows try: workflows = repo.get_contents(".github/workflows") if workflows: analysis['has_ci_cd'] = True except GithubException as e: logger.debug(f"Could not access workflows dir: {e}") # הוסף תוכן התיקייה לסריקה (רק עומק 1) if files_analyzed < self.MAX_FILES - 10: # השאר מקום לקבצים חשובים try: sub_contents = repo.get_contents(file_content.path) contents.extend(sub_contents[:10]) # הגבל תת-תיקיות except GithubException as e: logger.debug(f"Could not list subdir {file_content.path}: {e}") elif file_content.type == "file": files_analyzed += 1 file_name = file_content.name file_path = file_content.path # בדוק קבצים חשובים if file_name.upper() in ['README.MD', 'README.RST', 'README.TXT', 'README']: analysis['has_readme'] = True # נסה לקרוא את ה-README לניתוח איכות try: if file_content.size < 50000: # מקסימום 50KB readme_content = base64.b64decode(file_content.content).decode('utf-8') analysis['readme_length'] = len(readme_content) # בדוק איכות תיעוד בסיסית if len(readme_content) > 500: if any(section in readme_content.lower() for section in ['installation', 'usage', 'example', 'התקנה', 'שימוש', 'דוגמה']): analysis['documentation_quality'] = 'good' else: analysis['documentation_quality'] = 'basic' except (ValueError, UnicodeDecodeError) as e: logger.debug(f"Could not parse README content: {e}") if file_name.upper() in ['LICENSE', 'LICENSE.MD', 'LICENSE.TXT']: analysis['has_license'] = True if file_name == '.gitignore': analysis['has_gitignore'] = True # ספור קבצים לפי סוג ext = os.path.splitext(file_name)[1].lower() if ext in self.CODE_EXTENSIONS: analysis['files_by_type'][ext] = analysis['files_by_type'].get(ext, 0) + 1 analysis['file_count'] += 1 # בדוק גודל קובץ if file_content.size > 0: try: # נסה לקרוא את הקובץ אם הוא לא גדול מדי if file_content.size < self.MAX_FILE_SIZE: code_content = base64.b64decode(file_content.content).decode('utf-8') lines = code_content.split('\n') line_count = len(lines) if line_count > self.LARGE_FILE_LINES: analysis['large_files'].append({ 'path': file_path, 'lines': line_count, 'size': file_content.size }) # חפש פונקציות ארוכות (Python/JS) if ext in ['.py', '.js', '.ts']: long_funcs = self._find_long_functions(code_content, ext) analysis['long_functions'].extend(long_funcs) except Exception as e: logger.debug(f"Could not analyze file {file_path}: {e}") # בדוק קבצי תלויות if file_name.lower() in self.CONFIG_FILES: try: if file_content.size < 50000: config_content = base64.b64decode(file_content.content).decode('utf-8') deps = self._extract_dependencies(file_name, config_content) analysis['dependencies'].extend(deps) except (ValueError, UnicodeDecodeError) as e: logger.debug(f"Could not parse config {file_name}: {e}") # חשב ציון איכות כללי analysis['quality_score'] = self._calculate_quality_score(analysis) try: emit_event( "repo_analysis_done", severity="info", owner=str(owner), repo=str(repo_name), file_count=int(analysis.get('file_count', 0)), ) except Exception: pass return analysis except GithubException as e: logger.error(f"GitHub API error: {e}") try: msg = getattr(e, 'data', {}).get('message', str(e)) if hasattr(e, 'data') else str(e) emit_event("repo_analysis_github_api_error", severity="error", error=str(msg)) except Exception: pass raise ValueError( f"שגיאה בגישה לריפוזיטורי: {e.data.get('message', str(e))}" ) from e except Exception as e: logger.error(f"Error analyzing repo: {e}") try: emit_event("repo_analysis_error", severity="error", error=str(e)) except Exception: pass raise ValueError( f"שגיאה בניתוח הריפוזיטורי: {str(e)}" ) from e
def _find_long_functions(self, code: str, ext: str) -> List[Dict[str, Any]]: """מוצא פונקציות ארוכות בקוד""" long_functions = [] try: if ext == '.py': # חפש פונקציות Python pattern = r'^(def|class)\s+(\w+)' lines = code.split('\n') for i, line in enumerate(lines): if re.match(pattern, line): # ספור שורות עד הפונקציה הבאה או סוף הקובץ func_lines = 1 indent_level = len(line) - len(line.lstrip()) for j in range(i + 1, len(lines)): next_line = lines[j] if next_line.strip(): next_indent = len(next_line) - len(next_line.lstrip()) if next_indent <= indent_level and re.match(pattern, next_line): break func_lines += 1 if func_lines > self.LONG_FUNCTION_LINES: match = re.match(pattern, line) if match: long_functions.append({ 'name': match.group(2), 'type': match.group(1), 'lines': func_lines, 'line_number': i + 1 }) elif ext in ['.js', '.ts']: # חפש פונקציות JavaScript/TypeScript pattern = r'(function\s+(\w+)|const\s+(\w+)\s*=\s*\(|class\s+(\w+))' lines = code.split('\n') for i, line in enumerate(lines): match = re.search(pattern, line) if match: # ספור שורות בצורה פשוטה (עד הסוגר הסוגר) func_lines = 1 brace_count = line.count('{') - line.count('}') for j in range(i + 1, len(lines)): func_lines += 1 brace_count += lines[j].count('{') - lines[j].count('}') if brace_count <= 0: break if func_lines > self.LONG_FUNCTION_LINES: func_name = match.group(2) or match.group(3) or match.group(4) or 'anonymous' long_functions.append({ 'name': func_name, 'type': 'function', 'lines': func_lines, 'line_number': i + 1 }) except Exception as e: logger.debug(f"Error finding long functions: {e}") return long_functions def _extract_dependencies(self, filename: str, content: str) -> List[Dict[str, str]]: """מחלץ תלויות מקבצי קונפיגורציה""" dependencies = [] try: if filename == 'requirements.txt': for line in content.split('\n'): line = line.strip() if line and not line.startswith('#'): # נסה לחלץ שם וגרסה parts = re.split(r'[>=<~!]', line) if parts: dependencies.append({ 'name': parts[0].strip(), 'type': 'python', 'version': parts[1].strip() if len(parts) > 1 else 'any' }) elif filename == 'package.json': import json data = json.loads(content) for dep_type in ['dependencies', 'devDependencies']: if dep_type in data: for name, version in data[dep_type].items(): dependencies.append({ 'name': name, 'type': 'npm', 'version': version, 'dev': dep_type == 'devDependencies' }) elif filename == 'pyproject.toml': # ניתוח בסיסי של TOML in_deps = False for line in content.split('\n'): if '[tool.poetry.dependencies]' in line or '[project.dependencies]' in line: in_deps = True elif line.startswith('[') and in_deps: in_deps = False elif in_deps and '=' in line: parts = line.split('=') if len(parts) == 2: name = parts[0].strip().strip('"') version = parts[1].strip().strip('"') if name != 'python': dependencies.append({ 'name': name, 'type': 'python', 'version': version }) except Exception as e: logger.debug(f"Error extracting dependencies from {filename}: {e}") return dependencies def _calculate_quality_score(self, analysis: Dict[str, Any]) -> int: """מחשב ציון איכות כללי (0-100)""" score = 0 # קבצים בסיסיים (40 נקודות) if analysis['has_readme']: score += 15 if analysis.get('documentation_quality') == 'good': score += 10 elif analysis.get('documentation_quality') == 'basic': score += 5 if analysis['has_license']: score += 10 if analysis['has_gitignore']: score += 5 # CI/CD ובדיקות (30 נקודות) if analysis['has_ci_cd']: score += 15 if analysis['test_coverage']: score += 15 # איכות קוד (30 נקודות) if not analysis['large_files']: score += 10 elif len(analysis['large_files']) < 3: score += 5 if not analysis['long_functions']: score += 10 elif len(analysis['long_functions']) < 3: score += 5 # מבנה פרויקט if analysis['file_count'] > 0: score += 5 # פופולריות (בונוס עד 5 נקודות) if analysis.get('stars', 0) > 100: score += 5 elif analysis.get('stars', 0) > 10: score += 3 return min(100, score)
[תיעוד] def generate_improvement_suggestions(self, analysis_data: Dict[str, Any]) -> List[Dict[str, Any]]: """מייצר הצעות לשיפור על בסיס הניתוח""" suggestions = [] # בדוק קבצים בסיסיים if not analysis_data.get('has_license'): suggestions.append({ 'id': 'add_license', 'title': '🔒 הוסף קובץ LICENSE', 'why': 'פרויקט ללא רישיון = כל הזכויות שמורות. זה מונע מאחרים להשתמש בקוד', 'how': 'הוסף קובץ LICENSE עם רישיון מתאים (MIT, Apache 2.0, GPL)', 'impact': 'high', 'effort': 'low', 'category': 'legal' }) if not analysis_data.get('has_readme'): suggestions.append({ 'id': 'add_readme', 'title': '📝 צור קובץ README', 'why': 'README הוא הדבר הראשון שאנשים רואים. בלעדיו אף אחד לא יבין מה הפרויקט עושה', 'how': 'צור README.md עם: תיאור, התקנה, שימוש, דוגמאות', 'impact': 'high', 'effort': 'medium', 'category': 'documentation' }) elif analysis_data.get('documentation_quality') == 'basic': suggestions.append({ 'id': 'improve_readme', 'title': '📚 שפר את ה-README', 'why': 'README בסיסי לא מספיק מידע למשתמשים', 'how': 'הוסף: הוראות התקנה, דוגמאות קוד, API reference, תרומה לפרויקט', 'impact': 'medium', 'effort': 'medium', 'category': 'documentation' }) if not analysis_data.get('has_gitignore'): suggestions.append({ 'id': 'add_gitignore', 'title': '🔧 הוסף .gitignore', 'why': 'מונע העלאת קבצים לא רצויים (node_modules, __pycache__, .env)', 'how': 'צור .gitignore מתאים לשפה. השתמש ב-gitignore.io', 'impact': 'medium', 'effort': 'low', 'category': 'configuration' }) # CI/CD ובדיקות if not analysis_data.get('has_ci_cd'): suggestions.append({ 'id': 'add_ci_cd', 'title': '🔄 הוסף GitHub Actions CI/CD', 'why': 'בדיקות אוטומטיות מונעות באגים ומשפרות איכות', 'how': 'צור .github/workflows עם בדיקות, linting, ו-deployment', 'impact': 'high', 'effort': 'medium', 'category': 'automation' }) if not analysis_data.get('test_coverage'): suggestions.append({ 'id': 'add_tests', 'title': '🧪 הוסף בדיקות (tests)', 'why': 'בדיקות מבטיחות שהקוד עובד ומונעות רגרסיות', 'how': 'צור תיקיית tests עם unit tests ו-integration tests', 'impact': 'high', 'effort': 'high', 'category': 'quality' }) # איכות קוד if analysis_data.get('large_files'): for file_info in analysis_data['large_files'][:3]: # מקסימום 3 הצעות suggestions.append({ 'id': f'split_file_{file_info["path"].replace("/", "_")}', 'title': f'⚡ פצל קובץ גדול: {file_info["path"]}', 'why': f'הקובץ מכיל {file_info["lines"]} שורות. קבצים גדולים קשים לתחזוקה', 'how': 'פצל למודולים לוגיים, הפרד concerns, צור קבצי utility', 'impact': 'medium', 'effort': 'medium', 'category': 'refactoring' }) if analysis_data.get('long_functions'): suggestions.append({ 'id': 'refactor_long_functions', 'title': f'♻️ פרק פונקציות ארוכות ({len(analysis_data["long_functions"])} פונקציות)', 'why': 'פונקציות ארוכות (50+ שורות) קשות להבנה ולתחזוקה', 'how': 'פרק לפונקציות קטנות, השתמש ב-Single Responsibility Principle', 'impact': 'medium', 'effort': 'medium', 'category': 'refactoring' }) # תלויות deps = analysis_data.get('dependencies', []) if deps: # בדוק אם יש תלויות ללא גרסה מנעולה unpinned = [d for d in deps if d.get('version') in ['*', 'latest', 'any', '']] if unpinned: suggestions.append({ 'id': 'pin_dependencies', 'title': f'📦 נעל גרסאות תלויות ({len(unpinned)} לא נעולות)', 'why': 'תלויות לא נעולות יכולות לגרום לבעיות תאימות', 'how': 'ציין גרסאות מדויקות או טווחים (^1.2.3, ~1.2.0)', 'impact': 'medium', 'effort': 'low', 'category': 'dependencies' }) # בדוק עדכניות (אם הפרויקט לא עודכן יותר משנה) if analysis_data.get('updated_at'): try: updated_raw = analysis_data['updated_at'] if isinstance(updated_raw, str): updated = datetime.fromisoformat(updated_raw.replace('Z', '+00:00')) if datetime.now(updated.tzinfo) - updated > timedelta(days=365): suggestions.append({ 'id': 'update_project', 'title': '⬆️ עדכן את הפרויקט', 'why': 'הפרויקט לא עודכן יותר משנה. ייתכן שיש עדכוני אבטחה', 'how': 'עדכן תלויות, בדוק deprecations, הוסף תכונות חדשות', 'impact': 'high', 'effort': 'high', 'category': 'maintenance' }) except (KeyError, ValueError, TypeError) as e: logger.debug(f"Could not evaluate updated_at recency: {e}") # הוסף המלצות נוספות בהתאם לשפה main_language = analysis_data.get('language', '').lower() if main_language == 'python': if not any('black' in d.get('name', '') or 'pylint' in d.get('name', '') or 'flake8' in d.get('name', '') for d in deps): suggestions.append({ 'id': 'add_linter', 'title': '🎨 הוסף linter ו-formatter', 'why': 'כלים אלו משפרים איכות ועקביות הקוד', 'how': 'הוסף black, pylint או flake8. הגדר ב-CI/CD', 'impact': 'medium', 'effort': 'low', 'category': 'quality' }) elif main_language == 'javascript' or main_language == 'typescript': if not any('eslint' in d.get('name', '') or 'prettier' in d.get('name', '') for d in deps): suggestions.append({ 'id': 'add_linter_js', 'title': '🎨 הוסף ESLint ו-Prettier', 'why': 'כלים אלו משפרים איכות ועקביות הקוד', 'how': 'הוסף eslint, prettier. הגדר כללים ב-.eslintrc', 'impact': 'medium', 'effort': 'low', 'category': 'quality' }) # הוסף המלצה לתיעוד API אם יש הרבה קבצים if analysis_data.get('file_count', 0) > 20: suggestions.append({ 'id': 'add_api_docs', 'title': '📖 הוסף תיעוד API', 'why': 'פרויקט גדול צריך תיעוד מפורט של הממשקים', 'how': 'השתמש ב-Sphinx (Python), JSDoc (JS), או כלי דומה', 'impact': 'medium', 'effort': 'high', 'category': 'documentation' }) # מיין לפי חשיבות (impact) ומאמץ (effort) priority_order = { ('high', 'low'): 1, ('high', 'medium'): 2, ('medium', 'low'): 3, ('high', 'high'): 4, ('medium', 'medium'): 5, ('medium', 'high'): 6, ('low', 'low'): 7, ('low', 'medium'): 8, ('low', 'high'): 9 } suggestions.sort(key=lambda x: priority_order.get((x['impact'], x['effort']), 10)) return suggestions