3131from google .api_core import gapic_v1
3232from google .cloud .spanner_v1 ._helpers import _make_value_pb
3333from google .cloud .spanner_v1 ._helpers import _merge_query_options
34- from google .cloud .spanner_v1 ._helpers import _metadata_with_prefix
34+ from google .cloud .spanner_v1 ._helpers import (
35+ _metadata_with_prefix ,
36+ _metadata_with_leader_aware_routing ,
37+ )
3538from google .cloud .spanner_v1 ._helpers import _SessionWrapper
3639from google .cloud .spanner_v1 ._opentelemetry_tracing import trace_call
3740from google .cloud .spanner_v1 .streamed import StreamedResultSet
@@ -235,6 +238,10 @@ def read(
235238 database = self ._session ._database
236239 api = database .spanner_api
237240 metadata = _metadata_with_prefix (database .name )
241+ if not self ._read_only and database ._route_to_leader_enabled :
242+ metadata .append (
243+ _metadata_with_leader_aware_routing (database ._route_to_leader_enabled )
244+ )
238245
239246 if request_options is None :
240247 request_options = RequestOptions ()
@@ -244,7 +251,7 @@ def read(
244251 if self ._read_only :
245252 # Transaction tags are not supported for read only transactions.
246253 request_options .transaction_tag = None
247- else :
254+ elif self . transaction_tag is not None :
248255 request_options .transaction_tag = self .transaction_tag
249256
250257 request = ReadRequest (
@@ -391,6 +398,10 @@ def execute_sql(
391398
392399 database = self ._session ._database
393400 metadata = _metadata_with_prefix (database .name )
401+ if not self ._read_only and database ._route_to_leader_enabled :
402+ metadata .append (
403+ _metadata_with_leader_aware_routing (database ._route_to_leader_enabled )
404+ )
394405
395406 api = database .spanner_api
396407
@@ -406,7 +417,7 @@ def execute_sql(
406417 if self ._read_only :
407418 # Transaction tags are not supported for read only transactions.
408419 request_options .transaction_tag = None
409- else :
420+ elif self . transaction_tag is not None :
410421 request_options .transaction_tag = self .transaction_tag
411422
412423 request = ExecuteSqlRequest (
@@ -527,6 +538,10 @@ def partition_read(
527538 database = self ._session ._database
528539 api = database .spanner_api
529540 metadata = _metadata_with_prefix (database .name )
541+ if database ._route_to_leader_enabled :
542+ metadata .append (
543+ _metadata_with_leader_aware_routing (database ._route_to_leader_enabled )
544+ )
530545 transaction = self ._make_txn_selector ()
531546 partition_options = PartitionOptions (
532547 partition_size_bytes = partition_size_bytes , max_partitions = max_partitions
@@ -621,6 +636,10 @@ def partition_query(
621636 database = self ._session ._database
622637 api = database .spanner_api
623638 metadata = _metadata_with_prefix (database .name )
639+ if database ._route_to_leader_enabled :
640+ metadata .append (
641+ _metadata_with_leader_aware_routing (database ._route_to_leader_enabled )
642+ )
624643 transaction = self ._make_txn_selector ()
625644 partition_options = PartitionOptions (
626645 partition_size_bytes = partition_size_bytes , max_partitions = max_partitions
@@ -766,6 +785,10 @@ def begin(self):
766785 database = self ._session ._database
767786 api = database .spanner_api
768787 metadata = _metadata_with_prefix (database .name )
788+ if not self ._read_only and database ._route_to_leader_enabled :
789+ metadata .append (
790+ (_metadata_with_leader_aware_routing (database ._route_to_leader_enabled ))
791+ )
769792 txn_selector = self ._make_txn_selector ()
770793 with trace_call ("CloudSpanner.BeginTransaction" , self ._session ):
771794 response = api .begin_transaction (
0 commit comments