"""
Manual instrumentation helpers built on OpenTelemetry (safe no-op fallback).
- traced decorator for sync/async functions that creates a span and records
duration and errors via OpenTelemetry metrics when available.
- All imports are done lazily to avoid hard dependency in environments where
OpenTelemetry is not installed (e.g., docs/CI minimal).
"""
from __future__ import annotations
from typing import Any, Callable, Optional, TypeVar, Union, cast
import time
import functools
class _NullSpanContext:
__slots__ = ()
def __enter__(self): # pragma: no cover - trivial
return None
def __exit__(self, *_exc_info): # pragma: no cover - trivial
return False
class _SpanContext:
__slots__ = ("_cm", "_attributes", "span")
def __init__(self, cm: Any, attributes: Optional[dict[str, Any]] = None) -> None:
self._cm = cm
self._attributes = attributes or {}
self.span = None
def __enter__(self):
if self._cm is None:
return None
try:
self.span = self._cm.__enter__()
except Exception:
self.span = None
return None
if self.span is not None and self._attributes:
try:
self.span.set_attributes(self._attributes) # type: ignore[attr-defined]
except Exception:
pass
return self.span
def __exit__(self, exc_type, exc, tb):
if self._cm is None:
return False
try:
return bool(self._cm.__exit__(exc_type, exc, tb))
except Exception:
return False
_T = TypeVar("_T")
def _get_tracer_and_meter():
"""Best-effort retrieval of tracer and meter; returns (tracer, meter) or (None, None)."""
try:
from opentelemetry import trace, metrics # type: ignore
tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)
return tracer, meter
except Exception:
return None, None
[תיעוד]
def start_span(span_name: str, attributes: Optional[dict[str, Any]] = None):
"""Start a span manually (best-effort).
Returns a context manager whose __enter__ yields the span (or None when OTEL missing).
Safe to use as::
span_cm = start_span("my.span", {"key": "value"})
span = span_cm.__enter__()
try:
...
finally:
span_cm.__exit__(None, None, None)
"""
tracer, _meter = _get_tracer_and_meter()
if tracer is None:
return _NullSpanContext()
try:
cm = tracer.start_as_current_span(span_name)
except Exception:
return _NullSpanContext()
return _SpanContext(cm, attributes)
[תיעוד]
def set_current_span_attributes(attributes: Optional[dict[str, Any]] = None) -> None:
"""Attach attributes to the current span if tracing is active."""
if not attributes:
return
try:
from opentelemetry import trace # type: ignore
except Exception:
return
try:
span = trace.get_current_span()
except Exception:
return
if span is None:
return
for key, value in attributes.items():
try:
span.set_attribute(str(key), value) # type: ignore[attr-defined]
except Exception:
continue
[תיעוד]
def traced(span_name: Optional[str] = None, attributes: Optional[dict[str, Any]] = None):
"""Decorator to add OpenTelemetry tracing around a function.
- Works for both sync and async functions.
- If OpenTelemetry is not available, acts as a no-op decorator.
"""
def decorator(func: Callable[..., _T]) -> Callable[..., _T]:
tracer, meter = _get_tracer_and_meter()
# Prepare optional metrics (best-effort)
duration_hist = None
error_counter = None
active_updown = None
if meter is not None:
try:
duration_hist = meter.create_histogram(
"request.duration",
description="Function duration in seconds",
unit="s",
)
except Exception:
duration_hist = None
try:
error_counter = meter.create_counter(
"errors.total",
description="Total number of errors",
unit="1",
)
except Exception:
error_counter = None
try:
active_updown = meter.create_up_down_counter(
"requests.active",
description="Number of active in-flight function calls",
unit="1",
)
except Exception:
active_updown = None
is_async = hasattr(func, "__code__") and getattr(func, "__code__").co_flags & 0x80
def _start_span(name: str):
if tracer is None:
return None
try:
return tracer.start_as_current_span(name)
except Exception:
return None
if not is_async:
@functools.wraps(func)
def sync_wrapper(*args: Any, **kwargs: Any):
name = span_name or f"{func.__module__}.{func.__name__}"
cm = _start_span(name)
# context manager enter
token = None
if cm is not None:
try:
token = cm.__enter__()
if attributes and token is not None:
try:
token.set_attributes(attributes) # type: ignore[attr-defined]
except Exception:
pass
except Exception:
token = None
start = time.perf_counter()
exc: Exception | None = None
try:
if active_updown is not None:
try:
active_updown.add(1)
except Exception:
pass
return func(*args, **kwargs)
except Exception as e:
exc = e
# record exception in span if available
try:
if token is not None:
token.record_exception(e) # type: ignore[attr-defined]
except Exception:
pass
# error counter (best-effort) to align with async path
if error_counter is not None:
try:
error_counter.add(1, {"function": func.__name__, "error_type": type(e).__name__}) # type: ignore[attr-defined]
except Exception:
pass
raise
finally:
# Decrement active counter
if active_updown is not None:
try:
active_updown.add(-1)
except Exception:
pass
# Record duration exactly once; include error attribute when relevant
if duration_hist is not None:
try:
attrs: dict[str, object] = {"function": func.__name__}
if exc is not None:
attrs["error"] = True
duration_hist.record(max(0.0, time.perf_counter() - start), attrs) # type: ignore[attr-defined]
except Exception:
pass
if cm is not None:
try:
cm.__exit__(None, None, None)
except Exception:
pass
return cast(Callable[..., _T], sync_wrapper)
else:
@functools.wraps(func)
async def async_wrapper(*args: Any, **kwargs: Any):
name = span_name or f"{func.__module__}.{func.__name__}"
cm = _start_span(name)
token = None
if cm is not None:
try:
token = cm.__enter__()
if attributes and token is not None:
try:
token.set_attributes(attributes) # type: ignore[attr-defined]
except Exception:
pass
except Exception:
token = None
start = time.perf_counter()
exc: Exception | None = None
try:
if active_updown is not None:
try:
active_updown.add(1)
except Exception:
pass
return await cast(Callable[..., Any], func)(*args, **kwargs)
except Exception as e:
exc = e
try:
if token is not None:
token.record_exception(e) # type: ignore[attr-defined]
except Exception:
pass
# error counter (best-effort)
if error_counter is not None:
try:
error_counter.add(1, {"function": func.__name__, "error_type": type(e).__name__}) # type: ignore[attr-defined]
except Exception:
pass
raise
finally:
if active_updown is not None:
try:
active_updown.add(-1)
except Exception:
pass
if duration_hist is not None:
try:
attrs: dict[str, object] = {"function": func.__name__}
if exc is not None:
attrs["error"] = True
duration_hist.record(max(0.0, time.perf_counter() - start), attrs) # type: ignore[attr-defined]
except Exception:
pass
if cm is not None:
try:
cm.__exit__(None, None, None)
except Exception:
pass
return cast(Callable[..., _T], async_wrapper)
return decorator
__all__ = ["traced", "start_span", "set_current_span_attributes"]