Skip to content

Commit c082aec

Browse files
Fix accessing a GKE cluster through the private endpoint in GKEStartPodOperator (#31391)
* Fix accessing a GKE cluster through the private endpoint in `GKEStartPodOperator` * Add a unit test for cluster info
1 parent 45b6cfa commit c082aec

File tree

2 files changed

+32
-11
lines changed

2 files changed

+32
-11
lines changed

airflow/providers/google/cloud/operators/kubernetes_engine.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ def __init__(
414414
*,
415415
location: str,
416416
cluster_name: str,
417-
use_internal_ip: bool | None = None,
417+
use_internal_ip: bool = False,
418418
project_id: str | None = None,
419419
gcp_conn_id: str = "google_cloud_default",
420420
impersonation_chain: str | Sequence[str] | None = None,
@@ -433,15 +433,6 @@ def __init__(
433433
)
434434
is_delete_operator_pod = False
435435

436-
if use_internal_ip is not None:
437-
warnings.warn(
438-
f"You have set parameter use_internal_ip in class {self.__class__.__name__}. "
439-
"In current implementation of the operator the parameter is not used and will "
440-
"be deleted in future.",
441-
AirflowProviderDeprecationWarning,
442-
stacklevel=2,
443-
)
444-
445436
if regional is not None:
446437
warnings.warn(
447438
f"You have set parameter regional in class {self.__class__.__name__}. "
@@ -457,6 +448,7 @@ def __init__(
457448
self.cluster_name = cluster_name
458449
self.gcp_conn_id = gcp_conn_id
459450
self.impersonation_chain = impersonation_chain
451+
self.use_internal_ip = use_internal_ip
460452

461453
self.pod: V1Pod | None = None
462454
self._ssl_ca_cert: str | None = None
@@ -516,7 +508,10 @@ def fetch_cluster_info(self) -> tuple[str, str | None]:
516508
project_id=self.project_id,
517509
)
518510

519-
self._cluster_url = f"https://{cluster.endpoint}"
511+
if not self.use_internal_ip:
512+
self._cluster_url = f"https://{cluster.endpoint}"
513+
else:
514+
self._cluster_url = f"https://{cluster.private_cluster_config.private_endpoint}"
520515
self._ssl_ca_cert = cluster.master_auth.cluster_ca_certificate
521516
return self._cluster_url, self._ssl_ca_cert
522517

tests/providers/google/cloud/operators/test_kubernetes_engine.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
TEMP_FILE = "tempfile.NamedTemporaryFile"
6767
GKE_OP_PATH = "airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator"
6868
CLUSTER_URL = "https://test-host"
69+
CLUSTER_PRIVATE_URL = "https://test-private-host"
6970
SSL_CA_CERT = "TEST_SSL_CA_CERT_CONTENT"
7071

7172

@@ -293,6 +294,31 @@ def test_execute_with_impersonation_service_chain_one_element(
293294

294295
fetch_cluster_info_mock.assert_called_once()
295296

297+
@pytest.mark.parametrize("use_internal_ip", [True, False])
298+
@mock.patch(f"{GKE_HOOK_PATH}.get_cluster")
299+
def test_cluster_info(self, get_cluster_mock, use_internal_ip):
300+
get_cluster_mock.return_value = mock.MagicMock(
301+
**{
302+
"endpoint": "test-host",
303+
"private_cluster_config.private_endpoint": "test-private-host",
304+
"master_auth.cluster_ca_certificate": SSL_CA_CERT,
305+
}
306+
)
307+
gke_op = GKEStartPodOperator(
308+
project_id=TEST_GCP_PROJECT_ID,
309+
location=PROJECT_LOCATION,
310+
cluster_name=CLUSTER_NAME,
311+
task_id=PROJECT_TASK_ID,
312+
name=TASK_NAME,
313+
namespace=NAMESPACE,
314+
image=IMAGE,
315+
use_internal_ip=use_internal_ip,
316+
)
317+
cluster_url, ssl_ca_cert = gke_op.fetch_cluster_info()
318+
319+
assert cluster_url == CLUSTER_PRIVATE_URL if use_internal_ip else CLUSTER_URL
320+
assert ssl_ca_cert == SSL_CA_CERT
321+
296322

297323
class TestGKEPodOperatorAsync:
298324
def setup_method(self):

0 commit comments

Comments
 (0)