fix: de-duplicate operations received in quick succession from c8y/devicecontrol/notifications#3454
Conversation
albinsuresh left a comment:
One issue with this solution is an (admittedly very slow) memory leak caused by the mapper storing the operation IDs indefinitely. Ideally, we should delete these at some point where we know the operation is no longer marked as pending, though I'm not sure how to do this.
Although clearing this entry when the operation transitions to the successful/failed states is good enough for the majority of cases, I understand that it is not a fool-proof solution, as there is still the possibility of getting a duplicate message while these state-transition messages are still in transit to the cloud (or buffered for processing, either locally or on the cloud). But since the terminal state transitions usually happen after the executing transition has already happened (which changes the PENDING status of the op in the cloud), the risk is reduced even further, although not fully eliminated.
I'm in favour of risking a duplicate operation execution in such rare cases (where the duplicate is delivered even after the terminal state transition is published), compared to the risk of that slow memory leak.
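The trade-off discussed above can be sketched in a few lines. This is a hypothetical illustration (the `Deduplicator` type and method names are invented for this sketch, not the actual mapper code): operation IDs are remembered on first sight and evicted once the operation reaches a terminal state, so the set does not grow forever at the cost of the rare late-duplicate window described above.

```rust
use std::collections::HashSet;

/// Hypothetical sketch of ID-based de-duplication with eviction on
/// terminal states; names are illustrative, not the real mapper code.
struct Deduplicator {
    processed_ids: HashSet<String>,
}

impl Deduplicator {
    fn new() -> Self {
        Self { processed_ids: HashSet::new() }
    }

    /// Returns true if this operation ID has not been seen before,
    /// remembering it as a side effect.
    fn should_process(&mut self, op_id: &str) -> bool {
        self.processed_ids.insert(op_id.to_string())
    }

    /// Called when the operation transitions to successful/failed:
    /// drop the entry so the set cannot leak memory indefinitely.
    fn on_terminal_state(&mut self, op_id: &str) {
        self.processed_ids.remove(op_id);
    }
}
```

Note that a duplicate delivered after `on_terminal_state` would be processed again, which is exactly the residual risk accepted above.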
supported_operations: SupportedOperations,
pub operation_handler: OperationHandler,

processed_ids: HashSet<String>,
Could we not re-use the active_commands set? Or you avoided that because the entries in that set are cleared on the terminal state transition of those operations and you didn't want these entries cleared so soon?
Looking at the doc comment for active_commands, this indeed sounds like the correct place for that fix.
However, this raises another point: why do we have this issue with duplicated commands when there is already a mechanism supposed to handle it?
I think active_commands is a broken solution to the problem. I think there we are de-duplicating c8y/devicecontrol/notifications, but we are only tracking an active command once we receive the relevant tedge-topic command message, which will happen a short while later. I think this is what leaves open the window for an operation to be duplicated.
Assuming I've understood correctly, that would indicate that the problem could simply be solved by moving the active_commands insertion to where I'm currently inserting to processed_ids, and deleting the processed_ids stuff?
> Assuming I've understood correctly, that would indicate that the problem could simply be solved by moving the active_commands insertion to where I'm currently inserting to processed_ids, and deleting the processed_ids stuff?
One point is sure: one should keep only a single de-duplication mechanism. What you propose makes sense: it's better to remove duplicates before any processing.
> why do we have this issue with duplicated commands while there is already a mechanism supposed to handle that?
For the cases where the duplicate messages are delivered after a restart, it is the lack of persistence of this set. But for duplicate messages delivered while the mapper is still live, this should have been sufficient.
I've now modified active_commands to insert immediately post-conversion, rather than when we receive our outgoing message. As a result, I've deleted processed_ids.
> I don't see how this can happen, as a device is not expected to receive an operation that it hasn't declared as a supported operation (this is my expectation from C8Y) via the operation capability registration.

Operations can be created via the API, which doesn't first check whether the device supports them. So in automation scenarios it is not uncommon for operations to be sent to devices regardless, on the assumption that an unsupported operation will be rejected by the agent (this is deemed cheaper than the backend service first checking whether each device supports the intended operation). This is fairly common in large device fleets (> 200K devices).
let original = MqttMessage::new(&Topic::new_unchecked("c8y/devicecontrol/notifications"), json!(
    {"delivery":{"log":[],"time":"2025-03-05T08:49:24.986Z","status":"PENDING"},"agentId":"1916574062","creationTime":"2025-03-05T08:49:24.967Z","deviceId":"1916574062","id":"16574089","status":"PENDING","c8y_Restart":{},"description":"do something","externalSource":{"externalId":"test-device","type":"c8y_Serial"}}
).to_string());
let redelivery = MqttMessage::new(&Topic::new_unchecked("c8y/devicecontrol/notifications"), json!(
    {"delivery":{"log":[{"time":"2025-03-05T08:49:24.986Z","status":"PENDING"},{"time":"2025-03-05T08:49:25.000Z","status":"SEND"},{"time":"2025-03-05T08:49:25.162Z","status":"DELIVERED"}],"time":"2025-03-05T08:49:25.707Z","status":"PENDING"},"agentId":"1916574062","creationTime":"2025-03-05T08:49:24.967Z","deviceId":"1916574062","id":"16574089","status":"PENDING","c8y_Restart":{},"description":"do something","externalSource":{"externalId":"test-device","type":"c8y_Serial"}}
).to_string());
It would be good to make it obvious that original and redelivery only differ in the delivery field.
I've now removed some of the extraneous fields and changed things so we send the original message twice, since the converter doesn't care about the delivery field.
}

#[tokio::test]
async fn custom_operations_are_not_deduplicated_before_registration() {
I struggle to understand what is checked by this test and how.
CumulocityConverter.supported_operations field is patched before the first request then restored before the second. Okay, but why?
This test checks what happens if we receive a custom operation that is unrecognised and are later redelivered the custom operation after it is registered with the mapper. If the converter naïvely assumes that the operation is active after we first receive it, the de-duplication mechanism will ignore the redelivery. But since we haven't yet processed this message, we should process such a redelivery. This obviously depends on something sending a 500 message once the operation is registered, but that could be the case for some custom operation handling service.
The patching of supported_operations was intended as an easy way of making the registered operations clear, since I'm not trying to test how we update supported_operations in this case.
This is clearer. Might be good to add this response as a comment to the test.
Technically this is fine. But seeing the detailed discussion above, it felt like this case could have been better represented as an integration test in tests.rs, where we can better simulate the dynamic custom operation registration during the execution of the test.
I think I want the opposite of that, @albinsuresh. The point of this being a unit test is I don't also want to test the operation registration logic at the same time. I've added a comment to explain what it is I'm trying to test.
albinsuresh left a comment:
Changes look fine. Some minor suggestions on the tests.
assert_ne!(
    converter
        .parse_json_custom_operation_topic(&original)
        .await
        .unwrap(),
    vec![],
    "Initial operation delivery produces outgoing message"
);
A slightly stricter check that validates at least the cmd topic would have been better than this non-empty output check. To avoid false positives like a converted error message sent to te/errors instead of the expected cmd beating this assertion.
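The stricter check suggested here could look roughly like the following. This is a simplified sketch, not the real test code: the `OutMessage` struct and the `assert_converted_to_cmd` helper are invented for illustration, standing in for the mapper's actual MQTT message type, and the `te/device/main///cmd/` prefix is one plausible expected topic.

```rust
/// Simplified stand-in for an outgoing MQTT message (illustrative only).
struct OutMessage {
    topic: String,
    payload: String,
}

/// Hypothetical stricter assertion: instead of only checking that the
/// converter output is non-empty, also check the first message landed on
/// a cmd topic, so an error converted to te/errors cannot pass the test.
fn assert_converted_to_cmd(messages: &[OutMessage]) {
    let first = messages
        .first()
        .expect("expected at least one outgoing message");
    assert!(
        first.topic.starts_with("te/device/main///cmd/"),
        "expected a cmd message, got one on `{}` (e.g. a te/errors conversion error)",
        first.topic
    );
}
```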
converter.supported_operations = after_registration;

assert_ne!(
Same comment as above, regarding a stricter check.
didier-wenzek left a comment:
Two forgotten dbg! to be removed and some questions.
&mut self,
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
    if dbg!(self.active_commands_last_cleared.elapsed(&*self.clock)) > Duration::from_secs(3600)
Just a matter of taste: I would prefer an elapsed or elapsed_since method on the clock rather than the instant:
- if dbg!(self.active_commands_last_cleared.elapsed(&*self.clock)) > Duration::from_secs(3600)
+ if self.clock.elapsed_since(&self.active_commands_last_cleared) > Duration::from_secs(3600)
PS: dbg! to be removed
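The suggested `elapsed_since` helper could live as a default method on the clock trait, so call sites query the clock rather than the instant. A minimal sketch, assuming a simplified `Clock` trait (the trait shape and `WallClock` type here are illustrative, not the mapper's actual clock abstraction):

```rust
use std::time::{Duration, Instant};

/// Illustrative clock trait with the reviewer's suggested helper as a
/// default method, so mocks only need to implement `now`.
trait Clock {
    fn now(&self) -> Instant;

    /// How much time has passed on this clock since `earlier`.
    fn elapsed_since(&self, earlier: &Instant) -> Duration {
        self.now().saturating_duration_since(*earlier)
    }
}

/// Real-time implementation backed by the system monotonic clock.
struct WallClock;

impl Clock for WallClock {
    fn now(&self) -> Instant {
        Instant::now()
    }
}
```

A test clock would implement `now` to return a controllable instant, which is what makes `self.clock.elapsed_since(...)` nicer to mock than calling `elapsed` on the stored instant.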
didier-wenzek left a comment:
Approved. Thank you.
albinsuresh left a comment:
The changes look much simpler now. I've got one concern though, regarding premature eviction from the cache before the operation really completes.
&mut self,
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
    if self.active_commands_last_cleared.elapsed() > Duration::from_secs(3600) {
I was actually thinking that you'd use the timer actor from the mapper to get timeout notification message at desired intervals. But, it was smart to place the eviction logic here.
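The lazy-eviction approach being discussed (clearing in the message handler rather than via a timer actor) can be sketched as follows. This is a hand-wavy illustration under assumptions: the `Converter` struct and `convert` signature are simplified stand-ins for the real converter, and the one-hour window mirrors the `Duration::from_secs(3600)` in the snippet above.

```rust
use std::collections::HashSet;
use std::time::{Duration, Instant};

/// Illustrative converter fragment: active command IDs are cleared lazily
/// whenever a message arrives and the last clearing is older than the
/// expiry window, instead of scheduling a timer actor.
struct Converter {
    active_commands: HashSet<String>,
    active_commands_last_cleared: Instant,
}

impl Converter {
    const EXPIRY: Duration = Duration::from_secs(3600);

    /// Returns true if the operation is new and should be processed.
    fn convert(&mut self, op_id: &str) -> bool {
        // Lazy eviction, piggy-backing on the message handler.
        if self.active_commands_last_cleared.elapsed() > Self::EXPIRY {
            self.active_commands.clear();
            self.active_commands_last_cleared = Instant::now();
        }
        // `insert` returns false for duplicates, de-duplicating the op.
        self.active_commands.insert(op_id.to_string())
    }
}
```

This also makes the concern above concrete: an operation still running when the whole set is cleared loses its de-duplication entry, so a late duplicate of it would be processed again.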
Force-pushed from e50c720 to 288ab5e
albinsuresh left a comment:
I re-confirm my approval.
Redid time-based expiry logic
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
…for active_commands
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Force-pushed from 288ab5e to 0f2d597
Proposed changes
Fixes the handling of in-progress operations by the Cumulocity converter so that the de-duplication mechanism is applied immediately. Since a small amount of time passes between triggering the operation and the converter receiving the `te/...` topic message (which was when the operation was marked as "active" in the converter), there was a race that could lead to duplicate messages from Cumulocity both being processed by the converter.

Specifically, this PR changes the converter to mark the operation as "active" as it is initially handled by the converter. This has two advantages: firstly, it fixes the aforementioned race condition, and additionally it means that legacy custom operations based on SmartREST are also de-duplicated. Since such an operation doesn't have an associated `te/...` topic, these "active" operations expire after 12 hours. For workflow-based/built-in operations, the "active" operation is discarded by the mapper only when it is marked as complete on the relevant `te/...` topic. The de-duplication works across mapper restarts (assuming MQTT broker persistence is appropriately configured), as it did before.

Types of changes
Paste Link to the issue

Checklist
- I have run `just prepare-dev` once
- I have run `cargo fmt` as mentioned in CODING_GUIDELINES
- I have run `cargo clippy` as mentioned in CODING_GUIDELINES

Further comments