Description
Describe the bug
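The bug occurs after OMS_LAST_TASK, the last task of the map stage, has been written to the H2 database but before the periodic dispatch job runs. In that window, the ProcessorTracker idle check fires and its condition holds. The ProcessorTracker notifies the TaskTracker, which then marks as failed every unfinished task of this instance that is assigned to the idle ProcessorTracker. That includes the task that was just waiting to be dispatched. The next dispatch run finds nothing to dispatch, and because the instance now contains a failed task, the whole instance fails.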
To Reproduce
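The bug is intermittent and does not reproduce every time. It can be triggered with high probability by running the job many times when the map-stage tasks take about 120 to 130 s to finish. That timing puts the end of the map stage right inside the ProcessorTracker idle-check window.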
Expected behavior
No task fails during the map stage, so the reduce stage should be triggered and executed normally.
Environment
- PowerJob Version: [e.g. 4.3.2], though the bug likely exists in all versions
- Java Version: [e.g. OpenJDK 8]
- OS: [e.g. CentOS 8.1]
Screenshots
If applicable, add screenshots to help explain your problem.
Additional context
Analysis:
1. At the end of the map stage of a PowerJob MapReduce job, the system creates an OMS_LAST_TASK task:
if (unfinishedNum == 0) {
    // No task at all in the database: the root task failed to be created, so the instance fails
    if (finishedNum == 0) {
        finished.set(true);
        result = SystemInstanceResult.TASK_INIT_FAILED;
    } else {
        ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
        switch (executeType) {
            // STANDALONE has a single task; the instance ends when it completes
            case STANDALONE:
                finished.set(true);
                List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId, instanceId);
                if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
                    result = SystemInstanceResult.UNKNOWN_BUG;
                    log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId);
                } else {
                    result = allTask.get(0).getResult();
                    success = allTask.get(0).getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
                }
                break;
            // MAP ignores the result; the simplest case
            case MAP:
                finished.set(true);
                success = holder.failedNum == 0;
                result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum);
                break;
            // For MapReduce and Broadcast, completion is decided by the state of the LastTask
            default:
                Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId, instanceId);
                if (lastTaskOptional.isPresent()) {
                    // The last task exists: the instance status follows the reduce task
                    TaskDO resultTask = lastTaskOptional.get();
                    TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus());
                    if (lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) {
                        finished.set(true);
                        success = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS;
                        result = resultTask.getResult();
                    }
                } else {
                    // Key part: no last task yet, so the preceding tasks have just finished and the lastTask
                    // must be created. The final task must run on this machine!
                    TaskDO newLastTask = new TaskDO();
                    newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
                    newLastTask.setTaskId(LAST_TASK_ID);
                    newLastTask.setSubInstanceId(instanceId);
                    newLastTask.setAddress(workerRuntime.getWorkerAddress());
                    submitTask(Lists.newArrayList(newLastTask));
                }
        }
    }
}
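The MapReduce/Broadcast branch above can be reduced to a small decision function. The following is a simplified model, not PowerJob code. The Outcome enum, the trimmed TaskStatus enum and the decide method are hypothetical, and the unfinishedNum != 0 case, which the excerpt does not show, is modeled as "keep waiting". The model shows the effect of the idle report flipping the waiting OMS_LAST_TASK to WORKER_PROCESS_FAILED. The next status check then sees no unfinished task and a failed last task, so it ends the instance as failed instead of waiting for the reduce stage.

```java
import java.util.Optional;

final class LastTaskDecision {
    // Trimmed-down task states, named after the ones in the excerpt (hypothetical subset)
    enum TaskStatus { WAITING_DISPATCH, DISPATCH_SUCCESS_WORKER_UNCHECK, WORKER_PROCESS_FAILED, WORKER_PROCESS_SUCCESS }

    // Hypothetical summary of what one status check decides
    enum Outcome { KEEP_WAITING, CREATE_LAST_TASK, FINISHED_SUCCESS, FINISHED_FAILED }

    static Outcome decide(long unfinishedNum, long finishedNum, Optional<TaskStatus> lastTask) {
        if (unfinishedNum != 0) {
            // Not covered by the excerpt: tasks are still running, nothing is decided yet
            return Outcome.KEEP_WAITING;
        }
        if (finishedNum == 0) {
            // TASK_INIT_FAILED: the root task was never created
            return Outcome.FINISHED_FAILED;
        }
        if (!lastTask.isPresent()) {
            // Map stage just finished: create OMS_LAST_TASK on the local worker
            return Outcome.CREATE_LAST_TASK;
        }
        switch (lastTask.get()) {
            case WORKER_PROCESS_SUCCESS:
                return Outcome.FINISHED_SUCCESS;
            case WORKER_PROCESS_FAILED:
                // Also reached when the idle report forced the waiting last task to failed
                return Outcome.FINISHED_FAILED;
            default:
                return Outcome.KEEP_WAITING;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(0, 5, Optional.empty()));
        System.out.println(decide(0, 6, Optional.of(TaskStatus.WORKER_PROCESS_FAILED)));
    }
}
```

Running main prints CREATE_LAST_TASK and then FINISHED_FAILED. The second line is the state the instance lands in after the race described below.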
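2. Meanwhile, the idle check of the ProcessorTracker that belongs to this TaskTracker may fire. This usually happens because the preceding stage ran for more than 120 s. If the idle condition holds, the ProcessorTracker notifies its TaskTracker.
The following code runs every 10 s, as part of the periodic heartbeat report to the TaskTracker that carries task execution information: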
private class CheckerAndReporter implements Runnable {
    @Override
    @SuppressWarnings({"squid:S1066","squid:S3776"})
    public void run() {
        // Timeout check: if the instance has timed out, shut down automatically
        long interval = System.currentTimeMillis() - startTime;
        // ProcessorTrackers of second-level (frequent) jobs must not be shut down
        if (!TimeExpressionType.FREQUENT_TYPES.contains(instanceInfo.getTimeExpressionType())) {
            if (interval > instanceInfo.getInstanceTimeoutMS()) {
                log.warn("[ProcessorTracker-{}] detected instance timeout, maybe TaskTracker's destroy request missed, so try to kill self now.", instanceId);
                destroy();
                return;
            }
        }
        // Check thread-pool activity; after a long idle period, report to the TaskTracker and ask it to check
        if (threadPool.getActiveCount() > 0 || threadPool.getCompletedTaskCount() > lastCompletedTaskCount) {
            lastIdleTime = -1;
            lastCompletedTaskCount = threadPool.getCompletedTaskCount();
        } else {
            if (lastIdleTime == -1) {
                lastIdleTime = System.currentTimeMillis();
            } else {
                long idleTime = System.currentTimeMillis() - lastIdleTime;
                if (idleTime > MAX_IDLE_TIME) {
                    log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", instanceId, idleTime);
                    // Unreliable notification: if this request fails, the processing cluster just loses
                    // one ProcessorTracker, which is acceptable
                    ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId);
                    statusReportReq.setAddress(workerRuntime.getWorkerAddress());
                    TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime);
                    destroy();
                    return;
                }
            }
        }
        // ... (rest of run() omitted in this excerpt)
    }
}
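The idle detection above is a small state machine driven by the 10 s tick. The sketch below replays it in isolation so the timing is easy to check. It is a simplified model with hypothetical names (IdleCheckSimulation, tick, firstReportTime). The 10 s period comes from this report, while MAX_IDLE_TIME = 120 s is an assumption rather than a confirmed value. Note that the first idle tick only starts the timer. The report therefore goes out at the first tick that is strictly more than MAX_IDLE_TIME after it.

```java
final class IdleCheckSimulation {
    // Assumed values: 10 s check period (from this report) and a 120 s idle limit (assumption)
    static final long CHECK_PERIOD_MS = 10_000L;
    static final long MAX_IDLE_TIME = 120_000L;

    private long lastIdleTime = -1;
    private long lastCompletedTaskCount;

    IdleCheckSimulation(long completedTaskCount) {
        this.lastCompletedTaskCount = completedTaskCount;
    }

    /** One CheckerAndReporter tick at time `now`; returns true if the idle report would be sent. */
    boolean tick(long now, int activeCount, long completedTaskCount) {
        if (activeCount > 0 || completedTaskCount > lastCompletedTaskCount) {
            // The pool did work since the last tick: reset the idle timer
            lastIdleTime = -1;
            lastCompletedTaskCount = completedTaskCount;
            return false;
        }
        if (lastIdleTime == -1) {
            // The first idle tick only starts the timer
            lastIdleTime = now;
            return false;
        }
        return now - lastIdleTime > MAX_IDLE_TIME;
    }

    /** Tick time (ms) at which the report fires if the pool is idle from tick 0 onward. */
    static long firstReportTime() {
        IdleCheckSimulation sim = new IdleCheckSimulation(0);
        for (long now = 0; ; now += CHECK_PERIOD_MS) {
            if (sim.tick(now, 0, 0)) {
                return now;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("idle report at t = " + firstReportTime() + " ms");
    }
}
```

With these assumed values, main prints `idle report at t = 130000 ms`. The tick at 120 s sees exactly 120 s of idleness, which does not exceed the limit, so the report waits for the next tick.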
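3. The TaskTracker then marks the unfinished tasks assigned to this ProcessorTracker as failed. If the idle ProcessorTracker is the one on the local worker, this includes the freshly created OMS_LAST_TASK. That task was stored with workerRuntime.getWorkerAddress() as its address, so getAllUnFinishedTaskByAddress returns it even though it has not been dispatched yet: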
public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
    log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceId, heartbeatReq);
    ptStatusHolder.updateStatus(heartbeatReq);
    // Idle report: check whether the results of all tasks this ProcessorTracker was responsible for have been received
    if (heartbeatReq.getType() == ProcessorTrackerStatusReportReq.IDLE) {
        String idlePtAddress = heartbeatReq.getAddress();
        // This ProcessorTracker has been destroyed; reset it to the initial state
        ptStatusHolder.getProcessorTrackerStatus(idlePtAddress).setDispatched(false);
        List<TaskDO> unfinishedTask = taskPersistenceService.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress);
        if (!CollectionUtils.isEmpty(unfinishedTask)) {
            log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", instanceId, idlePtAddress, unfinishedTask);
            unfinishedTask.forEach(task -> updateTaskStatus(task.getSubInstanceId(), task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
        }
    }
}
4. This affects the dispatch job that should have picked up the task. That job runs every 5 s; its dispatch method is:
protected void dispatchTask(TaskDO task, String processorTrackerAddress) {
    // 1. Persist: update the database (if this update fails, the task may run twice; not handled for now)
    TaskDO updateEntity = new TaskDO();
    updateEntity.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue());
    // Record the ProcessorTracker that will process this task
    updateEntity.setAddress(processorTrackerAddress);
    boolean success = taskPersistenceService.updateTask(instanceId, task.getTaskId(), updateEntity);
    if (!success) {
        log.warn("[TaskTracker-{}] dispatch task(taskId={},taskName={}) failed due to update task status failed.", instanceId, task.getTaskId(), task.getTaskName());
        return;
    }
    // 2. Update the ProcessorTrackerStatus
    ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true);
    // 3. Initialize the cache
    taskId2BriefInfo.put(task.getTaskId(), new TaskBriefInfo(task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, -1L));
    // 4. Dispatch the task
    TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task, workerRuntime.getWorkerAddress());
    TransportUtils.ttStartPtTask(startTaskReq, processorTrackerAddress, workerRuntime.getTransporter());
    log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName());
}
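The dispatch job can no longer find the task that was waiting to be dispatched; only the corresponding failed task remains. The MapReduce instance therefore contains a failed task, and the whole instance fails.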
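Steps 3 and 4 can be replayed deterministically with an in-memory stand-in for the H2 task table. Everything here is hypothetical: the LastTaskRace class, the trimmed TaskStatus enum and the helper methods. The model only mirrors the filtering described above. The idle handler fails unfinished tasks by address, and a dispatch round only picks up tasks still waiting for dispatch. The address is the one from the logs below, used as the local worker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LastTaskRace {
    enum TaskStatus { WAITING_DISPATCH, DISPATCH_SUCCESS_WORKER_UNCHECK, WORKER_PROCESS_FAILED, WORKER_PROCESS_SUCCESS }

    static final class Task {
        final String taskId;
        final String address;
        TaskStatus status = TaskStatus.WAITING_DISPATCH;

        Task(String taskId, String address) {
            this.taskId = taskId;
            this.address = address;
        }
    }

    // Hypothetical in-memory stand-in for the H2 task table
    private final Map<String, Task> table = new LinkedHashMap<>();

    void submitLastTask(String localAddress) {
        // Like step 1: OMS_LAST_TASK is stored with the local worker address before it is dispatched
        table.put("9999", new Task("9999", localAddress));
    }

    void onIdleReport(String idlePtAddress) {
        // Like step 3: every unfinished task recorded under that address is forced to failed
        for (Task t : table.values()) {
            boolean unfinished = t.status != TaskStatus.WORKER_PROCESS_FAILED
                    && t.status != TaskStatus.WORKER_PROCESS_SUCCESS;
            if (unfinished && t.address.equals(idlePtAddress)) {
                t.status = TaskStatus.WORKER_PROCESS_FAILED;
            }
        }
    }

    List<String> dispatchRound() {
        // Like step 4: only tasks still waiting for dispatch are picked up
        List<String> dispatched = new ArrayList<>();
        for (Task t : table.values()) {
            if (t.status == TaskStatus.WAITING_DISPATCH) {
                t.status = TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK;
                dispatched.add(t.taskId);
            }
        }
        return dispatched;
    }

    TaskStatus statusOf(String taskId) {
        return table.get(taskId).status;
    }

    public static void main(String[] args) {
        String local = "10.201.242.23:27777";

        LastTaskRace normal = new LastTaskRace();
        normal.submitLastTask(local);
        System.out.println("normal dispatched: " + normal.dispatchRound());

        LastTaskRace racy = new LastTaskRace();
        racy.submitLastTask(local);
        racy.onIdleReport(local); // the idle report lands between submit and the next dispatch round
        System.out.println("racy dispatched: " + racy.dispatchRound() + ", status=" + racy.statusOf("9999"));
    }
}
```

Running main prints `normal dispatched: [9999]` followed by `racy dispatched: [], status=WORKER_PROCESS_FAILED`. This matches the symptom in the logs: OMS_LAST_TASK is received but never dispatched.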
5. Supporting logs:
In a normal run, the OMS_LAST_TASK task is received and then dispatched by the dispatch job:
receive new tasks: [{taskId='9999', instanceId=735024762429898816, subInstanceId=735024762429898816, taskName='OMS_LAST_TASK', address='10.201.242.23:27777', status=1, result='null', failedCnt=0, createdTime=1731019654519, lastModifiedTime=1731019654519, lastReportTime=-1}]
06:47:35.535[ctttp-735024762429898816-1]DEBUG t.p.w.c.t.task.heavy.HeavyTaskTracker - [TaskTracker-735024762429898816] dispatch task(taskId=9999,taskName=OMS_LAST_TASK) successfully.
In a failed run, OMS_LAST_TASK is received but never dispatched, because by then its status is already failed (changed by receiveProcessorTrackerHeartbeat):
receive new tasks: [{taskId='9999', instanceId=735024762274709568, subInstanceId=735024762274709568, taskName='OMS_LAST_TASK',
receive heartbeat: ProcessorTrackerStatusReportReq(type=1, instanceId=735024762274709568, time=1731019
At the same time, this log line appears:
ProcessorTracker is idle now but have unfinished tasks:

even though the task is still waiting to be dispatched.
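On closer inspection, the task was received and the ProcessorTracker was killed almost at the same moment, so the two can be treated as simultaneous. The idle check succeeded before the task was dispatched: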

Summary: OMS_LAST_TASK is written to H2, and the ProcessorTracker idle check fires before the dispatch job runs. The TaskTracker then marks the waiting last task as failed, the dispatch job finds nothing to dispatch, and the instance fails because it contains a failed task.
6. Suggested fix:
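Adding extra validation when the TaskTracker receives the idle report may not help, because CheckerAndReporter has already destroyed the ProcessorTracker by then. The real problem is that the ProcessorTracker's CheckerAndReporter destroys it too early. The fix should therefore start at the source, by changing the ProcessorTracker's detection logic: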
private class CheckerAndReporter implements Runnable {
    @Override
    public void run() {
        try {
            // 1. First check whether any tasks are still pending
            if (hasAnyPendingTasks()) {
                // Reset the idle timer
                lastIdleTime = -1;
                return;
            }
            // 2. Check the thread-pool state
            if (threadPool.getActiveCount() > 0 || threadPool.getCompletedTaskCount() > lastCompletedTaskCount) {
                lastIdleTime = -1;
                lastCompletedTaskCount = threadPool.getCompletedTaskCount();
            } else if (lastIdleTime == -1) {
                // First idle tick: start the idle timer (without this branch, idleTime would be huge)
                lastIdleTime = System.currentTimeMillis();
            } else {
                long idleTime = System.currentTimeMillis() - lastIdleTime;
                if (idleTime > MAX_IDLE_TIME) {
                    // 3. Before destroying, confirm once more that there really is no pending task
                    if (!hasAnyPendingTasks()) {
                        log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, will destroy self.",
                                instanceId, idleTime);
                        // ...
                    } else {
                        log.info("[ProcessorTracker-{}] Cancel idle report as new tasks found",
                                instanceId);
                    }
                }
            }
        } catch (Exception e) {
            log.error("[ProcessorTracker-{}] CheckerAndReporter failed.", instanceId, e);
        }
    }

    private boolean hasAnyPendingTasks() {
        // Combined check for any kind of pending work; the code below is only illustrative
        return hasScheduledPeriodicTasks() ||
                !taskQueue.isEmpty() ||
                threadPool.getActiveCount() > 0;
    }
}
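The guard proposed above can be exercised as a self-contained model. In this sketch, GuardedIdleChecker and its tick method are hypothetical, and the BooleanSupplier stands in for the report's illustrative hasAnyPendingTasks(). MAX_IDLE_TIME = 120 s is likewise an assumption. The sketch does not answer how a ProcessorTracker would learn about a task that the TaskTracker has persisted but not yet dispatched. It simply assumes such a signal exists and shows that, when it does, pending work restarts the idle timer instead of letting the report fire.

```java
import java.util.function.BooleanSupplier;

final class GuardedIdleChecker {
    static final long MAX_IDLE_TIME = 120_000L; // assumed value

    private final BooleanSupplier hasAnyPendingTasks; // hypothetical hook for pending-work detection
    private long lastIdleTime = -1;
    private long lastCompletedTaskCount;

    GuardedIdleChecker(BooleanSupplier hasAnyPendingTasks) {
        this.hasAnyPendingTasks = hasAnyPendingTasks;
    }

    /** Returns true when this tick would send the idle report and destroy the ProcessorTracker. */
    boolean tick(long now, int activeCount, long completedTaskCount) {
        // 1. Pending work counts as activity and restarts the idle timer
        if (hasAnyPendingTasks.getAsBoolean()) {
            lastIdleTime = -1;
            return false;
        }
        // 2. Same thread-pool test as the original CheckerAndReporter
        if (activeCount > 0 || completedTaskCount > lastCompletedTaskCount) {
            lastIdleTime = -1;
            lastCompletedTaskCount = completedTaskCount;
            return false;
        }
        if (lastIdleTime == -1) {
            lastIdleTime = now;
            return false;
        }
        if (now - lastIdleTime <= MAX_IDLE_TIME) {
            return false;
        }
        // 3. Re-check right before reporting; this narrows, but does not close, the race window
        return !hasAnyPendingTasks.getAsBoolean();
    }

    public static void main(String[] args) {
        boolean[] pending = {false};
        GuardedIdleChecker checker = new GuardedIdleChecker(() -> pending[0]);
        checker.tick(0, 0, 0);                            // starts the idle timer
        pending[0] = true;                                // a task becomes visible as pending
        System.out.println(checker.tick(130_000, 0, 0));  // timer reset, no report
        pending[0] = false;
        checker.tick(140_000, 0, 0);                      // timer restarts
        System.out.println(checker.tick(270_000, 0, 0));  // idle for 130 s, report fires
    }
}
```

Running main prints `false` and then `true`. A pending task seen at the 130 s tick cancels the report that the unguarded check would have sent at that point.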