from __future__ import annotations
import os
import threading
from typing import Optional, Any, Mapping
import time
import logging
from resilience import (
DEFAULT_CIRCUIT_POLICY,
DEFAULT_RETRY_POLICY,
CircuitBreakerPolicy,
RetryPolicy,
compute_backoff_delay,
get_circuit_breaker,
note_request_duration,
note_retry,
resolve_labels,
)
import requests
from requests.adapters import HTTPAdapter
# Optional observability hook: use the real ``emit_event`` when the
# ``observability`` package imports cleanly; on any import-time exception fall
# back to a no-op with a compatible call shape (event name plus keyword fields).
try:
    from observability import emit_event as _emit_event  # type: ignore
except Exception:  # pragma: no cover
    def _emit_event(_event: str, **_fields):  # type: ignore
        """No-op stand-in for ``observability.emit_event``; always returns None."""
        return None
try:
    # Retry is optional depending on urllib3 version bundled with requests
    from urllib3.util import Retry  # type: ignore
except Exception:  # pragma: no cover
    # _create_session() checks ``Retry is not None`` and, when it is None,
    # builds its HTTPAdapter without passing ``max_retries``.
    Retry = None  # type: ignore
# Thread-local storage; get_session() lazily caches one requests.Session per
# thread under ``_local.session``.
_local = threading.local()
try:
    from observability_instrumentation import start_span, set_current_span_attributes
except Exception:  # pragma: no cover
    # Fallbacks used when the instrumentation package cannot be imported:
    # spans become no-ops. request() skips span attribute work whenever the
    # entered span is None, which is what _NoSpan yields.

    class _NoSpan:
        """Context manager whose ``__enter__`` yields None instead of a span."""

        def __enter__(self):  # pragma: no cover
            return None

        def __exit__(self, *_exc):  # pragma: no cover
            # Returning False means exceptions are never suppressed.
            return False

    def start_span(*_a, **_k):  # type: ignore
        """Return a ``_NoSpan``; all arguments are accepted and ignored."""
        return _NoSpan()

    def set_current_span_attributes(*_a, **_k):  # type: ignore
        """No-op; all arguments are accepted and ignored."""
        return None
def _extract_retry_count(resp: requests.Response) -> Optional[int]:
    """Return how many retries urllib3 recorded for *resp*, when discoverable.

    Follows ``resp.raw.retries.history``. Returns None when any link of that
    chain is missing (or None) or when inspecting it raises; never raises.
    """
    try:
        retry_state = getattr(getattr(resp, "raw", None), "retries", None)
        if retry_state is None:
            return None
        attempts = getattr(retry_state, "history", None)
        return None if attempts is None else int(len(attempts))
    except Exception:
        return None
def _to_int(env_name: str, default: int) -> int:
    """Read environment variable *env_name* as an ``int``.

    An unset or empty variable, or one ``int()`` cannot parse, yields
    ``int(default)`` instead.
    """
    try:
        text = os.getenv(env_name) or None  # treat "" exactly like unset
        return int(default if text is None else text)
    except Exception:
        return int(default)
def _to_float(env_name: str, default: float) -> float:
    """Read environment variable *env_name* as a ``float``.

    An unset or empty variable, or one ``float()`` cannot parse, yields
    ``float(default)`` instead.
    """
    try:
        text = os.getenv(env_name) or None  # treat "" exactly like unset
        return float(default if text is None else text)
    except Exception:
        return float(default)
class CircuitOpenError(requests.RequestException):
    """Raised by ``request()`` when the circuit breaker rejects a call up front.

    Subclasses ``requests.RequestException``, so existing
    ``except requests.RequestException`` handlers also catch it.

    Attributes:
        service: Display name of the service whose circuit is open.
        endpoint: Display name of the endpoint whose circuit is open.
    """

    def __init__(self, service: str, endpoint: str):
        super().__init__(f"Circuit open for {service}:{endpoint}")
        self.service = service
        self.endpoint = endpoint

    def __reduce__(self):
        # ``self.args`` holds only the formatted message, so the default
        # exception pickling would call ``CircuitOpenError(message)`` and fail
        # with a missing ``endpoint`` argument (e.g. when the error crosses a
        # process boundary). Rebuild from the two constructor arguments and
        # restore any other instance attributes via ``__setstate__``.
        return (type(self), (self.service, self.endpoint), dict(self.__dict__))
# Status codes below 500 that are still treated as transient:
# 408 Request Timeout, 425 Too Early, 429 Too Many Requests.
_RETRYABLE_STATUS_EXTRA = {408, 425, 429}


