Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #4 +/- ##
=========================================
Coverage 100.00% 100.00%
=========================================
Files 7 8 +1
Lines 231 289 +58
=========================================
+ Hits 231 289 +58
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
| class MongoDBCDCTranslatorBase: | ||
| """ | ||
| Translate MongoDB CDC events into different representations. | ||
|
|
||
| Change streams allow applications to access real-time data changes without the prior | ||
| complexity and risk of manually tailing the oplog. Applications can use change streams | ||
| to subscribe to all data changes on a single collection, a database, or an entire | ||
| deployment, and immediately react to them. | ||
|
|
||
| Because change streams use the aggregation framework, applications can also filter | ||
| for specific changes or transform the notifications at will. | ||
|
|
||
| - https://www.mongodb.com/docs/manual/changeStreams/ | ||
| - https://www.mongodb.com/developer/languages/python/python-change-streams/ | ||
| """ | ||
|
|
||
| def deserialize_item(self, item: t.Dict[str, t.Dict[str, str]]) -> t.Dict[str, str]: | ||
| """ | ||
| Deserialize MongoDB type-enriched nested JSON snippet into vanilla Python. | ||
|
|
||
| Example: | ||
| { | ||
| "_id": ObjectId("669683c2b0750b2c84893f3e"), | ||
| "id": "5F9E", | ||
| "data": {"temperature": 42.42, "humidity": 84.84}, | ||
| "meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"}, | ||
| } | ||
| """ | ||
| return _json_convert(item) | ||
|
|
||
|
|
||
| class MongoDBCDCTranslatorCrateDB(MongoDBCDCTranslatorBase): | ||
| """ | ||
| Translate MongoDB CDC events into CrateDB SQL statements that materialize them again. | ||
|
|
||
| Please note that change streams are only available for replica sets and sharded clusters. | ||
|
|
||
| Accepted events: insert, update, replace, delete | ||
| Ignored events: drop, invalidate | ||
|
|
||
| The current implementation uses the `fullDocument` representation to update records | ||
| in the sink database table. In order to receive them on `update` events as well, you | ||
| need to subscribe to change events using `watch(full_document="updateLookup")`. | ||
|
|
||
| The MongoDB documentation has a few remarks about the caveats of this approach: | ||
|
|
||
| > Updates with the `fullDocument` Option: The `fullDocument` option for Update Operations | ||
| > does not guarantee the returned document does not include further changes. In contrast | ||
| > to the document deltas that are guaranteed to be sent in order with update notifications, | ||
| > there is no guarantee that the `fullDocument` returned represents the document as it was | ||
| > exactly after the operation. | ||
| > | ||
| > `updateLookup` will poll the current version of the document. If changes happen quickly | ||
| > it is possible that the document was changed before the updateLookup finished. This means | ||
| > that the `fullDocument` might not represent the document at the time of the event thus | ||
| > potentially giving the impression events took place in a different order. | ||
| > | ||
| > -- https://www.mongodb.com/developer/languages/python/python-change-streams/ | ||
|
|
||
| The SQL DDL schema for CrateDB: | ||
| CREATE TABLE <tablename> (oid TEXT, data OBJECT(DYNAMIC)); | ||
| """ | ||
|
|
||
| # Define name of the column where MongoDB's OID for a document will be stored. | ||
| ID_COLUMN = "oid" | ||
|
|
||
| # Define name of the column where CDC's record data will get materialized into. | ||
| DATA_COLUMN = "data" | ||
|
|
||
| def __init__(self, table_name: str): | ||
| super().__init__() | ||
| self.table_name = self.quote_table_name(table_name) | ||
|
|
||
| @property | ||
| def sql_ddl(self): | ||
| """ | ||
| Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events. | ||
| """ | ||
| return ( | ||
| f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.ID_COLUMN} TEXT, {self.DATA_COLUMN} OBJECT(DYNAMIC));" | ||
| ) | ||
|
|
||
| def to_sql(self, record: t.Dict[str, t.Any]) -> str: | ||
| """ | ||
| Produce INSERT|UPDATE|DELETE SQL statement from insert|update|replace|delete CDC event record. | ||
| """ | ||
|
|
||
| if "operationType" in record and record["operationType"]: | ||
| operation_type: str = str(record["operationType"]) | ||
| else: | ||
| raise ValueError(f"Operation Type missing or empty: {record}") | ||
|
|
||
| if operation_type == "insert": | ||
| oid: str = self.get_document_key(record) | ||
| full_document = self.get_full_document(record) | ||
| values_clause = self.full_document_to_values(full_document) | ||
| sql = ( | ||
| f"INSERT INTO {self.table_name} " | ||
| f"({self.ID_COLUMN}, {self.DATA_COLUMN}) " | ||
| f"VALUES ('{oid}', '{values_clause}');" | ||
| ) | ||
|
|
||
| # In order to use "full document" representations from "update" events, | ||
| # you need to use `watch(full_document="updateLookup")`. | ||
| # https://www.mongodb.com/docs/manual/changeStreams/#lookup-full-document-for-update-operations | ||
| elif operation_type in ["update", "replace"]: | ||
| full_document = self.get_full_document(record) | ||
| values_clause = self.full_document_to_values(full_document) | ||
| where_clause = self.where_clause(record) | ||
| sql = f"UPDATE {self.table_name} SET {self.DATA_COLUMN} = '{values_clause}' WHERE {where_clause};" | ||
|
|
||
| elif operation_type == "delete": | ||
| where_clause = self.where_clause(record) | ||
| sql = f"DELETE FROM {self.table_name} WHERE {where_clause};" | ||
|
|
||
| # TODO: Enable applying the "drop" operation conditionally when enabled. | ||
| elif operation_type == "drop": | ||
| logger.info("Received 'drop' operation, but skipping to apply 'DROP TABLE'") | ||
| sql = "" | ||
|
|
||
| elif operation_type == "invalidate": | ||
| logger.info("Ignoring 'invalidate' CDC operation") | ||
| sql = "" | ||
|
|
||
| else: | ||
| raise ValueError(f"Unknown CDC operation type: {operation_type}") | ||
|
|
||
| return sql | ||
|
|
||
| @staticmethod | ||
| def get_document_key(record: t.Dict[str, t.Any]) -> str: | ||
| """ | ||
| Return value of document key (MongoDB document OID) from CDC record. | ||
|
|
||
| "documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")} | ||
| """ | ||
| return str(record.get("documentKey", {}).get("_id")) | ||
|
|
||
| @staticmethod | ||
| def get_full_document(record: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: | ||
| """ | ||
| return `fullDocument` representation from record. | ||
| """ | ||
| return t.cast(dict, record.get("fullDocument")) | ||
|
|
||
| def full_document_to_values(self, document: t.Dict[str, t.Any]) -> str: | ||
| """ | ||
| Serialize CDC event's "fullDocument" representation to a `VALUES` clause in CrateDB SQL syntax. | ||
|
|
||
| IN (top-level stripped): | ||
| "fullDocument": { | ||
| "_id": ObjectId("669683c2b0750b2c84893f3e"), | ||
| "id": "5F9E", | ||
| "data": {"temperature": 42.42, "humidity": 84.84}, | ||
| "meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"}, | ||
| } | ||
|
|
||
| OUT: | ||
| {"_id": {"$oid": "669683c2b0750b2c84893f3e"}, | ||
| "id": "5F9E", | ||
| "data": {"temperature": 42.42, "humidity": 84.84}, | ||
| "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"}, | ||
| } | ||
| """ | ||
| return json.dumps(self.deserialize_item(document)) | ||
|
|
||
| def where_clause(self, record: t.Dict[str, t.Any]) -> str: | ||
| """ | ||
| When converging an oplog of a MongoDB collection, the primary key is always the MongoDB document OID. | ||
|
|
||
| IN (top-level stripped): | ||
| "documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")} | ||
|
|
||
| OUT: | ||
| WHERE oid = '669683c2b0750b2c84893f3e' | ||
| """ | ||
| oid = self.get_document_key(record) | ||
| return f"oid = '{oid}'" |
There was a problem hiding this comment.
Dear @hlcianfagna, @hammerhead, @surister, and @wierdvanderhaar,
may I kindly ask you to review those routines? If you worked with DynamoDB CDC convergence already, they might feel familiar to you.
Thanks in advance,
Andreas.
There was a problem hiding this comment.
DDL schema
In particular, I am interested if you agree with the SQL DDL schema and the corresponding update strategy, or if you would prefer a different one, and how that would look like.
CREATE TABLE <tablename> (oid TEXT, data OBJECT(DYNAMIC));Value serialization
Currently, typed values are retained in their nested-JSON forms, so a sample content of the data column looks like outlined below. Here, I would be interested what you think about the nestings of "timestamp": {"$date": "2024-07-11T23:17:42Z"} and friends, how MongoDB's special values are serialized like.
- Is it good to go for a first iteration?
- Should there be a subsequent iteration to address any shortcomings you may report, or wish, because crafting SQL queries which take those nestings into consideration will be totally silly or even impossible?
{
// Coming from MongoDB.
// Can also be omitted, because the OID will be stored into a dedicated `oid` column anyway.
// Right now, we retained the `fullDocument` payload 1:1, but that will be easy to spice up
// by introducing corresponding options/flavours.
"_id": {
"$oid": "6696c937f6e1770586480277"
},
// Coming from user.
"id": "5F9E",
"meta": {
// Scalar types will not cause any headaches.
"device": "foo",
// Currently, special data types are only value-unmarshalled, while retaining the structure
// as conveyed by MongoDB 1:1. Optionally, or when there is demand, the routine may
// also unmarshal the nested substructure.
"timestamp": {
"$date": "2024-07-11T23:17:42Z"
}
},
"data": {
"humidity": 84.84,
"temperature": 42.42
}
}
About
Similar to the transformer for DynamoDB CDC events, this patch adds a little converter which takes care of MongoDB Change Stream events. In this case, it translates ingress CDC events into SQL statements suitable for CrateDB.
Documentation
The Relay MongoDB Change Stream into CrateDB document describes how to get started using the basic relay example program.