@@ -83,7 +83,7 @@ void send() throws IOException {
8383 }
8484
8585 verboseLog (
86- "TCP write" ,
86+ "TCP write: transaction id=" + query . getHeader (). getID () ,
8787 channel .socket ().getLocalSocketAddress (),
8888 channel .socket ().getRemoteSocketAddress (),
8989 queryData );
@@ -97,8 +97,13 @@ void send() throws IOException {
9797 buffer .flip ();
9898 while (buffer .hasRemaining ()) {
9999 long n = channel .write (buffer );
100- if (n < 0 ) {
101- throw new EOFException ();
100+ if (n == 0 ) {
101+ throw new EOFException (
102+ "Insufficient room for the data in the underlying output buffer for transaction "
103+ + query .getHeader ().getID ());
104+ } else if (n < queryData .length ) {
105+ throw new EOFException (
106+ "Could not write all data for transaction " + query .getHeader ().getID ());
102107 }
103108 }
104109
@@ -146,7 +151,11 @@ private void handleChannelException(IOException e) {
146151 try {
147152 channel .close ();
148153 } catch (IOException ex ) {
149- log .error ("failed to close channel" , ex );
154+ log .warn (
155+ "Failed to close channel l={}/r={}" ,
156+ entry .getKey ().local ,
157+ entry .getKey ().remote ,
158+ ex );
150159 }
151160 return ;
152161 }
@@ -197,27 +206,35 @@ private void processRead() {
197206 byte [] data = new byte [responseData .limit ()];
198207 System .arraycopy (
199208 responseData .array (), responseData .arrayOffset (), data , 0 , responseData .limit ());
200- verboseLog (
201- "TCP read" ,
202- channel .socket ().getLocalSocketAddress (),
203- channel .socket ().getRemoteSocketAddress (),
204- data );
205209
206210 // The message was shorter than the minimum length to find the transaction, abort
207211 if (data .length < 2 ) {
212+ verboseLog (
213+ "TCP read: response too short for a valid reply, discarding" ,
214+ channel .socket ().getLocalSocketAddress (),
215+ channel .socket ().getRemoteSocketAddress (),
216+ data );
208217 return ;
209218 }
210219
220+ int id = ((data [0 ] & 0xFF ) << 8 ) + (data [1 ] & 0xFF );
221+ verboseLog (
222+ "TCP read: transaction id=" + id ,
223+ channel .socket ().getLocalSocketAddress (),
224+ channel .socket ().getRemoteSocketAddress (),
225+ data );
226+
211227 for (Iterator <Transaction > it = pendingTransactions .iterator (); it .hasNext (); ) {
212228 Transaction t = it .next ();
213- int id = ((data [0 ] & 0xFF ) << 8 ) + (data [1 ] & 0xFF );
214229 int qid = t .query .getHeader ().getID ();
215230 if (id == qid ) {
216231 t .f .complete (data );
217232 it .remove ();
218233 return ;
219234 }
220235 }
236+
237+ log .warn ("Transaction for answer to id {} not found" , id );
221238 }
222239
223240 private void processWrite (SelectionKey key ) {
@@ -256,9 +273,10 @@ static CompletableFuture<byte[]> sendrecv(
256273 channelMap .computeIfAbsent (
257274 new ChannelKey (local , remote ),
258275 key -> {
276+ log .debug ("Opening async channel for l={}/r={}" , local , remote );
277+ SocketChannel c = null ;
259278 try {
260- log .trace ("Opening async channel for l={}/r={}" , local , remote );
261- SocketChannel c = SocketChannel .open ();
279+ c = SocketChannel .open ();
262280 c .configureBlocking (false );
263281 if (local != null ) {
264282 c .bind (local );
@@ -267,13 +285,21 @@ static CompletableFuture<byte[]> sendrecv(
267285 c .connect (remote );
268286 return new ChannelState (c );
269287 } catch (IOException e ) {
288+ if (c != null ) {
289+ try {
290+ c .close ();
291+ } catch (IOException ee ) {
292+ // ignore
293+ }
294+ }
270295 f .completeExceptionally (e );
271296 return null ;
272297 }
273298 });
274299 if (channel != null ) {
275300 log .trace (
276- "Creating transaction for {}/{}" ,
301+ "Creating transaction for id {} ({}/{})" ,
302+ query .getHeader ().getID (),
277303 query .getQuestion ().getName (),
278304 Type .string (query .getQuestion ().getType ()));
279305 Transaction t = new Transaction (query , data , endTime , channel .channel , f );
0 commit comments