44 "context"
55 "fmt"
66 "strings"
7- "time"
87
98 gremlingo "github.com/apache/tinkerpop/gremlin-go/v3/driver"
109 "github.com/cenkalti/backoff/v4"
@@ -44,19 +43,21 @@ func (c *Client) WriteTableBatch(ctx context.Context, tableName string, msgs mes
4443 pks := table .PrimaryKeys ()
4544 if len (pks ) == 0 {
4645 // If no primary keys are defined, use all columns
47- for i := range table .Columns {
48- pks = append (pks , table .Columns [i ].Name )
49- }
46+ pks = table .Columns .Names ()
5047 }
51- nonPKs := make (map [string ]struct {})
52- for _ , c := range table .Columns {
53- if ! c .PrimaryKey {
54- nonPKs [c .Name ] = struct {}{}
48+ valueColumns := make ([]string , 0 , len (table .Columns )- len (pks ))
49+ if len (table .Columns )- len (pks ) > 0 {
50+ // not all columns are a part of "pk", so we need to account for the values
51+ for _ , col := range table .Columns {
52+ if ! col .PrimaryKey {
53+ valueColumns = append (valueColumns , col .Name )
54+ }
5555 }
5656 }
5757
5858 g := gremlingo .Traversal_ ().WithRemote (session ).V ().HasLabel (tableName )
5959 for i := range rows {
60+ g = g .V ().HasLabel (tableName )
6061 for _ , colName := range pks {
6162 g = g .Has (colName , rows [i ][colName ])
6263 }
@@ -71,41 +72,25 @@ func (c *Client) WriteTableBatch(ctx context.Context, tableName string, msgs mes
7172 ins ,
7273 )
7374
74- for colName := range nonPKs {
75+ for _ , colName := range valueColumns {
7576 g = g .Property (gremlingo .Cardinality .Single , colName , rows [i ][colName ])
7677 }
7778 }
7879
79- bo := backoff .NewExponentialBackOff ()
80- retryCount := 0
81-
82- for retryCount <= c .spec .MaxRetries {
83- retryCount ++
84-
80+ bo := backoff .WithContext (
81+ backoff .WithMaxRetries (backoff .NewExponentialBackOff (), uint64 (c .spec .MaxRetries )),
82+ ctx ,
83+ )
84+ return backoff .Retry (func () error {
8585 err = <- g .Iterate ()
8686 if err == nil {
8787 return nil
8888 }
89-
9089 if ! strings .Contains (err .Error (), "ConcurrentModificationException" ) {
91- return fmt .Errorf ("Iterate: %w" , err )
92- }
93-
94- if retryCount > c .spec .MaxRetries {
95- break
90+ return backoff .Permanent (fmt .Errorf ("Iterate: %w" , err ))
9691 }
97-
98- nb := bo .NextBackOff ()
99- c .logger .Debug ().Err (err ).Str ("backoff_duration" , nb .String ()).Msg ("Iterate failed, retrying" )
100-
101- select {
102- case <- ctx .Done ():
103- return ctx .Err ()
104- case <- time .After (nb ):
105- }
106- }
107-
108- return fmt .Errorf ("Max retries (%d) reached. Iterate: %w" , c .spec .MaxRetries , err )
92+ return err
93+ }, bo )
10994}
11095
11196func (c * Client ) Write (ctx context.Context , msgs <- chan message.WriteMessage ) error {
0 commit comments