[Feature-13331][Remote Logging] Add support for writing task logs to OSS#13332
[Feature-13331][Remote Logging] Add support for writing task logs to OSS#13332zhongjiajie merged 3 commits intoapache:devfrom
Conversation
Codecov Report
@@ Coverage Diff @@
## dev #13332 +/- ##
============================================
- Coverage 39.57% 39.47% -0.11%
- Complexity 4373 4377 +4
============================================
Files 1097 1102 +5
Lines 41293 41437 +144
Branches 4723 4736 +13
============================================
+ Hits 16342 16356 +14
- Misses 23138 23263 +125
- Partials 1813 1818 +5
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
952fff4 to
d06cc4a
Compare
|
|
||
| # remote logging | ||
| remote.logging.enable=false | ||
| # if remote.logging.enable = true, set the target of remote logging |
There was a problem hiding this comment.
should we add a comment to tell users we support oss only currently?
There was a problem hiding this comment.
Sure, I'll add it in the doc.
| # oss endpoint, required if you set remote.logging.target=OSS | ||
| remote.logging.oss.endpoint=<endpoint> | ||
| # oss base directory, required if you set remote.logging.target=OSS | ||
| remote.logging.oss.base.dir=logs No newline at end of file |
There was a problem hiding this comment.
not sure whether some of our remote log plugin without base.dir, if all of them have this config, we should use remote.logging.base.dir instead of remote.logging.oss.base.dir
There was a problem hiding this comment.
I also thought about this problem, and I rechecked the relevant configuration in Airflow.
I think we could use remote.logging.base.dir instead, which can be shared by multiple plugins (OSS / S3 / GCS)
|
|
||
| void sendRemoteLog(String logPath); | ||
|
|
||
| void getRemoteLog(String logPath); |
There was a problem hiding this comment.
should we also not add an interface to test whether the current remote target work or not?
There was a problem hiding this comment.
Hi, @zhongjiajie , I am not sure about this.
Do you mean that the user can click a button somewhere on the front end to test whether the remote storage is available?
The usage scenario of remote logging is somewhat similar to that of the resource center, but it seems that StorageOperate does not currently have a similar interface.
But I add checkBucketNameExists(), like OssStorageOperator and S3StorageOperator.
|
|
||
| public RemoteLogHandler getRemoteLogHandler() { | ||
| if ("true".equalsIgnoreCase(PropertyUtils.getString(Constants.REMOTE_LOGGING_ENABLE))) { | ||
| if ("OSS".equalsIgnoreCase(PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET))) { |
There was a problem hiding this comment.
we have getUpperCaseString method now
| if ("OSS".equalsIgnoreCase(PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET))) { | |
| if ("OSS".equals(PropertyUtils.getUpperCaseString(Constants.REMOTE_LOGGING_TARGET))) { |
There was a problem hiding this comment.
Sure, I'll use getUpperCaseString.
| public class RemoteLogHandlerFactory { | ||
|
|
||
| public RemoteLogHandler getRemoteLogHandler() { | ||
| if ("true".equalsIgnoreCase(PropertyUtils.getString(Constants.REMOTE_LOGGING_ENABLE))) { |
There was a problem hiding this comment.
how about use PropertyUtils.getBoolean directly?
| private String getObjectNameFromLogPath(String logPath) { | ||
| Path path = Paths.get(logPath); | ||
| int nameCount = path.getNameCount(); | ||
| if (nameCount < 2) { |
There was a problem hiding this comment.
can we use constants for magic number 2?
| if ("true".equalsIgnoreCase(PropertyUtils.getString(Constants.REMOTE_LOGGING_ENABLE))) { | ||
| if (taskInstance.getHost().endsWith(masterAddress.split(":")[1])) { |
There was a problem hiding this comment.
use PropertyUtils.getBoolean instead of string, and we have COLON in constants for :
| logger.info("Succeed to send master's log {} to remote target {}", taskInstance.getLogPath(), | ||
| PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET)); | ||
| } catch (Exception e) { | ||
| logger.error("send master's log {} to remote target error", taskInstance.getLogPath(), e); |
There was a problem hiding this comment.
I find out we already catch exceptions in sendRemoteLog method is there any expected exception will be thrown during log sending? if not I think one catch is enough, in handler or in currently
There was a problem hiding this comment.
Sure, I'll look into it.
| logger.info("Start to send log {} to remote target {}", taskExecutionContext.getLogPath(), | ||
| PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET)); | ||
| logger.info("Wait log {} to be flushed...", taskExecutionContext.getLogPath()); | ||
| Thread.sleep(5000); |
There was a problem hiding this comment.
why we have to sleep for 5s here? can we run continue when we get log path immediately?
There was a problem hiding this comment.
The reason for waiting here is to allow the final output of the log to be written to the local file
Here is an example.
The green box in the figure is the last output of the task log, which is later than the time when the log is uploaded to the remote storage.
Therefore, if the thread does not use sleep(), the log uploaded to the remote storage will lose the statements in the green box.
WDYT, or is there any other better way to flush the final output of the log?
There was a problem hiding this comment.
If you trigger sendRemoteLogIfNeeded in afterExecute, it does have to sleep for a few seconds, because when handling large amount of logs, async flush will take a lot of time.
So, I recommend you trigger in logHandle when handling FINALIZE_SESSION_MARKER which marks that log appender will be closed in few seconds, flush logic in ds(you can check in parseProcessOutput) makes sure that no logs will reach in these few seconds and we don't have to wait for five seconds any more.
There was a problem hiding this comment.
If you trigger
sendRemoteLogIfNeededinafterExecute, it does have to sleep for a few seconds, because when handling large amount of logs, async flush will take a lot of time.So, I recommend you trigger in
logHandlewhen handlingFINALIZE_SESSION_MARKERwhich marks that log appender will be closed in few seconds, flush logic in ds(you can check inparseProcessOutput) makes sure that no logs will reach in these few seconds and we don't have to wait for five seconds any more.
Hi, @Radeity Thanks a lot for your suggestion! I think this is a good idea and I'll look into it.
|
Hi, @zhongjiajie , thanks a lot for your review and comments. I will carefully modify according to the suggestions and add related UT and documents. |
d06cc4a to
dfe6b71
Compare
dfe6b71 to
46d77ec
Compare
|
Hi, @zhongjiajie @Radeity, thanks again for your kind review. I have modified this PR according to your comments and suggestions. Here are some brief changes:
Here is some examples:
|
9700d2b to
c7b29cf
Compare
c7b29cf to
d9f4db3
Compare
d9f4db3 to
4b7ba61
Compare
|
Hi, @zhongjiajie , could you please help review this again? |
5af18c6 to
475906d
Compare
...mon/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerFactory.java
Outdated
Show resolved
Hide resolved
...ler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogService.java
Outdated
Show resolved
Hide resolved
...duler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java
Outdated
Show resolved
Hide resolved
...duler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java
Outdated
Show resolved
Hide resolved
...emote/src/main/java/org/apache/dolphinscheduler/remote/processor/LoggerRequestProcessor.java
Show resolved
Hide resolved
| this.logBuffer = new LinkedBlockingQueue<>(); | ||
|
|
||
| if (this.taskRequest != null) { | ||
| this.taskRequest.setLogHandleEnable(true); |
There was a problem hiding this comment.
Why set default true? Please add some comments.
There was a problem hiding this comment.
I've changed logHandleEnable to logBufferEnable for better readability.
This PR records whether the logBuffer is enabled in the task context for the following resaons:
There are two types of tasks:
- Tasks use
logBufferto cache logs (tasks that useShellCommandExecutorwhich extendsAbstractCommandExecutor)
- At the end of this type of task, the log is not completely written to the file system, so it needs to write the log to the remote storage when the cache is finally emptied
- it sends the task log to the remote storage in
clear()method inAbstractCommandExecutoras below
- Tasks that not use
logBuffer
- Note that not all tasks will use
logBuffer, such asZEPPELINtask and the tasks executed on master. So this PR also sends the task log to the remote storage inafterExecuted()andafterThrowing()on worker (if the task does not uselogBuffer) andtaskFinishedon master (if the task is executed on master).
b93a2e4 to
9252c8e
Compare
|
Hi, @caishunfeng , thanks a lot for your review and comments. I've modified it according to your suggestions, could you please help review this again? |
b70bed3 to
72c3daf
Compare
Sure, and please resolve the conflicts. |
72c3daf to
4ae1af8
Compare
|
Hi, @caishunfeng , I've resolved the conflicts. |
zhongjiajie
left a comment
There was a problem hiding this comment.
LGTM overall, one more things, we should add our new adding docs remote-logging.md to https://github.com/apache/dolphinscheduler/blob/dev/docs/configs/docsdev.js file, otherwise your new docs will not show in our website
Sure, I'll look into it. |
|
SonarCloud Quality Gate failed. |
|
Hi, @zhongjiajie , I've add configs in |














Purpose of the pull request
Add support for writing task logs to OSS
Brief change log
Task log writing
remote.logging.enable=true(By default it's false)Task log reading
Log retention
Verify this pull request
manually tested
common.propertiesCreate a shell task
View the task log
delete the local task log file
View the task log again. The task log will be download from OSS.