def _should_retry_status(status_code: int) -> bool:
    """Return True when *status_code* warrants an application-level retry.

    Every code >= 500 qualifies, as does anything in
    ``_RETRYABLE_STATUS_EXTRA``. Values ``int()`` cannot convert never do.
    """
    try:
        numeric = int(status_code)
    except Exception:
        return False
    return numeric >= 500 or numeric in _RETRYABLE_STATUS_EXTRA
# Transport-level failures that request() treats as transient.
_RETRYABLE_EXCEPTION_TYPES = (
    requests.ConnectionError,
    requests.Timeout,
    requests.exceptions.ChunkedEncodingError,
    requests.exceptions.ContentDecodingError,
    requests.exceptions.ProxyError,
    requests.exceptions.SSLError,
)


def _is_retryable_exception(exc: Exception) -> bool:
    """Return True when *exc* is an instance of ``_RETRYABLE_EXCEPTION_TYPES``.

    NOTE(review): ``requests.Timeout`` also covers read timeouts, and
    request() retries these for every HTTP method, including non-idempotent
    ones. By contrast, _create_session() sets urllib3 ``read=0``. SSLError is
    often not transient. Confirm both are intended.
    """
    return isinstance(exc, _RETRYABLE_EXCEPTION_TYPES)
def _sleep_with_backoff(attempt: int, policy: RetryPolicy) -> None:
    """Pause for the backoff that *policy* prescribes after *attempt*.

    The delay comes from ``compute_backoff_delay``. Non-positive delays skip
    sleeping, and any exception raised by ``time.sleep`` is swallowed.
    """
    pause_seconds = compute_backoff_delay(attempt, policy)
    if pause_seconds <= 0:
        return
    try:
        time.sleep(pause_seconds)
    except Exception:
        # Best effort: an unusable delay must not abort the retry loop.
        pass
def _create_session() -> requests.Session:
    """Build a ``requests.Session`` with a sized connection pool and urllib3 retries.

    Environment tunables:

    * ``REQUESTS_POOL_CONNECTIONS`` (default 20) and ``REQUESTS_POOL_MAXSIZE``
      (default 100) size the HTTPAdapter's connection pools.
    * ``REQUESTS_RETRIES`` (default 2) is urllib3's total, connect and status
      retry budget. Read errors are not retried at this layer (``read=0``).
    * ``REQUESTS_RETRY_BACKOFF`` (default 0.2) is urllib3's ``backoff_factor``.

    Status retries apply to 500/502/503/504 for GET, POST, PUT, PATCH and
    DELETE. The final response is returned rather than raised
    (``raise_on_status=False``). When urllib3's ``Retry`` could not be
    imported, the adapter is built without ``max_retries``. One adapter
    instance serves both https:// and http://.

    NOTE(review): POST/PATCH are status-retried here too, and request()
    re-issues on 5xx on top of this, so one logical call can send many
    requests. Confirm this amplification is intended.
    """
    adapter_options: dict[str, Any] = {
        "pool_connections": _to_int("REQUESTS_POOL_CONNECTIONS", 20),
        "pool_maxsize": _to_int("REQUESTS_POOL_MAXSIZE", 100),
    }
    retry_budget = _to_int("REQUESTS_RETRIES", 2)
    backoff_factor = _to_float("REQUESTS_RETRY_BACKOFF", 0.2)
    session = requests.Session()
    if Retry is not None:
        retry_options = {
            "total": retry_budget,
            "connect": retry_budget,
            "read": 0,  # do not retry read timeouts to avoid long hangs
            "status": retry_budget,
            "backoff_factor": backoff_factor,
            "status_forcelist": (500, 502, 503, 504),
            "raise_on_status": False,
        }
        retried_methods = frozenset(["GET", "POST", "PUT", "PATCH", "DELETE"])  # HEAD rarely used here
        try:
            retry = Retry(allowed_methods=retried_methods, **retry_options)
        except TypeError:
            # Older urllib3 releases call this parameter ``method_whitelist``.
            retry = Retry(method_whitelist=retried_methods, **retry_options)  # type: ignore[arg-type]
        adapter_options["max_retries"] = retry
    shared_adapter = HTTPAdapter(**adapter_options)
    for prefix in ("https://", "http://"):
        session.mount(prefix, shared_adapter)
    return session
def get_session() -> requests.Session:
    """Return the calling thread's ``requests.Session``, creating it on first use.

    Each thread gets its own session, built by ``_create_session()`` and
    cached on the thread-local ``_local``. Later calls from the same thread
    reuse that session and therefore its connection pools.
    """
    sess: Optional[requests.Session] = getattr(_local, "session", None)
    if sess is None:
        sess = _create_session()
        _local.session = sess
    return sess
