Skip to content

Commit 38db571

Browse files
committed
fix: synchronize mirror writes within transactional sync path
safeMirrorDbHandler resolves immediately via finally { resolve() } before the callback executes, making every mirror write fire-and-forget. When createAndProcessTransaction calls mirrorTransaction.commit(), the detached writes may not have completed, producing "commit has been called on this transaction" errors and leaving the mirror with stale data. Add mirrorWrite/mirrorWriteV2 helpers that run the callback directly (awaited) when a mirrorTransaction is provided, bypassing the authenticate/reconnect overhead since the caller already started the transaction. Non-transactional API writes keep the existing fire-and-forget path via safeMirrorDbHandler.
1 parent b6cfccd commit 38db571

37 files changed

Lines changed: 366 additions & 323 deletions

src/database/index.js

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,25 @@ export const safeMirrorDbHandler = (callback) => {
351351
});
352352
};
353353

354+
// When a mirrorTransaction is provided the caller already authenticated and
355+
// started the transaction (see createAndProcessTransaction in
356+
// sync-registries), so we can run the callback directly and synchronously.
357+
// This avoids the fire-and-forget race in safeMirrorDbHandler where the
358+
// transaction is committed before detached writes complete. When no
359+
// mirrorTransaction is present we fall back to the existing best-effort
360+
// fire-and-forget path so non-transactional API writes are unaffected.
361+
export const mirrorWrite = async (callback, mirrorTransaction) => {
362+
if (mirrorTransaction) {
363+
try {
364+
await callback();
365+
} catch (e) {
366+
logger.error(`mirror_error:${e.message}`);
367+
}
368+
} else {
369+
safeMirrorDbHandler(callback);
370+
}
371+
};
372+
354373
// Initialize a V1 mirror Sequelize Model synchronously at module-load time.
355374
//
356375
// Model.init() only registers schema metadata on the Sequelize instance; it

src/database/v2/index.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,21 @@ export const safeMirrorDbHandlerV2 = (callback) => {
324324
});
325325
};
326326

327+
// Synchronous mirror write when a mirrorTransaction is provided; fire-and-
328+
// forget via safeMirrorDbHandlerV2 otherwise. See mirrorWrite in the V1
329+
// database module for full rationale.
330+
export const mirrorWriteV2 = async (callback, mirrorTransaction) => {
331+
if (mirrorTransaction) {
332+
try {
333+
await callback();
334+
} catch (e) {
335+
loggerV2.error(`v2_mirror_error:${e.message}`);
336+
}
337+
} else {
338+
safeMirrorDbHandlerV2(callback);
339+
}
340+
};
341+
327342
// Initialize a V2 mirror Sequelize Model synchronously at module-load time.
328343
//
329344
// Model.init() only registers schema metadata on the Sequelize instance; it

src/models/audit/audit.model.js

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict';
22

33
import { Sequelize, Model } from 'sequelize';
4-
import { sequelize, safeMirrorDbHandler } from '../../database';
4+
import { sequelize, mirrorWrite } from '../../database';
55
import { AuditMirror } from './audit.model.mirror';
66
import ModelTypes from './audit.modeltypes.js';
77
import findDuplicateIssuancesSql from './sql/find-duplicate-issuances.sql.js';
@@ -10,46 +10,46 @@ import { waitForSyncRegistriesTransaction } from '../../utils/model-utils.js';
1010

1111
class Audit extends Model {
1212
static async create(values, options) {
13-
safeMirrorDbHandler(async () => {
13+
await mirrorWrite(async () => {
1414
const mirrorOptions = {
1515
...options,
1616
transaction: options?.mirrorTransaction,
1717
};
1818
await AuditMirror.create(values, mirrorOptions);
19-
});
19+
}, options?.mirrorTransaction);
2020
return super.create(values, options);
2121
}
2222

2323
static async bulkCreate(values, options) {
24-
safeMirrorDbHandler(async () => {
24+
await mirrorWrite(async () => {
2525
const mirrorOptions = {
2626
...options,
2727
transaction: options?.mirrorTransaction,
2828
};
2929
await AuditMirror.bulkCreate(values, mirrorOptions);
30-
});
30+
}, options?.mirrorTransaction);
3131
return super.bulkCreate(values, options);
3232
}
3333

3434
static async destroy(options) {
35-
safeMirrorDbHandler(async () => {
35+
await mirrorWrite(async () => {
3636
const mirrorOptions = {
3737
...options,
3838
transaction: options?.mirrorTransaction,
3939
};
4040
await AuditMirror.destroy(mirrorOptions);
41-
});
41+
}, options?.mirrorTransaction);
4242
return super.destroy(options);
4343
}
4444

4545
static async upsert(values, options) {
46-
safeMirrorDbHandler(async () => {
46+
await mirrorWrite(async () => {
4747
const mirrorOptions = {
4848
...options,
4949
transaction: options?.mirrorTransaction,
5050
};
5151
await AuditMirror.upsert(values, mirrorOptions);
52-
});
52+
}, options?.mirrorTransaction);
5353
return super.upsert(values, options);
5454
}
5555

src/models/co-benefits/co-benefits.model.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import { Model } from 'sequelize';
33

44
import { CoBenefitMirror } from './co-benefits.model.mirror';
5-
import { sequelize, safeMirrorDbHandler } from '../../database';
5+
import { sequelize, safeMirrorDbHandler, mirrorWrite } from '../../database';
66
import { Project } from '../projects';
77
import ModelTypes from './co-benefits.modeltypes.js';
88

@@ -24,35 +24,35 @@ class CoBenefit extends Model {
2424
}
2525

2626
static async create(values, options) {
27-
safeMirrorDbHandler(async () => {
27+
await mirrorWrite(async () => {
2828
const mirrorOptions = {
2929
...options,
3030
transaction: options?.mirrorTransaction,
3131
};
3232
await CoBenefitMirror.create(values, mirrorOptions);
33-
});
33+
}, options?.mirrorTransaction);
3434
return super.create(values, options);
3535
}
3636

3737
static async upsert(values, options) {
38-
safeMirrorDbHandler(async () => {
38+
await mirrorWrite(async () => {
3939
const mirrorOptions = {
4040
...options,
4141
transaction: options?.mirrorTransaction,
4242
};
4343
await CoBenefitMirror.upsert(values, mirrorOptions);
44-
});
44+
}, options?.mirrorTransaction);
4545
return super.upsert(values, options);
4646
}
4747

4848
static async destroy(options) {
49-
safeMirrorDbHandler(async () => {
49+
await mirrorWrite(async () => {
5050
const mirrorOptions = {
5151
...options,
5252
transaction: options?.mirrorTransaction,
5353
};
5454
await CoBenefitMirror.destroy(mirrorOptions);
55-
});
55+
}, options?.mirrorTransaction);
5656
return super.destroy(options);
5757
}
5858
}

src/models/estimations/estimations.model.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import { Model } from 'sequelize';
33

44
import { EstimationMirror } from './estimations.model.mirror';
5-
import { sequelize, safeMirrorDbHandler } from '../../database';
5+
import { sequelize, safeMirrorDbHandler, mirrorWrite } from '../../database';
66
import { Project } from '../projects';
77
import ModelTypes from './estimations.modeltypes.js';
88

@@ -24,36 +24,36 @@ class Estimation extends Model {
2424
}
2525

2626
static async create(values, options) {
27-
safeMirrorDbHandler(async () => {
27+
await mirrorWrite(async () => {
2828
const mirrorOptions = {
2929
...options,
3030
transaction: options?.mirrorTransaction,
3131
};
3232

3333
await EstimationMirror.create(values, mirrorOptions);
34-
});
34+
}, options?.mirrorTransaction);
3535
return super.create(values, options);
3636
}
3737

3838
static async upsert(values, options) {
39-
safeMirrorDbHandler(async () => {
39+
await mirrorWrite(async () => {
4040
const mirrorOptions = {
4141
...options,
4242
transaction: options?.mirrorTransaction,
4343
};
4444
await EstimationMirror.upsert(values, mirrorOptions);
45-
});
45+
}, options?.mirrorTransaction);
4646
return super.upsert(values, options);
4747
}
4848

4949
static async destroy(options) {
50-
safeMirrorDbHandler(async () => {
50+
await mirrorWrite(async () => {
5151
const mirrorOptions = {
5252
...options,
5353
transaction: options?.mirrorTransaction,
5454
};
5555
await EstimationMirror.destroy(mirrorOptions);
56-
});
56+
}, options?.mirrorTransaction);
5757
return super.destroy(options);
5858
}
5959
}

src/models/issuances/issuances.model.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use strict';
22
import { Model } from 'sequelize';
3-
import { sequelize, safeMirrorDbHandler } from '../../database';
3+
import { sequelize, safeMirrorDbHandler, mirrorWrite } from '../../database';
44
import { Project, Unit } from '..';
55

66
import ModelTypes from './issuances.modeltypes.js';
@@ -31,35 +31,35 @@ class Issuance extends Model {
3131
}
3232

3333
static async create(values, options) {
34-
safeMirrorDbHandler(async () => {
34+
await mirrorWrite(async () => {
3535
const mirrorOptions = {
3636
...options,
3737
transaction: options?.mirrorTransaction,
3838
};
3939
await IssuanceMirror.create(values, mirrorOptions);
40-
});
40+
}, options?.mirrorTransaction);
4141
return super.create(values, options);
4242
}
4343

4444
static async destroy(options) {
45-
safeMirrorDbHandler(async () => {
45+
await mirrorWrite(async () => {
4646
const mirrorOptions = {
4747
...options,
4848
transaction: options?.mirrorTransaction,
4949
};
5050
await IssuanceMirror.destroy(mirrorOptions);
51-
});
51+
}, options?.mirrorTransaction);
5252
return super.destroy(options);
5353
}
5454

5555
static async upsert(values, options) {
56-
safeMirrorDbHandler(async () => {
56+
await mirrorWrite(async () => {
5757
const mirrorOptions = {
5858
...options,
5959
transaction: options?.mirrorTransaction,
6060
};
6161
await IssuanceMirror.upsert(values, mirrorOptions);
62-
});
62+
}, options?.mirrorTransaction);
6363
return super.upsert(values, options);
6464
}
6565
}

src/models/labelUnits/labelUnits.model.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,42 @@
11
'use strict';
22

33
import { Model } from 'sequelize';
4-
import { sequelize, safeMirrorDbHandler } from '../../database';
4+
import { sequelize, mirrorWrite } from '../../database';
55

66
import ModelTypes from './labelUnits.modeltypes.js';
77
import { LabelUnitMirror } from './labelUnits.model.mirror';
88

99
class LabelUnit extends Model {
1010
static async create(values, options) {
11-
safeMirrorDbHandler(async () => {
11+
await mirrorWrite(async () => {
1212
const mirrorOptions = {
1313
...options,
1414
transaction: options?.mirrorTransaction,
1515
};
1616
await LabelUnitMirror.create(values, mirrorOptions);
17-
});
17+
}, options?.mirrorTransaction);
1818
return super.create(values, options);
1919
}
2020

2121
static async destroy(options) {
22-
safeMirrorDbHandler(async () => {
22+
await mirrorWrite(async () => {
2323
const mirrorOptions = {
2424
...options,
2525
transaction: options?.mirrorTransaction,
2626
};
2727
await LabelUnitMirror.destroy(mirrorOptions);
28-
});
28+
}, options?.mirrorTransaction);
2929
return super.destroy(options);
3030
}
3131

3232
static async upsert(values, options) {
33-
safeMirrorDbHandler(async () => {
33+
await mirrorWrite(async () => {
3434
const mirrorOptions = {
3535
...options,
3636
transaction: options?.mirrorTransaction,
3737
};
3838
await LabelUnitMirror.upsert(values, mirrorOptions);
39-
});
39+
}, options?.mirrorTransaction);
4040
return super.upsert(values, options);
4141
}
4242
}

src/models/labels/labels.model.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict';
22

33
import { Model } from 'sequelize';
4-
import { sequelize, safeMirrorDbHandler } from '../../database';
4+
import { sequelize, safeMirrorDbHandler, mirrorWrite } from '../../database';
55
import { Project } from '../projects';
66
import { Unit } from '../units';
77

@@ -37,35 +37,35 @@ class Label extends Model {
3737
}
3838

3939
static async create(values, options) {
40-
safeMirrorDbHandler(async () => {
40+
await mirrorWrite(async () => {
4141
const mirrorOptions = {
4242
...options,
4343
transaction: options?.mirrorTransaction,
4444
};
4545
await LabelMirror.create(values, mirrorOptions);
46-
});
46+
}, options?.mirrorTransaction);
4747
return super.create(values, options);
4848
}
4949

5050
static async destroy(options) {
51-
safeMirrorDbHandler(async () => {
51+
await mirrorWrite(async () => {
5252
const mirrorOptions = {
5353
...options,
5454
transaction: options?.mirrorTransaction,
5555
};
5656
await LabelMirror.destroy(mirrorOptions);
57-
});
57+
}, options?.mirrorTransaction);
5858
return super.destroy(options);
5959
}
6060

6161
static async upsert(values, options) {
62-
safeMirrorDbHandler(async () => {
62+
await mirrorWrite(async () => {
6363
const mirrorOptions = {
6464
...options,
6565
transaction: options?.mirrorTransaction,
6666
};
6767
await LabelMirror.upsert(values, mirrorOptions);
68-
});
68+
}, options?.mirrorTransaction);
6969
return super.upsert(values, options);
7070
}
7171
}

0 commit comments

Comments
 (0)