@@ -67,7 +67,7 @@ func (r *TransformRegister) Transform(cfg *proto.UnitExpectedConfig, agentInfo *
6767// CreateInputsFromStreams breaks down the raw Expected config into an array of individual inputs/modules from the Streams values
6868// that can later be formatted into the reloader's ConfigWithMetaData and sent to an indvidual beat/
6969// This also performs the basic task of inserting module-level add_field processors into the inputs/modules.
70- func CreateInputsFromStreams (raw * proto.UnitExpectedConfig , inputType string , agentInfo * client.AgentInfo ) ([]map [string ]interface {}, error ) {
70+ func CreateInputsFromStreams (raw * proto.UnitExpectedConfig , inputType string , agentInfo * client.AgentInfo , defaultProcessors ... mapstr. M ) ([]map [string ]interface {}, error ) {
7171 // should this be an error?
7272 if raw .GetStreams () == nil {
7373 return []map [string ]interface {}{}, nil
@@ -94,7 +94,7 @@ func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, inputType string, ag
9494 }
9595
9696 // 3. stream processors
97- streamSource , err = injectStreamProcessors (raw , inputType , stream , streamSource )
97+ streamSource , err = injectStreamProcessors (raw , inputType , stream , streamSource , defaultProcessors )
9898 if err != nil {
9999 return nil , fmt .Errorf ("Error injecting stream processors: %w" , err )
100100 }
@@ -181,13 +181,20 @@ func injectIndexStream(dataStreamType string, expected *proto.UnitExpectedConfig
181181
182182//injectStreamProcessors is an emulation of the InjectStreamProcessorRule AST code
183183// this adds a variety of processors for metadata related to the dataset and input config.
184- func injectStreamProcessors (expected * proto.UnitExpectedConfig , dataStreamType string , streamExpected * proto.Stream , stream map [string ]interface {}) (map [string ]interface {}, error ) {
184+ func injectStreamProcessors (expected * proto.UnitExpectedConfig , dataStreamType string , streamExpected * proto.Stream , stream map [string ]interface {}, defaultProcessors []mapstr. M ) (map [string ]interface {}, error ) {
185185 //1. start by "repairing" config to add any missing fields
186186 // logic from datastreamTypeFromInputNode
187187 procInputType , procInputDataset , procInputNamespace := metadataFromDatastreamValues (dataStreamType , expected , streamExpected )
188188
189189 var processors = []interface {}{}
190190
191+ for _ , p := range defaultProcessors {
192+ if len (p ) == 0 {
193+ continue
194+ }
195+ processors = append (processors , p )
196+ }
197+
191198 // In V1, AST injects input_id at the input level and not the stream level,
192199 // for reasons I can't understand, as it just ends up shuffling it around
193200 // to individual metricsets anyway, at least on metricbeat
0 commit comments