Skip to content
This repository was archived by the owner on Dec 13, 2023. It is now read-only.

Making JOIN Task Async failing some workflows #3528

@vamsibandarupalli

Description

@vamsibandarupalli

Describe the bug
After making the JOIN task async, some workflows fail to complete successfully because the JOIN task fails with a timeout. If a workflow has DECISION tasks inside FORK_JOIN branches, the whole workflow fails because the JOIN task fails with a responseTimeout error.
In the workflow below, the "mock_task" task definition has a 300-second response timeout, yet the workflow fails with this timeout even before the JOIN starts.

Details
Conductor version: main branch
Persistence implementation: Dynomite
Queue implementation: Dynoqueues
Lock: Redis
Workflow definition:
{
"createTime": 1678401621130,
"updateTime": 1678308058828,
"accessPolicy": {},
"name": "fd-csd-flow_mock2",
"description": "fd-csd-flow",
"version": 1,
"tasks": [
{
"name": "mock_task",
"taskReferenceName": "InputJQ",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | {filters:[.filters[] | {repository :.repository, \"filters\":[.fields[] | {(.name):.values}] | add}],\"filterflag\": (.filters | length > 0)}",
"JSONJQINPUT": "${workflow.input.body}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "fork-join-0",
"inputParameters": {},
"type": "FORK_JOIN",
"forkTasks": [
[
{
"name": "mock_task",
"taskReferenceName": "ARSCInput",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | if .filters | length > 0 then .filters[] | select(.repository == \"arsc\") | .filters else {} end",
"JSONJQINPUT": "${InputJQ.output.result}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "ARSCDecision",
"inputParameters": {
"filter": "${InputJQ.output.result.filters}",
"filterflag": "${InputJQ.output.result.filterflag}"
},
"type": "DECISION",
"caseExpression": "if (!$.filterflag ){'arsc';}else {for(var i = 0; i <$.filter.length; i++) { if ($.filter[i].repository == 'arsc'){ 'arsc'; break; }}}",
"decisionCases": {
"arsc": [
{
"name": "mock_task",
"taskReferenceName": "ARSCTask",
"inputParameters": {
"http_request": {
"uri": "http://httpbin.org/anything",
"method": "POST",
"body": "${workflow.input.body}"
}
},
"type": "HTTP",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "ARSCJQT",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": {
"objectType": "arsc",
"response": "${ARSCTask.output.response.body}"
}
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
]
},
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "ARSCDecision_join",
"inputParameters": {
"output": "${ARSCJQT.output.result}",
"ARSCTaskOutput": "${ARSCTask.output.response.body}"
},
"type": "SIMPLE",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "ARSCFilter",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": "${ARSCDecision_join.output.response.body.output}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
],
[
{
"name": "mock_task",
"taskReferenceName": "OEEHInput",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | if .filters | length > 0 then .filters[] | select(.repository == \"oeeh\") | .filters else {} end",
"JSONJQINPUT": "${InputJQ.output.result}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "OEEHDecision",
"inputParameters": {
"filter": "${InputJQ.output.result.filters}",
"filterflag": "${InputJQ.output.result.filterflag}"
},
"type": "DECISION",
"caseExpression": "if (!$.filterflag ){\n\t'oeeh';\n}else {\nfor(var i = 0; i <$.filter.length; i++) {\n if ($.filter[i].repository == 'oeeh'){ \n 'oeeh';\n break;\n }\n}\n}",
"decisionCases": {
"oeeh": [
{
"name": "mock_task",
"taskReferenceName": "OEEHTask",
"inputParameters": {
"http_request": {
"uri": "http://httpbin.org/anything",
"method": "POST",
"body": "${workflow.input.body}"
}
},
"type": "HTTP",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "OEEHJQT",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": {
"objectType": "oeeh",
"response": "${OEEHTask.output.response.body}"
}
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
]
},
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "OEEHDecision_join",
"inputParameters": {
"output": "${OEEHJQT.output.result}",
"OEEHTaskOutput": "${OEEHTask.output.response.body}"
},
"type": "SIMPLE",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "OEEHFilter",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": "${OEEHDecision_join.output.response.body.output}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
],
[
{
"name": "mock_task",
"taskReferenceName": "APSVInput",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | if .filters | length > 0 then .filters[] | select(.repository == \"apsv\") | .filters else {} end",
"JSONJQINPUT": "${InputJQ.output.result}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "APSVDecision",
"inputParameters": {
"filter": "${InputJQ.output.result.filters}",
"filterflag": "${InputJQ.output.result.filterflag}"
},
"type": "DECISION",
"caseExpression": "if (!$.filterflag ){\n\t'apsv';\n}else {\nfor(var i = 0; i <$.filter.length; i++) {\n if ($.filter[i].repository == 'apsv'){ \n 'apsv';\n break;\n }\n}\n}",
"decisionCases": {
"apsv": [
{
"name": "mock_task",
"taskReferenceName": "APSVTask",
"inputParameters": {
"http_request": {
"uri": "http://httpbin.org/anything",
"method": "POST",
"body": "${workflow.input.body}"
}
},
"type": "HTTP",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "APSVJQT",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": {
"objectType": "apsv",
"response": "${APSVTask.output.response.body}"
}
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
]
},
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "APSVDecision_join",
"inputParameters": {
"output": "${APSVJQT.output.result}",
"APSVTaskOutput": "${APSVTask.output.response.body}"
},
"type": "SIMPLE",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "APSVFilter",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": "${APSVDecision_join.output.response.body.output}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
]
],
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "join-0",
"inputParameters": {},
"type": "JOIN",
"startDelay": 0,
"joinOn": [
"ARSCFilter",
"OEEHFilter",
"APSVFilter"
],
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "CSDMerge",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": {
"arsc_jq": "${ARSCFilter.output.result}",
"oeeh_jq": "${OEEHFilter.output.result}",
"apsv_jq": "${APSVFilter.output.result}"
}
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
],
"inputParameters": [],
"outputParameters": {
"searchText": "${workflow.input.body.text}",
"applications": [
"${CSDMerge.output.result}"
]
},
"schemaVersion": 2,
"restartable": true,
"workflowStatusListenerEnabled": false,
"ownerEmail": "test@gmail.com",
"timeoutPolicy": "ALERT_ONLY",
"timeoutSeconds": 0,
"variables": {},
"inputTemplate": {}
}
Task definition:
{
"createTime": 1678399938459,
"createdBy": "",
"accessPolicy": {},
"name": "mock_task",
"description": "mock task request",
"retryCount": 0,
"timeoutSeconds": 0,
"inputKeys": [],
"outputKeys": [],
"timeoutPolicy": "TIME_OUT_WF",
"retryLogic": "FIXED",
"retryDelaySeconds": 2,
"responseTimeoutSeconds": 300,
"inputTemplate": {},
"rateLimitPerFrequency": 0,
"rateLimitFrequencyInSeconds": 1,
"ownerEmail": "test@test.com",
"backoffScaleFactor": 1
}

To Reproduce
Steps to reproduce the behavior:

  1. Use the above given workflow and task definitions to reproduce the issue
  2. Use this input for workflow
    {
    "body": {
    "text": "101",
    "version": 3,
    "logicalId": "lid://demo-products.1",
    "page_number": 1,
    "page_size": 20,
    "userContext": null,
    "filters": []
    }
    }
  3. See error
    conductor-server_1 | 2782317 [http-nio-8080-exec-3] INFO com.netflix.conductor.core.execution.WorkflowExecutor [] - Execution terminated of workflow: fd-csd-flow_mock2.1/1e79652f-0a2c-40d2-a9e9-9acec064cb4e.RUNNING
    conductor-server_1 | com.netflix.conductor.core.exception.TerminateWorkflowException: responseTimeout: 300 exceeded for the taskId: cc8dc3ff-3e09-406a-9867-ab7c6cc7747f with Task Definition: JOIN
    conductor-server_1 | at com.netflix.conductor.core.execution.DeciderService.retry(DeciderService.java:548) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
    conductor-server_1 | at com.netflix.conductor.core.execution.DeciderService.decide(DeciderService.java:205) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
    conductor-server_1 | at com.netflix.conductor.core.execution.DeciderService.decide(DeciderService.java:109) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
    conductor-server_1 | at com.netflix.conductor.core.execution.WorkflowExecutor.decide(WorkflowExecutor.java:1007) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
    conductor-server_1 | at com.netflix.conductor.core.execution.WorkflowExecutor.decide(WorkflowExecutor.java:1040) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
    conductor-server_1 | at com.netflix.conductor.core.execution.WorkflowExecutor.decide(WorkflowExecutor.java:1040) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
    conductor-server_1 | at com.netflix.conductor.core.execution.WorkflowExecutor.handleWorkflowEvaluationEvent(WorkflowExecutor.java:962) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]

Expected behavior
The JOIN task should complete without error.

Additional context
When the workflow starts, the JOIN task is scheduled next (see the code linked in the original issue). The JOIN is scheduled before its task update time is set, so it fails the response-timeout check (also linked in the original issue) and times out.

I'll try to debug this and will raise a pull request if I find anything.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions