@@ -29,6 +29,7 @@ import (
2929 "slices"
3030 "strconv"
3131 "strings"
32+ "sync"
3233 "testing"
3334 "time"
3435
@@ -43,6 +44,7 @@ import (
4344 "google.golang.org/api/option"
4445 "google.golang.org/grpc"
4546 "google.golang.org/grpc/codes"
47+ expgrpc "google.golang.org/grpc/experimental"
4648 "google.golang.org/grpc/mem"
4749 "google.golang.org/grpc/status"
4850 "google.golang.org/protobuf/proto"
@@ -2825,6 +2827,165 @@ func TestWriterChunkRetryDeadlineEmulated(t *testing.T) {
28252827 })
28262828}
28272829
2830+ // Used to test gRPC buffer pool allocs and frees.
2831+ // See https://pkg.go.dev/google.golang.org/grpc/mem
2832+ type testBufferPool struct {
2833+ allocs int64
2834+ frees int64
2835+ sync.Mutex // mutex needed becuase Get/Put can be called in parallel.
2836+ }
2837+
2838+ func (bp * testBufferPool ) Get (length int ) * []byte {
2839+ bp .Lock ()
2840+ bp .allocs += int64 (length )
2841+ bp .Unlock ()
2842+ return mem .DefaultBufferPool ().Get (length )
2843+ }
2844+
2845+ func (bp * testBufferPool ) Put (b * []byte ) {
2846+ if b != nil {
2847+ bp .Lock ()
2848+ bp .frees += int64 (len (* b ))
2849+ bp .Unlock ()
2850+ }
2851+ mem .DefaultBufferPool ().Put (b )
2852+ }
2853+
2854+ func (bp * testBufferPool ) getAllocsAndFrees () (int64 , int64 ) {
2855+ bp .Lock ()
2856+ defer bp .Unlock ()
2857+ return bp .allocs , bp .frees
2858+ }
2859+
2860+ // Test that successful downloads using Reader and MultiRangeDownloader free
2861+ // all of their allocated buffers.
2862+ func TestReadCodecLeaksEmulated (t * testing.T ) {
2863+ checkEmulatorEnvironment (t )
2864+ ctx := context .Background ()
2865+ var bp testBufferPool
2866+ client , err := NewGRPCClient (ctx , option .WithGRPCDialOption (expgrpc .WithBufferPool (& bp )), experimental .WithZonalBucketAPIs ())
2867+ if err != nil {
2868+ t .Fatalf ("NewGRPCClient: %v" , err )
2869+ }
2870+ var (
2871+ contents = randomBytes9MiB
2872+ prefix = time .Now ().Nanosecond ()
2873+ bucketName = fmt .Sprintf ("bucket-%d" , prefix )
2874+ objName = fmt .Sprintf ("%d-object" , prefix )
2875+ bkt = client .Bucket (bucketName )
2876+ obj = bkt .Object (objName )
2877+ )
2878+
2879+ // Upload object.
2880+ if err := bkt .Create (ctx , "project" , nil ); err != nil {
2881+ t .Fatalf ("creating bucket: %v" , err )
2882+ }
2883+ w := obj .NewWriter (ctx )
2884+ if _ , err := io .Copy (w , bytes .NewReader (contents )); err != nil {
2885+ t .Fatalf ("uploading object: %v" , err )
2886+ }
2887+ if err := w .Close (); err != nil {
2888+ t .Fatalf ("closing writer: %v" , err )
2889+ }
2890+ if bp .allocs != bp .frees {
2891+ t .Errorf ("upload: alloc'd bytes %v not equal to freed bytes %v" , bp .allocs , bp .frees )
2892+ }
2893+
2894+ // Test multiple download paths.
2895+ testCases := []struct {
2896+ name string
2897+ downloadFunc func (obj * ObjectHandle ) ([]byte , error )
2898+ }{
2899+ {
2900+ name : "Reader.Read" ,
2901+ downloadFunc : func (obj * ObjectHandle ) ([]byte , error ) {
2902+ r , err := obj .NewReader (ctx )
2903+ defer r .Close ()
2904+ if err != nil {
2905+ return nil , err
2906+ }
2907+ gotContents , err := io .ReadAll (r )
2908+ return gotContents , err
2909+ },
2910+ },
2911+ {
2912+ name : "Reader.WriteTo" ,
2913+ downloadFunc : func (obj * ObjectHandle ) ([]byte , error ) {
2914+ r , err := obj .NewReader (ctx )
2915+ defer r .Close ()
2916+ if err != nil {
2917+ return nil , err
2918+ }
2919+ buf := bytes .NewBuffer ([]byte {})
2920+ _ , err = r .WriteTo (buf )
2921+ return buf .Bytes (), err
2922+ },
2923+ },
2924+ {
2925+ name : "MultiRangeDownloader 3MiB ranges" ,
2926+ downloadFunc : func (obj * ObjectHandle ) ([]byte , error ) {
2927+ mrd , err := obj .NewMultiRangeDownloader (ctx )
2928+ var bufs []* bytes.Buffer
2929+ var currOff int64
2930+ var increment int64 = 3 * MiB
2931+ for range 3 {
2932+ buf := bytes .NewBuffer ([]byte {})
2933+ mrd .Add (buf , currOff , increment , func (int64 , int64 , error ) {})
2934+ bufs = append (bufs , buf )
2935+ currOff += increment
2936+ }
2937+ mrd .Wait ()
2938+ if err := mrd .Close (); err != nil {
2939+ return nil , err
2940+ }
2941+ var b []byte
2942+ for _ , buf := range bufs {
2943+ b = append (b , buf .Bytes ()... )
2944+ }
2945+ return b , err
2946+ }},
2947+ {
2948+ name : "MultiRangeDownloader 256k ranges" ,
2949+ downloadFunc : func (obj * ObjectHandle ) ([]byte , error ) {
2950+ mrd , err := obj .NewMultiRangeDownloader (ctx )
2951+ var bufs []* bytes.Buffer
2952+ var currOff int64
2953+ var increment int64 = 256 * 1024
2954+ for range 36 {
2955+ buf := bytes .NewBuffer ([]byte {})
2956+ mrd .Add (buf , currOff , increment , func (int64 , int64 , error ) {})
2957+ bufs = append (bufs , buf )
2958+ currOff += increment
2959+ }
2960+ mrd .Wait ()
2961+ if err := mrd .Close (); err != nil {
2962+ return nil , err
2963+ }
2964+ var b []byte
2965+ for _ , buf := range bufs {
2966+ b = append (b , buf .Bytes ()... )
2967+ }
2968+ return b , err
2969+ }},
2970+ }
2971+
2972+ for _ , tc := range testCases {
2973+ t .Run (tc .name , func (t * testing.T ) {
2974+ gotContents , err := tc .downloadFunc (obj )
2975+ if err != nil {
2976+ t .Fatalf ("downloading content: %v" , err )
2977+ }
2978+ if ! bytes .Equal (gotContents , contents ) {
2979+ t .Errorf ("downloaded bytes did not match; got %v bytes, want %v" , len (gotContents ), len (contents ))
2980+ }
2981+ allocs , frees := bp .getAllocsAndFrees ()
2982+ if allocs != frees {
2983+ t .Errorf ("download: alloc'd bytes %v not equal to freed bytes %v" , allocs , frees )
2984+ }
2985+ })
2986+ }
2987+ }
2988+
28282989// createRetryTest creates a bucket in the emulator and sets up a test using the
28292990// Retry Test API for the given instructions. This is intended for emulator tests
28302991// of retry behavior that are not covered by conformance tests.
0 commit comments