Allow Operators to specify SKIPPED status internally #1292

Merged
jlowin merged 5 commits into apache:master from withnale:skip_exception
Apr 6, 2016

Conversation

@withnale

@withnale withnale commented Apr 4, 2016

Contributor

NB: For discussion at present. Not expecting this PR to be merged as-is

Allow Operators to specify SKIPPED status internally

At present there is no clean way to specify that downstream tasks should be skipped. The current options are:

  • raise an Exception from an Operator. This is messy because the task is flagged as ERROR in the web interface. Exceptions should occur only when things are not 'working as expected' and someone might need to investigate.
  • use the BranchOperator or ShortCircuit operator. However, both of these modify the Session directly and do not honour the DAG properly. For example, an Operator marked with 'one_success' will never run because it is artificially marked as SKIPPED, even if one of its other upstream dependencies is successful.

Rather than have operators manipulate the session directly, this should be provided centrally.

PR: Specify SKIPPED via Exception

This pull request implements an exception, AirflowSkipException (since it does not flag an error state, perhaps the name should change?). When an Operator raises it, this indicates that the job completed without errors but should not be considered successful.

During its normal DAG traversal looking for task instances to schedule, the scheduler will then see the state 'SKIPPED' and use it in its decision making.
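
A minimal sketch of the idea, using stand-in classes rather than Airflow's own code: a task runner catches the skip exception separately from other exceptions and records a distinct state. Only the name AirflowSkipException comes from this PR; `run_task`, the state strings, and the two example callables are hypothetical names for illustration.

```python
class AirflowException(Exception):
    """Stand-in for Airflow's base exception (not imported from Airflow)."""


class AirflowSkipException(AirflowException):
    """Stand-in for the exception proposed in this PR."""


def run_task(execute):
    """Hypothetical runner: call `execute` and map the outcome to a state."""
    try:
        execute()
    except AirflowSkipException:
        # Finished without error, but must not count as a success.
        return "skipped"
    except Exception:
        # Anything else is a genuine failure someone may need to investigate.
        return "failed"
    return "success"


def nothing_to_do():
    """Example task body that decides there is no work for this run."""
    raise AirflowSkipException("no input for this run")


def broken():
    """Example task body that hits a real error."""
    raise ValueError("unexpected input")
```

The skip handler has to come before the generic one, otherwise a skip would be reported as a failure.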

An example DAG is provided which shows two distinct trees using this approach. One of them includes a join using trigger_rule='all_success' and the other uses trigger_rule='one_success', which probably explains things better than I can. In this example, the DummySkipOperator just raises AirflowSkipException immediately. The example DAG graph from the webserver can be seen below:

[Image: example workflow]
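
The difference between the two joins can be sketched with a simplified model of trigger rule evaluation. This is not the scheduler's code: `evaluate_join`, the state strings, and in particular the assumption that an 'all_success' join is itself skipped when an upstream task was skipped are illustrative choices, not statements from the PR.

```python
def evaluate_join(trigger_rule, upstream_states):
    """Decide what a join task should do given its upstream task states.

    Simplified model (an assumption, not Airflow's implementation).
    Returns "run", "skipped", "upstream_failed", or "wait".
    """
    finished = ("success", "skipped", "failed")
    done = all(state in finished for state in upstream_states)
    successes = upstream_states.count("success")

    if trigger_rule == "all_success":
        if "failed" in upstream_states:
            return "upstream_failed"
        if "skipped" in upstream_states:
            # A skipped parent means the join can never see all successes.
            return "skipped"
        return "run" if successes == len(upstream_states) else "wait"

    if trigger_rule == "one_success":
        if successes > 0:
            # One successful parent is enough, even if others were skipped.
            return "run"
        return "skipped" if done else "wait"

    raise ValueError("unsupported trigger rule: %s" % trigger_rule)
```

With one successful and one skipped parent, this model skips the 'all_success' join and runs the 'one_success' join, which is the behaviour the two trees in the example DAG are meant to contrast.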

I have also added a parameter to the BaseSensorOperator called soft_fail (defaults to False). If this is set to True, the sensor will finish as SKIPPED rather than raising an error. This should allow general sensors to encapsulate optional logic without generating errors on the DAG webserver front page.
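
A rough sketch of how a soft_fail flag could behave, assuming the sensor gives up after a timeout (the description above does not say when the sensor gives up, so the timeout is an assumption). `run_sensor`, `SensorTimeout`, and the parameter names other than soft_fail are hypothetical stand-ins, not Airflow's API.

```python
import time


class AirflowSkipException(Exception):
    """Stand-in for the exception proposed in this PR."""


class SensorTimeout(Exception):
    """Hypothetical stand-in for a hard sensor failure."""


def run_sensor(poke, timeout, poke_interval=0.0, soft_fail=False):
    """Poll `poke` until it returns True or `timeout` seconds have elapsed."""
    started_at = time.monotonic()
    while not poke():
        if time.monotonic() - started_at >= timeout:
            if soft_fail:
                # The optional condition never arrived: skip instead of failing.
                raise AirflowSkipException("sensor timed out (soft_fail)")
            raise SensorTimeout("sensor timed out")
        time.sleep(poke_interval)
    return True
```

Raising the skip exception from the same place as the hard failure keeps the flag a one-line decision and leaves the polling loop unchanged.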

@landscape-bot

Code Health
Repository health decreased by 0.03% when pulling 25fe6fc on withnale:skip_exception into 5ab2567 on airbnb:master.

@jlowin

jlowin commented Apr 4, 2016

Member

I really like this idea. In fact I would like to use this to eliminate the short circuit operator (because even though it was created to make a confusing situation simpler, it's still pretty confusing). With a pre_execute() hook that possibly raises an AirflowSkipException, ANY operator can short-circuit.
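
The pre_execute() idea could look roughly like this. The `Operator` class, its `should_run` callback, and the `run` helper are hypothetical stand-ins written for this sketch; they are not Airflow's BaseOperator.

```python
class AirflowSkipException(Exception):
    """Stand-in for the exception proposed in this PR."""


class Operator:
    """Hypothetical minimal operator with a pre_execute() hook."""

    def __init__(self, should_run=lambda context: True):
        self.should_run = should_run
        self.executed = False

    def pre_execute(self, context):
        # Any operator can short-circuit itself here, before doing real work.
        if not self.should_run(context):
            raise AirflowSkipException("condition not met")

    def execute(self, context):
        self.executed = True


def run(operator, context):
    """Hypothetical runner: call pre_execute first, then execute."""
    try:
        operator.pre_execute(context)
        operator.execute(context)
    except AirflowSkipException:
        return "skipped"
    return "success"
```

Because the check lives in a hook on the operator itself, no separate short-circuit task is needed in the DAG.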

I still need to look at the implementation but here are a couple thoughts:

  • please add some unit tests. I have some open PRs that make this sort of test easier; let me know if you'd like help setting them up.
  • errors are the primary means of communicating state to processes in Airflow. I think it would be best to subclass AirflowException and maybe call this something less frightening like AirflowOperatorSkip
  • following up on that idea, it would be nice to analogously raise AirflowOperatorFail or AirflowDagDeadlock as appropriate... If they were also subclasses, that could be done with minimal code changes. That could be a separate PR.
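
One way to read the subclassing suggestion, sketched with stand-in classes: if every signalling exception derives from AirflowException, a single handler can cover all of them. The class names come from the comment above; the `state` attribute, the state strings, and the helper functions are assumptions made for this illustration.

```python
class AirflowException(Exception):
    """Stand-in base class; `state` is a hypothetical attribute."""
    state = "failed"


class AirflowOperatorSkip(AirflowException):
    state = "skipped"


class AirflowOperatorFail(AirflowException):
    state = "failed"


class AirflowDagDeadlock(AirflowException):
    state = "deadlocked"  # hypothetical state name


def state_after(execute):
    """A single handler covers every subclass of AirflowException."""
    try:
        execute()
    except AirflowException as exc:
        return exc.state
    return "success"


def raiser(exc_type):
    """Build a callable that raises the given exception type."""
    def _raise():
        raise exc_type()
    return _raise
```

Adding another signal then only requires a new subclass, with no new `except` branch in the runner.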

@jlowin jlowin self-assigned this Apr 4, 2016
@landscape-bot

Code Health
Repository health increased by 0.01% when pulling a900a52 on withnale:skip_exception into 5ab2567 on airbnb:master.

@landscape-bot

Code Health
Repository health increased by 0.02% when pulling f7cb593 on withnale:skip_exception into 0bae60f on airbnb:master.

@coveralls

Coverage Status

Coverage decreased (-0.009%) to 67.05% when pulling f7cb59302cb840b2ae83cc9e527ebf5ab27b6460 on withnale:skip_exception into 0bae60f on airbnb:master.

@landscape-bot

Code Health
Repository health increased by 0.04% when pulling 87bb2e2 on withnale:skip_exception into 0bae60f on airbnb:master.

@landscape-bot

Code Health
Repository health increased by 0.04% when pulling eba92ad on withnale:skip_exception into 0bae60f on airbnb:master.

@landscape-bot

Code Health
Repository health increased by 0.11% when pulling 7dc3eb7 on withnale:skip_exception into 0bae60f on airbnb:master.

@landscape-bot

Code Health
Repository health increased by 0.11% when pulling 932b8e2 on withnale:skip_exception into a8234d0 on airbnb:master.

@landscape-bot

Code Health
Repository health increased by 0.11% when pulling 9ba7fcb on withnale:skip_exception into a8234d0 on airbnb:master.

@landscape-bot

Code Health
Repository health increased by 0.11% when pulling d8ae115 on withnale:skip_exception into a8234d0 on airbnb:master.

@landscape-bot

Code Health
Repository health increased by 0.11% when pulling 6a56b49 on withnale:skip_exception into a8234d0 on airbnb:master.

Comment thread tests/models.py
self.assertEqual(ti.state, State.SUCCESS)

@parameterized.expand([
#

Member

Could you add a header comment here listing all the fields, maybe abbreviated? I know they're in the function signature but at first glance it's hard to tell which of the numbers (5, 0, 0, 0, 0, 0) goes with which field. A header would help.
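
As an illustration of the kind of header being asked for, here is a small stand-alone table of cases. The field names, their order, and the values are hypothetical (the real signature in tests/models.py is not shown in this thread), and the namedtuple is an optional extra that lets each row be read by name as well.

```python
from collections import namedtuple

# Hypothetical fields; the real test's signature may differ.
Case = namedtuple("Case", "rule successes skipped failed upstream_failed done")

CASES = [
    #    rule,          succ, skip, fail, up_fail, done
    Case("all_success", 5,    0,    0,    0,       5),
    Case("all_success", 2,    0,    1,    0,       3),
    Case("one_success", 1,    4,    0,    0,       5),
]
```

A one-line column header costs little and removes the need to look back at the function signature for every row.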

@landscape-bot

Code Health
Repository health increased by 0.13% when pulling fc26245 on withnale:skip_exception into a8234d0 on airbnb:master.

@coveralls

Coverage Status

Coverage increased (+0.2%) to 67.864% when pulling fc26245 on withnale:skip_exception into a8234d0 on airbnb:master.

@landscape-bot

Code Health
Repository health increased by 0.13% when pulling 297cfb8 on withnale:skip_exception into a8234d0 on airbnb:master.

@coveralls

Coverage Status

Coverage increased (+0.3%) to 68.044% when pulling 297cfb8 on withnale:skip_exception into a8234d0 on airbnb:master.

@landscape-bot

Code Health
Repository health increased by 0.05% when pulling 81b93d3 on withnale:skip_exception into a8234d0 on airbnb:master.

@coveralls

Coverage Status

Coverage increased (+0.3%) to 68.031% when pulling 81b93d3 on withnale:skip_exception into a8234d0 on airbnb:master.

@jlowin jlowin merged commit 81ff5cc into apache:master Apr 6, 2016
@jlowin

jlowin commented Apr 6, 2016

Member

Thank you!

@r39132

r39132 commented Apr 17, 2016

Contributor

👏 As an aside, when using the Branch operator, the second assumption above is not exactly accurate, specifically: "For example, if an Operator is marked with 'one_success' this will never get run because it is artificially marked as SKIPPED, even if one of its other upstream dependencies is successful."

I agree it is a stumbling point and hence welcome the work in this PR.

[Image: screenshot 2016-04-17 15 46 31]

In the above example, I branch at check_for_time_to_build_model_branch_condition and join at wait_for_previous_hour with a trigger rule of ONE_SUCCESS. Unfortunately, I needed to insert a dummy operator, prejoin_preagg_dummy_job, to make this work; without it, wait_for_previous_hour would always be skipped, as mentioned by @withnale.

5 participants