[ML] add new snapshot upgrader API for upgrading older snapshots (#64665)
This new API provides a way for users to upgrade their own anomaly job model snapshots. To upgrade a snapshot the following is done:
- Open a native process given the job id and the desired snapshot id
- Load the snapshot into the process
- Write the snapshot again from the native task (now updated via the native process)

Closes elastic#64154
Pinging @elastic/ml-core (:ml)
benwtrent
left a comment
There was one thing in this change that was concerning to me.
The model size stats seem to simply disappear when upgrading the snapshot. This is probably because the CResourceMonitor::createMemoryUsageReport simply generates the report based on the current job usage (which is effectively nothing).
This strikes me as strange as revert actually does change the model size stats for the job...
Here were my test results:
BEFORE
"model_size_stats" : {
"job_id" : "largish-kibana-sample-data",
"result_type" : "model_size_stats",
"model_bytes" : 4972992,
"peak_model_bytes" : 2495692,
"model_bytes_exceeded" : 0,
"model_bytes_memory_limit" : 524288000,
"total_by_field_count" : 130,
"total_over_field_count" : 0,
"total_partition_field_count" : 129,
"bucket_allocation_failures_count" : 0,
"memory_status" : "ok",
"categorized_doc_count" : 0,
"total_category_count" : 0,
"frequent_category_count" : 0,
"rare_category_count" : 0,
"dead_category_count" : 0,
"failed_category_count" : 0,
"categorization_status" : "ok",
"log_time" : 1604588763336,
"timestamp" : 1607272200000
},
AFTER
"model_size_stats": {
"job_id": "largish-kibana-sample-data",
"result_type": "model_size_stats",
"model_bytes": 11954,
"peak_model_bytes": 0,
"model_bytes_exceeded": 0,
"model_bytes_memory_limit": 524288000,
"total_by_field_count": 130,
"total_over_field_count": 0,
"total_partition_field_count": 129,
"bucket_allocation_failures_count": 0,
"memory_status": "ok",
"categorized_doc_count": 0,
"total_category_count": 0,
"frequent_category_count": 0,
"rare_category_count": 0,
"dead_category_count": 0,
"failed_category_count": 0,
"categorization_status": "ok",
"log_time": 1604591566986,
"timestamp": 1607272200000
},
There might need to be changes on the C++ side so that model size stats are not regenerated on snapshot save (especially when persist in foreground is used).
@droberts195 let me know what you think.
String snapshotId = "1541587919";

createModelSnapshot(jobId, snapshotId, Version.V_7_0_0);
//TODO add a true state from the past somehow
I am not 100% sure how to do this. Right now this test effectively just checks that the parameters are parsed and sent, as the resulting error indicates that we at least tried to load the model snapshot.
The solution might be to get an actual snapshot, and then manually update the doc so that the min_version is old.
The solution might be to get an actual snapshot, and then manually update the doc so that the min_version is old.
Yes, in terms of testing the infrastructure that would be a good way. Run a simple job like farequote, update the model snapshot document after the job is closed to have a min_version from the previous major, then upgrade it. Not sure this needs to be done in the HRLC tests though - for such a complex test the native multi node tests seems like the single place to do it. I am happy to leave this test as-is, just testing the parameter passing.
In terms of testing actual upgrade, it could be done in the BWC tests. We could have a BWC test (Java, not YAML) that does nothing when the old cluster is on the same major, but when the old cluster is on a different major it opens/runs/closes a job in the old cluster, then upgrades its model snapshot in the fully upgraded cluster (and does nothing in the mixed cluster).
yeah, I have a BWC test class covering this case.
super(NAME, Response::new);
}

public static class Request extends MasterNodeRequest<Request> implements ToXContentObject {
This is a master node action as we always want the latest cluster state information.
public enum SnapshotUpgradeState implements Writeable {

    READING_NEW_STATE, STOPPED, FAILED;
I didn't opt for a "writing_old_state" or an "opened" state as neither really conveyed any information. If the state is null, we know that either it is not assigned to a node or it is assigned and still loading the old snapshot.
Once we are in the reading_new_state, then that indicates that we have reached the point of no return and any failure from that state indicates a corrupted job model snapshot.
reading_new_state is very much from the perspective of the Java code rather than the end user. As an end user who doesn't even know that the code is split between Java and C++ I would have thought writing_new_state makes more sense. Or saving_new_state would be a compromise that makes sense to both end users and Java developers.
I would also introduce a reading_old_state or loading_old_state enum value that can be used in stats and API responses instead of null. We went through that cycle with job states. Initially there was no opening state, because a null task state basically meant that. But then we found it was nicer to have a specific enum value for it and translate null to that enum value in some places. Even if it's not used anywhere initially it will avoid BWC code to add it to the enum from the outset.
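A hedged sketch of what the reviewer's suggestion could look like (this is not the PR's actual code, and the constant names here are illustrative): explicit enum values for both phases, with a helper that translates a null task state to the loading state for stats and API responses.

```java
// Illustrative sketch, not the PR's actual enum. Names LOADING_OLD_STATE and
// SAVING_NEW_STATE are the reviewer's proposed alternatives.
enum SnapshotUpgradeState {
    LOADING_OLD_STATE, // previously implied by a null task state
    SAVING_NEW_STATE,  // the point of no return: snapshot docs are being rewritten
    STOPPED,
    FAILED;

    // Map a possibly-null persisted task state to a user-facing value.
    static SnapshotUpgradeState fromNullable(SnapshotUpgradeState state) {
        return state == null ? LOADING_OLD_STATE : state;
    }
}
```

Translating null at the API boundary, as done for job states, avoids later BWC work to add the enum value.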
mlController = new DummyController();
autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) ->
    new BlackHoleAutodetectProcess(job.getId(), onProcessCrash);
autodetectProcessFactory = (pipelineId, job, autodetectParams, executorService, onProcessCrash) ->
pipelineId is used for naming the resulting pipe files. See the comments below for further explanation.
if (state.nodes().getMaxNodeVersion().after(state.nodes().getMinNodeVersion())) {
    listener.onFailure(ExceptionsHelper.conflictStatusException(
        "Cannot upgrade job [{}] snapshot [{}] as not all nodes are on version {}. All nodes must be the same version",
This just eliminates the small edge cases of requiring job node assignment to take into account node version. These processes are short lived, and restricting the cluster to not be a mixed cluster is a sane limitation. Especially since this API is meant to be used right before upgrading to the next major version.
public AutodetectProcess createAutodetectProcess(String pipelineId,
                                                 Job job,
One of the requirements was that this upgrade be possible WHILE the referenced job is running. Consequently, the snapshot upgrade task and the job task COULD be assigned to the same node. If the pipeline ID was not given directly, this would cause a file name conflict.
Admittedly, there is already this "unique pipeline flag" but that is a long value. I thought it would be nice to include the snapshot ID directly in the pipeline name. It makes snapshot upgrader issues very easy to investigate in the resulting logs by looking for <job_id>-<snapshot_id>
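The naming idea above can be sketched as a trivial helper (hypothetical class and method names, not the PR's code): embedding both ids in the pipeline name means a job task and a snapshot upgrade task assigned to the same node cannot collide on pipe file names, and it makes logs greppable.

```java
// Hypothetical helper illustrating the naming scheme described above.
final class UpgradePipelineNames {
    // Combine job and snapshot ids so concurrent processes on one node
    // produce distinct, searchable pipeline names.
    static String pipelineId(String jobId, String snapshotId) {
        return jobId + "-" + snapshotId;
    }
}
```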
 * <p>
 * This is a single purpose result processor and only handles snapshot writes
 */
public class JobSnapshotUpgraderResultProcessor {
I created a new processor here as I didn't want to chance ANY other result being written back. This just protects us from inadvertently updating the job results/state when we didn't mean to.
I possibly could have had an AbstractResultProcessor class, but shared code was so little, it didn't really seem worth it.
if (persistentTask == null) {
    isCompleted = true;
    return true;
}
In all my testing, the task is only null when it has been removed from cluster state. Since this predicate runs AFTER we have confirmed the task has been added to state (via the start task API), it is safe to assume that null means removal and thus completion.
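The predicate's behavior can be sketched in isolation (a simplified stand-in, with Object replacing the real PersistentTask<?> type):

```java
// Simplified sketch of the completion predicate discussed above.
final class UpgradeCompletedPredicate {
    private boolean isCompleted;

    // A null task means it was removed from cluster state, i.e. done.
    boolean test(Object persistentTask) {
        if (persistentTask == null) {
            isCompleted = true;
            return true;
        }
        return false;
    }

    boolean isCompleted() {
        return isCompleted;
    }
}
```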
if (SnapshotUpgradeState.READING_NEW_STATE.equals(jobState)) {
    deleteSnapshotAndFailTask(task, params.getJobId(), params.getSnapshotId());
    return;
If we are assigned to a new node while reading_new_state, the snapshot could be corrupted since the files are being overwritten one by one.
Consequently, we audit, log, and then delete the snapshot as it is unusable anyway.
It MIGHT be better to add a flag to the snapshot that says "bad snapshot". But the way to recover here would be to delete the job model state and then restore from an Elasticsearch snapshot... Up for debate.
List<ModelSnapshot> snapshots = getModelSnapshots(job.getId(), snapshot.getSnapshotId()).snapshots();
assertThat(snapshots, hasSize(1));
assertThat(snapshot.getLatestRecordTimeStamp(), equalTo(snapshots.get(0).getLatestRecordTimeStamp()));

// Does the snapshot still work?
assertThat(hlrc.getJobStats(new GetJobStatsRequest(JOB_ID), RequestOptions.DEFAULT)
        .jobStats()
        .get(0)
        .getDataCounts().getLatestRecordTimeStamp(),
    greaterThan(snapshot.getLatestRecordTimeStamp()));
After backport I want to add a mixed cluster test (to make sure the mixed node error throws) and I want to verify that the min_version is updated on the new snapshot.
Right now, since 8.x does not support upgrades from 6.x, that is not possible here. But in 7.x, it will be good to test that min_version gets adjusted.
After this is merged and backported, another API should be added to check the stats of snapshot upgrades, plus changes to the deprecation API to indicate when a snapshot is too old to run in the next major.

I agree there need to be changes on the C++ side. @edsavage please could you investigate those two things.
droberts195
left a comment
Thanks for writing this extremely complicated yet also tedious functionality.
There's a lot of code so I haven't reviewed it all in detail, but have left an initial set of comments. The biggest one is that we need to think about how to avoid excessive complexity in the Kibana migration assistant that will have to use this code eventually.
UpgradeJobModelSnapshotResponse response = client.machineLearning().upgradeJobSnapshot(request, RequestOptions.DEFAULT);
// end::upgrade-job-model-snapshot-execute
} catch (ElasticsearchException ex) {
// TODO have a true snapshot in the past to upgrade?
As above, this will be complex and expensive, and I am not sure that is justified for the docs tests. We can do it once as part of the native multi node tests, but burning that CPU many times in a full CI run seems unjustified.
yeah, I have a BWC test class covering this case.
In that case I think you should remove the TODO from here and instead have a comment to say that this is just checking syntax because actual upgrade is covered elsewhere.
--------------------------------------------------
<1> The job that owns the snapshot
<2> The snapshot id to upgrade
<3> The time out of the request
As an end user I would be interested in what this means if wait_for_completion=false, and what the default is.
try {
    return PARSER.parse(parser, null);
} catch (IOException e) {
    throw new RuntimeException(e);
Did you consider UncheckedIOException?
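The suggestion can be illustrated with a self-contained sketch (hypothetical class, not the PR's parser code): java.io.UncheckedIOException keeps the IOException available as a typed cause, rather than hiding it behind a plain RuntimeException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical example of the reviewer's suggestion: wrap checked IO
// failures in UncheckedIOException so callers can still recover the cause.
final class LenientParser {
    static String parse(boolean fail) {
        try {
            if (fail) {
                throw new IOException("malformed input");
            }
            return "parsed";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```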
if (response.result.getMinVersion().major >= UPGRADE_FROM_MAJOR) {
    listener.onFailure(ExceptionsHelper.conflictStatusException(
        "Cannot upgrade job [{}] snapshot [{}] as it is already compatible with current major version {}",
        request.getJobId(),
        request.getSnapshotId(),
        UPGRADE_FROM_MAJOR));
    return;
}
I would do the check on the exact version rather than just the major. Although it shouldn't be necessary, upgrading the format from e.g. 7.0 format to 7.11 format might be a useful piece of functionality to have in the future to work around some other bug.
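As a sketch of the exact-version check being suggested (versions modeled here as plain (major, minor) ints; the real code would compare org.elasticsearch.Version instances directly):

```java
// Illustrative sketch only: a snapshot is already in the current format
// only if its min_version is not older than the current version, so e.g.
// a 7.0-format snapshot could still be upgraded to 7.11 format.
final class SnapshotVersionCheck {
    static boolean alreadyCurrent(int snapMajor, int snapMinor, int curMajor, int curMinor) {
        return snapMajor > curMajor || (snapMajor == curMajor && snapMinor >= curMinor);
    }
}
```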
listener.onFailure(ExceptionsHelper.conflictStatusException(
    "Cannot upgrade snapshot [{}] for job [{}] as it is the current primary job snapshot",
    request.getSnapshotId(),
    request.getJobId()
));
This means extra complication for the Kibana upgrade assistant though. For every model snapshot that exists that is too old it will now have to recommend one of two possible courses of action, depending on whether the snapshot is the active one or not. Opening and closing a job normally without sending it any data doesn't rewrite the snapshot, so the user would also have to feed some data to actually change the active model snapshot of the job. So I think this check should be altered to only ban upgrading the active snapshot if the job is open.
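The altered check being proposed reduces to a simple condition (hypothetical helper, not the PR's code): refuse to upgrade the active snapshot only while its job is open.

```java
// Sketch of the review suggestion: the active (current primary) snapshot
// may be upgraded in place when the job is closed; it is only banned
// while the job is open.
final class ActiveSnapshotGuard {
    static boolean upgradeAllowed(boolean isActiveSnapshot, boolean jobIsOpen) {
        return (isActiveSnapshot && jobIsOpen) == false;
    }
}
```

This keeps the Kibana upgrade assistant's recommendation uniform: close the job if needed, then upgrade every too-old snapshot.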
    return fieldIndexes;
}

void writeHeader() throws IOException {
I am not surprised you had to write a header. You could probably get away with writing one with just the control field (field name .). But it's not particularly important, so I'm happy to leave what's here.
droberts195
left a comment
LGTM
I noticed a few more minor things but am happy to merge this once they're resolved.
lengthEncodedWriter.flush();
}

public void writeStartBackgroundPersistMessage(long snapshotTimestamp, String snapshotId, String description) throws IOException {
Please add a Javadoc comment to say whether snapshotTimestamp is in epoch millis or epoch seconds. Also, it might be worth adding Seconds or Millis to the variable name to make ultra clear which it is for future maintainers.
/**
 * Ask the process to persist state, even if it is unchanged.
 * @param snapshotTimestamp The snapshot timestamp
Please add whether this is epoch seconds or epoch millis.
// C++ is expecting the timestamp to be in seconds, not Milliseconds
params.modelSnapshot().getTimestamp().getTime()/1000,
Since Java rarely uses epoch seconds, it's probably better to move the /1000 closer to the point of passing the information to the C++ process, e.g. in AutodetectControlMsgWriter.
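The conversion itself is trivial; the point is where it lives. A sketch of keeping it at the process boundary (hypothetical class name, with Millis in the parameter name as suggested elsewhere in this review):

```java
// Illustrative sketch: Java code carries epoch millis throughout, and only
// the control-message writer converts to the epoch seconds the C++ side expects.
final class ControlMsgTimestamps {
    static long toEpochSeconds(long snapshotTimestampMillis) {
        return snapshotTimestampMillis / 1000;
    }
}
```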
        bulkResultsPersister.executeRequest();
    }
} catch (Exception e) {
    LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e);
Suggested change:
- LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e);
+ LOGGER.warn(new ParameterizedMessage("[{}] Error persisting model snapshot [{}] upgrade results", jobId, snapshotId), e);
// that it would have been better to close jobs before shutting down,
// but we now fully expect jobs to move between nodes without doing
// all their graceful close activities.
LOGGER.warn("[{}] some results not processed due to the process being killed", jobId);
Suggested change:
- LOGGER.warn("[{}] some results not processed due to the process being killed", jobId);
+ LOGGER.warn("[{}] some model snapshot [{}] upgrade results not processed due to the process being killed", jobId, snapshotId);
LOGGER.warn("[{}] some results not processed due to the process being killed", jobId);
} else if (process.isProcessAliveAfterWaiting() == false) {
// Don't log the stack trace to not shadow the root cause.
LOGGER.warn("[{}] some results not processed due to the termination of autodetect", jobId);
Suggested change:
- LOGGER.warn("[{}] some results not processed due to the termination of autodetect", jobId);
+ LOGGER.warn("[{}] some model snapshot [{}] upgrade results not processed due to the termination of autodetect", jobId, snapshotId);
} else {
// We should only get here if the iterator throws in which
// case parsing the autodetect output has failed.
LOGGER.error(new ParameterizedMessage("[{}] error parsing autodetect output", jobId), e);
Suggested change:
- LOGGER.error(new ParameterizedMessage("[{}] error parsing autodetect output", jobId), e);
+ LOGGER.error(new ParameterizedMessage("[{}] error parsing autodetect output during model snapshot [{}] upgrade", jobId, snapshotId), e);
if (isAlive() == false) {
    throw e;
}
LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result", jobId), e);
Suggested change:
- LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result", jobId), e);
+ LOGGER.warn(new ParameterizedMessage("[{}] Error processing autodetect result during model snapshot [{}] upgrade", jobId, snapshotId), e);
private void logUnexpectedResult(String resultType) {
    LOGGER.info("[{}] [{}] unexpected result read [{}]", jobId, snapshotId, resultType);
}
Consider adding assert resultType == null or something else that will detect if this happens during our integration tests.
When a persist control message with arguments is received by the anomaly detector it doesn't go through the standard chain of persistence calls, as it unconditionally rewrites the state (even if no data has been seen) and includes only the anomaly detector state rather than the categorizer state too. Because of this, the memory usage was not being recalculated prior to persisting the state as would normally happen. This PR rectifies that omission.

Fixes one of the problems detailed in elastic/elasticsearch#64665 (review)