@@ -50,7 +50,6 @@ type cursorPublisher struct {
5050// instances can add update operations to be executed after already pending
5151// update operations from older inputs instances that have been shutdown.
5252type updateOp struct {
53- store * store
5453 resource * resource
5554
5655 // state updates to persist
@@ -59,6 +58,18 @@ type updateOp struct {
5958 delta interface {}
6059}
6160
61+ func newUpdateOp (resource * resource , ts time.Time , delta interface {}) * updateOp {
62+ return & updateOp {
63+ resource : resource ,
64+ timestamp : ts ,
65+ delta : delta ,
66+ }
67+ }
68+
69+ func (op * updateOp ) Key () string {
70+ return op .resource .key
71+ }
72+
6273// Publish publishes an event. Publish returns false if the inputs cancellation context has been marked as done.
6374// If cursorUpdate is not nil, Publish updates the in memory state and create and updateOp for the pending update.
6475// It overwrite event.Private with the update operation, before finally sending the event.
@@ -69,7 +80,7 @@ func (c *cursorPublisher) Publish(event beat.Event, cursorUpdate interface{}) er
6980 return c .forward (event )
7081 }
7182
72- op , err := createUpdateOp (c .cursor .store , c . cursor . resource , cursorUpdate )
83+ op , err := createUpdateOp (c .cursor .resource , cursorUpdate )
7384 if err != nil {
7485 return err
7586 }
@@ -86,43 +97,27 @@ func (c *cursorPublisher) forward(event beat.Event) error {
8697 return c .canceler .Err ()
8798}
8899
89- func createUpdateOp (store * store , resource * resource , updates interface {}) (* updateOp , error ) {
100+ func createUpdateOp (resource * resource , updates interface {}) (* updateOp , error ) {
90101 ts := time .Now ()
91102
92103 resource .stateMutex .Lock ()
93104 defer resource .stateMutex .Unlock ()
94105
95- cursor := resource .pendingCursor
96- if resource .activeCursorOperations == 0 {
97- var tmp interface {}
98- typeconv .Convert (& tmp , cursor )
99- resource .pendingCursor = tmp
100- cursor = tmp
101- }
102- if err := typeconv .Convert (& cursor , updates ); err != nil {
103- return nil , err
104- }
105- resource .pendingCursor = cursor
106+ resource .pendingUpdate = updates
106107
107108 resource .Retain ()
108109 resource .activeCursorOperations ++
109- return & updateOp {
110- resource : resource ,
111- store : store ,
112- timestamp : ts ,
113- delta : updates ,
114- }, nil
110+ return newUpdateOp (resource , ts , updates ), nil
115111}
116112
117113// done releases resources held by the last N updateOps.
118114func (op * updateOp ) done (n uint ) {
119115 op .resource .UpdatesReleaseN (n )
120116 op .resource = nil
121- * op = updateOp {}
122117}
123118
124119// Execute updates the persistent store with the scheduled changes and releases the resource.
125- func (op * updateOp ) Execute (n uint ) {
120+ func (op * updateOp ) Execute (store * store , n uint ) {
126121 resource := op .resource
127122
128123 resource .stateMutex .Lock ()
@@ -135,8 +130,8 @@ func (op *updateOp) Execute(n uint) {
135130 defer op .done (n )
136131 resource .activeCursorOperations -= n
137132 if resource .activeCursorOperations == 0 {
138- resource .cursor = resource .pendingCursor
139- resource .pendingCursor = nil
133+ resource .cursor = resource .pendingCursor ()
134+ resource .pendingCursorValue = nil
140135 } else {
141136 typeconv .Convert (& resource .cursor , op .delta )
142137 }
@@ -145,13 +140,12 @@ func (op *updateOp) Execute(n uint) {
145140 resource .internalState .Updated = op .timestamp
146141 }
147142
148- err := op . store .persistentStore .Set (resource .key , resource .inSyncStateSnapshot ())
143+ err := store .persistentStore .Set (resource .key , resource .inSyncStateSnapshot ())
149144 if err != nil {
150145 if ! statestore .IsClosed (err ) {
151- op . store .log .Errorf ("Failed to update state in the registry for '%v'" , resource .key )
146+ store .log .Errorf ("Failed to update state in the registry for '%v'" , resource .key )
152147 }
153148 } else {
154- resource .internalInSync = true
155149 resource .stored = true
156150 }
157151}
0 commit comments