Add rows processor to GenericTransfer #61143

Merged
potiuk merged 37 commits into apache:main from dabla:feature/add-rows-processor-generic-transfer on Feb 10, 2026

@dabla dabla commented Jan 27, 2026

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

This PR extends the GenericTransfer operator with a rows_processor parameter, similar to SQLInsertRowsOperator.
The rows_processor allows users to post-process rows returned by the source hook before they are inserted into the destination hook.

This enables lightweight transformations (filtering, normalization, enrichment, etc.) without introducing an intermediate task, while keeping database-specific type handling encapsulated in the hook itself.

from airflow.providers.common.sql.operators.generic_transfer import GenericTransfer


def rows_processor(rows, **context):
    # Example: normalize string values before insertion
    return [
        tuple(value.strip() if isinstance(value, str) else value for value in row)
        for row in rows
    ]


# Rows returned by the source hook pass through rows_processor before insertion.
GenericTransfer(
    task_id="transfer_data",
    source_conn_id="source_db",
    destination_conn_id="target_db",
    sql="SELECT id, name FROM source_table",
    destination_table="target_table",
    rows_processor=rows_processor,
)
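
Filtering and enrichment, also mentioned above, could follow the same pattern. The functions below are a minimal sketch, not code from this PR: the names `drop_incomplete_rows` and `add_run_date` are hypothetical, and the sketch assumes the task context reaches the processor as keyword arguments (as the `**context` signature suggests), including the `ds` key.

```python
def drop_incomplete_rows(rows, **context):
    """Filtering: keep only rows in which no column is None."""
    return [row for row in rows if all(value is not None for value in row)]


def add_run_date(rows, **context):
    """Enrichment: append the run date to every row.

    Assumption: the `ds` key is present in the context passed by the operator.
    If it is missing, None is appended instead.
    """
    run_date = context.get("ds")
    return [tuple(row) + (run_date,) for row in rows]
```

Either function could be passed as `rows_processor=...`, provided the destination table's columns match the shape of the returned rows.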

In addition, this PR makes SQLExecuteQueryTrigger non-blocking when running on Airflow 3.2 or higher.

Previously, the trigger had to remain blocking due to a bug in the CommsDecoder that could raise a “Response read out of order” error when handling asynchronous trigger events. With this issue fixed in Airflow 3.2, the trigger can now safely run in a non-blocking mode, improving triggerer scalability and resource usage.
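
To illustrate the general idea only, here is a minimal sketch of version-gated offloading of a blocking query. It is not the trigger's actual implementation: `fetch_records_blocking`, `get_records`, and the `AIRFLOW_VERSION` tuple are hypothetical stand-ins, and the real provider may detect the version and run the query differently.

```python
import asyncio
import time

# Hypothetical stand-in for the detected Airflow version.
AIRFLOW_VERSION = (3, 2, 0)


def fetch_records_blocking(sql: str) -> list[tuple]:
    """Pretend to run a slow, synchronous database query."""
    time.sleep(0.05)
    return [(1, "a"), (2, "b")]


async def get_records(
    sql: str, airflow_version: tuple[int, ...] = AIRFLOW_VERSION
) -> list[tuple]:
    if airflow_version >= (3, 2):
        # Offload the blocking call to a worker thread so the event loop
        # can keep serving other triggers while the query runs.
        return await asyncio.to_thread(fetch_records_blocking, sql)
    # Older versions: run inline, which blocks the event loop for the
    # duration of the query.
    return fetch_records_blocking(sql)
```

Both branches return the same rows; the difference is only whether the event loop is free while the query runs.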



@dabla dabla requested a review from ephraimbuddy January 28, 2026 19:17

@Nataneljpwd Nataneljpwd left a comment

Looks great, I have left a few comments and questions, mainly nitpicks

Comment thread providers/common/sql/src/airflow/providers/common/sql/triggers/sql.py Outdated

@Nataneljpwd Nataneljpwd left a comment

Looks great!
Only 1 question regarding the python interface file

dabla commented Feb 1, 2026

Looks great! Only 1 question regarding the python interface file

Thanks for the review, @Nataneljpwd, really appreciated, well done! I think I've fixed all the issues; awaiting CI/CD now.

@dabla dabla requested a review from Nataneljpwd February 1, 2026 16:33

@Nataneljpwd Nataneljpwd left a comment

Looks great, 1 minor nitpick for type hinting, other than that, looks great

@dabla dabla requested a review from Nataneljpwd February 2, 2026 18:21

dabla commented Feb 3, 2026

@Nataneljpwd thanks for the review and for helping fix the mypy issues!

@Nataneljpwd Nataneljpwd left a comment

Everything looks amazing, nice work!

Comment thread providers/common/sql/src/airflow/providers/common/sql/triggers/sql.pyi Outdated
Comment thread providers/common/sql/src/airflow/providers/common/sql/operators/sql.py Outdated
@dabla dabla requested a review from shahar1 February 6, 2026 17:39
@potiuk potiuk merged commit d70216c into apache:main Feb 10, 2026
90 checks passed
Alok-kumar-priyadarshi pushed a commit to Alok-kumar-priyadarshi/airflow that referenced this pull request Feb 11, 2026
* refactor: Fixed docstring for rows_processor parameter in SQLInsertRowsOperator

* refactor: Added rows_processor parameter in GenericTransfer

* refactor: Make SQLExecuteQueryTrigger non-blocking from Airflow 3.2+

* refactor: Make inserting of rows in sync and deferred mode more DRY

* refactor: Updated GenericTransfer type

* refactor: Ignore mypy for _rows_processor

* refactor: Removed _process_rows and made rows_processor optional in constructor of GenericTransfer

* refactor: Removed _process_rows and made rows_processor optional in constructor of SQLInsertRowsOperator

* refactor: Made _get_records in trigger protected

* refactor: Updated typing of rows_processor in GenericTransfer interface

* refactor: Fixed typing of rows parameter in _insert_rows method

* refactor: Refactored unit test for SQLInsertRowsOperator

* refactor: Fixed typing of _insert_rows method

* refactor: Updated docstring for rows_processor parameter

* refactor: Changed typing of context parameter from Context to Any in rows_processor parameter

* refactor: Changed typing of rows_processor

* refactor: Refactored _insert_rows method in SQLInsertRowsOperator and removed override of render_template_fields

* refactor: Removed unused import of jinja2

* docs: Added comment to explain why rows_processor is typed like this

* Update providers/common/sql/src/airflow/providers/common/sql/operators/sql.py

Co-authored-by: Shahar Epstein <60007259+shahar1@users.noreply.github.com>

* refactor: Removed get_records

* refactor: Evaluate XComArgs first before evaluating rows

* refactor: Fixed check on rows in SQLInsertRowsOperator

---------

Co-authored-by: David Blain <david.blain@b-holding.be>
Co-authored-by: Shahar Epstein <60007259+shahar1@users.noreply.github.com>
Ratasa143 pushed a commit to Ratasa143/airflow that referenced this pull request Feb 15, 2026
choo121600 pushed a commit to choo121600/airflow that referenced this pull request Feb 22, 2026
AkshayArali pushed a commit to AkshayArali/airflow_630 that referenced this pull request Feb 27, 2026
Subham-KRLX pushed a commit to Subham-KRLX/airflow that referenced this pull request Mar 4, 2026
dominikhei pushed a commit to dominikhei/airflow that referenced this pull request Mar 11, 2026
Ankurdeewan pushed a commit to Ankurdeewan/airflow that referenced this pull request Mar 15, 2026
radhwene pushed a commit to radhwene/airflow that referenced this pull request Mar 21, 2026