Skip to content

Commit 5d6cb04

Browse files
authored
[yaml] : switch js2py to pythonmonkey (#37560)
* switch js2py to pythonmonkey * move python monkey to the latest * try to fix recursion * fix lint * another lint * add missing parameter * address gemini review comments and add doc str * remove python_version
1 parent 5849f2e commit 5d6cb04

4 files changed

Lines changed: 183 additions & 93 deletions

File tree

sdks/python/apache_beam/yaml/standard_io.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@
145145
path: 'path'
146146
'WriteToJson':
147147
path: 'path'
148+
num_shards: 'num_shards'
148149
'ReadFromParquet':
149150
path: 'file_pattern'
150151
'WriteToParquet':

sdks/python/apache_beam/yaml/yaml_mapping.py

Lines changed: 165 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,15 @@
1616
#
1717

1818
"""This module defines the basic MapToFields operation."""
19+
import atexit
20+
import importlib
1921
import itertools
22+
import os
23+
import queue
2024
import re
25+
import sys
26+
import threading
27+
import uuid
2128
from collections import abc
2229
from collections.abc import Callable
2330
from collections.abc import Collection
@@ -53,14 +60,6 @@
5360
from apache_beam.yaml.yaml_errors import maybe_with_exception_handling_transform_fn
5461
from apache_beam.yaml.yaml_provider import dicts_to_rows
5562

56-
# Import js2py package if it exists
57-
try:
58-
import js2py
59-
from js2py.base import JsObjectWrapper
60-
except ImportError:
61-
js2py = None
62-
JsObjectWrapper = object
63-
6463
_str_expression_fields = {
6564
'AssignTimestamps': 'timestamp',
6665
'Filter': 'keep',
@@ -178,20 +177,6 @@ def _check_mapping_arguments(
178177
raise ValueError(f'{transform_name} cannot specify "name" without "path"')
179178

180179

181-
# js2py's JsObjectWrapper object has a self-referencing __dict__ property
182-
# that cannot be pickled without implementing the __getstate__ and
183-
# __setstate__ methods.
184-
class _CustomJsObjectWrapper(JsObjectWrapper):
185-
def __init__(self, js_obj):
186-
super().__init__(js_obj.__dict__['_obj'])
187-
188-
def __getstate__(self):
189-
return self.__dict__.copy()
190-
191-
def __setstate__(self, state):
192-
self.__dict__.update(state)
193-
194-
195180
# TODO(yaml) Improve type inferencing for JS UDF's
196181
def py_value_to_js_dict(py_value):
197182
if ((isinstance(py_value, tuple) and hasattr(py_value, '_asdict')) or
@@ -205,85 +190,181 @@ def py_value_to_js_dict(py_value):
205190
return py_value
206191

207192

208-
# TODO(yaml) Consider adding optional language version parameter to support
209-
# ECMAScript 5 and 6
193+
class PythonMonkeyDispatcher:
  """Runs JavaScript through pythonmonkey on one dedicated worker thread.

  pythonmonkey is imported lazily inside the worker thread and is only ever
  used from that thread. Callers on any thread submit work through
  ``eval_and_run`` and block until the worker has produced a result.

  Shutdown is deliberately abrupt: once the worker has been started, the
  ``atexit`` hook registered by ``__init__`` terminates the process with
  ``os._exit(0)`` to avoid a segmentation fault observed when pythonmonkey is
  present during standard interpreter finalization.

  NOTE(review): as a consequence of that forced exit, (a) the process exit
  status is always 0 once a JavaScript function has been run, even if the
  program was exiting with an error, and (b) atexit handlers registered
  *before* this object was constructed run after ``stop`` (LIFO order) and
  are therefore skipped. Confirm both are acceptable.
  """
  def __init__(self):
    # Requests are (req_id, type_str, payload) tuples; None ends the worker.
    self._req_queue = queue.Queue()
    # req_id -> threading.Event that the waiting caller blocks on.
    self._resp_events = {}
    # req_id -> (is_err, result_or_exception) filled in by the worker.
    self._resp_data = {}
    # Guards _resp_events, _resp_data and the start-once logic.
    self._lock = threading.Lock()
    self._thread = threading.Thread(target=self._worker, daemon=True)
    self._started = False
    # atexit handlers run in LIFO order, so handlers registered after this
    # one run before it; handlers registered earlier never run, because
    # stop() ends the process with os._exit().
    atexit.register(self.stop)

  def start(self):
    """Start the worker thread if it has not been started yet."""
    with self._lock:
      if not self._started:
        self._thread.start()
        self._started = True

  def stop(self):
    """atexit hook: flush the standard streams and force the process to exit.

    Does nothing when the worker thread was never started, so processes that
    never evaluate JavaScript shut down normally.
    """
    if not self._started:
      return
    # os._exit() skips normal stream flushing, so do it by hand. Each stream
    # is flushed independently so a failure on one (closed, None, ...) does
    # not prevent flushing the other. Best effort: we are exiting regardless.
    for stream in (sys.stdout, sys.stderr):
      try:
        stream.flush()
      except Exception:
        pass
    # Force an immediate exit to avoid a segmentation fault that occurs with
    # pythonmonkey during standard interpreter finalization.
    os._exit(0)

  def _worker(self):
    """Worker-thread loop: import pythonmonkey, then serve queued requests.

    Every request is answered, either with its result or with the exception
    it raised, unless the caller has already withdrawn its response event.
    """
    try:
      import pythonmonkey as pm
    except ImportError:
      # Reported to each caller below instead of killing the worker thread.
      pm = None

    self._pm = pm
    # Maps JavaScript source text to its evaluated function object.
    self._cache = {}

    while True:
      req = self._req_queue.get()
      if req is None:
        break

      req_id, type_str, payload = req
      res = None
      is_err = False
      try:
        if self._pm is None:
          raise ImportError(
              "PythonMonkey not installed or failed to import in worker thread."
          )

        if type_str == 'exec':
          source, row = payload
          if source not in self._cache:
            self._cache[source] = self._pm.eval(f"({source})")
          func = self._cache[source]
          res = func(row)
        else:
          # Fail loudly rather than answering an unknown request with None.
          raise ValueError(f"Unknown request type: {type_str!r}")
      except Exception as e:
        res = e
        is_err = True

      with self._lock:
        # The caller may have gone away (e.g. its wait was interrupted); in
        # that case drop the response instead of storing it forever.
        if req_id in self._resp_events:
          self._resp_data[req_id] = (is_err, res)
          self._resp_events[req_id].set()

  def eval_and_run(self, source, row):
    """Evaluate ``source`` to a JavaScript function and call it with ``row``.

    Blocks until the worker thread has handled the request.

    Args:
      source: JavaScript source text that evaluates to a function. The
        evaluated function is cached in the worker, keyed by this text.
      row: The single argument passed to the JavaScript function.

    Returns:
      Whatever the JavaScript function returned.

    Raises:
      ImportError: If pythonmonkey could not be imported in the worker.
      Exception: Any exception raised while evaluating or calling the
        function is re-raised in the calling thread.
    """
    if not self._started:
      self.start()

    req_id = str(uuid.uuid4())
    event = threading.Event()
    with self._lock:
      self._resp_events[req_id] = event

    try:
      self._req_queue.put((req_id, 'exec', (source, row)))
      event.wait()
    finally:
      # Always withdraw the bookkeeping entries, even if the wait was
      # interrupted, so abandoned requests do not accumulate.
      with self._lock:
        self._resp_events.pop(req_id, None)
        response = self._resp_data.pop(req_id, None)

    is_err, result = response
    if is_err:
      raise result
    return result
293+
294+
295+
# Module-level dispatcher instance used by JavaScriptCallable. Constructing it
# registers its atexit handler; the worker thread itself is only started on
# the first eval_and_run() call.
_pythonmonkey_dispatcher = PythonMonkeyDispatcher()
296+
297+
298+
class JavaScriptCallable:
  """Callable that applies a JavaScript function to a single row.

  Only the JavaScript source text and an optional function name are stored;
  evaluation is delegated to the module-level pythonmonkey dispatcher on each
  call.

  Args:
    source: JavaScript source. Without ``name`` it must be an expression that
      evaluates to a function; with ``name`` it is a script that defines a
      function called ``name``.
    name: Optional name of the function to call out of ``source``.
  """
  def __init__(self, source, name=None):
    self._source = source
    self._name = name

  def _effective_source(self):
    """Return a self-contained JS expression that evaluates to the function.

    A newline is emitted after the user-supplied source so that a trailing
    ``//`` line comment cannot comment out the closing tokens of the wrapper.
    """
    if self._name:
      # Script case: run the script in its own scope and hand back the
      # named function.
      return f"(function() {{ {self._source}\n; return {self._name}; }})()"
    # Expression/callable case: wrap in parens to be safe.
    return f"({self._source}\n)"

  def __call__(self, row):
    """Apply the JavaScript function to ``row`` and return the converted result.

    Raises:
      RuntimeError: If pythonmonkey is not installed, or if evaluating or
        calling the JavaScript function fails (the original exception is
        chained as the cause).
    """
    # `import importlib` alone does not guarantee that the `util` submodule
    # is loaded, so import it explicitly before using find_spec.
    import importlib.util

    # Check for pythonmonkey availability lazily, without importing it here.
    if importlib.util.find_spec("pythonmonkey") is None:
      raise RuntimeError(
          "PythonMonkey is not installed. Please install 'apache_beam[yaml]' "
          "to use JavaScript mapping functions.")

    row_as_dict = py_value_to_js_dict(row)
    try:
      js_result = _pythonmonkey_dispatcher.eval_and_run(
          self._effective_source(), row_as_dict)
    except Exception as exn:
      raise RuntimeError(
          f"Error evaluating javascript expression: {exn}") from exn
    return dicts_to_rows(_finalize_js_result(js_result))
330+
331+
332+
def _finalize_js_result(obj):
333+
"""Coerce pythonmonkey objects to native Python objects (specifically
334+
strings).
335+
"""
336+
if isinstance(obj, str):
337+
return str(obj)
338+
if isinstance(obj, list):
339+
return [_finalize_js_result(x) for x in obj]
340+
if isinstance(obj, dict):
341+
return {k: _finalize_js_result(v) for k, v in obj.items()}
342+
return obj
343+
344+
210345
def _expand_javascript_mapping_func(
211346
original_fields, expression=None, callable=None, path=None, name=None):
212347

213-
# Check for installed js2py package
214-
if js2py is None:
348+
if importlib.util.find_spec("pythonmonkey") is None:
215349
raise ValueError(
216-
"Javascript mapping functions are not supported on"
217-
" Python 3.12 or later.")
218-
219-
# import remaining js2py objects
220-
from js2py import base
221-
from js2py.constructors import jsdate
222-
from js2py.internals import simplex
223-
224-
js_array_type = (
225-
base.PyJsArray,
226-
base.PyJsArrayBuffer,
227-
base.PyJsInt8Array,
228-
base.PyJsUint8Array,
229-
base.PyJsUint8ClampedArray,
230-
base.PyJsInt16Array,
231-
base.PyJsUint16Array,
232-
base.PyJsInt32Array,
233-
base.PyJsUint32Array,
234-
base.PyJsFloat32Array,
235-
base.PyJsFloat64Array)
236-
237-
def _js_object_to_py_object(obj):
238-
if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
239-
return base.to_python(obj)
240-
elif isinstance(obj, js_array_type):
241-
return [_js_object_to_py_object(value) for value in obj.to_list()]
242-
elif isinstance(obj, jsdate.PyJsDate):
243-
return obj.to_utc_dt()
244-
elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
245-
return None
246-
elif isinstance(obj, base.PyJsError):
247-
raise RuntimeError(obj['message'])
248-
elif isinstance(obj, base.PyJsObject):
249-
return {
250-
key: _js_object_to_py_object(value['value'])
251-
for (key, value) in obj.own.items()
252-
}
253-
elif isinstance(obj, base.JsObjectWrapper):
254-
return _js_object_to_py_object(obj._obj)
255-
256-
return obj
350+
"PythonMonkey is not installed. Please install 'apache_beam[yaml]' "
351+
"to use JavaScript mapping functions.")
257352

258353
if expression:
259354
source = '\n'.join(['function(__row__) {'] + [
260355
f' {name} = __row__.{name}'
261356
for name in original_fields if name in expression
262357
] + [' return (' + expression + ')'] + ['}'])
263-
js_func = _CustomJsObjectWrapper(js2py.eval_js(source))
358+
return JavaScriptCallable(source)
264359

265360
elif callable:
266-
js_func = _CustomJsObjectWrapper(js2py.eval_js(callable))
361+
return JavaScriptCallable(callable)
267362

268363
else:
269364
if not path.endswith('.js'):
270365
raise ValueError(f'File "{path}" is not a valid .js file.')
271366
udf_code = FileSystems.open(path).read().decode()
272-
js = js2py.EvalJs()
273-
js.eval(udf_code)
274-
js_func = _CustomJsObjectWrapper(getattr(js, name))
275-
276-
def js_wrapper(row):
277-
row_as_dict = py_value_to_js_dict(row)
278-
try:
279-
js_result = js_func(row_as_dict)
280-
except simplex.JsException as exn:
281-
raise RuntimeError(
282-
f"Error evaluating javascript expression: "
283-
f"{exn.mes['message']}") from exn
284-
return dicts_to_rows(_js_object_to_py_object(js_result))
285-
286-
return js_wrapper
367+
return JavaScriptCallable(udf_code, name=name)
287368

288369

289370
def _expand_python_mapping_func(

sdks/python/apache_beam/yaml/yaml_udf_test.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616
#
17+
import importlib
1718
import logging
1819
import os
1920
import shutil
@@ -31,11 +32,17 @@
3132
from apache_beam.yaml.yaml_provider import dicts_to_rows
3233
from apache_beam.yaml.yaml_transform import YamlTransform
3334

35+
# We use find_spec to check for pythonmonkey availability without importing it.
36+
# Importing pythonmonkey initializes the engine and binds it to the current
37+
# thread (MainThread). This causes "too much recursion" errors when the
38+
# Dispatcher later tries to use it from a background thread.
3439
try:
35-
import js2py
40+
pm_available = importlib.util.find_spec("pythonmonkey") is not None
3641
except ImportError:
37-
js2py = None
38-
logging.warning('js2py is not installed; some tests will be skipped.')
42+
pm_available = False
43+
44+
if not pm_available:
45+
logging.warning('pythonmonkey is not installed; some tests will be skipped.')
3946

4047

4148
def as_rows():
@@ -63,7 +70,7 @@ def setUp(self):
6370
def tearDown(self):
6471
shutil.rmtree(self.tmpdir)
6572

66-
@unittest.skipIf(js2py is None, 'js2py not installed.')
73+
@unittest.skipIf(not pm_available, 'pythonmonkey not installed.')
6774
def test_map_to_fields_filter_inline_js(self):
6875
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
6976
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
@@ -197,7 +204,7 @@ def test_map_to_fields_sql_reserved_keyword_append():
197204
beam.Row(label='389a', timestamp=2, label_copy="389a"),
198205
]))
199206

200-
@unittest.skipIf(js2py is None, 'js2py not installed.')
207+
@unittest.skipIf(not pm_available, 'pythonmonkey not installed.')
201208
def test_filter_inline_js(self):
202209
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
203210
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
@@ -252,7 +259,7 @@ def test_filter_inline_py(self):
252259
row=beam.Row(rank=2, values=[7, 8, 9])),
253260
]))
254261

