[native] Expose an API to clean up async data cache on node#24530
Conversation
| if (nodeState() == NodeState::kActive) { | ||
| auto* asyncDataCache = velox::cache::AsyncDataCache::getInstance(); | ||
| if (asyncDataCache != nullptr) { | ||
| asyncDataCache->clear(); |
There was a problem hiding this comment.
There is a VELOX_CHECK in clear() that might fail. Shall we catch it, log the failure and rethrow?
void AsyncDataCache::clear() {
for (auto& shard : shards_) {
memory::Allocation unused;
shard->evict(std::numeric_limits<uint64_t>::max(), true, 0, unused);
VELOX_CHECK(unused.empty());
}
}
| LOG(INFO) << "async data cache clean up is successful"; | ||
| } | ||
| else { | ||
| LOG(ERROR) << "Issue in async data cache clean up"; |
There was a problem hiding this comment.
Let's be more specific, and just say "Cannot acquire the AsyncDataCache instance"
| nativeQueryRunnerParameters.workerCount, | ||
| cacheMaxSize, | ||
| DEFAULT_STORAGE_FORMAT, | ||
| true, |
There was a problem hiding this comment.
Why are we setting addStorageFormatToPath to true here?
There was a problem hiding this comment.
I just added this to add storage name in the data folder path.
| assertEquals(0, finalMetrics.entries, "Cache should be empty after cleanup."); | ||
| } | ||
|
|
||
| private Metrics collectCacheMetrics(Set<InternalNode> workerNodes, DistributedQueryRunner distributedQueryRunner, String endpoint) |
There was a problem hiding this comment.
distributedQueryRunner is not used
| int hits = 0; | ||
| int entries = 0; | ||
| for (InternalNode worker : workerNodes) { | ||
| Map<String, Long> metrics = fetchMetrics(worker.getInternalUri().toString(), endpoint, "GET"); |
There was a problem hiding this comment.
Seems this function only supports scalar integer type metrics. How do you plan to collect histogram type metics in the future? Will you add new logic to this method or create another method? If it's the latter, maybe it'll be clearer to rename this one to fetchScalarLongMetrics?
| defaultQueryRunner.close(); | ||
|
|
||
| return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath, false, isCoordinatorSidecarEnabled, false); | ||
| return createNativeQueryRunner(dataDirectory.get().toString(), prestoServerPath.get(), workerCount, cacheMaxSize, true, Optional.empty(), storageFormat, addStorageFormatToPath, false, isCoordinatorSidecarEnabled, false, enableRuntimeMetricsCollection); |
f47d63b to
eba0f8a
Compare
|
What is the use case to clear the cache? we already have a pushback mechanism now? |
pramodsatya
left a comment
There was a problem hiding this comment.
Thanks @agrawalreetika.
|
|
||
| void reportNodeStatus(proxygen::ResponseHandler* downstream); | ||
|
|
||
| void cleanAsynDataCache( |
There was a problem hiding this comment.
nit: cleanAsyncDataCache
| false); | ||
| } | ||
|
|
||
| public static QueryRunner createQueryRunner(boolean enableRuntimeMetricsCollection) |
There was a problem hiding this comment.
would be better to add this parameter to the existing createQueryRunner function:
public static QueryRunner createQueryRunner(boolean addStorageFormatToPath, boolean isCoordinatorSidecarEnabled)
| int entries = 0; | ||
| for (InternalNode worker : workerNodes) { | ||
| Map<String, Long> metrics = fetchScalarLongMetrics(worker.getInternalUri().toString(), endpoint, "GET"); | ||
| hits += metrics.getOrDefault("velox_memory_cache_num_hits", 0L); |
There was a problem hiding this comment.
Where are the configs velox_memory_cache_num_hits and velox_memory_cache_num_entries defined?
There was a problem hiding this comment.
These counters should be defined on Velox side
| proxygen::ResponseHandler* downstream) { | ||
| server->reportMemoryInfo(downstream); | ||
| }); | ||
| httpServer_->registerPut( |
There was a problem hiding this comment.
Why is the v1/memory endpoint overloaded? Might be better to add a new endpoint v1/memory/clear
Sorry, I missed this. Could you please provide the details around if this is something could be done with pushback mechanism? |
eba0f8a to
4f2836e
Compare
@agrawalreetika Can you please add this context in the PR message? Also, |
yingsu00
left a comment
There was a problem hiding this comment.
@agrawalreetika THere is test failure:
Error: Failures:
Error: TestPrestoNativeAsyncDataCacheCleanupAPI.testAsyncDataCacheCleanup:67->collectCacheMetrics:100 ? FileNotFound http://127.0.0.1:37499/v1/info/metrics
9aa9f06 to
6a44bb0
Compare
xiaoxmeng
left a comment
There was a problem hiding this comment.
@agrawalreetika we already support to cleanup cache through We already support ssd/memory cache cleanup through v1/operation/server/clearCache?type=memory cc @tanjialiang @zacw7 Thanks!
|
@xiaoxmeng Thanks for the heads up! However we didn't find any tests or documentation for it. Reetika will rearrange this PR to add them if you don't mind. |
Here is the implementation and it is only used in operation: presto-native-execution/presto_cpp/main/ServerOperation.cpp. The server operation can support different kinds of operations in addition to cache cleanup. We could help adding tests for this instead of duplicating the implementation. cc @zacw7 @tanjialiang |
|
I'll be adding the tests for the server operations. Thanks! |
6a44bb0 to
d9e7485
Compare
steveburnett
left a comment
There was a problem hiding this comment.
Thanks for the doc! A couple of nits only.
f18a099 to
0039295
Compare
0a73768 to
304fbec
Compare
zacw7
left a comment
There was a problem hiding this comment.
LGTM. Thanks again for adding the tests!
xiaoxmeng
left a comment
There was a problem hiding this comment.
@agrawalreetika thanks for e2e test coverage!
| workerCount, | ||
| Optional.of(Paths.get(addStorageFormatToPath ? dataDirectory + "/" + storageFormat : dataDirectory)), | ||
| getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, remoteFunctionServerUds, failOnNestedLoopJoin, isCoordinatorSidecarEnabled), | ||
| getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, remoteFunctionServerUds, failOnNestedLoopJoin, isCoordinatorSidecarEnabled, enableRuntimeMetricsCollection, enableSsdCache), |
There was a problem hiding this comment.
nit: this line is too long
|
|
||
| public static Optional<BiFunction<Integer, URI, Process>> getExternalWorkerLauncher(String catalogName, String prestoServerPath, int cacheMaxSize, Optional<String> remoteFunctionServerUds, Boolean failOnNestedLoopJoin, boolean isCoordinatorSidecarEnabled) | ||
| public static Optional<BiFunction<Integer, URI, Process>> getExternalWorkerLauncher(String catalogName, String prestoServerPath, int cacheMaxSize, Optional<String> remoteFunctionServerUds, Boolean failOnNestedLoopJoin, | ||
| boolean isCoordinatorSidecarEnabled, boolean enableRuntimeMetricsCollection, boolean enableSsdCache) |
There was a problem hiding this comment.
Make each parameter on a single line
| for (int i = 0; i < 4; i++) { | ||
| queryRunner.execute(session, "SELECT count(*) FROM customer"); | ||
| } | ||
| TimeUnit.SECONDS.sleep(60); // Sleep to allow cache updates |
There was a problem hiding this comment.
Sleeping 60 seconds? Is it too long? This means this test would take at least 5 minutes. Can we change it to 10 sec?
312b40d
304fbec to
312b40d
Compare
|
@agrawalreetika Which test fails? I see I wonder if it's because we reduced the wait time too much. But it surprises me if 10s is not enough to see the cache update |
312b40d to
265b439
Compare
|
@zacw7 @xiaoxmeng I tried reducing the sleep window to 10s and 30s to facilitate cache updates, but it seems the updates are failing. Do you have any insights on this? |
Description
Adding e2e tests for existing cache APIs
Motivation and Context
Adding e2e tests for existing cache APIs
Impact
Adding e2e tests for existing cache APIs -
Test Plan
Test Added
Contributor checklist
Release Notes