Skip to content

Commit 6e76235

Browse files
committed
fix: make background sync non-blocking on unsynced stores
Background reconciliation and governance sync tasks could block for up to 10 minutes per call on known-unsynced datalayer stores, and the V1 governance sync recursively retried up to 50 times on every failure. This caused visible startup and task-cadence stalls.

- Add sync-status pre-checks to V1 and V2 reconcileOrganization: subscribe to the org store so it starts syncing, then skip the reconcile for the current task run (it is retried on the next run) if either the org store or its derived singleton is unsynced.
- Replace recursive retry in V1/V2 governance sync with a single attempt per task invocation. On failure the function logs and returns; the scheduler re-runs the task on its normal cadence, so the coroutine never blocks its scheduler slot.
- Skip governance sync immediately when the body store or its versioned governance store reports unsynced, preserving cached governance data.
- Keep the production wait-loop in getRegistryStoreIdFromSingleton so request-path callers (subscribe controller, import flow) still work on newly-discovered stores; the reconcile pre-check makes the wait a no-op on the background path.
- Standardize the governance sync interval to 300 seconds (5 minutes) across defaults, task fallbacks, example env, and test config. The sync pre-check is a single lightweight RPC, so running every 5 minutes is safe.
- Add focused tests for reconcile and governance non-blocking behavior, idempotency, and schedule configuration.
1 parent 8faec6d commit 6e76235

12 files changed

Lines changed: 627 additions & 124 deletions

docker-compose-example.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ services:
3131
- LOG_LEVEL=info
3232

3333
# APP.TASKS settings
34-
- GOVERNANCE_SYNC_TASK_INTERVAL=86400
34+
- GOVERNANCE_SYNC_TASK_INTERVAL=300
3535
- ORGANIZATION_META_SYNC_TASK_INTERVAL=300
3636
- PICKLIST_SYNC_TASK_INTERVAL=60
3737
- MIRROR_CHECK_TASK_INTERVAL=86460

src/models/governance/governance.model.js

