Skip to content

KAFKA-13785: [2/N][emit final] add processor metadata to be committed with offset#11829

Merged
mjsax merged 5 commits into
apache:trunkfrom
lihaosky:meta-persist
Mar 31, 2022
Merged

KAFKA-13785: [2/N][emit final] add processor metadata to be committed with offset#11829
mjsax merged 5 commits into
apache:trunkfrom
lihaosky:meta-persist

Conversation

@lihaosky

@lihaosky lihaosky commented Mar 2, 2022

Copy link
Copy Markdown
Contributor

Add processor metadata to be committed with offset to broker. Adding this so that we can store last processed window time gracefully.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@lihaosky lihaosky changed the title [RFC][3/N] add processor metadata to be committed with offset [RFC][2/N][emit final] add processor metadata to be committed with offset Mar 3, 2022

@mjsax mjsax left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we should make this public API (and add to the KIP?)

Otherwise, we need to case to an internal interface in the implementation? If we think this cast is ok, and we don't want to expand the scope of the KIP, I am also fine with it. Just wondering... We could also expose it as public API later on, but I think we should design it in a way that will allow us to expose it as public API later without major changes?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: formatting/indention

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I can follow? Why is this a key-value pair? And why are the types <Bytes, byte[]>?

The OffsetAndMetadata interface takes a String metadata argument. That is why we encode the current streamTime we store as metadata as String encoded using base64.

If we assume that there might be multiple processors inside a task, I understand that we need a mapping from processor name to metadata, but the map should be <String,String> (or maybe <String,Long>)?

And we would encode it as concatenation of <streamTime><numberOfEntries><key,value><key,value> with key being the processor name, and value being the encoded timestamp (if we use Long in the map, we convert the long to base64 internally?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a key-value pair

My original intention is to make it flexible so that even different processor can commit/store multiple key/value pairs. So the key/value could be anything the processor choose. Now I think maybe we could also add a namespace to prevent collision of key from different processor. Namespace can be processor name.

And why are the types <Bytes, byte[]>

Why not <String, Long>? I think Long is not flexible enough. For emit final use case, Long is fine.
Why not <String, String>? I think it's more steps for processor to serialize it to String in some cases. e.g. For Long, processor needs to serialize it to byte[] and then String? So why not pass byte[] directly and serialize them finally to String for OffsetAndMetadata?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be pre-mature optimization to make it generic. I would stick to just what we need. That is also why I think <String,Long> might just be good enough as we intend to only use it for "emit final".

So why not pass byte[] directly and serialize them finally to String for OffsetAndMetadata?

My point is, that we should avoid to change the format/serialization twice, but only once. Either we accept long and let the runtime do long -> String directly. Or we just use String and the runtime has nothing to do but to concatenate string, and the processor needs to do the long -> String conversion. In both cases it's a single translation step. Introducing byte[] add a second translation step: what would be the advantage of having two steps instead of one?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we pass in TopicPartition? Tasks are per partition already?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. TopicPartition can be dropped.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of merge ? It seems it would be sufficient for a processor to just set it's own metadata?

The runtime can actually add the name of the processor internally?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For globalMetadata, I'm thinking this should be committed to all TopicPartition. We need to merge the metadata from all TopicPartition to get correct globalMetadata in case some commits fail?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be committed to all TopicPartition

Sound like we would commit it redundantly? Why store the same metadata for multiple partitions?

We need to merge the metadata from all TopicPartition to get correct globalMetadata in case some commits fail?

Not sure if I can follow. If the commit fails, we would fall back to the previous offset including the previous metadata, right?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used in StreamTask line 1107 for serializing this object to byte[] before put into OffsetAndMetadata

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be better to add a proper POJO maybe StreamsMetadata or something that wraps the streamTime Long plus ProcessorMetadata instead of using KeyValue ? We might add new fields later on what is easier to do for a new POJO.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this will be removed as it's replace by decodeTimestampAndMeta that can handle old and new format?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

@lihaosky lihaosky left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we should make this public API (and add to the KIP?)
Otherwise, we need to case to an internal interface in the implementation? If we think this cast is ok, and we don't want to expand the scope of the KIP, I am also fine with it. Just wondering... We could also expose it as public API later on, but I think we should design it in a way that will allow us to expose it as public API later without major changes?

I'm thinking of creating internal interface to reduce the scope of the KIP. Yeah, we should design it in a way for easy public API support

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a key-value pair

My original intention is to make it flexible so that even different processor can commit/store multiple key/value pairs. So the key/value could be anything the processor choose. Now I think maybe we could also add a namespace to prevent collision of key from different processor. Namespace can be processor name.

And why are the types <Bytes, byte[]>

Why not <String, Long>? I think Long is not flexible enough. For emit final use case, Long is fine.
Why not <String, String>? I think it's more steps for processor to serialize it to String in some cases. e.g. For Long, processor needs to serialize it to byte[] and then String? So why not pass byte[] directly and serialize them finally to String for OffsetAndMetadata?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used in StreamTask line 1107 for serializing this object to byte[] before put into OffsetAndMetadata

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. TopicPartition can be dropped.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For globalMetadata, I'm thinking this should be committed to all TopicPartition. We need to merge the metadata from all TopicPartition to get correct globalMetadata in case some commits fail?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

@lihaosky

Copy link
Copy Markdown
Contributor Author

I plan to make changes. Don't review

@lihaosky lihaosky changed the title [RFC][2/N][emit final] add processor metadata to be committed with offset [2/N][emit final] add processor metadata to be committed with offset Mar 29, 2022
}

