|
49 | 49 | # pylint: enable=wrong-import-order, wrong-import-position |
50 | 50 |
|
51 | 51 | _LOGGER = logging.getLogger(__name__) |
52 | | -_RUNNER = "" |
53 | 52 |
|
54 | 53 |
|
55 | 54 | @pytest.mark.uses_gcp_java_expansion_service |
56 | | -@unittest.skipUnless( |
57 | | - os.environ.get('EXPANSION_PORT'), |
58 | | - "EXPANSION_PORT environment var is not provided.") |
| 55 | +# @unittest.skipUnless( |
| 56 | +# os.environ.get('EXPANSION_PORT'), |
| 57 | +# "EXPANSION_PORT environment var is not provided.") |
59 | 58 | class BigQueryXlangStorageWriteIT(unittest.TestCase): |
60 | 59 | BIGQUERY_DATASET = 'python_xlang_storage_write' |
61 | 60 |
|
@@ -106,7 +105,7 @@ def setUp(self): |
106 | 105 | self.test_pipeline = TestPipeline(is_integration_test=True) |
107 | 106 | self.args = self.test_pipeline.get_full_options_as_args() |
108 | 107 | self.project = self.test_pipeline.get_option('project') |
109 | | - _RUNNER = PipelineOptions(self.args).get_all_options()['runner'] |
| 108 | + self._runner = PipelineOptions(self.args).get_all_options()['runner'] |
110 | 109 |
|
111 | 110 | self.bigquery_client = BigQueryWrapper() |
112 | 111 | self.dataset_id = '%s_%s_%s' % ( |
@@ -279,12 +278,13 @@ def run_streaming(self, table_name, num_streams=0, use_at_least_once=False): |
279 | 278 | expansion_service=self.expansion_service)) |
280 | 279 | hamcrest_assert(p, bq_matcher) |
281 | 280 |
|
282 | | - @unittest.skipUnless( |
283 | | - "dataflowrunner" in _RUNNER.lower(), |
284 | | - "The exactly-once route has the requirement " |
285 | | - "`beam:requirement:pardo:on_window_expiration:v1`, " |
286 | | - "which is currently only supported by the Dataflow runner.") |
287 | 281 | def test_streaming_with_fixed_num_streams(self): |
| 282 | +    # Skip unless the pipeline is configured to run on the Dataflow runner. |
| 283 | + if not self._runner or "dataflowrunner" not in self._runner.lower(): |
| 284 | + self.skipTest( |
| 285 | + "The exactly-once route has the requirement " |
| 286 | + "`beam:requirement:pardo:on_window_expiration:v1`, " |
| 287 | +        "which is currently only supported by the Dataflow runner.") |
288 | 288 | table = 'streaming_fixed_num_streams' |
289 | 289 | self.run_streaming(table_name=table, num_streams=4) |
290 | 290 |
|
|
0 commit comments