Skip to content

[Data] Emit events rather than just logs for detected issues#55717

Merged
alexeykudinkin merged 1 commit intoray-project:masterfrom
coqian:coqian/issue-export
Sep 10, 2025
Merged

[Data] Emit events rather than just logs for detected issues#55717
alexeykudinkin merged 1 commit intoray-project:masterfrom
coqian:coqian/issue-export

Conversation

@coqian
Copy link
Copy Markdown
Contributor

@coqian coqian commented Aug 18, 2025

Why are these changes needed?

In Ray Data, we have an issue detection feature, but they are only available as printed logs. We also want to show these insights at our Data Dashboard for users to debug their code, by exporting these events.

Example export event:

{
  "event_id": "Ce4Ee1B7AFd9Cb13B3",
  "timestamp": 1756423457,
  "source_type": "EXPORT_DATASET_OPERATOR_EVENT",
  "event_data": {
    "dataset_id": "dataset_6_0",
    "operator_id": "ReadImage->Map(preprocess_image)_1",
    "operator_name": "ReadImage->Map(preprocess_image)",
    "event_time": 1756423457.9333386,
    "event_type": "ISSUE_DETECTION_HIGH_MEMORY",
    "message": "\n\nOperator 'ReadImage->Map(preprocess_image)' uses xxx of memory per\ntask on average, but Ray only requests xxx per task at the start of\nthe pipeline.\n\nTo avoid out-of-memory errors, consider setting `memory=xxx` in the\nappropriate function or method call. (This might be unnecessary if the\nnumber of concurrent tasks is low.)\n\nTo change the frequency of this warning, set\n`DataContext.get_current().issue_detectors_config.high_memory_detector_config.detection_time_interval_s`,\nor disable the warning by setting value to -1. (current value: xxx)\n"
  }
},
{
  "event_id": "bE831Ee49c4AadAcFB",
  "timestamp": 1756423458,
  "source_type": "EXPORT_DATASET_OPERATOR_EVENT",
  "event_data": {
    "dataset_id": "dataset_6_0",
    "operator_id": "MapBatches(ResnetModel)_2",
    "operator_name": "MapBatches(ResnetModel)",
    "event_time": 1756423458.1420121,
    "event_type": "ISSUE_DETECTION_HANGING",
    "message": "A task of operator MapBatches(ResnetModel) with task index xxx has been running for xxxs, which is longer than the average task duration of this operator (xxxs). If this message persists, please check the stack trace of the task for potential hanging issues."
  }
}

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@coqian coqian force-pushed the coqian/issue-export branch 5 times, most recently from 6b883c4 to ac9b742 Compare August 19, 2025 18:08
@coqian coqian changed the title [WIP][Data] Emit events rather than just logs for detected issues [Data] Emit events rather than just logs for detected issues Aug 19, 2025
@coqian coqian marked this pull request as ready for review August 19, 2025 18:10
@coqian coqian requested review from a team as code owners August 19, 2025 18:10
@ray-gardener ray-gardener bot added data Ray Data-related issues observability Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling labels Aug 19, 2025
Copy link
Copy Markdown
Contributor

@omatthew98 omatthew98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall makes sense. Some general comments:

  • Let's use dataset rather than data to avoid overloading data (e.g. not using double data in ExportDataOperatorEventData). Left some comments about this, but definitely missed some.
  • I was a little confused about how extensible we want this to be. Do we want to support more than just IssueDetection with this?
  • Can you update the PR description with some examples of what the protos exported looks like.

Thank you!

double event_time = 4;

// The type of the event
string event_type = 5;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could make this an enum?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added enum for this field. At this moment, we only have two issue types.

return proto_operator_event_data


def get_operator_event_exporter() -> "OperatorEventExporter":
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to make this configurable like we have for the dataset metadata export api?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we want to always enable it for either debugging or reporting purpose.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should just follow the same pattern we have for the existing ray data exporting, like what we have configurable here.

incorporating a position or index (e.g., "ReadParquet_0")
operator_name: The name of the operator.
event_time: The timestamp when the event is emitted (in seconds since epoch).
event_type: The type of the event.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the types of event types? I thought that this was the type of issue, but is your intention to support more than just issues with this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intention here is to make this event export a general way for all of the operator level events. Issue detection is one type, and the event types can be hanging / high memory. In the future, we can have more events during the operator lifecycle, in addition to issues, such as start / finish / failed.

@coqian coqian force-pushed the coqian/issue-export branch 2 times, most recently from e59ed2d to 84fac96 Compare August 21, 2025 02:10
@coqian coqian force-pushed the coqian/issue-export branch from 84fac96 to 12455bd Compare August 29, 2025 00:43
@coqian
Copy link
Copy Markdown
Contributor Author

coqian commented Aug 29, 2025

Overall makes sense. Some general comments:

  • Let's use dataset rather than data to avoid overloading data (e.g. not using double data in ExportDataOperatorEventData). Left some comments about this, but definitely missed some.
  • I was a little confused about how extensible we want this to be. Do we want to support more than just IssueDetection with this?
  • Can you update the PR description with some examples of what the protos exported looks like.

Thank you!

Renamed the variables/classes and added example export events to the PR description.

@coqian coqian force-pushed the coqian/issue-export branch from 12455bd to 0253479 Compare August 29, 2025 00:53
Copy link
Copy Markdown
Contributor

@can-anyscale can-anyscale left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stamp from core ownership, defer to data team for review ;)

Copy link
Copy Markdown
Contributor

@omatthew98 omatthew98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think only two things left to clean up are to maybe get rid of the enum dict and just directly use the enum and then mimic the create_if_enable method here for consistency.

logger = logging.getLogger(__name__)


ISSUE_TYPE_TO_EXPORT_OPERATOR_EVENT_TYPE = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this dict? Can we just directly use the enum for this? Maybe we can have an enum to enum matching?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should define the enum in operator_event_exporter.py? Although it seems like we should not need two enums if they are functionally the same. If the main difference is that we want to prepend ISSUE_DETECTION as a prefix, we can just handle that below.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@omatthew98 just made the following updates:

  1. used the same create_if_enable pattern for the OperatorEventExporter
  2. removed the dict, and added a separate function to format issue event name and prepend the ISSUE_DETECTION prefix

@coqian coqian force-pushed the coqian/issue-export branch 2 times, most recently from 3add988 to 2706611 Compare September 8, 2025 21:39
Copy link
Copy Markdown
Contributor

@omatthew98 omatthew98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for iterating on this @coqian!

@omatthew98 omatthew98 added the go add ONLY when ready to merge, run all tests label Sep 8, 2025
// EXPORT_DATASET_OPERATOR type events from Ray Data operators.
message ExportDatasetOperatorEventData {
enum DatasetOperatorEventType {
ISSUE_DETECTION_HANGING = 0;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First enum value has to be always unset (default)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aslonnie do we have proto linter (it should be catching theses things)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the enum, with the first value unset as UNSPECIFIED = 0

Signed-off-by: cong.qian <cong.qian@anyscale.com>
@coqian coqian force-pushed the coqian/issue-export branch from 2706611 to 2cb921f Compare September 8, 2025 23:57
@alexeykudinkin alexeykudinkin merged commit 08a7068 into ray-project:master Sep 10, 2025
5 checks passed
ZacAttack pushed a commit to ZacAttack/ray that referenced this pull request Sep 24, 2025
…ject#55717)

<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

## Why are these changes needed?
In Ray Data, we have an issue detection feature, but they are only
available as printed logs. We also want to show these insights at our
Data Dashboard for users to debug their code, by exporting these events.

Example export event:
```
{
  "event_id": "Ce4Ee1B7AFd9Cb13B3",
  "timestamp": 1756423457,
  "source_type": "EXPORT_DATASET_OPERATOR_EVENT",
  "event_data": {
    "dataset_id": "dataset_6_0",
    "operator_id": "ReadImage->Map(preprocess_image)_1",
    "operator_name": "ReadImage->Map(preprocess_image)",
    "event_time": 1756423457.9333386,
    "event_type": "ISSUE_DETECTION_HIGH_MEMORY",
    "message": "\n\nOperator 'ReadImage->Map(preprocess_image)' uses xxx of memory per\ntask on average, but Ray only requests xxx per task at the start of\nthe pipeline.\n\nTo avoid out-of-memory errors, consider setting `memory=xxx` in the\nappropriate function or method call. (This might be unnecessary if the\nnumber of concurrent tasks is low.)\n\nTo change the frequency of this warning, set\n`DataContext.get_current().issue_detectors_config.high_memory_detector_config.detection_time_interval_s`,\nor disable the warning by setting value to -1. (current value: xxx)\n"
  }
},
{
  "event_id": "bE831Ee49c4AadAcFB",
  "timestamp": 1756423458,
  "source_type": "EXPORT_DATASET_OPERATOR_EVENT",
  "event_data": {
    "dataset_id": "dataset_6_0",
    "operator_id": "MapBatches(ResnetModel)_2",
    "operator_name": "MapBatches(ResnetModel)",
    "event_time": 1756423458.1420121,
    "event_type": "ISSUE_DETECTION_HANGING",
    "message": "A task of operator MapBatches(ResnetModel) with task index xxx has been running for xxxs, which is longer than the average task duration of this operator (xxxs). If this message persists, please check the stack trace of the task for potential hanging issues."
  }
}
```

<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [X] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [X] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

Signed-off-by: cong.qian <cong.qian@anyscale.com>
Signed-off-by: zac <zac@anyscale.com>
dstrodtman pushed a commit to dstrodtman/ray that referenced this pull request Oct 6, 2025
…ject#55717)

<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

## Why are these changes needed?
In Ray Data, we have an issue detection feature, but they are only
available as printed logs. We also want to show these insights at our
Data Dashboard for users to debug their code, by exporting these events.

Example export event:
```
{
  "event_id": "Ce4Ee1B7AFd9Cb13B3",
  "timestamp": 1756423457,
  "source_type": "EXPORT_DATASET_OPERATOR_EVENT",
  "event_data": {
    "dataset_id": "dataset_6_0",
    "operator_id": "ReadImage->Map(preprocess_image)_1",
    "operator_name": "ReadImage->Map(preprocess_image)",
    "event_time": 1756423457.9333386,
    "event_type": "ISSUE_DETECTION_HIGH_MEMORY",
    "message": "\n\nOperator 'ReadImage->Map(preprocess_image)' uses xxx of memory per\ntask on average, but Ray only requests xxx per task at the start of\nthe pipeline.\n\nTo avoid out-of-memory errors, consider setting `memory=xxx` in the\nappropriate function or method call. (This might be unnecessary if the\nnumber of concurrent tasks is low.)\n\nTo change the frequency of this warning, set\n`DataContext.get_current().issue_detectors_config.high_memory_detector_config.detection_time_interval_s`,\nor disable the warning by setting value to -1. (current value: xxx)\n"
  }
},
{
  "event_id": "bE831Ee49c4AadAcFB",
  "timestamp": 1756423458,
  "source_type": "EXPORT_DATASET_OPERATOR_EVENT",
  "event_data": {
    "dataset_id": "dataset_6_0",
    "operator_id": "MapBatches(ResnetModel)_2",
    "operator_name": "MapBatches(ResnetModel)",
    "event_time": 1756423458.1420121,
    "event_type": "ISSUE_DETECTION_HANGING",
    "message": "A task of operator MapBatches(ResnetModel) with task index xxx has been running for xxxs, which is longer than the average task duration of this operator (xxxs). If this message persists, please check the stack trace of the task for potential hanging issues."
  }
}
```

<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [X] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [X] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

Signed-off-by: cong.qian <cong.qian@anyscale.com>
Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
snorkelopstesting2-coder pushed a commit to snorkel-marlin-repos/ray-project_ray_pr_55717_975c0892-2ff1-45cd-a2b3-0f75376f4844 that referenced this pull request Oct 11, 2025
snorkelopstesting2-coder added a commit to snorkel-marlin-repos/ray-project_ray_pr_55717_975c0892-2ff1-45cd-a2b3-0f75376f4844 that referenced this pull request Oct 11, 2025
…etected issues

Merged from original PR #55717
Original: ray-project/ray#55717
snorkelopstesting2-coder pushed a commit to snorkel-marlin-repos/ray-project_ray_pr_55717_baed0557-8a30-41e3-8979-53ad3a31cf9d that referenced this pull request Oct 11, 2025
snorkelopstesting2-coder added a commit to snorkel-marlin-repos/ray-project_ray_pr_55717_baed0557-8a30-41e3-8979-53ad3a31cf9d that referenced this pull request Oct 11, 2025
…etected issues

Merged from original PR #55717
Original: ray-project/ray#55717
justinyeh1995 pushed a commit to justinyeh1995/ray that referenced this pull request Oct 20, 2025
…ject#55717)

<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

## Why are these changes needed?
In Ray Data, we have an issue detection feature, but they are only
available as printed logs. We also want to show these insights at our
Data Dashboard for users to debug their code, by exporting these events.

Example export event:
```
{
  "event_id": "Ce4Ee1B7AFd9Cb13B3",
  "timestamp": 1756423457,
  "source_type": "EXPORT_DATASET_OPERATOR_EVENT",
  "event_data": {
    "dataset_id": "dataset_6_0",
    "operator_id": "ReadImage->Map(preprocess_image)_1",
    "operator_name": "ReadImage->Map(preprocess_image)",
    "event_time": 1756423457.9333386,
    "event_type": "ISSUE_DETECTION_HIGH_MEMORY",
    "message": "\n\nOperator 'ReadImage->Map(preprocess_image)' uses xxx of memory per\ntask on average, but Ray only requests xxx per task at the start of\nthe pipeline.\n\nTo avoid out-of-memory errors, consider setting `memory=xxx` in the\nappropriate function or method call. (This might be unnecessary if the\nnumber of concurrent tasks is low.)\n\nTo change the frequency of this warning, set\n`DataContext.get_current().issue_detectors_config.high_memory_detector_config.detection_time_interval_s`,\nor disable the warning by setting value to -1. (current value: xxx)\n"
  }
},
{
  "event_id": "bE831Ee49c4AadAcFB",
  "timestamp": 1756423458,
  "source_type": "EXPORT_DATASET_OPERATOR_EVENT",
  "event_data": {
    "dataset_id": "dataset_6_0",
    "operator_id": "MapBatches(ResnetModel)_2",
    "operator_name": "MapBatches(ResnetModel)",
    "event_time": 1756423458.1420121,
    "event_type": "ISSUE_DETECTION_HANGING",
    "message": "A task of operator MapBatches(ResnetModel) with task index xxx has been running for xxxs, which is longer than the average task duration of this operator (xxxs). If this message persists, please check the stack trace of the task for potential hanging issues."
  }
}
```

<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [X] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [X] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

Signed-off-by: cong.qian <cong.qian@anyscale.com>
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
…ject#55717)

<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

## Why are these changes needed?
In Ray Data, we have an issue detection feature, but they are only
available as printed logs. We also want to show these insights at our
Data Dashboard for users to debug their code, by exporting these events.

Example export event:
```
{
  "event_id": "Ce4Ee1B7AFd9Cb13B3",
  "timestamp": 1756423457,
  "source_type": "EXPORT_DATASET_OPERATOR_EVENT",
  "event_data": {
    "dataset_id": "dataset_6_0",
    "operator_id": "ReadImage->Map(preprocess_image)_1",
    "operator_name": "ReadImage->Map(preprocess_image)",
    "event_time": 1756423457.9333386,
    "event_type": "ISSUE_DETECTION_HIGH_MEMORY",
    "message": "\n\nOperator 'ReadImage->Map(preprocess_image)' uses xxx of memory per\ntask on average, but Ray only requests xxx per task at the start of\nthe pipeline.\n\nTo avoid out-of-memory errors, consider setting `memory=xxx` in the\nappropriate function or method call. (This might be unnecessary if the\nnumber of concurrent tasks is low.)\n\nTo change the frequency of this warning, set\n`DataContext.get_current().issue_detectors_config.high_memory_detector_config.detection_time_interval_s`,\nor disable the warning by setting value to -1. (current value: xxx)\n"
  }
},
{
  "event_id": "bE831Ee49c4AadAcFB",
  "timestamp": 1756423458,
  "source_type": "EXPORT_DATASET_OPERATOR_EVENT",
  "event_data": {
    "dataset_id": "dataset_6_0",
    "operator_id": "MapBatches(ResnetModel)_2",
    "operator_name": "MapBatches(ResnetModel)",
    "event_time": 1756423458.1420121,
    "event_type": "ISSUE_DETECTION_HANGING",
    "message": "A task of operator MapBatches(ResnetModel) with task index xxx has been running for xxxs, which is longer than the average task duration of this operator (xxxs). If this message persists, please check the stack trace of the task for potential hanging issues."
  }
}
```

<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [X] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [X] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

Signed-off-by: cong.qian <cong.qian@anyscale.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests observability Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants