-
Notifications
You must be signed in to change notification settings - Fork 625
Expand file tree
/
Copy pathrq.py
More file actions
225 lines (179 loc) · 7.81 KB
/
Copy pathrq.py
File metadata and controls
225 lines (179 loc) · 7.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import functools
import weakref
import sentry_sdk
from sentry_sdk.api import continue_trace
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.scope import Scope, should_send_default_pii
from sentry_sdk.traces import SegmentSource
from sentry_sdk.tracing import TransactionSource
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import (
SENSITIVE_DATA_SUBSTITUTE,
capture_internal_exceptions,
event_from_exception,
format_timestamp,
parse_version,
)
try:
from rq.job import JobStatus
from rq.queue import Queue
from rq.timeouts import JobTimeoutException
from rq.version import VERSION as RQ_VERSION
from rq.worker import Worker
except ImportError:
raise DidNotEnable("RQ not installed")
try:
from rq.worker import BaseWorker
if not hasattr(BaseWorker, "perform_job"):
BaseWorker = None
except ImportError:
BaseWorker = None
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any, Callable
from rq.job import Job
from sentry_sdk._types import Event, EventProcessor
from sentry_sdk.utils import ExcInfo
class RqIntegration(Integration):
identifier = "rq"
origin = f"auto.queue.{identifier}"
@staticmethod
def setup_once() -> None:
version = parse_version(RQ_VERSION)
_check_minimum_version(RqIntegration, version)
# In rq 2.7.0+, SimpleWorker inherits from BaseWorker directly
# instead of Worker, so we need to patch BaseWorker to cover both.
# For older versions where BaseWorker doesn't exist or doesn't have
# perform_job, we patch Worker.
worker_cls = BaseWorker if BaseWorker is not None else Worker
old_perform_job = worker_cls.perform_job
@functools.wraps(old_perform_job)
def sentry_patched_perform_job(
self: "Any", job: "Job", *args: "Queue", **kwargs: "Any"
) -> bool:
client = sentry_sdk.get_client()
if client.get_integration(RqIntegration) is None:
return old_perform_job(self, job, *args, **kwargs)
with sentry_sdk.new_scope() as scope:
scope.clear_breadcrumbs()
scope.add_event_processor(_make_event_processor(weakref.ref(job)))
if has_span_streaming_enabled(client.options):
sentry_sdk.traces.continue_trace(
job.meta.get("_sentry_trace_headers") or {}
)
Scope.set_custom_sampling_context({"rq_job": job})
func_name = None
with capture_internal_exceptions():
func_name = job.func_name
with sentry_sdk.traces.start_span(
name="unknown RQ task" if func_name is None else func_name,
attributes={
"sentry.op": OP.QUEUE_TASK_RQ,
"sentry.origin": RqIntegration.origin,
"sentry.span.source": SegmentSource.TASK,
SPANDATA.MESSAGING_MESSAGE_ID: job.id,
},
parent_span=None,
) as span:
if func_name is not None:
span.set_attribute(SPANDATA.CODE_FUNCTION_NAME, func_name)
rv = old_perform_job(self, job, *args, **kwargs)
else:
transaction = continue_trace(
job.meta.get("_sentry_trace_headers") or {},
op=OP.QUEUE_TASK_RQ,
name="unknown RQ task",
source=TransactionSource.TASK,
origin=RqIntegration.origin,
)
with capture_internal_exceptions():
transaction.name = job.func_name
with sentry_sdk.start_transaction(
transaction,
custom_sampling_context={"rq_job": job},
):
rv = old_perform_job(self, job, *args, **kwargs)
if self.is_horse:
# We're inside of a forked process and RQ is
# about to call `os._exit`. Make sure that our
# events get sent out.
sentry_sdk.get_client().flush()
return rv
worker_cls.perform_job = sentry_patched_perform_job
old_handle_exception = worker_cls.handle_exception
def sentry_patched_handle_exception(
self: "Worker", job: "Any", *exc_info: "Any", **kwargs: "Any"
) -> "Any":
retry = (
hasattr(job, "retries_left")
and job.retries_left
and job.retries_left > 0
)
failed = job._status == JobStatus.FAILED or job.is_failed
if failed and not retry:
_capture_exception(exc_info)
return old_handle_exception(self, job, *exc_info, **kwargs)
worker_cls.handle_exception = sentry_patched_handle_exception
old_enqueue_job = Queue.enqueue_job
@functools.wraps(old_enqueue_job)
def sentry_patched_enqueue_job(
self: "Queue", job: "Any", **kwargs: "Any"
) -> "Any":
client = sentry_sdk.get_client()
if client.get_integration(RqIntegration) is None:
return old_enqueue_job(self, job, **kwargs)
scope = sentry_sdk.get_current_scope()
span = (
scope.streamed_span
if has_span_streaming_enabled(client.options)
else scope.span
)
if span is not None:
job.meta["_sentry_trace_headers"] = dict(
scope.iter_trace_propagation_headers()
)
return old_enqueue_job(self, job, **kwargs)
Queue.enqueue_job = sentry_patched_enqueue_job
ignore_logger("rq.worker")
def _make_event_processor(weak_job: "Callable[[], Job]") -> "EventProcessor":
def event_processor(event: "Event", hint: "dict[str, Any]") -> "Event":
job = weak_job()
if job is not None:
with capture_internal_exceptions():
extra = event.setdefault("extra", {})
rq_job = {
"job_id": job.id,
"func": job.func_name,
"args": (
job.args
if should_send_default_pii()
else SENSITIVE_DATA_SUBSTITUTE
),
"kwargs": (
job.kwargs
if should_send_default_pii()
else SENSITIVE_DATA_SUBSTITUTE
),
"description": job.description,
}
if job.enqueued_at:
rq_job["enqueued_at"] = format_timestamp(job.enqueued_at)
if job.started_at:
rq_job["started_at"] = format_timestamp(job.started_at)
extra["rq-job"] = rq_job
if "exc_info" in hint:
with capture_internal_exceptions():
if issubclass(hint["exc_info"][0], JobTimeoutException):
event["fingerprint"] = ["rq", "JobTimeoutException", job.func_name]
return event
return event_processor
def _capture_exception(exc_info: "ExcInfo", **kwargs: "Any") -> None:
client = sentry_sdk.get_client()
event, hint = event_from_exception(
exc_info,
client_options=client.options,
mechanism={"type": "rq", "handled": False},
)
sentry_sdk.capture_event(event, hint=hint)