1919import static com .google .common .base .Preconditions .checkArgument ;
2020
2121import com .google .api .core .ApiFuture ;
22+ import com .google .api .core .ApiFutures ;
2223import com .google .api .gax .grpc .GrpcCallContext ;
2324import com .google .cloud .storage .GrpcUtils .ZeroCopyBidiStreamingCallable ;
25+ import com .google .cloud .storage .ObjectReadSessionState .OpenArguments ;
2426import com .google .cloud .storage .RetryContext .RetryContextProvider ;
2527import com .google .storage .v2 .BidiReadObjectRequest ;
2628import com .google .storage .v2 .BidiReadObjectResponse ;
3032final class StorageDataClient implements IOAutoCloseable {
3133
3234 private final ScheduledExecutorService executor ;
33- private final ZeroCopyBidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse > read ;
35+ private final ZeroCopyBidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse >
36+ bidiReadObject ;
3437 private final RetryContextProvider retryContextProvider ;
3538 private final IOAutoCloseable onClose ;
3639
3740 private StorageDataClient (
3841 ScheduledExecutorService executor ,
39- ZeroCopyBidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse > read ,
42+ ZeroCopyBidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse > bidiReadObject ,
4043 RetryContextProvider retryContextProvider ,
4144 IOAutoCloseable onClose ) {
4245 this .executor = executor ;
43- this .read = read ;
46+ this .bidiReadObject = bidiReadObject ;
4447 this .retryContextProvider = retryContextProvider ;
4548 this .onClose = onClose ;
4649 }
4750
4851 ApiFuture <ObjectReadSession > readSession (BidiReadObjectRequest req , GrpcCallContext ctx ) {
4952 checkArgument (
5053 req .getReadRangesList ().isEmpty (),
51- "ranged included in the initial request are not supported" );
52- return ObjectReadSessionImpl .create (req , ctx , read , executor , retryContextProvider );
54+ "ranges included in the initial request are not supported" );
55+ ObjectReadSessionState state = new ObjectReadSessionState (ctx , req );
56+
57+ ObjectReadSessionStream stream =
58+ ObjectReadSessionStream .create (
59+ executor , bidiReadObject , state , retryContextProvider .create ());
60+
61+ ApiFuture <ObjectReadSession > objectReadSessionFuture =
62+ ApiFutures .transform (
63+ stream ,
64+ nowOpen ->
65+ new ObjectReadSessionImpl (
66+ executor , bidiReadObject , stream , state , retryContextProvider ),
67+ executor );
68+ stream .send (req );
69+ return objectReadSessionFuture ;
70+ }
71+
72+ <Projection > ApiFuture <FastOpenObjectReadSession <Projection >> fastOpenReadSession (
73+ BidiReadObjectRequest openRequest ,
74+ GrpcCallContext ctx ,
75+ RangeSpec range ,
76+ RangeProjectionConfig <Projection > config ) {
77+ checkArgument (
78+ openRequest .getReadRangesList ().isEmpty (),
79+ "ranges included in the initial request are not supported" );
80+ ObjectReadSessionState state = new ObjectReadSessionState (ctx , openRequest );
81+
82+ ObjectReadSessionStream stream =
83+ ObjectReadSessionStream .create (
84+ executor , bidiReadObject , state , retryContextProvider .create ());
85+
86+ long readId = state .newReadId ();
87+ ObjectReadSessionStreamRead <Projection , ?> read =
88+ config .cast ().newRead (readId , range , retryContextProvider .create ());
89+ state .putOutstandingRead (readId , read );
90+
91+ ApiFuture <FastOpenObjectReadSession <Projection >> objectReadSessionFuture =
92+ ApiFutures .transform (
93+ stream ,
94+ nowOpen ->
95+ new FastOpenObjectReadSession <>(
96+ new ObjectReadSessionImpl (
97+ executor , bidiReadObject , stream , state , retryContextProvider ),
98+ read ),
99+ executor );
100+ OpenArguments openArguments = state .getOpenArguments ();
101+ BidiReadObjectRequest req = openArguments .getReq ();
102+ stream .send (req );
103+ read .setOnCloseCallback (stream );
104+ return objectReadSessionFuture ;
53105 }
54106
55107 @ Override
@@ -67,4 +119,41 @@ static StorageDataClient create(
67119 IOAutoCloseable onClose ) {
68120 return new StorageDataClient (executor , read , retryContextProvider , onClose );
69121 }
122+
123+ static final class FastOpenObjectReadSession <Projection > implements IOAutoCloseable {
124+ private final ObjectReadSession session ;
125+ private final ObjectReadSessionStreamRead <Projection , ?> read ;
126+
127+ private FastOpenObjectReadSession (
128+ ObjectReadSession session , ObjectReadSessionStreamRead <Projection , ?> read ) {
129+ this .session = session ;
130+ this .read = read ;
131+ }
132+
133+ ObjectReadSession getSession () {
134+ return session ;
135+ }
136+
137+ ObjectReadSessionStreamRead <Projection , ?> getRead () {
138+ return read ;
139+ }
140+
141+ Projection getProjection () {
142+ return read .project ();
143+ }
144+
145+ @ Override
146+ public void close () throws IOException {
147+ //noinspection EmptyTryBlock
148+ try (IOAutoCloseable ignore1 = session ;
149+ IOAutoCloseable ignore2 = read ) {
150+ // use try-with to ensure full cleanup
151+ }
152+ }
153+
154+ public static <Projection > FastOpenObjectReadSession <Projection > of (
155+ ObjectReadSession session , ObjectReadSessionStreamRead <Projection , ?> read ) {
156+ return new FastOpenObjectReadSession <>(session , read );
157+ }
158+ }
70159}
0 commit comments