def _span_timeout_attr(timeout: Any) -> Any:
    """Return a span-attribute-safe form of a ``requests`` *timeout* value.

    ``requests`` accepts a number, a ``(connect, read)`` tuple or None as a
    timeout, but ``float()`` only handles the first. Numbers become floats,
    None stays None (callers then omit the attribute), and anything else is
    rendered with ``str``.
    """
    if timeout is None:
        return None
    try:
        return float(timeout)
    except (TypeError, ValueError):
        return str(timeout)


def _set_span_attrs(span: Any, attrs: Mapping[str, Any]) -> None:
    """Best-effort ``span.set_attribute`` for every item of *attrs*.

    Does nothing when *span* is None. A failure on one attribute is ignored
    and does not stop the remaining attributes from being set.
    """
    if span is None:
        return
    for key, value in attrs.items():
        try:
            span.set_attribute(key, value)  # type: ignore[attr-defined]
        except Exception:
            # Telemetry problems must never break the HTTP call.
            continue


def _emit_request_failure(service: Any, endpoint: Any, error_signature: str, retries: int) -> None:
    """Emit an ``external_request_failure`` event; emitter errors are ignored."""
    try:
        _emit_event(
            "external_request_failure",
            severity="error",
            service=service,
            endpoint=endpoint,
            error_signature=error_signature,
            retries=retries,
        )
    except Exception:
        pass


def _override_retry_policy(
    policy: RetryPolicy,
    max_attempts: Any = None,
    backoff_base: Any = None,
    backoff_cap: Any = None,
    jitter: Any = None,
) -> RetryPolicy:
    """Return *policy* with the non-None per-call overrides applied.

    When every override is None, *policy* itself is returned. Otherwise a new
    ``RetryPolicy`` is built from the four fields. ``max_attempts`` goes
    through ``int`` and is clamped to >= 1; the others go through ``float``.
    Any override that fails conversion (including None) keeps the base value.
    """
    if max_attempts is None and backoff_base is None and backoff_cap is None and jitter is None:
        return policy

    def _as_int(value: Any, fallback: Any) -> Any:
        try:
            return max(1, int(value))
        except Exception:
            return fallback

    def _as_float(value: Any, fallback: Any) -> Any:
        try:
            return float(value)
        except Exception:
            return fallback

    return RetryPolicy(
        max_attempts=_as_int(max_attempts, policy.max_attempts),
        backoff_base=_as_float(backoff_base, policy.backoff_base),
        backoff_cap=_as_float(backoff_cap, policy.backoff_cap),
        jitter=_as_float(jitter, policy.jitter),
    )


