Skip to content

Conversation

@RobertIndie
Copy link
Member

PIP: #21079

Motivation

Please see PIP-297:

pulsar/pip/pip-297.md

Lines 71 to 91 in 3cb7926

Currently, the connector and function cannot terminate the function instance if there are fatal exceptions thrown
outside the function instance thread. The current implementation of the connector and Pulsar Function exception handler
cannot handle the fatal exceptions that are thrown outside the function instance thread.
For example, suppose we have a sink connector that uses its own threads to batch-sink the data to an external system. If
any fatal exceptions occur in those threads, the function instance thread will not be aware of them and will
not be able to terminate the connector. This will cause the connector to hang indefinitely. There is a related issue
here: https://github.com/apache/pulsar/issues/9464
The same problem exists for the source connector. The source connector may also use a separate thread to fetch data from
an external system. If any fatal exceptions happen in that thread, the connector will also hang forever. This issue has
been observed for the Kafka source connector: https://github.com/apache/pulsar/issues/9464. We have fixed it by adding
the notifyError method to the `PushSource` class in PIP-281: https://github.com/apache/pulsar/pull/20807. However, this
does not solve the same problem that all source connectors face because not all connectors are implemented based on
the `PushSource` class.
The problem is same for the Pulsar Function. Currently, the function can't throw fatal exceptions to the function
framework. We need to provide a way for the function developer to implement it.
We need a way for the connector and function developers to throw fatal exceptions outside the function instance
thread. The function framework should catch these exceptions and terminate the function accordingly.

Modifications

pulsar/pip/pip-297.md

Lines 108 to 112 in 3cb7926

Introduce a new method `fatal` to the context. All the connector implementation code and the function code
can use this context and call the `fatal` method to terminate the instance while raising a fatal exception.
After the connector or function raises the fatal exception, the function instance thread will be interrupted.
The function framework then could catch the exception, log it, and then terminate the function instance.

Verifying this change

This change added tests.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@RobertIndie RobertIndie added type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages area/function labels Sep 7, 2023
@RobertIndie RobertIndie added this to the 3.2.0 milestone Sep 7, 2023
@RobertIndie RobertIndie self-assigned this Sep 7, 2023
@github-actions github-actions bot added the doc-required Your PR changes impact docs and you will update later. label Sep 7, 2023
@RobertIndie RobertIndie marked this pull request as ready for review September 8, 2023 01:33
@codecov-commenter
Copy link

codecov-commenter commented Sep 13, 2023

Codecov Report

❌ Patch coverage is 91.66667% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.16%. Comparing base (c675a3d) to head (17aa607).
⚠️ Report is 1758 commits behind head on master.

Files with missing lines Patch % Lines
...ulsar/functions/instance/JavaInstanceRunnable.java 90.00% 1 Missing and 1 partial ⚠️
Additional details and impacted files

Impacted file tree graph

@@              Coverage Diff              @@
##             master   #21143       +/-   ##
=============================================
+ Coverage     33.37%   73.16%   +39.79%     
- Complexity    12158    32456    +20298     
=============================================
  Files          1623     1887      +264     
  Lines        127410   140080    +12670     
  Branches      13933    15429     +1496     
=============================================
+ Hits          42527   102496    +59969     
+ Misses        79258    29472    -49786     
- Partials       5625     8112     +2487     
Flag Coverage Δ
inttests 24.19% <0.00%> (+0.12%) ⬆️
systests 25.07% <45.83%> (?)
unittests 72.45% <91.66%> (+40.55%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...a/org/apache/pulsar/functions/api/BaseContext.java 0.00% <ø> (ø)
.../apache/pulsar/functions/instance/ContextImpl.java 61.37% <100.00%> (+39.04%) ⬆️
...ulsar/functions/instance/JavaInstanceRunnable.java 71.60% <90.00%> (+25.12%) ⬆️

... and 1533 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@RobertIndie RobertIndie merged commit 1ac19fc into apache:master Sep 14, 2023
@RobertIndie RobertIndie deleted the pip-297 branch September 14, 2023 01:51
RobertIndie added a commit that referenced this pull request Sep 25, 2023
…1204)

### Motivation

We have introduced the `SinkContext.fatal` method in #21143
This PR use this fatal method to handle the elastic sink connector exception correctly.

### Modifications

- Use `SinkContext.fatal` to throw connector exception
- Add state to the ElasticSearchClient
liangyuanpeng pushed a commit to liangyuanpeng/pulsar that referenced this pull request Oct 11, 2023
…ache#21204)

### Motivation

We have introduced the `SinkContext.fatal` method in apache#21143
This PR use this fatal method to handle the elastic sink connector exception correctly.

### Modifications

- Use `SinkContext.fatal` to throw connector exception
- Add state to the ElasticSearchClient
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/function doc-required Your PR changes impact docs and you will update later. ready-to-test type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants