-
Notifications
You must be signed in to change notification settings - Fork 7.4k
Expand file tree
/
Copy pathobject_ref.pxi
More file actions
182 lines (148 loc) · 6.12 KB
/
object_ref.pxi
File metadata and controls
182 lines (148 loc) · 6.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
from ray.includes.unique_ids cimport CObjectID
from ray.includes.optional cimport (
optional,
nullopt,
)
import asyncio
import concurrent.futures
import functools
import logging
import threading
from typing import Callable, Any, Union, Optional
from _collections_abc import GenericAlias
from builtins import StopAsyncIteration
import ray
import cython
logger = logging.getLogger(__name__)
def _set_future_helper(
    result: Any,
    *,
    py_future: Union[asyncio.Future, concurrent.futures.Future],
):
    """Resolve ``py_future`` with ``result``, translating Ray errors.

    ``result`` may be a plain value, a ``RayTaskError`` (unwrapped to its
    cause), or another ``RayError`` (set as the exception directly).
    """
    # Issue #11030, #8841
    # A racing callback may have resolved the future already; setting a
    # result/exception twice would raise, so bail out early.
    if py_future.done():
        return

    if isinstance(result, RayTaskError):
        exc = result.as_instanceof_cause()
        # Convert StopIteration to RuntimeError to prevent segfaults due to
        # Cpython's behavior w.r.t. PEP479
        if isinstance(exc, (StopIteration, StopAsyncIteration)):
            runtime_error = RuntimeError(f"generator raised {type(exc).__name__}")
            runtime_error.__cause__ = exc
            py_future.set_exception(runtime_error)
            return
        py_future.set_exception(exc)
        return

    if isinstance(result, RayError):
        # Directly raise exception for RayActorError
        py_future.set_exception(result)
        return

    py_future.set_result(result)
cdef class ObjectRef(BaseID):
    """A reference to an object in the Ray object store.

    Wraps a core-worker ``CObjectID`` and participates in the core worker's
    reference counting: instances register themselves on construction (via
    ``add_object_ref_reference``) and deregister on deallocation.
    """

    # Allows parameterized annotations like ObjectRef[T].
    __class_getitem__ = classmethod(GenericAlias)  # should match how typing.Generic works

    def __cinit__(self):
        # Whether this ref was registered with the local core worker's
        # reference counter (set in __init__, checked in __dealloc__).
        self.in_core_worker = False

    def __init__(
            self, id, owner_addr="", call_site_data="",
            skip_adding_local_ref=False, tensor_transport: Optional[str] = None):
        # `id` is the binary object ID; validated inside _set_id().
        self._set_id(id)
        self.owner_addr = owner_addr
        self.in_core_worker = False
        self.call_site_data = call_site_data
        # Optional tensor-transport name for this ref; semantics of the
        # string are defined by callers — NOTE(review): confirm valid values.
        self._tensor_transport = tensor_transport
        worker = ray._private.worker.global_worker
        # TODO(edoakes): We should be able to remove the in_core_worker flag.
        # But there are still some dummy object refs being created outside the
        # context of a core worker.
        if hasattr(worker, "core_worker"):
            if not skip_adding_local_ref:
                worker.core_worker.add_object_ref_reference(self)
            self.in_core_worker = True

    def __dealloc__(self):
        # Undo the registration performed in __init__, if any.
        if self.in_core_worker:
            try:
                worker = ray._private.worker.global_worker
                worker.core_worker.remove_object_ref_reference(self)
            except Exception as e:
                # There is a strange error in rllib that causes the above to
                # fail. Somehow the global 'ray' variable corresponding to the
                # imported package is None when this gets called. Unfortunately
                # this is hard to debug because __dealloc__ is called during
                # garbage collection so we can't get a good stack trace. In any
                # case, there's not much we can do besides ignore it
                # (re-importing ray won't help).
                pass

    cdef CObjectID native(self):
        """Return the underlying C++ ``CObjectID``."""
        return <CObjectID>self.data

    def binary(self):
        """Return the object ID as raw bytes."""
        return self.data.Binary()

    def hex(self):
        """Return the object ID as a hex string."""
        return decode(self.data.Hex())

    def is_nil(self):
        """Return True if this is the nil object ID."""
        return self.data.IsNil()

    cdef size_t hash(self):
        # C-level hash of the underlying object ID.
        return self.data.Hash()

    def task_id(self):
        """Return the TaskID embedded in this object ID."""
        return TaskID(self.data.TaskId().Binary())

    def job_id(self):
        """Return the JobID, derived via the embedded task ID."""
        return self.task_id().job_id()

    def owner_address(self):
        # Owner address string as passed to __init__.
        return self.owner_addr

    def call_site(self):
        # Decoded call-site string captured at construction time.
        return decode(self.call_site_data)

    @classmethod
    def size(cls):
        """Return the size in bytes of a binary object ID."""
        return CObjectID.Size()

    def _set_id(self, id):
        # Validate the binary ID before storing it in the C field.
        check_id(id)
        self.data = CObjectID.FromBinary(<c_string>id)

    @classmethod
    def nil(cls):
        """Return an ObjectRef holding the nil object ID."""
        return cls(CObjectID.Nil().Binary())

    @classmethod
    def from_random(cls):
        """Return an ObjectRef holding a randomly generated object ID."""
        return cls(CObjectID.FromRandom().Binary())

    def future(self) -> concurrent.futures.Future:
        """Wrap ObjectRef with a concurrent.futures.Future

        Note that the future cancellation will not cancel the corresponding
        task when the ObjectRef representing return object of a task.

        Additionally, future.running() will always be ``False`` even if the
        underlying task is running.
        """
        py_future = concurrent.futures.Future()
        self._on_completed(
            functools.partial(_set_future_helper, py_future=py_future))
        # A hack to keep a reference to the object ref for ref counting.
        py_future.object_ref = self
        return py_future

    def __await__(self):
        # Makes `await object_ref` work inside async actors / coroutines.
        return self.as_future(_internal=True).__await__()

    def as_future(self, _internal=False) -> asyncio.Future:
        """Wrap ObjectRef with an asyncio.Future.

        Note that the future cancellation will not cancel the corresponding
        task when the ObjectRef representing return object of a task.
        """
        if not _internal:
            logger.warning("ref.as_future() is deprecated in favor of "
                           "asyncio.wrap_future(ref.future()).")
        return asyncio.wrap_future(self.future())

    def _on_completed(self, py_callback: Callable[[Any], None]):
        """Register a callback that will be called after Object is ready.
        If the ObjectRef is already ready, the callback will be called soon.
        The callback should take the result as the only argument. The result
        can be an exception object in case of task error.

        Returns self, allowing call chaining.
        """
        core_worker = ray._private.worker.global_worker.core_worker
        core_worker.set_get_async_callback(self, py_callback)
        return self

    def tensor_transport(self):
        # Tensor-transport name given at construction, or None.
        return self._tensor_transport

    cdef optional[c_string] c_tensor_transport(self):
        """Return the tensor transport as an optional C string.

        ``nullopt`` when no transport was set; otherwise the UTF-8-encoded
        transport name, moved into the optional to avoid a copy.
        """
        cdef:
            optional[c_string] c_tensor_transport = nullopt
            c_string c_tensor_transport_str
        if self._tensor_transport is not None:
            c_tensor_transport_str = self._tensor_transport.encode("utf-8")
            c_tensor_transport.emplace(move(c_tensor_transport_str))
        return c_tensor_transport