[BUG] MapReduce-based jobs: the reduce task is sometimes never triggered and the job fails #1033

@liushuainudt

Describe the bug
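After the last task of the map phase, OMS_LAST_TASK, has been written to the H2 database but before the periodic dispatch job picks it up, the ProcessorTracker idle check fires and its condition holds. Once the TaskTracker is notified, it marks the not-yet-dispatched task belonging to this instance ID and the idle ProcessorTracker as failed, so the subsequent dispatch job finds no task to dispatch. Because the instance now contains a failed task, the whole job fails.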

To Reproduce
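This bug is intermittent and cannot be reproduced 100% of the time. However, when the map-phase tasks finish after roughly 120s to 130s, which falls right inside the ProcessorTracker idle check window, running a large number of jobs is very likely to trigger it.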

Expected behavior
When no task fails in the map phase, the reduce phase should be triggered and executed normally.

Environment

  • PowerJob Version: [e.g. 4.3.2], but this bug probably exists in all versions
  • Java Version: [e.g. OpenJDK 8]
  • OS: [e.g. CentOS 8.1]

Additional context
Analysis:
1. At the end of the map phase of a PowerJob MapReduce job, the system generates an OMS_LAST_TASK task:
        if (unfinishedNum == 0) {

            // No task exists in the database at all: the root task failed to be created, so the instance fails
            if (finishedNum == 0) {
                finished.set(true);
                result = SystemInstanceResult.TASK_INIT_FAILED;
            } else {
                ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());

                switch (executeType) {

                    // STANDALONE has only one task; the instance ends when it completes
                    case STANDALONE:
                        finished.set(true);
                        List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId, instanceId);
                        if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
                            result = SystemInstanceResult.UNKNOWN_BUG;
                            log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId);
                        } else {
                            result = allTask.get(0).getResult();
                            success = allTask.get(0).getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
                        }
                        break;
                    // MAP does not care about the result, the simplest case
                    case MAP:
                        finished.set(true);
                        success = holder.failedNum == 0;
                        result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum);
                        break;
                    // For MapReduce and Broadcast, whether the instance is finished is decided by the execution status of LastTask
                    default:

                        Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId, instanceId);
                        if (lastTaskOptional.isPresent()) {

                            // Present: decide the state based on the reduce task
                            TaskDO resultTask = lastTaskOptional.get();
                            TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus());

                            if (lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) {
                                finished.set(true);
                                success = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS;
                                result = resultTask.getResult();
                            }

                        } else {

                            // Not present: the preceding tasks have just finished, so lastTask must be created; the final task must run on this machine!
                            TaskDO newLastTask = new TaskDO();
                            newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
                            newLastTask.setTaskId(LAST_TASK_ID);
                            newLastTask.setSubInstanceId(instanceId);
                            newLastTask.setAddress(workerRuntime.getWorkerAddress());
                            submitTask(Lists.newArrayList(newLastTask));
                        }
                }
            }
        }

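2. Suppose the ProcessorTracker associated with this TaskTracker happens to trigger its idle check at this moment, usually because the tasks of the earlier phase ran for more than 120s. If the idle condition holds, the ProcessorTracker notifies the corresponding TaskTracker.
The code below runs every 10s (it periodically reports a heartbeat carrying task execution information to the TaskTracker):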
private class CheckerAndReporter implements Runnable {

    @Override
    @SuppressWarnings({"squid:S1066","squid:S3776"})
    public void run() {

        // Timeout check: if the instance has timed out, shut down automatically
        long interval = System.currentTimeMillis() - startTime;
        // The ProcessorTracker of a second-level (frequent) job should not be shut down
        if (!TimeExpressionType.FREQUENT_TYPES.contains(instanceInfo.getTimeExpressionType())) {
            if (interval > instanceInfo.getInstanceTimeoutMS()) {
                log.warn("[ProcessorTracker-{}] detected instance timeout, maybe TaskTracker's destroy request missed, so try to kill self now.", instanceId);
                destroy();
                return;
            }
        }

        // Check thread pool activity; after a long idle period, report to the TaskTracker and ask it to check
        if (threadPool.getActiveCount() > 0 || threadPool.getCompletedTaskCount() > lastCompletedTaskCount) {
            lastIdleTime = -1;
            lastCompletedTaskCount = threadPool.getCompletedTaskCount();
        } else {
            if (lastIdleTime == -1) {
                lastIdleTime = System.currentTimeMillis();
            } else {
                long idleTime = System.currentTimeMillis() - lastIdleTime;
                if (idleTime > MAX_IDLE_TIME) {
                    log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", instanceId, idleTime);

                    // Unreliable notification: if this request fails, the processing cluster is merely short of one ProcessorTracker, which is acceptable
                    ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId);
                    statusReportReq.setAddress(workerRuntime.getWorkerAddress());
                    TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime);
                    destroy();
                    return;
                }
            }
        }

        // ... (rest of run() omitted in this excerpt)
    }
}
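
To make the timing concrete, the idle check above can be modelled in isolation. This is only a minimal sketch, not PowerJob code: the class name `IdleDetector`, its `check` method, and the 120 000 ms threshold used in the usage note (taken from the "120s" mentioned in this issue) are assumptions made for illustration.

```java
/** Stand-alone model of the idle check; all names here are illustrative, not PowerJob's API. */
class IdleDetector {
    private final long maxIdleMs;
    private long lastIdleTime = -1;
    private long lastCompletedTaskCount = 0;

    IdleDetector(long maxIdleMs) {
        this.maxIdleMs = maxIdleMs;
    }

    /**
     * One periodic check. Returns true when the tracker would report idle
     * and destroy itself, mirroring the branch structure quoted above.
     */
    boolean check(int activeCount, long completedTaskCount, long nowMs) {
        if (activeCount > 0 || completedTaskCount > lastCompletedTaskCount) {
            // Work was observed since the previous check: not idle.
            lastIdleTime = -1;
            lastCompletedTaskCount = completedTaskCount;
            return false;
        }
        if (lastIdleTime == -1) {
            // First check at which the pool looks idle: start the idle clock.
            lastIdleTime = nowMs;
            return false;
        }
        return nowMs - lastIdleTime > maxIdleMs;
    }
}
```

With a threshold of 120 000 ms and one call every 10s, the detector returns true on the first check at which strictly more than 120s have elapsed since idleness was first observed. Nothing in this check looks at tasks that are waiting to be dispatched, which is the gap this issue describes.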

3. The TaskTracker then marks the tasks that belong to this ProcessorTracker as failed:
public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
    log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceId, heartbeatReq);
    ptStatusHolder.updateStatus(heartbeatReq);

    // Idle report: check whether the results of all tasks handled by this ProcessorTracker have been received
    if (heartbeatReq.getType() == ProcessorTrackerStatusReportReq.IDLE) {
        String idlePtAddress = heartbeatReq.getAddress();
        // This ProcessorTracker has been destroyed; reset it to the initial state
        ptStatusHolder.getProcessorTrackerStatus(idlePtAddress).setDispatched(false);
        List<TaskDO> unfinishedTask = taskPersistenceService.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress);
        if (!CollectionUtils.isEmpty(unfinishedTask)) {
            log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", instanceId, idlePtAddress, unfinishedTask);
            unfinishedTask.forEach(task -> updateTaskStatus(task.getSubInstanceId(), task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
        }
    }
}

4. As a result, the task that should have been picked up by the dispatch job (which runs every 5s):
protected void dispatchTask(TaskDO task, String processorTrackerAddress) {

    // 1. Persist: update the database (if the update fails, the task may be executed twice; not handled for now)
    TaskDO updateEntity = new TaskDO();
    updateEntity.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue());
    // Record the ProcessorTracker that handles this task
    updateEntity.setAddress(processorTrackerAddress);
    boolean success = taskPersistenceService.updateTask(instanceId, task.getTaskId(), updateEntity);
    if (!success) {
        log.warn("[TaskTracker-{}] dispatch task(taskId={},taskName={}) failed due to update task status failed.", instanceId, task.getTaskId(), task.getTaskName());
        return;
    }

    // 2. Update the ProcessorTrackerStatus
    ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true);
    // 3. Initialize the cache
    taskId2BriefInfo.put(task.getTaskId(), new TaskBriefInfo(task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, -1L));

    // 4. Dispatch the task
    TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task, workerRuntime.getWorkerAddress());
    TransportUtils.ttStartPtTask(startTaskReq, processorTrackerAddress, workerRuntime.getTransporter());

    log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName());
}

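can no longer be found in the pending-dispatch state. Only the corresponding failed task exists, so the MapReduce instance contains a failed task and the whole job fails.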
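
The ordering problem in steps 1 to 4 can be reproduced with a small in-memory model. This is a sketch under stated assumptions, not PowerJob code: `LastTaskRaceDemo`, its `Status` enum and its methods are hypothetical stand-ins for the task table, for the idle-report handler, and for the periodic dispatcher.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model of the race; every name here is illustrative only. */
class LastTaskRaceDemo {
    enum Status { WAITING_DISPATCH, DISPATCHED, SUCCEEDED, FAILED }

    static final class Task {
        final String taskName;
        final String address;
        Status status = Status.WAITING_DISPATCH;

        Task(String taskName, String address) {
            this.taskName = taskName;
            this.address = address;
        }
    }

    private final List<Task> tasks = new ArrayList<>();

    /** Models writing a new task to the local task table. */
    void submit(Task task) {
        tasks.add(task);
    }

    /** Models the idle-report handler: every unfinished task on that address is failed. */
    int onIdleReport(String idleAddress) {
        int failed = 0;
        for (Task t : tasks) {
            boolean unfinished = t.status == Status.WAITING_DISPATCH || t.status == Status.DISPATCHED;
            if (unfinished && t.address.equals(idleAddress)) {
                t.status = Status.FAILED;
                failed++;
            }
        }
        return failed;
    }

    /** Models the periodic dispatcher: only tasks still waiting are dispatched. */
    int dispatchWaiting() {
        int dispatched = 0;
        for (Task t : tasks) {
            if (t.status == Status.WAITING_DISPATCH) {
                t.status = Status.DISPATCHED;
                dispatched++;
            }
        }
        return dispatched;
    }
}
```

If `submit` is followed by `onIdleReport` for the same address before `dispatchWaiting` runs, the dispatcher finds nothing to dispatch and the last task stays failed. If the dispatcher runs first, the task is dispatched as expected.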

5. Logs for reference:
In a normal run, after the OMS_LAST_TASK task is received, it is subsequently dispatched by the dispatch job:
receive new tasks: [{taskId='9999', instanceId=735024762429898816, subInstanceId=735024762429898816, taskName='OMS_LAST_TASK', address='10.201.242.23:27777', status=1, result='null', failedCnt=0, createdTime=1731019654519, lastModifiedTime=1731019654519, lastReportTime=-1}]
Line 113093: 06:47:35.535[ctttp-735024762429898816-1]DEBUG t.p.w.c.t.task.heavy.HeavyTaskTracker - [TaskTracker-735024762429898816] dispatch task(taskId=9999,taskName=OMS_LAST_TASK) successfully.

In a failed run, the OMS_LAST_TASK task is received but never dispatched, because by then its status has already been changed to failed (by receiveProcessorTrackerHeartbeat):
receive new tasks: [{taskId='9999', instanceId=735024762274709568, subInstanceId=735024762274709568, taskName='OMS_LAST_TASK',

receive heartbeat: ProcessorTrackerStatusReportReq(type=1, instanceId=735024762274709568, time=1731019

At the same time, a log like this appears:
ProcessorTracker is idle now but have unfinished tasks:
[Screenshot: bug-1]

However, the task is still in the pending-dispatch state.

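Looking further, the time at which the task was received and the time of the kill are very close and can be treated as simultaneous (the idle check succeeded before the task was dispatched):
[Screenshot: bug-2]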

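Summary: after the last task of the map phase, OMS_LAST_TASK, has been written to the H2 database but before the periodic dispatch job picks it up, the ProcessorTracker idle check fires and its condition holds. Once the TaskTracker is notified, it marks the not-yet-dispatched task belonging to this instance ID and the idle ProcessorTracker as failed, so the subsequent dispatch job finds no task to dispatch. Because the instance now contains a failed task, the whole job fails.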

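6. Suggested fix for reference:
Adding extra validation when the TaskTracker receives the idle report may not help, because CheckerAndReporter has already destroyed the ProcessorTracker by then. The key problem is that the ProcessorTracker's CheckerAndReporter destroys it too early. The problem should be fixed at the source by changing the ProcessorTracker's detection logic: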

private class CheckerAndReporter implements Runnable {
    @Override
    public void run() {
        // 1. First check whether there are still pending scheduled tasks
        if (hasAnyPendingTasks()) {
            // Reset the idle time
            lastIdleTime = -1;
            return;
        }

        // 2. Check the thread pool state
        if (threadPool.getActiveCount() > 0 || threadPool.getCompletedTaskCount() > lastCompletedTaskCount) {
            lastIdleTime = -1;
            lastCompletedTaskCount = threadPool.getCompletedTaskCount();
        } else if (lastIdleTime == -1) {
            // Start of an idle period, as in the existing check
            lastIdleTime = System.currentTimeMillis();
        } else {
            long idleTime = System.currentTimeMillis() - lastIdleTime;
            if (idleTime > MAX_IDLE_TIME) {
                // 3. Before destroying, confirm once more that there really are no pending tasks
                if (!hasAnyPendingTasks()) {
                    log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, will destroy self.",
                            instanceId, idleTime);

                    // ...

                } else {
                    log.info("[ProcessorTracker-{}] Cancel idle report as new tasks found", instanceId);
                }
            }
        }
    }
}

private boolean hasAnyPendingTasks() {
    // Combined check for any kind of pending task; the code below is only a schematic sketch
    return hasScheduledPeriodicTasks() || 
           !taskQueue.isEmpty() || 
           threadPool.getActiveCount() > 0;
}
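
The double-check idea can be sketched as a self-contained class. This is a hedged illustration rather than a patch: `GuardedIdleDetector` and `shouldDestroy` are hypothetical names, and the `BooleanSupplier` stands in for `hasAnyPendingTasks()`, whose real data source (for example, whether the ProcessorTracker can see tasks that the TaskTracker has not dispatched yet) is left open by this issue.

```java
import java.util.function.BooleanSupplier;

/** Idle detector that refuses to fire while pending work is reported; names are illustrative. */
class GuardedIdleDetector {
    private final long maxIdleMs;
    private final BooleanSupplier hasPendingTasks; // assumption: some signal for undispatched work
    private long lastIdleTime = -1;
    private long lastCompletedTaskCount = 0;

    GuardedIdleDetector(long maxIdleMs, BooleanSupplier hasPendingTasks) {
        this.maxIdleMs = maxIdleMs;
        this.hasPendingTasks = hasPendingTasks;
    }

    /** One periodic check. Returns true only when it is safe to report idle and destroy. */
    boolean shouldDestroy(int activeCount, long completedTaskCount, long nowMs) {
        // 1. Pending work always resets the idle clock.
        if (hasPendingTasks.getAsBoolean()) {
            lastIdleTime = -1;
            return false;
        }
        // 2. Same thread pool activity check as the existing logic.
        if (activeCount > 0 || completedTaskCount > lastCompletedTaskCount) {
            lastIdleTime = -1;
            lastCompletedTaskCount = completedTaskCount;
            return false;
        }
        if (lastIdleTime == -1) {
            lastIdleTime = nowMs;
            return false;
        }
        if (nowMs - lastIdleTime <= maxIdleMs) {
            return false;
        }
        // 3. Re-check right before destroying, in case work arrived in the meantime.
        if (hasPendingTasks.getAsBoolean()) {
            lastIdleTime = -1;
            return false;
        }
        return true;
    }
}
```

The second call to the supplier only matters when its answer can change between the two calls, for example when another thread enqueues work. It narrows the race window but does not close it, so a real fix would probably also need the check and the destroy step to be coordinated with task submission.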
