@@ -111,6 +111,21 @@ let mirrorSetupPromise = null;
111111// before new operations are applied.
112112let reconnectBackfillPromise = null ;
113113
114+ // Single in-flight backfillMirror call. Distinct from
115+ // reconnectBackfillPromise above: that one guards the reconnect-detection
116+ // path inside safeMirrorDbHandler; this one guards backfillMirror itself
117+ // against concurrent callers. Both startup (prepareDb) and reconnect
118+ // (startReconnectBackfill) can fire backfillMirror, and in practice they
119+ // race at cold start because src/middleware.js eagerly imports the V1
120+ // models barrel - which runs the per-model associate() methods - which
121+ // historically called safeMirrorDbHandler at module-load time, triggering
122+ // a setupNeverRan reconnect in parallel with prepareDb's own backfill.
// The associate() trigger has since been removed, but the in-flight
// guard remains as a belt-and-suspenders defence against any other
// concurrent invocation. It is reset to null when the promise settles
// (resolve or reject) so reconnects after the first run still re-execute.
127+ let backfillInFlightPromise = null ;
128+
114129export const mirrorDBEnabled = ( ) => {
115130 if ( mirrorEnabledTestOverride !== null ) {
116131 return mirrorEnabledTestOverride ;
@@ -413,6 +428,139 @@ export const initMirrorModel = (initFn) => {
413428 */
414429const BACKFILL_BATCH_SIZE = 1000 ;
415430
// Sequelize attribute names for the auto-managed updatedAt column,
// probed in order. V1 models use the default 'updatedAt'; V2 models
// declare `updatedAt: 'updated_at'` with `underscored: true`, which makes
// 'updated_at' the rawAttributes key. Probe both candidates so the same
// gate works against either convention without depending on Sequelize
// internals (_timestampAttributes is undocumented).
//
// Frozen: this is a shared, module-level lookup list and must never be
// mutated at runtime.
const UPDATED_AT_ATTR_CANDIDATES = Object.freeze(['updatedAt', 'updated_at']);
438+
/**
 * Pick the updatedAt attribute name that source and mirror have in common.
 *
 * Each candidate in UPDATED_AT_ATTR_CANDIDATES is tried in order and the
 * first one that is a key of BOTH models' rawAttributes wins. A model
 * without rawAttributes is treated as having no attributes at all.
 *
 * When the two models disagree on naming (or either lacks the column)
 * the result is null, i.e. "no gate", rather than one side's name: a
 * mismatched name would make one side's MAX() reference a column that
 * does not exist and throw, hiding real drift behind a swallowed error.
 *
 * @param {object} source - Source model; its `rawAttributes` is inspected.
 * @param {object} mirror - Mirror model; its `rawAttributes` is inspected.
 * @returns {string|null} The shared attribute name, or null if none.
 */
const matchingUpdatedAtAttr = (source, mirror) => {
  const attributeSets = [source.rawAttributes || {}, mirror.rawAttributes || {}];

  const shared = UPDATED_AT_ATTR_CANDIDATES.find((candidate) =>
    attributeSets.every((attributes) => candidate in attributes),
  );

  return shared === undefined ? null : shared;
};
460+
/**
 * Cheap "is the mirror table already caught up?" check used by the
 * mirror backfill to short-circuit the bulk-upsert pass when there is
 * nothing to do. The orphan sweep runs unconditionally BEFORE this
 * helper so PK-swap drift (count and MAX preserved, row identities
 * differ) is exposed as a post-sweep count mismatch and falls through
 * to the upsert. See the call site for the full ordering rationale.
 *
 * Resolves to true ONLY when BOTH conditions hold:
 *   1. count(source) === count(mirror)
 *   2. Either both counts are 0 (truly empty on both sides), OR
 *      max(mirror[updatedAtAttr]) >= max(source[updatedAtAttr])
 *
 * Resolves to false in every other case (including any error, whatever
 * value was thrown), so the caller falls through to the existing full
 * upsert. Crucially, false is the safe default: a false negative just
 * causes the existing (correct) full sync to run, while a false
 * positive would silently leave the mirror stale. We deliberately bias
 * toward the cheap-but-fully-correct fall-through.
 *
 * Known limitation - non-max-row UPDATE drift: if a row is updated in
 * source via raw SQL with an updatedAt that's strictly below the
 * table's existing MAX(updatedAt), the gate can't see the change.
 * Sequelize-driven UPDATEs auto-bump updatedAt to NOW() (necessarily
 * greater than any prior MAX), so this only happens via raw SQL that
 * bypasses the ORM or via clock-skew adjustments. No such code path
 * exists in CADT today. If composite drift patterns become a concern,
 * replace this with a SUM(UNIX_TIMESTAMP(updatedAt)) checksum or a
 * per-table hash digest.
 *
 * @param {object} source - Source model; `count()` and `max(attr)` are called on it.
 * @param {object} mirror - Mirror model; `count()` and `max(attr)` are called on it.
 * @param {string} name - Table/model label, used only in log messages.
 * @param {string} updatedAtAttr - Attribute name passed to `max()` on both models.
 * @returns {Promise<boolean>} true when the upsert can be skipped; never rejects
 *   because of a failed gate query.
 */
const isMirrorInSync = async (source, mirror, name, updatedAtAttr) => {
  try {
    // All four probes are independent, so issue them concurrently.
    const [sourceCount, mirrorCount, sourceMax, mirrorMax] = await Promise.all([
      source.count(),
      mirror.count(),
      source.max(updatedAtAttr),
      mirror.max(updatedAtAttr),
    ]);

    if (sourceCount !== mirrorCount) {
      logger.debug(
        `Mirror backfill: ${name} - gate failed (count mismatch: source=${sourceCount} mirror=${mirrorCount})`,
      );
      return false;
    }

    if (sourceCount === 0) {
      logger.debug(
        `Mirror backfill: ${name} - in sync, skipping (empty on both sides)`,
      );
      return true;
    }

    // Counts agree and are non-zero. A null MAX(updatedAt) at this
    // point means rows exist with null timestamps - we can't compare
    // freshness, so fall through to the full sync rather than skip.
    if (sourceMax == null || mirrorMax == null) {
      logger.debug(
        `Mirror backfill: ${name} - gate failed (max(updatedAt) null with ${sourceCount} rows)`,
      );
      return false;
    }

    // max() may hand back a Date or whatever raw value the dialect
    // returned. Coerce through Date so string timestamps and Date
    // instances compare consistently as epoch milliseconds.
    const sourceMs = new Date(sourceMax).getTime();
    const mirrorMs = new Date(mirrorMax).getTime();
    if (Number.isNaN(sourceMs) || Number.isNaN(mirrorMs)) {
      logger.debug(
        `Mirror backfill: ${name} - gate failed (non-parseable max(updatedAt))`,
      );
      return false;
    }

    if (mirrorMs >= sourceMs) {
      logger.debug(
        `Mirror backfill: ${name} - in sync, skipping (${sourceCount} rows, max(updatedAt) mirror=${mirrorMs} >= source=${sourceMs})`,
      );
      return true;
    }

    logger.debug(
      `Mirror backfill: ${name} - gate failed (mirror max(updatedAt)=${mirrorMs} < source=${sourceMs})`,
    );
    return false;
  } catch (error) {
    // Never block the existing sync path on a gate error - just fall
    // through to the full upsert. The thrown value is not guaranteed to
    // be an Error (it may even be null/undefined), so derive the reason
    // defensively: dereferencing `.message` on a nullish value here would
    // throw from inside the handler and defeat the "always false" contract.
    const reason =
      error != null && error.message !== undefined
        ? error.message
        : String(error);
    logger.debug(
      `Mirror backfill: ${name} - gate check failed (${reason}), falling through to full sync`,
    );
    return false;
  }
};
556+
557+ // NOTE: when this early-returns for composite PKs, the in-sync gate
558+ // that runs after it in backfillMirror operates on raw (un-swept)
559+ // state. mirror-model-init.spec.js currently enforces single-column
560+ // PKs on every mirror, so this path is unreachable today. If a
561+ // composite-PK mirror is introduced later, the gate's PK-swap
562+ // protection no longer applies and the gate would need to be
563+ // disabled for that table or replaced with a stronger check.
416564const sweepMirrorOrphans = async ( source , mirror , name ) => {
417565 const pkAttrs = mirror . primaryKeyAttributes ;
418566 if ( ! pkAttrs || pkAttrs . length !== 1 ) {
@@ -502,6 +650,31 @@ export const backfillMirror = async () => {
502650 return ;
503651 }
504652
653+ // Coalesce concurrent calls. Without this guard, prepareDb's startup
654+ // backfill and any other code path that invokes backfillMirror() would
655+ // each pull and upsert the entire dataset. The race is observable in
656+ // production logs: identical "synced N records" / "MySQL mirror backfill
657+ // completed - N records upserted" lines appearing twice with matching
658+ // counts on the same boot. The orphan sweep and bulk upsert are
659+ // idempotent so the duplicate work is benign correctness-wise, but it
660+ // doubles the cold-start time and MySQL write traffic.
661+ //
662+ // Cleared on settle so subsequent reconnects after the first run still
663+ // perform a fresh catch-up.
664+ if ( backfillInFlightPromise ) {
665+ return backfillInFlightPromise ;
666+ }
667+
668+ backfillInFlightPromise = ( async ( ) => {
669+ await runBackfillMirror ( ) ;
670+ } ) ( ) . finally ( ( ) => {
671+ backfillInFlightPromise = null ;
672+ } ) ;
673+
674+ return backfillInFlightPromise ;
675+ } ;
676+
677+ const runBackfillMirror = async ( ) => {
505678 logger . info ( 'Starting MySQL mirror backfill from SQLite...' ) ;
506679
507680 try {
@@ -572,6 +745,7 @@ export const backfillMirror = async () => {
572745
573746 let totalSynced = 0 ;
574747 let totalOrphansRemoved = 0 ;
748+ let totalGateSkipped = 0 ;
575749
576750 for ( const { source, mirror, name } of mirrorPairs ) {
577751 try {
@@ -586,10 +760,62 @@ export const backfillMirror = async () => {
586760 }
587761
588762 // Orphan sweep first (mirror snapshot before source snapshot) so
589- // concurrent inserts aren't wrongly classified as orphans.
763+ // concurrent inserts aren't wrongly classified as orphans. The
764+ // sweep runs UNCONDITIONALLY - before the in-sync gate below -
765+ // because the gate's COUNT(*) + MAX(updatedAt) check cannot
766+ // distinguish a healthy mirror from one where rows were
767+ // delete+inserted with a different PK during an outage (count
768+ // and MAX preserved, but the row identities differ). Running
769+ // the sweep first makes such drift visible to the gate via the
770+ // post-sweep count mismatch, which then falls through to the
771+ // full upsert. See PR review for the reproducer.
590772 const orphansRemoved = await sweepMirrorOrphans ( source , mirror , name ) ;
591773 totalOrphansRemoved += orphansRemoved ;
592774
775+ // Fast in-sync gate (post-sweep). Skip the bulk upsert entirely
776+ // when COUNT(*) matches on both sides AND mirror's MAX(updatedAt)
777+ // is at least as new as source's. Both queries are index-backed
778+ // (PK + updatedAt), so the gate is cheap - vastly cheaper than
779+ // the full keyset walk + bulk upsert it bypasses. (The orphan
780+ // sweep above still runs on every restart; we only avoid the
781+ // expensive per-row data read + MySQL upsert pass when truly
782+ // in sync.)
783+ //
784+ // Correctness invariant: the gate ONLY short-circuits the upsert;
785+ // it never substitutes a partial sync for a full one. On any
786+ // mismatch (count differs, mirror is missing rows entirely, or
787+ // mirror's MAX(updatedAt) is older than source's) we fall through
788+ // to the existing full sync, which catches arbitrary gaps
789+ // including outage windows where the mirror missed rows.
790+ //
791+ // Skip the gate (fall through to full sync) when source and mirror
792+ // disagree about which timestamp attribute name to use, or when
793+ // either lacks an updatedAt timestamp entirely. Both conditions
794+ // mean we can't ask a single MAX() question that both sides answer
795+ // consistently.
796+ //
797+ // Known limitation: an in-place UPDATE to a non-max-row whose
798+ // newly-bumped updatedAt happens to remain below the table's
799+ // existing MAX(updatedAt) would not be detected by the gate.
800+ // Sequelize-driven UPDATEs always bump updatedAt to NOW(), which
801+ // is necessarily greater than any prior MAX, so this requires
802+ // raw-SQL manipulation that bypasses the ORM. No such code path
803+ // exists in CADT today; if one is added, switch the gate to a
804+ // SUM(UNIX_TIMESTAMP(updatedAt)) checksum.
805+ const updatedAtAttr = matchingUpdatedAtAttr ( source , mirror ) ;
806+ if ( updatedAtAttr ) {
807+ const inSync = await isMirrorInSync (
808+ source ,
809+ mirror ,
810+ name ,
811+ updatedAtAttr ,
812+ ) ;
813+ if ( inSync ) {
814+ totalGateSkipped += 1 ;
815+ continue ;
816+ }
817+ }
818+
593819 const updateFields = Object . keys ( mirror . rawAttributes ) . filter (
594820 ( attr ) => ! mirror . primaryKeyAttributes . includes ( attr ) ,
595821 ) ;
@@ -692,7 +918,7 @@ export const backfillMirror = async () => {
692918 }
693919
694920 logger . info (
695- `MySQL mirror backfill completed - ${ totalSynced } records upserted, ${ totalOrphansRemoved } orphan rows removed` ,
921+ `MySQL mirror backfill completed - ${ totalSynced } records upserted, ${ totalOrphansRemoved } orphan rows removed, ${ totalGateSkipped } tables skipped (already in sync) ` ,
696922 ) ;
697923 } catch ( error ) {
698924 logger . error (
0 commit comments