[Bug]: GCSIO delete_batch breaks dataflow pipeline in 2.53 #30166

@lgeiger

What happened?

I am upgrading Beam from version 2.52 to 2.53, but it looks like I am running into an issue introduced in #25676.

I call delete_batch inside a beam.Map, roughly like this:

    def teardown(self):
        def cleanup(count):
            client = GcsIO()
            blobs = client.list_prefix(self.resultio.path)
            client.delete_batch(blobs)

        return (
            "Count all elements" >> beam.combiners.Count.Globally()
            | "Delete blobs" >> beam.Map(cleanup)
        )

I am seeing the following error with Beam 2.53.0, while Beam 2.52.0 works as expected:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 300, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 375, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 677, in process_bundle
    bundle_processor.process_bundle(instruction_id))
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1113, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 570, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 572, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 851, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 995, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1547, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 637, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/Users/lgeiger/data-infrastructure/.venv/lib/python3.11/site-packages/apache_beam/transforms/core.py", line 1963, in <lambda>
    wrapper = lambda x: [fn(x)]
                         ^^^^^
  File "/Users/lgeiger/data-infrastructure/test.py", line 60, in cleanup
    client.delete_batch(blobs)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/io/gcp/gcsio.py", line 217, in delete_batch
    current_paths = paths[s:s + MAX_BATCH_OPERATION_SIZE]
                    ~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: unhashable type: 'slice'

I am running the pipeline on Dataflow using Python 3.11.

Beam 2.52.0 chunked the paths using itertools.islice, which avoids explicit slicing and therefore works with any iterable:

paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))

@BjornPrime I guess something similar here would fix the above issue, or am I missing something?

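On Python 3.12 the current code would probably no longer raise this TypeError, since slices become hashable there, but if paths is a dict the lookup would presumably fail with a KeyError instead. In any case, as far as I know Beam doesn't yet provide Python 3.12 containers.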
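
To illustrate the difference between the two chunking approaches, here is a minimal sketch that runs without Beam. It assumes that list_prefix returns a dict mapping blob path to size, which would explain the traceback above. The `chunked` helper, the bucket paths, and the value 100 for MAX_BATCH_OPERATION_SIZE are made up for illustration and are not taken from gcsio.py.

```python
from itertools import islice

# Assumed batch size, for illustration only.
MAX_BATCH_OPERATION_SIZE = 100


def chunked(paths, size=MAX_BATCH_OPERATION_SIZE):
    """Yield lists of at most `size` items from any iterable.

    Hypothetical helper: works for lists, dicts (iterating keys),
    and generators, because it never indexes or slices `paths`.
    """
    iterator = iter(paths)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


# Stand-in for the list_prefix result: a dict of blob path -> size (assumption).
blobs = {f"gs://bucket/result/part-{i}": i for i in range(250)}

# Slicing a dict uses the slice object as a key, so it fails:
# TypeError before Python 3.12, KeyError once slices are hashable.
try:
    blobs[0:MAX_BATCH_OPERATION_SIZE]
    slicing_failed = False
except (TypeError, KeyError):
    slicing_failed = True

# The islice-based approach handles the same dict without trouble.
chunk_sizes = [len(chunk) for chunk in chunked(blobs)]
```

Under the same assumption, converting the result to a list before the call, e.g. `client.delete_batch(list(blobs))`, might serve as a caller-side workaround until the library handles arbitrary iterables again.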

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
