@@ -10,7 +10,7 @@ import (
1010 "github.com/apache/arrow/go/v13/arrow"
1111 "github.com/apache/arrow/go/v13/arrow/array"
1212 "github.com/apache/arrow/go/v13/arrow/memory"
13- "github.com/cloudquery/plugin-sdk/v2 /schema"
13+ "github.com/cloudquery/plugin-sdk/v3 /schema"
1414)
1515
1616const (
@@ -24,67 +24,111 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er
2424 }
2525 switch b := bldr .(type ) {
2626 case * array.BooleanBuilder :
27- b .Append (val .(bool ))
27+ if boolVal , ok := val .(bool ); ok {
28+ b .Append (boolVal )
29+ return nil
30+ }
31+ return b .AppendValueFromString (val .(string ))
2832 case * array.Int8Builder :
2933 u , err := strconv .ParseInt (val .(string ), 10 , 8 )
3034 if err != nil {
31- return err
35+ return fmt . Errorf ( "failed to parse int8: %w" , err )
3236 }
3337 b .Append (int8 (u ))
3438 case * array.Int16Builder :
3539 u , err := strconv .ParseInt (val .(string ), 10 , 16 )
3640 if err != nil {
37- return err
41+ return fmt . Errorf ( "failed to parse int16: %w" , err )
3842 }
3943 b .Append (int16 (u ))
4044 case * array.Int32Builder :
4145 u , err := strconv .ParseInt (val .(string ), 10 , 32 )
4246 if err != nil {
43- return err
47+ return fmt . Errorf ( "failed to parse int32: %w" , err )
4448 }
4549 b .Append (int32 (u ))
4650 case * array.Int64Builder :
4751 u , err := strconv .ParseInt (val .(string ), 10 , 64 )
4852 if err != nil {
49- return err
53+ return fmt . Errorf ( "failed to parse int64: %w" , err )
5054 }
5155 b .Append (u )
5256 case * array.Uint8Builder :
5357 u , err := strconv .ParseUint (val .(string ), 10 , 8 )
5458 if err != nil {
55- return err
59+ return fmt . Errorf ( "failed to parse uint8: %w" , err )
5660 }
5761 b .Append (uint8 (u ))
5862 case * array.Uint16Builder :
5963 u , err := strconv .ParseUint (val .(string ), 10 , 16 )
6064 if err != nil {
61- return err
65+ return fmt . Errorf ( "failed to parse uint16: %w" , err )
6266 }
6367 b .Append (uint16 (u ))
6468 case * array.Uint32Builder :
6569 u , err := strconv .ParseUint (val .(string ), 10 , 32 )
6670 if err != nil {
67- return err
71+ return fmt . Errorf ( "failed to parse uint32: %w" , err )
6872 }
6973 b .Append (uint32 (u ))
7074 case * array.Uint64Builder :
7175 u , err := strconv .ParseUint (val .(string ), 10 , 64 )
7276 if err != nil {
73- return err
77+ return fmt . Errorf ( "failed to parse uint64: %w" , err )
7478 }
7579 b .Append (u )
7680 case * array.Float32Builder :
77- b .Append (val .(float32 ))
81+ if floatVal , ok := val .(float64 ); ok {
82+ b .Append (float32 (floatVal ))
83+ } else {
84+ floatVal , err := strconv .ParseFloat (val .(string ), 32 )
85+ if err != nil {
86+ return fmt .Errorf ("failed to parse float32: %w" , err )
87+ }
88+ b .Append (float32 (floatVal ))
89+ }
7890 case * array.Float64Builder :
79- b .Append (val .(float64 ))
91+ if floatVal , ok := val .(float64 ); ok {
92+ b .Append (floatVal )
93+ } else {
94+ floatVal , err := strconv .ParseFloat (val .(string ), 64 )
95+ if err != nil {
96+ return fmt .Errorf ("failed to parse float64: %w" , err )
97+ }
98+ b .Append (floatVal )
99+ }
80100 case * array.StringBuilder :
81101 b .Append (val .(string ))
82102 case * array.LargeStringBuilder :
83103 b .Append (val .(string ))
84104 case * array.BinaryBuilder :
85105 b .Append (val .([]uint8 ))
86106 case * array.TimestampBuilder :
87- b .Append (arrow .Timestamp (val .(time.Time ).UnixMicro ()))
107+ var timeVal time.Time
108+ // nolint:revive
109+ if t , ok := val .(time.Time ); ok {
110+ timeVal = t
111+ } else {
112+ t , err := arrow .TimestampFromString (val .(string ), b .Type ().(* arrow.TimestampType ).Unit )
113+ if err != nil {
114+ return fmt .Errorf ("failed to parse timestamp: %w" , err )
115+ }
116+ b .Append (t )
117+ return nil
118+ }
119+
120+ switch b .Type ().(* arrow.TimestampType ).Unit {
121+ case arrow .Second :
122+ b .Append (arrow .Timestamp (timeVal .UTC ().Unix ()))
123+ case arrow .Millisecond :
124+ b .Append (arrow .Timestamp (timeVal .UTC ().UnixMilli ()))
125+ case arrow .Microsecond :
126+ b .Append (arrow .Timestamp (timeVal .UTC ().UnixMicro ()))
127+ case arrow .Nanosecond :
128+ b .Append (arrow .Timestamp (timeVal .UTC ().UnixNano ()))
129+ default :
130+ return fmt .Errorf ("unsupported timestamp unit %s" , f .Type .(* arrow.TimestampType ).Unit )
131+ }
88132 case array.ListLikeBuilder :
89133 b .Append (true )
90134 valBuilder := b .ValueBuilder ()
@@ -110,17 +154,18 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er
110154 return fmt .Errorf ("unsupported type %T with builder %T" , val , bldr )
111155 }
112156 if err := bldr .AppendValueFromString (v ); err != nil {
113- return err
157+ return fmt . Errorf ( "failed to AppendValueFromString %s: %w" , v , err )
114158 }
115159 }
116160 return nil
117161}
118162
119- func (c * Client ) reverseTransformer (sc * arrow.Schema , values []any ) (arrow.Record , error ) {
163+ func (c * Client ) reverseTransformer (table * schema.Table , values []any ) (arrow.Record , error ) {
164+ sc := table .ToArrowSchema ()
120165 bldr := array .NewRecordBuilder (memory .DefaultAllocator , sc )
121166 for i , f := range sc .Fields () {
122167 if err := c .reverseTransform (f , bldr .Field (i ), * values [i ].(* any )); err != nil {
123- return nil , err
168+ return nil , fmt . Errorf ( "failed to transform field %s: %w" , f . Name , err )
124169 }
125170 }
126171 rec := bldr .NewRecord ()
@@ -153,10 +198,10 @@ func snowflakeStrToArray(val string) []string {
153198 return strs
154199}
155200
156- func (c * Client ) Read (ctx context.Context , table * arrow. Schema , sourceName string , res chan <- arrow.Record ) error {
157- tableName := schema . TableName ( table )
158- colNames := make ([]string , 0 , len (table .Fields () ))
159- for _ , col := range table .Fields () {
201+ func (c * Client ) Read (ctx context.Context , table * schema. Table , sourceName string , res chan <- arrow.Record ) error {
202+ tableName := table . Name
203+ colNames := make ([]string , 0 , len (table .Columns ))
204+ for _ , col := range table .Columns {
160205 colNames = append (colNames , `"` + col .Name + `"` )
161206 }
162207 cols := strings .Join (colNames , ", " )
@@ -167,7 +212,7 @@ func (c *Client) Read(ctx context.Context, table *arrow.Schema, sourceName strin
167212 }
168213 defer rows .Close ()
169214 for rows .Next () {
170- values := make ([]any , len (table .Fields () ))
215+ values := make ([]any , len (table .Columns ))
171216 for i := range values {
172217 values [i ] = new (any )
173218 }
0 commit comments