Skip to content

Commit e9d312a

Browse files
committed
ByteStream consumer can write to interface{}
* fix(ByteStreamConsumer): may now write into an interface which underlying type is []byte or string. * feat(ByteStreamConsumer): added support to io.ReaderFrom, preferred over io.Writer if available * feat(ByteStreamProducer): added support to io.WriterTo, preferred over io.Reader if available * refact(ByteStreamProducer): removed redundant case "string" and preferred the more general reflected case (supports aliased strings) * test: refactored ByteStream tests * test: added benchmark for bytestream.Consume * fixes #167 Signed-off-by: Frederic BIDON <fredbi@yahoo.com>
1 parent 248b38c commit e9d312a

2 files changed

Lines changed: 477 additions & 186 deletions

File tree

bytestream.go

Lines changed: 91 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,16 @@ type byteStreamOpts struct {
3838
Close bool
3939
}
4040

41-
// ByteStreamConsumer creates a consumer for byte streams,
42-
// takes a Writer/BinaryUnmarshaler interface or binary slice by reference,
43-
// and reads from the provided reader
41+
// ByteStreamConsumer creates a consumer for byte streams.
42+
//
43+
// The consumer consumes from a provided reader into the data passed by reference.
44+
//
45+
// Supported output underlying types and interfaces, prioritized in this order:
46+
// - io.ReaderFrom (for maximum control)
47+
// - io.Writer (performs io.Copy)
48+
// - encoding.BinaryUnmarshaler
49+
// - *string
50+
// - *[]byte
4451
func ByteStreamConsumer(opts ...byteStreamOpt) Consumer {
4552
var vals byteStreamOpts
4653
for _, opt := range opts {
@@ -51,45 +58,70 @@ func ByteStreamConsumer(opts ...byteStreamOpt) Consumer {
5158
if reader == nil {
5259
return errors.New("ByteStreamConsumer requires a reader") // early exit
5360
}
61+
if data == nil {
62+
return errors.New("nil destination for ByteStreamConsumer")
63+
}
5464

5565
closer := defaultCloser
5666
if vals.Close {
57-
if cl, ok := reader.(io.Closer); ok {
67+
if cl, isReaderCloser := reader.(io.Closer); isReaderCloser {
5868
closer = cl.Close
5969
}
6070
}
6171
defer func() {
6272
_ = closer()
6373
}()
6474

65-
if wrtr, ok := data.(io.Writer); ok {
66-
_, err := io.Copy(wrtr, reader)
75+
if readerFrom, isReaderFrom := data.(io.ReaderFrom); isReaderFrom {
76+
_, err := readerFrom.ReadFrom(reader)
6777
return err
6878
}
6979

70-
buf := new(bytes.Buffer)
80+
if writer, isDataWriter := data.(io.Writer); isDataWriter {
81+
_, err := io.Copy(writer, reader)
82+
return err
83+
}
84+
85+
// buffers input before writing to data
86+
var buf bytes.Buffer
7187
_, err := buf.ReadFrom(reader)
7288
if err != nil {
7389
return err
7490
}
7591
b := buf.Bytes()
7692

77-
if bu, ok := data.(encoding.BinaryUnmarshaler); ok {
78-
return bu.UnmarshalBinary(b)
79-
}
93+
switch destinationPointer := data.(type) {
94+
case encoding.BinaryUnmarshaler:
95+
return destinationPointer.UnmarshalBinary(b)
96+
case *any:
97+
switch (*destinationPointer).(type) {
98+
case string:
99+
*destinationPointer = string(b)
100+
101+
return nil
102+
103+
case []byte:
104+
*destinationPointer = b
80105

81-
if data != nil {
82-
if str, ok := data.(*string); ok {
83-
*str = string(b)
84106
return nil
85107
}
86-
}
108+
default:
109+
// check for the underlying type to be pointer to []byte or string,
110+
if ptr := reflect.TypeOf(data); ptr.Kind() != reflect.Ptr {
111+
return errors.New("destination must be a pointer")
112+
}
87113

88-
if t := reflect.TypeOf(data); data != nil && t.Kind() == reflect.Ptr {
89114
v := reflect.Indirect(reflect.ValueOf(data))
90-
if t = v.Type(); t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8 {
115+
t := v.Type()
116+
117+
switch {
118+
case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8:
91119
v.SetBytes(b)
92120
return nil
121+
122+
case t.Kind() == reflect.String:
123+
v.SetString(string(b))
124+
return nil
93125
}
94126
}
95127

@@ -98,68 +130,87 @@ func ByteStreamConsumer(opts ...byteStreamOpt) Consumer {
98130
})
99131
}
100132

101-
// ByteStreamProducer creates a producer for byte streams,
102-
// takes a Reader/BinaryMarshaler interface or binary slice,
103-
// and writes to a writer (essentially a pipe)
133+
// ByteStreamProducer creates a producer for byte streams.
134+
//
135+
// The producer takes input data then writes to an output writer (essentially as a pipe).
136+
//
137+
// Supported input underlying types and interfaces, prioritized in this order:
138+
// - io.WriterTo (for maximum control)
139+
// - io.Reader (performs io.Copy). A ReadCloser is closed before exiting.
140+
// - encoding.BinaryMarshaler
141+
// - error (writes as a string)
142+
// - []byte
143+
// - string
144+
// - struct, other slices: writes as JSON
104145
func ByteStreamProducer(opts ...byteStreamOpt) Producer {
105146
var vals byteStreamOpts
106147
for _, opt := range opts {
107148
opt(&vals)
108149
}
150+
109151
return ProducerFunc(func(writer io.Writer, data interface{}) error {
110152
if writer == nil {
111153
return errors.New("ByteStreamProducer requires a writer") // early exit
112154
}
155+
if data == nil {
156+
return errors.New("nil destination for ByteStreamProducer")
157+
}
158+
113159
closer := defaultCloser
114160
if vals.Close {
115-
if cl, ok := writer.(io.Closer); ok {
161+
if cl, isWriterCloser := writer.(io.Closer); isWriterCloser {
116162
closer = cl.Close
117163
}
118164
}
119165
defer func() {
120166
_ = closer()
121167
}()
122168

123-
if rc, ok := data.(io.ReadCloser); ok {
169+
if rc, isDataCloser := data.(io.ReadCloser); isDataCloser {
124170
defer rc.Close()
125171
}
126172

127-
if rdr, ok := data.(io.Reader); ok {
128-
_, err := io.Copy(writer, rdr)
173+
switch origin := data.(type) {
174+
case io.WriterTo:
175+
_, err := origin.WriteTo(writer)
176+
return err
177+
178+
case io.Reader:
179+
_, err := io.Copy(writer, origin)
129180
return err
130-
}
131181

132-
if bm, ok := data.(encoding.BinaryMarshaler); ok {
133-
bytes, err := bm.MarshalBinary()
182+
case encoding.BinaryMarshaler:
183+
bytes, err := origin.MarshalBinary()
134184
if err != nil {
135185
return err
136186
}
137187

138188
_, err = writer.Write(bytes)
139189
return err
140-
}
141-
142-
if data != nil {
143-
if str, ok := data.(string); ok {
144-
_, err := writer.Write([]byte(str))
145-
return err
146-
}
147190

148-
if e, ok := data.(error); ok {
149-
_, err := writer.Write([]byte(e.Error()))
150-
return err
151-
}
191+
case error:
192+
_, err := writer.Write([]byte(origin.Error()))
193+
return err
152194

195+
default:
153196
v := reflect.Indirect(reflect.ValueOf(data))
154-
if t := v.Type(); t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8 {
197+
t := v.Type()
198+
199+
switch {
200+
case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8:
155201
_, err := writer.Write(v.Bytes())
156202
return err
157-
}
158-
if t := v.Type(); t.Kind() == reflect.Struct || t.Kind() == reflect.Slice {
203+
204+
case t.Kind() == reflect.String:
205+
_, err := writer.Write([]byte(v.String()))
206+
return err
207+
208+
case t.Kind() == reflect.Struct || t.Kind() == reflect.Slice:
159209
b, err := swag.WriteJSON(data)
160210
if err != nil {
161211
return err
162212
}
213+
163214
_, err = writer.Write(b)
164215
return err
165216
}

0 commit comments

Comments
 (0)