@@ -86,7 +86,7 @@ static void WriteAsset(ObjectWriter writer, ISerializerService serializer, Check
8686 }
8787 }
8888
89- public static ValueTask < ImmutableArray < ( Checksum , object ) > > ReadDataAsync ( PipeReader pipeReader , int scopeId , ISet < Checksum > checksums , ISerializerService serializerService , CancellationToken cancellationToken )
89+ public static async ValueTask < ImmutableArray < ( Checksum , object ) > > ReadDataAsync ( PipeReader pipeReader , int scopeId , ISet < Checksum > checksums , ISerializerService serializerService , CancellationToken cancellationToken )
9090 {
9191 // Workaround for ObjectReader not supporting async reading.
9292 // Unless we read from the RPC stream asynchronously and with cancallation support we might hang when the server cancels.
@@ -98,7 +98,7 @@ static void WriteAsset(ObjectWriter writer, ISerializerService serializer, Check
9898 Exception ? exception = null ;
9999
100100 // start a task on a thread pool thread copying from the RPC pipe to a local pipe:
101- Task . Run ( async ( ) =>
101+ var copyTask = Task . Run ( async ( ) =>
102102 {
103103 try
104104 {
@@ -113,20 +113,26 @@ static void WriteAsset(ObjectWriter writer, ISerializerService serializer, Check
113113 await localPipe . Writer . CompleteAsync ( exception ) . ConfigureAwait ( false ) ;
114114 await pipeReader . CompleteAsync ( exception ) . ConfigureAwait ( false ) ;
115115 }
116- } , cancellationToken ) . Forget ( ) ;
116+ } , cancellationToken ) ;
117117
118118 // blocking read from the local pipe on the current thread:
119119 try
120120 {
121121 using var stream = localPipe . Reader . AsStream ( leaveOpen : false ) ;
122- return new ( ReadData ( stream , scopeId , checksums , serializerService , cancellationToken ) ) ;
122+ return ReadData ( stream , scopeId , checksums , serializerService , cancellationToken ) ;
123123 }
124124 catch ( EndOfStreamException )
125125 {
126126 cancellationToken . ThrowIfCancellationRequested ( ) ;
127127
128128 throw exception ?? ExceptionUtilities . Unreachable ;
129129 }
130+ finally
131+ {
132+ // Make sure to complete the copy and pipes before returning, otherwise the caller could complete the
133+ // reader and/or writer while they are still in use.
134+ await copyTask . ConfigureAwait ( false ) ;
135+ }
130136 }
131137
132138 public static ImmutableArray < ( Checksum , object ) > ReadData ( Stream stream , int scopeId , ISet < Checksum > checksums , ISerializerService serializerService , CancellationToken cancellationToken )
0 commit comments