Skip to content

Commit 4aae47d

Browse files
committed
fix: stop double-running mirror backfill on cold start
Two cooperating fixes resolve the production symptom where every CADT restart on a MySQL-mirrored deployment emitted each "synced N records" line twice with matching totals, doubling cold-start mirror write traffic against the MySQL sidecar: 1. V1 model associate() methods previously wrapped their mirror association wiring in safeMirrorDbHandler(), which authenticated against sequelizeMirror at module-load time. Because src/middleware.js statically imports the V1 models barrel, that authenticate fired during the initial import phase - before prepareDb() ran - and tripped the setupNeverRan branch in safeMirrorDbHandler, kicking off startReconnectBackfill() concurrently with prepareDb's own backfillMirror() call. Replace the wrapper with a synchronous mirrorDBEnabled() check across all 9 affected V1 models; Sequelize associations are pure schema metadata and never needed an authenticated connection. 2. Add an in-flight memoization guard on backfillMirror() and backfillMirrorV2() following the same pattern used by mirrorSetupPromise / prepareV2DbPromise. Concurrent callers now share one execution; cleared on settle so subsequent reconnects still re-run. Belt-and-suspenders defence against any other future trigger of a parallel backfill. Also add a cheap per-table in-sync gate (post-orphan-sweep) that skips the bulk upsert when COUNT(*) matches on both sides AND mirror's MAX(updatedAt) is at least as new as source's. Gate runs AFTER the orphan sweep so PK-swap drift (delete B, insert C with old timestamp) is still caught via the post-sweep count mismatch. Falls through to the existing full sync on any mismatch or any error - the gate never substitutes a partial sync for a full one. Documented limitation: non-max-row in-place UPDATE drift via raw SQL is not detected (Sequelize-driven UPDATEs auto-bump updatedAt, so this can't occur through ORM code paths in CADT today). New tests cover concurrent-call coalescing, the in-sync gate skip path, the count-mismatch and max-mismatch fall-through paths, the PK-swap regression for the gate, and the no-authenticate-at-module-load property of the new V1 associate() wiring.
1 parent 5cd5960 commit 4aae47d

13 files changed

Lines changed: 1094 additions & 35 deletions

File tree

src/database/index.js

Lines changed: 228 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,21 @@ let mirrorSetupPromise = null;
111111
// before new operations are applied.
112112
let reconnectBackfillPromise = null;
113113

