Skip to content

Commit 9b911eb

Browse files
authored
feat(test): Add AllowNull option for test data (#913)
This allows to control whether some of the data types should be left non-nullable. Inspired by ClickHouse and the fact that [Arrays, Maps & Tuples are non-nullable](clickhouse.com/docs/en/sql-reference/data-types/nullable).
1 parent 9a3b4ad commit 9b911eb

9 files changed

Lines changed: 106 additions & 43 deletions

plugins/destination/nulls.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package destination
2+
3+
import (
4+
"github.com/apache/arrow/go/v13/arrow"
5+
"github.com/apache/arrow/go/v13/arrow/array"
6+
"github.com/apache/arrow/go/v13/arrow/memory"
7+
)
8+
9+
func stripNullsFromLists(records []arrow.Record) {
10+
for i := range records {
11+
cols := records[i].Columns()
12+
for c, col := range cols {
13+
if col.DataType().ID() != arrow.LIST {
14+
continue
15+
}
16+
17+
list := col.(*array.List)
18+
bldr := array.NewListBuilder(memory.DefaultAllocator, list.DataType().(*arrow.ListType).Elem())
19+
for j := 0; j < list.Len(); j++ {
20+
if list.IsNull(j) {
21+
bldr.AppendNull()
22+
continue
23+
}
24+
bldr.Append(true)
25+
vBldr := bldr.ValueBuilder()
26+
from, to := list.ValueOffsets(j)
27+
slc := array.NewSlice(list.ListValues(), from, to)
28+
for k := 0; k < int(to-from); k++ {
29+
if slc.IsNull(k) {
30+
continue
31+
}
32+
err := vBldr.AppendValueFromString(slc.ValueStr(k))
33+
if err != nil {
34+
panic(err)
35+
}
36+
}
37+
}
38+
cols[c] = bldr.NewArray()
39+
}
40+
records[i] = array.NewRecord(records[i].Schema(), cols, records[i].NumRows())
41+
}
42+
}
43+
44+
type AllowNullFunc func(arrow.DataType) bool
45+
46+
func (f AllowNullFunc) replaceNullsByEmpty(records []arrow.Record) {
47+
if f == nil {
48+
return
49+
}
50+
for i := range records {
51+
cols := records[i].Columns()
52+
for c, col := range records[i].Columns() {
53+
if col.NullN() == 0 || f(col.DataType()) {
54+
continue
55+
}
56+
57+
builder := array.NewBuilder(memory.DefaultAllocator, records[i].Column(c).DataType())
58+
for j := 0; j < col.Len(); j++ {
59+
if col.IsNull(j) {
60+
builder.AppendEmptyValue()
61+
continue
62+
}
63+
64+
if err := builder.AppendValueFromString(col.ValueStr(j)); err != nil {
65+
panic(err)
66+
}
67+
}
68+
cols[c] = builder.NewArray()
69+
}
70+
records[i] = array.NewRecord(records[i].Schema(), cols, records[i].NumRows())
71+
}
72+
}

plugins/destination/plugin_testing.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,25 @@ func getTestLogger(t *testing.T) zerolog.Logger {
101101
type NewPluginFunc func() *Plugin
102102

103103
type PluginTestSuiteRunnerOptions struct {
104-
IgnoreNullsInLists bool // strip nulls from lists before checking equality. Destination setups that don't support nulls in lists should set this to true.
104+
// IgnoreNullsInLists allows stripping null values from lists before comparison.
105+
// Destination setups that don't support nulls in lists should set this to true.
106+
IgnoreNullsInLists bool
107+
108+
// AllowNull is a custom func to determine whether a data type may be correctly represented as null.
109+
// Destinations that have problems representing some data types should provide a custom implementation here.
110+
// If this param is empty, the default is to allow all data types to be nullable.
111+
// When the value returned by this func is `true` the comparison is made with the empty value instead of null.
112+
AllowNull AllowNullFunc
113+
105114
schema.TestSourceOptions
106115
}
107116

117+
func WithTestSourceAllowNull(allowNull func(arrow.DataType) bool) func(o *PluginTestSuiteRunnerOptions) {
118+
return func(o *PluginTestSuiteRunnerOptions) {
119+
o.AllowNull = allowNull
120+
}
121+
}
122+
108123
func WithTestIgnoreNullsInLists() func(o *PluginTestSuiteRunnerOptions) {
109124
return func(o *PluginTestSuiteRunnerOptions) {
110125
o.IgnoreNullsInLists = true

plugins/destination/plugin_testing_migrate.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.
5454
if err := p.writeAll(ctx, sourceSpec, syncTime, resource2); err != nil {
5555
return fmt.Errorf("failed to write one after migration: %w", err)
5656
}
57+
58+
testOpts.AllowNull.replaceNullsByEmpty(resource2)
5759
if testOpts.IgnoreNullsInLists {
5860
stripNullsFromLists(resource2)
5961
}

plugins/destination/plugin_testing_overwrite.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
4444
return fmt.Errorf("failed to write all: %w", err)
4545
}
4646
sortRecordsBySyncTime(table, resources)
47+
testOpts.AllowNull.replaceNullsByEmpty(resources)
4748
if testOpts.IgnoreNullsInLists {
4849
stripNullsFromLists(resources)
4950
}
@@ -85,6 +86,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
8586
return fmt.Errorf("failed to write one second time: %w", err)
8687
}
8788

89+
testOpts.AllowNull.replaceNullsByEmpty(updatedResource)
8890
if testOpts.IgnoreNullsInLists {
8991
stripNullsFromLists(updatedResource)
9092
}

plugins/destination/plugin_testing_overwrite_delete_stale.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
6161
if len(resourcesRead) != 2 {
6262
return fmt.Errorf("expected 2 resources, got %d", len(resourcesRead))
6363
}
64+
testOpts.AllowNull.replaceNullsByEmpty(resources)
6465
if testOpts.IgnoreNullsInLists {
6566
stripNullsFromLists(resources)
6667
}
@@ -111,6 +112,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
111112
if len(resourcesRead) != 1 {
112113
return fmt.Errorf("after overwrite expected 1 resource, got %d", len(resourcesRead))
113114
}
115+
testOpts.AllowNull.replaceNullsByEmpty(resources)
114116
if testOpts.IgnoreNullsInLists {
115117
stripNullsFromLists(resources)
116118
}
@@ -128,6 +130,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
128130
}
129131

130132
// we expect the only resource returned to match the updated resource we wrote
133+
testOpts.AllowNull.replaceNullsByEmpty(updatedResources)
131134
if testOpts.IgnoreNullsInLists {
132135
stripNullsFromLists(updatedResources)
133136
}

plugins/destination/plugin_testing_write_append.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context,
7070
return fmt.Errorf("expected %d resources, got %d", expectedResource, len(resourcesRead))
7171
}
7272

