Replace ExternalStream with new ByteStream type#12774
Replace ExternalStream with new ByteStream type#12774stormasm merged 58 commits intonushell:mainfrom
ExternalStream with new ByteStream type#12774Conversation
…data` to `pipeline`
ExternalStream with new Bytestream typeExternalStream with new ByteStream type
|
That is an impressive chunk of work and a nice improvement! Before we run ahead just looking at the streaming side we need to talk about the error changes, to ensure we don't break our execution semantics in unintended ways.
What is up here? Is there a breakage we need to resolve before merging? |
|
I have the same concern about turning the exit codes into |
|
I'll have a look again, then. Thanks for all of your hard work! |
WindSoilder
left a comment
There was a problem hiding this comment.
Thanks for the improvement! Just leaves some thought about stderr consumer thread.
| (Some(out), Some(err)) => { | ||
| // To avoid deadlocks, we must spawn a separate thread to wait on stderr. | ||
| thread::scope(|s| { | ||
| let err_thread = thread::Builder::new() |
There was a problem hiding this comment.
I think it's good to name the thread, here is a reason: #7879
| let stderr = self | ||
| .stderr | ||
| .take() | ||
| .map(|stderr| thread::Builder::new().spawn(move || consume_pipe(stderr))) |
There was a problem hiding this comment.
I think we can name the thread here
|
Is #12369 still part of this after removing the main error breaking changes? |
|
Yes, this PR still fixes #12369. If there is no exit code (i.e., the process was terminated via signal), then This is sort of a stop-gap measure until we figure out how we want to report this information. |
| // #[test] | ||
| // fn short_stream_binary() { | ||
| // let actual = nu!(r#" | ||
| // nu --testbin repeater (0x[01]) 5 | bytes starts-with 0x[010101] |
|
@devyn so we agreed in the core team meeting today that once you give the green light and have reviewed the PR we will go ahead and land it... Thank you kindly for doing this ! 😄 |
devyn
left a comment
There was a problem hiding this comment.
Looks good. I found one potential issue with bytes starts-with. A lot of the files changed are just into_value() related, with that being a Result now, so this isn't quite as big as it looks.
Didn't have any issues running it yet, and exit code is definitely working now. I'm glad you changed the each behaviour to Chunks and got rid of implicit line buffering.
Great stuff!
| for item in stream { | ||
| let byte_slice = match &item { | ||
| // String and binary data are valid byte patterns | ||
| Ok(Value::String { val, .. }) => val.as_bytes(), |
There was a problem hiding this comment.
Removing this will be a breaking change, as this worked before:
^echo foo | bytes starts-with ("foo" | into binary)In general, I think untyped byte streams should be able to be treated as binary
There was a problem hiding this comment.
That was me being lazy making the PR smaller 😄. After this, I intend to open another PR to restore that command and also make bytes ends-with support byte streams properly.
| // https://github.com/nushell/nushell/pull/9377 contains the reason | ||
| // for not using BufWriter<File> | ||
| // | ||
| // TODO: flag to specify buffered (BufWriter), line buffered (LineWriter), or no buffering (File). |
There was a problem hiding this comment.
I'm not sure this will ever be necessary, because the input stream is generally buffered in some way, even if by the command that produced it. If someone knows they want to use bigger buffers, it might be better to provide a command to adapt the stream with read_exact() or read_until() calls to ensure the desired chunking behaviour. And of course it should already be possible to do | lines | save if you definitely want line buffering.
I like this behaviour though, it means that the output of an external command gets written to file with the flushing behaviour it was intended to have, which is particularly good if saving to something like a FIFO where the interactivity actually matters.
There was a problem hiding this comment.
Oh yeah, that's a good observation! There might be an argument to add the flags for performance reasons (e.g., to avoid the allocations), but it's probably not worth it / it won't make a noticeable difference.
|
Thanks for taking the time to review this @devyn! |
|
@IanManske I am planning on landing this PR in the morning --- its 10pm in Oregon now... Unless someone beats me to it... I checked out your branch and ran all of the tests and it looks good to me... I will await you giving me the green light to let me know if its now ready to go. Based on Devyn's feedback it looks good to me... Thanks for all of your hard work on this PR... |
|
Sounds good, feel free to land it whenever @stormasm. Thanks! |
|
@IanManske cool ! thanks for giving the green light 👍 |
|
Wow, it’s great to see thoughtful improvements to internals like this. Big thanks to Ian and everyone who reviewed this. |
# Description Forgot that I fixed this already on my branch, but when printing without a display output hook, the implicit call to `table` gets its output mangled with newlines (since #12774). This happens when running `nu -c` or a script file. Here's that fix in one PR so it can be merged easily. # Tests + Formatting - 🟢 `toolkit fmt` - 🟢 `toolkit clippy` - 🟢 `toolkit test` - 🟢 `toolkit test stdlib`
# Description
This PR introduces a `ByteStream` type which is a `Read`-able stream of
bytes. Internally, it has an enum over three different byte stream
sources:
```rust
pub enum ByteStreamSource {
Read(Box<dyn Read + Send + 'static>),
File(File),
Child(ChildProcess),
}
```
This is in comparison to the current `RawStream` type, which is an
`Iterator<Item = Vec<u8>>` and has to allocate for each read chunk.
Currently, `PipelineData::ExternalStream` serves a weird dual role where
it is either external command output or a wrapper around `RawStream`.
`ByteStream` makes this distinction more clear (via `ByteStreamSource`)
and replaces `PipelineData::ExternalStream` in this PR:
```rust
pub enum PipelineData {
Empty,
Value(Value, Option<PipelineMetadata>),
ListStream(ListStream, Option<PipelineMetadata>),
ByteStream(ByteStream, Option<PipelineMetadata>),
}
```
The PR is relatively large, but a decent amount of it is just repetitive
changes.
This PR fixes nushell#7017, fixes nushell#10763, and fixes nushell#12369.
This PR also improves performance when piping external commands. Nushell
should, in most cases, have competitive pipeline throughput compared to,
e.g., bash.
| Command | Before (MB/s) | After (MB/s) | Bash (MB/s) |
| -------------------------------------------------- | -------------:|
------------:| -----------:|
| `throughput \| rg 'x'` | 3059 | 3744 | 3739 |
| `throughput \| nu --testbin relay o> /dev/null` | 3508 | 8087 | 8136 |
# User-Facing Changes
- This is a breaking change for the plugin communication protocol,
because the `ExternalStreamInfo` was replaced with `ByteStreamInfo`.
Plugins now only have to deal with a single input stream, as opposed to
the previous three streams: stdout, stderr, and exit code.
- The output of `describe` has been changed for external/byte streams.
- Temporary breaking change: `bytes starts-with` no longer works with
byte streams. This is to keep the PR smaller, and `bytes ends-with`
already does not work on byte streams.
- If a process core dumped, then instead of having a `Value::Error` in
the `exit_code` column of the output returned from `complete`, it now is
a `Value::Int` with the negation of the signal number.
# After Submitting
- Update docs and book as necessary
- Release notes (e.g., plugin protocol changes)
- Adapt/convert commands to work with byte streams (high priority is
`str length`, `bytes starts-with`, and maybe `bytes ends-with`).
- Refactor the `tee` code, Devyn has already done some work on this.
---------
Co-authored-by: Devyn Cairns <devyn.cairns@gmail.com>
) # Description Forgot that I fixed this already on my branch, but when printing without a display output hook, the implicit call to `table` gets its output mangled with newlines (since nushell#12774). This happens when running `nu -c` or a script file. Here's that fix in one PR so it can be merged easily. # Tests + Formatting - 🟢 `toolkit fmt` - 🟢 `toolkit clippy` - 🟢 `toolkit test` - 🟢 `toolkit test stdlib`
|
I'm trying to write tests for I'm currently doing this, but it feels awkward: if let PipelineData::ByteStream(byte_stream, ..) = pipeline_data {
if let ByteStreamSource::Child(child) = byte_stream.into_source() {
if let Some(stderr) = child.stderr.as_mut() {
stderr.read(&mut buf);
}
}
} |
impl ByteStream {
pub fn write_to(self, dest: &mut impl Write) -> Result<Option<ExitStatus>, ShellError> {
// ...
}
}Generic reader/writer functions should take |
This is intentionally verbose. In fact, I should have maybe made it an unsafe API. Reading from
Oh, good to know. I guess |
Rust's safety guarantees doesn't cover deadlocks, so it's probably fine.
Unfortunate indeed. The Rust team tried to change it, but couldn't due to API breakage. |
FYI this tends to lead to bigger binary sizes because people are using both variants. If you pass by mut ref they will make owned types into references. |
Description
This PR introduces a
ByteStreamtype which is aRead-able stream of bytes. Internally, it has an enum over three different byte stream sources:This is in comparison to the current
RawStreamtype, which is anIterator<Item = Vec<u8>>and has to allocate for each read chunk.Currently,
PipelineData::ExternalStreamserves a weird dual role where it is either external command output or a wrapper aroundRawStream.ByteStreammakes this distinction more clear (viaByteStreamSource) and replacesPipelineData::ExternalStreamin this PR:The PR is relatively large, but a decent amount of it is just repetitive changes.
This PR fixes #7017, fixes #10763, and fixes #12369.
This PR also improves performance when piping external commands. Nushell should, in most cases, have competitive pipeline throughput compared to, e.g., bash.
throughput | rg 'x'throughput | nu --testbin relay o> /dev/nullUser-Facing Changes
ExternalStreamInfowas replaced withByteStreamInfo. Plugins now only have to deal with a single input stream, as opposed to the previous three streams: stdout, stderr, and exit code.describehas been changed for external/byte streams.bytes starts-withno longer works with byte streams. This is to keep the PR smaller, andbytes ends-withalready does not work on byte streams.Value::Errorin theexit_codecolumn of the output returned fromcomplete, it now is aValue::Intwith the negation of the signal number.After Submitting
str length,bytes starts-with, and maybebytes ends-with).teecode, Devyn has already done some work on this.