Skip to content

Support Pandas==2.x in Apache Beam. #27221

@JorgeCardona

Description

@JorgeCardona

What happened?

Beam doesn't work with pandas>=2.0.0

Reproducible Example

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()
pipeline = beam.Pipeline(options=options)
data = pipeline | 'CreateData' >> beam.Create(['Pandas, 2.0,2'])
data | 'PrintData' >> beam.Map(print)
pipeline.run()

Issue Description

Pipeline fails with a runtime error: `AttributeError: type object 'Series' has no attribute 'append'`. Sample stacktrace:
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[2], line 8
      4 options = PipelineOptions()
      6 pipeline = beam.Pipeline(options=options)
----> 8 data = pipeline | 'CreateData' >> beam.Create(['Pandas, 2.0,2'])
     10 data | 'PrintData' >> beam.Map(print)
     11 # Ejecutar el pipeline

File /usr/local/lib/python3.11/site-packages/apache_beam/transforms/ptransform.py:1092, in _NamedPTransform.__ror__(self, pvalueish, _unused)
   1091 def __ror__(self, pvalueish, _unused=None):
-> 1092   return self.transform.__ror__(pvalueish, self.label)

File /usr/local/lib/python3.11/site-packages/apache_beam/transforms/ptransform.py:614, in PTransform.__ror__(self, left, label)
    612 pvalueish = _SetInputPValues().visit(pvalueish, replacements)
    613 self.pipeline = p
--> 614 result = p.apply(self, pvalueish, label)
    615 if deferred:
    616   return result

File /usr/local/lib/python3.11/site-packages/apache_beam/pipeline.py:666, in Pipeline.apply(self, transform, pvalueish, label)
    664 old_label, transform.label = transform.label, label
    665 try:
--> 666   return self.apply(transform, pvalueish)
    667 finally:
    668   transform.label = old_label

File /usr/local/lib/python3.11/site-packages/apache_beam/pipeline.py:674, in Pipeline.apply(self, transform, pvalueish, label)
    670 # Attempts to alter the label of the transform to be applied only when it's
    671 # a top-level transform so that the cell number will not be prepended to
    672 # every child transform in a composite.
    673 if self._current_transform() is self._root_transform():
--> 674   alter_label_if_ipython(transform, pvalueish)
    676 full_label = '/'.join(
    677     [self._current_transform().full_label, label or
    678      transform.label]).lstrip('/')
    679 if full_label in self.applied_labels:

File /usr/local/lib/python3.11/site-packages/apache_beam/utils/interactive_utils.py:71, in alter_label_if_ipython(transform, pvalueish)
     59 """Alters the label to an interactive label with ipython prompt metadata
     60 prefixed for the given transform if the given pvalueish belongs to a
     61 user-defined pipeline and current code execution is within an ipython kernel.
   (...)
     68 `Cell {prompt}: {original_label}`.
     69 """
     70 if is_in_ipython():
---> 71   from apache_beam.runners.interactive import interactive_environment as ie
     72   # Tracks user defined pipeline instances in watched scopes so that we only
     73   # alter labels for any transform to pvalueish belonging to those pipeline
     74   # instances, excluding any transform to be applied in other pipeline
     75   # instances the Beam SDK creates implicitly.
     76   ie.current_env().track_user_pipelines()

File /usr/local/lib/python3.11/site-packages/apache_beam/runners/interactive/interactive_environment.py:41
     39 from apache_beam.runners.direct import direct_runner
     40 from apache_beam.runners.interactive import cache_manager as cache
---> 41 from apache_beam.runners.interactive.messaging.interactive_environment_inspector import InteractiveEnvironmentInspector
     42 from apache_beam.runners.interactive.recording_manager import RecordingManager
     43 from apache_beam.runners.interactive.sql.sql_chain import SqlChain

File /usr/local/lib/python3.11/site-packages/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py:26
     23 # pytype: skip-file
     25 import apache_beam as beam
---> 26 from apache_beam.runners.interactive.utils import as_json
     27 from apache_beam.runners.interactive.utils import obfuscate
     30 class InteractiveEnvironmentInspector(object):

File /usr/local/lib/python3.11/site-packages/apache_beam/runners/interactive/utils.py:33
     30 import pandas as pd
     32 import apache_beam as beam
---> 33 from apache_beam.dataframe.convert import to_pcollection
     34 from apache_beam.dataframe.frame_base import DeferredBase
     35 from apache_beam.internal.gcp import auth

File /usr/local/lib/python3.11/site-packages/apache_beam/dataframe/convert.py:33
     31 from apache_beam.dataframe import expressions
     32 from apache_beam.dataframe import frame_base
---> 33 from apache_beam.dataframe import transforms
     34 from apache_beam.dataframe.schemas import element_typehint_from_dataframe_proxy
     35 from apache_beam.dataframe.schemas import generate_proxy

File /usr/local/lib/python3.11/site-packages/apache_beam/dataframe/transforms.py:33
     31 from apache_beam import transforms
     32 from apache_beam.dataframe import expressions
---> 33 from apache_beam.dataframe import frames  # pylint: disable=unused-import
     34 from apache_beam.dataframe import partitionings
     35 from apache_beam.utils import windowed_value

File /usr/local/lib/python3.11/site-packages/apache_beam/dataframe/frames.py:1231
   1224       return func(*args, **kwargs)
   1226     return func(self, *args, **kwargs)
   1229 @populate_not_implemented(pd.Series)
   1230 @frame_base.DeferredFrame._register_for(pd.Series)
-> 1231 class DeferredSeries(DeferredDataFrameOrSeries):
   1232   def __repr__(self):
   1233     return (
   1234         f'DeferredSeries(name={self.name!r}, dtype={self.dtype}, '
   1235         f'{self._render_indexes()})')

File /usr/local/lib/python3.11/site-packages/apache_beam/dataframe/frames.py:1338, in DeferredSeries()
   1331 transpose = frame_base._elementwise_method('transpose', base=pd.Series)
   1332 shape = property(
   1333     frame_base.wont_implement_method(
   1334         pd.Series, 'shape', reason="non-deferred-result"))
   1336 @frame_base.with_docs_from(pd.Series)
   1337 @frame_base.args_to_kwargs(pd.Series)
-> 1338 @frame_base.populate_defaults(pd.Series)
   1339 def append(self, to_append, ignore_index, verify_integrity, **kwargs):
   1340   """``ignore_index=True`` is not supported, because it requires generating an
   1341   order-sensitive index."""
   1342   if not isinstance(to_append, DeferredSeries):

File /usr/local/lib/python3.11/site-packages/apache_beam/dataframe/frame_base.py:600, in populate_defaults.<locals>.wrap(func)
    599 def wrap(func):
--> 600   base_argspec = getfullargspec(unwrap(getattr(base_type, func.__name__)))
    601   if not base_argspec.defaults:
    602     return func

AttributeError: type object 'Series' has no attribute 'append'

Expected Behavior

The code should produce the same result as it does with pandas 1.5.3.
image

Installed Versions

Details

Replace this line with the output of pd.show_versions()

Version that fails: pandas 2.0.2
image

Version that works: pandas 1.5.3
image

Response I received when I reported the issue to the pandas project:

pandas-dev/pandas#53799 (comment)

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata

Metadata

Assignees

Labels

P2 · done & done (Issue has been reviewed after it was closed for verification, followups, etc.) · new feature · python

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions