Skip to content

Commit 9c440f0

Browse files
committed
refactor: share mirror in-sync gate between v1 and v2
The matchingUpdatedAtAttr/isMirrorInSync helpers were duplicated verbatim between the v1 and v2 backfill paths, differing only by logger instance and a "[v2]: " log prefix. Extract them into src/database/mirror-sync-gate.js and pass the logger and prefix as parameters so a fix applies to both.
1 parent 902f4f4 commit 9c440f0

3 files changed

Lines changed: 162 additions & 222 deletions

File tree

src/database/index.js

Lines changed: 2 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { seeders } from './seeders';
1313

1414
import dotenv from 'dotenv';
1515
import { installSqlitePragmas } from './sqlite-pragmas.js';
16+
import { matchingUpdatedAtAttr, isMirrorInSync } from './mirror-sync-gate.js';
1617
dotenv.config({ quiet: true });
1718

1819
// possible values: local, test
@@ -428,132 +429,6 @@ export const initMirrorModel = (initFn) => {
428429
*/
429430
const BACKFILL_BATCH_SIZE = 1000;
430431

431-
// Sequelize attribute names for the auto-managed updatedAt column. V1
432-
// models use the default 'updatedAt'; V2 models declare
433-
// `updatedAt: 'updated_at'` with `underscored: true`, which makes
434-
// 'updated_at' the rawAttribute key. Probe both candidates so the same
435-
// gate works against either convention without depending on Sequelize
436-
// internals (_timestampAttributes is undocumented).
437-
const UPDATED_AT_ATTR_CANDIDATES = ['updatedAt', 'updated_at'];
438-
439-
/**
440-
* Find a Sequelize attribute name for the updatedAt column that is
441-
* present on BOTH source and mirror models. Returns the attribute name,
442-
* or null when no consistent name exists (either model is missing the
443-
* column, or they disagree on naming).
444-
*
445-
* Disagreement is intentionally treated as "no gate" rather than picking
446-
* one side: a mismatched name would force one side's MAX() call to
447-
* reference a non-existent column and throw, which would mask a real
448-
* drift behind a swallowed error.
449-
*/
450-
const matchingUpdatedAtAttr = (source, mirror) => {
451-
const sourceAttrs = source.rawAttributes || {};
452-
const mirrorAttrs = mirror.rawAttributes || {};
453-
for (const attr of UPDATED_AT_ATTR_CANDIDATES) {
454-
if (attr in sourceAttrs && attr in mirrorAttrs) {
455-
return attr;
456-
}
457-
}
458-
return null;
459-
};
460-
461-
/**
462-
* Cheap "is the mirror table already caught up?" check used by
463-
* backfillMirror to short-circuit the bulk-upsert pass when there is
464-
* nothing to do. The orphan sweep runs unconditionally BEFORE this
465-
* helper so PK-swap drift (count and MAX preserved, row identities
466-
* differ) is exposed as a post-sweep count mismatch and falls through
467-
* to the upsert. See the call site for the full ordering rationale.
468-
*
469-
* Returns true ONLY when BOTH conditions hold:
470-
* 1. count(source) === count(mirror)
471-
* 2. Either both counts are 0 (truly empty on both sides), OR
472-
* max(mirror[updatedAtAttr]) >= max(source[updatedAtAttr])
473-
*
474-
* Returns false in every other case (including any error), so the
475-
* caller falls through to the existing full upsert. Crucially, false
476-
* is the safe default: a false negative just causes the existing
477-
* (correct) full sync to run, while a false positive would silently
478-
* leave the mirror stale. We deliberately bias toward the
479-
* cheap-but-fully-correct fall-through.
480-
*
481-
* Known limitation - non-max-row UPDATE drift: if a row is updated in
482-
* source via raw SQL with an updatedAt that's strictly below the
483-
* table's existing MAX(updatedAt), the gate can't see the change.
484-
* Sequelize-driven UPDATEs auto-bump updatedAt to NOW() (necessarily
485-
* greater than any prior MAX), so this only happens via raw SQL that
486-
* bypasses the ORM or via clock-skew adjustments. No such code path
487-
* exists in CADT today. If composite drift patterns become a concern,
488-
* replace this with a SUM(UNIX_TIMESTAMP(updatedAt)) checksum or a
489-
* per-table hash digest.
490-
*/
491-
const isMirrorInSync = async (source, mirror, name, updatedAtAttr) => {
492-
try {
493-
const [sourceCount, mirrorCount, sourceMax, mirrorMax] = await Promise.all([
494-
source.count(),
495-
mirror.count(),
496-
source.max(updatedAtAttr),
497-
mirror.max(updatedAtAttr),
498-
]);
499-
500-
if (sourceCount !== mirrorCount) {
501-
logger.debug(
502-
`Mirror backfill: ${name} - gate failed (count mismatch: source=${sourceCount} mirror=${mirrorCount})`,
503-
);
504-
return false;
505-
}
506-
507-
if (sourceCount === 0) {
508-
logger.debug(
509-
`Mirror backfill: ${name} - in sync, skipping (empty on both sides)`,
510-
);
511-
return true;
512-
}
513-
514-
// Counts agree and are non-zero. A null MAX(updatedAt) at this
515-
// point means rows exist with null timestamps - we can't compare
516-
// freshness, so fall through to the full sync rather than skip.
517-
if (sourceMax == null || mirrorMax == null) {
518-
logger.debug(
519-
`Mirror backfill: ${name} - gate failed (max(updatedAt) null with ${sourceCount} rows)`,
520-
);
521-
return false;
522-
}
523-
524-
// Sequelize.max returns either a Date (for DATE columns) or whatever
525-
// raw value the dialect returned. Coerce through Date so SQLite string
526-
// timestamps and MySQL Date instances compare consistently.
527-
const sourceMs = new Date(sourceMax).getTime();
528-
const mirrorMs = new Date(mirrorMax).getTime();
529-
if (Number.isNaN(sourceMs) || Number.isNaN(mirrorMs)) {
530-
logger.debug(
531-
`Mirror backfill: ${name} - gate failed (non-parseable max(updatedAt))`,
532-
);
533-
return false;
534-
}
535-
536-
if (mirrorMs >= sourceMs) {
537-
logger.debug(
538-
`Mirror backfill: ${name} - in sync, skipping (${sourceCount} rows, max(updatedAt) mirror=${mirrorMs} >= source=${sourceMs})`,
539-
);
540-
return true;
541-
}
542-
543-
logger.debug(
544-
`Mirror backfill: ${name} - gate failed (mirror max(updatedAt)=${mirrorMs} < source=${sourceMs})`,
545-
);
546-
return false;
547-
} catch (error) {
548-
// Never block the existing sync path on a gate error - just fall
549-
// through to the full upsert.
550-
logger.debug(
551-
`Mirror backfill: ${name} - gate check failed (${error.message}), falling through to full sync`,
552-
);
553-
return false;
554-
}
555-
};
556-
557432
// NOTE: when this early-returns for composite PKs, the in-sync gate
558433
// that runs after it in backfillMirror operates on raw (un-swept)
559434
// state. mirror-model-init.spec.js currently enforces single-column
@@ -809,6 +684,7 @@ const runBackfillMirror = async () => {
809684
mirror,
810685
name,
811686
updatedAtAttr,
687+
logger,
812688
);
813689
if (inSync) {
814690
totalGateSkipped += 1;

src/database/mirror-sync-gate.js

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
'use strict';
2+
3+
/**
4+
* Shared "is the mirror table already caught up?" gate used by both the V1
5+
* (`backfillMirror`) and V2 (`backfillMirrorV2`) backfill loops. The two code
6+
* paths previously carried verbatim copies of this logic; they now share this
7+
* single implementation so a fix applies to both.
8+
*
9+
* The only per-caller differences are the logger instance and the log-message
10+
* prefix (`''` for V1, `'[v2]: '` for V2), both passed in as parameters.
11+
*/
12+
13+
// Sequelize attribute names for the auto-managed updatedAt column. V1
14+
// models use the default 'updatedAt'; V2 models declare
15+
// `updatedAt: 'updated_at'` with `underscored: true`, which makes
16+
// 'updated_at' the rawAttribute key. Probe both candidates so the same
17+
// gate works against either convention without depending on Sequelize
18+
// internals (_timestampAttributes is undocumented).
19+
export const UPDATED_AT_ATTR_CANDIDATES = ['updatedAt', 'updated_at'];
20+
21+
/**
22+
* Find a Sequelize attribute name for the updatedAt column that is
23+
* present on BOTH source and mirror models. Returns the attribute name,
24+
* or null when no consistent name exists (either model is missing the
25+
* column, or they disagree on naming).
26+
*
27+
* Disagreement is intentionally treated as "no gate" rather than picking
28+
* one side: a mismatched name would force one side's MAX() call to
29+
* reference a non-existent column and throw, which would mask a real
30+
* drift behind a swallowed error.
31+
*/
32+
export const matchingUpdatedAtAttr = (source, mirror) => {
33+
const sourceAttrs = source.rawAttributes || {};
34+
const mirrorAttrs = mirror.rawAttributes || {};
35+
for (const attr of UPDATED_AT_ATTR_CANDIDATES) {
36+
if (attr in sourceAttrs && attr in mirrorAttrs) {
37+
return attr;
38+
}
39+
}
40+
return null;
41+
};
42+
43+
/**
44+
* Cheap "is the mirror table already caught up?" check used by
45+
* backfillMirror to short-circuit the bulk-upsert pass when there is
46+
* nothing to do. The orphan sweep runs unconditionally BEFORE this
47+
* helper so PK-swap drift (count and MAX preserved, row identities
48+
* differ) is exposed as a post-sweep count mismatch and falls through
49+
* to the upsert. See the call site for the full ordering rationale.
50+
*
51+
* Returns true ONLY when BOTH conditions hold:
52+
* 1. count(source) === count(mirror)
53+
* 2. Either both counts are 0 (truly empty on both sides), OR
54+
* max(mirror[updatedAtAttr]) >= max(source[updatedAtAttr])
55+
*
56+
* Returns false in every other case (including any error), so the
57+
* caller falls through to the existing full upsert. Crucially, false
58+
* is the safe default: a false negative just causes the existing
59+
* (correct) full sync to run, while a false positive would silently
60+
* leave the mirror stale. We deliberately bias toward the
61+
* cheap-but-fully-correct fall-through.
62+
*
63+
* Known limitation - non-max-row UPDATE drift: if a row is updated in
64+
* source via raw SQL with an updatedAt that's strictly below the
65+
* table's existing MAX(updatedAt), the gate can't see the change.
66+
* Sequelize-driven UPDATEs auto-bump updatedAt to NOW() (necessarily
67+
* greater than any prior MAX), so this only happens via raw SQL that
68+
* bypasses the ORM or via clock-skew adjustments. No such code path
69+
* exists in CADT today. If composite drift patterns become a concern,
70+
* replace this with a SUM(UNIX_TIMESTAMP(updatedAt)) checksum or a
71+
* per-table hash digest.
72+
*
73+
* @param {string} [logPrefix=''] - Prepended to every log line so V2 callers
74+
* can keep their '[v2]: ' tag without forking the implementation.
75+
*/
76+
export const isMirrorInSync = async (
77+
source,
78+
mirror,
79+
name,
80+
updatedAtAttr,
81+
logger,
82+
logPrefix = '',
83+
) => {
84+
try {
85+
const [sourceCount, mirrorCount, sourceMax, mirrorMax] = await Promise.all([
86+
source.count(),
87+
mirror.count(),
88+
source.max(updatedAtAttr),
89+
mirror.max(updatedAtAttr),
90+
]);
91+
92+
if (sourceCount !== mirrorCount) {
93+
logger.debug(
94+
`${logPrefix}Mirror backfill: ${name} - gate failed (count mismatch: source=${sourceCount} mirror=${mirrorCount})`,
95+
);
96+
return false;
97+
}
98+
99+
if (sourceCount === 0) {
100+
logger.debug(
101+
`${logPrefix}Mirror backfill: ${name} - in sync, skipping (empty on both sides)`,
102+
);
103+
return true;
104+
}
105+
106+
// Counts agree and are non-zero. A null MAX(updatedAt) at this
107+
// point means rows exist with null timestamps - we can't compare
108+
// freshness, so fall through to the full sync rather than skip.
109+
if (sourceMax == null || mirrorMax == null) {
110+
logger.debug(
111+
`${logPrefix}Mirror backfill: ${name} - gate failed (max(updatedAt) null with ${sourceCount} rows)`,
112+
);
113+
return false;
114+
}
115+
116+
// Sequelize.max returns either a Date (for DATE columns) or whatever
117+
// raw value the dialect returned. Coerce through Date so SQLite string
118+
// timestamps and MySQL Date instances compare consistently.
119+
const sourceMs = new Date(sourceMax).getTime();
120+
const mirrorMs = new Date(mirrorMax).getTime();
121+
if (Number.isNaN(sourceMs) || Number.isNaN(mirrorMs)) {
122+
logger.debug(
123+
`${logPrefix}Mirror backfill: ${name} - gate failed (non-parseable max(updatedAt))`,
124+
);
125+
return false;
126+
}
127+
128+
if (mirrorMs >= sourceMs) {
129+
logger.debug(
130+
`${logPrefix}Mirror backfill: ${name} - in sync, skipping (${sourceCount} rows, max(updatedAt) mirror=${mirrorMs} >= source=${sourceMs})`,
131+
);
132+
return true;
133+
}
134+
135+
logger.debug(
136+
`${logPrefix}Mirror backfill: ${name} - gate failed (mirror max(updatedAt)=${mirrorMs} < source=${sourceMs})`,
137+
);
138+
return false;
139+
} catch (error) {
140+
// Never block the existing sync path on a gate error - just fall
141+
// through to the full upsert.
142+
logger.debug(
143+
`${logPrefix}Mirror backfill: ${name} - gate check failed (${error.message}), falling through to full sync`,
144+
);
145+
return false;
146+
}
147+
};

0 commit comments

Comments
 (0)