-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Making JOIN Task Async failing some workflows #3528
Description
Describe the bug
After the JOIN task was made async, some workflows fail to execute successfully: the JOIN task fails with a timeout. If a workflow has DECISION tasks in its FORK_JOIN branches, the whole workflow fails because the JOIN task fails with a responseTimeout error.
In the workflow provided below, the "mock_task" task definition has a 300-second response timeout, but the failure occurs even before the JOIN starts.
Details
Conductor version: main branch
Persistence implementation: Dynomite
Queue implementation: Dynoqueues
Lock: Redis
Workflow definition:
{
"createTime": 1678401621130,
"updateTime": 1678308058828,
"accessPolicy": {},
"name": "fd-csd-flow_mock2",
"description": "fd-csd-flow",
"version": 1,
"tasks": [
{
"name": "mock_task",
"taskReferenceName": "InputJQ",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | {filters:[.filters[] | {repository :.repository, \"filters\":[.fields[] | {(.name):.values}] | add}],\"filterflag\": (.filters | length > 0)}",
"JSONJQINPUT": "${workflow.input.body}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "fork-join-0",
"inputParameters": {},
"type": "FORK_JOIN",
"forkTasks": [
[
{
"name": "mock_task",
"taskReferenceName": "ARSCInput",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | if .filters | length > 0 then .filters[] | select(.repository == \"arsc\") | .filters else {} end",
"JSONJQINPUT": "${InputJQ.output.result}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "ARSCDecision",
"inputParameters": {
"filter": "${InputJQ.output.result.filters}",
"filterflag": "${InputJQ.output.result.filterflag}"
},
"type": "DECISION",
"caseExpression": "if (!$.filterflag ){'arsc';}else {for(var i = 0; i <$.filter.length; i++) { if ($.filter[i].repository == 'arsc'){ 'arsc'; break; }}}",
"decisionCases": {
"arsc": [
{
"name": "mock_task",
"taskReferenceName": "ARSCTask",
"inputParameters": {
"http_request": {
"uri": "http://httpbin.org/anything",
"method": "POST",
"body": "${workflow.input.body}"
}
},
"type": "HTTP",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "ARSCJQT",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": {
"objectType": "arsc",
"response": "${ARSCTask.output.response.body}"
}
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
]
},
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "ARSCDecision_join",
"inputParameters": {
"output": "${ARSCJQT.output.result}",
"ARSCTaskOutput": "${ARSCTask.output.response.body}"
},
"type": "SIMPLE",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "ARSCFilter",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": "${ARSCDecision_join.output.response.body.output}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
],
[
{
"name": "mock_task",
"taskReferenceName": "OEEHInput",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | if .filters | length > 0 then .filters[] | select(.repository == \"oeeh\") | .filters else {} end",
"JSONJQINPUT": "${InputJQ.output.result}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "OEEHDecision",
"inputParameters": {
"filter": "${InputJQ.output.result.filters}",
"filterflag": "${InputJQ.output.result.filterflag}"
},
"type": "DECISION",
"caseExpression": "if (!$.filterflag ){\n\t'oeeh';\n}else {\nfor(var i = 0; i <$.filter.length; i++) {\n if ($.filter[i].repository == 'oeeh'){ \n 'oeeh';\n break;\n }\n}\n}",
"decisionCases": {
"oeeh": [
{
"name": "mock_task",
"taskReferenceName": "OEEHTask",
"inputParameters": {
"http_request": {
"uri": "http://httpbin.org/anything",
"method": "POST",
"body": "${workflow.input.body}"
}
},
"type": "HTTP",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "OEEHJQT",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": {
"objectType": "oeeh",
"response": "${OEEHTask.output.response.body}"
}
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
]
},
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "OEEHDecision_join",
"inputParameters": {
"output": "${OEEHJQT.output.result}",
"OEEHTaskOutput": "${OEEHTask.output.response.body}"
},
"type": "SIMPLE",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "OEEHFilter",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": "${OEEHDecision_join.output.response.body.output}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
],
[
{
"name": "mock_task",
"taskReferenceName": "APSVInput",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | if .filters | length > 0 then .filters[] | select(.repository == \"apsv\") | .filters else {} end",
"JSONJQINPUT": "${InputJQ.output.result}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "APSVDecision",
"inputParameters": {
"filter": "${InputJQ.output.result.filters}",
"filterflag": "${InputJQ.output.result.filterflag}"
},
"type": "DECISION",
"caseExpression": "if (!$.filterflag ){\n\t'apsv';\n}else {\nfor(var i = 0; i <$.filter.length; i++) {\n if ($.filter[i].repository == 'apsv'){ \n 'apsv';\n break;\n }\n}\n}",
"decisionCases": {
"apsv": [
{
"name": "mock_task",
"taskReferenceName": "APSVTask",
"inputParameters": {
"http_request": {
"uri": "http://httpbin.org/anything",
"method": "POST",
"body": "${workflow.input.body}"
}
},
"type": "HTTP",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "APSVJQT",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": {
"objectType": "apsv",
"response": "${APSVTask.output.response.body}"
}
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
]
},
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "APSVDecision_join",
"inputParameters": {
"output": "${APSVJQT.output.result}",
"APSVTaskOutput": "${APSVTask.output.response.body}"
},
"type": "SIMPLE",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "APSVFilter",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": "${APSVDecision_join.output.response.body.output}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
]
],
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "join-0",
"inputParameters": {},
"type": "JOIN",
"startDelay": 0,
"joinOn": [
"ARSCFilter",
"OEEHFilter",
"APSVFilter"
],
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "CSDMerge",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": {
"arsc_jq": "${ARSCFilter.output.result}",
"oeeh_jq": "${OEEHFilter.output.result}",
"apsv_jq": "${APSVFilter.output.result}"
}
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
],
"inputParameters": [],
"outputParameters": {
"searchText": "${workflow.input.body.text}",
"applications": [
"${CSDMerge.output.result}"
]
},
"schemaVersion": 2,
"restartable": true,
"workflowStatusListenerEnabled": false,
"ownerEmail": "test@gmail.com",
"timeoutPolicy": "ALERT_ONLY",
"timeoutSeconds": 0,
"variables": {},
"inputTemplate": {}
}
Task definition:
{
"createTime": 1678399938459,
"createdBy": "",
"accessPolicy": {},
"name": "mock_task",
"description": "mock task request",
"retryCount": 0,
"timeoutSeconds": 0,
"inputKeys": [],
"outputKeys": [],
"timeoutPolicy": "TIME_OUT_WF",
"retryLogic": "FIXED",
"retryDelaySeconds": 2,
"responseTimeoutSeconds": 300,
"inputTemplate": {},
"rateLimitPerFrequency": 0,
"rateLimitFrequencyInSeconds": 1,
"ownerEmail": "test@test.com",
"backoffScaleFactor": 1
}
To Reproduce
Steps to reproduce the behavior:
- Use the above given workflow and task definitions to reproduce the issue
- Use this input for workflow
{
"body": {
"text": "101",
"version": 3,
"logicalId": "lid://demo-products.1",
"page_number": 1,
"page_size": 20,
"userContext": null,
"filters": []
}
}
- See error
conductor-server_1 | 2782317 [http-nio-8080-exec-3] INFO com.netflix.conductor.core.execution.WorkflowExecutor [] - Execution terminated of workflow: fd-csd-flow_mock2.1/1e79652f-0a2c-40d2-a9e9-9acec064cb4e.RUNNING
conductor-server_1 | com.netflix.conductor.core.exception.TerminateWorkflowException: responseTimeout: 300 exceeded for the taskId: cc8dc3ff-3e09-406a-9867-ab7c6cc7747f with Task Definition: JOIN
conductor-server_1 | at com.netflix.conductor.core.execution.DeciderService.retry(DeciderService.java:548) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
conductor-server_1 | at com.netflix.conductor.core.execution.DeciderService.decide(DeciderService.java:205) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
conductor-server_1 | at com.netflix.conductor.core.execution.DeciderService.decide(DeciderService.java:109) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
conductor-server_1 | at com.netflix.conductor.core.execution.WorkflowExecutor.decide(WorkflowExecutor.java:1007) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
conductor-server_1 | at com.netflix.conductor.core.execution.WorkflowExecutor.decide(WorkflowExecutor.java:1040) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
conductor-server_1 | at com.netflix.conductor.core.execution.WorkflowExecutor.decide(WorkflowExecutor.java:1040) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
conductor-server_1 | at com.netflix.conductor.core.execution.WorkflowExecutor.handleWorkflowEvaluationEvent(WorkflowExecutor.java:962) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
Expected behavior
The JOIN task is expected to run without error.
Additional context
When the workflow is started, the JOIN task is scheduled next here. The JOIN task is getting scheduled even before its taskUpdated time is updated, so it fails this check here and times out.
I'll try to debug this and will raise a merge request if I find anything.