Files
dify/api/extensions/otel/instrumentation.py
wangji0923 f7b78b08fd refactor(api): narrow otel instrumentor typing (#33853)
Co-authored-by: 复试资料 <study@example.com>
Co-authored-by: Asuka Minato <i@asukaminato.eu.org>
2026-03-31 10:13:31 +08:00

163 lines
5.7 KiB
Python

import contextlib
import logging
from collections.abc import Callable
from typing import Protocol, cast
import flask
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.metrics import get_meter, get_meter_provider
from opentelemetry.semconv.attributes.http_attributes import ( # type: ignore[import-untyped]
HTTP_REQUEST_METHOD,
HTTP_ROUTE,
)
from opentelemetry.trace import Span, get_tracer_provider
from opentelemetry.trace.status import StatusCode
from configs import dify_config
from dify_app import DifyApp
from extensions.otel.runtime import is_celery_worker
# Module-level logger, named after this module; used by the helpers below.
logger = logging.getLogger(__name__)
class SupportsInstrument(Protocol):
    """Structural type for objects that expose an ``instrument`` method.

    Only the one method this module calls is declared.
    """

    def instrument(self, **kwargs: object) -> None:
        """Accept arbitrary keyword options and return nothing."""
class SupportsFlaskInstrumentor(Protocol):
    """Structural type for objects that expose an ``instrument_app`` method.

    Only the one method this module calls is declared.
    """

    def instrument_app(
        self,
        app: DifyApp,
        response_hook: Callable[[Span, str, list], None] | None = None,
        **kwargs: object,
    ) -> None:
        """Accept the app, an optional response hook and extra keyword options."""
# Some OpenTelemetry instrumentor constructors are typed loosely enough that
# pyrefly infers `NoneType`. Narrow the instances to just the methods we use
# while leaving runtime behavior unchanged.
def _new_celery_instrumentor() -> SupportsInstrument:
    """Build a ``CeleryInstrumentor`` typed as ``SupportsInstrument``.

    The instrumentor is given the providers returned by
    ``get_tracer_provider()`` and ``get_meter_provider()``.
    """
    celery_instrumentor = CeleryInstrumentor(
        tracer_provider=get_tracer_provider(),
        meter_provider=get_meter_provider(),
    )
    # ``cast`` only narrows the static type; the object itself is returned as-is.
    return cast(SupportsInstrument, celery_instrumentor)
def _new_httpx_instrumentor() -> SupportsInstrument:
    """Build an ``HTTPXClientInstrumentor`` typed as ``SupportsInstrument``."""
    httpx_instrumentor = HTTPXClientInstrumentor()
    # ``cast`` only narrows the static type; the object itself is returned as-is.
    return cast(SupportsInstrument, httpx_instrumentor)
def _new_redis_instrumentor() -> SupportsInstrument:
    """Build a ``RedisInstrumentor`` typed as ``SupportsInstrument``."""
    redis_instrumentor = RedisInstrumentor()
    # ``cast`` only narrows the static type; the object itself is returned as-is.
    return cast(SupportsInstrument, redis_instrumentor)
def _new_sqlalchemy_instrumentor() -> SupportsInstrument:
    """Build a ``SQLAlchemyInstrumentor`` typed as ``SupportsInstrument``."""
    sqlalchemy_instrumentor = SQLAlchemyInstrumentor()
    # ``cast`` only narrows the static type; the object itself is returned as-is.
    return cast(SupportsInstrument, sqlalchemy_instrumentor)
class ExceptionLoggingHandler(logging.Handler):
    """
    Logging handler that mirrors logged exceptions onto the current OpenTelemetry span.

    No new span is created: the exception is attached to the span that is
    already current, so the trace context stays consistent for the whole
    request lifecycle.
    """

    def emit(self, record: logging.LogRecord):
        """Annotate the current span from ``record``; any failure is swallowed."""
        # Best effort: every exception raised while annotating is suppressed.
        with contextlib.suppress(Exception):
            self._annotate_current_span(record)

    def _annotate_current_span(self, record: logging.LogRecord) -> None:
        """Copy the exception details of ``record`` onto the current span.

        Does nothing when the record carries no exception info or when the
        current span is missing or not recording.
        """
        exc_info = record.exc_info
        if not exc_info:
            return

        from opentelemetry.trace import get_current_span

        active_span = get_current_span()
        if not (active_span and active_span.is_recording()):
            return

        log_message = record.getMessage()

        # Flag the existing span as failed rather than opening a new one.
        active_span.set_status(StatusCode.ERROR, log_message)

        # Attach the log context to the span as an event.
        event_attributes = {
            "log.level": record.levelname,
            "log.message": log_message,
            "log.logger": record.name,
            "log.file.path": record.pathname,
            "log.file.line": record.lineno,
        }
        active_span.add_event("log.exception", attributes=event_attributes)

        exc_value = exc_info[1]
        if exc_value:
            active_span.record_exception(exc_value)

        exc_type = exc_info[0]
        if exc_type:
            active_span.set_attribute("exception.type", exc_type.__name__)
def instrument_exception_logging() -> None:
    """Attach a new ``ExceptionLoggingHandler`` to the root logger."""
    logging.getLogger().addHandler(ExceptionLoggingHandler())
def init_flask_instrumentor(app: DifyApp) -> None:
    """Instrument ``app`` with the Flask instrumentor and count its HTTP responses.

    Creates the ``http.server.response.count`` counter and registers a
    response hook that sets the span status and increments that counter.
    """
    http_meter = get_meter("http_metrics", version=dify_config.project.version)
    response_counter = http_meter.create_counter(
        "http.server.response.count",
        description="Total number of HTTP responses by status code, method and target",
        unit="{response}",
    )

    def response_hook(span: Span, status: str, response_headers: list) -> None:
        """Set the span status from ``status`` and add one response to the counter."""
        # Neither the status nor the metric is recorded without a recording span.
        if not (span and span.is_recording()):
            return

        try:
            # Statuses starting with "2" are OK; every other status is an error.
            if not status.startswith("2"):
                span.set_status(StatusCode.ERROR, status)
            else:
                span.set_status(StatusCode.OK)

            # The text before the first space is parsed as the numeric code.
            numeric_status = int(status.partition(" ")[0])
            status_family = f"{numeric_status // 100}xx"
            counter_attributes: dict[str, str | int] = {
                "status_code": numeric_status,
                "status_class": status_family,
            }

            current_request = flask.request
            if current_request:
                if current_request.url_rule:
                    counter_attributes[HTTP_ROUTE] = str(current_request.url_rule.rule)
                if current_request.method:
                    counter_attributes[HTTP_REQUEST_METHOD] = str(current_request.method)

            response_counter.add(1, counter_attributes)
        except Exception:
            # A failing hook is logged and otherwise ignored.
            logger.exception("Error setting status and attributes")

    from opentelemetry.instrumentation.flask import FlaskInstrumentor

    flask_instrumentor = cast(SupportsFlaskInstrumentor, FlaskInstrumentor())
    if dify_config.DEBUG:
        logger.info("Initializing Flask instrumentor")
    flask_instrumentor.instrument_app(app, response_hook=response_hook)
def init_sqlalchemy_instrumentor(app: DifyApp) -> None:
    """Instrument every engine registered under the app's ``sqlalchemy`` extension.

    The engines are read, and the instrumentor is invoked, inside an
    application context; SQL commenting is enabled.
    """
    with app.app_context():
        sqlalchemy_extension = app.extensions["sqlalchemy"]
        registered_engines = [*sqlalchemy_extension.engines.values()]
        sqlalchemy_instrumentor = _new_sqlalchemy_instrumentor()
        sqlalchemy_instrumentor.instrument(enable_commenter=True, engines=registered_engines)
def init_redis_instrumentor() -> None:
    """Create a Redis instrumentor and call ``instrument()`` with no options."""
    redis_instrumentor = _new_redis_instrumentor()
    redis_instrumentor.instrument()
def init_httpx_instrumentor() -> None:
    """Create an HTTPX instrumentor and call ``instrument()`` with no options."""
    httpx_instrumentor = _new_httpx_instrumentor()
    httpx_instrumentor.instrument()
def init_instruments(app: DifyApp) -> None:
    """Set up all OpenTelemetry instrumentation for ``app``.

    Exception logging, SQLAlchemy, Redis and HTTPX are always instrumented.
    Flask and Celery are instrumented only when ``is_celery_worker()`` is false.
    """
    running_in_celery_worker = is_celery_worker()
    # NOTE(review): the Celery instrumentor is grouped with the Flask setup
    # under the non-worker guard - confirm this nesting is the intended one.
    if not running_in_celery_worker:
        init_flask_instrumentor(app)
        celery_instrumentor = _new_celery_instrumentor()
        celery_instrumentor.instrument()

    instrument_exception_logging()
    init_sqlalchemy_instrumentor(app)
    init_redis_instrumentor()
    init_httpx_instrumentor()