Skip to content

Commit ce3f505

Browse files
authored
[Filebeat][httpjson] Add split_events_by config setting (elastic#19246)
1 parent e523380 commit ce3f505

5 files changed

Lines changed: 242 additions & 13 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
5252
- Adds check on `<no value>` config option value for the azure input `resource_manager_endpoint`. {pull}18890[18890]
5353
- Okta module now requires objects instead of JSON strings for the `http_headers`, `http_request_body`, `pagination`, `rate_limit`, and `ssl` variables. {pull}18953[18953]
5454
- Adds oauth support for httpjson input. {issue}18415[18415] {pull}18892[18892]
55+
- Adds `split_events_by` option to httpjson input. {pull}19246[19246]
5556

5657
*Heartbeat*
5758

x-pack/filebeat/docs/inputs/input-httpjson.asciidoc

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,73 @@ The config needs to specify `events` as the `json_objects_array` value.
178178
json_objects_array: events
179179
----
180180

181+
[float]
182+
==== `split_events_by`
183+
184+
If the response body contains a JSON object containing an array then this option
185+
specifies the key containing that array. Each object in that array will generate
186+
an event, but will maintain the common fields of the document as well.
187+
188+
["source","json",subs="attributes"]
189+
----
190+
{
191+
"time": "2020-06-02 23:22:32 UTC",
192+
"user": "Bob",
193+
"events": [
194+
{
195+
"timestamp": "2020-05-02 11:10:03 UTC",
196+
"event": {
197+
"category": "authorization"
198+
}
199+
},
200+
{
201+
"timestamp": "2020-05-05 13:03:11 UTC",
202+
"event": {
203+
"category": "authorization"
204+
}
205+
}
206+
]
207+
}
208+
----
209+
210+
The config needs to specify `events` as the `split_events_by` value.
211+
212+
["source","yaml",subs="attributes"]
213+
----
214+
- type: httpjson
215+
split_events_by: events
216+
----
217+
218+
And will output the following events:
219+
220+
["source","json",subs="attributes"]
221+
----
222+
[
223+
{
224+
"time": "2020-06-02 23:22:32 UTC",
225+
"user": "Bob",
226+
"events": {
227+
"timestamp": "2020-05-02 11:10:03 UTC",
228+
"event": {
229+
"category": "authorization"
230+
}
231+
}
232+
},
233+
{
234+
"time": "2020-06-02 23:22:32 UTC",
235+
"user": "Bob",
236+
"events": {
237+
"timestamp": "2020-05-05 13:03:11 UTC",
238+
"event": {
239+
"category": "authorization"
240+
}
241+
}
242+
}
243+
]
244+
----
245+
246+
It can be used in combination with `json_objects_array`, which will look for the field inside each element.
247+
181248
[float]
182249
==== `no_http_body`
183250

x-pack/filebeat/input/httpjson/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type config struct {
2626
HTTPRequestBody common.MapStr `config:"http_request_body"`
2727
Interval time.Duration `config:"interval"`
2828
JSONObjects string `config:"json_objects_array"`
29+
SplitEventsBy string `config:"split_events_by"`
2930
NoHTTPBody bool `config:"no_http_body"`
3031
Pagination *Pagination `config:"pagination"`
3132
RateLimit *RateLimit `config:"rate_limit"`

x-pack/filebeat/input/httpjson/httpjson_test.go

Lines changed: 126 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121

2222
"golang.org/x/sync/errgroup"
2323

24+
"github.com/stretchr/testify/assert"
25+
2426
"github.com/elastic/beats/v7/filebeat/channel"
2527
"github.com/elastic/beats/v7/filebeat/input"
2628
"github.com/elastic/beats/v7/libbeat/beat"
@@ -38,7 +40,6 @@ const (
3840

3941
var (
4042
once sync.Once
41-
url string
4243
)
4344

4445
func testSetup(t *testing.T) {
@@ -91,6 +92,10 @@ func createServer(newServer func(handler http.Handler) *httptest.Server) *httpte
9192
"embedded": map[string]string{
9293
"hello": "world",
9394
},
95+
"list": []map[string]interface{}{
96+
{"foo": "bar"},
97+
{"hello": "world"},
98+
},
9499
}
95100
b, _ := json.Marshal(message)
96101
w.Header().Set("Content-Type", "application/json")
@@ -157,8 +162,24 @@ func createCustomServerWithArrayResponse(newServer func(handler http.Handler) *h
157162
return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
158163
w.Header().Set("Content-Type", "application/json")
159164
message := map[string]interface{}{
160-
"hello": []map[string]string{
161-
{"foo": "bar"},
165+
"hello": []map[string]interface{}{
166+
{
167+
"foo": "bar",
168+
"list": []map[string]interface{}{
169+
{"foo": "bar"},
170+
{"hello": "world"},
171+
},
172+
},
173+
{
174+
"foo": "bar",
175+
"list": []map[string]interface{}{
176+
{"foo": "bar"},
177+
},
178+
},
179+
{
180+
"bar": "foo",
181+
"list": []map[string]interface{}{},
182+
},
162183
{"bar": "foo"},
163184
},
164185
}
@@ -604,3 +625,105 @@ func TestOAuth2(t *testing.T) {
604625
}
605626
})
606627
}
628+
629+
func TestSplitResponseWithKey(t *testing.T) {
630+
m := map[string]interface{}{
631+
"http_method": "GET",
632+
"split_events_by": "list",
633+
"interval": 0,
634+
}
635+
ts := createTestServer(HTTPTestServer)
636+
runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
637+
group, _ := errgroup.WithContext(context.Background())
638+
group.Go(input.run)
639+
640+
events, ok := out.waitForEvents(2)
641+
if !ok {
642+
t.Fatalf("Expected 2 events, but got %d.", len(events))
643+
}
644+
input.Stop()
645+
646+
if err := group.Wait(); err != nil {
647+
t.Fatal(err)
648+
}
649+
})
650+
}
651+
652+
func TestSplitResponseWithoutKey(t *testing.T) {
653+
m := map[string]interface{}{
654+
"http_method": "GET",
655+
"split_events_by": "not_found",
656+
"interval": 0,
657+
}
658+
ts := createTestServer(HTTPTestServer)
659+
runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
660+
group, _ := errgroup.WithContext(context.Background())
661+
group.Go(input.run)
662+
663+
events, ok := out.waitForEvents(1)
664+
if !ok {
665+
t.Fatalf("Expected 1 events, but got %d.", len(events))
666+
}
667+
input.Stop()
668+
669+
if err := group.Wait(); err != nil {
670+
t.Fatal(err)
671+
}
672+
})
673+
}
674+
675+
func TestArrayWithSplitResponse(t *testing.T) {
676+
m := map[string]interface{}{
677+
"http_method": "GET",
678+
"json_objects_array": "hello",
679+
"split_events_by": "list",
680+
"interval": 0,
681+
}
682+
683+
expectedFields := []string{
684+
`{
685+
"foo": "bar",
686+
"list": {
687+
"foo": "bar"
688+
}
689+
}`,
690+
`{
691+
"foo": "bar",
692+
"list": {
693+
"hello": "world"
694+
}
695+
}`,
696+
`{
697+
"foo": "bar",
698+
"list": {
699+
"foo": "bar"
700+
}
701+
}`,
702+
`{
703+
"bar": "foo",
704+
"list": []
705+
}`,
706+
`{"bar": "foo"}`,
707+
}
708+
709+
ts := createTestServer(ArrayResponseServer)
710+
runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
711+
group, _ := errgroup.WithContext(context.Background())
712+
group.Go(input.run)
713+
714+
events, ok := out.waitForEvents(5)
715+
if !ok {
716+
t.Fatalf("Expected 5 events, but got %d.", len(events))
717+
}
718+
input.Stop()
719+
720+
if err := group.Wait(); err != nil {
721+
t.Fatal(err)
722+
}
723+
724+
for i, e := range events {
725+
message, _ := e.GetValue("message")
726+
assert.JSONEq(t, expectedFields[i], message.(string))
727+
}
728+
})
729+
}

