Skip to content

Commit 114b3c9

Browse files
committed
Add transformer for DynamoDB CDC to CrateDB SQL conversion
It vendorizes DynamoDB's `TypeDeserializer` Python implementation from boto3, in order to optimally provide type support without needing to pull in the full package as a dependency.
1 parent de911a7 commit 114b3c9

11 files changed

Lines changed: 593 additions & 0 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@
33

44
## Unreleased
55
- Added decoders for Airrohr, Tasmota, and TTS/TTN from Kotori DAQ
6+
- Added transformer for DynamoDB CDC to CrateDB SQL conversion
67

78
## 2024/07/15 0.0.1

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ To install the most recent version, run:
2424
pip install --upgrade data-x
2525
```
2626

27+
## License
28+
The project uses the LGPLv3 license for the whole ensemble. However, individual
29+
portions of the code base are vendorized from other Python packages, where
30+
deviating licenses may apply. Please check for detailed license information
31+
within the header sections of relevant files.
32+
2733
## Contributing
2834
The `data-x` package is an open source project, and is
2935
[managed on GitHub](https://github.com/daq-tools/data-x).

src/data_x/transform/__init__.py

Whitespace-only changes.

src/data_x/transform/dynamodb.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
# Copyright (c) 2023-2024, The Kotori Developers and contributors.
2+
# Distributed under the terms of the LGPLv3 license, see LICENSE.
3+
4+
# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction
5+
6+
import logging
7+
import typing as t
8+
9+
import simplejson as json
10+
import toolz
11+
12+
from data_x.vendor.boto3.dynamodb.types import TypeDeserializer
13+
14+
logger = logging.getLogger(__name__)
15+
16+
17+
class DynamoCDCTranslatorBase:
    """
    Translate DynamoDB CDC events into different representations.

    This base class only provides the type conversion step; it holds a
    `TypeDeserializer` instance and applies it to whole items.
    """

    def __init__(self):
        # Converts DynamoDB's type-tagged values into plain Python values.
        self.deserializer = TypeDeserializer()

    def deserialize_item(self, item: t.Mapping[str, t.Mapping[str, t.Any]]) -> t.Dict[str, t.Any]:
        """
        Deserialize DynamoDB type-enriched nested JSON snippet into vanilla Python.

        Each attribute value of `item` is passed through the deserializer, the
        attribute names are kept as they are. An empty item yields an empty dict.

        Example:
        {
          "humidity": {"N": "84.84"},
          "temperature": {"N": "42.42"},
          "device": {"S": "qux"},
          "timestamp": {"S": "2024-07-12T01:17:42"},
        }

        A complete list of DynamoDB data type descriptors:

        S – String
        N – Number
        B – Binary
        BOOL – Boolean
        NULL – Null
        M – Map
        L – List
        SS – String Set
        NS – Number Set
        BS – Binary Set

        -- https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypeDescriptors

        :param item: Mapping of attribute name to a DynamoDB typed value.
        :returns: New dict mapping each attribute name to its deserialized value.
                  The value type depends on the type descriptor, it is not
                  restricted to strings.
        """
        deserialize = self.deserializer.deserialize
        return {name: deserialize(typed_value) for name, typed_value in item.items()}
53+
54+
55+
class DynamoCDCTranslatorCrateDB(DynamoCDCTranslatorBase):
    """
    Translate DynamoDB CDC events into CrateDB SQL statements that materialize them again.

    The SQL DDL schema for CrateDB:
    CREATE TABLE <tablename> (data OBJECT(DYNAMIC));

    Blueprint:
    https://www.singlestore.com/blog/cdc-data-from-dynamodb-to-singlestore-using-dynamodb-streams/
    """

    # Define name of the column where CDC's record data will get materialized into.
    DATA_COLUMN = "data"

    def __init__(self, table_name: str):
        """
        :param table_name: Name of the target table. It is stored in quoted form,
                           see `quote_table_name`.
        """
        super().__init__()
        self.table_name = self.quote_table_name(table_name)

    @property
    def sql_ddl(self):
        """
        Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events.
        """
        return f"CREATE TABLE {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"

    def to_sql(self, record: t.Dict[str, t.Any]) -> str:
        """
        Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record.

        :param record: CDC event record. `eventSource` must be `aws:dynamodb`, and
                       `eventName` selects the kind of statement. The payload is read
                       from `record["dynamodb"]` (`NewImage` and/or `Keys`).
        :returns: A single SQL statement, terminated by a semicolon.
        :raises ValueError: On an unknown `eventSource` or `eventName`.
        :raises KeyError: When the record lacks the payload section the event needs.
        """
        event_source = record.get("eventSource")
        event_name = record.get("eventName")

        if event_source != "aws:dynamodb":
            raise ValueError(f"Unknown eventSource: {event_source}")

        if event_name == "INSERT":
            # The JSON document is embedded as a string literal, so any single
            # quote inside of it must be escaped to keep the statement intact.
            values_clause = self._escape_literal(self.image_to_values(record["dynamodb"]["NewImage"]))
            sql = f"INSERT INTO {self.table_name} ({self.DATA_COLUMN}) VALUES ('{values_clause}');"

        elif event_name == "MODIFY":
            values_clause = self._escape_literal(self.image_to_values(record["dynamodb"]["NewImage"]))
            where_clause = self.keys_to_where(record["dynamodb"]["Keys"])
            sql = f"UPDATE {self.table_name} SET {self.DATA_COLUMN} = '{values_clause}' WHERE {where_clause};"

        elif event_name == "REMOVE":
            where_clause = self.keys_to_where(record["dynamodb"]["Keys"])
            sql = f"DELETE FROM {self.table_name} WHERE {where_clause};"

        else:
            raise ValueError(f"Unknown CDC event name: {event_name}")

        return sql

    def image_to_values(self, image: t.Dict[str, t.Any]) -> str:
        """
        Serialize CDC event's "(New|Old)Image" representation to a `VALUES` clause in CrateDB SQL syntax.

        The result is the plain JSON document. Escaping for use within an SQL
        string literal is applied by `to_sql`, not here.

        IN (top-level stripped):
        "NewImage": {
          "humidity": {"N": "84.84"},
          "temperature": {"N": "42.42"},
          "device": {"S": "foo"},
          "timestamp": {"S": "2024-07-12T01:17:42"},
        }

        OUT:
        {"humidity": 84.84, "temperature": 42.42, "device": "foo", "timestamp": "2024-07-12T01:17:42"}
        """
        return json.dumps(self.deserialize_item(image))

    def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> str:
        """
        Serialize CDC event's "Keys" representation to an SQL `WHERE` clause in CrateDB SQL syntax.

        Single quotes within key names and key values are escaped by doubling them.

        IN (top-level stripped):
        "Keys": {
          "device": {"S": "foo"},
          "timestamp": {"S": "2024-07-12T01:17:42"},
        }

        OUT:
        WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42'

        :returns: The constraints joined by `AND`, without the `WHERE` keyword itself.
        """
        constraints: t.List[str] = []
        for key_name, key_value_raw in keys.items():
            key_value = self.deserializer.deserialize(key_value_raw)
            name_sql = self._escape_literal(str(key_name))
            value_sql = self._escape_literal(str(key_value))
            # FIXME: Does the quoting of the value on the right hand side need to take the data type into account?
            constraints.append(f"{self.DATA_COLUMN}['{name_sql}'] = '{value_sql}'")
        return " AND ".join(constraints)

    @staticmethod
    def _escape_literal(value: str) -> str:
        """
        Escape text for embedding it between single quotes in an SQL string literal.

        Embedded single quotes are doubled, which is the SQL standard way of escaping them.
        """
        return value.replace("'", "''")

    @staticmethod
    def quote_table_name(name: str) -> str:
        """
        Poor man's table quoting.

        Wrap the name in double quotes, unless it already contains a double quote
        anywhere, in which case it is returned unchanged.

        TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
        """
        if '"' not in name:
            name = f'"{name}"'
        return name

src/data_x/util/io.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Copyright (c) 2016-2024, The Kotori Developers and contributors.
2+
# Distributed under the terms of the LGPLv3 license, see LICENSE.
3+
import json
4+
import typing as t
5+
from pathlib import Path
6+
7+
8+
def read_jsonfile(name: t.Union[str, Path]) -> t.Dict[str, t.Any]:
    """
    Read the file at `name` and return its decoded JSON content.

    The file is decoded as UTF-8 explicitly, instead of relying on the
    platform's default encoding, which is not UTF-8 everywhere.

    :param name: Path to the JSON file, as string or `Path`.
    :returns: The decoded document. The annotation assumes the top-level
              JSON value is an object; this is not enforced.
    :raises OSError: When the file cannot be read.
    :raises json.JSONDecodeError: When the content is not valid JSON.
    """
    return json.loads(Path(name).read_text(encoding="utf-8"))

src/data_x/vendor/__init__.py

Whitespace-only changes.

src/data_x/vendor/boto3/__init__.py

Whitespace-only changes.

src/data_x/vendor/boto3/dynamodb/__init__.py

Whitespace-only changes.
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"). You
4+
# may not use this file except in compliance with the License. A copy of
5+
# the License is located at
6+
#
7+
# https://aws.amazon.com/apache2.0/
8+
#
9+
# or in the "license" file accompanying this file. This file is
10+
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11+
# ANY KIND, either express or implied. See the License for the specific
12+
# language governing permissions and limitations under the License.
13+
# Patched: from boto3.compat import collections_abc
14+
from decimal import (
15+
Clamped,
16+
Context,
17+
Inexact,
18+
Overflow,
19+
Rounded,
20+
Underflow,
21+
)
22+
23+
# DynamoDB data type descriptors.
# Scalar types.
STRING = "S"
NUMBER = "N"
BINARY = "B"
# Set types.
STRING_SET = "SS"
NUMBER_SET = "NS"
BINARY_SET = "BS"
# Null and boolean.
NULL = "NULL"
BOOLEAN = "BOOL"
# Document types.
MAP = "M"
LIST = "L"


# Decimal context for numbers: 38 digits of precision within the given
# exponent range. All listed signals are trapped, so a value which does
# not fit raises an exception instead of being rounded or clamped silently.
DYNAMODB_CONTEXT = Context(
    prec=38,
    Emin=-128,
    Emax=126,
    traps=[Clamped, Overflow, Inexact, Rounded, Underflow],
)


# Python types accepted as payload for binary values.
BINARY_TYPES = (bytearray, bytes)
44+
45+
46+
class Binary:
    """A class for representing Binary in dynamodb

    It is essentially a wrapper around binary data, used to explicitly
    mark a value as binary. Only the types listed in ``BINARY_TYPES`` are
    accepted as payload; text strings are not allowed.
    """

    def __init__(self, value):
        """
        :param value: The binary payload.
        :raises TypeError: When ``value`` is not one of ``BINARY_TYPES``.
        """
        if not isinstance(value, BINARY_TYPES):
            types = ", ".join([str(type_) for type_ in BINARY_TYPES])
            raise TypeError(f"Value must be of the following types: {types}")
        self.value = value

    def __eq__(self, other):
        # Compare by payload, both against other wrappers and raw binary values.
        if isinstance(other, Binary):
            return self.value == other.value
        return self.value == other

    def __ne__(self, other):
        return not self.__eq__(other)

    def __repr__(self):
        return f"Binary({self.value!r})"

    def __str__(self):
        # `__str__` must return text on Python 3; returning the raw payload
        # made `str()` and string formatting raise `TypeError`. Bytes which
        # are not valid UTF-8 are rendered as `\xNN` escapes instead of failing.
        return bytes(self.value).decode("utf-8", errors="backslashreplace")

    def __bytes__(self):
        # `__bytes__` must return `bytes` exactly, so convert `bytearray` payloads.
        return bytes(self.value)

    def __hash__(self):
        # Hash an immutable copy: `bytearray` payloads are not hashable themselves.
        # For `bytes` payloads, the hash is the same as hashing the payload directly.
        return hash(bytes(self.value))
79+
80+
81+
class TypeDeserializer:
    """Convert DynamoDB type-tagged values into native Python values."""

    def deserialize(self, value):
        """Deserialize a single DynamoDB typed value.

        The first key of ``value`` is taken as the type descriptor, and its
        value as the payload to convert. The handler method is looked up by
        the lower-cased descriptor.

        :param value: A DynamoDB value to be deserialized to a pythonic value.
            Here are the various conversions:

            DynamoDB                                Python
            --------                                ------
            {'NULL': True}                          None
            {'BOOL': True/False}                    True/False
            {'N': str(value)}                       Decimal(str(value))
            {'S': string}                           string
            {'B': bytes}                            Binary(bytes)
            {'NS': [str(value)]}                    set([Decimal(str(value))])
            {'SS': [string]}                        set([string])
            {'BS': [bytes]}                         set([Binary(bytes)])
            {'L': list}                             list
            {'M': dict}                             dict

        :returns: The pythonic value of the DynamoDB type.
        :raises TypeError: When ``value`` is empty, or its type descriptor
            has no matching handler.
        """
        if not value:
            raise TypeError("Value must be a nonempty dictionary whose key is a valid dynamodb type.")

        dynamodb_type = next(iter(value.keys()))
        try:
            handler = getattr(self, f"_deserialize_{dynamodb_type}".lower())
        except AttributeError as ex:
            raise TypeError(f"Dynamodb type {dynamodb_type} is not supported") from ex

        payload = value[dynamodb_type]
        return handler(payload)

    # -- Scalars ---------------------------------------------------------

    def _deserialize_null(self, value):
        # The payload is ignored, the result is always `None`.
        return None

    def _deserialize_bool(self, value):
        # Passed through unchanged.
        return value

    def _deserialize_n(self, value):
        # Numbers become `Decimal` instances created by the module's context.
        number = DYNAMODB_CONTEXT.create_decimal(value)
        return number

    def _deserialize_s(self, value):
        # Passed through unchanged.
        return value

    def _deserialize_b(self, value):
        wrapped = Binary(value)
        return wrapped

    # -- Sets ------------------------------------------------------------

    def _deserialize_ns(self, value):
        return {self._deserialize_n(element) for element in value}

    def _deserialize_ss(self, value):
        return {self._deserialize_s(element) for element in value}

    def _deserialize_bs(self, value):
        return {self._deserialize_b(element) for element in value}

    # -- Documents -------------------------------------------------------

    def _deserialize_l(self, value):
        # Elements are typed values themselves, so recurse through `deserialize`.
        return list(map(self.deserialize, value))

    def _deserialize_m(self, value):
        # Keys are kept, values are typed values which get deserialized recursively.
        result = {}
        for key, typed_value in value.items():
            result[key] = self.deserialize(typed_value)
        return result

0 commit comments

Comments
 (0)