Conversation
Just some feedback as somebody who tried tsdb-gw via rpm after this similar update. I think FPM should be set to depend on librdkafka; there is no librdkafka in any of the el6/amzn1 repos, however there is a nice spec file out there already for users to build it themselves.
How metrictank will probably behave after merge:
Users can build the dependent rpm here:
Suggested update file: https://github.com/grafana/metrictank/blob/master/scripts/build_packages.sh
I'm not sure how to handle it, but this is a breaking change for anybody upgrading the rpm on el6/amzn1. el6 is EOL in 2020.
defer c.wg.Done()

var ok bool
var offsetPtr *int64
offsetPtr is being updated on every msg, but then nothing is done with it.
this value is read by the monitorLag() method: https://github.com/grafana/metrictank/pull/879/files#diff-57960cc1ee87fab707306aab51440a91R251
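For context, a minimal self-contained sketch of that pattern (not the PR's actual code; the partition count, tick interval, and everything except the currentOffsets/monitorLag idea is made up): the consume loop publishes the latest offset per partition through a *int64 with atomic.StoreInt64, and the monitor goroutine reads it with atomic.LoadInt64.

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

func main() {
    // one *int64 per partition, shared between the consume loop and the monitor
    currentOffsets := map[int32]*int64{0: new(int64), 1: new(int64)}

    // stand-in for monitorLag(): periodically read what the consume loop last stored
    go func() {
        for range time.Tick(time.Second) {
            for partition, ptr := range currentOffsets {
                fmt.Printf("partition %d is at offset %d\n", partition, atomic.LoadInt64(ptr))
            }
        }
    }()

    // stand-in for the consume loop: store the offset of each "message" as it arrives
    for offset := int64(0); offset < 50; offset++ {
        partition := int32(offset % 2)
        if ptr, ok := currentOffsets[partition]; ok {
            atomic.StoreInt64(ptr, offset)
        }
        time.Sleep(100 * time.Millisecond)
    }
}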
}

c.conf.MessageHandler(e.Value, tp.Partition)
atomic.StoreInt64(offsetPtr, int64(tp.Offset))
looks like this should be
atomic.StoreInt64(c.currentOffsets[tp.Partition], int64(tp.Offset))
i was worried about a message arriving from a partition that we did not expect. in such a case we would then call atomic.StoreInt64(nil, int64(tp.Offset)) if we didn't verify first that this partition id exists in c.currentOffsets
oh, i missed that you are setting offsetPtr to c.currentOffsets[tp.Partition] on L#221
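For reference, a hedged sketch of that guard as discussed (currentOffsets, MessageHandler, and the event fields come from the diff above; the surrounding loop is not shown):

// skip messages from partitions we never assigned, so offsetPtr can't be nil
if offsetPtr, ok = c.currentOffsets[tp.Partition]; !ok {
    log.Warn("kafka-consumer: received message for unexpected partition %d, ignoring", tp.Partition)
    continue
}
c.conf.MessageHandler(e.Value, tp.Partition)
atomic.StoreInt64(offsetPtr, int64(tp.Offset))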
kafka/consumer.go (Outdated)
c.consumer.Unassign()
log.Info("kafka-consumer: Revoked partitions: %+v", e)
case confluent.PartitionEOF:
fmt.Printf("%% Reached %v\n", e)
this should either be removed, or use log.Debug. But i think we should just set enable.partition.eof to false in the confluent.ConfigMap to prevent these events from being emitted.
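For illustration, disabling that when building the consumer config could look roughly like this (only the enable.partition.eof key is the point here; the other keys and the c.conf field names are placeholders):

conf := confluent.ConfigMap{
    "bootstrap.servers":    c.conf.Broker, // placeholder field name
    "group.id":             c.conf.Group,  // placeholder field name
    "enable.partition.eof": false,         // no more confluent.PartitionEOF events
}
consumer, err := confluent.NewConsumer(&conf)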
kafka/consumer.go (Outdated)
c.partitionLogSize[partition].Set(int(newest))
}

c.partitionOffset[partition].Set(int(offset))
this is already set on L#252
kafka/consumer.go (Outdated)
currentOffset = time.Now().Add(-1*offsetDuration).UnixNano() / int64(time.Millisecond)
currentOffset, _, err = c.tryGetOffset(topic, partition, currentOffset, 3, time.Second)
if err != nil {
return err
If the offset is outside of the range that kafka has, it will return an error. If that happens we just want to use oldest, not return an error.
actually this comment sounds like in this case it would just use oldest:
https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/consumer.go#L488-L490
but i'll fall back to oldest in case of any error now
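So something along these lines (a sketch of the agreed fallback, not the final code; tryGetOffset is assumed here to already return a single offset):

offset, err := c.tryGetOffset(topic, partition, currentOffset, 3, time.Second)
if err != nil {
    // whatever went wrong (offset out of range, timeout, ...), start at the oldest message
    log.Warn("kafka-consumer: failed to get offset for %s:%d (%s), falling back to oldest", topic, partition, err)
    offset = int64(confluent.OffsetBeginning)
}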
kafka/consumer.go (Outdated)
return 0, 0, err
}

var val1, val2 int64
let's rename these to something meaningful.
kafka/consumer.go (Outdated)
times, err = c.consumer.OffsetsForTimes(times, c.conf.MetadataTimeout)
if err == nil {
if len(times) == 0 {
err = fmt.Errorf("Got 0 topics returned from broker")
we need to fall back to offsetEnd
why should this fall back to offsetEnd, while in the above case it should fall back to offsetBeginning? shouldn't they both fall back to offsetBeginning?
how is that? (Lines 360 to 383 in 985f69a)
yep, i meant offsetBeginning.
kafka/consumer.go (Outdated)
return c.consumer.Assign(topicPartitions)
}

func (c *Consumer) tryGetOffset(topic string, partition int32, offsetI int64, attempts int, sleep time.Duration) (int64, int64, error) {
why does this return 2 values?
changing that. it made sense in an older version of the code, but not anymore
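A rough sketch of the simplified signature with a single offset return value (the retry loop body and the getOffset helper are illustrative, not the PR's code):

func (c *Consumer) tryGetOffset(topic string, partition int32, offsetI int64, attempts int, sleep time.Duration) (int64, error) {
    var offset int64
    var err error
    for attempt := 1; attempt <= attempts; attempt++ {
        // getOffset is a hypothetical helper that does the actual broker query
        if offset, err = c.getOffset(topic, partition, offsetI); err == nil {
            return offset, nil
        }
        log.Warn("kafka-consumer: getting offset for %s:%d failed (attempt %d/%d): %s", topic, partition, attempt, attempts, err)
        time.Sleep(sleep)
    }
    return 0, err
}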
kafka/partitions.go (Outdated)
continue
}

if tm, ok = metadata.Topics[topic]; !ok || len(tm.Partitions) == 0 {
no need to check if topic is in metadata.Topics, it has already been checked.
Maybe change these 2 checks into
tm, ok := metadata.Topics[topic]
if !ok || tm.Error.Code() == confluent.ErrUnknownTopic {
    log.Warn("kafka: unknown topic %s, %d retries", topic, retry)
    time.Sleep()
    continue
}
if len(tm.Partitions) == 0 {
    log.Warn("kafka: 0 partitions returned for %s, %d retries left, %d backoffMs", topic, retry, backoff)
    sleep()
    continue
}
mdata/notifierKafka/cfg.go (Outdated)
fs.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
fs.StringVar(&dataDir, "data-dir", "", "Directory to store partition offsets index")
fs.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
fs.IntVar(&batchNumMessages, "batch-num-messages", 10000, "Maximum number of messages batched in one MessageSet")
as all of these flag vars are just going to be put into a kafka.ConsumerConf{}, why not just use a package global consumerConfig?
e.g.
var consumerConfig kafka.ConsumerConf
func init() {
    consumerConfig = kafka.NewConfig()
    fs := flag.NewFlagSet("kafka-cluster", flag.ExitOnError)
    fs.IntVar(&consumerConfig.MetadataTimeout, "consumer-metadata-timeout-ms", 10000, "Maximum time to wait for the broker to send its metadata in ms")
    ...
mdata/notifierKafka/notifierKafka.go (Outdated)
break EVENTS
}
default:
fmt.Printf("Ignored unexpected event: %s\n", ev)
this should be a log message
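e.g. something like this, assuming the same log package as the rest of the PR:

default:
    log.Warn("kafka-notifier: ignored unexpected event: %s", ev)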
Force-pushed fdd5285 to 7f010ed.
where is it being disabled? i don't see that in our code and according to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md it defaults to true
Force-pushed 0884f9e to 718029b.
here: https://github.com/grafana/metrictank/pull/879/files#diff-57960cc1ee87fab707306aab51440a91R102
I ran two types of benchmarks now, comparing the current confluent branch with master. When I fed them with a steady stream of data that was below their maximum, the cpu & memory usage metrics look pretty similar. When they have to replay the backlog, which maxes them out, the current confluent branch seems to be quite a lot faster, but it also uses much more CPU/memory. I think I should probably check if there is a way to optimize this memory usage.
[dashboard screenshots: replaying backlog (master, confluent); steady consuming (master, confluent)]
we decided to go forward with sarama instead. see #906
Replaces the sarama consumers with confluent ones.
Also gets rid of the duplication between the kafka notifier and kafka input by moving all kafka consumer related stuff into a new struct that's used by both of them.
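Roughly, the shared piece looks like this (a sketch based on the fields visible in the diffs above; exact names and types may differ in the final code):

// ConsumerConf holds everything the kafka input and the kafka notifier configure differently.
type ConsumerConf struct {
    Topics          []string
    MetadataTimeout int
    // MessageHandler is the per-message callback each user of the consumer plugs in.
    MessageHandler func(data []byte, partition int32)
}

// Consumer wraps a confluent consumer plus the offset/lag bookkeeping shared by both users.
type Consumer struct {
    conf           ConsumerConf
    consumer       *confluent.Consumer
    currentOffsets map[int32]*int64
    wg             sync.WaitGroup
}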