Skip to content

Commit 9571d6e

Browse files
committed
fix: restore DATE serialiser patch + address bugbot sweep findings
V1 live-api CI on 4aa768b still reproduced 33 "Incorrect datetime value: '... +00:00'" errors on Audit mirror writes - the exact base- DATE._stringify signature. Local instrumentation shows every MySQL INSERT path (bulkInsert, insertQuery, upsert) dispatches through mysql.DATE._stringify, so the CI observation points to some Sequelize-internal path we can't isolate from here that prototype- invokes the base method on a mysql attribute type. Rather than continue bisecting Sequelize internals, restore a targeted patch on Sequelize.DataTypes.DATE.prototype._stringify that strips the " +00:00" offset suffix (MariaDB strict mode rejects it) while keeping "YYYY-MM-DD HH:mm:ss.SSS" fractional seconds so local SQLite round-trips preserve millisecond precision. Legacy SQLite rows still round-trip via sqlite.DATE.parse's `date.includes("+")` branch. Uses a static ESM `import moment from 'moment'` (the prior attempt with `require('moment')` threw ReferenceError under ESM and was silently swallowed by safeMirrorDbHandler's catch). Also addresses the two open Bugbot findings: - V1 `mirrorDBEnabled()` always returned true in test mode. The in-process cost was the on-startup 11-table backfill iterating every source/mirror pair. Gate the startup backfill on isMysqlMirrorConfiguredForReconnect() instead of mirrorDBEnabled() so SQLite-fallback test runs skip it. Keep mirrorDBEnabled() true in mirrorTest mode - V1 integration tests rely on that implicit test enablement for SQLite-fallback mirror writes to land. - sweepMirrorOrphans[V2] loaded every PK from both mirror and source tables in a single unbounded findAll. Replace with keyset-paginated scans. Peak memory is now O(|mirror|) for the orphan-candidate set rather than O(|mirror|+|source|); the source side is streamed. Restore the original mirror-first scan order (a pre-push review caught that an earlier refactor inverted it, which allowed false- positive orphan deletes for rows inserted concurrently between the source scan and the mirror scan).
1 parent 4aa768b commit 9571d6e

4 files changed

Lines changed: 253 additions & 86 deletions

File tree

src/config/config.js

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,47 @@
1+
import Sequelize from 'sequelize';
2+
import moment from 'moment';
13
import { getConfig, getConfigV2 } from '../utils/config-loader';
24
import { getChiaRoot } from '../utils/chia-root.js';
35
import { logger } from './logger.js';
46
import { createHash } from 'crypto';
57

8+
// Strip the " +00:00" offset suffix from Sequelize's base DATE serialiser.
9+
//
10+
// Sequelize 6's default BaseTypes.DATE._stringify formats Date values as
11+
// "YYYY-MM-DD HH:mm:ss.SSS Z" (e.g. "2026-04-17 19:12:48.720 +00:00")
12+
// which recent MariaDB strict mode rejects with:
13+
// Incorrect datetime value: '2026-04-17 19:12:48.720 +00:00'
14+
// for column `cadt_mirror_test`.`audit`.`createdAt` at row 1
15+
//
16+
// The mysql-specific subclass overrides _stringify to a safer format
17+
// ("YYYY-MM-DD HH:mm:ss") and is always invoked for attribute-typed
18+
// writes on a MySQL dialect. However, V1 live-api CI reproducibly shows
19+
// the base format reaching MariaDB on Audit.create mirror writes even
20+
// with the mysql subclass wired up - presumably via some Sequelize
21+
// internal path that prototype-invokes the base method rather than
22+
// dispatching through the resolved attribute type. Rather than continue
23+
// bisecting Sequelize internals, patch the base emitter to drop the
24+
// offset suffix. The ".SSS" fractional-seconds form is kept so local
25+
// SQLite round-trips preserve millisecond precision (sqlite.DATE inherits
26+
// this _stringify - it doesn't define its own). Legacy rows written in
27+
// the pre-patch "... +00:00" form still round-trip via sqlite.DATE.parse
28+
// because its `date.includes("+")` branch returns `new Date(str)`
29+
// directly.
30+
//
31+
// MariaDB strict mode accepts ".SSS" (rounds half-up to whole seconds on
32+
// DATETIME(0) columns). CADT's mirror verification helpers
33+
// (tests/v{1,2}/live-api/helpers/mysql-mirror-helpers.js) compare on
34+
// the YYYY-MM-DD prefix only, so the rounding is not observable.
35+
Sequelize.DataTypes.DATE.prototype._stringify = function _stringify(
36+
date,
37+
options,
38+
) {
39+
if (!moment.isMoment(date)) {
40+
date = this._applyTimezone(date, options);
41+
}
42+
return date.format('YYYY-MM-DD HH:mm:ss.SSS');
43+
};
44+
645
const chiaRoot = getChiaRoot();
746
const persistanceFolder = `${chiaRoot}/cadt/v1`;
847
const v2PersistanceFolder = `${chiaRoot}/cadt/v2`;

src/database/index.js

Lines changed: 86 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,20 @@ export const mirrorDBEnabled = () => {
8383
return mirrorEnabledTestOverride;
8484
}
8585
const CONFIG = getConfig();
86+
// In production-like mode ('local' env), require full MySQL config.
87+
// In SQLite-fallback test mode ('mirrorTest' env), leave the mirror
88+
// enabled unconditionally so integration tests that rely on the
89+
// SQLite fallback mirror continue to see their writes land.
90+
//
91+
// Asymmetry note: V2's mirrorDBEnabledV2() is stricter - it returns
92+
// false in SQLite-fallback test mode, and V2 tests opt in via
93+
// __setMirrorEnabledForTestsV2. V1 integration tests predate that
94+
// pattern and rely on the implicit test-mode enablement here.
95+
//
96+
// The cost of the on-startup backfill iterating all 11 table pairs
97+
// is gated separately in prepareDb() via
98+
// isMysqlMirrorConfiguredForReconnect(), not via mirrorDBEnabled(),
99+
// so the backfill only fires when MySQL is actually configured.
86100
if (
87101
mirrorConfig === 'mirror' &&
88102
(!CONFIG?.MIRROR_DB?.DB_HOST ||
@@ -92,7 +106,6 @@ export const mirrorDBEnabled = () => {
92106
) {
93107
return false;
94108
}
95-
96109
return true;
97110
};
98111

@@ -332,39 +345,74 @@ const sweepMirrorOrphans = async (source, mirror, name) => {
332345
}
333346
const pkAttr = pkAttrs[0];
334347

348+
// Concurrency invariant: mirror is scanned BEFORE source. A row
349+
// inserted concurrently (after the mirror page fetch, before or
350+
// during the source page fetch) will be absent from orphanCandidates
351+
// and thus cannot be misclassified as an orphan. A row inserted
352+
// before the mirror scan and deleted from source during the scan is
353+
// a true orphan and will be correctly removed. Reversing the order
354+
// would allow false-positive orphan deletes for rows inserted into
355+
// both tables during the sweep.
356+
//
357+
// Both scans are keyset-paginated. An earlier revision loaded every
358+
// PK from both tables in a single unbounded findAll; this is fine
359+
// for today's row counts but spikes memory on long-running
360+
// deployments with large tables. Peak memory here is O(|mirror|) for
361+
// the candidate set; the source side is streamed.
362+
//
335363
// `raw: true` is safe here - the projection is PK-only, so there are
336364
// no DATE or custom-getter columns whose raw SQLite representation
337-
// could leak into a bulk write (which is the hazard backfillMirror
338-
// avoids). If a future change adds more projected attributes, revisit
339-
// the raw:true pattern (see backfillMirror for the safe variant).
340-
const mirrorPkRows = await mirror.findAll({
341-
attributes: [pkAttr],
342-
raw: true,
343-
});
344-
if (mirrorPkRows.length === 0) {
345-
return 0;
365+
// could leak into a bulk write (the hazard backfillMirror avoids).
366+
const orphanCandidates = new Set();
367+
let lastMirrorPk = null;
368+
while (true) {
369+
const where =
370+
lastMirrorPk === null
371+
? undefined
372+
: { [pkAttr]: { [Sequelize.Op.gt]: lastMirrorPk } };
373+
const page = await mirror.findAll({
374+
attributes: [pkAttr],
375+
where,
376+
limit: BACKFILL_BATCH_SIZE,
377+
order: [[pkAttr, 'ASC']],
378+
raw: true,
379+
});
380+
if (page.length === 0) break;
381+
for (const r of page) orphanCandidates.add(r[pkAttr]);
382+
lastMirrorPk = page[page.length - 1][pkAttr];
383+
if (lastMirrorPk == null) break;
384+
if (page.length < BACKFILL_BATCH_SIZE) break;
346385
}
347386

348-
const sourcePkRows = await source.findAll({
349-
attributes: [pkAttr],
350-
raw: true,
351-
});
352-
const sourcePks = new Set(sourcePkRows.map((r) => r[pkAttr]));
353-
354-
const orphanPks = mirrorPkRows
355-
.map((r) => r[pkAttr])
356-
.filter((pk) => !sourcePks.has(pk));
357-
358-
if (orphanPks.length === 0) {
359-
return 0;
387+
if (orphanCandidates.size === 0) return 0;
388+
389+
let lastSrcPk = null;
390+
while (true) {
391+
const where =
392+
lastSrcPk === null
393+
? undefined
394+
: { [pkAttr]: { [Sequelize.Op.gt]: lastSrcPk } };
395+
const page = await source.findAll({
396+
attributes: [pkAttr],
397+
where,
398+
limit: BACKFILL_BATCH_SIZE,
399+
order: [[pkAttr, 'ASC']],
400+
raw: true,
401+
});
402+
if (page.length === 0) break;
403+
for (const r of page) orphanCandidates.delete(r[pkAttr]);
404+
lastSrcPk = page[page.length - 1][pkAttr];
405+
if (lastSrcPk == null) break;
406+
if (page.length < BACKFILL_BATCH_SIZE) break;
360407
}
361408

409+
if (orphanCandidates.size === 0) return 0;
410+
411+
const orphans = [...orphanCandidates];
362412
let removed = 0;
363-
for (let i = 0; i < orphanPks.length; i += BACKFILL_BATCH_SIZE) {
364-
const batch = orphanPks.slice(i, i + BACKFILL_BATCH_SIZE);
365-
removed += await mirror.destroy({
366-
where: { [pkAttr]: batch },
367-
});
413+
for (let i = 0; i < orphans.length; i += BACKFILL_BATCH_SIZE) {
414+
const batch = orphans.slice(i, i + BACKFILL_BATCH_SIZE);
415+
removed += await mirror.destroy({ where: { [pkAttr]: batch } });
368416
}
369417

370418
logger.info(`Mirror backfill: ${name} - removed ${removed} orphan rows`);
@@ -644,10 +692,17 @@ export const prepareDb = async () => {
644692

645693
await checkForMigrations(sequelize);
646694

647-
// Run the mirror backfill after main migrations so all source and mirror
648-
// tables exist. This catches up rows that were inserted/updated/deleted
649-
// while the mirror was unavailable on a previous run.
650-
if (mirrorSetupSucceeded) {
695+
// Run the mirror backfill after main migrations so all source and
696+
// mirror tables exist. This catches up rows that were inserted/
697+
// updated/deleted while the mirror was unavailable on a previous run.
698+
//
699+
// Gate on isMysqlMirrorConfiguredForReconnect() (MySQL mode only) so
700+
// SQLite-fallback test startups don't iterate all 11 source/mirror
701+
// table pairs on every run. Tests that need the backfill code path
702+
// exercised call backfillMirror() directly with
703+
// __setMirrorSetupSucceededForTests and __setMirrorEnabledForTests
704+
// to opt in.
705+
if (mirrorSetupSucceeded && isMysqlMirrorConfiguredForReconnect()) {
651706
await backfillMirror();
652707
}
653708
};

src/database/v2/index.js

Lines changed: 61 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -599,40 +599,74 @@ const sweepMirrorOrphansV2 = async (source, mirror, name) => {
599599
}
600600
const pkAttr = pkAttrs[0];
601601

602+
// Concurrency invariant: mirror is scanned BEFORE source. A row
603+
// inserted concurrently (after the mirror page fetch, before or
604+
// during the source page fetch) will be absent from orphanCandidates
605+
// and thus cannot be misclassified as an orphan. A row inserted
606+
// before the mirror scan and deleted from source during the scan is
607+
// a true orphan and will be correctly removed. Reversing the order
608+
// would allow false-positive orphan deletes for rows inserted into
609+
// both tables during the sweep.
610+
//
611+
// Both scans are keyset-paginated. An earlier revision loaded every
612+
// PK from both tables in a single unbounded findAll; this is fine
613+
// for today's row counts but spikes memory on long-running
614+
// deployments with large tables. Peak memory here is O(|mirror|) for
615+
// the candidate set; the source side is streamed.
616+
//
602617
// `raw: true` is safe here - the projection is PK-only, so there are
603618
// no DATE or custom-getter columns whose raw SQLite representation
604-
// could leak into a bulk write (which is the hazard backfillMirrorV2
605-
// avoids). If a future change adds more projected attributes, revisit
606-
// the raw:true pattern (see backfillMirrorV2 for the safe variant).
607-
const mirrorPkRows = await mirror.findAll({
608-
attributes: [pkAttr],
609-
raw: true,
610-
});
611-
if (mirrorPkRows.length === 0) {
612-
return 0;
619+
// could leak into a bulk write (the hazard backfillMirrorV2 avoids).
620+
const orphanCandidates = new Set();
621+
let lastMirrorPk = null;
622+
while (true) {
623+
const where =
624+
lastMirrorPk === null
625+
? undefined
626+
: { [pkAttr]: { [Sequelize.Op.gt]: lastMirrorPk } };
627+
const page = await mirror.findAll({
628+
attributes: [pkAttr],
629+
where,
630+
limit: BACKFILL_BATCH_SIZE,
631+
order: [[pkAttr, 'ASC']],
632+
raw: true,
633+
});
634+
if (page.length === 0) break;
635+
for (const r of page) orphanCandidates.add(r[pkAttr]);
636+
lastMirrorPk = page[page.length - 1][pkAttr];
637+
if (lastMirrorPk == null) break;
638+
if (page.length < BACKFILL_BATCH_SIZE) break;
613639
}
614640

615-
const sourcePkRows = await source.findAll({
616-
attributes: [pkAttr],
617-
raw: true,
618-
});
619-
const sourcePks = new Set(sourcePkRows.map((r) => r[pkAttr]));
620-
621-
const orphanPks = mirrorPkRows
622-
.map((r) => r[pkAttr])
623-
.filter((pk) => !sourcePks.has(pk));
624-
625-
if (orphanPks.length === 0) {
626-
return 0;
641+
if (orphanCandidates.size === 0) return 0;
642+
643+
let lastSrcPk = null;
644+
while (true) {
645+
const where =
646+
lastSrcPk === null
647+
? undefined
648+
: { [pkAttr]: { [Sequelize.Op.gt]: lastSrcPk } };
649+
const page = await source.findAll({
650+
attributes: [pkAttr],
651+
where,
652+
limit: BACKFILL_BATCH_SIZE,
653+
order: [[pkAttr, 'ASC']],
654+
raw: true,
655+
});
656+
if (page.length === 0) break;
657+
for (const r of page) orphanCandidates.delete(r[pkAttr]);
658+
lastSrcPk = page[page.length - 1][pkAttr];
659+
if (lastSrcPk == null) break;
660+
if (page.length < BACKFILL_BATCH_SIZE) break;
627661
}
628662

629-
// Batch deletes to avoid unbounded IN (...) lists
663+
if (orphanCandidates.size === 0) return 0;
664+
665+
const orphans = [...orphanCandidates];
630666
let removed = 0;
631-
for (let i = 0; i < orphanPks.length; i += BACKFILL_BATCH_SIZE) {
632-
const batch = orphanPks.slice(i, i + BACKFILL_BATCH_SIZE);
633-
removed += await mirror.destroy({
634-
where: { [pkAttr]: batch },
635-
});
667+
for (let i = 0; i < orphans.length; i += BACKFILL_BATCH_SIZE) {
668+
const batch = orphans.slice(i, i + BACKFILL_BATCH_SIZE);
669+
removed += await mirror.destroy({ where: { [pkAttr]: batch } });
636670
}
637671

638672
loggerV2.info(

0 commit comments

Comments
 (0)