@Override
public void setProcessorMetadata(final ProcessorMetadata metadata) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need both "per key" and "full" getters/setters -- could we limit it to only one of both?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used in restoration to set processor metadata as a whole after deserialization and merging. The per key setter is used by processor. Maybe we can get rid of getProcessorMetadata if it's not used.

final long partitionTime = partitionTimes.get(partition);
committableOffsets.put(partition, new OffsetAndMetadata(offset, encodeTimestamp(partitionTime)));
committableOffsets.put(partition, new OffsetAndMetadata(offset,
TopicPartitionMetadata.with(partitionTime, processorContext.getProcessorMetadata()).encode()));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we need to guard this write and make a case decision during rolling upgrade -- it might only be save to use the new format after all instances are upgrade? Or do we consider using the new "emit final" feature a "breaking change" anyway and existing apps can only use it after a reset anyway?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Existing windowed aggregation not using "emit final" can't upgrade to it since store format is different. Yes, it's a breaking changing if user wants to use it.

public void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
super.updateInputPartitions(topicPartitions, allTopologyNodesToSourceTopics);
partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
processorContext.getProcessorMetadata().setNeedsCommit(true);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is setting the commit flag enough? Or would we need to commit the current metadata into the new partition(s) before starting to process any data? Atm, it seems we would only commit to the new partitions on the first commit after data was processed. Wondering if this is a race condition we need to close?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If new partitions are added, I think it's ok to not commit immediately since old partitions still have original metadata and they can be fetched/merged.

If there's partition deletion and system crashed before writing to new partitions, metadata will be gone. But I'm not sure if we need to handle that since user deleted old partitions containing metadata.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it might be a rare case. If we think it's not worth it and the risk to lose the metadata is tiny (and it's complicated to force a commit right away) I am fine with not closing this race condition.

buffer.put(LATEST_MAGIC_BYTE);
buffer.putLong(partitionTime);
buffer.put(serializedMeta);
return Base64.getEncoder().encodeToString(buffer.array());

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still wondering, why we encode processorMetadata first as byte[] and convert it to String using Base64 here? Why not convert the Map to String directly`? The key is already String, so we would only use Base64 to convert the Long-values to String and concatenate all kv-pairs in some format (eg, CSV?)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason is after converting single partitionTime to String, the length is not deterministic and we need other meta to encode it. Also I don't want to use Base64 multiple times.

@mjsax mjsax added streams kip Requires or implements a KIP labels Mar 31, 2022

@mjsax mjsax left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. We just need a Jira for KIP-825 before we can merge this PR.

@lihaosky

Copy link
Copy Markdown
Contributor Author

Thanks @mjsax ! Created Jira: https://issues.apache.org/jira/browse/KAFKA-13785

@lihaosky lihaosky changed the title [2/N][emit final] add processor metadata to be committed with offset KAFKA-13785: [2/N][emit final] add processor metadata to be committed with offset Mar 31, 2022
@mjsax mjsax merged commit 6b2a0bc into apache:trunk Mar 31, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants