Description
In order to be a streaming workflow engine, we need to support piping between tasks. This does not add much to the "traditional" pipeline, which mostly reads and writes files between tasks, but it opens up some interesting use cases:
- streams of query responses from bionode-ncbi can be piped into pipelines
- ease of incorporating Node transform streams into pipelines
- separate tools (no need for a container with `A` and `B` to run `A | B`, for example)
- cool things with `fork`, something like `A | fork(B1, B2) | C` (however, this will be tricky to implement, as we will need to create duplicate streams)
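The `fork` case is tricky because one upstream readable has to feed several consumers. Below is a minimal sketch of just the duplication step, using only Node's built-in `stream` module. It is not watermill's API; `duplicate` and `collect` are hypothetical helpers for illustration. The source is piped into several `PassThrough` streams, so every branch receives every chunk.

```javascript
const { Readable, PassThrough } = require('stream')

// Hypothetical helper: fan one readable out into `n` independent copies.
// Each PassThrough receives every chunk the source emits. Backpressure is
// shared, so the source slows down to the pace of the slowest branch.
function duplicate (source, n) {
  const copies = []
  for (let i = 0; i < n; i++) {
    const copy = new PassThrough()
    source.pipe(copy)
    copies.push(copy)
  }
  return copies
}

// Hypothetical helper: read a stream to completion and join its chunks.
function collect (stream) {
  return new Promise((resolve, reject) => {
    const chunks = []
    stream.on('data', chunk => chunks.push(chunk.toString()))
    stream.on('end', () => resolve(chunks.join('')))
    stream.on('error', reject)
  })
}

// Usage: A | fork(B1, B2), with B1 and B2 as simple collecting consumers
const [b1, b2] = duplicate(Readable.from(['ACGT', 'TTGA']), 2)
const results = Promise.all([collect(b1), collect(b2)])
results.then(([r1, r2]) => console.log(r1, r2)) // prints "ACGTTTGA ACGTTTGA"
```

Merging the branches back into `C` would need a further step (e.g. concatenating or interleaving the branch outputs), which this sketch leaves out.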
Dump from the docs:

If either `input` or `output` is not provided, the task is assumed to be a streaming task, i.e., a duplex stream with writable and/or readable portions. Consider:
```javascript
// through2 is a helper for creating through (transform) streams
const through = require('through2')

const throughCapitalize = through(function (chunk, enc, next) {
  // takes a chunk, its encoding, and a callback to notify when done pushing
  // push the capitalized chunk to the readable portion of this through stream
  this.push(chunk.toString().toUpperCase())
  // then call next so that the next chunk can be handled
  next()
})
```
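For comparison, here is a sketch of the same capitalizing transform built on Node's core `stream.Transform` instead of `through2`. This is an assumption about how one might drop the dependency, not what watermill itself does; `makeCapitalize` is a hypothetical name. Wrapping it in a factory gives each use its own stream, since a stream instance cannot be reused once it has ended.

```javascript
const { Transform, Readable } = require('stream')

// Factory: build a new capitalizing through stream for each use
function makeCapitalize () {
  return new Transform({
    transform (chunk, encoding, next) {
      // passing data to next() pushes it to the readable side
      next(null, chunk.toString().toUpperCase())
    }
  })
}

// Usage: pipe some lowercase text through it and gather the result
const capitalized = new Promise((resolve, reject) => {
  const out = []
  Readable.from(['hello ', 'streams'])
    .pipe(makeCapitalize())
    .on('data', chunk => out.push(chunk.toString()))
    .on('end', () => resolve(out.join('')))
    .on('error', reject)
})
capitalized.then(text => console.log(text)) // prints "HELLO STREAMS"
```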
You could connect `capitalize` to a readable (`readFile`) and a writable (`writeFile`) file stream with:
```javascript
const fs = require('fs')
const { task, join } = require('bionode-watermill')

const capitalize = task({
  name: 'Capitalize Through Stream'
},
// Here, input is a readable stream that came from the previous task.
// Return a through stream that takes the input and capitalizes it
({ input }) => {
  const capitalized = input.pipe(throughCapitalize)
  // Carry the file information along so writeFile can still use it
  capitalized.inFile = input.inFile
  return capitalized
})

const readFile = task({
  input: '*.lowercase',
  name: 'Read from *.lowercase'
}, ({ input }) => {
  const rs = fs.createReadStream(input)
  // Add file information to the stream object so we have it later
  rs.inFile = input
  // Return the readable stream so the next task can consume it
  return rs
})

const writeFile = task({
  output: '*.uppercase',
  name: 'Write to *.uppercase'
}, ({ input }) => fs.createWriteStream(input.inFile.swapExt('uppercase')))

// Can now connect the three:
join(readFile, capitalize, writeFile)
```

Of course, this could be written as a single task. This is somewhat simpler,
but the power of splitting up the read, transform, and write portions of a task
will become apparent once we can provide multiple sets of parameters to the
transform and observe the effect, without having to manually rewire input and
output filenames. As a single task the above would become:
```javascript
const capitalize = task({
  input: '*.lowercase',
  output: '*.uppercase',
  name: 'Capitalize *.lowercase -> *.uppercase'
}, ({ input }) =>
  fs.createReadStream(input)
    .pipe(throughCapitalize)
    // write to the *.uppercase file declared as this task's output
    .pipe(fs.createWriteStream(input.swapExt('uppercase')))
)
```
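Either way, the data path is the same: a readable file stream, a transform, and a writable file stream. The following is a minimal sketch of that path using Node's `stream.pipeline`, which, unlike chained `.pipe()` calls, forwards errors and destroys every stream on failure. It uses plain `fs` streams and a temporary directory rather than watermill tasks; `capitalizeFile` is a hypothetical name, and the extension swap is done with `path` instead of `swapExt`.

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')
const { pipeline, Transform } = require('stream')

// Hypothetical helper: capitalize foo.lowercase into foo.uppercase
function capitalizeFile (inFile, callback) {
  const outFile = path.join(
    path.dirname(inFile),
    path.basename(inFile, '.lowercase') + '.uppercase'
  )
  pipeline(
    fs.createReadStream(inFile), // read portion
    new Transform({ // transform portion
      transform (chunk, encoding, next) {
        next(null, chunk.toString().toUpperCase())
      }
    }),
    fs.createWriteStream(outFile), // write portion
    err => callback(err, outFile) // called once, on completion or first error
  )
}

// Usage: write a sample file into a temp dir and capitalize it
const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'capitalize-'))
const sample = path.join(dir, 'reads.lowercase')
fs.writeFileSync(sample, 'acgt\n')
capitalizeFile(sample, (err, outFile) => {
  if (err) throw err
  console.log(fs.readFileSync(outFile, 'utf8')) // prints "ACGT"
})
```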