Skip to content

Commit 5c05152

Browse files
committed
It's all working, and correct, and safe! Exporting to a separate PR.
1 parent aeaa0f3 commit 5c05152

7 files changed

Lines changed: 621 additions & 114 deletions

File tree

sdks/go/examples/streaming_wordcap/wordcap.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ func main() {
245245
}, imp)
246246
// out = beam.WindowInto(s, window.NewFixedWindows(10*time.Second), out)
247247
str := beam.ParDo(s, func(b int64) string {
248-
return fmt.Sprintf("element%03d", b)
248+
return fmt.Sprintf("%03d", b)
249249
}, out)
250250

251251
keyed := beam.ParDo(s, func(ctx context.Context, ts beam.EventTime, s string) (string, string) {

sdks/go/pkg/beam/core/runtime/exec/datasource.go

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ type DataSource struct {
4444
// OnTimerTransforms maps PtransformIDs to their execution nodes that handle OnTimer callbacks.
4545
OnTimerTransforms map[string]*ParDo
4646

47-
source DataManager
48-
state StateReader
47+
source DataManager
48+
state StateReader
49+
curInst string
4950

5051
index int64
5152
splitIdx int64
@@ -97,6 +98,7 @@ func (n *DataSource) Up(ctx context.Context) error {
9798
// StartBundle initializes this datasource for the bundle.
9899
func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContext) error {
99100
n.mu.Lock()
101+
n.curInst = id
100102
n.source = data.Data
101103
n.state = data.State
102104
n.start = time.Now()
@@ -106,9 +108,19 @@ func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContex
106108
return n.Out.StartBundle(ctx, id, data)
107109
}
108110

111+
// splitSuccess is a marker error to indicate we've reached the split index.
112+
// Akin to io.EOF.
113+
var splitSuccess = errors.New("split index reached")
114+
109115
// process handles converting elements from the data source to timers.
116+
//
117+
// The data and timer callback functions must return an io.EOF if the reader terminates to signal that an additional
118+
// buffer is desired. On successful splits, [splitSuccess] must be returned to indicate that the
119+
// PTransform is done processing data for this instruction.
110120
func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader, ptransformID string) error, timer func(bcr *byteCountReader, ptransformID, timerFamilyID string) error) error {
111-
// TODO(riteshghorse): Pass in the PTransformIDs expecting OnTimer calls.
121+
defer func() {
122+
log.Infof(ctx, "%v DataSource.process returning", n.curInst)
123+
}()
112124
// The SID contains this instruction's expected data processing transform (this one).
113125
elms, err := n.source.OpenElementChan(ctx, n.SID, maps.Keys(n.OnTimerTransforms))
114126
if err != nil {
@@ -120,35 +132,44 @@ func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader
120132

121133
var byteCount int
122134
bcr := byteCountReader{reader: &r, count: &byteCount}
135+
136+
splitPrimaryComplete := map[string]bool{}
123137
for {
124138
var err error
125139
select {
126140
case e, ok := <-elms:
127141
// Channel closed, so time to exit
128142
if !ok {
129-
log.Infof(ctx, "%v: Data Channel closed", n)
143+
log.Infof(ctx, "%v Data Channel closed", n.curInst)
130144
return nil
131145
}
146+
if splitPrimaryComplete[e.PtransformID] {
147+
log.Infof(ctx, "%v skipping elements for %v, previous split", n.curInst, e.PtransformID)
148+
continue
149+
}
132150
if len(e.Data) > 0 {
133151
r.Reset(e.Data)
134152
err = data(&bcr, e.PtransformID)
135153
}
136154
if len(e.Timers) > 0 {
137-
// TODO remove this debug log.
138-
log.Infof(ctx, "timer received for %v; %v : %v", e.PtransformID, e.TimerFamilyID, e.Timers)
155+
log.Infof(ctx, "%v timer received for %v; %v : %v", n.curInst, e.PtransformID, e.TimerFamilyID, e.Timers)
139156
r.Reset(e.Timers)
140157
err = timer(&bcr, e.PtransformID, e.TimerFamilyID)
141158
}
142-
case <-ctx.Done():
143-
return nil
144-
}
145159

146-
if err != nil {
147-
if err != io.EOF {
160+
if err == splitSuccess {
161+
log.Infof(ctx, "%v split success received for %v", n.curInst, e.PtransformID)
162+
// Returning splitSuccess means we've split, and aren't consuming the remaining buffer.
163+
// We mark the PTransform done to ignore further data.
164+
splitPrimaryComplete[e.PtransformID] = true
165+
} else if err != nil && err != io.EOF {
148166
return errors.Wrap(err, "source failed")
149167
}
150-
// io.EOF means the reader successfully drained
168+
// io.EOF means the reader successfully drained.
151169
// We're ready for a new buffer.
170+
case <-ctx.Done():
171+
log.Infof(ctx, "%v context canceled for: %v", n.curInst, ctx.Err())
172+
return nil
152173
}
153174
}
154175
}
@@ -204,6 +225,7 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
204225
// TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes?
205226
ws, t, pn, err := DecodeWindowedValueHeader(wc, bcr.reader)
206227
if err != nil {
228+
log.Infof(ctx, "%v decode window error: %v", n.curInst, err)
207229
return err
208230
}
209231

@@ -216,7 +238,7 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
216238
pe.Windows = ws
217239
pe.Pane = pn
218240

219-
log.Infof(ctx, "%v[%v]: processing %+v,%v", n, ptransformID, pe.Elm, pe.Elm2)
241+
log.Infof(ctx, "%v[%v]: processing %+v,%v", n.curInst, ptransformID, pe.Elm, pe.Elm2)
220242

221243
var valReStreams []ReStream
222244
for _, cv := range cvs {
@@ -234,7 +256,7 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
234256
n.PCol.addSize(int64(bcr.reset()))
235257

236258
// Check if there's a continuation and return residuals
237-
// Needs to be done immeadiately after processing to not lose the element.
259+
// Needs to be done immediately after processing to not lose the element.
238260
if c := n.getProcessContinuation(); c != nil {
239261
cp, err := n.checkpointThis(ctx, c)
240262
if err != nil {
@@ -247,12 +269,10 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
247269
}
248270
// We've finished processing an element, check if we have finished a split.
249271
if n.incrementIndexAndCheckSplit() {
250-
break
272+
log.Infof(ctx, "%v split index reached", n.curInst)
273+
return splitSuccess
251274
}
252275
}
253-
// Signal data loop exit.
254-
log.Debugf(ctx, "%v: exiting data loop", n)
255-
return nil
256276
},
257277
func(bcr *byteCountReader, ptransformID, timerFamilyID string) error {
258278
tmap, err := decodeTimer(cp, wc, bcr)
@@ -526,7 +546,7 @@ func (n *DataSource) checkpointThis(ctx context.Context, pc sdf.ProcessContinuat
526546
// The bufSize param specifies the estimated number of elements that will be
527547
// sent to this DataSource, and is used to be able to perform accurate splits
528548
// even if the DataSource has not yet received all its elements. A bufSize of
529-
// 0 or less indicates that its unknown, and so uses the current known size.
549+
// 0 or less indicates that it's unknown, and so uses the current known size.
530550
func (n *DataSource) Split(ctx context.Context, splits []int64, frac float64, bufSize int64) (SplitResult, error) {
531551
if n == nil {
532552
return SplitResult{}, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)

0 commit comments

Comments (0)