feat: Add support for Kafka cluster sharding #1454
Conversation
This allows many different Kafka clusters to be configured per topic. The
basic idea is to provide the number of shards and, for each range, its lower
bound together with the Kafka config name and the topic name on that cluster:
```yaml
metrics:
shards: 65000
mapping:
0:
name: "ingest-metrics-1"
config: "metric_1"
25000:
name: "ingest-metrics-2"
config: "metrics_2"
45000:
name: "ingest-metrics-3"
config: "metrics_3"
```
This also makes it possible to configure a single Kafka cluster and use
different topic names on it.
INGEST-1592
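The mapping above behaves like a `BTreeMap` keyed by each range's lower bound: the config for a shard is the entry with the greatest key not exceeding it. A minimal sketch of that lookup (illustrative names, not the actual Relay code):

```rust
use std::collections::BTreeMap;

/// Select the config name for a shard: the entry with the greatest
/// lower bound that is <= `shard` owns the range the shard falls into.
fn pick_config(mapping: &BTreeMap<u64, String>, shard: u64) -> Option<&str> {
    mapping
        .range(..=shard)
        .next_back()
        .map(|(_, name)| name.as_str())
}
```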
```rust
pub struct Sharded {
    /// The number of shards used for this topic.
    shards: u64,
    /// The Kafka configuration assigned to the specific shard range.
```
Could you describe that the `u64` is the start index and the next `u64` marks where the range ends? Explicitly call out what's inclusive (I'm assuming the start `u64` is inclusive and the end `u64` is excluded, being part of the next range). Maybe even write an example out in the struct doc comment like you did in the PR description.
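One way to spell that out, sketched here as a hypothetical doc comment (field names follow the diff; `String` stands in for the real config type, and the wording is illustrative):

```rust
use std::collections::BTreeMap;

/// Kafka configuration for a sharded topic.
///
/// Each key in `mapping` is the **inclusive** start index of a shard
/// range; the range ends right before the next key (**exclusive**), and
/// the last range runs up to `shards`. E.g. with `shards: 65000` and
/// keys `0`, `25000`, `45000`, the ranges are `0..25000`,
/// `25000..45000`, and `45000..65000`.
pub struct Sharded {
    /// The number of shards used for this topic.
    pub shards: u64,
    /// The Kafka configuration assigned to each shard range,
    /// keyed by the range's inclusive start index.
    pub mapping: BTreeMap<u64, String>,
}
```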
relay-server/src/actors/outcome.rs
Outdated
```rust
let org_id_hash = hasher.finish();
let shard = org_id_hash % shards;

// should be ok to unwrap since we MUST have at least one range defined
```
This is reasonable, but because the two pieces are so far apart in the code I'm a bit uncomfortable about it. One way would be to make a custom type for the producers; then the invariant is enforced in that type, relying on it would be a lot nicer, and the code interacting with the invariant stays close together. I guess this would be a newtype over the `BTreeMap` with a few small methods.
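A minimal sketch of that newtype idea, with hypothetical names (`ShardRanges` is not the actual Relay type): validating once in the constructor lets lookups rely on the invariant instead of an `unwrap` far away from the check.

```rust
use std::collections::BTreeMap;

/// Newtype over the range map; constructing it checks the invariant once.
struct ShardRanges<T>(BTreeMap<u64, T>);

impl<T> ShardRanges<T> {
    /// Fails fast instead of letting lookups unwrap later. Requiring a
    /// range that starts at 0 guarantees every shard value is covered.
    fn new(map: BTreeMap<u64, T>) -> Result<Self, &'static str> {
        if map.is_empty() || !map.contains_key(&0) {
            return Err("at least one range starting at 0 is required");
        }
        Ok(ShardRanges(map))
    }

    /// Greatest lower bound <= `shard`; cannot fail thanks to `new`.
    fn get(&self, shard: u64) -> &T {
        self.0
            .range(..=shard)
            .next_back()
            .map(|(_, v)| v)
            .expect("checked in constructor: map is non-empty and contains key 0")
    }
}
```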
That's a very good point. I will look into it.
It might also simplify the code and remove some duplication.
So, I moved the same functionality under the `Sharded` producer in the store actor in d4a05c2.
I am not sure creating a newtype over `BTreeMap` is a good idea, since it can be a bit harder to understand.
But I might have to pick your brain on this, and on how to implement it better.
relay-server/src/actors/store.rs
Outdated
```rust
let mut hasher = FnvHasher::default();
std::hash::Hash::hash(&organization_id, &mut hasher);
let org_id_hash = hasher.finish();
let shard = org_id_hash % shards;
```
This code exists in two places already; given the nature of it, I suspect both would need updating together all the time, so it might be better to create a method somewhere. Maybe also on the newtype around the `BTreeMap` I already suggested.
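The duplicated snippet could be extracted roughly like this. Note that the PR uses `FnvHasher` from the `fnv` crate; this sketch substitutes the standard library's `DefaultHasher` to stay dependency-free, so the hash values differ from the real code.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Shared helper for the hashing logic that currently lives in two
/// places: hash the org id, then reduce it into the shard space.
fn shard_for(organization_id: u64, shards: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    organization_id.hash(&mut hasher);
    hasher.finish() % shards
}
```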
Moved this into the `impl` for the `Sharded` producer in d4a05c2.
```rust
/// The maximum number of logical shards for this set of configs.
shards: u64,
/// The list of the sharded Kafka configs.
configs: BTreeMap<u64, KafkaParams<'a>>,
```
We have this mapping in three different places now, I wonder if it would be possible to have it only once.
Yes, I think it makes sense this way.
Now the Kafka config with the topic name and parameters is produced from the `TopicAssignment` enum.
relay-config/src/config.rs
Outdated
```rust
assert!(matches!(
    kafka_config_profiles,
    KafkaConfigName::Single { .. }
));
```
Any way we could keep this assertion?
relay-config/src/config.rs
Outdated
```rust
    topic_name: topic.as_str(),
    kafka_config_name: None,
},

fn kafka_config_name<'a>(
```
Should we update the doc comment and the function name, now that this function returns a full config rather than a name?
```rust
/// The maximum number of logical shards for this set of configs.
shards: u64,
/// The list of the sharded Kafka configs.
configs: BTreeMap<u64, KafkaParams<'a>>,
```
Yes, I think it makes sense this way.
relay-server/src/actors/store.rs
Outdated
```rust
Self::Single {
    topic_name,
    producer,
} => (topic_name.as_str(), Arc::clone(producer)),
```
nit: You can probably change the signature of `get_producer` to return `(&str, &ThreadedProducer)` to save on `Arc::clone`s.
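What the nit suggests, in a stripped-down sketch (`Producer` here is a stand-in for rdkafka's `ThreadedProducer`, and the enum is reduced to one variant): returning borrows through the stored `Arc` avoids bumping the reference count on every call.

```rust
use std::sync::Arc;

struct Producer; // stand-in for rdkafka's ThreadedProducer

enum KafkaProducer {
    Single {
        topic_name: String,
        producer: Arc<Producer>,
    },
}

impl KafkaProducer {
    /// Returning references instead of `(String, Arc<Producer>)`
    /// means callers borrow the producer without an `Arc::clone`.
    fn get_producer(&self) -> (&str, &Producer) {
        match self {
            Self::Single {
                topic_name,
                producer,
            } => (topic_name.as_str(), &**producer),
        }
    }
}
```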
```diff
  dir = config_dir("relay")
- dir.join("config.yml").write(json.dumps(default_opts))
+ dir.join("config.yml").write(yaml.dump(default_opts))
```
We don't have to change this now, but I believe that at some point the configuration will have to become JSON-serializable. Then we will have to revisit the schema. But that's fine.
As a followup for #1454 we decided that we do not have to hash the org id to get the shard number and simple modulo #shard will be sufficient. As part of this small refactoring the `unwrap` was also removed and Result is returned instead, to propagate the error to the caller if it happens.
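A sketch of that simplification (names are illustrative, not the actual follow-up code): plain modulo on the org id replaces the hashing step, and a `Result` replaces the `unwrap` so the error propagates to the caller.

```rust
use std::collections::BTreeMap;

/// Resolve the config for an organization: plain modulo instead of
/// hashing, and an error instead of an unwrap when no range matches.
fn choose<T>(
    configs: &BTreeMap<u64, T>,
    organization_id: u64,
    shards: u64,
) -> Result<&T, String> {
    let shard = organization_id % shards;
    configs
        .range(..=shard)
        .next_back()
        .map(|(_, v)| v)
        .ok_or_else(|| format!("no Kafka config covers shard {shard}"))
}
```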