Skip to content

Commit b272f9c

Browse files
fix: ensure datetime-related values fully compatible with MySQL and BigQuery (#15026)
1 parent 001dc22 commit b272f9c

File tree

2 files changed

+16
-10
lines changed

2 files changed

+16
-10
lines changed

airflow/providers/google/cloud/transfers/mysql_to_gcs.py

Lines changed: 9 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -18,8 +18,7 @@
1818
"""MySQL to GCS operator."""
1919

2020
import base64
21-
import calendar
22-
from datetime import date, datetime, timedelta
21+
from datetime import date, datetime, time, timedelta
2322
from decimal import Decimal
2423
from typing import Dict
2524

@@ -100,10 +99,12 @@ def convert_type(self, value, schema_type: str):
10099
Takes a value from MySQLdb, and converts it to a value that's safe for
101100
JSON/Google Cloud Storage/BigQuery.
102101

103-
* Datetimes are converted to UTC seconds.
102+
* Datetimes are converted to `str(value)` (`datetime.isoformat(' ')`)
103+
strings.
104+
* Times are converted to `str((datetime.min + value).time())` strings.
104105
* Decimals are converted to floats.
105-
* Dates are converted to ISO formatted string if given schema_type is
106-
DATE, or UTC seconds otherwise.
106+
* Dates are converted to ISO formatted strings if given schema_type is
107+
DATE, or `datetime.isoformat(' ')` strings otherwise.
107108
* Binary type fields are converted to integer if given schema_type is
108109
INTEGER, or encoded with base64 otherwise. Imported BYTES data must
109110
be base64-encoded according to BigQuery documentation:
@@ -117,16 +118,16 @@ def convert_type(self, value, schema_type: str):
117118
if value is None:
118119
return value
119120
if isinstance(value, datetime):
120-
value = calendar.timegm(value.timetuple())
121+
value = str(value)
121122
elif isinstance(value, timedelta):
122-
value = value.total_seconds()
123+
value = str((datetime.min + value).time())
123124
elif isinstance(value, Decimal):
124125
value = float(value)
125126
elif isinstance(value, date):
126127
if schema_type == "DATE":
127128
value = value.isoformat()
128129
else:
129-
value = calendar.timegm(value.timetuple())
130+
value = str(datetime.combine(value, time.min))
130131
elif isinstance(value, bytes):
131132
if schema_type == "INTEGER":
132133
value = int.from_bytes(value, "big")

tests/providers/google/cloud/transfers/test_mysql_to_gcs.py

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -88,9 +88,14 @@ def test_init(self):
8888
@parameterized.expand(
8989
[
9090
("string", None, "string"),
91-
(datetime.date(1970, 1, 2), None, 86400),
91+
(datetime.date(1970, 1, 2), None, "1970-01-02 00:00:00"),
92+
(datetime.date(1000, 1, 2), None, "1000-01-02 00:00:00"),
9293
(datetime.date(1970, 1, 2), "DATE", "1970-01-02"),
93-
(datetime.datetime(1970, 1, 1, 1, 0), None, 3600),
94+
(datetime.date(1000, 1, 2), "DATE", "1000-01-02"),
95+
(datetime.datetime(1970, 1, 1, 1, 0), None, "1970-01-01 01:00:00"),
96+
(datetime.datetime(1000, 1, 1, 1, 0), None, "1000-01-01 01:00:00"),
97+
(datetime.timedelta(), None, "00:00:00"),
98+
(datetime.timedelta(hours=23, minutes=59, seconds=59), None, "23:59:59"),
9499
(decimal.Decimal(5), None, 5),
95100
(b"bytes", "BYTES", "Ynl0ZXM="),
96101
(b"\x00\x01", "INTEGER", 1),

0 commit comments

Comments
 (0)