Skip to content

Commit 9be287e

Browse files
committed
fix: skip unsynced orgs in syncOrganizationMeta to prevent scheduler blocking
syncOrganizationMeta (V1 and V2) loops over every subscribed org and awaits datalayer.getStoreIfUpdated sequentially. When an org's root hash has changed but the new data has not yet propagated to the local datalayer, getStoreIfUpdated descends into syncService.getStoreData's MAX_RETRIES=20 x 10s retry loop — up to ~3.3 minutes per org. With N orgs updating near the same time, the scheduler can stall for N x 3.3 minutes before the task returns. Add a lightweight getDataLayerStoreSyncStatus pre-check per org and skip orgs whose store is not yet fully synced. The scheduler re-runs the task on its normal cadence so skipped orgs catch up on the next pass. Matches the pattern used in reconcileOrganization and governance sync in the companion fix.
1 parent 8faec6d commit 9be287e

3 files changed

Lines changed: 163 additions & 1 deletion

File tree

src/models/organizations/organizations.model.js

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1342,6 +1342,12 @@ class Organization extends Model {
13421342

13431343
/**
13441344
* Synchronizes metadata for all subscribed organizations.
1345+
*
1346+
* Skips orgs whose orgUid store is not yet fully synced on the datalayer.
1347+
* Without the pre-check, `datalayer.getStoreIfUpdated` can descend into
1348+
* `syncService.getStoreData`'s retry loop (20 × 10 s) for any org whose
1349+
* root hash has changed but data has not yet propagated, which serializes
1350+
* into minutes-per-org of scheduler blocking when multiple orgs update.
13451351
*/
13461352
static async syncOrganizationMeta() {
13471353
try {
@@ -1351,6 +1357,23 @@ class Organization extends Model {
13511357
});
13521358

13531359
for (const organization of allSubscribedOrganizations) {
1360+
if (!USE_SIMULATOR) {
1361+
try {
1362+
const syncStatus = await getDataLayerStoreSyncStatus(organization.orgUid);
1363+
if (!isDlStoreSynced(syncStatus?.sync_status)) {
1364+
logger.info(
1365+
`[v1]: syncOrganizationMeta: org store ${organization.orgUid} not yet synced, skipping this run.`,
1366+
);
1367+
continue;
1368+
}
1369+
} catch (error) {
1370+
logger.warn(
1371+
`[v1]: syncOrganizationMeta: could not check sync status for org store ${organization.orgUid}, skipping this run: ${error.message}`,
1372+
);
1373+
continue;
1374+
}
1375+
}
1376+
13541377
const processData = (data, keyFilter) =>
13551378
data
13561379
.filter(({ key }) => keyFilter(key))

src/models/v2/organizations-v2.model.js

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2067,7 +2067,14 @@ class OrganizationsV2 extends Model {
20672067
}
20682068

20692069
/**
2070-
* Synchronizes metadata for all subscribed organizations
2070+
* Synchronizes metadata for all subscribed organizations.
2071+
*
2072+
* Skips orgs whose orgUid store is not yet fully synced on the datalayer.
2073+
* Without the pre-check, `datalayer.getStoreIfUpdated` can descend into
2074+
* `syncService.getStoreData`'s retry loop (20 × 10 s) for any org whose
2075+
* root hash has changed but data has not yet propagated, which serializes
2076+
* into minutes-per-org of scheduler blocking when multiple orgs update.
2077+
*
20712078
* @returns {Promise<void>}
20722079
*/
20732080
static async syncOrganizationMeta() {
@@ -2078,6 +2085,23 @@ class OrganizationsV2 extends Model {
20782085
});
20792086

20802087
for (const organization of allSubscribedOrganizations) {
2088+
if (!USE_SIMULATOR) {
2089+
try {
2090+
const syncStatus = await datalayer.getDataLayerStoreSyncStatus(organization.org_uid);
2091+
if (!isDlStoreSynced(syncStatus?.sync_status)) {
2092+
loggerV2.info(
2093+
`[v2]: syncOrganizationMeta: org store ${organization.org_uid} not yet synced, skipping this run.`,
2094+
);
2095+
continue;
2096+
}
2097+
} catch (error) {
2098+
loggerV2.warn(
2099+
`[v2]: syncOrganizationMeta: could not check sync status for org store ${organization.org_uid}, skipping this run: ${error.message}`,
2100+
);
2101+
continue;
2102+
}
2103+
}
2104+
20812105
const processData = (data, keyFilter) =>
20822106
data
20832107
.filter(({ key }) => keyFilter(key))
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import { expect } from 'chai';
2+
import { prepareV2Db } from '../../../src/database/v2/index.js';
3+
import { prepareDb } from '../../../src/database/index.js';
4+
import { OrganizationsV2 } from '../../../src/models/v2/index.js';
5+
import { Organization } from '../../../src/models/organizations/organizations.model.js';
6+
import TaskManager from '../../../src/tasks/index.js';
7+
8+
/**
9+
* sync-organization-meta non-blocking behavior tests
10+
*
11+
* Verifies that syncOrganizationMeta (V1 and V2) completes quickly in
12+
* simulator mode and that the method contract is preserved. The unsynced-
13+
* store pre-check is wrapped in `if (!USE_SIMULATOR)`, so production-mode
14+
* behavior is covered by the live integration tests. What we can assert
15+
* here in simulator mode is that:
16+
*
17+
* 1. The method exists and returns without error for empty inputs
18+
* 2. Subscribed orgs are queried from the DB
19+
* 3. The pre-check path does not introduce regressions in simulator mode
20+
*/
21+
describe('syncOrganizationMeta non-blocking behavior', function () {
22+
this.timeout(15000);
23+
24+
before(async function () {
25+
await prepareDb();
26+
await prepareV2Db();
27+
TaskManager.stopAll();
28+
});
29+
30+
afterEach(async function () {
31+
await OrganizationsV2.destroy({ where: {} });
32+
});
33+
34+
// ─────────────────────────────────────────────────────────
35+
// V1 Organization.syncOrganizationMeta
36+
// ─────────────────────────────────────────────────────────
37+
38+
describe('V1 Organization.syncOrganizationMeta', function () {
39+
it('should be available as a static method', function () {
40+
expect(Organization.syncOrganizationMeta).to.be.a('function');
41+
});
42+
43+
it('should complete without error when no subscribed orgs exist', async function () {
44+
let threw = false;
45+
try {
46+
await Organization.syncOrganizationMeta();
47+
} catch {
48+
threw = true;
49+
}
50+
expect(threw).to.be.false;
51+
});
52+
53+
it('should complete quickly in simulator mode', async function () {
54+
const start = Date.now();
55+
await Organization.syncOrganizationMeta();
56+
expect(Date.now() - start).to.be.below(1000);
57+
});
58+
});
59+
60+
// ─────────────────────────────────────────────────────────
61+
// V2 OrganizationsV2.syncOrganizationMeta
62+
// ─────────────────────────────────────────────────────────
63+
64+
describe('V2 OrganizationsV2.syncOrganizationMeta', function () {
65+
it('should be available as a static method', function () {
66+
expect(OrganizationsV2.syncOrganizationMeta).to.be.a('function');
67+
});
68+
69+
it('should complete without error when no subscribed orgs exist', async function () {
70+
let threw = false;
71+
try {
72+
await OrganizationsV2.syncOrganizationMeta();
73+
} catch {
74+
threw = true;
75+
}
76+
expect(threw).to.be.false;
77+
});
78+
79+
it('should complete quickly in simulator mode', async function () {
80+
const start = Date.now();
81+
await OrganizationsV2.syncOrganizationMeta();
82+
expect(Date.now() - start).to.be.below(1000);
83+
});
84+
85+
it('should skip unsubscribed orgs entirely', async function () {
86+
await OrganizationsV2.create({
87+
org_uid: 'unsubscribed-org',
88+
name: 'Unsub',
89+
is_home: false,
90+
subscribed: false,
91+
synced: false,
92+
sync_remaining: 0,
93+
balance: '0',
94+
pending_balance: '0',
95+
metadata: '{}',
96+
});
97+
98+
// Method queries `where: { subscribed: true }`, so this org is skipped.
99+
const subscribedOrgs = await OrganizationsV2.findAll({
100+
where: { subscribed: true },
101+
raw: true,
102+
});
103+
expect(subscribedOrgs).to.have.length(0);
104+
105+
// And the method call itself still succeeds.
106+
let threw = false;
107+
try {
108+
await OrganizationsV2.syncOrganizationMeta();
109+
} catch {
110+
threw = true;
111+
}
112+
expect(threw).to.be.false;
113+
});
114+
});
115+
});

0 commit comments

Comments
 (0)