Lines changed: 73 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { Sequelize, Model } from 'sequelize';
44
import { sequelize } from '../../database';
55
import { Meta } from '../../models';
66
import datalayer from '../../datalayer';
7-
import { keyValueToChangeList } from '../../utils/datalayer-utils';
7+
import { keyValueToChangeList, isDlStoreSynced } from '../../utils/datalayer-utils';
88
import { getConfig } from '../../utils/config-loader';
99
import { logger } from '../../config/logger.js';
1010
import {
@@ -155,83 +155,105 @@ class Governance extends Model {
155155
await Promise.all(updates.map(async (update) => Governance.upsert(update)));
156156
}
157157

158-
static async sync(retryCounter = 0) {
159-
try {
160-
logger.debug('running governance model sync()');
158+
static async sync() {
159+
logger.debug('[v1]: running governance model sync()');
161160

162-
if (!GOVERNANCE_BODY_ID) {
163-
throw new Error('Missing information in env to sync Governance data');
164-
}
161+
// Check simulator/dev mode first to match V2 behavior and avoid errors
162+
// in test/dev environments that may not have GOVERNANCE_BODY_ID configured.
163+
if (USE_SIMULATOR || USE_DEVELOPMENT_MODE) {
164+
logger.info('[v1]: SIMULATOR/TESTNET MODE: Using sample picklist');
165+
await Governance.upsert({
166+
metaKey: 'pickList',
167+
metaValue: JSON.stringify(PickListStub),
168+
confirmed: true,
169+
});
170+
return;
171+
}
165172

166-
// If on simulator or testnet, use the stubbed picklist data and return
167-
if (USE_SIMULATOR || USE_DEVELOPMENT_MODE) {
168-
logger.info('SIMULATOR/TESTNET MODE: Using sample picklist');
169-
// Await the upsert to ensure transaction completes before returning
170-
await Governance.upsert({
171-
metaKey: 'pickList',
172-
metaValue: JSON.stringify(PickListStub),
173-
confirmed: true,
174-
});
173+
if (!GOVERNANCE_BODY_ID) {
174+
logger.error('[v1]: Missing GOVERNANCE_BODY_ID in env, cannot sync governance data');
175+
return;
176+
}
175177

178+
// Check governance body store sync status before any blocking fetch.
179+
// If the store is not yet synced, return immediately so cached governance
180+
// data remains usable and the next task run retries.
181+
try {
182+
const bodyStoreSyncStatus = await datalayer.getDataLayerStoreSyncStatus(GOVERNANCE_BODY_ID);
183+
if (!isDlStoreSynced(bodyStoreSyncStatus?.sync_status)) {
184+
logger.info(
185+
`[v1]: governance body store ${GOVERNANCE_BODY_ID} not yet synced. Skipping sync, will retry on next task run.`,
186+
);
176187
return;
177188
}
189+
} catch (error) {
190+
logger.warn(
191+
`[v1]: could not check sync status for governance body store ${GOVERNANCE_BODY_ID}: ${error.message}. Skipping sync.`,
192+
);
193+
return;
194+
}
178195

196+
// Single attempt. On any failure, log and return — the scheduler will
197+
// run the task again on its normal cadence. No in-task retry loop so the
198+
// coroutine never blocks its scheduler slot.
199+
try {
179200
const governanceData = await datalayer.getSubscribedStoreData(
180201
GOVERNANCE_BODY_ID,
181202
undefined,
182-
true,
203+
false,
183204
);
184205

185-
// Check if there is v1, v2, v3 ..... and if not, then we assume this is a legacy governance table that isnt versioned
186206
const shouldSyncLegacy = !Object.keys(governanceData).some((key) =>
187207
/^v?[0-9]+$/.test(key),
188208
);
189209

190210
if (shouldSyncLegacy) {
191211
logger.info(
192-
`using legacy governance upsert method for governance store ${GOVERNANCE_BODY_ID}`,
193-
);
194-
await Governance.upsertGovernanceDownload(
195-
GOVERNANCE_BODY_ID,
196-
governanceData,
212+
`[v1]: using legacy governance upsert method for governance store ${GOVERNANCE_BODY_ID}`,
197213
);
214+
await Governance.upsertGovernanceDownload(GOVERNANCE_BODY_ID, governanceData);
215+
return;
198216
}
199217

200-
// Check if the governance data for this version exists
201218
const dataModelVersion = 'v1';
202219
const versionedGovernanceStoreId = governanceData[dataModelVersion];
203-
if (versionedGovernanceStoreId) {
204-
logger.debug(
205-
`getting ${dataModelVersion} governance data from store ${versionedGovernanceStoreId}`,
206-
);
207-
const versionedGovernanceData = await datalayer.getSubscribedStoreData(
208-
versionedGovernanceStoreId,
209-
undefined,
210-
true,
220+
if (!versionedGovernanceStoreId) {
221+
logger.error(
222+
`[v1]: governance data is not available from store ${GOVERNANCE_BODY_ID} for ${dataModelVersion} data model. Skipping sync.`,
211223
);
224+
return;
225+
}
212226

213-
await Governance.upsertGovernanceDownload(
214-
GOVERNANCE_BODY_ID,
215-
versionedGovernanceData,
216-
);
217-
} else {
218-
throw new Error(
219-
`Governance data is not available from store ${GOVERNANCE_BODY_ID} for ${dataModelVersion} data model.`,
227+
// Check versioned governance store sync status before fetching
228+
try {
229+
const versionedSyncStatus = await datalayer.getDataLayerStoreSyncStatus(versionedGovernanceStoreId);
230+
if (!isDlStoreSynced(versionedSyncStatus?.sync_status)) {
231+
logger.info(
232+
`[v1]: versioned governance store ${versionedGovernanceStoreId} not yet synced. Skipping sync, will retry on next task run.`,
233+
);
234+
return;
235+
}
236+
} catch (error) {
237+
logger.warn(
238+
`[v1]: could not check sync status for versioned governance store ${versionedGovernanceStoreId}: ${error.message}. Skipping sync.`,
220239
);
240+
return;
221241
}
242+
243+
logger.debug(
244+
`[v1]: getting ${dataModelVersion} governance data from store ${versionedGovernanceStoreId}`,
245+
);
246+
const versionedGovernanceData = await datalayer.getSubscribedStoreData(
247+
versionedGovernanceStoreId,
248+
undefined,
249+
false,
250+
);
251+
252+
await Governance.upsertGovernanceDownload(GOVERNANCE_BODY_ID, versionedGovernanceData);
222253
} catch (error) {
223-
await new Promise((resolve) => setTimeout(() => resolve(), 5000));
224-
const maxRetry = 50;
225-
if (retryCounter < maxRetry) {
226-
logger.error(
227-
`Error Syncing Governance Data. Retry attempt #${retryCounter + 1}. Retrying. Error:, ${error}`,
228-
);
229-
await Governance.sync(retryCounter + 1);
230-
} else {
231-
logger.error(
232-
`Error Syncing Governance Data. Retry attempts exceeded. This will not have the latest governance data and data sync may be impacted`,
233-
);
234-
}
254+
logger.error(
255+
`[v1]: Error syncing governance data: ${error.message}. Cached governance data will be used until next task run.`,
256+
);
235257
}
236258
}
237259

src/models/organizations/organizations.model.js

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -830,6 +830,53 @@ class Organization extends Model {
830830
}
831831
}
832832

833+
// Subscribe to org store first so it begins syncing, then check sync status
834+
// before entering the blocking subscription flow. If either the org store or
835+
// its derived singleton store is not yet synced, return early so the periodic
836+
// task retries on the next run rather than waiting up to 10 minutes.
837+
try {
838+
await datalayer.subscribeToStoreOnDataLayer(orgUid);
839+
} catch (error) {
840+
logger.warn(
841+
`[v1]: reconcileOrganization: could not subscribe to org store ${orgUid}, skipping reconcile: ${error.message}`,
842+
);
843+
return;
844+
}
845+
846+
try {
847+
const orgSyncStatus = await getDataLayerStoreSyncStatus(orgUid);
848+
if (!isDlStoreSynced(orgSyncStatus?.sync_status)) {
849+
logger.info(
850+
`[v1]: reconcileOrganization: org store ${orgUid} not yet synced, skipping reconcile. Will retry on next task run.`,
851+
);
852+
return;
853+
}
854+
} catch (error) {
855+
logger.warn(
856+
`[v1]: reconcileOrganization: could not check sync status for org store ${orgUid}, skipping reconcile: ${error.message}`,
857+
);
858+
return;
859+
}
860+
861+
// Check the singleton (data model version) store before the blocking fetch inside
862+
// subscribeToOrganization so that an unsynced singleton doesn't stall the background task.
863+
if (dataModelVersionStoreId) {
864+
try {
865+
const singletonSyncStatus = await getDataLayerStoreSyncStatus(dataModelVersionStoreId);
866+
if (!isDlStoreSynced(singletonSyncStatus?.sync_status)) {
867+
logger.info(
868+
`[v1]: reconcileOrganization: singleton store ${dataModelVersionStoreId} for org ${orgUid} not yet synced, skipping reconcile. Will retry on next task run.`,
869+
);
870+
return;
871+
}
872+
} catch (error) {
873+
logger.warn(
874+
`[v1]: reconcileOrganization: could not check sync status for singleton store ${dataModelVersionStoreId}, skipping reconcile: ${error.message}`,
875+
);
876+
return;
877+
}
878+
}
879+
833880
logger.debug(
834881
`running the organization model subscription process on ${orgUid}`,
835882
);

0 commit comments

Comments
 (0)