255-
@unittest.skipIf(js2py is None, 'js2py not installed.')
262+
@unittest.skipIf(not pm_available, 'pythonmonkey not installed.')
256263
def test_filter_expression_js(self):
257264
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
258265
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
@@ -296,7 +303,7 @@ def test_filter_expression_py(self):
296303
row=beam.Row(rank=0, values=[1, 2, 3])),
297304
]))
298305

299-
@unittest.skipIf(js2py is None, 'js2py not installed.')
306+
@unittest.skipIf(not pm_available, 'pythonmonkey not installed.')
300307
def test_filter_inline_js_file(self):
301308
data = '''
302309
function f(x) {

sdks/python/setup.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -574,8 +574,9 @@ def get_portability_package_data():
574574
'docstring-parser>=0.15,<1.0',
575575
'jinja2>=3.0,<3.2',
576576
'virtualenv-clone>=0.5,<1.0',
577-
# https://github.com/PiotrDabkowski/Js2Py/issues/317
578-
'js2py>=0.74,<1; python_version<"3.12"',
577+
# pythonmonkey is used for Javascript mapping support
578+
# Please install NPM and Node.js before installing PythonMonkey.
579+
'pythonmonkey>=1.3.0',
579580
'jsonschema>=4.0.0,<5.0.0',
580581
] + dataframe_dependency,
581582
# Keep the following dependencies in line with what we test against

0 commit comments

Comments
 (0)