import json

import sentry_sdk
from sentry_sdk.consts import OP, SPANSTATUS
from sentry_sdk.api import continue_trace, get_baggage, get_traceparent
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.integrations._wsgi_common import request_body_within_bounds
from sentry_sdk.tracing import (
    BAGGAGE_HEADER_NAME,
    SENTRY_TRACE_HEADER_NAME,
    TransactionSource,
)
from sentry_sdk.utils import (
    AnnotatedValue,
    capture_internal_exceptions,
    event_from_exception,
)
from typing import TypeVar

R = TypeVar("R")

try:
    from dramatiq.broker import Broker
    from dramatiq.middleware import Middleware, default_middleware
    from dramatiq.errors import Retry
    from dramatiq.message import Message
except ImportError:
    raise DidNotEnable("Dramatiq is not installed")

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any, Callable, Dict, Optional, Union
    from sentry_sdk._types import Event, Hint


class DramatiqIntegration(Integration):
    """
    Dramatiq integration for Sentry

    Please make sure that you call `sentry_sdk.init` *before* initializing
    your broker, as it monkey patches `Broker.__init__`.

    This integration was originally developed and maintained
    by https://github.com/jacobsvante and later donated to the Sentry
    project.
    """

    identifier = "dramatiq"
    origin = f"auto.queue.{identifier}"

    @staticmethod
    def setup_once():
        # type: () -> None

        _patch_dramatiq_broker()


def _patch_dramatiq_broker():
    # type: () -> None
    original_broker__init__ = Broker.__init__

    def sentry_patched_broker__init__(self, *args, **kw):
        # type: (Broker, *Any, **Any) -> None
        integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)

        try:
            middleware = kw.pop("middleware")
        except KeyError:
            # Unfortunately Broker and StubBroker allows middleware to be
            # passed in as positional arguments, whilst RabbitmqBroker and
            # RedisBroker does not.
            if len(args) == 1:
                middleware = args[0]
                args = []  # type: ignore
            else:
                middleware = None

        if middleware is None:
            middleware = list(m() for m in default_middleware)
        else:
            middleware = list(middleware)

        if integration is not None:
            middleware = [m for m in middleware if not isinstance(m, SentryMiddleware)]
            middleware.insert(0, SentryMiddleware())

        kw["middleware"] = middleware
        original_broker__init__(self, *args, **kw)

    Broker.__init__ = sentry_patched_broker__init__


class SentryMiddleware(Middleware):  # type: ignore[misc]
    """
    A Dramatiq middleware that automatically captures and sends
    exceptions to Sentry.

    This is automatically added to every instantiated broker via the
    DramatiqIntegration.
    """

    SENTRY_HEADERS_NAME = "_sentry_headers"

    def before_enqueue(self, broker, message, delay):
        # type: (Broker, Message[R], int) -> None
        integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
        if integration is None:
            return

        message.options[self.SENTRY_HEADERS_NAME] = {
            BAGGAGE_HEADER_NAME: get_baggage(),
            SENTRY_TRACE_HEADER_NAME: get_traceparent(),
        }

    def before_process_message(self, broker, message):
        # type: (Broker, Message[R]) -> None
        integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
        if integration is None:
            return

        message._scope_manager = sentry_sdk.isolation_scope()
        scope = message._scope_manager.__enter__()
        scope.clear_breadcrumbs()
        scope.set_extra("dramatiq_message_id", message.message_id)
        scope.add_event_processor(_make_message_event_processor(message, integration))

        sentry_headers = message.options.get(self.SENTRY_HEADERS_NAME) or {}
        if "retries" in message.options:
            # start new trace in case of retrying
            sentry_headers = {}

        transaction = continue_trace(
            sentry_headers,
            name=message.actor_name,
            op=OP.QUEUE_TASK_DRAMATIQ,
            source=TransactionSource.TASK,
            origin=DramatiqIntegration.origin,
        )
        transaction.set_status(SPANSTATUS.OK)
        sentry_sdk.start_transaction(
            transaction,
            name=message.actor_name,
            op=OP.QUEUE_TASK_DRAMATIQ,
            source=TransactionSource.TASK,
        )
        transaction.__enter__()

    def after_process_message(self, broker, message, *, result=None, exception=None):
        # type: (Broker, Message[R], Optional[Any], Optional[Exception]) -> None
        integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
        if integration is None:
            return

        actor = broker.get_actor(message.actor_name)
        throws = message.options.get("throws") or actor.options.get("throws")

        scope_manager = message._scope_manager
        transaction = sentry_sdk.get_current_scope().transaction
        if not transaction:
            return None

        is_event_capture_required = (
            exception is not None
            and not (throws and isinstance(exception, throws))
            and not isinstance(exception, Retry)
        )
        if not is_event_capture_required:
            # normal transaction finish
            transaction.__exit__(None, None, None)
            scope_manager.__exit__(None, None, None)
            return

        event, hint = event_from_exception(
            exception,  # type: ignore[arg-type]
            client_options=sentry_sdk.get_client().options,
            mechanism={
                "type": DramatiqIntegration.identifier,
                "handled": False,
            },
        )
        sentry_sdk.capture_event(event, hint=hint)
        # transaction error
        transaction.__exit__(type(exception), exception, None)
        scope_manager.__exit__(type(exception), exception, None)


def _make_message_event_processor(message, integration):
    # type: (Message[R], DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]]

    def inner(event, hint):
        # type: (Event, Hint) -> Optional[Event]
        with capture_internal_exceptions():
            DramatiqMessageExtractor(message).extract_into_event(event)

        return event

    return inner


class DramatiqMessageExtractor:
    def __init__(self, message):
        # type: (Message[R]) -> None
        self.message_data = dict(message.asdict())

    def content_length(self):
        # type: () -> int
        return len(json.dumps(self.message_data))

    def extract_into_event(self, event):
        # type: (Event) -> None
        client = sentry_sdk.get_client()
        if not client.is_active():
            return

        contexts = event.setdefault("contexts", {})
        request_info = contexts.setdefault("dramatiq", {})
        request_info["type"] = "dramatiq"

        data = None  # type: Optional[Union[AnnotatedValue, Dict[str, Any]]]
        if not request_body_within_bounds(client, self.content_length()):
            data = AnnotatedValue.removed_because_over_size_limit()
        else:
            data = self.message_data

        request_info["data"] = data
