Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
Expand Down Expand Up @@ -56,7 +57,6 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
this.keySchema = new SessionKeySchema();
}

@SuppressWarnings("unchecked")
public void init(final ProcessorContext context, final StateStore root) {
topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name());
bytesStore.init(context, root);
Expand Down Expand Up @@ -128,21 +128,23 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final Intern
context.setRecordContext(entry.recordContext());
try {
final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer(), topic);
final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));
if (flushListener != null) {
final AGG newValue = serdes.valueFrom(entry.newValue());
final AGG oldValue = fetchPrevious(binaryKey);
final AGG oldValue = fetchPrevious(rawKey, key.window());
if (!(newValue == null && oldValue == null)) {
flushListener.apply(key, newValue, oldValue);
}
}
bytesStore.put(new Windowed<>(Bytes.wrap(serdes.rawKey(key.key())), key.window()), entry.newValue());
bytesStore.put(new Windowed<>(rawKey, key.window()), entry.newValue());
} finally {
context.setRecordContext(current);
}
}

private AGG fetchPrevious(final Bytes key) {
try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = bytesStore.fetch(key)) {
private AGG fetchPrevious(final Bytes rawKey, final Window window) {
try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = bytesStore
.findSessions(rawKey, window.start(), window.end())) {
if (!iterator.hasNext()) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
Expand Down Expand Up @@ -161,6 +163,29 @@ public void shouldFetchCorrectlyAcrossSegments() throws Exception {
assertFalse(results.hasNext());
}

@Test
public void shouldForwardChangedValuesDuringFlush() throws Exception {
final Windowed<String> a = new Windowed<>("a", new SessionWindow(0, 0));
final List<KeyValue<Windowed<String>, Change<Long>>> flushed = new ArrayList<>();
cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, Long>() {
@Override
public void apply(final Windowed<String> key, final Long newValue, final Long oldValue) {
flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
}
});

cachingStore.put(a, 1L);
cachingStore.flush();

cachingStore.put(a, 2L);
cachingStore.flush();

cachingStore.remove(a);
cachingStore.flush();

assertEquals(flushed, Arrays.asList(KeyValue.pair(a, new Change<>(1L, null)), KeyValue.pair(a, new Change<>(2L, 1L)), KeyValue.pair(a, new Change<>(null, 2L))));
}

@Test
public void shouldClearNamespaceCacheOnClose() throws Exception {
final Windowed<String> a1 = new Windowed<>("a", new SessionWindow(0, 0));
Expand Down