Skip to content

Commit cb5f6bb

Browse files
authored
fix: Refactor glob filters (#488)
This simplifies our glob filtering and makes sure Tables are always stored in a tree-like structure, except when returned from `FlattenTables`. This removes a lot of potential bugs and places where we need to do things like `if parent != nil`. I think this also closes #475, as there is no need for another data structure. The reasoning is that our tables are tree-structured, and in my opinion it is easier to keep them that way everywhere across the code than to maintain multiple data structures with similar methods. This required a small update to our filtering logic, which now works as a DFS with in-place filtering (for that to work I had to add a copy method, which I think can be useful for other things in the future).
1 parent 1e2b708 commit cb5f6bb

7 files changed

Lines changed: 281 additions & 416 deletions

File tree

plugins/source.go

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -119,20 +119,11 @@ func (p *SourcePlugin) TablesForSpec(spec specs.Source) (schema.Tables, error) {
119119
if err := spec.Validate(); err != nil {
120120
return nil, fmt.Errorf("invalid spec: %w", err)
121121
}
122-
tables, err := p.listAndValidateTables(spec.Tables, spec.SkipTables)
122+
tables, err := p.tables.FilterDfs(spec.Tables, spec.SkipTables)
123123
if err != nil {
124-
return nil, err
124+
return nil, fmt.Errorf("failed to filter tables: %w", err)
125125
}
126-
// listAndValidateTables returns a flattened list - we only want to return
127-
// the top-level tables from this function.
128-
var topLevelTables schema.Tables
129-
for _, t := range tables {
130-
if t.Parent != nil {
131-
continue
132-
}
133-
topLevelTables = append(topLevelTables, t)
134-
}
135-
return topLevelTables, nil
126+
return tables, nil
136127
}
137128

138129
// Name return the name of this plugin
@@ -155,10 +146,12 @@ func (p *SourcePlugin) Sync(ctx context.Context, spec specs.Source, res chan<- *
155146
if err := spec.Validate(); err != nil {
156147
return fmt.Errorf("invalid spec: %w", err)
157148
}
158-
// flattens all tables and relations
159-
tables, err := p.listAndValidateTables(spec.Tables, spec.SkipTables)
149+
tables, err := p.tables.FilterDfs(spec.Tables, spec.SkipTables)
160150
if err != nil {
161-
return err
151+
return fmt.Errorf("failed to filter tables: %w", err)
152+
}
153+
if len(tables) == 0 {
154+
return fmt.Errorf("no tables to sync - please check your spec 'tables' and 'skip_tables' settings")
162155
}
163156

164157
c, err := p.newExecutionClient(ctx, p.logger, spec)

plugins/source_scheduler_dfs.go

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,6 @@ func (p *SourcePlugin) syncDfs(ctx context.Context, spec specs.Source, client sc
4040
// differences between this run and the next one.
4141
preInitialisedClients := make([][]schema.ClientMeta, len(tables))
4242
for i, table := range tables {
43-
if table.Parent != nil {
44-
// skip descendent tables here - they are handled in initWithClients
45-
continue
46-
}
4743
clients := []schema.ClientMeta{client}
4844
if table.Multiplex != nil {
4945
clients = table.Multiplex(client)
@@ -56,11 +52,6 @@ func (p *SourcePlugin) syncDfs(ctx context.Context, spec specs.Source, client sc
5652

5753
var wg sync.WaitGroup
5854
for i, table := range tables {
59-
if table.Parent != nil {
60-
// skip descendent tables here - they will be handled by the recursive
61-
// depth-first-search later.
62-
continue
63-
}
6455
table := table
6556
clients := preInitialisedClients[i]
6657
for _, client := range clients {
@@ -76,14 +67,14 @@ func (p *SourcePlugin) syncDfs(ctx context.Context, spec specs.Source, client sc
7667
defer p.tableSems[0].Release(1)
7768
// not checking for error here as nothing much todo.
7869
// the error is logged and this happens when context is cancelled
79-
p.resolveTableDfs(ctx, tables, table, client, nil, resolvedResources, 0)
70+
p.resolveTableDfs(ctx, table, client, nil, resolvedResources, 0)
8071
}()
8172
}
8273
}
8374
wg.Wait()
8475
}
8576

86-
func (p *SourcePlugin) resolveTableDfs(ctx context.Context, allIncludedTables schema.Tables, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, resolvedResources chan<- *schema.Resource, depth int) {
77+
func (p *SourcePlugin) resolveTableDfs(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, resolvedResources chan<- *schema.Resource, depth int) {
8778
clientName := client.ID()
8879
logger := p.logger.With().Str("table", table.Name).Str("client", clientName).Logger()
8980

@@ -114,7 +105,7 @@ func (p *SourcePlugin) resolveTableDfs(ctx context.Context, allIncludedTables sc
114105
}()
115106

116107
for r := range res {
117-
p.resolveResourcesDfs(ctx, allIncludedTables, table, client, parent, r, resolvedResources, depth)
108+
p.resolveResourcesDfs(ctx, table, client, parent, r, resolvedResources, depth)
118109
}
119110

120111
// we don't need any waitgroups here because we are waiting for the channel to close
@@ -123,7 +114,7 @@ func (p *SourcePlugin) resolveTableDfs(ctx context.Context, allIncludedTables sc
123114
}
124115
}
125116

126-
func (p *SourcePlugin) resolveResourcesDfs(ctx context.Context, allIncludedTables schema.Tables, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, resources interface{}, resolvedResources chan<- *schema.Resource, depth int) {
117+
func (p *SourcePlugin) resolveResourcesDfs(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, resources interface{}, resolvedResources chan<- *schema.Resource, depth int) {
127118
resourcesSlice := helpers.InterfaceSlice(resources)
128119
if len(resourcesSlice) == 0 {
129120
return
@@ -161,11 +152,6 @@ func (p *SourcePlugin) resolveResourcesDfs(ctx context.Context, allIncludedTable
161152
resolvedResources <- resource
162153
for _, relation := range resource.Table.Relations {
163154
relation := relation
164-
if allIncludedTables.GetTopLevel(relation.Name) == nil {
165-
// this indicates that child table is skipped by user config,
166-
// so we should not sync it
167-
continue
168-
}
169155
if err := p.tableSems[depth].Acquire(ctx, 1); err != nil {
170156
// This means context was cancelled
171157
wg.Wait()
@@ -175,7 +161,7 @@ func (p *SourcePlugin) resolveResourcesDfs(ctx context.Context, allIncludedTable
175161
go func() {
176162
defer wg.Done()
177163
defer p.tableSems[depth].Release(1)
178-
p.resolveTableDfs(ctx, allIncludedTables, relation, client, resource, resolvedResources, depth+1)
164+
p.resolveTableDfs(ctx, relation, client, resource, resolvedResources, depth+1)
179165
}()
180166
}
181167
}

plugins/source_test.go

Lines changed: 0 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -50,31 +50,6 @@ func testTableSuccess() *schema.Table {
5050
}
5151
}
5252

53-
func testTableWithChild() *schema.Table {
54-
return &schema.Table{
55-
Name: "test_table_parent",
56-
Resolver: testResolverSuccess,
57-
Columns: []schema.Column{
58-
{
59-
Name: "test_column",
60-
Type: schema.TypeInt,
61-
},
62-
},
63-
Relations: []*schema.Table{
64-
{
65-
Name: "test_table_child",
66-
Resolver: testResolverSuccess,
67-
Columns: []schema.Column{
68-
{
69-
Name: "test_column",
70-
Type: schema.TypeInt,
71-
},
72-
},
73-
},
74-
},
75-
}
76-
}
77-
7853
func testTableResolverPanic() *schema.Table {
7954
return &schema.Table{
8055
Name: "test_table_resolver_panic",
@@ -322,59 +297,6 @@ func testSyncTable(t *testing.T, tc syncTestCase) {
322297
}
323298
}
324299

325-
func TestTablesForSpec(t *testing.T) {
326-
tables := []*schema.Table{
327-
testTableWithChild(),
328-
testTableResolverPanic(),
329-
}
330-
plugin := NewSourcePlugin(
331-
"testSourcePlugin",
332-
"1.0.0",
333-
tables,
334-
newTestExecutionClient,
335-
)
336-
plugin.SetLogger(zerolog.New(zerolog.NewTestWriter(t)))
337-
t.Run("success case", func(t *testing.T) {
338-
spec := specs.Source{
339-
Name: "testSource",
340-
Path: "cloudquery/testSource",
341-
Tables: []string{
342-
"test_table_parent",
343-
},
344-
Version: "v1.0.0",
345-
Destinations: []string{"test"},
346-
}
347-
got, err := plugin.TablesForSpec(spec)
348-
if err != nil {
349-
t.Fatalf("unexpected error: %v", err)
350-
}
351-
if len(got) != 1 {
352-
t.Errorf("got %d tables, want %d", len(got), 1)
353-
}
354-
if got[0] != tables[0] {
355-
t.Errorf("got table %v, want %v", got[0].Name, tables[0].Name)
356-
}
357-
})
358-
t.Run("error case", func(t *testing.T) {
359-
spec := specs.Source{
360-
Name: "testSource",
361-
Path: "cloudquery/testSource",
362-
Tables: []string{
363-
"invalid_table",
364-
},
365-
Version: "v1.0.0",
366-
Destinations: []string{"test"},
367-
}
368-
_, err := plugin.TablesForSpec(spec)
369-
if err == nil {
370-
t.Fatalf("got no error, expected error indicating invalid table name")
371-
}
372-
if err.Error() != "tables entry matches no known tables: \"invalid_table\"" {
373-
t.Fatalf("got error = %v, expected %v", err.Error(), "tables entry matches no known tables: \"invalid_table\"")
374-
}
375-
})
376-
}
377-
378300
func TestIgnoredColumns(t *testing.T) {
379301
validateResources(t, schema.Resources{{
380302
Item: struct{ A *string }{},

plugins/source_validate.go

Lines changed: 0 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@ package plugins
22

33
import (
44
"fmt"
5-
"strings"
6-
7-
"github.com/cloudquery/plugin-sdk/schema"
85
)
96

107
func (p *SourcePlugin) validate() error {
@@ -26,75 +23,3 @@ func (p *SourcePlugin) validate() error {
2623

2724
return nil
2825
}
29-
30-
// listAndValidateTables returns all the tables matched by the `tables` and `skip_tables` config settings.
31-
// It will return ALL tables, including descendent tables. Callers should take care to only use the top-level
32-
// tables if that is what they need.
33-
func (p *SourcePlugin) listAndValidateTables(tables, skipTables []string) (schema.Tables, error) {
34-
if len(tables) == 0 {
35-
return nil, fmt.Errorf("list of tables is empty")
36-
}
37-
38-
var includedTables schema.Tables
39-
for _, t := range tables {
40-
tt := p.tables.GlobMatch(t)
41-
42-
// return an error if a table pattern doesn't match any known tables
43-
if len(tt) == 0 {
44-
return nil, fmt.Errorf("tables entry matches no known tables: %q", t)
45-
}
46-
for _, ttt := range tt {
47-
if includedTables.GetTopLevel(ttt.Name) != nil {
48-
// prevent duplicates
49-
continue
50-
}
51-
includedTables = append(includedTables, ttt)
52-
}
53-
}
54-
55-
// return an error if skip tables doesn't match any known tables
56-
var skippedTables schema.Tables
57-
skippedTableMap := map[string]bool{}
58-
for _, t := range skipTables {
59-
tt := p.tables.GlobMatch(t)
60-
if len(tt) == 0 {
61-
return nil, fmt.Errorf("skip_tables entry matches no known tables: %q", t)
62-
}
63-
for _, ttt := range tt {
64-
if skippedTables.GetTopLevel(ttt.Name) != nil {
65-
// prevent duplicates
66-
continue
67-
}
68-
skippedTables = append(skippedTables, ttt)
69-
}
70-
for _, st := range tt {
71-
skippedTableMap[st.Name] = true
72-
}
73-
}
74-
75-
// return an error if a table is both explicitly included and skipped
76-
var remainingTables schema.Tables
77-
for _, included := range includedTables {
78-
if skippedTableMap[included.Name] {
79-
continue
80-
}
81-
remainingTables = append(remainingTables, included)
82-
}
83-
84-
// return an error if child table is included without its parent
85-
for _, t := range remainingTables {
86-
var missingParents []string
87-
pt := t
88-
for pt.Parent != nil {
89-
if includedTables.GetTopLevel(pt.Parent.Name) == nil {
90-
missingParents = append(missingParents, pt.Parent.Name)
91-
}
92-
pt = pt.Parent
93-
}
94-
if len(missingParents) > 0 {
95-
return nil, fmt.Errorf("table %s is a descendant table and cannot be included without %s", t.Name, strings.Join(missingParents, ", "))
96-
}
97-
}
98-
99-
return remainingTables, nil
100-
}

0 commit comments

Comments
 (0)