Transform and handle your data, line by line.
```sh
go get go.bobheadxi.dev/streamline
```

streamline offers a variety of primitives to make working with data line by line a breeze:
- `streamline.Stream` offers the ability to add hooks that handle an `io.Reader` line by line with `(*Stream).Stream`, `(*Stream).StreamBytes`, and other utilities.
- `pipeline.Pipeline` offers a way to build pipelines that transform the data in a `streamline.Stream`, such as cleaning, filtering, mapping, or sampling data.
  - `jq.Pipeline` can be used to map every line to the output of a JQ query, for example.
  - `streamline.Stream` implements standard `io` interfaces like `io.Reader`, so `pipeline.Pipeline` can be used for general-purpose data manipulation as well.
- `pipe.NewStream` offers a way to create a buffered pipe between a writer and a `Stream`.
  - `streamexec.Start` uses this to attach a `Stream` to an `exec.Cmd` to work with command output.
When working with data streams in Go, you typically get an `io.Reader`, which is great for arbitrary data. In many cases, though, especially when scripting, the data you end up with is structured line by line, or you want to handle it line by line, for example to send each line to a structured logging library. You can set up a `bufio.Reader` or `bufio.Scanner` to do this, but for cases like `exec.Cmd` you also need boilerplate to configure the command and set up pipes, and for extra functionality like transforming, filtering, or sampling output you need to write your own handlers. streamline aims to provide succinct ways to do all of the above and more.
Using `bufio.Scanner`:

```go
func PrefixOutput(cmd *exec.Cmd) error {
	reader, writer := io.Pipe()
	defer reader.Close() // unblocks the command's writes if we return early
	cmd.Stdout = writer
	cmd.Stderr = writer
	if err := cmd.Start(); err != nil {
		return err
	}
	errC := make(chan error, 1) // buffered so the goroutine can always exit
	go func() {
		err := cmd.Wait()
		writer.Close()
		errC <- err
	}()
	s := bufio.NewScanner(reader)
	for s.Scan() {
		println("PREFIX: ", s.Text())
	}
	if err := s.Err(); err != nil {
		return err
	}
	return <-errC
}
```

Using `streamline/streamexec`:

```go
func PrefixOutput(cmd *exec.Cmd) error {
	stream, err := streamexec.Start(cmd)
	if err != nil {
		return err
	}
	return stream.Stream(func(line string) {
		println("PREFIX: ", line)
	})
}
```
Using `bufio.Scanner`:

```go
func GetMessages(r io.Reader) error {
	s := bufio.NewScanner(r)
	for s.Scan() {
		var result bytes.Buffer
		cmd := exec.Command("jq", ".msg")
		cmd.Stdin = bytes.NewReader(s.Bytes())
		cmd.Stdout = &result
		if err := cmd.Run(); err != nil {
			return err
		}
		print(result.String())
	}
	return s.Err()
}
```

Using `streamline`:

```go
func GetMessages(r io.Reader) error {
	return streamline.New(r).
		WithPipeline(jq.Pipeline(".msg")).
		Stream(func(line string) {
			println(line)
		})
}
```
Using `bufio.Scanner`:

```go
func PrintEvery10th(r io.Reader) error {
	s := bufio.NewScanner(r)
	var count int
	for s.Scan() {
		count++
		if count%10 != 0 {
			continue
		}
		println(s.Text())
	}
	return s.Err()
}
```

Using `streamline`:

```go
func PrintEvery10th(r io.Reader) error {
	return streamline.New(r).
		WithPipeline(pipeline.Sample(10)).
		Stream(func(line string) {
			println(line)
		})
}
```
The following example is a realistic one: Cloud SQL on GCP cannot import `pg_dump` output that contains certain `EXTENSION`-related statements, so to dump a PostgreSQL database with `pg_dump` and upload the dump to a bucket for import into Cloud SQL, you must first pre-process the dump to neutralize the offending statements (here, by commenting them out).
Using `bufio.Scanner`:

```go
var unwanted = []byte("COMMENT ON EXTENSION")

func Upload(pgdump *os.File, dst io.Writer) error {
	s := bufio.NewScanner(pgdump)
	for s.Scan() {
		line := s.Bytes()
		var err error
		if bytes.Contains(line, unwanted) {
			_, err = dst.Write(
				// comment out this line
				append([]byte("-- "), line...))
		} else {
			_, err = dst.Write(line)
		}
		if err == nil {
			// Scanner strips line endings, so restore the newline
			_, err = dst.Write([]byte("\n"))
		}
		if err != nil {
			return err
		}
	}
	return s.Err()
}
```

Using `streamline`:

```go
var unwanted = []byte("COMMENT ON EXTENSION")

func Upload(pgdump *os.File, dst io.Writer) error {
	_, err := streamline.New(pgdump).
		WithPipeline(pipeline.Map(func(line []byte) []byte {
			if bytes.Contains(line, unwanted) {
				// comment out this line
				return append([]byte("-- "), line...)
			}
			return line
		})).
		WriteTo(dst)
	return err
}
```
Some of the ideas in this package started in sourcegraph/run, a project that set out to build utilities that make it easier to write bash-esque scripts in Go: namely, doing things you would often do in scripts, such as grepping and iterating over lines. streamline generalizes the ideas sourcegraph/run used for working with command output so that they work on arbitrary inputs, and sourcegraph/run now uses streamline internally.