Skip to content

Commit 8b9eb98

Browse files
author
Przemysław Stępień
authored
feat: Add ability to change client project_id (#20767)
#### Summary ⚠️ **If you're contributing to a plugin please read this section of the [contribution guidelines](https://github.com/cloudquery/cloudquery/blob/main/CONTRIBUTING.md#open-core-vs-open-source) 🧑‍🎓 before submitting this PR** ⚠️ In some scenarios users want to execute queries in context of different project than referenced in the query itself. I.e executing pricinipal has permissions to access table in project B only when executing within context of project A. This wasn't possible using only project_id spec. Now default project for execution can also be fetched from the environment/credentials by using `*detect-project-id*` variable.
1 parent 66cbf0a commit 8b9eb98

File tree

6 files changed

+25
-11
lines changed

6 files changed

+25
-11
lines changed

plugins/destination/bigquery/client/client.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func New(ctx context.Context, logger zerolog.Logger, specBytes []byte, opts plug
5959
return nil, err
6060
}
6161

62-
if err := validateCreds(ctx, c.client, c.spec.DatasetID); err != nil {
62+
if err := validateCreds(ctx, c.client, c.spec.DatasetID, c.spec.ProjectID); err != nil {
6363
return nil, fmt.Errorf("failed to validate credentials: %w", err)
6464
}
6565

@@ -84,7 +84,7 @@ func bqClient(ctx context.Context, s Spec) (*bigquery.Client, error) {
8484
if s.Endpoint != "" {
8585
opts = append(opts, option.WithEndpoint(s.Endpoint))
8686
}
87-
client, err := bigquery.NewClient(ctx, s.ProjectID, opts...)
87+
client, err := bigquery.NewClient(ctx, s.ClientProjectID, opts...)
8888
if err != nil {
8989
return nil, err
9090
}
@@ -94,8 +94,8 @@ func bqClient(ctx context.Context, s Spec) (*bigquery.Client, error) {
9494
return client, nil
9595
}
9696

97-
func validateCreds(ctx context.Context, c *bigquery.Client, datasetID string) error {
98-
datasetRef := c.Dataset(datasetID)
97+
func validateCreds(ctx context.Context, c *bigquery.Client, datasetID, projectID string) error {
98+
datasetRef := c.DatasetInProject(projectID, datasetID)
9999
_, err := datasetRef.Metadata(ctx)
100100
if err != nil {
101101
if isAPINotFoundError(err) {
@@ -124,7 +124,7 @@ func TestConnection(ctx context.Context, _ zerolog.Logger, specBytes []byte) err
124124
return err
125125
}
126126

127-
if err := validateCreds(ctx, c, s.DatasetID); err != nil {
127+
if err := validateCreds(ctx, c, s.DatasetID, s.ProjectID); err != nil {
128128
return err
129129
}
130130

plugins/destination/bigquery/client/migrate.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (c *Client) MigrateTables(ctx context.Context, msgs message.WriteMigrateTab
6363

6464
func (c *Client) doesTableExist(ctx context.Context, client *bigquery.Client, table string) (bool, error) {
6565
c.logger.Debug().Str("dataset", c.spec.DatasetID).Str("table", table).Msg("Checking existence")
66-
tableRef := client.Dataset(c.spec.DatasetID).Table(table)
66+
tableRef := client.DatasetInProject(c.spec.ProjectID, c.spec.DatasetID).Table(table)
6767
md, err := tableRef.Metadata(ctx)
6868
if err != nil {
6969
if isAPINotFoundError(err) {
@@ -105,7 +105,7 @@ func (c *Client) waitForSchemaToMatch(ctx context.Context, client *bigquery.Clie
105105
// require this check to pass 3 times in a row to mitigate getting different responses from different BQ servers
106106
tries := 3
107107
for j := 0; j < tries; j++ {
108-
md, err := client.Dataset(c.spec.DatasetID).Table(table.Name).Metadata(ctx)
108+
md, err := client.DatasetInProject(c.spec.ProjectID, c.spec.DatasetID).Table(table.Name).Metadata(ctx)
109109
if err != nil {
110110
return err
111111
}
@@ -125,7 +125,7 @@ func (c *Client) waitForSchemaToMatch(ctx context.Context, client *bigquery.Clie
125125
}
126126

127127
func (c *Client) autoMigrateTable(ctx context.Context, client *bigquery.Client, table *schema.Table) error {
128-
bqTable := client.Dataset(c.spec.DatasetID).Table(table.Name)
128+
bqTable := client.DatasetInProject(c.spec.ProjectID, c.spec.DatasetID).Table(table.Name)
129129
md, err := bqTable.Metadata(ctx)
130130
if err != nil {
131131
return fmt.Errorf("failed to get metadata for table %q with error: %w", table.Name, err)
@@ -208,7 +208,7 @@ func (c *Client) createTable(ctx context.Context, client *bigquery.Client, table
208208
Schema: bqSchema,
209209
TimePartitioning: c.timePartitioning(),
210210
}
211-
return client.Dataset(c.spec.DatasetID).Table(table.Name).Create(ctx, &tm)
211+
return client.DatasetInProject(c.spec.ProjectID, c.spec.DatasetID).Table(table.Name).Create(ctx, &tm)
212212
}
213213

214214
func (c *Client) timePartitioning() *bigquery.TimePartitioning {

plugins/destination/bigquery/client/schema.json

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugins/destination/bigquery/client/spec.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ type Spec struct {
7272

7373
// Maximum interval between batch writes.
7474
BatchTimeout configtype.Duration `json:"batch_timeout"`
75+
76+
// Identifies the project context bq client should execute in. Defaults to the project_id. You can set it to *detect-project-id* to automatically detect project id from credentials in the environment.
77+
ClientProjectID string `json:"client_project_id"`
7578
}
7679

7780
//go:embed schema.json
@@ -90,6 +93,9 @@ func (s *Spec) SetDefaults() {
9093
if s.BatchTimeout.Duration() <= 0 {
9194
s.BatchTimeout = configtype.NewDuration(10 * time.Second)
9295
}
96+
if s.ClientProjectID == "" {
97+
s.ClientProjectID = s.ProjectID
98+
}
9399
}
94100

95101
func (s *Spec) Validate() error {

plugins/destination/bigquery/client/write.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (c *Client) serializeBatchForError(batch []*item) string {
5555
}
5656

5757
func (c *Client) WriteTableBatch(ctx context.Context, name string, msgs message.WriteInserts) error {
58-
inserter := c.client.Dataset(c.spec.DatasetID).Table(name).Inserter()
58+
inserter := c.client.DatasetInProject(c.spec.ProjectID, c.spec.DatasetID).Table(name).Inserter()
5959
inserter.IgnoreUnknownValues = true
6060
inserter.SkipInvalidRows = false
6161
batch := make([]*item, 0)

plugins/destination/bigquery/docs/_configuration.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@ spec:
1818
# batch_size: 10000
1919
# batch_size_bytes: 5242880 # 5 MiB
2020
# batch_timeout: 10s
21+
# client_project_id: "*detect-project-id*"
2122
```
2223

2324
This example above expects the following environment variables to be set:
2425

2526
* `PROJECT_ID` - The Google Cloud Project ID
26-
* `DATASET_ID` - The Google Cloud BigQuery Dataset ID
27+
* `DATASET_ID` - The Google Cloud BigQuery Dataset ID
28+
29+
`client_project_id` variable can be used to run BigQuery queries in a project different from where the destination table is located.
30+
If you set client_project_id to `*detect-project-id*`, it will automatically detect the project ID from the environment variable or application default credentials.

0 commit comments

Comments
 (0)