Skip to content

Commit 6ea0d97

Browse files
JoanFMGuyAv46
authored andcommitted
fix: fix concurrency issue on Reducer (#7214)
* fix: fix concurrency issue on Reducer * test: fix test distagg * fix: make it static array * fix: fix proper static * Update src/module-init/module-init.c * Update src/aggregate/reducer.c Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> --------- Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> (cherry picked from commit 7184778)
1 parent e1b6938 commit 6ea0d97

2 files changed

Lines changed: 20 additions & 38 deletions

File tree

src/aggregate/reducer.c

Lines changed: 20 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -11,52 +11,36 @@ typedef struct {
1111
ReducerFactory fn;
1212
} FuncEntry;
1313

14-
static FuncEntry *globalRegistry = NULL;
14+
// Static registry of all builtin reducers - no runtime registration needed
15+
static const FuncEntry globalRegistry[] = {
16+
{"COUNT", RDCRCount_New},
17+
{"SUM", RDCRSum_New},
18+
{"TOLIST", RDCRToList_New},
19+
{"MIN", RDCRMin_New},
20+
{"MAX", RDCRMax_New},
21+
{"AVG", RDCRAvg_New},
22+
{"COUNT_DISTINCT", RDCRCountDistinct_New},
23+
{"COUNT_DISTINCTISH", RDCRCountDistinctish_New},
24+
{"QUANTILE", RDCRQuantile_New},
25+
{"STDDEV", RDCRStdDev_New},
26+
{"FIRST_VALUE", RDCRFirstValue_New},
27+
{"RANDOM_SAMPLE", RDCRRandomSample_New},
28+
{"HLL", RDCRHLL_New},
29+
{"HLL_SUM", RDCRHLLSum_New}
30+
};
1531

16-
void RDCR_RegisterFactory(const char *name, ReducerFactory factory) {
17-
FuncEntry ent = {.name = name, .fn = factory};
18-
FuncEntry *tail = array_ensure_tail(&globalRegistry, FuncEntry);
19-
*tail = ent;
20-
}
21-
22-
static int isBuiltinsRegistered = 0;
32+
#define REGISTRY_SIZE 14
33+
static_assert(sizeof(globalRegistry) == sizeof(FuncEntry) * REGISTRY_SIZE);
2334

2435
ReducerFactory RDCR_GetFactory(const char *name) {
25-
if (!isBuiltinsRegistered) {
26-
isBuiltinsRegistered = 1;
27-
RDCR_RegisterBuiltins();
28-
}
29-
size_t n = array_len(globalRegistry);
30-
for (size_t ii = 0; ii < n; ++ii) {
36+
for (size_t ii = 0; ii < REGISTRY_SIZE; ++ii) {
3137
if (!strcasecmp(globalRegistry[ii].name, name)) {
3238
return globalRegistry[ii].fn;
3339
}
3440
}
3541
return NULL;
3642
}
3743

38-
#define RDCR_XBUILTIN(X) \
39-
X(RDCRCount_New, "COUNT") \
40-
X(RDCRSum_New, "SUM") \
41-
X(RDCRToList_New, "TOLIST") \
42-
X(RDCRMin_New, "MIN") \
43-
X(RDCRMax_New, "MAX") \
44-
X(RDCRAvg_New, "AVG") \
45-
X(RDCRCountDistinct_New, "COUNT_DISTINCT") \
46-
X(RDCRCountDistinctish_New, "COUNT_DISTINCTISH") \
47-
X(RDCRQuantile_New, "QUANTILE") \
48-
X(RDCRStdDev_New, "STDDEV") \
49-
X(RDCRFirstValue_New, "FIRST_VALUE") \
50-
X(RDCRRandomSample_New, "RANDOM_SAMPLE") \
51-
X(RDCRHLL_New, "HLL") \
52-
X(RDCRHLLSum_New, "HLL_SUM")
53-
54-
void RDCR_RegisterBuiltins(void) {
55-
#define X(fn, n) RDCR_RegisterFactory(n, fn);
56-
RDCR_XBUILTIN(X);
57-
#undef X
58-
}
59-
6044
int ReducerOpts_GetKey(const ReducerOptions *options, const RLookupKey **out) {
6145
ArgsCursor *ac = options->args;
6246
const char *s;

src/aggregate/reducer.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,6 @@ Reducer *RDCRHLLSum_New(const ReducerOptions *);
163163

164164
typedef Reducer *(*ReducerFactory)(const ReducerOptions *);
165165
ReducerFactory RDCR_GetFactory(const char *name);
166-
void RDCR_RegisterFactory(const char *name, ReducerFactory factory);
167-
void RDCR_RegisterBuiltins(void);
168166

169167
#ifdef __cplusplus
170168
}

0 commit comments

Comments
 (0)