Skip to content

Commit c8cea3b

Browse files
committed
wait for output streams on process completion in run_shell task
1 parent f2cb5bd commit c8cea3b

1 file changed

Lines changed: 12 additions & 11 deletions

File tree

  • pkg/coordinator/tasks/run_shell

pkg/coordinator/tasks/run_shell/task.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,10 @@ func (t *Task) Execute(ctx context.Context) error {
9696
return err
9797
}
9898

99-
stdoutChan := t.readOutputStream(stdout, cmdLogger.WithField("stream", "stdout"))
99+
stdoutChan, stdoutCloseChan := t.readOutputStream(stdout, cmdLogger.WithField("stream", "stdout"))
100100
defer close(stdoutChan)
101101

102-
stderrChan := t.readOutputStream(stderr, cmdLogger.WithField("stream", "stderr"))
102+
stderrChan, stderrCloseChan := t.readOutputStream(stderr, cmdLogger.WithField("stream", "stderr"))
103103
defer close(stderrChan)
104104

105105
// add env vars
@@ -139,7 +139,7 @@ func (t *Task) Execute(ctx context.Context) error {
139139

140140
stdin.Close()
141141

142-
// wait for process
142+
// wait for process & output streams
143143
var execErr error
144144

145145
waitChan := make(chan bool)
@@ -148,9 +148,8 @@ func (t *Task) Execute(ctx context.Context) error {
148148

149149
execErr = command.Wait()
150150

151-
// give stdout/stderr handler some time to parse remaining outputs
152-
// TODO: find a better solution to wait for IO streams before continuing here
153-
time.Sleep(100 * time.Millisecond)
151+
<-stdoutCloseChan
152+
<-stderrCloseChan
154153
}()
155154

156155
// wait for output handler
@@ -179,12 +178,15 @@ cmdloop:
179178
return nil
180179
}
181180

182-
func (t *Task) readOutputStream(pipe io.ReadCloser, logger logrus.FieldLogger) chan string {
183-
resChan := make(chan string)
181+
func (t *Task) readOutputStream(pipe io.ReadCloser, logger logrus.FieldLogger) (readChan chan string, closeChan chan bool) {
182+
readChan = make(chan string)
183+
closeChan = make(chan bool)
184184

185185
go func() {
186186
var err error
187187

188+
defer close(closeChan)
189+
188190
reader := bufio.NewReader(pipe)
189191

190192
for err == nil {
@@ -197,7 +199,6 @@ func (t *Task) readOutputStream(pipe io.ReadCloser, logger logrus.FieldLogger) c
197199
line, isPrefix, err = reader.ReadLine()
198200
if err != nil {
199201
if err == io.EOF {
200-
logger.Errorf("EOF")
201202
break
202203
}
203204

@@ -210,12 +211,12 @@ func (t *Task) readOutputStream(pipe io.ReadCloser, logger logrus.FieldLogger) c
210211
}
211212

212213
if len(ln) > 0 {
213-
resChan <- string(ln)
214+
readChan <- string(ln)
214215
}
215216
}
216217
}()
217218

218-
return resChan
219+
return readChan, closeChan
219220
}
220221

221222
var outputVarPattern = regexp.MustCompile(`^::set-var +([^ ]+) +(.*)$`)

0 commit comments

Comments
 (0)