114+
// Single in-flight backfillMirror call. Distinct from
115+
// reconnectBackfillPromise above: that one guards the reconnect-detection
116+
// path inside safeMirrorDbHandler; this one guards backfillMirror itself
117+
// against concurrent callers. Both startup (prepareDb) and reconnect
118+
// (startReconnectBackfill) can fire backfillMirror, and in practice they
119+
// race at cold start because src/middleware.js eagerly imports the V1
120+
// models barrel - which runs the per-model associate() methods - which
121+
// historically called safeMirrorDbHandler at module-load time, triggering
122+
// a setupNeverRan reconnect in parallel with prepareDb's own backfill.
123+
// The associate() trigger was removed in this change but the in-flight
124+
// guard remains as a belt-and-suspenders defence against any other
125+
// concurrent invocation. Cleared on resolve/reject so reconnects after
126+
// the first run still re-execute.
127+
let backfillInFlightPromise = null;
128+
114129
export const mirrorDBEnabled = () => {
115130
if (mirrorEnabledTestOverride !== null) {
116131
return mirrorEnabledTestOverride;
@@ -413,6 +428,139 @@ export const initMirrorModel = (initFn) => {
413428
*/
414429
const BACKFILL_BATCH_SIZE = 1000;
415430

431+
// Sequelize attribute names for the auto-managed updatedAt column. V1
432+
// models use the default 'updatedAt'; V2 models declare
433+
// `updatedAt: 'updated_at'` with `underscored: true`, which makes
434+
// 'updated_at' the rawAttribute key. Probe both candidates so the same
435+
// gate works against either convention without depending on Sequelize
436+
// internals (_timestampAttributes is undocumented).
437+
const UPDATED_AT_ATTR_CANDIDATES = ['updatedAt', 'updated_at'];
438+
439+
/**
440+
* Find a Sequelize attribute name for the updatedAt column that is
441+
* present on BOTH source and mirror models. Returns the attribute name,
442+
* or null when no consistent name exists (either model is missing the
443+
* column, or they disagree on naming).
444+
*
445+
* Disagreement is intentionally treated as "no gate" rather than picking
446+
* one side: a mismatched name would force one side's MAX() call to
447+
* reference a non-existent column and throw, which would mask a real
448+
* drift behind a swallowed error.
449+
*/
450+
const matchingUpdatedAtAttr = (source, mirror) => {
451+
const sourceAttrs = source.rawAttributes || {};
452+
const mirrorAttrs = mirror.rawAttributes || {};
453+
for (const attr of UPDATED_AT_ATTR_CANDIDATES) {
454+
if (attr in sourceAttrs && attr in mirrorAttrs) {
455+
return attr;
456+
}
457+
}
458+
return null;
459+
};
460+
461+
/**
462+
* Cheap "is the mirror table already caught up?" check used by
463+
* backfillMirror to short-circuit the bulk-upsert pass when there is
464+
* nothing to do. The orphan sweep runs unconditionally BEFORE this
465+
* helper so PK-swap drift (count and MAX preserved, row identities
466+
* differ) is exposed as a post-sweep count mismatch and falls through
467+
* to the upsert. See the call site for the full ordering rationale.
468+
*
469+
* Returns true ONLY when BOTH conditions hold:
470+
* 1. count(source) === count(mirror)
471+
* 2. Either both counts are 0 (truly empty on both sides), OR
472+
* max(mirror[updatedAtAttr]) >= max(source[updatedAtAttr])
473+
*
474+
* Returns false in every other case (including any error), so the
475+
* caller falls through to the existing full upsert. Crucially, false
476+
* is the safe default: a false negative just causes the existing
477+
* (correct) full sync to run, while a false positive would silently
478+
* leave the mirror stale. We deliberately bias toward the
479+
* cheap-but-fully-correct fall-through.
480+
*
481+
* Known limitation - non-max-row UPDATE drift: if a row is updated in
482+
* source via raw SQL with an updatedAt that's strictly below the
483+
* table's existing MAX(updatedAt), the gate can't see the change.
484+
* Sequelize-driven UPDATEs auto-bump updatedAt to NOW() (necessarily
485+
* greater than any prior MAX), so this only happens via raw SQL that
486+
* bypasses the ORM or via clock-skew adjustments. No such code path
487+
* exists in CADT today. If composite drift patterns become a concern,
488+
* replace this with a SUM(UNIX_TIMESTAMP(updatedAt)) checksum or a
489+
* per-table hash digest.
490+
*/
491+
const isMirrorInSync = async (source, mirror, name, updatedAtAttr) => {
492+
try {
493+
const [sourceCount, mirrorCount, sourceMax, mirrorMax] = await Promise.all([
494+
source.count(),
495+
mirror.count(),
496+
source.max(updatedAtAttr),
497+
mirror.max(updatedAtAttr),
498+
]);
499+
500+
if (sourceCount !== mirrorCount) {
501+
logger.debug(
502+
`Mirror backfill: ${name} - gate failed (count mismatch: source=${sourceCount} mirror=${mirrorCount})`,
503+
);
504+
return false;
505+
}
506+
507+
if (sourceCount === 0) {
508+
logger.debug(
509+
`Mirror backfill: ${name} - in sync, skipping (empty on both sides)`,
510+
);
511+
return true;
512+
}
513+
514+
// Counts agree and are non-zero. A null MAX(updatedAt) at this
515+
// point means rows exist with null timestamps - we can't compare
516+
// freshness, so fall through to the full sync rather than skip.
517+
if (sourceMax == null || mirrorMax == null) {
518+
logger.debug(
519+
`Mirror backfill: ${name} - gate failed (max(updatedAt) null with ${sourceCount} rows)`,
520+
);
521+
return false;
522+
}
523+
524+
// Sequelize.max returns either a Date (for DATE columns) or whatever
525+
// raw value the dialect returned. Coerce through Date so SQLite string
526+
// timestamps and MySQL Date instances compare consistently.
527+
const sourceMs = new Date(sourceMax).getTime();
528+
const mirrorMs = new Date(mirrorMax).getTime();
529+
if (Number.isNaN(sourceMs) || Number.isNaN(mirrorMs)) {
530+
logger.debug(
531+
`Mirror backfill: ${name} - gate failed (non-parseable max(updatedAt))`,
532+
);
533+
return false;
534+
}
535+
536+
if (mirrorMs >= sourceMs) {
537+
logger.debug(
538+
`Mirror backfill: ${name} - in sync, skipping (${sourceCount} rows, max(updatedAt) mirror=${mirrorMs} >= source=${sourceMs})`,
539+
);
540+
return true;
541+
}
542+
543+
logger.debug(
544+
`Mirror backfill: ${name} - gate failed (mirror max(updatedAt)=${mirrorMs} < source=${sourceMs})`,
545+
);
546+
return false;
547+
} catch (error) {
548+
// Never block the existing sync path on a gate error - just fall
549+
// through to the full upsert.
550+
logger.debug(
551+
`Mirror backfill: ${name} - gate check failed (${error.message}), falling through to full sync`,
552+
);
553+
return false;
554+
}
555+
};
556+
557+
// NOTE: when this early-returns for composite PKs, the in-sync gate
558+
// that runs after it in backfillMirror operates on raw (un-swept)
559+
// state. mirror-model-init.spec.js currently enforces single-column
560+
// PKs on every mirror, so this path is unreachable today. If a
561+
// composite-PK mirror is introduced later, the gate's PK-swap
562+
// protection no longer applies and the gate would need to be
563+
// disabled for that table or replaced with a stronger check.
416564
const sweepMirrorOrphans = async (source, mirror, name) => {
417565
const pkAttrs = mirror.primaryKeyAttributes;
418566
if (!pkAttrs || pkAttrs.length !== 1) {
@@ -502,6 +650,31 @@ export const backfillMirror = async () => {
502650
return;
503651
}
504652

653+
// Coalesce concurrent calls. Without this guard, prepareDb's startup
654+
// backfill and any other code path that invokes backfillMirror() would
655+
// each pull and upsert the entire dataset. The race is observable in
656+
// production logs: identical "synced N records" / "MySQL mirror backfill
657+
// completed - N records upserted" lines appearing twice with matching
658+
// counts on the same boot. The orphan sweep and bulk upsert are
659+
// idempotent so the duplicate work is benign correctness-wise, but it
660+
// doubles the cold-start time and MySQL write traffic.
661+
//
662+
// Cleared on settle so subsequent reconnects after the first run still
663+
// perform a fresh catch-up.
664+
if (backfillInFlightPromise) {
665+
return backfillInFlightPromise;
666+
}
667+
668+
backfillInFlightPromise = (async () => {
669+
await runBackfillMirror();
670+
})().finally(() => {
671+
backfillInFlightPromise = null;
672+
});
673+
674+
return backfillInFlightPromise;
675+
};
676+
677+
const runBackfillMirror = async () => {
505678
logger.info('Starting MySQL mirror backfill from SQLite...');
506679

507680
try {
@@ -572,6 +745,7 @@ export const backfillMirror = async () => {
572745

573746
let totalSynced = 0;
574747
let totalOrphansRemoved = 0;
748+
let totalGateSkipped = 0;
575749

576750
for (const { source, mirror, name } of mirrorPairs) {
577751
try {
@@ -586,10 +760,62 @@ export const backfillMirror = async () => {
586760
}
587761

588762
// Orphan sweep first (mirror snapshot before source snapshot) so
589-
// concurrent inserts aren't wrongly classified as orphans.
763+
// concurrent inserts aren't wrongly classified as orphans. The
764+
// sweep runs UNCONDITIONALLY - before the in-sync gate below -
765+
// because the gate's COUNT(*) + MAX(updatedAt) check cannot
766+
// distinguish a healthy mirror from one where rows were
767+
// delete+inserted with a different PK during an outage (count
768+
// and MAX preserved, but the row identities differ). Running
769+
// the sweep first makes such drift visible to the gate via the
770+
// post-sweep count mismatch, which then falls through to the
771+
// full upsert. See PR review for the reproducer.
590772
const orphansRemoved = await sweepMirrorOrphans(source, mirror, name);
591773
totalOrphansRemoved += orphansRemoved;
592774

775+
// Fast in-sync gate (post-sweep). Skip the bulk upsert entirely
776+
// when COUNT(*) matches on both sides AND mirror's MAX(updatedAt)
777+
// is at least as new as source's. Both queries are index-backed
778+
// (PK + updatedAt), so the gate is cheap - vastly cheaper than
779+
// the full keyset walk + bulk upsert it bypasses. (The orphan
780+
// sweep above still runs on every restart; we only avoid the
781+
// expensive per-row data read + MySQL upsert pass when truly
782+
// in sync.)
783+
//
784+
// Correctness invariant: the gate ONLY short-circuits the upsert;
785+
// it never substitutes a partial sync for a full one. On any
786+
// mismatch (count differs, mirror is missing rows entirely, or
787+
// mirror's MAX(updatedAt) is older than source's) we fall through
788+
// to the existing full sync, which catches arbitrary gaps
789+
// including outage windows where the mirror missed rows.
790+
//
791+
// Skip the gate (fall through to full sync) when source and mirror
792+
// disagree about which timestamp attribute name to use, or when
793+
// either lacks an updatedAt timestamp entirely. Both conditions
794+
// mean we can't ask a single MAX() question that both sides answer
795+
// consistently.
796+
//
797+
// Known limitation: an in-place UPDATE to a non-max-row whose
798+
// newly-bumped updatedAt happens to remain below the table's
799+
// existing MAX(updatedAt) would not be detected by the gate.
800+
// Sequelize-driven UPDATEs always bump updatedAt to NOW(), which
801+
// is necessarily greater than any prior MAX, so this requires
802+
// raw-SQL manipulation that bypasses the ORM. No such code path
803+
// exists in CADT today; if one is added, switch the gate to a
804+
// SUM(UNIX_TIMESTAMP(updatedAt)) checksum.
805+
const updatedAtAttr = matchingUpdatedAtAttr(source, mirror);
806+
if (updatedAtAttr) {
807+
const inSync = await isMirrorInSync(
808+
source,
809+
mirror,
810+
name,
811+
updatedAtAttr,
812+
);
813+
if (inSync) {
814+
totalGateSkipped += 1;
815+
continue;
816+
}
817+
}
818+
593819
const updateFields = Object.keys(mirror.rawAttributes).filter(
594820
(attr) => !mirror.primaryKeyAttributes.includes(attr),
595821
);
@@ -692,7 +918,7 @@ export const backfillMirror = async () => {
692918
}
693919

694920
logger.info(
695-
`MySQL mirror backfill completed - ${totalSynced} records upserted, ${totalOrphansRemoved} orphan rows removed`,
921+
`MySQL mirror backfill completed - ${totalSynced} records upserted, ${totalOrphansRemoved} orphan rows removed, ${totalGateSkipped} tables skipped (already in sync)`,
696922
);
697923
} catch (error) {
698924
logger.error(

0 commit comments

Comments
 (0)