73+
testOpts.AllowNull.replaceNullsByEmpty(record1)
74+
testOpts.AllowNull.replaceNullsByEmpty(record2)
7375
if testOpts.IgnoreNullsInLists {
7476
stripNullsFromLists(record1)
7577
stripNullsFromLists(record2)

plugins/destination/strip_nulls.go

Lines changed: 0 additions & 42 deletions
This file was deleted.

types/inet.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ func NewInetBuilder(builder *array.ExtensionBuilder) *InetBuilder {
2020
return &InetBuilder{ExtensionBuilder: builder}
2121
}
2222

23+
func (b *InetBuilder) AppendEmptyValue() {
24+
const zeroIPNet = "0.0.0.0/0"
25+
b.ExtensionBuilder.Builder.(*array.BinaryBuilder).Append([]byte(zeroIPNet))
26+
}
27+
2328
func (b *InetBuilder) Append(v *net.IPNet) {
2429
if v == nil {
2530
b.AppendNull()

types/mac.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ func NewMACBuilder(builder *array.ExtensionBuilder) *MACBuilder {
2020
return &MACBuilder{ExtensionBuilder: builder}
2121
}
2222

23+
func (b *MACBuilder) AppendEmptyValue() {
24+
b.Append(make(net.HardwareAddr, 6))
25+
}
26+
2327
func (b *MACBuilder) Append(v net.HardwareAddr) {
2428
b.ExtensionBuilder.Builder.(*array.BinaryBuilder).Append(v[:])
2529
}

0 commit comments

Comments
 (0)