Skip to content

GH-38828: [R] Ensure that streams can be written to socket connections#38897

Merged
paleolimbot merged 4 commits intoapache:mainfrom
paleolimbot:r-socket-connection
Apr 3, 2024
Merged

GH-38828: [R] Ensure that streams can be written to socket connections#38897
paleolimbot merged 4 commits intoapache:mainfrom
paleolimbot:r-socket-connection

Conversation

@paleolimbot
Copy link
Copy Markdown
Member

@paleolimbot paleolimbot commented Nov 27, 2023

Rationale for this change

Currently we can't write to socket connection from R. This is a very useful way to send Arrow data around and should work!

What changes are included in this PR?

Implements Tell() for non-seekable output streams. Apparently some Arrow code calls this to figure out how many bytes have been written.

Are these changes tested?

I'm not quite sure how to test this...all output streams we can easily test are seekable. We could try to spin up a socket server on another thread (like the reprex below) but I'm worried that will be flaky.

Are there any user-facing changes?

Yes (something that should have previously worked now works), although there is no place where we currently document anything about how connections can be used.

tmp <- tempfile()
proc <- callr::r_bg(function() {
  server <- function() {
    library(arrow)
    
    while (TRUE) {
      writeLines("Listening...")
      con <- socketConnection(host = "localhost", port = 6011, blocking = TRUE,
                              server = TRUE, open = "r+b")
      socketTimeout(con, 3600)
      
      data <- arrow::read_ipc_stream(con, as_data_frame = FALSE)
      print(head(as.data.frame(data)))
      
    }
  }
  
  server()
}, stdout = tmp)

Sys.sleep(0.5)

library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
rb <- arrow::record_batch(iris)

socketDriver <- socketConnection(host = "localhost", 
                                 port = "6011",
                                 blocking = TRUE,
                                 server = FALSE,
                                 open = "w+b")

write_ipc_stream(rb, socketDriver)
Sys.sleep(0.5)
cat(brio::read_file(tmp))
#> Listening...
#>   Sepal.Length Sepal.Width Petal.Length Petal.Width Species
#> 1          5.1         3.5          1.4         0.2  setosa
#> 2          4.9         3.0          1.4         0.2  setosa
#> 3          4.7         3.2          1.3         0.2  setosa
#> 4          4.6         3.1          1.5         0.2  setosa
#> 5          5.0         3.6          1.4         0.2  setosa
#> 6          5.4         3.9          1.7         0.4  setosa
#> Listening...

# Shutdown server
proc$interrupt()
#> [1] TRUE
Sys.sleep(0.5)
proc$is_alive()
#> [1] FALSE

Created on 2023-11-27 with reprex v2.0.2

@github-actions
Copy link
Copy Markdown

⚠️ GitHub issue #38828 has been automatically assigned in GitHub to PR creator.

@paleolimbot paleolimbot force-pushed the r-socket-connection branch from adc23cd to c640eef Compare March 13, 2024 12:24
@paleolimbot paleolimbot marked this pull request as ready for review March 13, 2024 14:00
@paleolimbot paleolimbot requested a review from thisisnic as a code owner March 13, 2024 14:00
@amoeba
Copy link
Copy Markdown
Member

amoeba commented Mar 14, 2024

Hi @paleolimbot this looks good to me and I don't think an automated test is critical.

I'd like to see the documentation updated to reflect the support this adds (happy to do that in a follow-up PR). Is it safe to say write_ipc_stream's sink arg can now take any connection and read_ipc_stream's file arg can also now take any connection?

@paleolimbot
Copy link
Copy Markdown
Member Author

In general, we can use a connection in any place where we read or write anything! Most R functions work like this and I'm not sure we documented it anywhere. I'll find a few places where I can add it in while I've got the tab open.

@amoeba amoeba self-requested a review April 2, 2024 17:37
Copy link
Copy Markdown
Member

@amoeba amoeba left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @paleolimbot, thanks for the most recent changes. This looks good to me outside the two failing CI jobs. The error seems like it could be related, though I can't reproduce locally. Any ideas?

