Skip to content

Commit ba69c91

Browse files
authored
feat: Handle panics in handlers (#575)
Handle panics in handlers by converting panics to `error`
1 parent 6232a33 commit ba69c91

3 files changed

Lines changed: 163 additions & 4 deletions

File tree

stream.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package filetypes
22

33
import (
4+
"errors"
45
"fmt"
56
"io"
67

@@ -54,11 +55,22 @@ func (cl *Client) StartStream(table *schema.Table, uploadFunc func(io.Reader) er
5455
}
5556

5657
// Write to the stream opened with StartStream.
57-
func (s *Stream) Write(records []arrow.Record) error {
58+
func (s *Stream) Write(records []arrow.Record) (retErr error) {
5859
if len(records) == 0 {
5960
return nil
6061
}
6162

63+
defer func() {
64+
if msg := recover(); msg != nil {
65+
switch v := msg.(type) {
66+
case error:
67+
retErr = fmt.Errorf("panic: %w [recovered]", v)
68+
default:
69+
retErr = fmt.Errorf("panic: %v [recovered]", msg)
70+
}
71+
}
72+
}()
73+
6274
return s.h.WriteContent(records)
6375
}
6476

@@ -74,11 +86,11 @@ func (s *Stream) FinishWithError(finishError error) error {
7486
return <-s.done
7587
}
7688

77-
if err := s.h.WriteFooter(); err != nil {
89+
if err := s.writeFooter(); err != nil {
7890
if !s.wc.closed {
7991
_ = s.wc.CloseWithError(err)
8092
}
81-
return fmt.Errorf("failed to write footer: %w", <-s.done)
93+
return fmt.Errorf("failed to write footer: %w", errors.Join(err, <-s.done))
8294
}
8395

8496
// ParquetWriter likes to close the underlying writer, so we need to check if it's already closed
@@ -90,3 +102,18 @@ func (s *Stream) FinishWithError(finishError error) error {
90102

91103
return <-s.done
92104
}
105+
106+
func (s *Stream) writeFooter() (retErr error) {
107+
defer func() {
108+
if msg := recover(); msg != nil {
109+
switch v := msg.(type) {
110+
case error:
111+
retErr = fmt.Errorf("panic: %w [recovered]", v)
112+
default:
113+
retErr = fmt.Errorf("panic: %v [recovered]", msg)
114+
}
115+
}
116+
}()
117+
118+
return s.h.WriteFooter()
119+
}

stream_panic_test.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package filetypes
2+
3+
import (
4+
"errors"
5+
"io"
6+
"testing"
7+
8+
"github.com/apache/arrow/go/v17/arrow"
9+
"github.com/apache/arrow/go/v17/arrow/array"
10+
"github.com/apache/arrow/go/v17/arrow/memory"
11+
"github.com/cloudquery/filetypes/v4/types"
12+
"github.com/cloudquery/plugin-sdk/v4/schema"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
func TestPanicOnHeader(t *testing.T) {
17+
r := require.New(t)
18+
cl := &Client{
19+
spec: &FileSpec{
20+
Compression: CompressionTypeNone,
21+
},
22+
filetype: &customWriter{
23+
PanicOnHeader: true,
24+
},
25+
}
26+
27+
stream, err := cl.StartStream(&schema.Table{}, func(io.Reader) error {
28+
return nil
29+
})
30+
r.Nil(stream)
31+
r.Error(err)
32+
r.ErrorContains(err, "panic:")
33+
}
34+
35+
func TestPanicOnWrite(t *testing.T) {
36+
r := require.New(t)
37+
cl := &Client{
38+
spec: &FileSpec{
39+
Compression: CompressionTypeNone,
40+
},
41+
filetype: &customWriter{
42+
PanicOnWrite: true,
43+
},
44+
}
45+
46+
table := &schema.Table{
47+
Name: "test",
48+
Columns: []schema.Column{
49+
{Name: "name", Type: arrow.BinaryTypes.String},
50+
},
51+
}
52+
bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
53+
bldr.Field(0).(*array.StringBuilder).Append("foo")
54+
bldr.Field(0).(*array.StringBuilder).Append("bar")
55+
record := bldr.NewRecord()
56+
57+
stream, err := cl.StartStream(table, func(io.Reader) error {
58+
return nil
59+
})
60+
r.NoError(err)
61+
err = stream.Write([]arrow.Record{record})
62+
r.Error(err)
63+
r.ErrorContains(err, "panic:")
64+
65+
r.NoError(stream.Finish())
66+
}
67+
68+
func TestPanicOnClose(t *testing.T) {
69+
r := require.New(t)
70+
cl := &Client{
71+
spec: &FileSpec{
72+
Compression: CompressionTypeNone,
73+
},
74+
filetype: &customWriter{
75+
PanicOnClose: true,
76+
},
77+
}
78+
79+
stream, err := cl.StartStream(&schema.Table{}, func(io.Reader) error {
80+
return nil
81+
})
82+
r.NoError(err)
83+
r.NoError(stream.Write(nil))
84+
85+
err = stream.Finish()
86+
r.Error(err)
87+
r.ErrorContains(err, "panic:")
88+
}
89+
90+
type customWriter struct {
91+
PanicOnHeader bool
92+
PanicOnWrite bool
93+
PanicOnClose bool
94+
}
95+
type customHandle struct {
96+
w *customWriter
97+
}
98+
99+
func (w *customWriter) WriteHeader(io.Writer, *schema.Table) (types.Handle, error) {
100+
if w.PanicOnHeader {
101+
panic("test panic")
102+
}
103+
return &customHandle{w: w}, nil
104+
}
105+
106+
func (*customWriter) Read(types.ReaderAtSeeker, *schema.Table, chan<- arrow.Record) error {
107+
return errors.New("not implemented")
108+
}
109+
110+
func (h *customHandle) WriteContent([]arrow.Record) error {
111+
if h.w.PanicOnWrite {
112+
panic("test panic")
113+
}
114+
return nil
115+
}
116+
func (h *customHandle) WriteFooter() error {
117+
if h.w.PanicOnClose {
118+
panic("test panic")
119+
}
120+
return nil
121+
}

write.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,18 @@ func (cl *Client) WriteTableBatchFile(w io.Writer, table *schema.Table, records
1414
return types.WriteAll(cl, w, table, records)
1515
}
1616

17-
func (cl *Client) WriteHeader(w io.Writer, t *schema.Table) (types.Handle, error) {
17+
func (cl *Client) WriteHeader(w io.Writer, t *schema.Table) (h types.Handle, retErr error) {
18+
defer func() {
19+
if msg := recover(); msg != nil {
20+
switch v := msg.(type) {
21+
case error:
22+
retErr = fmt.Errorf("panic: %w [recovered]", v)
23+
default:
24+
retErr = fmt.Errorf("panic: %v [recovered]", msg)
25+
}
26+
}
27+
}()
28+
1829
switch cl.spec.Compression {
1930
case CompressionTypeNone:
2031
return cl.filetype.WriteHeader(w, t)

0 commit comments

Comments
 (0)