import asyncio
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Dict, List
[תיעוד]
class RateLimiter:
"""מגביל קצב פשוט בזיכרון לפי משתמש.
- חלון מתגלגל של 60 שניות
- ללא תלות חיצונית
"""
[תיעוד]
def __init__(self, max_per_minute: int = 30) -> None:
self.max_per_minute = max(1, int(max_per_minute or 30))
self._requests: Dict[int, List[datetime]] = defaultdict(list)
self._lock = asyncio.Lock()
[תיעוד]
async def check_rate_limit(self, user_id: int) -> bool:
"""מחזיר True אם מותר להמשיך, אחרת False."""
now = datetime.now(timezone.utc)
one_min_ago = now - timedelta(seconds=60)
async with self._lock:
entries = self._requests[user_id]
# נקה בקשות ישנות מהחלון (תיקון off-by-one כאשר כל הערכים פגי-תוקף)
delete_upto = len(entries)
for idx, ts in enumerate(entries):
if ts > one_min_ago:
delete_upto = idx
break
if delete_upto > 0:
del entries[:delete_upto]
if len(entries) >= self.max_per_minute:
return False
entries.append(now)
return True
[תיעוד]
async def get_current_usage_ratio(self, user_id: int) -> float:
"""מחזיר יחס שימוש נוכחי בחלון (0.0–1.0).
מנקה ערכים ישנים לפני החישוב כדי לשקף את החלון המתגלגל של 60 שניות.
"""
now = datetime.now(timezone.utc)
one_min_ago = now - timedelta(seconds=60)
async with self._lock:
entries = self._requests.get(user_id, [])
if entries:
delete_upto = len(entries)
for idx, ts in enumerate(entries):
if ts > one_min_ago:
delete_upto = idx
break
if delete_upto > 0:
del entries[:delete_upto]
used = len(self._requests.get(user_id, []))
limit = max(1, int(self.max_per_minute))
return min(1.0, float(used) / float(limit))