@paleolimbot paleolimbot force-pushed the r-socket-connection branch from cc5f30f to 0df755b Compare April 2, 2024 18:51
@paleolimbot
Copy link
Copy Markdown
Member Author

@amoeba I think those errors were the duckdb failures (unless I'm looking at the wrong ones). I rebased, but now we have an entirely new set of errors (docker compose vs docker-compose or something).

@amoeba
Copy link
Copy Markdown
Member

amoeba commented Apr 2, 2024

Sounds like #40949 which should be resolved soon.

@paleolimbot paleolimbot force-pushed the r-socket-connection branch from 0df755b to 04e7053 Compare April 3, 2024 12:30
@paleolimbot paleolimbot merged commit e0d73c5 into apache:main Apr 3, 2024
@paleolimbot paleolimbot removed the awaiting committer review Awaiting committer review label Apr 3, 2024
@conbench-apache-arrow
Copy link
Copy Markdown

After merging your PR, Conbench analyzed the 6 benchmarking runs that have been run so far on merge-commit e0d73c5.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details. It also includes information about 2 possible false positives for unstable benchmarks that are known to sometimes produce them.

@paleolimbot paleolimbot deleted the r-socket-connection branch April 8, 2024 12:14
vibhatha pushed a commit to vibhatha/arrow that referenced this pull request May 25, 2024
…ections (apache#38897)

### Rationale for this change

Currently we can't write to socket connection from R. This is a very useful way to send Arrow data around and should work!

### What changes are included in this PR?

Implements `Tell()` for non-seekable output streams. Apparently some Arrow code calls this to figure out how many bytes have been written.

### Are these changes tested?

I'm not quite sure how to test this...all output streams we can easily test are seekable. We could try to spin up a socket server on another thread (like the reprex below) but I'm worried that will be flaky.

### Are there any user-facing changes?

Yes (something that should have previously worked now works), although there is no place where we currently document anything about how connections can be used.

``` r
tmp <- tempfile()
proc <- callr::r_bg(function() {
  server <- function() {
    library(arrow)
    
    while (TRUE) {
      writeLines("Listening...")
      con <- socketConnection(host = "localhost", port = 6011, blocking = TRUE,
                              server = TRUE, open = "r+b")
      socketTimeout(con, 3600)
      
      data <- arrow::read_ipc_stream(con, as_data_frame = FALSE)
      print(head(as.data.frame(data)))
      
    }
  }
  
  server()
}, stdout = tmp)

Sys.sleep(0.5)

library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
rb <- arrow::record_batch(iris)

socketDriver <- socketConnection(host = "localhost", 
                                 port = "6011",
                                 blocking = TRUE,
                                 server = FALSE,
                                 open = "w+b")

write_ipc_stream(rb, socketDriver)
Sys.sleep(0.5)
cat(brio::read_file(tmp))
#> Listening...
#>   Sepal.Length Sepal.Width Petal.Length Petal.Width Species
#> 1          5.1         3.5          1.4         0.2  setosa
#> 2          4.9         3.0          1.4         0.2  setosa
#> 3          4.7         3.2          1.3         0.2  setosa
#> 4          4.6         3.1          1.5         0.2  setosa
#> 5          5.0         3.6          1.4         0.2  setosa
#> 6          5.4         3.9          1.7         0.4  setosa
#> Listening...

# Shutdown server
proc$interrupt()
#> [1] TRUE
Sys.sleep(0.5)
proc$is_alive()
#> [1] FALSE
```

<sup>Created on 2023-11-27 with [reprex v2.0.2](https://reprex.tidyverse.org)</sup>
* Closes: apache#38828
* GitHub Issue: apache#38828

Authored-by: Dewey Dunnington <dewey@voltrondata.com>
Signed-off-by: Dewey Dunnington <dewey@voltrondata.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[R] Can't use RecordBatchStreamWriter with Socket connection

2 participants