Skip to content

Commit b35bf3b

Browse files
committed
feat(APP): add ONLY_CADT_SUBSCRIPTIONS orglist reconcile
When enabled, default-organization sync keeps DataLayer subscriptions aligned with the governance orgList: unsubscribe removed orgs (data retained), re-subscribe orgs on the list, and skip removal when the cached orgList is empty. Validate tasks defer unsubscribe for orglist orgs so re-subscribe is not torn down mid-flight.
1 parent 0d9a979 commit b35bf3b

9 files changed

Lines changed: 527 additions & 8 deletions

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ In the `CHIA_ROOT` directory (usually `~/.chia/mainnet` on Linux), CADT will add
277277
* **DATALAYER_FILE_SERVER_URL**: Publicly available URL and port where Chia Datalayer [files are served](#datalayer-http-file-serving), including schema (http:// or https://). If serving DataLayer files from S3, this would be the public URL of the S3 bucket. Port can be omitted if using standard ports for http or https requests.
278278
* **AUTO_SUBSCRIBE_FILESTORE**: Subscribing to the filestore for any organization is optional. To automatically subscribe and sync the filestore to every organization you subscribe to, set this to `true`.
279279
* **AUTO_MIRROR_EXTERNAL_STORES**: When set to true (the default), CADT will automatically create mirrors for each store you are subscribed to. Mirroring all subscriptions using the `DATALAYER_FILE_SERVER_URL` will make the entire CADT network more resilient and distributed. Note: `DATALAYER_FILE_SERVER_URL` must also be set to a valid URL or IP address for mirrors to be created. Both settings are required for external store mirroring to function.
280+
* **ONLY_CADT_SUBSCRIPTIONS**: When set to `true`, CADT keeps DataLayer subscriptions aligned with the governance **orgList** in both directions. Organizations removed from the orgList are unsubscribed from DataLayer (`subscribed: false`); already-synced registry data on this node is **not** deleted. Organizations on the orgList that are not subscribed are subscribed (including orgs re-added after a prior removal, including orgs previously removed via the API delete flow). The home organization and governance body store are never auto-unsubscribed. Reconciliation runs only when a **non-empty** orgList is present locally (after a successful governance sync); an empty orgList is treated as “not ready” and does not trigger unsubscribes. Default `false`. While enabled, a manual unsubscribe of an org still listed on the orgList will be reverted on the next sync cycle.
280281
* **LOG_LEVEL**: Controls verbosity of logging. Common settings are `info` and `debug`. Setting to `silly` will log all queries.
281282
* **TASKS**: Section for configuring sync intervals.
282283
* **GOVERNANCE_SYNC_TASK_INTERVAL**: Syncs picklist, orgList, and glossary from the governance node. Default 30 seconds.

src/tasks/sync-default-organizations-v2.js

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@ import {
66
import { getDefaultOrganizationListV2 } from '../utils/v2-data-loaders.js';
77
import { MetaV2, OrganizationsV2 } from '../models/v2/index.js';
88
import { loggerV2 } from '../config/logger.js';
9-
import { getConfig } from '../utils/config-loader.js';
9+
import { getConfig, getConfigV2 } from '../utils/config-loader.js';
10+
import {
11+
buildOrgListAllowSet,
12+
unsubscribeOrgsNotInOrgList,
13+
} from '../utils/orglist-subscription-reconcile.js';
1014

1115
const CONFIG = getConfig().APP;
1216

@@ -26,12 +30,17 @@ const task = new Task('sync-default-organizations-v2', async () => {
2630
// them.
2731
const defaultOrgList = await getDefaultOrganizationListV2();
2832
const userDeletedOrgs = await MetaV2.getUserDeletedOrgUids();
33+
const onlyCadtSubscriptions = CONFIG.ONLY_CADT_SUBSCRIPTIONS === true;
2934

3035
const pending = [];
3136
const imported = [];
37+
const resubscribePending = [];
3238

3339
for (const { orgUid } of defaultOrgList) {
34-
if (userDeletedOrgs?.includes(orgUid)) {
40+
if (
41+
!onlyCadtSubscriptions &&
42+
userDeletedOrgs?.includes(orgUid)
43+
) {
3544
loggerV2.verbose(
3645
`default organization ${orgUid} has been explicitly removed from this instance. not adding or checking that it exists`,
3746
);
@@ -47,6 +56,13 @@ const task = new Task('sync-default-organizations-v2', async () => {
4756
pending.push(orgUid);
4857
} else {
4958
imported.push(orgUid);
59+
if (
60+
onlyCadtSubscriptions &&
61+
defaultOrgList.length > 0 &&
62+
!Boolean(organization.subscribed)
63+
) {
64+
resubscribePending.push(orgUid);
65+
}
5066
}
5167
}
5268

@@ -91,6 +107,41 @@ const task = new Task('sync-default-organizations-v2', async () => {
91107
);
92108
}
93109
});
110+
111+
const resubscribeResults = await Promise.allSettled(
112+
resubscribePending.map(async (orgUid) => {
113+
await OrganizationsV2.subscribeToOrganization(orgUid);
114+
loggerV2.info(
115+
`[v2]: ONLY_CADT_SUBSCRIPTIONS: re-subscribed organization ${orgUid}`,
116+
);
117+
}),
118+
);
119+
resubscribeResults.forEach((result, i) => {
120+
if (result.status === 'rejected') {
121+
loggerV2.warn(
122+
`[v2]: ONLY_CADT_SUBSCRIPTIONS: failed to re-subscribe organization ${resubscribePending[i]}: ${result.reason?.message || result.reason}. Will retry on next task run.`,
123+
);
124+
}
125+
});
126+
127+
if (onlyCadtSubscriptions) {
128+
const { GOVERNANCE_BODY_ID } = getConfigV2().GOVERNANCE;
129+
const allowSet = buildOrgListAllowSet(defaultOrgList, GOVERNANCE_BODY_ID);
130+
await unsubscribeOrgsNotInOrgList({
131+
defaultOrgList,
132+
allowSet,
133+
organizationModel: OrganizationsV2,
134+
fieldNames: {
135+
orgUid: 'org_uid',
136+
isHome: 'is_home',
137+
subscribed: 'subscribed',
138+
},
139+
unsubscribeFromOrganizationStores:
140+
OrganizationsV2.unsubscribeFromOrganizationStores.bind(OrganizationsV2),
141+
logger: loggerV2,
142+
apiVersionLabel: 'v2',
143+
});
144+
}
94145
}
95146
} catch (error) {
96147
loggerV2.error(

src/tasks/sync-default-organizations.js

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ import { getDefaultOrganizationList } from '../utils/data-loaders.js';
77
import { Meta, Organization } from '../models/index.js';
88
import { logger } from '../config/logger.js';
99
import { getConfig } from '../utils/config-loader.js';
10+
import {
11+
buildOrgListAllowSet,
12+
unsubscribeOrgsNotInOrgList,
13+
} from '../utils/orglist-subscription-reconcile.js';
1014

1115
const CONFIG = getConfig();
1216

@@ -26,12 +30,17 @@ const task = new Task('sync-default-organizations', async () => {
2630
// them.
2731
const defaultOrgRecords = await getDefaultOrganizationList();
2832
const userDeletedOrgs = await Meta.getUserDeletedOrgUids();
33+
const onlyCadtSubscriptions = CONFIG.APP.ONLY_CADT_SUBSCRIPTIONS === true;
2934

3035
const pending = [];
3136
const imported = [];
37+
const resubscribePending = [];
3238

3339
for (const { orgUid } of defaultOrgRecords) {
34-
if (userDeletedOrgs?.includes(orgUid)) {
40+
if (
41+
!onlyCadtSubscriptions &&
42+
userDeletedOrgs?.includes(orgUid)
43+
) {
3544
logger.verbose(
3645
`default organization ${orgUid} has been explicitly removed from this instance. not adding or checking that it exists`,
3746
);
@@ -47,6 +56,13 @@ const task = new Task('sync-default-organizations', async () => {
4756
pending.push(orgUid);
4857
} else {
4958
imported.push(orgUid);
59+
if (
60+
onlyCadtSubscriptions &&
61+
defaultOrgRecords.length > 0 &&
62+
!Boolean(organization.subscribed)
63+
) {
64+
resubscribePending.push(orgUid);
65+
}
5066
}
5167
}
5268

@@ -80,6 +96,41 @@ const task = new Task('sync-default-organizations', async () => {
8096
);
8197
}
8298
});
99+
100+
const resubscribeResults = await Promise.allSettled(
101+
resubscribePending.map(async (orgUid) => {
102+
await Organization.subscribeToOrganization(orgUid);
103+
logger.info(
104+
`[v1]: ONLY_CADT_SUBSCRIPTIONS: re-subscribed organization ${orgUid}`,
105+
);
106+
}),
107+
);
108+
resubscribeResults.forEach((result, i) => {
109+
if (result.status === 'rejected') {
110+
logger.warn(
111+
`[v1]: ONLY_CADT_SUBSCRIPTIONS: failed to re-subscribe organization ${resubscribePending[i]}: ${result.reason?.message || result.reason}. Will retry on next task run.`,
112+
);
113+
}
114+
});
115+
116+
if (onlyCadtSubscriptions) {
117+
const { GOVERNANCE_BODY_ID } = CONFIG.GOVERNANCE;
118+
const allowSet = buildOrgListAllowSet(defaultOrgRecords, GOVERNANCE_BODY_ID);
119+
await unsubscribeOrgsNotInOrgList({
120+
defaultOrgList: defaultOrgRecords,
121+
allowSet,
122+
organizationModel: Organization,
123+
fieldNames: {
124+
orgUid: 'orgUid',
125+
isHome: 'isHome',
126+
subscribed: 'subscribed',
127+
},
128+
unsubscribeFromOrganizationStores:
129+
Organization.unsubscribeFromOrganizationStores.bind(Organization),
130+
logger,
131+
apiVersionLabel: 'v1',
132+
});
133+
}
83134
}
84135
} catch (error) {
85136
logger.error(

src/tasks/validate-organization-table-and-subscriptions-v2.js

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import {
55
assertWalletIsSynced,
66
} from '../utils/data-assertions.js';
77
import { loggerV2 } from '../config/logger.js';
8-
import { getConfig } from '../utils/config-loader.js';
8+
import { getConfig, getConfigV2 } from '../utils/config-loader.js';
9+
import { getDefaultOrganizationListV2 } from '../utils/v2-data-loaders.js';
10+
import { buildOrgListAllowSet } from '../utils/orglist-subscription-reconcile.js';
911
import dotenv from 'dotenv';
1012

1113
const CONFIG = getConfig().APP;
@@ -18,6 +20,16 @@ const task = new Task('validate-organization-table-v2', async () => {
1820
await assertWalletIsSynced();
1921

2022
if (!CONFIG.USE_SIMULATOR) {
23+
const onlyCadtSubscriptions = CONFIG.ONLY_CADT_SUBSCRIPTIONS === true;
24+
let orgListAllowSet = null;
25+
if (onlyCadtSubscriptions) {
26+
const defaultOrgList = await getDefaultOrganizationListV2();
27+
if (defaultOrgList.length > 0) {
28+
const { GOVERNANCE_BODY_ID } = getConfigV2().GOVERNANCE;
29+
orgListAllowSet = buildOrgListAllowSet(defaultOrgList, GOVERNANCE_BODY_ID);
30+
}
31+
}
32+
2133
const organizations = await OrganizationsV2.findAll({ raw: true });
2234
loggerV2.info(
2335
'validating V2 organization table record store ids against datalayer store ids',
@@ -48,6 +60,13 @@ const task = new Task('validate-organization-table-v2', async () => {
4860
`failed reconcile organization records and subscriptions for organization ${organization.org_uid}. Error: ${error.message}. `,
4961
);
5062
}
63+
} else if (
64+
onlyCadtSubscriptions &&
65+
orgListAllowSet?.has(organization.org_uid)
66+
) {
67+
loggerV2.verbose(
68+
`[v2]: ONLY_CADT_SUBSCRIPTIONS: skipping validate unsubscribe for orglist org ${organization.org_uid} (sync-default-organizations owns subscription)`,
69+
);
5170
} else {
5271
loggerV2.info(
5372
`organization ${organization.org_uid} is marked as unsubscribed. ensuring all organization stores are unsubscribed`,

src/tasks/validate-organization-table-and-subscriptions.js

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ import {
66
} from '../utils/data-assertions';
77
import { logger } from '../config/logger.js';
88
import { getConfig } from '../utils/config-loader';
9-
const CONFIG = getConfig().APP;
9+
import { getDefaultOrganizationList } from '../utils/data-loaders.js';
10+
import { buildOrgListAllowSet } from '../utils/orglist-subscription-reconcile.js';
11+
12+
const CONFIG = getConfig();
1013

1114
import dotenv from 'dotenv';
1215

@@ -17,7 +20,17 @@ const task = new Task('validate-organization-table', async () => {
1720
await assertDataLayerAvailable();
1821
await assertWalletIsSynced();
1922

20-
if (!CONFIG.USE_SIMULATOR) {
23+
if (!CONFIG.APP.USE_SIMULATOR) {
24+
const onlyCadtSubscriptions = CONFIG.APP.ONLY_CADT_SUBSCRIPTIONS === true;
25+
let orgListAllowSet = null;
26+
if (onlyCadtSubscriptions) {
27+
const defaultOrgList = await getDefaultOrganizationList();
28+
if (defaultOrgList.length > 0) {
29+
const { GOVERNANCE_BODY_ID } = CONFIG.GOVERNANCE;
30+
orgListAllowSet = buildOrgListAllowSet(defaultOrgList, GOVERNANCE_BODY_ID);
31+
}
32+
}
33+
2134
const organizations = await Organization.findAll({ raw: true });
2235
logger.info(
2336
'validating organization table record store ids against datalayer store ids',
@@ -48,6 +61,13 @@ const task = new Task('validate-organization-table', async () => {
4861
`failed reconcile organization records and subscriptions for organization ${organization.orgUid}. Error: ${error.message}. `,
4962
);
5063
}
64+
} else if (
65+
onlyCadtSubscriptions &&
66+
orgListAllowSet?.has(organization.orgUid)
67+
) {
68+
logger.verbose(
69+
`[v1]: ONLY_CADT_SUBSCRIPTIONS: skipping validate unsubscribe for orglist org ${organization.orgUid} (sync-default-organizations owns subscription)`,
70+
);
5171
} else {
5272
logger.info(
5373
`organization ${organization.orgUid} is marked as unsubscribed. ensuring all organization stores are unsubscribed`,
@@ -60,7 +80,7 @@ const task = new Task('validate-organization-table', async () => {
6080
} catch (error) {
6181
logger.error(
6282
`failed to validate default organization records and subscriptions. Error ${error.message}. ` +
63-
`Retrying in ${CONFIG?.TASKS?.VALIDATE_ORGANIZATION_TABLE_TASK_INTERVAL || 900} seconds`,
83+
`Retrying in ${CONFIG?.APP?.TASKS?.VALIDATE_ORGANIZATION_TABLE_TASK_INTERVAL || 900} seconds`,
6484
);
6585
}
6686
});
@@ -75,7 +95,7 @@ const task = new Task('validate-organization-table', async () => {
7595
*/
7696
const job = new SimpleIntervalJob(
7797
{
78-
seconds: CONFIG?.TASKS?.VALIDATE_ORGANIZATION_TABLE_TASK_INTERVAL || 900,
98+
seconds: CONFIG?.APP?.TASKS?.VALIDATE_ORGANIZATION_TABLE_TASK_INTERVAL || 900,
7999
runImmediately: true,
80100
},
81101
task,

src/utils/defaultConfig.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ export const defaultConfig = {
2929
DATALAYER_FILE_SERVER_URL: null,
3030
AUTO_SUBSCRIBE_FILESTORE: false,
3131
AUTO_MIRROR_EXTERNAL_STORES: true,
32+
/**
33+
* When true, DataLayer subscriptions are kept in sync with the governance
34+
* orgList: orgs removed from the list are unsubscribed (data retained);
35+
* orgs on the list with subscribed=false are re-subscribed. Requires a
36+
* non-empty orgList from a successful governance sync before any removal.
37+
*/
38+
ONLY_CADT_SUBSCRIPTIONS: false,
3239
LOG_LEVEL: 'info',
3340
TASKS: {
3441
GOVERNANCE_SYNC_TASK_INTERVAL: 30,
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
'use strict';
2+
3+
/**
4+
* Build the set of organization UIDs that may remain subscribed when
5+
* ONLY_CADT_SUBSCRIPTIONS is enabled: governance orgList entries plus the
6+
* governance body store (needed to fetch the orgList).
7+
*
8+
* @param {Array<{ orgUid: string }>} defaultOrgList
9+
* @param {string|null|undefined} governanceBodyId
10+
* @returns {Set<string>}
11+
*/
12+
export const buildOrgListAllowSet = (defaultOrgList, governanceBodyId) => {
13+
const allowSet = new Set(
14+
defaultOrgList.map(({ orgUid }) => orgUid).filter(Boolean),
15+
);
16+
if (governanceBodyId) {
17+
allowSet.add(governanceBodyId);
18+
}
19+
return allowSet;
20+
};
21+
22+
/**
23+
* Unsubscribe from organizations that are subscribed locally but no longer
24+
* appear on the governance orgList (and are not the home org or governance body).
25+
* Skips when defaultOrgList is empty (unsynced governance / safety guard).
26+
*
27+
* @param {object} options
28+
* @param {Array<{ orgUid: string }>} options.defaultOrgList
29+
* @param {Set<string>} options.allowSet
30+
* @param {import('sequelize').ModelStatic} options.organizationModel
31+
* @param {{ orgUid: string, isHome: string, subscribed: string }} options.fieldNames
32+
* @param {(organization: object) => Promise<void>} options.unsubscribeFromOrganizationStores
33+
* @param {{ info: Function, warn: Function }} options.logger
34+
* @param {string} [options.apiVersionLabel]
35+
*/
36+
export const unsubscribeOrgsNotInOrgList = async ({
37+
defaultOrgList,
38+
allowSet,
39+
organizationModel,
40+
fieldNames,
41+
unsubscribeFromOrganizationStores,
42+
logger,
43+
apiVersionLabel = 'v2',
44+
}) => {
45+
if (!defaultOrgList.length) {
46+
return;
47+
}
48+
49+
const organizations = await organizationModel.findAll({ raw: true });
50+
for (const organization of organizations) {
51+
const orgUid = organization[fieldNames.orgUid];
52+
if (!orgUid) {
53+
continue;
54+
}
55+
if (organization[fieldNames.isHome]) {
56+
continue;
57+
}
58+
if (allowSet.has(orgUid)) {
59+
continue;
60+
}
61+
const isSubscribed = Boolean(organization[fieldNames.subscribed]);
62+
if (!isSubscribed) {
63+
continue;
64+
}
65+
66+
try {
67+
await unsubscribeFromOrganizationStores(organization);
68+
logger.info(
69+
`[${apiVersionLabel}]: ONLY_CADT_SUBSCRIPTIONS: unsubscribed organization ${orgUid} (removed from governance orgList)`,
70+
);
71+
} catch (error) {
72+
logger.warn(
73+
`[${apiVersionLabel}]: ONLY_CADT_SUBSCRIPTIONS: failed to unsubscribe organization ${orgUid}: ${error.message}. Will retry on next task run.`,
74+
);
75+
}
76+
}
77+
};

0 commit comments

Comments
 (0)