x-pack/filebeat/input/httpjson/input.go

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -181,24 +181,61 @@ func (in *HttpjsonInput) createHTTPRequest(ctx context.Context, ri *RequestInfo)
181181

182182
// processEventArray publishes an event for each object contained in the array. It returns the last object in the array and an error if any.
183183
func (in *HttpjsonInput) processEventArray(events []interface{}) (map[string]interface{}, error) {
184-
var m map[string]interface{}
184+
var last map[string]interface{}
185185
for _, t := range events {
186186
switch v := t.(type) {
187187
case map[string]interface{}:
188-
m = v
189-
d, err := json.Marshal(v)
190-
if err != nil {
191-
return nil, errors.Wrapf(err, "failed to marshal %+v", v)
192-
}
193-
ok := in.outlet.OnEvent(makeEvent(string(d)))
194-
if !ok {
195-
return nil, errors.New("function OnEvent returned false")
188+
for _, e := range in.splitEvent(v) {
189+
last = e
190+
d, err := json.Marshal(e)
191+
if err != nil {
192+
return nil, errors.Wrapf(err, "failed to marshal %+v", e)
193+
}
194+
ok := in.outlet.OnEvent(makeEvent(string(d)))
195+
if !ok {
196+
return nil, errors.New("function OnEvent returned false")
197+
}
196198
}
197199
default:
198200
return nil, errors.Errorf("expected only JSON objects in the array but got a %T", v)
199201
}
200202
}
201-
return m, nil
203+
return last, nil
204+
}
205+
206+
func (in *HttpjsonInput) splitEvent(event map[string]interface{}) []map[string]interface{} {
207+
m := common.MapStr(event)
208+
209+
hasSplitKey, _ := m.HasKey(in.config.SplitEventsBy)
210+
if in.config.SplitEventsBy == "" || !hasSplitKey {
211+
return []map[string]interface{}{event}
212+
}
213+
214+
splitOnIfc, _ := m.GetValue(in.config.SplitEventsBy)
215+
splitOn, ok := splitOnIfc.([]interface{})
216+
// if not an array or is empty, we do nothing
217+
if !ok || len(splitOn) == 0 {
218+
return []map[string]interface{}{event}
219+
}
220+
221+
var events []map[string]interface{}
222+
for _, split := range splitOn {
223+
s, ok := split.(map[string]interface{})
224+
// if not an object, we do nothing
225+
if !ok {
226+
return []map[string]interface{}{event}
227+
}
228+
229+
mm := m.Clone()
230+
_, err := mm.Put(in.config.SplitEventsBy, s)
231+
if err != nil {
232+
return []map[string]interface{}{event}
233+
}
234+
235+
events = append(events, mm)
236+
}
237+
238+
return events
202239
}
203240

204241
// getNextLinkFromHeader retrieves the next URL for pagination from the HTTP Header of the response

0 commit comments

Comments
 (0)