@@ -44,8 +44,9 @@ type DataSource struct {
4444 // OnTimerTransforms maps PtransformIDs to their execution nodes that handle OnTimer callbacks.
4545 OnTimerTransforms map [string ]* ParDo
4646
47- source DataManager
48- state StateReader
47+ source DataManager
48+ state StateReader
49+ curInst string
4950
5051 index int64
5152 splitIdx int64
@@ -97,6 +98,7 @@ func (n *DataSource) Up(ctx context.Context) error {
9798// StartBundle initializes this datasource for the bundle.
9899func (n * DataSource ) StartBundle (ctx context.Context , id string , data DataContext ) error {
99100 n .mu .Lock ()
101+ n .curInst = id
100102 n .source = data .Data
101103 n .state = data .State
102104 n .start = time .Now ()
@@ -106,9 +108,19 @@ func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContex
106108 return n .Out .StartBundle (ctx , id , data )
107109}
108110
111+ // splitSuccess is a marker error to indicate we've reached the split index.
112+ // Akin to io.EOF.
113+ var splitSuccess = errors .New ("split index reached" )
114+
109115// process handles converting elements from the data source to timers.
116+ //
117+ // The data and timer callback functions must return an io.EOF if the reader terminates to signal that an additional
118+ // buffer is desired. On successful splits, [splitSuccess] must be returned to indicate that the
119+ // PTransform is done processing data for this instruction.
110120func (n * DataSource ) process (ctx context.Context , data func (bcr * byteCountReader , ptransformID string ) error , timer func (bcr * byteCountReader , ptransformID , timerFamilyID string ) error ) error {
111- // TODO(riteshghorse): Pass in the PTransformIDs expecting OnTimer calls.
121+ defer func () {
122+ log .Infof (ctx , "%v DataSource.process returning" , n .curInst )
123+ }()
112124 // The SID contains this instruction's expected data processing transform (this one).
113125 elms , err := n .source .OpenElementChan (ctx , n .SID , maps .Keys (n .OnTimerTransforms ))
114126 if err != nil {
@@ -120,35 +132,44 @@ func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader
120132
121133 var byteCount int
122134 bcr := byteCountReader {reader : & r , count : & byteCount }
135+
136+ splitPrimaryComplete := map [string ]bool {}
123137 for {
124138 var err error
125139 select {
126140 case e , ok := <- elms :
127141 // Channel closed, so time to exit
128142 if ! ok {
129- log .Infof (ctx , "%v: Data Channel closed" , n )
143+ log .Infof (ctx , "%v Data Channel closed" , n . curInst )
130144 return nil
131145 }
146+ if splitPrimaryComplete [e .PtransformID ] {
147+ log .Infof (ctx , "%v skipping elements for %v, previous split" , n .curInst , e .PtransformID )
148+ continue
149+ }
132150 if len (e .Data ) > 0 {
133151 r .Reset (e .Data )
134152 err = data (& bcr , e .PtransformID )
135153 }
136154 if len (e .Timers ) > 0 {
137- // TODO remove this debug log.
138- log .Infof (ctx , "timer received for %v; %v : %v" , e .PtransformID , e .TimerFamilyID , e .Timers )
155+ log .Infof (ctx , "%v timer received for %v; %v : %v" , n .curInst , e .PtransformID , e .TimerFamilyID , e .Timers )
139156 r .Reset (e .Timers )
140157 err = timer (& bcr , e .PtransformID , e .TimerFamilyID )
141158 }
142- case <- ctx .Done ():
143- return nil
144- }
145159
146- if err != nil {
147- if err != io .EOF {
160+ if err == splitSuccess {
161+ log .Infof (ctx , "%v split success received for %v" , n .curInst , e .PtransformID )
162+ // Returning splitSuccess means we've split, and aren't consuming the remaining buffer.
163+ // We mark the PTransform done to ignore further data.
164+ splitPrimaryComplete [e .PtransformID ] = true
165+ } else if err != nil && err != io .EOF {
148166 return errors .Wrap (err , "source failed" )
149167 }
150- // io.EOF means the reader successfully drained
168+ // io.EOF means the reader successfully drained.
151169 // We're ready for a new buffer.
170+ case <- ctx .Done ():
171+ log .Infof (ctx , "%v context canceled for: %v" , n .curInst , ctx .Err ())
172+ return nil
152173 }
153174 }
154175}
@@ -204,6 +225,7 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
204225 // TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes?
205226 ws , t , pn , err := DecodeWindowedValueHeader (wc , bcr .reader )
206227 if err != nil {
228+ log .Infof (ctx , "%v decode window error: %v" , n .curInst , err )
207229 return err
208230 }
209231
@@ -216,7 +238,7 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
216238 pe .Windows = ws
217239 pe .Pane = pn
218240
219- log .Infof (ctx , "%v[%v]: processing %+v,%v" , n , ptransformID , pe .Elm , pe .Elm2 )
241+ log .Infof (ctx , "%v[%v]: processing %+v,%v" , n . curInst , ptransformID , pe .Elm , pe .Elm2 )
220242
221243 var valReStreams []ReStream
222244 for _ , cv := range cvs {
@@ -234,7 +256,7 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
234256 n .PCol .addSize (int64 (bcr .reset ()))
235257
236258 // Check if there's a continuation and return residuals
237- // Needs to be done immeadiately after processing to not lose the element.
259+ // Needs to be done immediately after processing to not lose the element.
238260 if c := n .getProcessContinuation (); c != nil {
239261 cp , err := n .checkpointThis (ctx , c )
240262 if err != nil {
@@ -247,12 +269,10 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
247269 }
248270 // We've finished processing an element, check if we have finished a split.
249271 if n .incrementIndexAndCheckSplit () {
250- break
272+ log .Infof (ctx , "%v split index reached" , n .curInst )
273+ return splitSuccess
251274 }
252275 }
253- // Signal data loop exit.
254- log .Debugf (ctx , "%v: exiting data loop" , n )
255- return nil
256276 },
257277 func (bcr * byteCountReader , ptransformID , timerFamilyID string ) error {
258278 tmap , err := decodeTimer (cp , wc , bcr )
@@ -526,7 +546,7 @@ func (n *DataSource) checkpointThis(ctx context.Context, pc sdf.ProcessContinuat
526546// The bufSize param specifies the estimated number of elements that will be
527547// sent to this DataSource, and is used to be able to perform accurate splits
528548// even if the DataSource has not yet received all its elements. A bufSize of
529- // 0 or less indicates that its unknown, and so uses the current known size.
549+ // 0 or less indicates that it's unknown, and so uses the current known size.
530550func (n * DataSource ) Split (ctx context.Context , splits []int64 , frac float64 , bufSize int64 ) (SplitResult , error ) {
531551 if n == nil {
532552 return SplitResult {}, fmt .Errorf ("failed to split at requested splits: {%v}, DataSource not initialized" , splits )
0 commit comments