Skip to content

Commit bdf3175

Browse files
author
Łukasz Wyszomirski
authored
Improve memory usage in Dataproc deferrable operators (#28117)
1 parent 3fef462 commit bdf3175

File tree

1 file changed

+12
-9
lines changed

1 file changed

+12
-9
lines changed

airflow/providers/google/cloud/hooks/dataproc.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -992,6 +992,7 @@ def __init__(
992992
impersonation_chain: str | Sequence[str] | None = None,
993993
) -> None:
994994
super().__init__(gcp_conn_id, delegate_to, impersonation_chain)
995+
self._cached_client: JobControllerAsyncClient | None = None
995996

996997
def get_cluster_client(self, region: str | None = None) -> ClusterControllerAsyncClient:
997998
"""Returns ClusterControllerAsyncClient."""
@@ -1015,15 +1016,17 @@ def get_template_client(self, region: str | None = None) -> WorkflowTemplateServ
10151016

10161017
def get_job_client(self, region: str | None = None) -> JobControllerAsyncClient:
10171018
"""Returns JobControllerAsyncClient."""
1018-
client_options = None
1019-
if region and region != "global":
1020-
client_options = ClientOptions(api_endpoint=f"{region}-dataproc.googleapis.com:443")
1021-
1022-
return JobControllerAsyncClient(
1023-
credentials=self.get_credentials(),
1024-
client_info=CLIENT_INFO,
1025-
client_options=client_options,
1026-
)
1019+
if self._cached_client is None:
1020+
client_options = None
1021+
if region and region != "global":
1022+
client_options = ClientOptions(api_endpoint=f"{region}-dataproc.googleapis.com:443")
1023+
1024+
self._cached_client = JobControllerAsyncClient(
1025+
credentials=self.get_credentials(),
1026+
client_info=CLIENT_INFO,
1027+
client_options=client_options,
1028+
)
1029+
return self._cached_client
10271030

10281031
def get_batch_client(self, region: str | None = None) -> BatchControllerAsyncClient:
10291032
"""Returns BatchControllerAsyncClient"""

0 commit comments

Comments
 (0)