|
5 | 5 |
|
6 | 6 | "github.com/apache/arrow-go/v18/arrow" |
7 | 7 | "github.com/apache/arrow-go/v18/arrow/array" |
8 | | - "github.com/apache/arrow-go/v18/arrow/memory" |
| 8 | + "github.com/cloudquery/cloudquery/cli/v6/internal/env" |
9 | 9 | "github.com/cloudquery/plugin-sdk/v4/schema" |
10 | 10 | ) |
11 | 11 |
|
@@ -138,30 +138,29 @@ func (t *RecordTransformer) Transform(record arrow.Record) arrow.Record { |
138 | 138 | nRows := int(record.NumRows()) |
139 | 139 |
|
140 | 140 | cols := make([]arrow.Array, 0, len(sc.Fields())+t.internalColumns) |
141 | | - if t.withSyncTime && !sc.HasField(cqSyncTime) { |
142 | | - ts, _ := arrow.TimestampFromTime(t.syncTime, arrow.Microsecond) |
143 | | - syncTimeBldr := array.NewTimestampBuilder(memory.DefaultAllocator, &arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: "UTC"}) |
144 | | - syncTimeBldr.Reserve(nRows) |
145 | | - for i := 0; i < nRows; i++ { |
146 | | - syncTimeBldr.Append(ts) |
| 141 | + if t.withSyncTime && (!sc.HasField(cqSyncTime) || env.IsCloud()) { |
| 142 | + syncTimeArray, _ := schema.TimestampArrayFromTime(t.syncTime, arrow.Microsecond, "UTC", nRows) |
| 143 | + if !sc.HasField(cqSyncTime) { |
| 144 | + cols = append(cols, syncTimeArray) |
| 145 | + } else { |
| 146 | + record, _ = schema.ReplaceFieldInRecord(record, cqSyncTime, syncTimeArray) |
147 | 147 | } |
148 | | - cols = append(cols, syncTimeBldr.NewArray()) |
149 | 148 | } |
150 | | - if t.withSourceName && !sc.HasField(cqSourceName) { |
151 | | - sourceBldr := array.NewStringBuilder(memory.DefaultAllocator) |
152 | | - sourceBldr.Reserve(nRows) |
153 | | - for i := 0; i < nRows; i++ { |
154 | | - sourceBldr.Append(t.sourceName) |
| 149 | + if t.withSourceName && (!sc.HasField(cqSourceName) || env.IsCloud()) { |
| 150 | + sourceNameArray := schema.StringArrayFromValue(t.sourceName, nRows) |
| 151 | + if !sc.HasField(cqSourceName) { |
| 152 | + cols = append(cols, sourceNameArray) |
| 153 | + } else { |
| 154 | + record, _ = schema.ReplaceFieldInRecord(record, cqSourceName, sourceNameArray) |
155 | 155 | } |
156 | | - cols = append(cols, sourceBldr.NewArray()) |
157 | 156 | } |
158 | | - if t.withSyncGroupID && !sc.HasField(cqSyncGroupId) { |
159 | | - syncGroupIdBldr := array.NewStringBuilder(memory.DefaultAllocator) |
160 | | - syncGroupIdBldr.Reserve(nRows) |
161 | | - for i := 0; i < nRows; i++ { |
162 | | - syncGroupIdBldr.Append(t.syncGroupId) |
| 157 | + if t.withSyncGroupID && (!sc.HasField(cqSyncGroupId) || env.IsCloud()) { |
| 158 | + syncGroupIdArray := schema.StringArrayFromValue(t.syncGroupId, nRows) |
| 159 | + if !sc.HasField(cqSyncGroupId) { |
| 160 | + cols = append(cols, syncGroupIdArray) |
| 161 | + } else { |
| 162 | + record, _ = schema.ReplaceFieldInRecord(record, cqSyncGroupId, syncGroupIdArray) |
163 | 163 | } |
164 | | - cols = append(cols, syncGroupIdBldr.NewArray()) |
165 | 164 | } |
166 | 165 |
|
167 | 166 | cols = append(cols, record.Columns()...) |
|
0 commit comments