@@ -262,8 +262,32 @@ public Blob create(
262262 @ Override
263263 public Blob create (BlobInfo blobInfo , InputStream content , BlobWriteOption ... options ) {
264264 try {
265- return createFrom (blobInfo , content , options );
266- } catch (IOException e ) {
265+ requireNonNull (blobInfo , "blobInfo must be non null" );
266+ InputStream inputStreamParam = firstNonNull (content , new ByteArrayInputStream (ZERO_BYTES ));
267+
268+ Opts <ObjectTargetOpt > optsWithDefaults = Opts .unwrap (options ).prepend (defaultOpts );
269+ GrpcCallContext grpcCallContext =
270+ optsWithDefaults .grpcMetadataMapper ().apply (GrpcCallContext .createDefault ());
271+ WriteObjectRequest req = getWriteObjectRequest (blobInfo , optsWithDefaults );
272+ Hasher hasher = Hasher .enabled ();
273+ GrpcCallContext merge = Utils .merge (grpcCallContext , Retrying .newCallContext ());
274+ UnbufferedWritableByteChannelSession <WriteObjectResponse > session =
275+ ResumableMedia .gapic ()
276+ .write ()
277+ .byteChannel (storageClient .writeObjectCallable ().withDefaultCallContext (merge ))
278+ .setByteStringStrategy (ByteStringStrategy .noCopy ())
279+ .setHasher (hasher )
280+ .direct ()
281+ .unbuffered ()
282+ .setRequest (req )
283+ .build ();
284+
285+ try (UnbufferedWritableByteChannel c = session .open ()) {
286+ ByteStreams .copy (Channels .newChannel (inputStreamParam ), c );
287+ }
288+ ApiFuture <WriteObjectResponse > responseApiFuture = session .getResult ();
289+ return this .getBlob (responseApiFuture );
290+ } catch (IOException | ApiException e ) {
267291 throw StorageException .coalesce (e );
268292 }
269293 }
@@ -549,17 +573,20 @@ public boolean delete(String bucket, BucketSourceOption... options) {
549573 DeleteBucketRequest .Builder builder =
550574 DeleteBucketRequest .newBuilder ().setName (bucketNameCodec .encode (bucket ));
551575 DeleteBucketRequest req = opts .deleteBucketsRequest ().apply (builder ).build ();
552- try {
553- GrpcCallContext merge = Utils .merge (grpcCallContext , Retrying .newCallContext ());
554- Retrying .run (
555- getOptions (),
556- retryAlgorithmManager .getFor (req ),
557- () -> storageClient .deleteBucketCallable ().call (req , merge ),
558- Decoder .identity ());
559- return true ;
560- } catch (StorageException e ) {
561- return false ;
562- }
576+ GrpcCallContext merge = Utils .merge (grpcCallContext , Retrying .newCallContext ());
577+ return Boolean .TRUE .equals (
578+ Retrying .run (
579+ getOptions (),
580+ retryAlgorithmManager .getFor (req ),
581+ () -> {
582+ try {
583+ storageClient .deleteBucketCallable ().call (req , merge );
584+ return true ;
585+ } catch (NotFoundException e ) {
586+ return false ;
587+ }
588+ },
589+ Decoder .identity ()));
563590 }
564591
565592 @ Override
@@ -760,11 +787,19 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options
760787 opts .grpcMetadataMapper ().apply (GrpcCallContext .createDefault ());
761788 WriteObjectRequest req = getWriteObjectRequest (blobInfo , opts );
762789 Hasher hasher = Hasher .noop ();
790+ // in JSON, the starting of the resumable session happens before the invocation of write can
791+ // happen. Emulate the same thing here.
792+ // 1. create the future
793+ ApiFuture <ResumableWrite > startResumableWrite = startResumableWrite (grpcCallContext , req );
794+ // 2. await the result of the future
795+ ResumableWrite resumableWrite = ApiFutureUtils .await (startResumableWrite );
796+ // 3. wrap the result in another future container before constructing the BlobWriteChannel
797+ ApiFuture <ResumableWrite > wrapped = ApiFutures .immediateFuture (resumableWrite );
763798 return new GrpcBlobWriteChannel (
764799 storageClient .writeObjectCallable (),
765800 getOptions (),
766801 retryAlgorithmManager .idempotent (),
767- () -> startResumableWrite ( grpcCallContext , req ) ,
802+ () -> wrapped ,
768803 hasher );
769804 }
770805
0 commit comments