def request(method: str, url: str, **kwargs):
    """Send an HTTP request with circuit breaking, retries and telemetry.

    Wraps ``get_session().request(...)``. Keyword arguments are forwarded to
    ``requests`` except for these, which are consumed here:

    * ``timeout``: defaults to env ``REQUESTS_TIMEOUT`` (8.0). As in
      ``requests``, a number, a ``(connect, read)`` tuple or None is accepted.
    * ``service`` / ``endpoint``: label hints passed to ``resolve_labels``.
    * ``retry_policy`` / ``circuit_policy``: replace the module defaults.
    * ``max_attempts`` / ``backoff_base`` / ``backoff_cap`` / ``jitter``:
      per-call tweaks layered on the retry policy.

    A response whose status is >= 500 or 408/425/429, and any exception
    accepted by ``_is_retryable_exception``, is retried with backoff until
    ``max_attempts`` is used up. Every attempt reports success or failure to
    the circuit breaker. With env ``HTTP_SLOW_MS`` > 0, attempts slower than
    that log a ``slow_http`` warning.

    Returns:
        The final ``requests.Response``. If the last attempt still carries a
        retryable status, that response is returned, not raised.

    Raises:
        CircuitOpenError: the breaker rejected the call before any attempt.
        Exception: the last attempt's exception, when it was not retryable or
            attempts ran out.
    """
    timeout = kwargs.pop("timeout", _to_float("REQUESTS_TIMEOUT", 8.0))
    slow_ms = _to_float("HTTP_SLOW_MS", 0.0)
    logger = logging.getLogger(__name__)
    service_hint = kwargs.pop("service", None)
    endpoint_hint = kwargs.pop("endpoint", None)
    retry_policy_override = kwargs.pop("retry_policy", None)
    circuit_policy_override = kwargs.pop("circuit_policy", None)
    policy = _override_retry_policy(
        retry_policy_override or DEFAULT_RETRY_POLICY,
        max_attempts=kwargs.pop("max_attempts", None),
        backoff_base=kwargs.pop("backoff_base", None),
        backoff_cap=kwargs.pop("backoff_cap", None),
        jitter=kwargs.pop("jitter", None),
    )
    circuit_policy: CircuitBreakerPolicy = circuit_policy_override or DEFAULT_CIRCUIT_POLICY
    service_label, endpoint_label, display_service, display_endpoint = resolve_labels(
        url, service_hint, endpoint_hint
    )
    breaker = get_circuit_breaker(
        service_label,
        endpoint_label,
        display_service=display_service,
        display_endpoint=display_endpoint,
        policy=circuit_policy,
    )
    if not breaker.allow_request():
        breaker.record_skip()
        note_request_duration(service_label, endpoint_label, "circuit_open", 0.0)
        try:
            _emit_event(
                "circuit_open_block",
                severity="warning",
                service=display_service,
                endpoint=display_endpoint,
            )
        except Exception:
            pass
        raise CircuitOpenError(display_service, display_endpoint)

    merged_headers = _merge_observability_headers(kwargs.get("headers"))
    if merged_headers is not None:
        kwargs["headers"] = merged_headers

    span_attrs: dict[str, Any] = {
        "http.method": str(method).upper(),
        "http.url": str(url),
    }
    # float(timeout) alone would raise for tuple/None timeouts; that used to
    # crash the call after the breaker had already admitted it.
    timeout_attr = _span_timeout_attr(timeout)
    if timeout_attr is not None:
        span_attrs["timeout"] = timeout_attr
    span_attrs["service"] = service_label
    span_attrs["endpoint"] = endpoint_label

    span_cm = start_span("http.client", span_attrs)
    span = span_cm.__enter__()
    # Exception (if any) leaving the try below; forwarded to the span's __exit__.
    escaped: BaseException | None = None
    try:
        if span is not None:
            try:
                set_current_span_attributes(
                    {
                        "component": "http.sync",
                        "service": service_label,
                        "endpoint": endpoint_label,
                    }
                )
            except Exception:
                pass

        error: Exception | None = None
        result: requests.Response | None = None
        retries_performed = 0
        observed_retry_history = 0
        last_status_code: Optional[int] = None
        last_duration_ms: Optional[float] = None
        last_status_label: Optional[str] = None
        last_error_signature: Optional[str] = None

        def _total_retries() -> int:
            # Retries issued by this loop plus those urllib3 reported internally.
            return max(0, retries_performed + observed_retry_history)

        def _finish_span(extra: Mapping[str, Any]) -> None:
            # Final span attributes shared by every exit path (best effort).
            if span is None:
                return
            attrs: dict[str, Any] = {}
            if last_duration_ms is not None:
                attrs["duration_ms"] = float(last_duration_ms)
            attrs.update(extra)
            attrs["retry_count"] = int(max(0, _total_retries()))
            _set_span_attrs(span, attrs)

        max_attempts = max(1, policy.max_attempts)
        for attempt in range(1, max_attempts + 1):
            t0 = time.perf_counter()
            try:
                resp = get_session().request(method=method, url=url, timeout=timeout, **kwargs)
                duration_seconds = time.perf_counter() - t0
                duration_ms = duration_seconds * 1000.0
                status_code = int(getattr(resp, "status_code", 0) or 0)
                last_status_code = status_code
                last_duration_ms = duration_ms
                history_len = _extract_retry_count(resp)
                if history_len is not None:
                    # urllib3 may already have retried inside this attempt.
                    observed_retry_history = max(observed_retry_history, history_len)
                if slow_ms > 0 and duration_ms > slow_ms:
                    try:
                        logger.warning(
                            "slow_http",
                            extra={
                                "method": str(method).upper(),
                                "url": str(url),
                                "status": status_code,
                                "ms": round(duration_ms, 1),
                            },
                        )
                    except Exception:
                        pass
                if _should_retry_status(status_code):
                    breaker.record_failure()
                    note_request_duration(service_label, endpoint_label, "http_error", duration_seconds)
                    last_status_label = "http_error"
                    if attempt >= max_attempts:
                        # Out of attempts: hand the error response back to the caller.
                        last_error_signature = "HTTPStatus"
                        _emit_request_failure(
                            display_service, display_endpoint, "HTTPStatus", _total_retries()
                        )
                        result = resp
                        break
                    try:
                        resp.close()  # release the pooled connection before retrying
                    except Exception:
                        pass
                    retries_performed += 1
                    note_retry(service_label, endpoint_label)
                    _sleep_with_backoff(attempt, policy)
                    continue
                breaker.record_success()
                note_request_duration(service_label, endpoint_label, "success", duration_seconds)
                retries_performed = max(retries_performed, attempt - 1)
                last_status_label = "success"
                result = resp
                break
            except Exception as exc:
                duration_seconds = time.perf_counter() - t0
                last_duration_ms = duration_seconds * 1000.0
                error = exc
                last_error_signature = type(exc).__name__
                last_status_label = "exception"
                note_request_duration(service_label, endpoint_label, "exception", duration_seconds)
                breaker.record_failure()
                if not _is_retryable_exception(exc) or attempt >= max_attempts:
                    break
                retries_performed += 1
                note_retry(service_label, endpoint_label)
                _sleep_with_backoff(attempt, policy)
                error = None
                continue

        if result is not None:
            final_attrs: dict[str, Any] = {}
            if last_status_code is not None:
                final_attrs["http.status_code"] = int(last_status_code)
            final_attrs["status"] = "ok" if last_status_label == "success" else "error"
            _finish_span(final_attrs)
            return result

        if error is not None:
            signature = last_error_signature or type(error).__name__
            _finish_span({"status": "error", "error_signature": signature})
            _emit_request_failure(display_service, display_endpoint, signature, _total_retries())
            raise error

        # Defensive: the final attempt always sets ``result`` or ``error``,
        # so this point should be unreachable.
        signature = last_error_signature or "RetriesExhausted"
        _finish_span({"status": "error", "error_signature": signature})
        error = requests.RequestException(
            f"HTTP request failed after {max_attempts} attempts"
        )
        _emit_request_failure(display_service, display_endpoint, signature, _total_retries())
        raise error
    except BaseException as leaving:
        # Record whatever actually escapes, not just the tracked ``error``.
        escaped = leaving
        raise
    finally:
        if escaped is None:
            span_cm.__exit__(None, None, None)
        else:
            span_cm.__exit__(type(escaped), escaped, escaped.__traceback__)
