2828
2929import com .google .api .core .ApiFuture ;
3030import com .google .api .core .ApiFutures ;
31+ import com .google .api .gax .grpc .GrpcCallContext ;
3132import com .google .api .gax .retrying .RetrySettings ;
3233import com .google .api .gax .rpc .AbortedException ;
3334import com .google .api .gax .rpc .ApiException ;
3738import com .google .cloud .storage .Crc32cValue .Crc32cLengthKnown ;
3839import com .google .cloud .storage .Hasher .UncheckedChecksumMismatchException ;
3940import com .google .cloud .storage .Storage .BlobSourceOption ;
41+ import com .google .cloud .storage .StorageDataClient .FastOpenObjectReadSession ;
42+ import com .google .cloud .storage .ZeroCopySupport .DisposableByteString ;
4043import com .google .cloud .storage .it .ChecksummedTestContent ;
4144import com .google .cloud .storage .it .GrpcPlainRequestLoggingInterceptor ;
4245import com .google .cloud .storage .it .GrpcRequestAuditing ;
4952import com .google .common .io .ByteStreams ;
5053import com .google .protobuf .Any ;
5154import com .google .protobuf .ByteString ;
55+ import com .google .protobuf .TextFormat ;
5256import com .google .storage .v2 .BidiReadHandle ;
5357import com .google .storage .v2 .BidiReadObjectError ;
5458import com .google .storage .v2 .BidiReadObjectRedirectedError ;
@@ -986,6 +990,9 @@ public void onNext(BidiReadObjectRequest request) {
986990 ApiFuture <byte []> f3 =
987991 bd .readRange (RangeSpec .of (3 , 3 ), RangeProjectionConfigs .asFutureBytes ());
988992
993+ // make sure the first read succeeded
994+ byte [] actual = TestUtils .await (f1 , 5 , TimeUnit .SECONDS );
995+
989996 // close the "parent"
990997 bd .close ();
991998
@@ -1005,8 +1012,6 @@ public void onNext(BidiReadObjectRequest request) {
10051012 assertThat (readRanges ).isEqualTo (expected );
10061013 },
10071014 () -> {
1008- // make sure the first read succeeded
1009- byte [] actual = TestUtils .await (f1 , 5 , TimeUnit .SECONDS );
10101015 assertThat (ByteString .copyFrom (actual )).isEqualTo (ByteString .copyFromUtf8 ("A" ));
10111016 },
10121017 // make sure the other two pending reads fail
@@ -1305,6 +1310,134 @@ public void onCompleted() {
13051310 }
13061311 }
13071312
1313+ @ Test
1314+ public void gettingSessionFromFastOpenKeepsTheSessionOpenUntilClosed () throws Exception {
1315+ ChecksummedTestContent expected = ChecksummedTestContent .of (ALL_OBJECT_BYTES , 10 , 30 );
1316+
1317+ ChecksummedTestContent content1 = ChecksummedTestContent .of (ALL_OBJECT_BYTES , 10 , 10 );
1318+ ChecksummedTestContent content2 = ChecksummedTestContent .of (ALL_OBJECT_BYTES , 20 , 10 );
1319+ ChecksummedTestContent content3 = ChecksummedTestContent .of (ALL_OBJECT_BYTES , 30 , 10 );
1320+ BidiReadObjectRequest req1 =
1321+ BidiReadObjectRequest .newBuilder ()
1322+ .setReadObjectSpec (
1323+ BidiReadObjectSpec .newBuilder ()
1324+ .setBucket (METADATA .getBucket ())
1325+ .setObject (METADATA .getName ())
1326+ .build ())
1327+ .addReadRanges (getReadRange (1 , 10 , 10 ))
1328+ .build ();
1329+ BidiReadObjectResponse res1 =
1330+ BidiReadObjectResponse .newBuilder ()
1331+ .setMetadata (METADATA )
1332+ .addObjectDataRanges (
1333+ ObjectRangeData .newBuilder ()
1334+ .setReadRange (getReadRange (1 , 10 , content1 ))
1335+ .setChecksummedData (content1 .asChecksummedData ())
1336+ .setRangeEnd (true )
1337+ .build ())
1338+ .build ();
1339+
1340+ BidiReadObjectRequest req2 = read (2 , 20 , 10 );
1341+ BidiReadObjectResponse res2 =
1342+ BidiReadObjectResponse .newBuilder ()
1343+ .addObjectDataRanges (
1344+ ObjectRangeData .newBuilder ()
1345+ .setReadRange (getReadRange (2 , 20 , content2 ))
1346+ .setChecksummedData (content2 .asChecksummedData ())
1347+ .setRangeEnd (true )
1348+ .build ())
1349+ .build ();
1350+ BidiReadObjectRequest req3 = read (3 , 30 , 10 );
1351+ BidiReadObjectResponse res3 =
1352+ BidiReadObjectResponse .newBuilder ()
1353+ .addObjectDataRanges (
1354+ ObjectRangeData .newBuilder ()
1355+ .setReadRange (getReadRange (3 , 30 , content3 ))
1356+ .setChecksummedData (content3 .asChecksummedData ())
1357+ .setRangeEnd (true )
1358+ .build ())
1359+ .build ();
1360+
1361+ System .out .println ("req1 = " + TextFormat .printer ().shortDebugString (req1 ));
1362+ System .out .println ("req2 = " + TextFormat .printer ().shortDebugString (req2 ));
1363+ System .out .println ("req3 = " + TextFormat .printer ().shortDebugString (req3 ));
1364+ ImmutableMap <BidiReadObjectRequest , BidiReadObjectResponse > db =
1365+ ImmutableMap .<BidiReadObjectRequest , BidiReadObjectResponse >builder ()
1366+ .put (req1 , res1 )
1367+ .put (req2 , res2 )
1368+ .put (req3 , res3 )
1369+ .buildOrThrow ();
1370+
1371+ FakeStorage fakeStorage = FakeStorage .from (db );
1372+
1373+ try (FakeServer fakeServer = FakeServer .of (fakeStorage );
1374+ GrpcStorageImpl storage =
1375+ (GrpcStorageImpl )
1376+ fakeServer
1377+ .getGrpcStorageOptions ()
1378+ .toBuilder ()
1379+ .setRetrySettings (RetrySettings .newBuilder ().setMaxAttempts (1 ).build ())
1380+ .build ()
1381+ .getService ()) {
1382+ StorageDataClient dataClient = storage .storageDataClient ;
1383+
1384+ BidiReadObjectRequest req =
1385+ BidiReadObjectRequest .newBuilder ()
1386+ .setReadObjectSpec (
1387+ BidiReadObjectSpec .newBuilder ()
1388+ .setBucket (METADATA .getBucket ())
1389+ .setObject (METADATA .getName ())
1390+ .build ())
1391+ .build ();
1392+
1393+ ApiFuture <FastOpenObjectReadSession <ApiFuture <DisposableByteString >>> future =
1394+ dataClient .fastOpenReadSession (
1395+ req ,
1396+ GrpcCallContext .createDefault (),
1397+ RangeSpec .of (10 , 10 ),
1398+ RangeProjectionConfigs .asFutureByteString ());
1399+
1400+ ByteString bytes = ByteString .empty ();
1401+ Exception caught = null ;
1402+
1403+ try (FastOpenObjectReadSession <ApiFuture <DisposableByteString >> fastOpenChannel =
1404+ future .get (5 , TimeUnit .SECONDS );
1405+ ObjectReadSession session = fastOpenChannel .getSession ()) {
1406+ ApiFuture <DisposableByteString > futureBytes1 = fastOpenChannel .getProjection ();
1407+ try (DisposableByteString disposableByteString = futureBytes1 .get ()) {
1408+ bytes = bytes .concat (disposableByteString .byteString ());
1409+ }
1410+
1411+ ApiFuture <DisposableByteString > futureBytes2 =
1412+ session .readRange (RangeSpec .of (20 , 10 ), RangeProjectionConfigs .asFutureByteString ());
1413+ try (DisposableByteString disposableByteString = futureBytes2 .get ()) {
1414+ bytes = bytes .concat (disposableByteString .byteString ());
1415+ }
1416+
1417+ ApiFuture <DisposableByteString > futureBytes3 =
1418+ session .readRange (RangeSpec .of (30 , 10 ), RangeProjectionConfigs .asFutureByteString ());
1419+ try (DisposableByteString disposableByteString = futureBytes3 .get ()) {
1420+ bytes = bytes .concat (disposableByteString .byteString ());
1421+ }
1422+
1423+ } catch (Exception e ) {
1424+ // stash off any runtime failure so we can still do our assertions to help determine
1425+ // the true failure
1426+ caught = e ;
1427+ } finally {
1428+ final ByteString finalBytes = bytes ;
1429+ final Exception finalCaught = caught ;
1430+ assertAll (
1431+ () -> assertThat (xxd (finalBytes )).isEqualTo (xxd (expected .getBytes ())),
1432+ () -> {
1433+ if (finalCaught != null ) {
1434+ throw new Exception ("exception during test" , finalCaught );
1435+ }
1436+ });
1437+ }
1438+ }
1439+ }
1440+
13081441 private static void runTestAgainstFakeServer (
13091442 FakeStorage fakeStorage , RangeSpec range , ChecksummedTestContent expected ) throws Exception {
13101443
0 commit comments