-
Notifications
You must be signed in to change notification settings - Fork 240
Expand file tree
/
Copy pathspan.py
More file actions
299 lines (273 loc) · 12.9 KB
/
span.py
File metadata and controls
299 lines (273 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
# BSD 3-Clause License
#
# Copyright (c) 2019, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import datetime
import types as python_types
import typing
import urllib.parse
from typing import Optional, Union
from opentelemetry.context import Context
from opentelemetry.sdk import trace as oteltrace
from opentelemetry.trace import SpanKind
from opentelemetry.trace.propagation import _SPAN_KEY
from opentelemetry.trace.span import SpanContext, TraceFlags, TraceState
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.util import types
import elasticapm
import elasticapm.conf.constants as constants
import elasticapm.traces
class Span(oteltrace.Span):
    """
    Wrapper around an Elastic APM Span/Transaction object that exposes the
    OpenTelemetry Span API.

    The wrapped object is available as ``elastic_span``. OpenTelemetry
    attributes are stored under the ``"otel_attributes"`` key of the wrapped
    object's ``context`` and are translated into Elastic types, subtypes and
    destination resources when the span is ended.
    """

    def __init__(
        self,
        name: str,
        elastic_span: elasticapm.traces.BaseSpan,
        set_status_on_exception: Optional[bool] = None,
        client: Optional[elasticapm.Client] = None,
    ) -> None:
        """Wrap ``elastic_span`` and register this wrapper on it.

        Args:
            name: Name handed to ``end_transaction`` when the wrapped object
                is a transaction.
            elastic_span: The Elastic APM span or transaction to wrap.
            set_status_on_exception: Stored on the instance; not consulted
                anywhere else in this class.
            client: Elastic APM client to use. Falls back to
                ``elasticapm.get_client()`` when omitted or falsy.
        """
        self.elastic_span = elastic_span
        self.otel_context = Context({_SPAN_KEY: self})
        # Back-reference so the Elastic object can find its otel wrapper.
        elastic_span.otel_wrapper = self
        self.set_status_on_exception = set_status_on_exception
        self.client = client if client else elasticapm.get_client()
        self._name = name

    def end(self, end_time: Optional[int] = None) -> None:
        """End the wrapped span or transaction.

        Only the first call has an effect; once the wrapped object reports an
        ``ended_time`` further calls return immediately. Types and subtypes
        are derived from the recorded attributes before ending.

        Args:
            end_time: Optional explicit end time. When truthy, the duration
                passed on is ``end_time - elastic_span.timestamp``.
        """
        if self.elastic_span.ended_time:
            # Already ended
            return
        self._set_types()

        # NOTE(review): OpenTelemetry passes end_time in nanoseconds since the
        # epoch; confirm it shares a unit with elastic_span.timestamp.
        duration_kwargs = {}
        if end_time:
            duration_kwargs["duration"] = end_time - self.elastic_span.timestamp

        if isinstance(self.elastic_span, elasticapm.traces.Transaction):
            self.client.end_transaction(
                name=self._name,
                result=self.elastic_span.outcome or "OK",
                **duration_kwargs,
            )
        else:
            self.elastic_span.end(**duration_kwargs)

    def get_span_context(self) -> "SpanContext":
        """Build a ``SpanContext`` describing the wrapped span.

        The trace id and span id are parsed from their hexadecimal string
        form; the sampled flag and trace state come from the owning
        transaction.

        Returns:
            A new :class:`opentelemetry.trace.SpanContext` with
            ``is_remote=False``.
        """
        transaction = self.elastic_span.transaction
        return SpanContext(
            trace_id=int(transaction.trace_parent.trace_id, base=16),
            span_id=int(self.elastic_span.id, base=16),
            is_remote=False,
            trace_flags=TraceFlags(TraceFlags.SAMPLED if transaction.is_sampled else TraceFlags.DEFAULT),
            trace_state=TraceState(list(transaction.trace_parent.tracestate_dict.items())),
        )

    def set_attributes(self, attributes: typing.Dict[str, types.AttributeValue]) -> None:
        """Set several attributes at once.

        Each key/value pair is forwarded to :meth:`set_attribute`. An empty or
        ``None`` mapping is a no-op.

        Note: The behavior of `None` value attributes is undefined, and hence strongly discouraged.
        """
        if not attributes:
            return
        for key, value in attributes.items():
            self.set_attribute(key, value)

    def set_attribute(self, key: str, value: types.AttributeValue) -> None:
        """Set a single attribute.

        The value is stored under ``context["otel_attributes"][key]`` of the
        wrapped object, creating the ``"otel_attributes"`` dict on first use.

        Note: The behavior of `None` value attributes is undefined, and hence strongly discouraged.
        """
        if "otel_attributes" not in self.elastic_span.context:
            self.elastic_span.context["otel_attributes"] = {}
        self.elastic_span.context["otel_attributes"][key] = value

    def add_event(
        self,
        name: str,
        attributes: types.Attributes = None,
        timestamp: Optional[int] = None,
    ) -> None:
        """Adds an `Event`.

        Raises:
            NotImplementedError: Always; events are not supported by the
                bridge.
        """
        raise NotImplementedError("Events are not implemented in the otel bridge at this time")

    def update_name(self, name: str) -> None:
        """Update the span name.

        This overrides the name provided when the span was started. Both the
        wrapped object's name and the name this wrapper later passes to
        ``end_transaction`` are updated so the two cannot drift apart.
        """
        self.elastic_span.name = name
        self._name = name

    def is_recording(self) -> bool:
        """Return whether this span is still being recorded.

        Returns:
            ``True`` when the owning transaction is sampled and the wrapped
            object has not ended yet.
        """
        return bool(self.elastic_span.transaction.is_sampled and not self.elastic_span.ended_time)

    def set_status(self, status: Union[Status, StatusCode], description: Optional[str] = None) -> None:
        """Set the status of the span by mapping it onto an Elastic outcome.

        ``StatusCode.ERROR`` maps to failure, ``StatusCode.OK`` to success and
        anything else to unknown.

        Args:
            status: Either a ``Status`` object or a bare ``StatusCode``.
            description: Accepted for API compatibility; not used.
        """
        # Handle both Status objects and StatusCode enums
        status_code = status.status_code if isinstance(status, Status) else status
        if status_code == StatusCode.ERROR:
            self.elastic_span.outcome = constants.OUTCOME.FAILURE
        elif status_code == StatusCode.OK:
            self.elastic_span.outcome = constants.OUTCOME.SUCCESS
        else:
            self.elastic_span.outcome = constants.OUTCOME.UNKNOWN

    def record_exception(
        self,
        exception: Exception,
        attributes: types.Attributes = None,
        timestamp: Optional[int] = None,
        escaped: bool = False,
    ) -> None:
        """Record an exception by capturing it with the Elastic APM client.

        Args:
            exception: The exception to capture; its ``__traceback__`` is
                used for the stack trace.
            attributes: Optional attributes, sent as
                ``context["otel_attributes"]`` when non-empty.
            timestamp: Optional time of the exception. When omitted, no date
                is passed and the client picks its own default.
            escaped: Forwarded to the client as ``handled``.
        """
        # Prefer the client this span was created with; fall back to the
        # global one if none was available at construction time.
        client = self.client or elasticapm.get_client()
        # `timestamp` defaults to None, which fromtimestamp() rejects with a
        # TypeError, so only convert when a value was actually supplied.
        # NOTE(review): OpenTelemetry timestamps are nanoseconds since the
        # epoch while fromtimestamp() expects seconds - confirm the unit
        # callers pass here.
        date = datetime.datetime.fromtimestamp(timestamp) if timestamp is not None else None
        client.capture_exception(
            exc_info=(type(exception), exception, exception.__traceback__),
            date=date,
            context={"otel_attributes": attributes} if attributes else None,
            handled=escaped,
        )

    def __enter__(self) -> "Span":
        """Invoked when `Span` is used as a context manager.

        Returns the `Span` itself.
        """
        return self

    def __exit__(
        self,
        exc_type: Optional[typing.Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[python_types.TracebackType],
    ) -> None:
        """Ends context manager and calls `end` on the `Span`."""
        self.end()

    @staticmethod
    def _http_port_from_scheme(scheme: Optional[str]) -> Optional[int]:
        """Return the default port for ``http``/``https``, else ``None``."""
        if scheme == "http":
            return 80
        if scheme == "https":
            return 443
        return None

    @classmethod
    def _parse_net_name(cls, url: str) -> str:
        """Return ``host:port`` for ``url``, adding the scheme's default port if the URL has none."""
        parsed = urllib.parse.urlparse(url)
        if parsed.port:
            return parsed.netloc
        port = cls._http_port_from_scheme(parsed.scheme)
        return parsed.netloc if not port else "{}:{}".format(parsed.netloc, port)

    def _set_types(self) -> None:
        """
        Set the types and subtypes for the underlying Elastic transaction/span
        """
        attributes = self.elastic_span.context.get("otel_attributes", {})
        if isinstance(self.elastic_span, elasticapm.traces.Transaction):
            self._set_transaction_type(attributes)
        else:
            self._set_span_types(attributes)

    def _set_transaction_type(self, attributes: dict) -> None:
        """Derive ``transaction_type`` from the span kind and the rpc/http/messaging attributes."""
        span_kind = self.elastic_span.context["otel_spankind"]
        is_rpc = bool(attributes.get("rpc.system"))
        is_http = bool(attributes.get("http.url")) or bool(attributes.get("http.scheme"))
        is_messaging = bool(attributes.get("messaging.system"))

        if span_kind == SpanKind.SERVER.name and (is_rpc or is_http):
            transaction_type = "request"
        elif span_kind == SpanKind.CONSUMER.name and is_messaging:
            transaction_type = "messaging"
        else:
            transaction_type = "unknown"
        self.elastic_span.transaction_type = transaction_type

    def _set_span_types(self, attributes: dict) -> None:
        """Derive type, subtype and destination resource of a (non-transaction) span from its attributes."""
        span_type = None
        span_subtype = None
        resource = None

        # -1 is the "no port known" sentinel used by the comparisons below.
        net_port = attributes.get("net.peer.port", -1)
        if net_port is None:
            # An explicit None value would break the numeric comparisons.
            net_port = -1
        net_name = net_peer = attributes.get("net.peer.name", attributes.get("net.peer.ip", ""))
        if net_name and (net_port > 0):
            net_name = f"{net_name}:{net_port}"

        if attributes.get("db.system"):
            span_type = "db"
            span_subtype = attributes.get("db.system")
            resource = net_name or span_subtype
            if attributes.get("db.name"):
                resource = "{}/{}".format(resource, attributes.get("db.name"))
        elif attributes.get("messaging.system"):
            span_type = "messaging"
            span_subtype = attributes.get("messaging.system")
            if not net_name and attributes.get("messaging.url"):
                net_name = self._parse_net_name(attributes.get("messaging.url"))
            resource = net_name or span_subtype
            if attributes.get("messaging.destination"):
                resource = "{}/{}".format(resource, attributes.get("messaging.destination"))
        elif attributes.get("rpc.system"):
            span_type = "external"
            span_subtype = attributes.get("rpc.system")
            resource = net_name or span_subtype
            if attributes.get("rpc.service"):
                resource = "{}/{}".format(resource, attributes.get("rpc.service"))
        elif attributes.get("http.url") or attributes.get("http.scheme"):
            span_type = "external"
            span_subtype = "http"
            http_host = attributes.get("http.host", net_peer)
            if http_host:
                if net_port < 0:
                    # The scheme lookup yields None for a missing/unknown
                    # scheme; keep the -1 sentinel in that case so the
                    # comparison below stays valid.
                    scheme_port = self._http_port_from_scheme(attributes.get("http.scheme"))
                    if scheme_port is not None:
                        net_port = scheme_port
                resource = http_host if net_port < 0 else f"{http_host}:{net_port}"
            elif attributes.get("http.url"):
                resource = self._parse_net_name(attributes["http.url"])

        if not span_type:
            # No recognised attribute family: fall back on the span kind.
            span_kind = self.elastic_span.context["otel_spankind"]
            if span_kind == SpanKind.INTERNAL.name:
                span_type = "app"
                span_subtype = "internal"
            else:
                span_type = "unknown"

        self.elastic_span.type = span_type
        self.elastic_span.subtype = span_subtype

        if resource:
            # Create the nested destination/service dicts only where missing
            # so that any existing sibling keys are preserved.
            destination = self.elastic_span.context.setdefault("destination", {})
            destination.setdefault("service", {})["resource"] = resource