Skip to content

[R] Can't use RecordBatchStreamWriter with Socket connection #38828

@pinduzera

Description

@pinduzera

Not sure if there is anything that can be done about it, seems to be an R limitation regarding socket connection and seek().

Let's create an R session to listen to a socket connection (this is just a simulation, can be any other language reading a socket).

library(arrow)

server <- function() {
  while (TRUE) {
    writeLines("Listening...")
    con <- socketConnection(host = "localhost", port = 6011, blocking = TRUE,
                            server = TRUE, open = "r+b")
    socketTimeout(con, 3600)
    
    data <- arrow::read_ipc_stream(con, as_data_frame = FALSE)
    print(head(as.data.frame(data)))

    }
}

server()

Now, in another session lets send a stream:

library(arrow)
rb <- arrow::record_batch(iris)

socketDriver <- socketConnection(host = "localhost", 
                 port = "6011",
                 blocking = TRUE,
                 server = FALSE,
                 open = "w+b")

outputStream <- arrow:::make_output_stream(socketDriver) ## couldn't find a better alternative as well

writer <- arrow::RecordBatchStreamWriter$create(outputStream, rb$schema)

writer$write_batch(rb) # ideally would loop, but can't even write a single batch
Error in seek.connection(12L) : 'seek' not enabled for this connection

If I write all at once it works, but then it defeats the purpose. The idea is to write in batches to avoid reading big tables all at once:

library(arrow)
socketDriver <- socketConnection(host = "localhost", 
                            port = "6011",
                            blocking = TRUE,
                            server = FALSE,
                            open = "w+b")

rawTbl <- arrow::write_to_raw(arrow::as_arrow_table(x = iris),
                    format = "stream")
writeBin(rawTbl, socketDriver )

Component(s)

R

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions