Skip to content

KAFKA-5172: Fix fetchPrevious to find the correct session.#2972

Closed
ghost wants to merge 4 commits into
apache:trunkfrom
kyle-winkelman:CachingSessionStore-fetchPrevious
Closed

KAFKA-5172: Fix fetchPrevious to find the correct session.#2972
ghost wants to merge 4 commits into
apache:trunkfrom
kyle-winkelman:CachingSessionStore-fetchPrevious

Conversation

@ghost

@ghost ghost commented May 4, 2017

Copy link
Copy Markdown

Change fetchPrevious to use findSessions with the proper key and timestamps rather than using fetch.

@asfbot

asfbot commented May 4, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3463/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented May 4, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3457/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented May 4, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3454/
Test FAILed (JDK 8 and Scala 2.12).

@guozhangwang

Copy link
Copy Markdown
Contributor

@dguy could you take a look?

@dguy dguy left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KyleWinkelman thanks for the PR. Can you please add a test to CachingSessionStoreTest that fails without this change?

@ghost

ghost commented May 4, 2017

Copy link
Copy Markdown
Author

@dguy I have added a test case that fails without this change.

@ghost ghost changed the title [KAFKA-5172] Fix fetchPrevious to find the correct session. KAFKA-5172: Fix fetchPrevious to find the correct session. May 4, 2017
@asfbot

asfbot commented May 4, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3477/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented May 4, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3474/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented May 4, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3483/
Test FAILed (JDK 8 and Scala 2.11).

@dguy dguy left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the test @KyleWinkelman left a couple of comments

*/
package org.apache.kafka.streams.state.internals;

import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest.toList;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you re-order the input back to what they where before?

@Test
public void shouldPutAndMaybeForward() throws Exception {
final Windowed<String> a1 = new Windowed<>("a", new SessionWindow(0, 0));
final Windowed<String> a2 = new Windowed<>("a", new SessionWindow(0, 1));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to add the extra class just for this case. We could do something like:

final List<Change<Long>> values = new ArrayList<>();
        cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, Long>() {
            @Override
            public void apply(final Windowed<String> key, final Long newValue, final Long oldValue) {
                values.add(new Change<>(newValue, oldValue));
            }
        });
       ...
      assertThat(values, equalTo(Arrays.asList(new Change<>(1L, null), new Change<>(null, 1L), new Change<>(2L, null))));

assertFalse(results.hasNext());
}

@Test

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe something like: shouldForwardChangedValuesDuringFlush ?

import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.Change;

public class MockCacheFlushListener<K, V> implements CacheFlushListener<K, V> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment above, i don't think we need this class

@ghost

ghost commented May 4, 2017

Copy link
Copy Markdown
Author

I made the recommended changes and altered the contents of the test a little so we could see a case where the new and old values are set.
Thanks for the comments.

@asfbot

asfbot commented May 4, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3495/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented May 4, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3486/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented May 4, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3489/
Test FAILed (JDK 7 and Scala 2.10).

@ghost

ghost commented May 4, 2017

Copy link
Copy Markdown
Author

@dguy Do you think we should prevent the Bytes.wrap(serdes.rawKey(key.key())) from being called twice (once in putAndMaybeForward and once in fetchPrevious)? I don't know how costly this is but it could save a little bit of processing. I think we could call it once and change the fetchPrevious to accept Bytes and Window.

    private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
        final Bytes binaryKey = entry.key();
        final RecordContext current = context.recordContext();
        context.setRecordContext(entry.recordContext());
        try {
            final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer(), topic);
            **final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));**
            if (flushListener != null) {
                final AGG newValue = serdes.valueFrom(entry.newValue());
                final AGG oldValue = fetchPrevious(rawKey, key.window());
                if (!(newValue == null && oldValue == null)) {
                    flushListener.apply(key, newValue, oldValue);
                }
            }
            bytesStore.put(new Windowed<>(**rawKey**, key.window()), entry.newValue());
        } finally {
            context.setRecordContext(current);
        }
    }
    private AGG fetchPrevious(**final Bytes rawKey, final Window window**) {
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = bytesStore
                .findSessions(**rawKey**, window.start(), window.end())) {
            if (!iterator.hasNext()) {
                return null;
            }
            return serdes.valueFrom(iterator.next().value);
        }
    }

@dguy

dguy commented May 5, 2017

Copy link
Copy Markdown
Contributor

@KyleWinkelman yes that makes sense. Thanks!

@ghost

ghost commented May 5, 2017

Copy link
Copy Markdown
Author

Ok made the changes. Anything else required of me for this PR?

@asfbot

asfbot commented May 5, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3537/
Test FAILed (JDK 8 and Scala 2.12).

@dguy dguy left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @KyleWinkelman LGTM

@asfbot

asfbot commented May 5, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3546/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented May 5, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3540/
Test FAILed (JDK 7 and Scala 2.10).

@guozhangwang

Copy link
Copy Markdown
Contributor

retest this please

@asfbot

asfbot commented May 6, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3583/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented May 6, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3577/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented May 6, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3573/
Test FAILed (JDK 8 and Scala 2.12).

@guozhangwang

Copy link
Copy Markdown
Contributor

retest this please

@asfbot

asfbot commented May 7, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3620/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented May 7, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3614/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented May 7, 2017

Copy link
Copy Markdown

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3611/
Test FAILed (JDK 8 and Scala 2.12).

@guozhangwang guozhangwang left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Merged to trunk.

@asfgit asfgit closed this in e472ee7 May 8, 2017
@ghost ghost deleted the CachingSessionStore-fetchPrevious branch May 15, 2017 13:19
guozhangwang added a commit that referenced this pull request Jan 31, 2019
…for flushing / getter (#6161)

#2972 tried to fix a bug about flushing operation, but it was not complete, since findSessions(key, earliestEnd, latestStart) does not guarantee to only return a single entry since its semantics are to return any sessions whose end > earliestEnd and whose start < latestStart.

I've tried various ways to fix it completely and I ended up having to add a single-point query to the public ReadOnlySessionStore API for the exact needed semantics. It is used for flushing to read the old values (otherwise the wrong old values will be sent downstreams, hence it is a correctness issue) and also for getting the value for value-getters (it is for perf only).
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…for flushing / getter (apache#6161)

apache#2972 tried to fix a bug about flushing operation, but it was not complete, since findSessions(key, earliestEnd, latestStart) does not guarantee to only return a single entry since its semantics are to return any sessions whose end > earliestEnd and whose start < latestStart.

I've tried various ways to fix it completely and I ended up having to add a single-point query to the public ReadOnlySessionStore API for the exact needed semantics. It is used for flushing to read the old values (otherwise the wrong old values will be sent downstreams, hence it is a correctness issue) and also for getting the value for value-getters (it is for perf only).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants