KAFKA-2594 Added InMemoryLRUCacheStore #256
kafka-trunk-git-pr #586 FAILURE
kafka-trunk-git-pr #593 FAILURE
Looks like there's an occasional test failure in
@guozhangwang, I've updated this PR to involve a single commit that does nothing but add the
@rhauch I am wondering whether this class could benefit from a static Builder class, something along the lines of:
InMemoryLRUCacheStore cacheStore = InMemoryLRUCacheStore.Builder.create()
    .name(name)
    .capacity(capacity)
    .context(context)
    .keySerDe(keySerializer, keyDeserializer)
    .valueSerDe(valueSerializer, valueDeserializer)
    .time(time)
    .build();
OTOH, the constructor arguments are not all of the same type (say, String), so there is little chance of accidentally swapping the parameter order. Wdyt?
Yeah, it'd be nice, but KeyValueStore is parameterized, and that makes builders complicated (but not impossible). See the discussion on #255.
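To illustrate why the type parameters complicate a builder, here is a minimal sketch of one way to carry K and V through the chain: the factory method takes the typed arguments first, so the compiler can fix both types before any other setter is called. Every name below (CodecSketch, BoundedStoreSketch, the default capacity) is hypothetical and is not taken from this PR or from #255.

```java
import java.util.Objects;

// Hypothetical stand-in for a serializer; not a Kafka interface.
interface CodecSketch<T> {
    byte[] encode(T value);
}

// Hypothetical store type; only the builder plumbing is of interest here.
class BoundedStoreSketch<K, V> {
    final String name;
    final int capacity;
    final CodecSketch<K> keyCodec;
    final CodecSketch<V> valueCodec;

    private BoundedStoreSketch(Builder<K, V> b) {
        this.name = Objects.requireNonNull(b.name, "name");
        this.capacity = b.capacity;
        this.keyCodec = b.keyCodec;
        this.valueCodec = b.valueCodec;
    }

    // K and V are fixed here from the codecs, so the rest of the chain stays type-safe.
    static <K, V> Builder<K, V> builder(CodecSketch<K> keyCodec, CodecSketch<V> valueCodec) {
        return new Builder<>(keyCodec, valueCodec);
    }

    static final class Builder<K, V> {
        private final CodecSketch<K> keyCodec;
        private final CodecSketch<V> valueCodec;
        private String name;
        private int capacity = 1000; // arbitrary default chosen for this sketch

        private Builder(CodecSketch<K> keyCodec, CodecSketch<V> valueCodec) {
            this.keyCodec = Objects.requireNonNull(keyCodec, "keyCodec");
            this.valueCodec = Objects.requireNonNull(valueCodec, "valueCodec");
        }

        Builder<K, V> name(String name) {
            this.name = name;
            return this;
        }

        Builder<K, V> capacity(int capacity) {
            if (capacity <= 0) {
                throw new IllegalArgumentException("capacity must be positive");
            }
            this.capacity = capacity;
            return this;
        }

        BoundedStoreSketch<K, V> build() {
            return new BoundedStoreSketch<>(this);
        }
    }
}
```

The trade-off is that the typed arguments can no longer be set in arbitrary order, which is part of what makes a fully fluent builder awkward for parameterized stores.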
@rhauch Do you think the new org.apache.kafka.common.cache.LRUCache can be reused here?
@guozhangwang, the new
d59b2e6 to 7678126
@guozhangwang, rebased this PR.
This PR still seems to contain some changes from #255; could you double-check?
@guozhangwang, yes, it is currently based upon #255, so after #255 is merged I will rebase this so it is a single commit.
Added a new KeyValueStore implementation that keeps a maximum number of entries in-memory, and as the size exceeds the capacity the least-recently used entry is removed from the store and the backing topic. Also added unit tests for this new store.
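The eviction behavior described in this commit message can be sketched with the JDK's LinkedHashMap in access order, whose removeEldestEntry hook fires after each insertion. This is only an illustration under that assumption, not the code in this PR: the class name LruSketch and the eviction counter are hypothetical, and a real store would also have to remove the evicted key from the backing topic at the marked point.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of LRU eviction; not the class added by this PR.
class LruSketch<K, V> {
    private final int maxEntries;
    private final LinkedHashMap<K, V> map;
    private int evictions = 0;

    LruSketch(int maxEntries) {
        this.maxEntries = maxEntries;
        // accessOrder = true: iteration order runs from least- to most-recently accessed.
        this.map = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                boolean evict = size() > LruSketch.this.maxEntries;
                if (evict) {
                    // A real store would also delete eldest.getKey() from the backing topic here.
                    evictions++;
                }
                return evict;
            }
        };
    }

    V get(K key) {
        return map.get(key); // counts as an access and refreshes the entry
    }

    void put(K key, V value) {
        map.put(key, value);
    }

    int size() {
        return map.size();
    }

    int evictions() {
        return evictions;
    }
}
```

With a capacity of 2, putting "a" and "b", reading "a", and then putting "c" evicts "b", because the read made "a" the more recently used entry.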
@guozhangwang: I've rebased and cleaned up this PR, and it now has only one commit that adds the in-memory cache via the new method in the Stores builder. A local build succeeds with this PR. Creating a limited-size store is nearly identical to creating an unlimited-sized in-memory store. The difference is that the interface returned by The default maximum number of entries is
I think this is good, since for any in-memory store we need some general mechanism to bound memory usage. LRU is a good starting point, but moving forward we may need to 1) bound the capacity by bytes instead of number of records (we have seen quite a few cases where bounding the footprint by number of messages causes ops problems when record lengths vary), and 2) support customizable eviction policies beyond LRU.
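As a rough illustration of point 1, the following sketch bounds an LRU map by total value bytes instead of entry count. It is not part of this patch. The class name is hypothetical, only value sizes are counted (keys and per-entry overhead are ignored), and a value larger than the limit is evicted immediately after insertion.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: LRU eviction driven by a byte budget rather than an entry count.
class ByteBoundedLruSketch {
    private final long maxBytes;
    private long currentBytes = 0;
    // accessOrder = true keeps entries ordered from least- to most-recently accessed.
    private final LinkedHashMap<String, byte[]> map = new LinkedHashMap<>(16, 0.75f, true);

    ByteBoundedLruSketch(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    void put(String key, byte[] value) {
        byte[] old = map.put(key, value);
        if (old != null) {
            currentBytes -= old.length;
        }
        currentBytes += value.length;

        // Evict from the least-recently used end until the budget is met again.
        Iterator<Map.Entry<String, byte[]>> it = map.entrySet().iterator();
        while (currentBytes > maxBytes && it.hasNext()) {
            Map.Entry<String, byte[]> eldest = it.next();
            currentBytes -= eldest.getValue().length;
            it.remove();
        }
    }

    byte[] get(String key) {
        return map.get(key); // refreshes the entry's recency
    }

    long bytes() {
        return currentBytes;
    }

    int size() {
        return map.size();
    }
}
```

Because records vary in length, a single large put may evict several small entries, which is exactly the behavior a count-based bound cannot express.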
@guozhangwang: I completely agree. Perhaps this is a good start, though.
Yeah, just reminding ourselves to generalize it in the future. This patch LGTM.
Added a new KeyValueStore implementation called InMemoryLRUCacheStore that keeps a maximum number of entries in-memory, and as the size exceeds the capacity the least-recently used entry is removed from the store and the backing topic. Also added unit tests for this new store and the existing InMemoryKeyValueStore and RocksDBKeyValueStore implementations. A new KeyValueStoreTestDriver class simplifies all of the other tests, and can be used by other libraries to help test their own custom implementations.
This PR depends upon KAFKA-2593 and its PR at #255. Once that PR is merged, I can rebase this PR if desired.
Two issues were uncovered when creating these new unit tests, and both are also addressed as separate (small) commits in this PR:
RocksDBKeyValueStore initialization was not creating the file system directory if missing.
MeteredKeyValueStore was casting to ProcessorContextImpl to access the RecordCollector, which prevented using MeteredKeyValueStore implementations in tests where something other than ProcessorContextImpl was used. The fix was to introduce a RecordCollector.Supplier interface to define this recordCollector() method, and change ProcessorContextImpl and MockProcessorContext to both implement this interface. Now, MeteredKeyValueStore can cast to the new interface to access the record collector rather than to a single concrete implementation, making it possible to use any and all current stores inside unit tests.
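The shape of the second fix can be sketched as follows. All types are simplified, hypothetical stand-ins (CollectorSketch for RecordCollector, CollectorSupplierSketch for RecordCollector.Supplier, and so on), so the signatures are assumptions and not the actual Kafka ones. The point is only that the store casts to a narrow interface that any context, including a mock, can implement.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-in for the record collector.
interface CollectorSketch {
    void send(String topic, String key, String value);
}

// Plays the role of the new supplier interface: one method exposing the collector.
interface CollectorSupplierSketch {
    CollectorSketch recordCollector();
}

// Plays the role of the general processor context interface.
interface ContextSketch {
    String taskName();
}

// A test context: not the production class, but it implements the supplier interface.
class MockContextSketch implements ContextSketch, CollectorSupplierSketch {
    final List<String> sent = new ArrayList<>();

    @Override
    public String taskName() {
        return "test-task";
    }

    @Override
    public CollectorSketch recordCollector() {
        // Record each send so a test can inspect what the store logged.
        return (topic, key, value) -> sent.add(topic + ":" + key + "=" + value);
    }
}

class MeteredStoreSketch {
    private final CollectorSketch collector;

    MeteredStoreSketch(ContextSketch context) {
        // Cast to the narrow interface rather than to one concrete context class,
        // so any context that supplies a collector works, including mocks.
        this.collector = ((CollectorSupplierSketch) context).recordCollector();
    }

    void put(String key, String value) {
        // Log the change to a hypothetical changelog topic.
        collector.send("changelog", key, value);
    }
}
```

A context that does not implement the supplier interface would still fail with a ClassCastException, so the cast remains a runtime contract and not a compile-time guarantee.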