1515
1616from __future__ import annotations
1717
18- from typing import Sequence
18+ from typing import (
19+ TYPE_CHECKING ,
20+ AsyncGenerator ,
21+ AsyncIterable ,
22+ Awaitable ,
23+ Sequence ,
24+ )
1925
2026from google .cloud .bigtable_v2 .types import ReadRowsRequest as ReadRowsRequestPB
2127from google .cloud .bigtable_v2 .types import ReadRowsResponse as ReadRowsResponsePB
2632from google .cloud .bigtable .data .read_rows_query import ReadRowsQuery
2733from google .cloud .bigtable .data .exceptions import InvalidChunk
2834from google .cloud .bigtable .data .exceptions import _RowSetComplete
29- from google .cloud .bigtable .data .exceptions import _ResetRow
3035from google .cloud .bigtable .data ._helpers import _attempt_timeout_generator
3136from google .cloud .bigtable .data ._helpers import _make_metadata
3237from google .cloud .bigtable .data ._helpers import _retry_exception_factory
3338
3439from google .api_core import retry as retries
3540from google .api_core .retry import exponential_sleep_generator
3641
37- from google .cloud .bigtable .data ._sync .cross_sync import CrossSync
42+ if TYPE_CHECKING :
43+ from google .cloud .bigtable .data ._async .client import TableAsync
44+
45+
46+ class _ResetRow (Exception ):
47+ def __init__ (self , chunk ):
48+ self .chunk = chunk
3849
3950
40- @CrossSync .export_sync (
41- path = "google.cloud.bigtable.data._sync._read_rows._ReadRowsOperation" ,
42- add_mapping_for_name = "_ReadRowsOperation" ,
43- )
4451class _ReadRowsOperationAsync :
4552 """
4653 ReadRowsOperation handles the logic of merging chunks from a ReadRowsResponse stream
@@ -75,7 +82,7 @@ class _ReadRowsOperationAsync:
7582 def __init__ (
7683 self ,
7784 query : ReadRowsQuery ,
78- table : "CrossSync.Table " ,
85+ table : "TableAsync " ,
7986 operation_timeout : float ,
8087 attempt_timeout : float ,
8188 retryable_exceptions : Sequence [type [Exception ]] = (),
@@ -101,22 +108,22 @@ def __init__(
101108 self ._last_yielded_row_key : bytes | None = None
102109 self ._remaining_count : int | None = self .request .rows_limit or None
103110
def start_operation(self) -> AsyncGenerator[Row, None]:
    """Begin the read_rows operation, retrying on retryable errors.

    Wraps the single-attempt stream factory in the async streaming
    retry helper, so retryable failures restart the attempt until
    the overall operation timeout expires.

    Yields:
        Row: The next row in the stream
    """
    # exponential backoff between retry attempts: 10ms initial,
    # doubling, capped at 60s
    backoff = exponential_sleep_generator(0.01, 60, multiplier=2)
    return retries.retry_target_stream_async(
        self._read_rows_attempt,
        self._predicate,
        backoff,
        self.operation_timeout,
        exception_factory=_retry_exception_factory,
    )
118125
119- def _read_rows_attempt (self ) -> CrossSync . Iterable [Row ]:
126+ def _read_rows_attempt (self ) -> AsyncGenerator [Row , None ]:
120127 """
121128 Attempt a single read_rows rpc call.
122129 This function is intended to be wrapped by retry logic,
@@ -152,10 +159,9 @@ def _read_rows_attempt(self) -> CrossSync.Iterable[Row]:
152159 chunked_stream = self .chunk_stream (gapic_stream )
153160 return self .merge_rows (chunked_stream )
154161
155- @CrossSync .convert
156162 async def chunk_stream (
157- self , stream : CrossSync . Awaitable [CrossSync . Iterable [ReadRowsResponsePB ]]
158- ) -> CrossSync . Iterable [ReadRowsResponsePB .CellChunk ]:
163+ self , stream : Awaitable [AsyncIterable [ReadRowsResponsePB ]]
164+ ) -> AsyncGenerator [ReadRowsResponsePB .CellChunk , None ]:
159165 """
160166 process chunks out of raw read_rows stream
161167
@@ -164,7 +170,7 @@ async def chunk_stream(
164170 Yields:
165171 ReadRowsResponsePB.CellChunk: the next chunk in the stream
166172 """
167- async for resp in CrossSync . rm_aio ( await stream ) :
173+ async for resp in await stream :
168174 # extract proto from proto-plus wrapper
169175 resp = resp ._pb
170176
@@ -205,12 +211,9 @@ async def chunk_stream(
205211 current_key = None
206212
207213 @staticmethod
208- @CrossSync .convert (
209- replace_symbols = {"__aiter__" : "__iter__" , "__anext__" : "__next__" }
210- )
211214 async def merge_rows (
212- chunks : CrossSync . Iterable [ReadRowsResponsePB .CellChunk ] | None ,
213- ) -> CrossSync . Iterable [Row ]:
215+ chunks : AsyncGenerator [ReadRowsResponsePB .CellChunk , None ] | None
216+ ) -> AsyncGenerator [Row , None ]:
214217 """
215218 Merge chunks into rows
216219
@@ -225,8 +228,8 @@ async def merge_rows(
225228 # For each row
226229 while True :
227230 try :
228- c = CrossSync . rm_aio ( await it .__anext__ () )
229- except CrossSync . StopIteration :
231+ c = await it .__anext__ ()
232+ except StopAsyncIteration :
230233 # stream complete
231234 return
232235 row_key = c .row_key
@@ -274,7 +277,7 @@ async def merge_rows(
274277 buffer = [value ]
275278 while c .value_size > 0 :
276279 # throws when premature end
277- c = CrossSync . rm_aio ( await it .__anext__ () )
280+ c = await it .__anext__ ()
278281
279282 t = c .timestamp_micros
280283 cl = c .labels
@@ -306,7 +309,7 @@ async def merge_rows(
306309 if c .commit_row :
307310 yield Row (row_key , cells )
308311 break
309- c = CrossSync . rm_aio ( await it .__anext__ () )
312+ c = await it .__anext__ ()
310313 except _ResetRow as e :
311314 c = e .chunk
312315 if (
@@ -319,7 +322,7 @@ async def merge_rows(
319322 ):
320323 raise InvalidChunk ("reset row with data" )
321324 continue
322- except CrossSync . StopIteration :
325+ except StopAsyncIteration :
323326 raise InvalidChunk ("premature end of stream" )
324327
325328 @staticmethod
0 commit comments