changefeedccl: emittedmessages metric resolved messages semantics aren't consistent #111043
Open
Labels
- A-cdc: Change Data Capture
- C-bug: Code not up to spec/doc, specs & docs deemed correct. Solution expected to change code/behavior.
- E-easy: Easy issue to tackle, requires little or no CockroachDB experience
- T-cdc
Description
Some sinks only enqueue resolved messages, yet still increment the emittedmessages metric before the send is confirmed. For example, see
cockroach/pkg/ccl/changefeedccl/sink_kafka.go
Lines 359 to 408 in d1c1727
```go
// EmitResolvedTimestamp implements the Sink interface.
func (s *kafkaSink) EmitResolvedTimestamp(
	ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
) error {
	defer s.metrics.recordResolvedCallback()()
	// Periodically ping sarama to refresh its metadata. This means talking to
	// zookeeper, so it shouldn't be done too often, but beyond that this
	// constant was picked pretty arbitrarily.
	//
	// TODO(dan): Add a test for this. We can't right now (2018-11-13) because
	// we'd need to bump sarama, but that's a bad idea while we're still
	// actively working on stability. At the same time, revisit this tuning.
	const metadataRefreshMinDuration = time.Minute
	if timeutil.Since(s.lastMetadataRefresh) > metadataRefreshMinDuration {
		if err := s.client.RefreshMetadata(s.topics.DisplayNamesSlice()...); err != nil {
			return err
		}
		s.lastMetadataRefresh = timeutil.Now()
	}
	return s.topics.Each(func(topic string) error {
		payload, err := encoder.EncodeResolvedTimestamp(ctx, topic, resolved)
		if err != nil {
			return err
		}
		s.scratch, payload = s.scratch.Copy(payload, 0 /* extraCap */)
		// sarama caches this, which is why we have to periodically refresh the
		// metadata above. Staleness here does not impact correctness. Some new
		// partitions will miss this resolved timestamp, but they'll eventually
		// be picked up and get later ones.
		partitions, err := s.client.Partitions(topic)
		if err != nil {
			return err
		}
		for _, partition := range partitions {
			msg := &sarama.ProducerMessage{
				Topic:     topic,
				Partition: partition,
				Key:       nil,
				Value:     sarama.ByteEncoder(payload),
			}
			if err := s.emitMessage(ctx, msg); err != nil {
				return err
			}
		}
		return nil
	})
}
```
This is inconsistent because other sinks increment the metric only after the resolved message has been sent successfully.
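To make the difference concrete, here is a minimal, self-contained sketch of the two counting semantics. It is not the changefeedccl code: `counter`, `enqueueCountingSink`, `sendCountingSink`, and `countsAfterFailedFlush` are hypothetical names invented for illustration, and the "send" is just a callback that always fails.

```go
package main

import (
	"errors"
	"fmt"
)

// counter is a hypothetical stand-in for the emitted-messages metric.
type counter struct{ n int }

func (c *counter) inc() { c.n++ }

// enqueueCountingSink (hypothetical) counts a resolved message as soon as
// it is buffered, whether or not it is ever delivered.
type enqueueCountingSink struct {
	emitted counter
	pending []string
}

func (s *enqueueCountingSink) EmitResolved(payload string) {
	s.pending = append(s.pending, payload)
	s.emitted.inc() // counted at enqueue time
}

func (s *enqueueCountingSink) Flush(send func(string) error) error {
	for len(s.pending) > 0 {
		if err := send(s.pending[0]); err != nil {
			return err
		}
		s.pending = s.pending[1:]
	}
	return nil
}

// sendCountingSink (hypothetical) counts a resolved message only after the
// send callback reports success.
type sendCountingSink struct {
	emitted counter
	pending []string
}

func (s *sendCountingSink) EmitResolved(payload string) {
	s.pending = append(s.pending, payload)
}

func (s *sendCountingSink) Flush(send func(string) error) error {
	for len(s.pending) > 0 {
		if err := send(s.pending[0]); err != nil {
			return err
		}
		s.pending = s.pending[1:]
		s.emitted.inc() // counted only after a successful send
	}
	return nil
}

// countsAfterFailedFlush emits one resolved message to each sink, flushes
// with a send function that always fails, and returns both metric values.
func countsAfterFailedFlush() (enqueueCount, sendCount int) {
	failingSend := func(string) error { return errors.New("broker unavailable") }

	a := &enqueueCountingSink{}
	a.EmitResolved("resolved@1")
	_ = a.Flush(failingSend) // error ignored on purpose: only the metric matters here

	b := &sendCountingSink{}
	b.EmitResolved("resolved@1")
	_ = b.Flush(failingSend)

	return a.emitted.n, b.emitted.n
}

func main() {
	enqueueCount, sendCount := countsAfterFailedFlush()
	fmt.Println("enqueue-counting sink:", enqueueCount) // prints 1
	fmt.Println("send-counting sink:", sendCount)       // prints 0
}
```

With a failing send, the first sink reports one emitted message while the second reports zero, so the same metric name means different things depending on the sink.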
Jira issue: CRDB-31739