Skip to content

Commit 72ac737

Browse files
authored
fix: Use logger instead of stdout (#20007)
#### Summary We should be using the logger instead of `fmt.Println`. Also don't think an array should be an error so made it an `Info`. On AWS if you run with `tables: ["*"]` you get quite a lot of arrays: ![image](https://github.com/user-attachments/assets/9648ad09-f7ae-4017-ae42-c324b37a3a7e)
1 parent ac0d254 commit 72ac737

5 files changed

Lines changed: 28 additions & 18 deletions

File tree

plugins/transformer/jsonflattener/client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func New(_ context.Context, logger zerolog.Logger, s []byte, opts plugin.NewClie
3737
return nil, err
3838
}
3939

40-
tf, err := transformers.NewFromSpec(c.spec)
40+
tf, err := transformers.NewFromSpec(logger, c.spec)
4141
if err != nil {
4242
return nil, err
4343
}

plugins/transformer/jsonflattener/client/recordupdater/record_updater.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,23 @@ import (
1010
"github.com/cloudquery/cloudquery/plugins/transformer/jsonflattener/client/util"
1111
"github.com/cloudquery/plugin-sdk/v4/schema"
1212
"github.com/cloudquery/plugin-sdk/v4/types"
13+
"github.com/rs/zerolog"
1314
)
1415

1516
// RecordUpdater takes an `arrow.Record` and knows how to make simple subsequent changes to it.
1617
// It doesn't know which table it belongs to or if the changes make sense.
1718
type RecordUpdater struct {
19+
logger zerolog.Logger
1820
record arrow.Record
1921
schemaUpdater *schemaupdater.SchemaUpdater
2022
tableName string
2123
}
2224

23-
func New(record arrow.Record) *RecordUpdater {
25+
func New(logger zerolog.Logger, record arrow.Record) *RecordUpdater {
2426
tableName, _ := record.Schema().Metadata().GetValue(schema.MetadataTableName)
2527

2628
return &RecordUpdater{
29+
logger: logger,
2730
record: record,
2831
schemaUpdater: schemaupdater.New(record.Schema()),
2932
tableName: tableName,
@@ -38,14 +41,18 @@ func (r *RecordUpdater) FlattenJSONFields() (arrow.Record, error) {
3841
if !ok || rawTypeSchema == "" {
3942
continue
4043
}
41-
var unprocessedTypeSchema map[string]any
44+
var unprocessedTypeSchema any
4245
if err := json.Unmarshal([]byte(rawTypeSchema), &unprocessedTypeSchema); err != nil {
43-
// In this case it can be an array
44-
fmt.Println("failed to unmarshal type schema", rawTypeSchema)
46+
r.logger.Error().Err(err).Msg("failed to unmarshal type schema")
4547
continue
4648
}
47-
typeSchema := preprocessTypeSchema(unprocessedTypeSchema)
48-
fieldTypeSchemas[field.Name] = typeSchema
49+
switch s := unprocessedTypeSchema.(type) {
50+
case map[string]any:
51+
typeSchema := preprocessTypeSchema(s)
52+
fieldTypeSchemas[field.Name] = typeSchema
53+
default:
54+
r.logger.Info().Msgf("skipping unsupported type schema: %T", unprocessedTypeSchema)
55+
}
4956
}
5057

5158
if len(fieldTypeSchemas) == 0 {

plugins/transformer/jsonflattener/client/recordupdater/record_updater_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/apache/arrow-go/v18/arrow/array"
99
"github.com/cloudquery/plugin-sdk/v4/schema"
1010
"github.com/cloudquery/plugin-sdk/v4/types"
11+
"github.com/rs/zerolog"
1112
"github.com/stretchr/testify/require"
1213
)
1314

@@ -17,7 +18,7 @@ func TestFlattenJSONFields(t *testing.T) {
1718
map[string]string{"col1": `{"key_a": "utf8", "key_b": "int64", "key_c": "bool"}`},
1819
[]arrow.Array{buildJSONColumn([]*any{toP(`{"key_a": "value", "key_b": 2, "key_c": true}`)})},
1920
)
20-
updater := New(record)
21+
updater := New(zerolog.Nop(), record)
2122

2223
updatedRecord, err := updater.FlattenJSONFields()
2324
require.NoError(t, err)
@@ -39,7 +40,7 @@ func TestFlattenJSONFieldsWithTimestamp(t *testing.T) {
3940
map[string]string{"col1": `{"key_a": "timestamp[us, tz=UTC]"}`},
4041
[]arrow.Array{buildJSONColumn([]*any{toP(`{"key_a": "2024-01-02T03:04:05.006Z"}`)})},
4142
)
42-
updater := New(record)
43+
updater := New(zerolog.Nop(), record)
4344

4445
updatedRecord, err := updater.FlattenJSONFields()
4546
require.NoError(t, err)
@@ -57,7 +58,7 @@ func TestFlattenJSONFieldsDoesntFlattenFieldsKeyedUTF8(t *testing.T) {
5758
map[string]string{"col1": `{"key_a": "utf8", "key_b": "int64", "utf8": "any"}`},
5859
[]arrow.Array{buildJSONColumn([]*any{toP(`{"key_a": "value", "key_b": 2, "utf8": "any"}`)})},
5960
)
60-
updater := New(record)
61+
updater := New(zerolog.Nop(), record)
6162

6263
updatedRecord, err := updater.FlattenJSONFields()
6364
require.NoError(t, err)
@@ -77,7 +78,7 @@ func TestNestedJSONFlattenedToFirstLevel(t *testing.T) {
7778
map[string]string{"col1": `{"nested": {"key_a": "utf8", "key_b": "int64", "key_c": "bool"}}`},
7879
[]arrow.Array{buildJSONColumn([]*any{toP(`{"nested": {"key_a": "value", "key_b": 2, "key_c": true}}`)})},
7980
)
80-
updater := New(record)
81+
updater := New(zerolog.Nop(), record)
8182

8283
updatedRecord, err := updater.FlattenJSONFields()
8384
require.NoError(t, err)
@@ -97,7 +98,7 @@ func TestDifferentCasingWorks(t *testing.T) {
9798
map[string]string{"col": `{"subcolumn_one": "utf8"}`}, // Note the different casing
9899
[]arrow.Array{buildJSONColumn([]*any{toP(`{"subcolumnOne": "value", "unknownColumn": 2}`)})},
99100
)
100-
updater := New(record)
101+
updater := New(zerolog.Nop(), record)
101102

102103
updatedRecord, err := updater.FlattenJSONFields()
103104
require.NoError(t, err)
@@ -118,7 +119,7 @@ func TestDifferentCasingWorksEvenWhenFirstRowIsNull(t *testing.T) {
118119
// Note first 3 rows are nil
119120
[]arrow.Array{buildJSONColumn([]*any{nil, nil, nil, toP(`{"subcolumnOne": "value", "unknownColumn": 2}`)})},
120121
)
121-
updater := New(record)
122+
updater := New(zerolog.Nop(), record)
122123

123124
updatedRecord, err := updater.FlattenJSONFields()
124125
require.NoError(t, err)

plugins/transformer/jsonflattener/client/transformers/transformers.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/cloudquery/cloudquery/plugins/transformer/jsonflattener/client/recordupdater"
88
"github.com/cloudquery/cloudquery/plugins/transformer/jsonflattener/client/spec"
99
"github.com/cloudquery/cloudquery/plugins/transformer/jsonflattener/client/tablematcher"
10+
"github.com/rs/zerolog"
1011
)
1112

1213
type TransformationFn = func(arrow.Record) (arrow.Record, error)
@@ -18,10 +19,10 @@ type Transformer struct {
1819
schemaFn SchemaTransformationFn
1920
}
2021

21-
func NewFromSpec(sp spec.Spec) (*Transformer, error) {
22+
func NewFromSpec(logger zerolog.Logger, sp spec.Spec) (*Transformer, error) {
2223
tr := &Transformer{matcher: tablematcher.New(sp.Tables)}
2324

24-
tr.fn = FlattenJSONFields()
25+
tr.fn = FlattenJSONFields(logger)
2526
tr.schemaFn = transformSchema(tr.fn)
2627

2728
return tr, nil
@@ -58,9 +59,9 @@ func (tr *Transformer) TransformSchema(schema *arrow.Schema) (*arrow.Schema, err
5859
return tr.schemaFn(schema)
5960
}
6061

61-
func FlattenJSONFields() TransformationFn {
62+
func FlattenJSONFields(logger zerolog.Logger) TransformationFn {
6263
return func(record arrow.Record) (arrow.Record, error) {
63-
record, err := recordupdater.New(record).FlattenJSONFields()
64+
record, err := recordupdater.New(logger, record).FlattenJSONFields()
6465
if err != nil {
6566
return nil, err
6667
}

plugins/transformer/jsonflattener/client/transformers/transformers_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/cloudquery/plugin-sdk/v4/schema"
1212
"github.com/cloudquery/plugin-sdk/v4/types"
1313
"github.com/goccy/go-json"
14+
"github.com/rs/zerolog"
1415
"github.com/stretchr/testify/require"
1516
)
1617

@@ -43,7 +44,7 @@ func TestTransform(t *testing.T) {
4344

4445
for _, tt := range tests {
4546
t.Run(tt.name, func(t *testing.T) {
46-
transformer, err := NewFromSpec(tt.spec)
47+
transformer, err := NewFromSpec(zerolog.Nop(), tt.spec)
4748
require.NoError(t, err, "NewFromSpec() should not return an error")
4849

4950
transformedRecord, err := transformer.Transform(tt.record)

0 commit comments

Comments
 (0)