What happened?
Code to reproduce:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window
class _Unpicklable(object):
    """Value holder that deliberately refuses to be pickled.

    Both pickle state hooks raise ``NotImplementedError``, so an attempt
    to pickle an instance fails loudly instead of succeeding silently.
    """

    def __init__(self, value):
        """Store ``value`` on the instance as ``self.value``."""
        self.value = value

    def __getstate__(self):
        """Always raise ``NotImplementedError``; pickling is not supported."""
        raise NotImplementedError()

    def __setstate__(self, state):
        """Always raise ``NotImplementedError``; ``state`` is ignored."""
        raise NotImplementedError()
class _UnpicklableCoder(beam.coders.Coder):
    """Coder for ``_Unpicklable`` that serializes only the wrapped value.

    The wrapped value is written as the encoded bytes of its ``str()``
    form and read back with ``int()``, so the round trip is only
    meaningful for integer values.
    """

    def encode(self, value):
        """Return ``str(value.value)`` encoded to bytes."""
        return str(value.value).encode()

    def decode(self, encoded):
        """Rebuild an ``_Unpicklable`` from bytes produced by ``encode``."""
        return _Unpicklable(int(encoded.decode()))

    def to_type_hint(self):
        """Return ``_Unpicklable`` as the type hint for this coder."""
        return _Unpicklable

    def is_deterministic(self):
        """Report this coder as deterministic (always returns ``True``)."""
        return True
# Register _UnpicklableCoder in Beam's coder registry for the _Unpicklable type.
beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder)
def pipeline_fn(root):
    """Attach the reproduction transforms to ``root`` and return the result.

    Creates five ``_Unpicklable`` elements (values 0 through 4) with the
    output type declared as ``_Unpicklable``, assigns them to sliding
    windows (size 3, period 1), reshuffles them, and finally maps each
    element to twice its wrapped value.

    Args:
        root: The object the transforms are applied to with ``|``
            (the driver code passes a ``beam.Pipeline``).

    Returns:
        The result of applying the final ``beam.Map`` step.
    """
    values = [_Unpicklable(i) for i in range(5)]
    # Alternative pipeline without windowing or explicit output types,
    # kept from the original report:
    # return root | beam.Create(values) | beam.Reshuffle() | beam.Map(lambda x: x.value*2)
    return (
        root
        | beam.Create(values).with_output_types(_Unpicklable)
        | beam.WindowInto(window.SlidingWindows(size=3, period=1))
        | beam.Reshuffle().with_output_types(_Unpicklable)
        | beam.Map(lambda x: x.value * 2)
    )
# Configure the run: DirectRunner with direct_num_workers set to 1.
options = PipelineOptions(runner='DirectRunner', direct_num_workers=1)
pipeline = beam.Pipeline(options=options)
# Build the reproduction graph on the pipeline; `out` is not used afterwards.
out = pipeline_fn(pipeline)
# Start the pipeline and wait on the returned result until it finishes.
result = pipeline.run()
result.wait_until_finish()
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
What happened?
Code to reproduce:
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components