1616#
1717
1818"""This module defines the basic MapToFields operation."""
19+ import atexit
20+ import importlib
1921import itertools
22+ import os
23+ import queue
2024import re
25+ import sys
26+ import threading
27+ import uuid
2128from collections import abc
2229from collections .abc import Callable
2330from collections .abc import Collection
5360from apache_beam .yaml .yaml_errors import maybe_with_exception_handling_transform_fn
5461from apache_beam .yaml .yaml_provider import dicts_to_rows
5562
56- # Import js2py package if it exists
57- try :
58- import js2py
59- from js2py .base import JsObjectWrapper
60- except ImportError :
61- js2py = None
62- JsObjectWrapper = object
63-
6463_str_expression_fields = {
6564 'AssignTimestamps' : 'timestamp' ,
6665 'Filter' : 'keep' ,
@@ -178,20 +177,6 @@ def _check_mapping_arguments(
178177 raise ValueError (f'{ transform_name } cannot specify "name" without "path"' )
179178
180179
181- # js2py's JsObjectWrapper object has a self-referencing __dict__ property
182- # that cannot be pickled without implementing the __getstate__ and
183- # __setstate__ methods.
184- class _CustomJsObjectWrapper (JsObjectWrapper ):
185- def __init__ (self , js_obj ):
186- super ().__init__ (js_obj .__dict__ ['_obj' ])
187-
188- def __getstate__ (self ):
189- return self .__dict__ .copy ()
190-
191- def __setstate__ (self , state ):
192- self .__dict__ .update (state )
193-
194-
195180# TODO(yaml) Improve type inferencing for JS UDF's
196181def py_value_to_js_dict (py_value ):
197182 if ((isinstance (py_value , tuple ) and hasattr (py_value , '_asdict' )) or
@@ -205,85 +190,181 @@ def py_value_to_js_dict(py_value):
205190 return py_value
206191
207192
208- # TODO(yaml) Consider adding optional language version parameter to support
209- # ECMAScript 5 and 6
193+ class PythonMonkeyDispatcher :
194+ """Dispatcher for executing JavaScript code using pythonmonkey.
195+
196+ This class manages a worker thread to execute JavaScript, ensuring that
197+ pythonmonkey is only imported and used within that thread. It also handles
198+ process shutdown carefully to avoid segmentation faults known to occur
199+ when pythonmonkey is present during standard Python interpreter finalization.
200+ """
201+ def __init__ (self ):
202+ self ._req_queue = queue .Queue ()
203+ self ._resp_events = {}
204+ self ._resp_data = {}
205+ self ._lock = threading .Lock ()
206+ self ._thread = threading .Thread (target = self ._worker , daemon = True )
207+ self ._started = False
208+ # Register the stop method to be called on exit.
209+ # atexit handlers are executed in LIFO order. By registering at import time,
210+ # we ensure this handler runs last, allowing other cleanup handlers
211+ # (registered later) to execute first.
212+ atexit .register (self .stop )
213+
214+ def start (self ):
215+ with self ._lock :
216+ if not self ._started :
217+ self ._thread .start ()
218+ self ._started = True
219+
220+ def stop (self ):
221+ # This method is called on process exit.
222+ if not self ._started :
223+ return
224+ # Flush standard streams before forced exit to avoid data loss.
225+ try :
226+ sys .stdout .flush ()
227+ sys .stderr .flush ()
228+ except Exception :
229+ pass
230+ # Force an immediate exit to avoid a segmentation fault that occurs with
231+ # pythonmonkey during standard interpreter finalization.
232+ # Since this runs as one of the last atexit handlers (due to import-time
233+ # registration), most other cleanup should have already completed.
234+ os ._exit (0 )
235+
236+ def _worker (self ):
237+ try :
238+ import pythonmonkey as pm
239+ except ImportError :
240+ pm = None
241+
242+ self ._pm = pm
243+ self ._cache = {}
244+
245+ while True :
246+ req = self ._req_queue .get ()
247+ if req is None :
248+ break
249+
250+ req_id , type_str , payload = req
251+ res = None
252+ is_err = False
253+ try :
254+ if self ._pm is None :
255+ raise ImportError (
256+ "PythonMonkey not installed or failed to import in worker thread."
257+ )
258+
259+ if type_str == 'exec' :
260+ source , row = payload
261+ if source not in self ._cache :
262+ self ._cache [source ] = self ._pm .eval (f"({ source } )" )
263+ func = self ._cache [source ]
264+ res = func (row )
265+ except Exception as e :
266+ res = e
267+ is_err = True
268+
269+ with self ._lock :
270+ if req_id in self ._resp_events :
271+ self ._resp_data [req_id ] = (is_err , res )
272+ self ._resp_events [req_id ].set ()
273+
274+ def eval_and_run (self , source , row ):
275+ if not self ._started :
276+ self .start ()
277+
278+ req_id = str (uuid .uuid4 ())
279+ event = threading .Event ()
280+ with self ._lock :
281+ self ._resp_events [req_id ] = event
282+
283+ self ._req_queue .put ((req_id , 'exec' , (source , row )))
284+ event .wait ()
285+
286+ with self ._lock :
287+ is_err , result = self ._resp_data .pop (req_id )
288+ del self ._resp_events [req_id ]
289+
290+ if is_err :
291+ raise result
292+ return result
293+
294+
295+ _pythonmonkey_dispatcher = PythonMonkeyDispatcher ()
296+
297+
298+ class JavaScriptCallable :
299+ def __init__ (self , source , name = None ):
300+ self ._source = source
301+ self ._name = name
302+
303+ def __call__ (self , row ):
304+ # Check for pythonmonkey availability lazily (on first call)
305+ if importlib .util .find_spec ("pythonmonkey" ) is None :
306+ raise RuntimeError (
307+ "PythonMonkey is not installed. Please install 'apache_beam[yaml]' "
308+ "to use JavaScript mapping functions." )
309+
310+ row_as_dict = py_value_to_js_dict (row )
311+ try :
312+ # If we have a name, it means we evaluated a file and need to call
313+ # a specific function.
314+ # Dispatcher expects a self-contained source/expression.
315+ if self ._name :
316+ # Wrap: (function() { <source>; return <name>; })()
317+ effective_source = (
318+ f"(function() {{ { self ._source } ; return { self ._name } ; }})()" )
319+ else :
320+ # Expression/Callable case: Wrap in parens to be safe
321+ effective_source = f"({ self ._source } )"
322+
323+ js_result = _pythonmonkey_dispatcher .eval_and_run (
324+ effective_source , row_as_dict )
325+
326+ except Exception as exn :
327+ raise RuntimeError (
328+ f"Error evaluating javascript expression: { exn } " ) from exn
329+ return dicts_to_rows (_finalize_js_result (js_result ))
330+
331+
332+ def _finalize_js_result (obj ):
333+ """Coerce pythonmonkey objects to native Python objects (specifically
334+ strings).
335+ """
336+ if isinstance (obj , str ):
337+ return str (obj )
338+ if isinstance (obj , list ):
339+ return [_finalize_js_result (x ) for x in obj ]
340+ if isinstance (obj , dict ):
341+ return {k : _finalize_js_result (v ) for k , v in obj .items ()}
342+ return obj
343+
344+
210345def _expand_javascript_mapping_func (
211346 original_fields , expression = None , callable = None , path = None , name = None ):
212347
213- # Check for installed js2py package
214- if js2py is None :
348+ if importlib .util .find_spec ("pythonmonkey" ) is None :
215349 raise ValueError (
216- "Javascript mapping functions are not supported on"
217- " Python 3.12 or later." )
218-
219- # import remaining js2py objects
220- from js2py import base
221- from js2py .constructors import jsdate
222- from js2py .internals import simplex
223-
224- js_array_type = (
225- base .PyJsArray ,
226- base .PyJsArrayBuffer ,
227- base .PyJsInt8Array ,
228- base .PyJsUint8Array ,
229- base .PyJsUint8ClampedArray ,
230- base .PyJsInt16Array ,
231- base .PyJsUint16Array ,
232- base .PyJsInt32Array ,
233- base .PyJsUint32Array ,
234- base .PyJsFloat32Array ,
235- base .PyJsFloat64Array )
236-
237- def _js_object_to_py_object (obj ):
238- if isinstance (obj , (base .PyJsNumber , base .PyJsString , base .PyJsBoolean )):
239- return base .to_python (obj )
240- elif isinstance (obj , js_array_type ):
241- return [_js_object_to_py_object (value ) for value in obj .to_list ()]
242- elif isinstance (obj , jsdate .PyJsDate ):
243- return obj .to_utc_dt ()
244- elif isinstance (obj , (base .PyJsNull , base .PyJsUndefined )):
245- return None
246- elif isinstance (obj , base .PyJsError ):
247- raise RuntimeError (obj ['message' ])
248- elif isinstance (obj , base .PyJsObject ):
249- return {
250- key : _js_object_to_py_object (value ['value' ])
251- for (key , value ) in obj .own .items ()
252- }
253- elif isinstance (obj , base .JsObjectWrapper ):
254- return _js_object_to_py_object (obj ._obj )
255-
256- return obj
350+ "PythonMonkey is not installed. Please install 'apache_beam[yaml]' "
351+ "to use JavaScript mapping functions." )
257352
258353 if expression :
259354 source = '\n ' .join (['function(__row__) {' ] + [
260355 f' { name } = __row__.{ name } '
261356 for name in original_fields if name in expression
262357 ] + [' return (' + expression + ')' ] + ['}' ])
263- js_func = _CustomJsObjectWrapper ( js2py . eval_js ( source ) )
358+ return JavaScriptCallable ( source )
264359
265360 elif callable :
266- js_func = _CustomJsObjectWrapper ( js2py . eval_js ( callable ) )
361+ return JavaScriptCallable ( callable )
267362
268363 else :
269364 if not path .endswith ('.js' ):
270365 raise ValueError (f'File "{ path } " is not a valid .js file.' )
271366 udf_code = FileSystems .open (path ).read ().decode ()
272- js = js2py .EvalJs ()
273- js .eval (udf_code )
274- js_func = _CustomJsObjectWrapper (getattr (js , name ))
275-
276- def js_wrapper (row ):
277- row_as_dict = py_value_to_js_dict (row )
278- try :
279- js_result = js_func (row_as_dict )
280- except simplex .JsException as exn :
281- raise RuntimeError (
282- f"Error evaluating javascript expression: "
283- f"{ exn .mes ['message' ]} " ) from exn
284- return dicts_to_rows (_js_object_to_py_object (js_result ))
285-
286- return js_wrapper
367+ return JavaScriptCallable (udf_code , name = name )
287368
288369
289370def _expand_python_mapping_func (
0 commit comments