When a query goes through the simple query protocol, the demand issued by the Flux operator discardOnCancel (https://github.com/pgjdbc/r2dbc-postgresql/blob/main/src/main/java/io/r2dbc/postgresql/PostgresqlStatement.java#L248) does not seem to be propagated to the ReactorNettyClient.Conversation sink.
When a query is canceled, the Operators#discardOnCancel operator is supposed to swallow the cancel signal and instead issue a very large Subscription#request upstream (see https://github.com/pgjdbc/r2dbc-postgresql/blob/main/src/main/java/io/r2dbc/postgresql/util/FluxDiscardOnCancel.java#L125). However, the FluxSink#onRequest callback is never invoked (see https://github.com/pgjdbc/r2dbc-postgresql/blob/main/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java#L710) when FluxDiscardOnCancel.FluxDiscardOnCancelSubscriber#cancel is called. This leaves the ReactorNettyClient.Conversation dead: it is never canceled, and its demand is exhausted.
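To make the expected behavior concrete, here is a minimal sketch of the discard-on-cancel idea using only java.util.concurrent.Flow. It is a simplified model, not the driver's FluxDiscardOnCancel class: the names DiscardingSubscriber, RecordingSubscription and CollectingSubscriber are hypothetical and exist only for this illustration. It shows what I would expect the upstream to observe after a downstream cancel: no cancel, and one request for Long.MAX_VALUE.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DiscardOnCancelSketch {

    /** Hypothetical upstream: records every request(n) and whether cancel() arrived. */
    static final class RecordingSubscription implements Flow.Subscription {
        final List<Long> requests = new ArrayList<>();
        boolean cancelled;

        @Override public void request(long n) { requests.add(n); }
        @Override public void cancel() { cancelled = true; }
    }

    /** Hypothetical downstream: collects the items it is handed. */
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    /** Simplified model of the discard-on-cancel idea (assumption: not the driver's code). */
    static final class DiscardingSubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription {
        private final Flow.Subscriber<? super T> downstream;
        private Flow.Subscription upstream;
        private boolean discarding;
        int discarded;

        DiscardingSubscriber(Flow.Subscriber<? super T> downstream) {
            this.downstream = downstream;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(this);
        }

        @Override public void onNext(T item) {
            // After a cancel, items are drained and dropped instead of delivered.
            if (discarding) { discarded++; } else { downstream.onNext(item); }
        }

        @Override public void onError(Throwable t) { if (!discarding) downstream.onError(t); }
        @Override public void onComplete() { if (!discarding) downstream.onComplete(); }

        @Override public void request(long n) { if (!discarding) upstream.request(n); }

        @Override public void cancel() {
            // Swallow the cancel and ask upstream for everything that is left.
            if (!discarding) {
                discarding = true;
                upstream.request(Long.MAX_VALUE);
            }
        }
    }

    public static void main(String[] args) {
        RecordingSubscription upstream = new RecordingSubscription();
        CollectingSubscriber downstream = new CollectingSubscriber();
        DiscardingSubscriber<String> operator = new DiscardingSubscriber<>(downstream);

        operator.onSubscribe(upstream);
        downstream.subscription.request(1);   // normal demand is forwarded
        operator.onNext("row-1");
        downstream.subscription.cancel();     // cancel becomes request(Long.MAX_VALUE)
        operator.onNext("row-2");             // drained and discarded
        operator.onComplete();

        System.out.println("upstream requests: " + upstream.requests);
        System.out.println("upstream cancelled: " + upstream.cancelled);
        System.out.println("delivered: " + downstream.received + ", discarded: " + operator.discarded);
    }
}
```

In this model the upstream sees the large request immediately. In the driver, the equivalent observation would be FluxSink#onRequest firing on the Conversation sink, which is the step that appears to be missing.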
Since the ReactorNettyClient.Conversation can handle the canceled state on its own, I removed the Flux operator discardOnCancel from PostgresqlStatement. After that, everything worked: the ReactorNettyClient.Conversation ran through to the end even though the sink had been canceled.
Do we still need this discardOnCancel operator here? If so, why is the large request not propagated to the ReactorNettyClient.Conversation?