Skip to content

Commit 7184778

Browse files
JoanFMGuyAv46
andauthored
fix: fix concurrency issue on Reducer (#7214)
* fix: fix concurrency issue on Reducer * test: fix test distagg * fix: make it static array * fix: fix proper static * Update src/module-init/module-init.c * Update src/aggregate/reducer.c Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> --------- Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com>
1 parent 6f30cc3 commit 7184778

2 files changed

Lines changed: 20 additions & 38 deletions

File tree

src/aggregate/reducer.c

Lines changed: 20 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -14,52 +14,36 @@ typedef struct {
1414
ReducerFactory fn;
1515
} FuncEntry;
1616

17-
static FuncEntry *globalRegistry = NULL;
17+
// Static registry of all builtin reducers - no runtime registration needed
18+
static const FuncEntry globalRegistry[] = {
19+
{"COUNT", RDCRCount_New},
20+
{"SUM", RDCRSum_New},
21+
{"TOLIST", RDCRToList_New},
22+
{"MIN", RDCRMin_New},
23+
{"MAX", RDCRMax_New},
24+
{"AVG", RDCRAvg_New},
25+
{"COUNT_DISTINCT", RDCRCountDistinct_New},
26+
{"COUNT_DISTINCTISH", RDCRCountDistinctish_New},
27+
{"QUANTILE", RDCRQuantile_New},
28+
{"STDDEV", RDCRStdDev_New},
29+
{"FIRST_VALUE", RDCRFirstValue_New},
30+
{"RANDOM_SAMPLE", RDCRRandomSample_New},
31+
{"HLL", RDCRHLL_New},
32+
{"HLL_SUM", RDCRHLLSum_New}
33+
};
1834

19-
void RDCR_RegisterFactory(const char *name, ReducerFactory factory) {
20-
FuncEntry ent = {.name = name, .fn = factory};
21-
FuncEntry *tail = array_ensure_tail(&globalRegistry, FuncEntry);
22-
*tail = ent;
23-
}
24-
25-
static int isBuiltinsRegistered = 0;
35+
#define REGISTRY_SIZE 14
36+
static_assert(sizeof(globalRegistry) == sizeof(FuncEntry) * REGISTRY_SIZE);
2637

2738
ReducerFactory RDCR_GetFactory(const char *name) {
28-
if (!isBuiltinsRegistered) {
29-
isBuiltinsRegistered = 1;
30-
RDCR_RegisterBuiltins();
31-
}
32-
size_t n = array_len(globalRegistry);
33-
for (size_t ii = 0; ii < n; ++ii) {
39+
for (size_t ii = 0; ii < REGISTRY_SIZE; ++ii) {
3440
if (!strcasecmp(globalRegistry[ii].name, name)) {
3541
return globalRegistry[ii].fn;
3642
}
3743
}
3844
return NULL;
3945
}
4046

41-
#define RDCR_XBUILTIN(X) \
42-
X(RDCRCount_New, "COUNT") \
43-
X(RDCRSum_New, "SUM") \
44-
X(RDCRToList_New, "TOLIST") \
45-
X(RDCRMin_New, "MIN") \
46-
X(RDCRMax_New, "MAX") \
47-
X(RDCRAvg_New, "AVG") \
48-
X(RDCRCountDistinct_New, "COUNT_DISTINCT") \
49-
X(RDCRCountDistinctish_New, "COUNT_DISTINCTISH") \
50-
X(RDCRQuantile_New, "QUANTILE") \
51-
X(RDCRStdDev_New, "STDDEV") \
52-
X(RDCRFirstValue_New, "FIRST_VALUE") \
53-
X(RDCRRandomSample_New, "RANDOM_SAMPLE") \
54-
X(RDCRHLL_New, "HLL") \
55-
X(RDCRHLLSum_New, "HLL_SUM")
56-
57-
void RDCR_RegisterBuiltins(void) {
58-
#define X(fn, n) RDCR_RegisterFactory(n, fn);
59-
RDCR_XBUILTIN(X);
60-
#undef X
61-
}
62-
6347
int ReducerOpts_GetKey(const ReducerOptions *options, const RLookupKey **out) {
6448
ArgsCursor *ac = options->args;
6549
const char *s;

src/aggregate/reducer.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,6 @@ Reducer *RDCRHLLSum_New(const ReducerOptions *);
172172

173173
typedef Reducer *(*ReducerFactory)(const ReducerOptions *);
174174
ReducerFactory RDCR_GetFactory(const char *name);
175-
void RDCR_RegisterFactory(const char *name, ReducerFactory factory);
176-
void RDCR_RegisterBuiltins(void);
177175

178176
#ifdef __cplusplus
179177
}

0 commit comments

Comments
 (0)