@@ -38,11 +38,13 @@ class and wrapper class that allows lambda functions to be used as
3838
3939import copy
4040import itertools
41+ import json
4142import logging
4243import operator
4344import os
4445import sys
4546import threading
47+ import warnings
4648from functools import reduce
4749from functools import wraps
4850from typing import TYPE_CHECKING
@@ -83,6 +85,7 @@ class and wrapper class that allows lambda functions to be used as
8385from apache_beam .typehints .trivial_inference import instance_to_type
8486from apache_beam .typehints .typehints import validate_composite_type_param
8587from apache_beam .utils import proto_utils
88+ from apache_beam .utils import python_callable
8689
8790if TYPE_CHECKING :
8891 from apache_beam import coders
@@ -95,6 +98,7 @@ class and wrapper class that allows lambda functions to be used as
9598 'PTransform' ,
9699 'ptransform_fn' ,
97100 'label_from_callable' ,
101+ 'annotate_yaml' ,
98102]
99103
100104_LOGGER = logging .getLogger (__name__ )
@@ -1096,3 +1100,51 @@ def __ror__(self, pvalueish, _unused=None):
10961100
10971101 def expand (self , pvalue ):
10981102 raise RuntimeError ("Should never be expanded directly." )
1103+
1104+
1105+ # Defined here to avoid circular import issues for Beam library transforms.
1106+ def annotate_yaml (constructor ):
1107+ """Causes instances of this transform to be annotated with their yaml syntax.
1108+
1109+ Should only be used for transforms that are fully defined by their constructor
1110+ arguments.
1111+ """
1112+ @wraps (constructor )
1113+ def wrapper (* args , ** kwargs ):
1114+ transform = constructor (* args , ** kwargs )
1115+
1116+ fully_qualified_name = (
1117+ f'{ constructor .__module__ } .{ constructor .__qualname__ } ' )
1118+ try :
1119+ imported_constructor = (
1120+ python_callable .PythonCallableWithSource .
1121+ load_from_fully_qualified_name (fully_qualified_name ))
1122+ if imported_constructor != wrapper :
1123+ raise ImportError ('Different object.' )
1124+ except ImportError :
1125+ warnings .warn (f'Cannot import { constructor } as { fully_qualified_name } .' )
1126+ return transform
1127+
1128+ try :
1129+ config = json .dumps ({
1130+ 'constructor' : fully_qualified_name ,
1131+ 'args' : args ,
1132+ 'kwargs' : kwargs ,
1133+ })
1134+ except TypeError as exn :
1135+ warnings .warn (
1136+ f'Cannot serialize arguments for { constructor } as json: { exn } ' )
1137+ return transform
1138+
1139+ original_annotations = transform .annotations
1140+ transform .annotations = lambda : {
1141+ ** original_annotations (),
1142+ # These override whatever may have been provided earlier.
1143+ # The outermost call is expected to be the most specific.
1144+ 'yaml_provider' : 'python' ,
1145+ 'yaml_type' : 'PyTransform' ,
1146+ 'yaml_args' : config ,
1147+ }
1148+ return transform
1149+
1150+ return wrapper
0 commit comments