Skip to content

Commit 6a53a7c

Browse files
committed
more fixes
1 parent 7980967 commit 6a53a7c

2 files changed

Lines changed: 17 additions & 5 deletions

File tree

internal/servers/destination/v0/destinations.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,21 @@ func (s *Server) Write2(msg pb.Destination_Write2Server) error {
135135
}
136136
return status.Errorf(codes.InvalidArgument, "failed to unmarshal resource: %v", err)
137137
}
138+
table := tablesV3.Get(origResource.TableName)
139+
if table == nil {
140+
close(resources)
141+
if wgErr := eg.Wait(); wgErr != nil {
142+
return status.Errorf(codes.InvalidArgument, "failed to get table: %s and write failed: %v", origResource.TableName, wgErr)
143+
}
144+
return status.Errorf(codes.InvalidArgument, "failed to get table: %s", origResource.TableName)
145+
}
146+
138147
// this is a check to keep backward compatible for sources that are not adding
139148
// source and sync time
140-
if len(origResource.Data) < len(tables.Get(origResource.TableName).Columns) {
149+
if len(origResource.Data) < len(table.Columns) {
141150
origResource.Data = append([]schemav2.CQType{sourceColumn, syncTimeColumn}, origResource.Data...)
142151
}
143-
tablesv2 := TablesV2ToV3(tables)
144-
convertedResource := CQTypesToRecord(memory.DefaultAllocator, []schemav2.CQTypes{origResource.Data}, tablesv2.Get(origResource.TableName).ToArrowSchema())
152+
convertedResource := CQTypesToRecord(memory.DefaultAllocator, []schemav2.CQTypes{origResource.Data}, table.ToArrowSchema())
145153
select {
146154
case resources <- convertedResource:
147155
case <-ctx.Done():

internal/servers/destination/v0/schemav1v2.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/cloudquery/plugin-sdk/v3/types"
1313
)
1414

15-
func TablesV2ToV3(tables []*schemav2.Table) schema.Tables {
15+
func TablesV2ToV3(tables schemav2.Tables) schema.Tables {
1616
res := make(schema.Tables, len(tables))
1717
for i, t := range tables {
1818
res[i] = TableV2ToV3(t)
@@ -21,13 +21,17 @@ func TablesV2ToV3(tables []*schemav2.Table) schema.Tables {
2121
}
2222

2323
func TableV2ToV3(table *schemav2.Table) *schema.Table {
24-
return &schema.Table{
24+
newTable := &schema.Table{
2525
Name: table.Name,
2626
Description: table.Description,
2727
Columns: ColumnsV2ToV3(table.Columns),
2828
IgnoreInTests: table.IgnoreInTests,
2929
IsIncremental: table.IsIncremental,
3030
}
31+
if len(table.Relations) > 0 {
32+
newTable.Relations = TablesV2ToV3(table.Relations)
33+
}
34+
return newTable
3135
}
3236

3337
func ColumnsV2ToV3(columns []schemav2.Column) []schema.Column {

0 commit comments

Comments
 (0)