@@ -14,14 +14,16 @@ import (
1414 "github.com/aws/aws-sdk-go-v2/aws/arn"
1515 "github.com/aws/aws-sdk-go-v2/service/firehose"
1616 "github.com/aws/aws-sdk-go-v2/service/firehose/types"
17- "github.com/cloudquery/plugin-sdk/v2 /schema"
17+ "github.com/cloudquery/plugin-sdk/v3 /schema"
1818)
1919
20- const MaxRecordSizeBytes = 1024000
21- const MaxBatchRecords = 500
22- const MaxBatchSizeBytes = 4194000
20+ const (
21+ MaxRecordSizeBytes = 1024000
22+ MaxBatchRecords = 500
23+ MaxBatchSizeBytes = 4194000
24+ )
2325
24- func (c * Client ) Write (ctx context.Context , tables schema.Schemas , record <- chan arrow.Record ) error {
26+ func (c * Client ) Write (ctx context.Context , tables schema.Tables , record <- chan arrow.Record ) error {
2527 parsedARN , err := arn .Parse (c .pluginSpec .StreamARN )
2628 if err != nil {
2729 c .logger .Error ().Err (err ).Msg ("invalid firehose stream ARN" )
@@ -38,10 +40,14 @@ func (c *Client) Write(ctx context.Context, tables schema.Schemas, record <-chan
3840 batchSize := 0
3941
4042 for rec := range record {
41- tableName := schema .TableName (rec .Schema ())
42- table := tables .SchemaByName (tableName )
43+ tableName , ok := rec .Schema ().Metadata ().GetValue (schema .MetadataTableName )
44+ if ! ok {
45+ return fmt .Errorf ("%q metadata key not found" , schema .MetadataTableName )
46+ }
47+
48+ table := tables .Get (tableName )
4349 if table == nil {
44- panic ( fmt .Errorf ("table %s not found" , tableName ) )
50+ return fmt .Errorf ("table %s not found" , tableName )
4551 }
4652
4753 for row := int64 (0 ); row < rec .NumRows (); row ++ {
0 commit comments