
changefeedccl: emittedmessages metric resolved messages semantics aren't consistent #111043

@jayshrivastava

Description


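Some sinks increment the emittedmessages metric for a resolved message as soon as the message is enqueued, rather than after it has actually been sent. For example, see `kafkaSink.EmitResolvedTimestamp`, where the metric is recorded by the deferred `recordResolvedCallback` when the method returns: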
```go
// EmitResolvedTimestamp implements the Sink interface.
func (s *kafkaSink) EmitResolvedTimestamp(
	ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
) error {
	defer s.metrics.recordResolvedCallback()()
	// Periodically ping sarama to refresh its metadata. This means talking to
	// zookeeper, so it shouldn't be done too often, but beyond that this
	// constant was picked pretty arbitrarily.
	//
	// TODO(dan): Add a test for this. We can't right now (2018-11-13) because
	// we'd need to bump sarama, but that's a bad idea while we're still
	// actively working on stability. At the same time, revisit this tuning.
	const metadataRefreshMinDuration = time.Minute
	if timeutil.Since(s.lastMetadataRefresh) > metadataRefreshMinDuration {
		if err := s.client.RefreshMetadata(s.topics.DisplayNamesSlice()...); err != nil {
			return err
		}
		s.lastMetadataRefresh = timeutil.Now()
	}
	return s.topics.Each(func(topic string) error {
		payload, err := encoder.EncodeResolvedTimestamp(ctx, topic, resolved)
		if err != nil {
			return err
		}
		s.scratch, payload = s.scratch.Copy(payload, 0 /* extraCap */)
		// sarama caches this, which is why we have to periodically refresh the
		// metadata above. Staleness here does not impact correctness. Some new
		// partitions will miss this resolved timestamp, but they'll eventually
		// be picked up and get later ones.
		partitions, err := s.client.Partitions(topic)
		if err != nil {
			return err
		}
		for _, partition := range partitions {
			msg := &sarama.ProducerMessage{
				Topic:     topic,
				Partition: partition,
				Key:       nil,
				Value:     sarama.ByteEncoder(payload),
			}
			if err := s.emitMessage(ctx, msg); err != nil {
				return err
			}
		}
		return nil
	})
}
```
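The metric is recorded through `defer s.metrics.recordResolvedCallback()()`. Go evaluates the deferred function value at the `defer` statement, so `recordResolvedCallback()` runs immediately and only the closure it returns is deferred until `EmitResolvedTimestamp` returns. That return happens after `emitMessage` has handed each message to the producer, not after the broker has confirmed delivery. The self-contained sketch below illustrates the mechanism. `emittedMessages`, `recordResolvedCallback`, and `emitResolved` are hypothetical stand-ins, and the sketch assumes the returned closure increments the counter unconditionally; under that assumption even an error return is counted.

```go
package main

import (
	"errors"
	"fmt"
)

// emittedMessages is a hypothetical stand-in for the emittedmessages counter.
var emittedMessages int

// recordResolvedCallback is a hypothetical helper with the same shape as the
// sink's metrics helper: the outer call runs as soon as the defer statement
// executes and returns a closure that increments the counter when invoked.
func recordResolvedCallback() func() {
	return func() { emittedMessages++ }
}

// emitResolved is a hypothetical sink method. Because the closure is
// deferred, it runs on every return path, including the error path.
func emitResolved(fail bool) error {
	defer recordResolvedCallback()()
	if fail {
		return errors.New("enqueue failed")
	}
	// In kafkaSink, this is the point where messages have only been enqueued.
	return nil
}

func main() {
	_ = emitResolved(false)
	_ = emitResolved(true)
	fmt.Println(emittedMessages) // prints 2: the failed call was counted too
}
```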

This is inconsistent with other sinks, which increment the metric only once the resolved message has been successfully sent.

Jira issue: CRDB-31739

Metadata


Assignees

No one assigned

Labels

A-cdc (Change Data Capture)
C-bug (Code not up to spec/doc, specs & docs deemed correct. Solution expected to change code/behavior.)
E-easy (Easy issue to tackle, requires little or no CockroachDB experience)
T-cdc
