Skip to content

Commit 16b7bb4

Browse files
committed
[Feature][Metrics] Add resource download related metrics for workers (#9324)
1 parent 4ac3349 commit 16b7bb4

4 files changed

Lines changed: 73 additions & 4 deletions

File tree

docs/docs/en/guide/metrics/metrics.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ For example, you can get the master metrics by `curl http://localhost:5679/actua
7474
- ds.task.execution.count.by.type: (counter) the number of task executions grouped by tag `task_type`
7575
- ds.task.running: (gauge) the number of running tasks
7676
- ds.task.prepared: (gauge) the number of tasks prepared for task queue
77-
- ds.task.execution.count: (histogram) the number of executed tasks
77+
- ds.task.execution.count: (counter) the number of executed tasks
7878
- ds.task.execution.duration: (histogram) duration of task executions
7979

8080

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,4 +244,15 @@ public static boolean directoryTraversal(String filename){
244244
}
245245
}
246246

247+
/**
248+
* Get file size in KB
249+
*
250+
* @param filename String type of filename
251+
* @return file size in KB
252+
*/
253+
public static double getFileSizeInKB(String filename){
254+
File file = new File(filename);
255+
return ((double) file.length()) / 1024;
256+
}
257+
247258
}

dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@
1717

1818
package org.apache.dolphinscheduler.server.worker.metrics;
1919

20+
import java.util.concurrent.TimeUnit;
2021
import java.util.function.Supplier;
2122

22-
import io.micrometer.core.instrument.Counter;
23-
import io.micrometer.core.instrument.Gauge;
24-
import io.micrometer.core.instrument.Metrics;
23+
import io.micrometer.core.instrument.*;
2524
import lombok.experimental.UtilityClass;
2625

2726
@UtilityClass
@@ -37,6 +36,36 @@ public class WorkerServerMetrics {
3736
.description("full worker submit queues count")
3837
.register(Metrics.globalRegistry);
3938

39+
private static final Counter WORKER_RESOURCE_DOWNLOAD_COUNTER =
40+
Counter.builder("ds.worker.resource.download.count")
41+
.description("worker resource download count")
42+
.register(Metrics.globalRegistry);
43+
44+
private static final Counter WORKER_RESOURCE_DOWNLOAD_SUCCESS_COUNTER =
45+
Counter.builder("ds.worker.resource.download.success.count")
46+
.description("worker resource download success count")
47+
.register(Metrics.globalRegistry);
48+
49+
private static final Counter WORKER_RESOURCE_DOWNLOAD_FAILURE_COUNTER =
50+
Counter.builder("ds.worker.resource.download.failure.count")
51+
.description("worker resource download failure count")
52+
.register(Metrics.globalRegistry);
53+
54+
private static final Timer WORKER_RESOURCE_DOWNLOAD_DURATION_TIMER =
55+
Timer.builder("ds.worker.resource.download.duration")
56+
.publishPercentiles(0.5, 0.75, 0.95, 0.99)
57+
.publishPercentileHistogram()
58+
.description("time cost of resource download on workers")
59+
.register(Metrics.globalRegistry);
60+
61+
private static final DistributionSummary WORKER_RESOURCE_DOWNLOAD_SIZE_DISTRIBUTION =
62+
DistributionSummary.builder("ds.worker.resource.download.size")
63+
.baseUnit("KB")
64+
.publishPercentiles(0.5, 0.75, 0.95, 0.99)
65+
.publishPercentileHistogram()
66+
.description("size of downloaded resource files on worker")
67+
.register(Metrics.globalRegistry);
68+
4069
public static void incWorkerOverloadCount() {
4170
WORKER_OVERLOAD_COUNTER.increment();
4271
}
@@ -45,6 +74,26 @@ public static void incWorkerSubmitQueueIsFullCount() {
4574
WORKER_SUBMIT_QUEUE_IS_FULL_COUNTER.increment();
4675
}
4776

77+
public static void incWorkerResourceDownloadCount() {
78+
WORKER_SUBMIT_QUEUE_IS_FULL_COUNTER.increment();
79+
}
80+
81+
public static void incWorkerResourceDownloadSuccessCount() {
82+
WORKER_SUBMIT_QUEUE_IS_FULL_COUNTER.increment();
83+
}
84+
85+
public static void incWorkerResourceDownloadFailureCount() {
86+
WORKER_SUBMIT_QUEUE_IS_FULL_COUNTER.increment();
87+
}
88+
89+
public static void recordWorkerResourceDownloadTime(long milliseconds) {
90+
WORKER_RESOURCE_DOWNLOAD_DURATION_TIMER.record(milliseconds, TimeUnit.MILLISECONDS);
91+
}
92+
93+
public static void recordWorkerResourceDownloadSize(double size) {
94+
WORKER_RESOURCE_DOWNLOAD_SIZE_DISTRIBUTION.record(size);
95+
}
96+
4897
public static void registerWorkerRunningTaskGauge(Supplier<Number> supplier) {
4998
Gauge.builder("ds.task.running", supplier)
5099
.description("number of running tasks on workers")

dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2929
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
3030
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
31+
import org.apache.dolphinscheduler.common.utils.FileUtils;
3132
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
3233
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
3334
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -36,6 +37,7 @@
3637
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
3738
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
3839
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
40+
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
3941
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
4042
import org.apache.dolphinscheduler.service.alert.AlertClientService;
4143
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
@@ -273,13 +275,20 @@ public void kill() {
273275
public void downloadResource(String execLocalPath, Logger logger, List<Pair<String, String>> fileDownloads) {
274276
for (Pair<String, String> fileDownload : fileDownloads) {
275277
try {
278+
WorkerServerMetrics.incWorkerResourceDownloadCount();
276279
// query the tenant code of the resource according to the name of the resource
277280
String fullName = fileDownload.getLeft();
278281
String tenantCode = fileDownload.getRight();
279282
String resHdfsPath = storageOperate.getResourceFileName(tenantCode, fullName);
280283
logger.info("get resource file from hdfs :{}", resHdfsPath);
284+
Long resourceDownloadStartTime = System.currentTimeMillis();
281285
storageOperate.download(tenantCode, resHdfsPath, execLocalPath + File.separator + fullName, false, true);
286+
WorkerServerMetrics.recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime);
287+
WorkerServerMetrics.recordWorkerResourceDownloadSize(
288+
FileUtils.getFileSizeInKB(execLocalPath + File.separator + fullName));
289+
WorkerServerMetrics.incWorkerResourceDownloadSuccessCount();
282290
} catch (Exception e) {
291+
WorkerServerMetrics.incWorkerResourceDownloadFailureCount();
283292
logger.error(e.getMessage(), e);
284293
throw new ServiceException(e.getMessage());
285294
}

0 commit comments

Comments
 (0)