Skip to content

Commit f97f7d7

Browse files
committed
fix(V2): audit and fix CSV batch upload and XLSX import
Fixes race condition in csvtojson async subscribe by collecting rows synchronously then processing sequentially in a transaction. Adds merge-with-existing for UPDATE rows, org ownership verification, FK existence checks, unknown column stripping, and snake_case header normalization. Removes child record parsing from CSV path (CSV is flat parent-only; XLSX handles multi-entity imports). Fixes transformMetaUid regex to match multi-digit NEW-X placeholders. Addresses CRITICAL-1 (race condition), CRITICAL-2 (no merge), CRITICAL-3 (child parsing), HIGH-1 (no ownership check), HIGH-2 (bulkCreate duplicates), HIGH-3 (single-digit regex), MEDIUM-1b (missing FK checks), and MEDIUM-2 (unknown column pollution).
1 parent d3044c6 commit f97f7d7

10 files changed

Lines changed: 836 additions & 192 deletions

File tree

docs/cadt_rpc_api_v2.md

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2416,6 +2416,16 @@ Response
24162416

24172417
#### Batch upload projects from CSV
24182418

2419+
CSV batch upload is for **flat parent records only** — each row represents one project. Child records (locations, estimations, ratings, co-benefits, validations, verifications, etc.) must be created via their own REST endpoints or via the [XLSX multi-sheet import](#update-projects-from-xlsx-file) workflow. Unrecognized columns (including child entity names like `locations` or `estimations`) are silently stripped.
2420+
2421+
CSV headers may use either camelCase attribute names (e.g., `cadTrustProjectId`) or snake_case DB column names (e.g., `cad_trust_project_id`).
2422+
2423+
**Update behavior**: When a row includes a `cadTrustProjectId` that matches an existing project, the CSV row is **merged** with the existing DB record — fields present in the CSV overwrite the existing values; fields absent from the CSV retain their current values. This differs from the REST `PUT` endpoint, which requires all fields.
2424+
2425+
**Validation**: Rows are validated for foreign key existence (`cadTrustProgramId` must reference an existing program or a staged program record). Rows that fail validation are skipped and their errors are returned in the response. The CSV validation is intentionally more lenient than the REST API — fields like `projectLink` and `projectStatusDate` that are required in the REST schema are optional in CSV batch upload.
2426+
2427+
**Ownership**: UPDATE rows are verified to belong to the home organization. Attempting to update a project owned by another organization will produce an error for that row.
2428+
24192429
**Array Field Formatting**: For array fields like `projectType` and `projectSector`, the CSV can use any of these formats:
24202430
- **Single value**: `Solar` → becomes `["Solar"]`
24212431
- **JSON array**: `["Solar","Wind"]` → becomes `["Solar","Wind"]`
@@ -2434,14 +2444,25 @@ Request
24342444
curl --location --request POST 'http://localhost:31310/v2/project/batch' --form 'csv=@"./createProject.csv"'
24352445
```
24362446

2437-
Response
2447+
Response (success, no row-level errors)
24382448
```json
24392449
{
2440-
"message":"CSV processing complete, your records have been added to the staging table.",
2450+
"message": "CSV processing complete, your records have been added to the staging table.",
24412451
"success": true
24422452
}
24432453
```
24442454

2455+
Response (success with row-level errors — valid rows are still staged)
2456+
```json
2457+
{
2458+
"message": "CSV processing complete, your records have been added to the staging table.",
2459+
"success": true,
2460+
"errors": [
2461+
{ "row": 3, "error": "cadTrustProgramId 'invalid-id' does not exist" }
2462+
]
2463+
}
2464+
```
2465+
24452466
---
24462467

24472468
<a id="project-put-examples"></a>
@@ -3689,19 +3710,42 @@ Response
36893710

36903711
#### Batch upload units from CSV
36913712

3713+
CSV batch upload is for **flat parent records only** — each row represents one unit. Child records (unit labels, etc.) must be created via their own REST endpoints or via the [XLSX multi-sheet import](#update-units-from-xlsx-file) workflow. Unrecognized columns are silently stripped.
3714+
3715+
CSV headers may use either camelCase attribute names (e.g., `cadTrustUnitId`) or snake_case DB column names (e.g., `cad_trust_unit_id`).
3716+
3717+
**Update behavior**: When a row includes a `cadTrustUnitId` that matches an existing unit, the CSV row is **merged** with the existing DB record — fields present in the CSV overwrite the existing values; fields absent from the CSV retain their current values. This differs from the REST `PUT` endpoint, which requires all fields.
3718+
3719+
**Serial ID derivation**: If `unitSerialId` is not provided but `unitStartBlock` and `unitEndBlock` are, the serial ID is automatically derived as `<unitStartBlock>-<unitEndBlock>`.
3720+
3721+
**Validation**: Rows are validated for foreign key existence (`cadTrustIssuanceId` must reference an existing issuance or a staged issuance record). Rows that fail validation are skipped and their errors are returned in the response.
3722+
3723+
**Ownership**: UPDATE rows are verified to belong to the home organization. Attempting to update a unit owned by another organization will produce an error for that row.
3724+
36923725
Request
36933726
```shell
36943727
curl --location --request POST 'http://localhost:31310/v2/unit/batch' --form 'csv=@"./createUnit.csv"'
36953728
```
36963729

3697-
Response
3730+
Response (success, no row-level errors)
36983731
```json
36993732
{
3700-
"message":"CSV processing complete, your records have been added to the staging table.",
3733+
"message": "CSV processing complete, your records have been added to the staging table.",
37013734
"success": true
37023735
}
37033736
```
37043737

3738+
Response (success with row-level errors — valid rows are still staged)
3739+
```json
3740+
{
3741+
"message": "CSV processing complete, your records have been added to the staging table.",
3742+
"success": true,
3743+
"errors": [
3744+
{ "row": 4, "error": "cadTrustIssuanceId 'invalid-id' does not exist" }
3745+
]
3746+
}
3747+
```
3748+
37053749
---
37063750

37073751
<a id="unit-put-examples"></a>

src/controllers/v2/project-v2.controller.js

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -958,14 +958,19 @@ export const batchUpload = async (req, res) => {
958958
});
959959
}
960960

961-
// Use the model's batchUpload method
962-
await ProjectV2.batchUpload({ data: req.file.buffer });
961+
const result = await ProjectV2.batchUpload({ data: req.file.buffer });
963962

964-
res.json({
963+
const response = {
965964
message:
966965
'CSV processing complete, your records have been added to the staging table.',
967966
success: true,
968-
});
967+
};
968+
969+
if (result.errors && result.errors.length > 0) {
970+
response.errors = result.errors;
971+
}
972+
973+
res.json(response);
969974
} catch (error) {
970975
loggerV2.error('[v2]: Batch Upload Failed.', error);
971976
res.status(400).json({

src/controllers/v2/unit-v2.controller.js

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -938,14 +938,19 @@ export const batchUpload = async (req, res) => {
938938
});
939939
}
940940

941-
// Use the model's batchUpload method
942-
await UnitV2.batchUpload({ data: req.file.buffer });
941+
const result = await UnitV2.batchUpload({ data: req.file.buffer });
943942

944-
res.json({
943+
const response = {
945944
message:
946945
'CSV processing complete, your records have been added to the staging table.',
947946
success: true,
948-
});
947+
};
948+
949+
if (result.errors && result.errors.length > 0) {
950+
response.errors = result.errors;
951+
}
952+
953+
res.json(response);
949954
} catch (error) {
950955
loggerV2.error('[v2]: Batch Upload Failed.', error);
951956
res.status(400).json({

src/models/v2/project-v2.model.js

Lines changed: 106 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,15 @@ import {
1010
createXlsFromSequelizeResults,
1111
transformFullXslsToChangeList,
1212
} from '../../utils/xls.js';
13-
import { parseV2Xlsx, stageV2XlsRecords } from '../../utils/v2-xls.js';
13+
import {
14+
parseV2Xlsx,
15+
stageV2XlsRecords,
16+
normalizeCsvHeaders,
17+
toDbFieldNames,
18+
stripUnknownDbFields,
19+
} from '../../utils/v2-xls.js';
20+
import { assertRecordExistanceOrStaged } from '../../utils/v2-data-assertions.js';
21+
import { ProgramV2 } from './program-v2.model.js';
1422
import { getDeletedItems } from '../../utils/model-utils.js';
1523
import { keyValueToChangeList } from '../../utils/datalayer-utils.js';
1624
import { LocationV2 } from './location-v2.model.js';
@@ -353,92 +361,128 @@ class ProjectV2 extends Model {
353361
}
354362

355363
/**
356-
* Batch upload projects from CSV file
357-
* Parses CSV and creates staging records
364+
* Batch upload projects from CSV file.
365+
*
366+
* Collects all CSV rows synchronously, then processes each row sequentially
367+
* inside a transaction. For each row the pipeline is:
368+
* 1. Normalize snake_case headers → camelCase attribute names
369+
* 2. Parse array fields (projectType / projectSector)
370+
* 3. Determine INSERT vs UPDATE, merge with existing record on UPDATE
371+
* 4. Validate ownership and FK references
372+
* 5. Convert to DB field names, strip unknown keys
373+
* 6. Upsert into StagingV2
374+
*
358375
* @param {Object} csvFile - CSV file object with data buffer
359-
* @returns {Promise<void>}
360-
* @throws {Error} If CSV parsing or staging fails
376+
* @returns {Promise<Object>} Result with errors array (may be empty)
361377
*/
362378
static async batchUpload(csvFile) {
363379
const buffer = csvFile.data;
364380
const stream = Readable.from(buffer.toString('utf8'));
365381

366-
const recordsToCreate = [];
382+
const rawRows = [];
367383

368-
return new Promise((resolve, reject) => {
384+
await new Promise((resolve, reject) => {
369385
csv()
370386
.fromStream(stream)
371-
.subscribe(async (newRecord) => {
372-
let action = 'UPDATE';
387+
.subscribe((row) => {
388+
rawRows.push(row);
389+
})
390+
.on('error', (error) => reject(error))
391+
.on('done', () => resolve());
392+
});
393+
394+
if (rawRows.length === 0) {
395+
throw new Error('There were no valid records to parse');
396+
}
397+
398+
const homeOrg = await OrganizationsV2.getHomeOrg();
399+
if (!homeOrg) {
400+
throw new Error('No home organization found');
401+
}
402+
const orgUid = homeOrg.org_uid;
373403

374-
// Convert camelCase to snake_case for V2
375-
const projectId = newRecord.cadTrustProjectId || newRecord.cad_trust_project_id;
404+
const errors = [];
405+
406+
await sequelizeV2.transaction(async (transaction) => {
407+
for (let i = 0; i < rawRows.length; i++) {
408+
const rowNum = i + 2; // +2: 1-indexed + header row
409+
try {
410+
let row = normalizeCsvHeaders(rawRows[i], ProjectV2);
411+
412+
ProjectV2.parseProjectArrayFields(row);
413+
414+
const projectId = row.cadTrustProjectId;
415+
let action;
416+
let mergedRecord;
376417

377418
if (projectId) {
378-
// Check if project exists
379-
const possibleExistingRecord = await ProjectV2.findByPk(projectId);
380-
381-
if (!possibleExistingRecord) {
382-
reject(
383-
new Error(
384-
`Project with cadTrustProjectId ${projectId} does not exist`,
385-
),
386-
);
387-
return;
419+
const existing = await ProjectV2.findByPk(projectId);
420+
if (!existing) {
421+
errors.push({ row: rowNum, error: `Project with cadTrustProjectId ${projectId} does not exist` });
422+
continue;
388423
}
389-
390-
// Verify it belongs to home org (for updates)
391-
const homeOrg = await OrganizationsV2.getHomeOrg();
392-
if (!homeOrg) {
393-
reject(new Error('No home organization found'));
394-
return;
424+
if (existing.orgUid !== orgUid) {
425+
errors.push({ row: rowNum, error: `Cannot update project ${projectId}: belongs to a different organization` });
426+
continue;
395427
}
428+
action = 'UPDATE';
429+
mergedRecord = { ...existing.toJSON(), ...row };
396430
} else {
397-
// New project - generate UUID
398-
newRecord.cadTrustProjectId = uuidv4();
399-
const homeOrg = await OrganizationsV2.getHomeOrg();
400-
if (!homeOrg) {
401-
reject(new Error('No home organization found'));
402-
return;
403-
}
431+
row.cadTrustProjectId = uuidv4();
404432
action = 'INSERT';
433+
mergedRecord = { ...row };
405434
}
406435

407-
// Update project properties (handle child records)
408-
ProjectV2.updateProjectPropertiesV2(newRecord);
409-
410-
const stagedData = {
411-
uuid: newRecord.cadTrustProjectId,
412-
action: action,
413-
table: 'project',
414-
data: JSON.stringify([newRecord]),
415-
};
416-
417-
recordsToCreate.push(stagedData);
418-
})
419-
.on('error', (error) => {
420-
reject(error);
421-
})
422-
.on('done', async () => {
423-
if (recordsToCreate.length) {
424-
await StagingV2.bulkCreate(recordsToCreate, {
425-
logging: (msg) => loggerV2.info(msg),
426-
});
427-
428-
resolve();
429-
} else {
430-
reject(new Error('There were no valid records to parse'));
436+
// FK existence check for cadTrustProgramId
437+
if (mergedRecord.cadTrustProgramId) {
438+
try {
439+
await assertRecordExistanceOrStaged(
440+
ProgramV2,
441+
mergedRecord.cadTrustProgramId,
442+
`cadTrustProgramId '${mergedRecord.cadTrustProgramId}' does not exist`,
443+
);
444+
} catch (err) {
445+
errors.push({ row: rowNum, error: err.message });
446+
continue;
447+
}
431448
}
432-
});
449+
450+
// Remove timestamps (managed by Sequelize)
451+
delete mergedRecord.createdAt;
452+
delete mergedRecord.updatedAt;
453+
delete mergedRecord.created_at;
454+
delete mergedRecord.updated_at;
455+
456+
const dbRecord = toDbFieldNames(mergedRecord, ProjectV2);
457+
const cleaned = stripUnknownDbFields(dbRecord, ProjectV2, loggerV2);
458+
459+
cleaned.org_uid = orgUid;
460+
461+
await StagingV2.upsert(
462+
{
463+
uuid: mergedRecord.cadTrustProjectId,
464+
action,
465+
table: 'project',
466+
data: JSON.stringify([cleaned]),
467+
},
468+
{ transaction },
469+
);
470+
} catch (err) {
471+
errors.push({ row: rowNum, error: err.message });
472+
}
473+
}
433474
});
475+
476+
return { errors };
434477
}
435478

436479
/**
437-
* Helper to update project properties from CSV
438-
* Handles conversion of string fields to arrays for batch upload
480+
* Parse projectType and projectSector from CSV string representations
481+
* into arrays. These are JSON array columns on the project table, not
482+
* child entities — they belong in the flat CSV row.
439483
* @private
440484
*/
441-
static updateProjectPropertiesV2(project) {
485+
static parseProjectArrayFields(project) {
442486
if (typeof project !== 'object') return;
443487

444488
const arrayFields = ['projectType', 'projectSector'];
@@ -469,25 +513,6 @@ class ProjectV2 extends Model {
469513
}
470514
}
471515
});
472-
473-
const childRecordKeys = ['locations', 'estimations', 'ratings', 'coBenefits'];
474-
childRecordKeys.forEach((key) => {
475-
if (project[key] && typeof project[key] === 'string') {
476-
try {
477-
project[key] = JSON.parse(project[key]);
478-
} catch {
479-
// If not JSON, leave as is
480-
}
481-
}
482-
483-
if (Array.isArray(project[key])) {
484-
project[key].forEach((item) => {
485-
if (!item.cadTrustProjectId) {
486-
item.cadTrustProjectId = project.cadTrustProjectId;
487-
}
488-
});
489-
}
490-
});
491516
}
492517

493518
/**

0 commit comments

Comments
 (0)