Skip to content

Commit 00faa15

Browse files
jalvzaxw
authored andcommitted
Add service name to dataset field (#4674)
# Conflicts: # changelogs/head.asciidoc
1 parent 4175425 commit 00faa15

44 files changed

Lines changed: 109 additions & 102 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

datastreams/servicename.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import "strings"
2525
// Concretely, this function will lowercase the string and replace any
2626
// reserved characters with "_".
2727
//
28-
// TODO: use when Fleet supports variables in data streams (see #4492)
2928
func NormalizeServiceName(s string) string {
3029
s = strings.ToLower(s)
3130
s = strings.Map(replaceReservedRune, s)

model/error.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ func (e *Error) Transform(ctx context.Context, cfg *transform.Config) []beat.Eve
121121
// By storing errors in a "logs" data stream, they can be viewed in the Logs app
122122
// in Kibana.
123123
fields[datastreams.TypeField] = datastreams.LogsType
124-
fields[datastreams.DatasetField] = ErrorsDataset
124+
dataset := fmt.Sprintf("%s.%s", ErrorsDataset, datastreams.NormalizeServiceName(e.Metadata.Service.Name))
125+
fields[datastreams.DatasetField] = dataset
125126
}
126127

127128
// first set the generic metadata (order is relevant)

model/error_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ func TestEvents(t *testing.T) {
308308
Transformable: &Error{Timestamp: timestamp, Metadata: md},
309309
Output: common.MapStr{
310310
"data_stream.type": "logs",
311-
"data_stream.dataset": "apm.error",
311+
"data_stream.dataset": "apm.error.myservice",
312312
"agent": common.MapStr{"name": "go", "version": "1.0"},
313313
"service": common.MapStr{"name": "myservice", "version": "1.0"},
314314
"error": common.MapStr{
@@ -323,7 +323,7 @@ func TestEvents(t *testing.T) {
323323
Transformable: &Error{Timestamp: timestamp, Metadata: md, TransactionSampled: &sampledFalse},
324324
Output: common.MapStr{
325325
"data_stream.type": "logs",
326-
"data_stream.dataset": "apm.error",
326+
"data_stream.dataset": "apm.error.myservice",
327327
"transaction": common.MapStr{"sampled": false},
328328
"agent": common.MapStr{"name": "go", "version": "1.0"},
329329
"service": common.MapStr{"name": "myservice", "version": "1.0"},
@@ -339,7 +339,7 @@ func TestEvents(t *testing.T) {
339339
Transformable: &Error{Timestamp: timestamp, Metadata: md, TransactionType: &transactionType},
340340
Output: common.MapStr{
341341
"data_stream.type": "logs",
342-
"data_stream.dataset": "apm.error",
342+
"data_stream.dataset": "apm.error.myservice",
343343
"transaction": common.MapStr{"type": "request"},
344344
"error": common.MapStr{
345345
"grouping_key": "d41d8cd98f00b204e9800998ecf8427e",
@@ -370,7 +370,7 @@ func TestEvents(t *testing.T) {
370370

371371
Output: common.MapStr{
372372
"data_stream.type": "logs",
373-
"data_stream.dataset": "apm.error",
373+
"data_stream.dataset": "apm.error.myservice",
374374
"labels": common.MapStr{"key": true, "label": 101},
375375
"service": common.MapStr{"name": "myservice", "version": "1.0"},
376376
"agent": common.MapStr{"name": "go", "version": "1.0"},

model/metricset.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package model
1919

2020
import (
2121
"context"
22+
"fmt"
2223
"time"
2324

2425
"github.com/elastic/beats/v7/libbeat/beat"
@@ -199,15 +200,16 @@ func (me *Metricset) Transform(ctx context.Context, cfg *transform.Config) []bea
199200
fields["processor"] = metricsetProcessorEntry
200201

201202
if cfg.DataStreams {
203+
dataset := AppMetricsDataset
202204
// Metrics are stored in "metrics" data streams.
203205
if isInternal {
204206
// Metrics that include well-defined transaction/span fields
205207
// (i.e. breakdown metrics, transaction and span metrics) will
206208
// be stored separately from application and runtime metrics.
207-
fields[datastreams.DatasetField] = InternalMetricsDataset
208-
} else {
209-
fields[datastreams.DatasetField] = AppMetricsDataset
209+
dataset = InternalMetricsDataset
210210
}
211+
dataset += fmt.Sprintf(".%s", datastreams.NormalizeServiceName(me.Metadata.Service.Name))
212+
fields[datastreams.DatasetField] = dataset
211213
fields[datastreams.TypeField] = datastreams.MetricsType
212214
}
213215

model/metricset_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func TestTransform(t *testing.T) {
6363
Output: []common.MapStr{
6464
{
6565
"data_stream.type": "metrics",
66-
"data_stream.dataset": "apm.app",
66+
"data_stream.dataset": "apm.app.myservice",
6767
"processor": common.MapStr{"event": "metric", "name": "metric"},
6868
"service": common.MapStr{
6969
"name": "myservice",
@@ -91,7 +91,7 @@ func TestTransform(t *testing.T) {
9191
Output: []common.MapStr{
9292
{
9393
"data_stream.type": "metrics",
94-
"data_stream.dataset": "apm.app",
94+
"data_stream.dataset": "apm.app.myservice",
9595
"processor": common.MapStr{"event": "metric", "name": "metric"},
9696
"service": common.MapStr{"name": "myservice"},
9797
"labels": common.MapStr{"a_b": "a.b.value"},
@@ -116,7 +116,7 @@ func TestTransform(t *testing.T) {
116116
Output: []common.MapStr{
117117
{
118118
"data_stream.type": "metrics",
119-
"data_stream.dataset": "apm.internal",
119+
"data_stream.dataset": "apm.internal.myservice",
120120
"processor": common.MapStr{"event": "metric", "name": "metric"},
121121
"service": common.MapStr{"name": "myservice"},
122122
"transaction": common.MapStr{"type": trType, "name": trName},
@@ -154,7 +154,7 @@ func TestTransform(t *testing.T) {
154154
Output: []common.MapStr{
155155
{
156156
"data_stream.type": "metrics",
157-
"data_stream.dataset": "apm.internal",
157+
"data_stream.dataset": "apm.internal.myservice",
158158
"processor": common.MapStr{"event": "metric", "name": "metric"},
159159
"service": common.MapStr{"name": "myservice"},
160160
"event": common.MapStr{"outcome": eventOutcome},
@@ -197,7 +197,7 @@ func TestTransform(t *testing.T) {
197197
Output: []common.MapStr{
198198
{
199199
"data_stream.type": "metrics",
200-
"data_stream.dataset": "apm.internal",
200+
"data_stream.dataset": "apm.internal.myservice",
201201
"processor": common.MapStr{"event": "metric", "name": "metric"},
202202
"service": common.MapStr{"name": "myservice"},
203203
"span": common.MapStr{"type": spType, "subtype": spSubtype,

model/profile.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ func (pp PprofProfile) Transform(ctx context.Context, cfg *transform.Config) []b
128128
}
129129
if cfg.DataStreams {
130130
event.Fields[datastreams.TypeField] = datastreams.MetricsType
131-
event.Fields[datastreams.DatasetField] = ProfilesDataset
131+
dataset := fmt.Sprintf("%s.%s", ProfilesDataset, datastreams.NormalizeServiceName(pp.Metadata.Service.Name))
132+
event.Fields[datastreams.DatasetField] = dataset
132133
}
133134
var profileLabels common.MapStr
134135
if len(sample.Label) > 0 {

model/profile_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func TestPprofProfileTransform(t *testing.T) {
9898
Timestamp: timestamp,
9999
Fields: common.MapStr{
100100
"data_stream.type": "metrics",
101-
"data_stream.dataset": "apm.profiling",
101+
"data_stream.dataset": "apm.profiling.myservice",
102102
"processor": common.MapStr{"event": "profile", "name": "profile"},
103103
"service": common.MapStr{
104104
"name": "myService",

model/span.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package model
1919

2020
import (
2121
"context"
22+
"fmt"
2223
"net"
2324
"time"
2425

@@ -198,7 +199,8 @@ func (e *Span) Transform(ctx context.Context, cfg *transform.Config) []beat.Even
198199
if cfg.DataStreams {
199200
// Spans are stored in a "traces" data stream along with transactions.
200201
fields[datastreams.TypeField] = datastreams.TracesType
201-
fields[datastreams.DatasetField] = TracesDataset
202+
dataset := fmt.Sprintf("%s.%s", TracesDataset, datastreams.NormalizeServiceName(e.Metadata.Service.Name))
203+
fields[datastreams.DatasetField] = dataset
202204
}
203205

204206
// first set the generic metadata

model/span_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func TestSpanTransform(t *testing.T) {
5959
Span: Span{Timestamp: timestamp, Metadata: metadata},
6060
Output: common.MapStr{
6161
"data_stream.type": "traces",
62-
"data_stream.dataset": "apm",
62+
"data_stream.dataset": "apm.myservice",
6363
"processor": common.MapStr{"event": "span", "name": "transaction"},
6464
"service": common.MapStr{"name": serviceName, "environment": env, "version": serviceVersion},
6565
"span": common.MapStr{
@@ -77,7 +77,7 @@ func TestSpanTransform(t *testing.T) {
7777
Span: Span{Timestamp: timestamp, Metadata: metadata, Outcome: "success"},
7878
Output: common.MapStr{
7979
"data_stream.type": "traces",
80-
"data_stream.dataset": "apm",
80+
"data_stream.dataset": "apm.myservice",
8181
"processor": common.MapStr{"event": "span", "name": "transaction"},
8282
"service": common.MapStr{"name": serviceName, "environment": env, "version": serviceVersion},
8383
"span": common.MapStr{
@@ -126,7 +126,7 @@ func TestSpanTransform(t *testing.T) {
126126
},
127127
Output: common.MapStr{
128128
"data_stream.type": "traces",
129-
"data_stream.dataset": "apm",
129+
"data_stream.dataset": "apm.myservice",
130130
"span": common.MapStr{
131131
"id": hexID,
132132
"duration": common.MapStr{"us": 1200},

model/transaction.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package model
1919

2020
import (
2121
"context"
22+
"fmt"
2223
"time"
2324

2425
"github.com/elastic/beats/v7/libbeat/beat"
@@ -121,7 +122,8 @@ func (e *Transaction) Transform(_ context.Context, cfg *transform.Config) []beat
121122
if cfg.DataStreams {
122123
// Transactions are stored in a "traces" data stream along with spans.
123124
fields[datastreams.TypeField] = datastreams.TracesType
124-
fields[datastreams.DatasetField] = TracesDataset
125+
dataset := fmt.Sprintf("%s.%s", TracesDataset, datastreams.NormalizeServiceName(e.Metadata.Service.Name))
126+
fields[datastreams.DatasetField] = dataset
125127
}
126128

127129
// first set generic metadata (order is relevant)

0 commit comments

Comments
 (0)