def _merge_observability_headers(headers: Any) -> Mapping[str, str | None] | None:
    """Normalize caller *headers* and merge in observability headers.

    Keys and values become ``str``. ``bytes``/``bytearray`` are decoded as
    Latin-1, the codec ``http.client`` applies to ``str`` header values,
    instead of being rendered as ``"b'...'"``. Entries with a None key are
    dropped. A non-mapping input is passed through ``dict()``; if that fails,
    no caller headers are used. The cleaned mapping (or None when empty) is
    handed to ``observability.prepare_outgoing_headers`` when that import
    succeeds.

    Entries whose value is None are kept out of that cleaned mapping but
    re-added to the result unless the result already carries that header
    (case-insensitively). ``requests`` treats a None per-request value as
    "drop this session-level header", so discarding them would change what
    gets sent.

    Returns:
        The headers to pass to ``requests``, or None when there is nothing to
        override, so the caller can leave ``requests``' defaults untouched.
    """

    def _as_text(item: Any) -> str:
        if isinstance(item, (bytes, bytearray)):
            return bytes(item).decode("latin-1")
        return str(item)

    base: dict[str, str] = {}
    removed: list[str] = []
    if isinstance(headers, Mapping):
        for key, value in headers.items():
            if key is None:
                continue
            try:
                name = _as_text(key)
                if value is None:
                    removed.append(name)
                else:
                    base[name] = _as_text(value)
            except Exception:
                continue
    elif headers is not None:
        try:
            for key, value in dict(headers).items():  # type: ignore[arg-type]
                if key is None:
                    continue
                name = _as_text(key)
                if value is None:
                    removed.append(name)
                else:
                    base[name] = _as_text(value)
        except Exception:
            # Unusable header container: contribute no caller headers.
            base = {}
            removed = []

    try:
        from observability import prepare_outgoing_headers  # type: ignore
        merged = prepare_outgoing_headers(base or None)
    except Exception:
        merged = None

    if merged:
        result: Mapping[str, Any] = merged
    elif headers is None:
        # No headers were added: return None to keep requests' defaults.
        return None
    else:
        result = base

    if removed:
        # Restore None markers so requests still strips those session headers.
        present = {str(name).lower() for name in result}
        restored: dict[str, Any] = dict(result)
        for name in removed:
            if name.lower() not in present:
                restored[name] = None
        result = restored
    return result or None