@HenryCaiHaiying
Contributor

This fixes issue #651.

During startup, the indexer is supposed to find all existing metadata snapshots in ZK and pull only the data newer than the last snapshot.

However, the listAsync() call in KaldbPartitionMetadataStore could not retrieve all existing snapshots from ZK (most of the time it retrieved 0 snapshots). Changing listAsync() to listSync() seems to fix the problem.

See discussion thread here: https://slack-pde.slack.com/archives/C02028H9LHJ/p1692300820984549?thread_ts=1691511612.465979&cid=C02028H9LHJ
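To illustrate why an empty snapshot list matters at startup, here is a minimal sketch of the "pull only newer data" decision. It is not the project's code: the `Snapshot` class, its fields, and `nextOffsetToIndex` are hypothetical names chosen for illustration, and the real indexer logic may differ.

```java
import java.util.List;
import java.util.OptionalLong;

class StartOffsetSketch {
    // Hypothetical, simplified stand-in for a snapshot metadata entry.
    static final class Snapshot {
        final String partitionId;
        final long maxOffset;

        Snapshot(String partitionId, long maxOffset) {
            this.partitionId = partitionId;
            this.maxOffset = maxOffset;
        }
    }

    // Returns the first offset that still needs indexing for the partition:
    // one past the highest snapshot offset, or fallbackOffset when no
    // snapshot for that partition is visible.
    static long nextOffsetToIndex(List<Snapshot> snapshots, String partitionId, long fallbackOffset) {
        OptionalLong highest = snapshots.stream()
            .filter(s -> s.partitionId.equals(partitionId))
            .mapToLong(s -> s.maxOffset)
            .max();
        return highest.isPresent() ? highest.getAsLong() + 1 : fallbackOffset;
    }
}
```

Under these assumptions, a listing that wrongly returns 0 snapshots sends the indexer to the fallback offset instead of resuming after the last snapshot.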

@github-actions

Review:

  1. In the method listAsync(), the code uses CompletableFuture.allOf() to wait for all completion stages to finish, but it does not handle exceptions raised by those stages. It would be better to chain CompletableFuture.allOf(...).exceptionally(...) so that a failure is logged and rethrown as an InternalMetadataStoreException.

Example:

return CompletableFuture.allOf(completionStages.toArray(new CompletableFuture[completionStages.size()]))
    .exceptionally(ex -> {
        LOG.error("Error listing nodes", ex);
        throw new InternalMetadataStoreException("Error listing nodes", ex);
    })
    .thenApply(...);
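A self-contained version of the same pattern, as a sketch only: `collectAll` is a hypothetical helper, and `IllegalStateException` stands in for the project's InternalMetadataStoreException so the example compiles on its own.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfSketch {
    // Combines per-node futures into a single future holding all results.
    // If any input future fails, the combined future fails with an
    // IllegalStateException wrapped in a CompletionException.
    static <T> CompletableFuture<List<T>> collectAll(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .exceptionally(ex -> {
                throw new IllegalStateException("Error listing nodes", ex);
            })
            // Runs only when every input completed normally, so join() does not block.
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }
}
```

A caller that blocks with join() sees a CompletionException whose cause is the exception thrown inside exceptionally(...).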
  1. In the method listSync(), the code is using a try-catch block to handle exceptions when calling listAsync().toCompletableFuture().get(...). However, it is not handling any exceptions that may occur when retrieving the snapshots from the metadata stores. It would be better to handle these exceptions and throw a more specific exception, such as MetadataStoreException, instead of a generic InternalMetadataStoreException.

Example:

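try {
    // Block on the async listing, bounded by the ZK timeout.
    List<T> asyncList = listAsync().toCompletableFuture().get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
    LOG.info("There are {} from async list.", asyncList.size());
    return asyncList;
} catch (InterruptedException e) {
    // Restore the interrupt flag so callers can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new MetadataStoreException("Error retrieving snapshots from metadata stores", e);
} catch (ExecutionException | TimeoutException e) {
    throw new MetadataStoreException("Error retrieving snapshots from metadata stores", e);
}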
  1. In the method RecoveryTaskCreator.createRecoveryTasks(), the code logs the number of snapshots and recovery tasks for debugging purposes. It would be better to log these at debug level instead of info level to avoid cluttering the logs with unnecessary information.

Example:

LOG.debug("There are {} snapshots", snapshots.size());
LOG.debug("There are {} snapshots for partition {}", snapshotsForPartition.size(), partitionId);
LOG.debug("There are {} nonLive snapshots.", nonLiveSnapshotsForPartition.size());
LOG.debug("There are {} recoveryTasks", recoveryTasks.size());
  1. In the method RecoveryTaskCreator.createRecoveryTasks(), the code is filtering the snapshots based on a condition and then collecting them into a list. Instead of using filter(...).collect(...), it would be more efficient to use stream().collect(...) with a custom collector that filters and collects the snapshots in a single pass.

Example:

List<SnapshotMetadata> snapshotsForPartition = snapshots.stream()
    .collect(Collectors.collectingAndThen(
        Collectors.partitioningBy(s -> s.indexed && s.partitionId.equals(partitionId)),
        map -> map.get(true)));
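For comparison, the sketch below runs both forms on the same input. The `Snap` class is a hypothetical, simplified stand-in for SnapshotMetadata. Both forms traverse the stream once; the sketch only checks that they select the same elements and does not measure performance.

```java
import java.util.List;
import java.util.stream.Collectors;

class PartitionFilterSketch {
    // Hypothetical, simplified stand-in for SnapshotMetadata.
    static final class Snap {
        final String partitionId;
        final boolean indexed;

        Snap(String partitionId, boolean indexed) {
            this.partitionId = partitionId;
            this.indexed = indexed;
        }
    }

    // filter(...).collect(...): keeps only the matching elements.
    static List<Snap> viaFilter(List<Snap> snaps, String partitionId) {
        return snaps.stream()
            .filter(s -> s.indexed && s.partitionId.equals(partitionId))
            .collect(Collectors.toList());
    }

    // partitioningBy(...): builds both the matching and non-matching lists,
    // then returns the matching one.
    static List<Snap> viaPartitioningBy(List<Snap> snaps, String partitionId) {
        return snaps.stream()
            .collect(Collectors.collectingAndThen(
                Collectors.partitioningBy((Snap s) -> s.indexed && s.partitionId.equals(partitionId)),
                map -> map.get(true)));
    }
}
```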
  1. In the method RecoveryTaskCreator.createRecoveryTasks(), the code is filtering the non-live snapshots based on a condition and then collecting them into a list. Instead of using filter(...).collect(...), it would be more efficient to use stream().collect(...) with a custom collector that filters and collects the non-live snapshots in a single pass.

Example:

List<SnapshotMetadata> nonLiveSnapshotsForPartition = snapshotsForPartition.stream()
    .filter(s -> !deletedSnapshots.contains(s))
    .collect(Collectors.toList());
  1. In the method RecoveryTaskCreator.createRecoveryTasks(), the code is retrieving the highest durable offset for a partition by calling getHighestDurableOffsetForPartition(...). However, the implementation of getHighestDurableOffsetForPartition(...) is not shown. It would be helpful to see the implementation of this method in order to provide specific feedback.

@bryanlb bryanlb closed this in #655 Aug 31, 2023
@bryanlb bryanlb deleted the henry_kafka branch September 12, 2023 16:42