Skip to content

Commit 8b66f37

Browse files
authored
Merge pull request #15044: Coder and Windowing cleanups.
2 parents dd20b4f + f1546aa commit 8b66f37

5 files changed

Lines changed: 25 additions & 24 deletions

File tree

sdks/python/apache_beam/coders/coders.py

Lines changed: 16 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -220,7 +220,7 @@ def _dict_without_impl(self):
220220
return self.__dict__
221221

222222
def to_type_hint(self):
223-
raise NotImplementedError('BEAM-2717')
223+
raise NotImplementedError('BEAM-2717: %s' % self.__class__.__name__)
224224

225225
@classmethod
226226
def from_type_hint(cls, unused_typehint, unused_registry):
@@ -532,6 +532,9 @@ def __eq__(self, other):
532532
def __hash__(self):
533533
return hash(type(self)) + hash(self._key_coder) + hash(self._value_coder)
534534

535+
def __repr__(self):
536+
return 'MapCoder[%s, %s]' % (self._key_coder, self._value_coder)
537+
535538

536539
class NullableCoder(FastCoder):
537540
def __init__(self, value_coder):
@@ -929,10 +932,10 @@ def __eq__(self, other):
929932
def __hash__(self):
930933
return hash(self.proto_message_type)
931934

932-
@staticmethod
933-
def from_type_hint(typehint, unused_registry):
935+
@classmethod
936+
def from_type_hint(cls, typehint, unused_registry):
934937
if issubclass(typehint, proto_utils.message_types):
935-
return ProtoCoder(typehint)
938+
return cls(typehint)
936939
else:
937940
raise ValueError((
938941
'Expected a subclass of google.protobuf.message.Message'
@@ -1019,10 +1022,10 @@ def as_deterministic_coder(self, step_label, error_message=None):
10191022
def to_type_hint(self):
10201023
return typehints.Tuple[tuple(c.to_type_hint() for c in self._coders)]
10211024

1022-
@staticmethod
1023-
def from_type_hint(typehint, registry):
1025+
@classmethod
1026+
def from_type_hint(cls, typehint, registry):
10241027
# type: (typehints.TupleConstraint, CoderRegistry) -> TupleCoder
1025-
return TupleCoder([registry.get_coder(t) for t in typehint.tuple_types])
1028+
return cls([registry.get_coder(t) for t in typehint.tuple_types])
10261029

10271030
def as_cloud_object(self, coders_context=None):
10281031
if self.is_kv_coder():
@@ -1106,10 +1109,10 @@ def as_deterministic_coder(self, step_label, error_message=None):
11061109
return TupleSequenceCoder(
11071110
self._elem_coder.as_deterministic_coder(step_label, error_message))
11081111

1109-
@staticmethod
1110-
def from_type_hint(typehint, registry):
1112+
@classmethod
1113+
def from_type_hint(cls, typehint, registry):
11111114
# type: (Any, CoderRegistry) -> TupleSequenceCoder
1112-
return TupleSequenceCoder(registry.get_coder(typehint.inner_type))
1115+
return cls(registry.get_coder(typehint.inner_type))
11131116

11141117
def _get_component_coders(self):
11151118
# type: () -> Tuple[Coder, ...]
@@ -1484,11 +1487,11 @@ def to_type_hint(self):
14841487
return sharded_key_type.ShardedKeyTypeConstraint(
14851488
self._key_coder.to_type_hint())
14861489

1487-
@staticmethod
1488-
def from_type_hint(typehint, registry):
1490+
@classmethod
1491+
def from_type_hint(cls, typehint, registry):
14891492
from apache_beam.typehints import sharded_key_type
14901493
if isinstance(typehint, sharded_key_type.ShardedKeyTypeConstraint):
1491-
return ShardedKeyCoder(registry.get_coder(typehint.key_type))
1494+
return cls(registry.get_coder(typehint.key_type))
14921495
else:
14931496
raise ValueError((
14941497
'Expected an instance of ShardedKeyTypeConstraint'

sdks/python/apache_beam/io/flink/flink_streaming_impulse_source.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,7 @@ def expand(self, pbegin):
4242
'Input to transform must be a PBegin but found %s' % pbegin)
4343
return pvalue.PCollection(pbegin.pipeline, is_bounded=False)
4444

45-
def get_windowing(self, inputs):
45+
def get_windowing(self, unused_inputs):
4646
return Windowing(GlobalWindows())
4747

4848
def infer_output_type(self, unused_input_type):

sdks/python/apache_beam/runners/direct/direct_runner.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -398,7 +398,7 @@ def _infer_output_coder(
398398
# type: (...) -> typing.Optional[coders.Coder]
399399
return coders.BytesCoder()
400400

401-
def get_windowing(self, inputs):
401+
def get_windowing(self, unused_inputs):
402402
return beam.Windowing(beam.window.GlobalWindows())
403403

404404
def expand(self, pvalue):

sdks/python/apache_beam/transforms/core.py

Lines changed: 0 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -2887,13 +2887,6 @@ def expand(self, pcolls):
28872887
is_bounded = all(pcoll.is_bounded for pcoll in pcolls)
28882888
return pvalue.PCollection(self.pipeline, is_bounded=is_bounded)
28892889

2890-
def get_windowing(self, inputs):
2891-
# type: (typing.Any) -> Windowing
2892-
if not inputs:
2893-
# TODO(robertwb): Return something compatible with every windowing?
2894-
return Windowing(GlobalWindows())
2895-
return super(Flatten, self).get_windowing(inputs)
2896-
28972890
def infer_output_type(self, input_type):
28982891
return input_type
28992892

sdks/python/apache_beam/transforms/ptransform.py

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -549,8 +549,13 @@ def get_windowing(self, inputs):
549549
By default most transforms just return the windowing function associated
550550
with the input PCollection (or the first input if several).
551551
"""
552-
# TODO(robertwb): Assert all input WindowFns compatible.
553-
return inputs[0].windowing
552+
if inputs:
553+
return inputs[0].windowing
554+
else:
555+
from apache_beam.transforms.core import Windowing
556+
from apache_beam.transforms.window import GlobalWindows
557+
# TODO(robertwb): Return something compatible with every windowing?
558+
return Windowing(GlobalWindows())
554559

555560
def __rrshift__(self, label):
556561
return _NamedPTransform(self, label)

0 commit comments

Comments (0)