KAFKA-10832: Fix Log to use the correct ProducerStateManager instance when updating producers#9718
Conversation
|
cc @dhruvilshah3 @junrao @gardnervickers for review |
|
Good catch. This looks a bit error prone. Should we be moving some of these methods to the companion object or something? |
|
@ijuma Good idea. I can move |
|
|
||
| // Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be | ||
| // free of all side-effects, i.e. it must not update any log-specific state. | ||
| private def rebuildProducerState(log: Log, |
There was a problem hiding this comment.
This seems too large as a companion method. I am also not sure that we should pass along Log to this method since it exposes all internal states in Log, which make it harder to track usage.
There was a problem hiding this comment.
That is true. Do you feel the original approach in f0466b2 was better, where, we didn't move methods into a companion object? I'd be happy to revert to that.
There was a problem hiding this comment.
We can probably keep the two other two simpler methods (without Log as the input) as the companion methods.
eb9a449 to
31f95a0
Compare
31f95a0 to
5d798b7
Compare
| private def isLogFile(file: File): Boolean = | ||
| file.getPath.endsWith(LogFileSuffix) | ||
|
|
||
| private def loadProducersFromLog(producerStateManager: ProducerStateManager, records: Records): Unit = { |
There was a problem hiding this comment.
Perhaps this could be name loadProducersFromRecords?
I have fixed what looked like a potential bug to me.
Bug:
The bug is that from within
Log.updateProducers(…), the code operates on theproducerStateManagerattribute of theLoginstance instead of operating on an input parameter. Please see this LOC where it callsproducerStateManager.prepareUpdatethus accessing the attribute from theLogobject (see this). This looks unusual particularly forLog.loadProducersFromLog(...)path. Here I believe we should be using the instance passed to the method, rather than the attribute from theLoginstance. I have fixed the same in this PR.I'm not sure (yet) if this bug has any drastic consequences (probably not), but it is still worthwhile making things consistent.
Fix:
The fix is to explicitly pass the
ProducerStateManagerintoLog.updateProducersand move the following methods:Log.loadProducersFromLogandLog.updateProducersinto theLogcompanion object.Tests:
Rely on existing tests.