[Bug]: MLTransform drops elements that have already been transformed #29600

@AnandInguva

Description


What happened?

When the input PCollection contains duplicate elements, MLTransform outputs each distinct transformed element only once and drops the remaining duplicates. This is not the expected behavior: every input element should produce exactly one output element.

Note: MLTransform is an experimental feature in Beam 2.50.0 through 2.52.0. Because of this bug, avoid using MLTransform with those versions if your data contains identical elements.

The fix is introduced in PR #29542 and is targeted for 2.53.0.
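A minimal sketch of a version guard, assuming you want to warn before running MLTransform on an affected release. The helper `is_affected` and the range constants are hypothetical and derived only from the versions named in the note; in a real pipeline you would pass `apache_beam.__version__` to it.

```python
# Hypothetical guard: the issue names 2.50.0 through 2.52.x as affected,
# with the fix targeted for 2.53.0.
AFFECTED_MIN = (2, 50)
AFFECTED_MAX = (2, 52)


def is_affected(version: str) -> bool:
    """Return True if `version` (e.g. '2.51.0') falls in the affected range."""
    # Only major and minor are compared; the patch component (and any
    # suffix such as 'rc1' or '.dev0' after it) is ignored.
    parts = version.split(".")
    major, minor = int(parts[0]), int(parts[1])
    return AFFECTED_MIN <= (major, minor) <= AFFECTED_MAX
```

For example, `is_affected("2.51.0")` is `True` and `is_affected("2.53.0")` is `False`.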


Simple repro:

```python
import tempfile

import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary

# 'Beam' appears twice; both copies should appear in the output.
data = [
    {'x': 'I'},
    {'x': 'love'},
    {'x': 'Beam'},
    {'x': 'Beam'},
    {'x': 'is'},
    {'x': 'awesome'},
]

# MLTransform writes the computed vocabulary artifacts to this directory.
artifact_location = tempfile.mkdtemp()
compute_and_apply_vocabulary_fn = ComputeAndApplyVocabulary(columns=['x'])

with beam.Pipeline() as p:
  transformed_data = (
      p
      | beam.Create(data)
      | MLTransform(write_artifact_location=artifact_location).with_transform(
          compute_and_apply_vocabulary_fn)
      | beam.Map(print))
```

Expected output

Row(x=array([4]))
Row(x=array([1]))
Row(x=array([0]))
Row(x=array([0]))
Row(x=array([2]))
Row(x=array([3]))
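The expected IDs can be approximated in plain Python. This sketch assumes IDs are assigned by decreasing frequency, with ties broken by descending string order. That assumption reproduces the expected output above, but `vocab_indices` is a hypothetical helper, not the TFT implementation. The property that matters for this bug is that it returns one ID per input element, so both copies of 'Beam' map to 0.

```python
from collections import Counter


def vocab_indices(tokens):
    """Map each token to an integer ID, producing one output per input token.

    Assumption: IDs follow decreasing frequency, with ties broken by
    descending string order. This approximates, and is not, the
    ComputeAndApplyVocabulary implementation.
    """
    counts = Counter(tokens)
    # Sort by (count, token) descending. Python compares str by code point,
    # so 'I' (0x49) sorts below every lowercase word.
    ordered = sorted(counts, key=lambda t: (counts[t], t), reverse=True)
    vocab = {token: idx for idx, token in enumerate(ordered)}
    # Every input element yields exactly one ID, duplicates included.
    return [vocab[t] for t in tokens]


tokens = ["I", "love", "Beam", "Beam", "is", "awesome"]
print(vocab_indices(tokens))  # [4, 1, 0, 0, 2, 3]
```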

Actual output

Row(x=array([4]))
Row(x=array([1]))
Row(x=array([0]))
Row(x=array([2]))
Row(x=array([3]))
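One way to confirm the bug is to compare the expected and actual outputs as multisets. This sketch uses `collections.Counter`. `dropped_elements` is a hypothetical helper, and the two lists are the vocabulary IDs copied from the expected and actual outputs of the repro.

```python
from collections import Counter

# Vocabulary IDs copied from the "Expected output" and "Actual output".
expected = [4, 1, 0, 0, 2, 3]
actual = [4, 1, 0, 2, 3]


def dropped_elements(expected, actual):
    """Return a Counter of elements that appear fewer times in `actual`."""
    # Counter subtraction keeps only positive counts, i.e. the missing copies.
    return Counter(expected) - Counter(actual)


print(dropped_elements(expected, actual))  # Counter({0: 1})
```

The single missing `0` is the second 'Beam' element.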

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK

Metadata

Assignees: No one assigned

Labels: P2, awaiting triage, bug, done & done, python
