@marianogappa commented Jul 5, 2024

fixes cloudquery/cloudquery#15543

This PR adds the State Client to the Python SDK, enabling incremental syncs for Python plugins.

With `state_client = StateClientBuilder.build(backend_options=None)`:

```
$ cli sync cloudquery-config
Loading spec(s) from cloudquery-config
Starting sync for: typeform (grpc@localhost:7777) -> [postgresql (cloudquery/postgresql@v8.0.7)]
Sync completed successfully. Resources: 457, Errors: 0, Warnings: 0, Time: 2s
```

With `state_client = StateClientBuilder.build(backend_options=options.backend_options)`:

```
$ cli sync cloudquery-config
Loading spec(s) from cloudquery-config
Starting sync for: typeform (grpc@localhost:7777) -> [postgresql (cloudquery/postgresql@v8.0.7)]
Sync completed successfully. Resources: 20, Errors: 0, Warnings: 0, Time: 3s
```

Note that the second run synced 20 resources, versus 457 in the first.
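The difference between the two runs is consistent with a state client that remembers nothing when no backend is configured, so every resolver falls back to a full sync. A minimal sketch of that behavior, assuming hypothetical `NoOpStateClient`, `InMemoryStateClient` and `build_state_client` names (these are not the SDK's actual classes) and a plain dict standing in for the backend:

```python
from typing import Dict, Optional


class NoOpStateClient:
    """Hypothetical client used when no backend is configured: nothing is remembered."""

    def get_key(self, key: str) -> Optional[str]:
        return None  # no stored cursor, so resolvers do a full sync

    def set_key(self, key: str, value: str) -> None:
        pass  # values are discarded

    def flush(self) -> None:
        pass


class InMemoryStateClient:
    """Hypothetical client that buffers writes and persists them on flush()."""

    def __init__(self, store: Dict[str, str]) -> None:
        self._store = store  # stands in for the real backend table
        self._pending: Dict[str, str] = {}

    def get_key(self, key: str) -> Optional[str]:
        # Unflushed writes take precedence over persisted values
        return self._pending.get(key, self._store.get(key))

    def set_key(self, key: str, value: str) -> None:
        self._pending[key] = value

    def flush(self) -> None:
        self._store.update(self._pending)
        self._pending.clear()


def build_state_client(backend_options: Optional[dict], store: Dict[str, str]):
    # No backend_options means no state: every sync is a full sync
    if backend_options is None:
        return NoOpStateClient()
    return InMemoryStateClient(store)
```

Buffering writes until `flush()` means a sync only persists its cursor once it has finished, rather than after every record.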

With this configuration:

```yaml
kind: source
# Common source-plugin configuration
spec:
  name: typeform
  registry: grpc
  path: localhost:7777
  tables: ["*"]
  destinations: ["postgresql"]
  backend_options:
    table_name: "cq_state_typeform"
    connection: "@@plugins.postgresql.connection"
```

the sync creates the following state table:

[Screenshot, 2024-07-05: the resulting state table]
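Conceptually, the state table acts as a key/value store named by `backend_options.table_name`. As a rough illustration only, the sketch below uses Python's built-in `sqlite3` in place of PostgreSQL; the `SqliteStateClient` class and the two-column `(state_key, state_value)` layout are assumptions, not the schema or class the SDK actually uses (the real client delegates to the configured backend).

```python
import sqlite3
from typing import Dict, Optional


class SqliteStateClient:
    """Hypothetical state client persisting keys to the table named in backend_options."""

    def __init__(self, conn: sqlite3.Connection, table_name: str) -> None:
        self._conn = conn
        self._table = table_name  # e.g. "cq_state_typeform"; comes from trusted config
        self._pending: Dict[str, str] = {}
        self._conn.execute(
            f'CREATE TABLE IF NOT EXISTS "{self._table}" '
            "(state_key TEXT PRIMARY KEY, state_value TEXT)"
        )

    def get_key(self, key: str) -> Optional[str]:
        if key in self._pending:
            return self._pending[key]
        row = self._conn.execute(
            f'SELECT state_value FROM "{self._table}" WHERE state_key = ?', (key,)
        ).fetchone()
        return row[0] if row else None

    def set_key(self, key: str, value: str) -> None:
        self._pending[key] = value  # buffered until flush()

    def flush(self) -> None:
        # Write all buffered keys in a single transaction, overwriting old values
        with self._conn:
            self._conn.executemany(
                f'INSERT OR REPLACE INTO "{self._table}" (state_key, state_value) VALUES (?, ?)',
                list(self._pending.items()),
            )
        self._pending.clear()
```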

Example usage:

```python
# Create the state client and make sure it flushes after the sync finishes
state_client = StateClientBuilder.build(backend_options=options.backend_options)
self._scheduler.set_post_sync_hook(state_client.flush)

...

# Set state_client on all resolvers and their direct children
for resolver in resolvers:
    resolver.set_state_client(state_client)
    for child in resolver.child_resolvers:
        child.set_state_client(state_client)
```
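The loop above reaches only direct children. If resolvers can nest more than one level deep, a recursive helper covers the whole tree. A sketch, assuming a hypothetical `Resolver` class that exposes `child_resolvers` and `set_state_client` like the resolvers in the example, and a hypothetical `set_state_client_recursive` helper:

```python
from typing import Any, Iterable, List, Optional


class Resolver:
    """Hypothetical stand-in for a table resolver with nested children."""

    def __init__(self, name: str, children: Optional[List["Resolver"]] = None) -> None:
        self.name = name
        self.child_resolvers: List["Resolver"] = children or []
        self.state_client: Any = None

    def set_state_client(self, state_client: Any) -> None:
        self.state_client = state_client


def set_state_client_recursive(resolvers: Iterable[Resolver], state_client: Any) -> None:
    # Depth-first walk so grandchildren (and deeper) receive the client too
    for resolver in resolvers:
        resolver.set_state_client(state_client)
        set_state_client_recursive(resolver.child_resolvers, state_client)
```

Usage mirrors the original loop: `set_state_client_recursive(resolvers, state_client)`.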

Then, in the resolver (example):

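```python
# Read the cursor saved by a previous sync (None if there is none yet)
since = self.state_client.get_key("typeform_form_responses_since")
...
for result in api.response:
    # Advance the cursor to the newest submitted_at seen so far
    if not since or result["submitted_at"] > since:
        self.state_client.set_key("typeform_form_responses_since", result["submitted_at"])
        since = result["submitted_at"]
    ...
```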
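Stripped of the API calls, the resolver keeps a running maximum of `submitted_at` and stores it under a single state key. A self-contained sketch, where `DictState`, `track_latest` and the sample timestamps are illustrative assumptions. It relies on the timestamps being ISO 8601 strings in one consistent format (for example UTC with a `Z` suffix), which compare chronologically as plain strings:

```python
from typing import Dict, Iterable, Optional

STATE_KEY = "typeform_form_responses_since"


class DictState:
    """Hypothetical dict-backed stand-in for the state client."""

    def __init__(self) -> None:
        self.data: Dict[str, str] = {}

    def get_key(self, key: str) -> Optional[str]:
        return self.data.get(key)

    def set_key(self, key: str, value: str) -> None:
        self.data[key] = value


def track_latest(state: DictState, responses: Iterable[dict]) -> Optional[str]:
    since = state.get_key(STATE_KEY)  # cursor from the previous sync, or None
    for result in responses:
        submitted_at = result["submitted_at"]
        # Same-format ISO 8601 strings sort chronologically, so ">" is safe here
        if not since or submitted_at > since:
            state.set_key(STATE_KEY, submitted_at)
            since = submitted_at
    return since
```

Because the cursor is a running maximum, the order in which the API returns responses does not affect the stored value.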

Note that I had to add `self._scheduler.set_post_sync_hook`, because all plugins end their sync with `return self._scheduler.sync(...)`, so there is otherwise no place to flush the state client.
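A minimal sketch of how such a hook can work, assuming a hypothetical `Scheduler` whose `sync` is a generator that streams resolver output; the real SDK scheduler's internals may differ. The hook runs only after the caller has consumed the last message, so the state client flushes once per completed sync even though the plugin simply returns the stream:

```python
from typing import Callable, Iterable, Iterator, Optional


class Scheduler:
    """Hypothetical scheduler: streams resolver output, then runs a post-sync hook."""

    def __init__(self) -> None:
        self._post_sync_hook: Optional[Callable[[], None]] = None

    def set_post_sync_hook(self, fn: Callable[[], None]) -> None:
        self._post_sync_hook = fn

    def sync(self, resolvers: Iterable[Callable[[], Iterable[str]]]) -> Iterator[str]:
        for resolve in resolvers:
            yield from resolve()
        # Reached only once every message has been consumed
        if self._post_sync_hook is not None:
            self._post_sync_hook()
```

In this sketch, calling `sync(...)` runs nothing by itself; the hook fires when the consumer drains the stream, which is why a plugin can still end with `return self._scheduler.sync(...)`.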

@marianogappa requested review from a team and @erezrokah, and removed the request for the team (July 5, 2024 12:31)
@marianogappa requested review from @hermanschaaf and removed the request for @erezrokah (July 5, 2024 12:35)
```diff
 skip_tables=request.skip_tables,
 tables=request.tables,
-backend_options=None,
+backend_options=request.backend, # TODO: shouldn't this be "backend_options"? I put "backend" based on live breakpoint
```
Member:

Is this TODO still relevant? I guess if it works, it works? 😄

Contributor Author:

If it works, it works ® Removed.

@hermanschaaf (Member) left a comment:

Looks great! 🚀

@marianogappa merged commit 3193879 into main on Jul 8, 2024
@marianogappa deleted the mariano/state-client branch (July 8, 2024 08:18)
Development: successfully merging this pull request may close the issue "feat: Support incremental syncs in Python SDK".

3 participants