[Bug]: GCSIO delete_batch breaks dataflow pipeline in 2.53 #30166
Description
What happened?
I am upgrading Beam from 2.52 to 2.53 and appear to be hitting an issue introduced in #25676.
The failure occurs when calling delete_batch inside beam.Map, in code like this:
def teardown(self):
    def cleanup(count):
        client = GcsIO()
        blobs = client.list_prefix(self.resultio.path)
        client.delete_batch(blobs)
    return (
        "Count all elements" >> beam.combiners.Count.Globally()
        | "Delete blobs" >> beam.Map(cleanup)
    )

I am seeing the following error with Beam 2.53.0, while Beam 2.52.0 works as expected:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 300, in _execute
response = task()
^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 375, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in do_instruction
return getattr(self, request_type)(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 677, in process_bundle
bundle_processor.process_bundle(instruction_id))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1113, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 570, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 572, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 851, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 995, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1547, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 637, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/Users/lgeiger/data-infrastructure/.venv/lib/python3.11/site-packages/apache_beam/transforms/core.py", line 1963, in <lambda>
wrapper = lambda x: [fn(x)]
^^^^^
File "/Users/lgeiger/data-infrastructure/test.py", line 60, in cleanup
client.delete_batch(blobs)
File "/usr/local/lib/python3.11/site-packages/apache_beam/io/gcp/gcsio.py", line 217, in delete_batch
current_paths = paths[s:s + MAX_BATCH_OPERATION_SIZE]
~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: unhashable type: 'slice'
I am running the pipeline on Dataflow using Python 3.11.
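The failing line in the traceback can be reproduced without Beam or GCS. The sketch below assumes that list_prefix returns a dict mapping object paths to sizes, which is consistent with the error message. The helper names, the range-based loop, the bucket paths, and the small batch size are hypothetical stand-ins; only the slicing expression is taken from the traceback.

```python
# Stand-in for the batch size constant; the real value in gcsio.py may differ.
MAX_BATCH_OPERATION_SIZE = 2


def slice_chunks(paths):
    """Chunk paths with the slicing expression shown in the traceback."""
    chunks = []
    # Assumed loop shape; the traceback only shows the slicing line itself.
    for s in range(0, len(paths), MAX_BATCH_OPERATION_SIZE):
        chunks.append(paths[s:s + MAX_BATCH_OPERATION_SIZE])
    return chunks


def try_slice_chunks(paths):
    """Return the chunks, or the name of the exception the slicing raised."""
    try:
        return slice_chunks(paths)
    except (TypeError, KeyError) as exc:
        return type(exc).__name__


# A list of paths slices cleanly into batches.
as_list = ["gs://bucket/a", "gs://bucket/b", "gs://bucket/c"]
list_result = try_slice_chunks(as_list)

# A dict (the assumed shape of the list_prefix result) cannot be sliced;
# on Python 3.11 this is the "unhashable type: 'slice'" TypeError.
as_dict = {"gs://bucket/a": 10, "gs://bucket/b": 20, "gs://bucket/c": 30}
dict_result = try_slice_chunks(as_dict)
```

With a list the helper returns batches, while with a dict it returns the name of the raised exception instead.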
Beam 2.52.0 used to chunk the paths using itertools.islice which avoids having to explicitly use slicing:
beam/sdks/python/apache_beam/io/gcp/gcsio.py, line 304 in 7c8a997:
    paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))
@BjornPrime I guess something similar here would fix the above issue, or am I missing something?
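For illustration, here is a minimal sketch of islice-based chunking that accepts any iterable, including a dict. The function name iter_chunks, the bucket paths, and the small batch size are hypothetical; this is not the code from either Beam release, only the general technique that the 2.52.0 line relies on.

```python
from itertools import islice

# Stand-in for the batch size constant; the real value in gcsio.py may differ.
MAX_BATCH_OPERATION_SIZE = 2


def iter_chunks(paths, size=MAX_BATCH_OPERATION_SIZE):
    """Yield lists of at most `size` items from any iterable of paths."""
    iterator = iter(paths)
    while True:
        # islice consumes the next `size` items without indexing into paths.
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


# Iterating a dict yields its keys, so no slicing of the dict is needed.
blobs = {"gs://bucket/a": 10, "gs://bucket/b": 20, "gs://bucket/c": 30}
chunks = list(iter_chunks(blobs))
```

On the caller side, passing list(blobs) to delete_batch should also sidestep the error, since a list of the dict's keys supports slicing.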
On Python 3.12 slices are hashable, so the current code would presumably no longer raise this TypeError, though indexing a dict with a slice would likely still fail with a KeyError. In any case, as far as I know Beam doesn't yet provide Python 3.12 containers.
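Whether hashable slices change the outcome can be checked directly on any interpreter. The sketch below uses hypothetical helper names and sample data; it probes whether slice objects are hashable on the running Python version and records which exception a dict lookup with a slice raises.

```python
def slice_is_hashable():
    """Return True if slice objects can be hashed on this interpreter."""
    try:
        hash(slice(0, 2))
    except TypeError:
        return False
    return True


def dict_slice_lookup_error(mapping):
    """Return the name of the exception raised by mapping[0:2], if any."""
    try:
        mapping[0:2]
    except (TypeError, KeyError) as exc:
        return type(exc).__name__
    return None


blobs = {"gs://bucket/a": 10, "gs://bucket/b": 20}
hashable = slice_is_hashable()
error_name = dict_slice_lookup_error(blobs)
```

Where slices are hashable, the lookup treats the slice as an ordinary key that is absent from the dict, so it fails with a KeyError rather than returning a sub-range.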
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner