[Data] Emit events rather than just logs for detected issues#55717
[Data] Emit events rather than just logs for detected issues#55717alexeykudinkin merged 1 commit intoray-project:masterfrom
Conversation
6b883c4 to
ac9b742
Compare
omatthew98
left a comment
There was a problem hiding this comment.
Overall makes sense. Some general comments:
- Let's use dataset rather than data to avoid overloading data (e.g. not using double data in
ExportDataOperatorEventData). Left some comments about this, but definitely missed some. - I was a little confused about how extensible we want this to be. Do we want to support more than just IssueDetection with this?
- Can you update the PR description with some examples of what the protos exported looks like.
Thank you!
| double event_time = 4; | ||
|
|
||
| // The type of the event | ||
| string event_type = 5; |
There was a problem hiding this comment.
Could make this an enum?
There was a problem hiding this comment.
Added enum for this field. At this moment, we only have two issue types.
| return proto_operator_event_data | ||
|
|
||
|
|
||
| def get_operator_event_exporter() -> "OperatorEventExporter": |
There was a problem hiding this comment.
We might want to make this configurable like we have for the dataset metadata export api?
There was a problem hiding this comment.
I am wondering if we want to always enable it for either debugging or reporting purpose.
There was a problem hiding this comment.
I think we should just follow the same pattern we have for the existing ray data exporting, like what we have configurable here.
| incorporating a position or index (e.g., "ReadParquet_0") | ||
| operator_name: The name of the operator. | ||
| event_time: The timestamp when the event is emitted (in seconds since epoch). | ||
| event_type: The type of the event. |
There was a problem hiding this comment.
What are the types of event types? I thought that this was the type of issue, but is your intention to support more than just issues with this?
There was a problem hiding this comment.
The intention here is to make this event export a general way for all of the operator level events. Issue detection is one type, and the event types can be hanging / high memory. In the future, we can have more events during the operator lifecycle, in addition to issues, such as start / finish / failed.
e59ed2d to
84fac96
Compare
84fac96 to
12455bd
Compare
Renamed the variables/classes and added example export events to the PR description. |
12455bd to
0253479
Compare
can-anyscale
left a comment
There was a problem hiding this comment.
stamp from core ownership, defer to data team for review ;)
omatthew98
left a comment
There was a problem hiding this comment.
I think only two things left to clean up are to maybe get rid of the enum dict and just directly use the enum and then mimic the create_if_enable method here for consistency.
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| ISSUE_TYPE_TO_EXPORT_OPERATOR_EVENT_TYPE = { |
There was a problem hiding this comment.
What is the purpose of this dict? Can we just directly use the enum for this? Maybe we can have an enum to enum matching?
There was a problem hiding this comment.
Perhaps we should define the enum in operator_event_exporter.py? Although it seems like we should not need two enums if they are functionally the same. If the main difference is that we want to prepend ISSUE_DETECTION as a prefix, we can just handle that below.
There was a problem hiding this comment.
@omatthew98 just made the following updates:
- used the same
create_if_enablepattern for the OperatorEventExporter - removed the dict, and added a separate function to format issue event name and prepend the
ISSUE_DETECTIONprefix
3add988 to
2706611
Compare
omatthew98
left a comment
There was a problem hiding this comment.
Thanks for iterating on this @coqian!
| // EXPORT_DATASET_OPERATOR type events from Ray Data operators. | ||
| message ExportDatasetOperatorEventData { | ||
| enum DatasetOperatorEventType { | ||
| ISSUE_DETECTION_HANGING = 0; |
There was a problem hiding this comment.
First enum value has to be always unset (default)
There was a problem hiding this comment.
@aslonnie do we have proto linter (it should be catching theses things)?
There was a problem hiding this comment.
Update the enum, with the first value unset as UNSPECIFIED = 0
Signed-off-by: cong.qian <cong.qian@anyscale.com>
2706611 to
2cb921f
Compare
…ject#55717) <!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? In Ray Data, we have an issue detection feature, but they are only available as printed logs. We also want to show these insights at our Data Dashboard for users to debug their code, by exporting these events. Example export event: ``` { "event_id": "Ce4Ee1B7AFd9Cb13B3", "timestamp": 1756423457, "source_type": "EXPORT_DATASET_OPERATOR_EVENT", "event_data": { "dataset_id": "dataset_6_0", "operator_id": "ReadImage->Map(preprocess_image)_1", "operator_name": "ReadImage->Map(preprocess_image)", "event_time": 1756423457.9333386, "event_type": "ISSUE_DETECTION_HIGH_MEMORY", "message": "\n\nOperator 'ReadImage->Map(preprocess_image)' uses xxx of memory per\ntask on average, but Ray only requests xxx per task at the start of\nthe pipeline.\n\nTo avoid out-of-memory errors, consider setting `memory=xxx` in the\nappropriate function or method call. (This might be unnecessary if the\nnumber of concurrent tasks is low.)\n\nTo change the frequency of this warning, set\n`DataContext.get_current().issue_detectors_config.high_memory_detector_config.detection_time_interval_s`,\nor disable the warning by setting value to -1. (current value: xxx)\n" } }, { "event_id": "bE831Ee49c4AadAcFB", "timestamp": 1756423458, "source_type": "EXPORT_DATASET_OPERATOR_EVENT", "event_data": { "dataset_id": "dataset_6_0", "operator_id": "MapBatches(ResnetModel)_2", "operator_name": "MapBatches(ResnetModel)", "event_time": 1756423458.1420121, "event_type": "ISSUE_DETECTION_HANGING", "message": "A task of operator MapBatches(ResnetModel) with task index xxx has been running for xxxs, which is longer than the average task duration of this operator (xxxs). If this message persists, please check the stack trace of the task for potential hanging issues." } } ``` <!-- Please give a short summary of the change and the problem this solves. --> ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [X] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [X] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [x] Unit tests - [ ] Release tests - [ ] This PR is not tested :( Signed-off-by: cong.qian <cong.qian@anyscale.com> Signed-off-by: zac <zac@anyscale.com>
…ject#55717) <!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? In Ray Data, we have an issue detection feature, but they are only available as printed logs. We also want to show these insights at our Data Dashboard for users to debug their code, by exporting these events. Example export event: ``` { "event_id": "Ce4Ee1B7AFd9Cb13B3", "timestamp": 1756423457, "source_type": "EXPORT_DATASET_OPERATOR_EVENT", "event_data": { "dataset_id": "dataset_6_0", "operator_id": "ReadImage->Map(preprocess_image)_1", "operator_name": "ReadImage->Map(preprocess_image)", "event_time": 1756423457.9333386, "event_type": "ISSUE_DETECTION_HIGH_MEMORY", "message": "\n\nOperator 'ReadImage->Map(preprocess_image)' uses xxx of memory per\ntask on average, but Ray only requests xxx per task at the start of\nthe pipeline.\n\nTo avoid out-of-memory errors, consider setting `memory=xxx` in the\nappropriate function or method call. (This might be unnecessary if the\nnumber of concurrent tasks is low.)\n\nTo change the frequency of this warning, set\n`DataContext.get_current().issue_detectors_config.high_memory_detector_config.detection_time_interval_s`,\nor disable the warning by setting value to -1. (current value: xxx)\n" } }, { "event_id": "bE831Ee49c4AadAcFB", "timestamp": 1756423458, "source_type": "EXPORT_DATASET_OPERATOR_EVENT", "event_data": { "dataset_id": "dataset_6_0", "operator_id": "MapBatches(ResnetModel)_2", "operator_name": "MapBatches(ResnetModel)", "event_time": 1756423458.1420121, "event_type": "ISSUE_DETECTION_HANGING", "message": "A task of operator MapBatches(ResnetModel) with task index xxx has been running for xxxs, which is longer than the average task duration of this operator (xxxs). If this message persists, please check the stack trace of the task for potential hanging issues." } } ``` <!-- Please give a short summary of the change and the problem this solves. --> ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [X] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [X] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [x] Unit tests - [ ] Release tests - [ ] This PR is not tested :( Signed-off-by: cong.qian <cong.qian@anyscale.com> Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
Original PR #55717 by coqian Original: ray-project/ray#55717
…etected issues Merged from original PR #55717 Original: ray-project/ray#55717
Original PR #55717 by coqian Original: ray-project/ray#55717
…etected issues Merged from original PR #55717 Original: ray-project/ray#55717
…ject#55717) <!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? In Ray Data, we have an issue detection feature, but they are only available as printed logs. We also want to show these insights at our Data Dashboard for users to debug their code, by exporting these events. Example export event: ``` { "event_id": "Ce4Ee1B7AFd9Cb13B3", "timestamp": 1756423457, "source_type": "EXPORT_DATASET_OPERATOR_EVENT", "event_data": { "dataset_id": "dataset_6_0", "operator_id": "ReadImage->Map(preprocess_image)_1", "operator_name": "ReadImage->Map(preprocess_image)", "event_time": 1756423457.9333386, "event_type": "ISSUE_DETECTION_HIGH_MEMORY", "message": "\n\nOperator 'ReadImage->Map(preprocess_image)' uses xxx of memory per\ntask on average, but Ray only requests xxx per task at the start of\nthe pipeline.\n\nTo avoid out-of-memory errors, consider setting `memory=xxx` in the\nappropriate function or method call. (This might be unnecessary if the\nnumber of concurrent tasks is low.)\n\nTo change the frequency of this warning, set\n`DataContext.get_current().issue_detectors_config.high_memory_detector_config.detection_time_interval_s`,\nor disable the warning by setting value to -1. (current value: xxx)\n" } }, { "event_id": "bE831Ee49c4AadAcFB", "timestamp": 1756423458, "source_type": "EXPORT_DATASET_OPERATOR_EVENT", "event_data": { "dataset_id": "dataset_6_0", "operator_id": "MapBatches(ResnetModel)_2", "operator_name": "MapBatches(ResnetModel)", "event_time": 1756423458.1420121, "event_type": "ISSUE_DETECTION_HANGING", "message": "A task of operator MapBatches(ResnetModel) with task index xxx has been running for xxxs, which is longer than the average task duration of this operator (xxxs). If this message persists, please check the stack trace of the task for potential hanging issues." } } ``` <!-- Please give a short summary of the change and the problem this solves. --> ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [X] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [X] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [x] Unit tests - [ ] Release tests - [ ] This PR is not tested :( Signed-off-by: cong.qian <cong.qian@anyscale.com>
…ject#55717) <!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? In Ray Data, we have an issue detection feature, but they are only available as printed logs. We also want to show these insights at our Data Dashboard for users to debug their code, by exporting these events. Example export event: ``` { "event_id": "Ce4Ee1B7AFd9Cb13B3", "timestamp": 1756423457, "source_type": "EXPORT_DATASET_OPERATOR_EVENT", "event_data": { "dataset_id": "dataset_6_0", "operator_id": "ReadImage->Map(preprocess_image)_1", "operator_name": "ReadImage->Map(preprocess_image)", "event_time": 1756423457.9333386, "event_type": "ISSUE_DETECTION_HIGH_MEMORY", "message": "\n\nOperator 'ReadImage->Map(preprocess_image)' uses xxx of memory per\ntask on average, but Ray only requests xxx per task at the start of\nthe pipeline.\n\nTo avoid out-of-memory errors, consider setting `memory=xxx` in the\nappropriate function or method call. (This might be unnecessary if the\nnumber of concurrent tasks is low.)\n\nTo change the frequency of this warning, set\n`DataContext.get_current().issue_detectors_config.high_memory_detector_config.detection_time_interval_s`,\nor disable the warning by setting value to -1. (current value: xxx)\n" } }, { "event_id": "bE831Ee49c4AadAcFB", "timestamp": 1756423458, "source_type": "EXPORT_DATASET_OPERATOR_EVENT", "event_data": { "dataset_id": "dataset_6_0", "operator_id": "MapBatches(ResnetModel)_2", "operator_name": "MapBatches(ResnetModel)", "event_time": 1756423458.1420121, "event_type": "ISSUE_DETECTION_HANGING", "message": "A task of operator MapBatches(ResnetModel) with task index xxx has been running for xxxs, which is longer than the average task duration of this operator (xxxs). If this message persists, please check the stack trace of the task for potential hanging issues." } } ``` <!-- Please give a short summary of the change and the problem this solves. --> ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [X] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [X] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [x] Unit tests - [ ] Release tests - [ ] This PR is not tested :( Signed-off-by: cong.qian <cong.qian@anyscale.com>
Why are these changes needed?
In Ray Data, we have an issue detection feature, but they are only available as printed logs. We also want to show these insights at our Data Dashboard for users to debug their code, by exporting these events.
Example export event:
Related issue number
Checks
git commit -s) in this PR.scripts/format.shto lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/under thecorresponding
.rstfile.