Skip to content

Commit c64ea49

Browse files
authored
Merge a9e8d3f into d39bdcb
2 parents d39bdcb + a9e8d3f commit c64ea49

8 files changed

Lines changed: 313 additions & 13 deletions

File tree

docs/docs/en/guide/remote-logging.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ If you deploy DolphinScheduler in `Standalone` mode, you only need to configure
1010
```properties
1111
# Whether to enable remote logging
1212
remote.logging.enable=false
13-
# if remote.logging.enable = true, set the target of remote logging
13+
# if remote.logging.enable = true, set the target of remote logging, currently support OSS, S3, GCS, ABS
1414
remote.logging.target=OSS
1515
# if remote.logging.enable = true, set the log base directory
1616
remote.logging.base.dir=logs
@@ -66,12 +66,12 @@ remote.logging.google.cloud.storage.bucket.name=<your-bucket>
6666
Configure `common.properties` as follows:
6767

6868
```properties
69-
# abs container name, required if you set resource.storage.type=ABS
70-
resource.azure.blob.storage.container.name=<your-container>
7169
# abs account name, required if you set resource.storage.type=ABS
72-
resource.azure.blob.storage.account.name=<your-account-name>
73-
# abs connection string, required if you set resource.storage.type=ABS
74-
resource.azure.blob.storage.connection.string=<your-connection-string>
70+
remote.logging.abs.account.name=<your-account-name>
71+
# abs account key, required if you set resource.storage.type=ABS
72+
remote.logging.abs.account.key=<your-account-key>
73+
# abs container name, required if you set resource.storage.type=ABS
74+
remote.logging.abs.container.name=<your-container-name>
7575
```
7676

7777
### Notice

docs/docs/zh/guide/remote-logging.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ Apache DolphinScheduler支持将任务日志传输到远端存储上。当配置
1010
```properties
1111
# 是否开启远程日志存储
1212
remote.logging.enable=true
13-
# 任务日志写入的远端存储,目前支持OSS, S3, GCS
13+
# 任务日志写入的远端存储,目前支持OSS, S3, GCS, ABS
1414
remote.logging.target=OSS
1515
# 任务日志在远端存储上的目录
1616
remote.logging.base.dir=logs
@@ -66,12 +66,12 @@ remote.logging.google.cloud.storage.bucket.name=<your-bucket>
6666
配置`common.propertis`如下:
6767

6868
```properties
69-
# abs container name, required if you set resource.storage.type=ABS
70-
resource.azure.blob.storage.container.name=<your-container>
7169
# abs account name, required if you set resource.storage.type=ABS
72-
resource.azure.blob.storage.account.name=<your-account-name>
73-
# abs connection string, required if you set resource.storage.type=ABS
74-
resource.azure.blob.storage.connection.string=<your-connection-string>
70+
remote.logging.abs.account.name=<your-account-name>
71+
# abs account key, required if you set resource.storage.type=ABS
72+
remote.logging.abs.account.key=<your-account-key>
73+
# abs container name, required if you set resource.storage.type=ABS
74+
remote.logging.abs.container.name=<your-container-name>
7575
```
7676

7777
### 注意事项

dolphinscheduler-common/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@
9898
<artifactId>esdk-obs-java-bundle</artifactId>
9999
</dependency>
100100

101+
<dependency>
102+
<groupId>com.azure</groupId>
103+
<artifactId>azure-storage-blob</artifactId>
104+
</dependency>
105+
101106
<dependency>
102107
<groupId>com.github.oshi</groupId>
103108
<artifactId>oshi-core</artifactId>

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,13 @@ private Constants() {
725725

726726
public static final String REMOTE_LOGGING_GCS_BUCKET_NAME = "remote.logging.google.cloud.storage.bucket.name";
727727

728+
/**
729+
* remote logging for ABS
730+
*/
731+
public static final String REMOTE_LOGGING_ABS_ACCOUNT_NAME = "remote.logging.abs.account.name";
732+
public static final String REMOTE_LOGGING_ABS_ACCOUNT_KEY = "remote.logging.abs.account.key";
733+
public static final String REMOTE_LOGGING_ABS_CONTAINER_NAME = "remote.logging.abs.container.name";
734+
728735
/**
729736
* data quality
730737
*/
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.common.log.remote;
19+
20+
import org.apache.dolphinscheduler.common.constants.Constants;
21+
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
22+
23+
import org.apache.commons.lang3.StringUtils;
24+
25+
import java.io.Closeable;
26+
import java.io.FileOutputStream;
27+
import java.io.IOException;
28+
29+
import lombok.extern.slf4j.Slf4j;
30+
31+
import com.azure.storage.blob.BlobContainerClient;
32+
import com.azure.storage.blob.BlobServiceClient;
33+
import com.azure.storage.blob.BlobServiceClientBuilder;
34+
import com.azure.storage.blob.specialized.BlobInputStream;
35+
import com.azure.storage.common.StorageSharedKeyCredential;
36+
37+
@Slf4j
38+
public class AbsRemoteLogHandler implements RemoteLogHandler, Closeable {
39+
40+
private String accountName;
41+
42+
private String accountKey;
43+
44+
private String containerName;
45+
46+
private BlobContainerClient blobContainerClient;
47+
48+
private static AbsRemoteLogHandler instance;
49+
50+
private AbsRemoteLogHandler() {
51+
accountName = readAccountName();
52+
accountKey = readAccountKey();
53+
containerName = readContainerName();
54+
blobContainerClient = buildBlobContainerClient();
55+
}
56+
57+
public static synchronized AbsRemoteLogHandler getInstance() {
58+
if (instance == null) {
59+
instance = new AbsRemoteLogHandler();
60+
}
61+
62+
return instance;
63+
}
64+
65+
protected BlobContainerClient buildBlobContainerClient() {
66+
67+
BlobServiceClient serviceClient = new BlobServiceClientBuilder()
68+
.endpoint(String.format("https://%s.blob.core.windows.net/", accountName))
69+
.credential(new StorageSharedKeyCredential(accountName, accountKey))
70+
.buildClient();
71+
72+
if (StringUtils.isBlank(containerName)) {
73+
throw new IllegalArgumentException("remote.logging.abs.container.name is blank");
74+
}
75+
76+
try {
77+
this.blobContainerClient = serviceClient.getBlobContainerClient(containerName);
78+
} catch (Exception ex) {
79+
throw new IllegalArgumentException(
80+
"containerName: " + containerName + " is not exists, you need to create them by yourself");
81+
}
82+
83+
log.info("containerName: {} has been found.", containerName);
84+
85+
return blobContainerClient;
86+
}
87+
88+
@Override
89+
public void close() throws IOException {
90+
// no need to close blobContainerClient
91+
}
92+
93+
@Override
94+
public void sendRemoteLog(String logPath) {
95+
String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);
96+
97+
try {
98+
log.info("send remote log {} to Azure Blob {}", logPath, objectName);
99+
blobContainerClient.getBlobClient(objectName).uploadFromFile(logPath);
100+
} catch (Exception e) {
101+
log.error("error while sending remote log {} to Azure Blob {}", logPath, objectName, e);
102+
}
103+
}
104+
105+
@Override
106+
public void getRemoteLog(String logPath) {
107+
String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);
108+
109+
try {
110+
log.info("get remote log on Azure Blob {} to {}", objectName, logPath);
111+
112+
try (
113+
BlobInputStream bis = blobContainerClient.getBlobClient(objectName).openInputStream();
114+
FileOutputStream fos = new FileOutputStream(logPath)) {
115+
byte[] readBuf = new byte[1024];
116+
int readLen = 0;
117+
while ((readLen = bis.read(readBuf)) > 0) {
118+
fos.write(readBuf, 0, readLen);
119+
}
120+
}
121+
} catch (Exception e) {
122+
log.error("error while getting remote log on Azure Blob {} to {}", objectName, logPath, e);
123+
}
124+
}
125+
126+
protected String readAccountName() {
127+
return PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_NAME);
128+
}
129+
130+
protected String readAccountKey() {
131+
return PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_KEY);
132+
}
133+
134+
protected String readContainerName() {
135+
return PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_CONTAINER_NAME);
136+
}
137+
}

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public RemoteLogHandler getRemoteLogHandler() {
3939
return S3RemoteLogHandler.getInstance();
4040
} else if ("GCS".equals(target)) {
4141
return GcsRemoteLogHandler.getInstance();
42+
} else if ("ABS".equals(target)) {
43+
return AbsRemoteLogHandler.getInstance();
4244
}
4345

4446
log.error("No suitable remote logging target for {}", target);

dolphinscheduler-common/src/main/resources/common.properties

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,4 +202,9 @@ remote.logging.s3.region=<region>
202202
remote.logging.google.cloud.storage.credential=/path/to/credential
203203
# gcs bucket name, required if you set remote.logging.target=GCS
204204
remote.logging.google.cloud.storage.bucket.name=<your-bucket>
205-
205+
# abs account name, required if you set resource.storage.type=ABS
206+
remote.logging.abs.account.name=<your-account-name>
207+
# abs account key, required if you set resource.storage.type=ABS
208+
remote.logging.abs.account.key=<your-account-key>
209+
# abs container name, required if you set resource.storage.type=ABS
210+
remote.logging.abs.container.name=<your-container-name>
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.common.log.remote;
19+
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.Mockito.times;
22+
import static org.mockito.Mockito.when;
23+
24+
import org.apache.dolphinscheduler.common.constants.Constants;
25+
import org.apache.dolphinscheduler.common.utils.LogUtils;
26+
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
27+
28+
import lombok.extern.slf4j.Slf4j;
29+
30+
import org.junit.jupiter.api.Assertions;
31+
import org.junit.jupiter.api.Test;
32+
import org.junit.jupiter.api.extension.ExtendWith;
33+
import org.mockito.Mock;
34+
import org.mockito.MockedConstruction;
35+
import org.mockito.MockedStatic;
36+
import org.mockito.Mockito;
37+
import org.mockito.junit.jupiter.MockitoExtension;
38+
39+
import com.azure.storage.blob.BlobClient;
40+
import com.azure.storage.blob.BlobContainerClient;
41+
import com.azure.storage.blob.BlobServiceClient;
42+
import com.azure.storage.blob.BlobServiceClientBuilder;
43+
import com.azure.storage.common.StorageSharedKeyCredential;
44+
45+
@Slf4j
46+
@ExtendWith(MockitoExtension.class)
47+
public class AbsRemoteLogHandlerTest {
48+
49+
@Mock
50+
BlobServiceClient blobServiceClient;
51+
52+
@Mock
53+
BlobContainerClient blobContainerClient;
54+
55+
@Mock
56+
BlobClient blobClient;
57+
58+
@Test
59+
public void testAbsRemoteLogHandlerContainerNameBlack() {
60+
try (
61+
MockedStatic<PropertyUtils> propertyUtilsMockedStatic = Mockito.mockStatic(PropertyUtils.class);
62+
MockedStatic<LogUtils> remoteLogUtilsMockedStatic = Mockito.mockStatic(LogUtils.class)) {
63+
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_NAME))
64+
.thenReturn("account_name");
65+
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_KEY))
66+
.thenReturn("account_key");
67+
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_CONTAINER_NAME))
68+
.thenReturn("");
69+
remoteLogUtilsMockedStatic.when(LogUtils::getLocalLogBaseDir).thenReturn("logs");
70+
71+
IllegalArgumentException thrown = Assertions.assertThrows(IllegalArgumentException.class, () -> {
72+
AbsRemoteLogHandler.getInstance();
73+
});
74+
Assertions.assertEquals("remote.logging.abs.container.name is blank", thrown.getMessage());
75+
}
76+
}
77+
78+
@Test
79+
public void testAbsRemoteLogHandlerContainerNotExists() {
80+
try (
81+
MockedStatic<PropertyUtils> propertyUtilsMockedStatic = Mockito.mockStatic(PropertyUtils.class);
82+
MockedStatic<LogUtils> remoteLogUtilsMockedStatic = Mockito.mockStatic(LogUtils.class);
83+
MockedConstruction<BlobServiceClientBuilder> k8sClientWrapperMockedConstruction =
84+
Mockito.mockConstruction(BlobServiceClientBuilder.class, (mock, context) -> {
85+
when(mock.endpoint(any(String.class))).thenReturn(mock);
86+
when(mock.credential(any(StorageSharedKeyCredential.class))).thenReturn(mock);
87+
when(mock.buildClient())
88+
.thenReturn(blobServiceClient);
89+
})) {
90+
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_NAME))
91+
.thenReturn("account_name");
92+
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_KEY))
93+
.thenReturn("account_key");
94+
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_CONTAINER_NAME))
95+
.thenReturn("container_name");
96+
remoteLogUtilsMockedStatic.when(LogUtils::getLocalLogBaseDir).thenReturn("logs");
97+
98+
when(blobServiceClient.getBlobContainerClient(any(String.class))).thenThrow(
99+
new NullPointerException("container not exists"));
100+
IllegalArgumentException thrown = Assertions.assertThrows(IllegalArgumentException.class, () -> {
101+
AbsRemoteLogHandler.getInstance();
102+
});
103+
Assertions.assertEquals("containerName: container_name is not exists, you need to create them by yourself",
104+
thrown.getMessage());
105+
}
106+
}
107+
108+
@Test
109+
public void testAbsRemoteLogHandler() {
110+
111+
try (
112+
MockedStatic<PropertyUtils> propertyUtilsMockedStatic = Mockito.mockStatic(PropertyUtils.class);
113+
MockedStatic<LogUtils> remoteLogUtilsMockedStatic = Mockito.mockStatic(LogUtils.class);
114+
MockedConstruction<BlobServiceClientBuilder> blobServiceClientBuilderMockedConstruction =
115+
Mockito.mockConstruction(BlobServiceClientBuilder.class, (mock, context) -> {
116+
when(mock.endpoint(any(String.class))).thenReturn(mock);
117+
when(mock.credential(any(StorageSharedKeyCredential.class))).thenReturn(mock);
118+
when(mock.buildClient())
119+
.thenReturn(blobServiceClient);
120+
});
121+
MockedStatic<RemoteLogUtils> remoteLogUtilsMockedStatic1 = Mockito.mockStatic(RemoteLogUtils.class)) {
122+
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_NAME))
123+
.thenReturn("account_name");
124+
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_KEY))
125+
.thenReturn("account_key");
126+
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_CONTAINER_NAME))
127+
.thenReturn("container_name");
128+
remoteLogUtilsMockedStatic.when(LogUtils::getLocalLogBaseDir).thenReturn("logs");
129+
String logPath = "logpath";
130+
String objectName = "objectname";
131+
remoteLogUtilsMockedStatic1.when(() -> RemoteLogUtils.getObjectNameFromLogPath(logPath))
132+
.thenReturn(objectName);
133+
134+
when(blobServiceClient.getBlobContainerClient(any(String.class))).thenReturn(blobContainerClient);
135+
when(blobContainerClient.getBlobClient(objectName)).thenReturn(blobClient);
136+
137+
AbsRemoteLogHandler absRemoteLogHandler = AbsRemoteLogHandler.getInstance();
138+
Assertions.assertNotNull(absRemoteLogHandler);
139+
140+
absRemoteLogHandler.sendRemoteLog(logPath);
141+
Mockito.verify(blobClient, times(1)).uploadFromFile(logPath);
142+
}
143+
}
144+
}

0 commit comments

Comments
 (0)