// Licensed to the Apache Software Foundation (ASF) under one // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // or more contributor license agreements. See the NOTICE file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "./arrow_types.h" #include #include #include #include #include #include #include #include #include "./extension.h" #include "./r_task_group.h" namespace arrow { using internal::checked_cast; using internal::IntegersCanFit; namespace r { class Converter { public: explicit Converter(const std::shared_ptr& chunked_array) : chunked_array_(std::move(chunked_array)) {} virtual ~Converter() {} // Allocate a vector of the right R type for this converter virtual SEXP Allocate(R_xlen_t n) const = 0; // data[ start:(start + n) ] = NA virtual Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const = 0; // ingest the values from the array into data[ start : (start + n)] // // chunk_index indicates which of the chunk is being ingested into data. This is // ignored by most implementations and currently only used with Dictionary // arrays. virtual Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const = 0; // can this run in parallel ? 
virtual bool Parallel() const { return true; } // converter is passed as self to outlive the scope of Converter::Convert() SEXP ScheduleConvertTasks(RTasks& tasks, std::shared_ptr self) { // try altrep first SEXP alt = altrep::MakeAltrepVector(chunked_array_); if (!Rf_isNull(alt)) { return alt; } // otherwise use the Converter api: // allocating the R vector upfront SEXP out = PROTECT(Allocate(chunked_array_->length())); // for each array, fill the relevant slice of `out`, potentially in parallel R_xlen_t k = 0, i = 0; for (const auto& array : chunked_array_->chunks()) { auto n_chunk = array->length(); tasks.Append(Parallel(), [=] { if (array->null_count() == n_chunk) { return self->Ingest_all_nulls(out, k, n_chunk); } else { return self->Ingest_some_nulls(out, array, k, n_chunk, i); } }); k += n_chunk; i++; } UNPROTECT(1); return out; } // Converter factory static std::shared_ptr Make( const std::shared_ptr& chunked_array); static SEXP LazyConvert(const std::shared_ptr& chunked_array, RTasks& tasks) { auto converter = Make(chunked_array); return converter->ScheduleConvertTasks(tasks, converter); } static SEXP Convert(const std::shared_ptr& chunked_array, bool use_threads) { RTasks tasks(use_threads); SEXP out = PROTECT(Converter::LazyConvert(chunked_array, tasks)); StopIfNotOk(tasks.Finish()); UNPROTECT(1); return out; } static SEXP Convert(const std::shared_ptr& array) { return Convert(std::make_shared(array), false); } SEXP MaybeAltrep() { return altrep::MakeAltrepVector(chunked_array_); } protected: std::shared_ptr chunked_array_; }; template Status IngestSome(const std::shared_ptr& array, R_xlen_t n, SetNonNull&& set_non_null, SetNull&& set_null) { if (array->null_count()) { internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(), n); for (R_xlen_t i = 0; i < n; i++, bitmap_reader.Next()) { if (bitmap_reader.IsSet()) { RETURN_NOT_OK(set_non_null(i)); } else { RETURN_NOT_OK(set_null(i)); } } } else { for (R_xlen_t i = 0; i < n; i++) { 
RETURN_NOT_OK(set_non_null(i)); } } return Status::OK(); } template Status IngestSome(const std::shared_ptr& array, R_xlen_t n, SetNonNull&& set_non_null) { auto nothing = [](R_xlen_t i) { return Status::OK(); }; return IngestSome(array, n, std::forward(set_non_null), nothing); } std::shared_ptr CreateEmptyArray(const std::shared_ptr& array_type) { std::unique_ptr builder; StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), array_type, &builder)); std::shared_ptr array; StopIfNotOk(builder->Finish(&array)); return array; } template class Converter_Int : public Converter { using value_type = typename TypeTraits::ArrayType::value_type; public: explicit Converter_Int(const std::shared_ptr& chunked_array) : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(INTSXP, n); } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { std::fill_n(INTEGER(data) + start, n, NA_INTEGER); return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { auto p_values = array->data()->GetValues(1); if (!p_values) { return Status::Invalid("Invalid data buffer"); } auto p_data = INTEGER(data) + start; auto ingest_one = [&](R_xlen_t i) { p_data[i] = static_cast(p_values[i]); return Status::OK(); }; auto null_one = [&](R_xlen_t i) { p_data[i] = NA_INTEGER; return Status::OK(); }; return IngestSome(array, n, ingest_one, null_one); } }; template class Converter_Double : public Converter { using value_type = typename TypeTraits::ArrayType::value_type; public: explicit Converter_Double(const std::shared_ptr& chunked_array) : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(REALSXP, n); } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { std::fill_n(REAL(data) + start, n, NA_REAL); return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t 
chunk_index) const { auto p_values = array->data()->GetValues(1); if (!p_values) { return Status::Invalid("Invalid data buffer"); } auto p_data = REAL(data) + start; auto ingest_one = [&](R_xlen_t i) { p_data[i] = static_cast(p_values[i]); return Status::OK(); }; auto null_one = [&](R_xlen_t i) { p_data[i] = NA_REAL; return Status::OK(); }; return IngestSome(array, n, ingest_one, null_one); } }; class Converter_Date32 : public Converter { public: explicit Converter_Date32(const std::shared_ptr& chunked_array) : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { SEXP data = PROTECT(Rf_allocVector(REALSXP, n)); Rf_classgets(data, Rf_mkString("Date")); UNPROTECT(1); return data; } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { std::fill_n(REAL(data) + start, n, NA_REAL); return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { auto p_values = array->data()->GetValues(1); if (!p_values) { return Status::Invalid("Invalid data buffer"); } auto p_data = REAL(data) + start; auto ingest_one = [&](R_xlen_t i) { p_data[i] = static_cast(p_values[i]); return Status::OK(); }; auto null_one = [&](R_xlen_t i) { p_data[i] = NA_REAL; return Status::OK(); }; return IngestSome(array, n, ingest_one, null_one); } }; template struct Converter_String : public Converter { public: explicit Converter_String(const std::shared_ptr& chunked_array) : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(STRSXP, n); } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { for (R_xlen_t i = 0; i < n; i++) { SET_STRING_ELT(data, i + start, NA_STRING); } return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { auto p_offset = array->data()->GetValues(1); if (!p_offset) { return Status::Invalid("Invalid offset buffer"); } auto p_strings = 
array->data()->GetValues(2, *p_offset); if (!p_strings) { // There is an offset buffer, but the data buffer is null // There is at least one value in the array and not all the values are null // That means all values are either empty strings or nulls so there is nothing to do if (array->null_count()) { arrow::internal::BitmapReader null_reader(array->null_bitmap_data(), array->offset(), n); for (int i = 0; i < n; i++, null_reader.Next()) { if (null_reader.IsNotSet()) { SET_STRING_ELT(data, start + i, NA_STRING); } } } return Status::OK(); } StringArrayType* string_array = static_cast(array.get()); const bool all_valid = array->null_count() == 0; const bool strip_out_nuls = GetBoolOption("arrow.skip_nul", false); bool nul_was_stripped = false; if (all_valid) { // no need to watch for missing strings cpp11::unwind_protect([&] { if (strip_out_nuls) { for (int i = 0; i < n; i++) { SET_STRING_ELT(data, start + i, r_string_from_view_strip_nul(string_array->GetView(i), &nul_was_stripped)); } return; } for (int i = 0; i < n; i++) { SET_STRING_ELT(data, start + i, r_string_from_view(string_array->GetView(i))); } }); } else { cpp11::unwind_protect([&] { arrow::internal::BitmapReader validity_reader(array->null_bitmap_data(), array->offset(), n); if (strip_out_nuls) { for (int i = 0; i < n; i++, validity_reader.Next()) { if (validity_reader.IsSet()) { SET_STRING_ELT(data, start + i, r_string_from_view_strip_nul(string_array->GetView(i), &nul_was_stripped)); } else { SET_STRING_ELT(data, start + i, NA_STRING); } } return; } for (int i = 0; i < n; i++, validity_reader.Next()) { if (validity_reader.IsSet()) { SET_STRING_ELT(data, start + i, r_string_from_view(string_array->GetView(i))); } else { SET_STRING_ELT(data, start + i, NA_STRING); } } }); } if (nul_was_stripped) { cpp11::safe[Rf_warning]("Stripping '\\0' (nul) from character vector"); } return Status::OK(); } bool Parallel() const { return false; } private: static SEXP r_string_from_view(std::string_view view) { return 
Rf_mkCharLenCE(view.data(), static_cast(view.size()), CE_UTF8); } static SEXP r_string_from_view_strip_nul(std::string_view view, bool* nul_was_stripped) { const char* old_string = view.data(); std::string stripped_string; size_t stripped_len = 0, nul_count = 0; for (size_t i = 0; i < view.size(); i++) { if (old_string[i] == '\0') { ++nul_count; if (nul_count == 1) { // first nul spotted: allocate stripped string storage stripped_string = std::string(view); stripped_len = i; } // don't copy old_string[i] (which is \0) into stripped_string continue; } if (nul_count > 0) { stripped_string[stripped_len++] = old_string[i]; } } if (nul_count > 0) { *nul_was_stripped = true; stripped_string.resize(stripped_len); return r_string_from_view(stripped_string); } return r_string_from_view(view); } }; class Converter_Boolean : public Converter { public: explicit Converter_Boolean(const std::shared_ptr& chunked_array) : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(LGLSXP, n); } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { std::fill_n(LOGICAL(data) + start, n, NA_LOGICAL); return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { auto p_data = LOGICAL(data) + start; auto p_bools = array->data()->GetValues(1, 0); if (!p_bools) { return Status::Invalid("Invalid data buffer"); } arrow::internal::BitmapReader data_reader(p_bools, array->offset(), n); auto ingest_one = [&](R_xlen_t i) { p_data[i] = data_reader.IsSet(); data_reader.Next(); return Status::OK(); }; auto null_one = [&](R_xlen_t i) { data_reader.Next(); p_data[i] = NA_LOGICAL; return Status::OK(); }; return IngestSome(array, n, ingest_one, null_one); } }; template class Converter_Binary : public Converter { public: using offset_type = typename ArrayType::offset_type; explicit Converter_Binary(const std::shared_ptr& chunked_array) : Converter(chunked_array) {} SEXP 
Allocate(R_xlen_t n) const { SEXP res = PROTECT(Rf_allocVector(VECSXP, n)); if (std::is_same::value) { Rf_classgets(res, data::classes_arrow_binary); } else { Rf_classgets(res, data::classes_arrow_large_binary); } UNPROTECT(1); return res; } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { const ArrayType* binary_array = checked_cast(array.get()); auto ingest_one = [&](R_xlen_t i) { offset_type ni; auto value = binary_array->GetValue(i, &ni); if (ni > R_XLEN_T_MAX) { return Status::RError("Array too big to be represented as a raw vector"); } SEXP raw = PROTECT(Rf_allocVector(RAWSXP, ni)); std::copy(value, value + ni, RAW(raw)); SET_VECTOR_ELT(data, i + start, raw); UNPROTECT(1); return Status::OK(); }; return IngestSome(array, n, ingest_one); } virtual bool Parallel() const { return false; } }; class Converter_FixedSizeBinary : public Converter { public: explicit Converter_FixedSizeBinary(const std::shared_ptr& chunked_array, int byte_width) : Converter(chunked_array), byte_width_(byte_width) {} SEXP Allocate(R_xlen_t n) const { SEXP res = PROTECT(Rf_allocVector(VECSXP, n)); Rf_classgets(res, data::classes_arrow_fixed_size_binary); Rf_setAttrib(res, symbols::byte_width, Rf_ScalarInteger(byte_width_)); UNPROTECT(1); return res; } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { const FixedSizeBinaryArray* binary_array = checked_cast(array.get()); int byte_width = binary_array->byte_width(); auto ingest_one = [&, byte_width](R_xlen_t i) { auto value = binary_array->GetValue(i); SEXP raw = PROTECT(Rf_allocVector(RAWSXP, byte_width)); std::copy(value, value + byte_width, RAW(raw)); SET_VECTOR_ELT(data, i + start, raw); UNPROTECT(1); return 
Status::OK(); }; return IngestSome(array, n, ingest_one); } virtual bool Parallel() const { return false; } private: int byte_width_; }; bool DictionaryChunkArrayNeedUnification( const std::shared_ptr& chunked_array) { int n = chunked_array->num_chunks(); if (n < 2) { return false; } const auto& arr_first = internal::checked_cast(*chunked_array->chunk(0)); for (int i = 1; i < n; i++) { const auto& arr = internal::checked_cast(*chunked_array->chunk(i)); if (!(arr_first.dictionary()->Equals(arr.dictionary()))) { return true; } } return false; } class Converter_Dictionary : public Converter { private: bool need_unification_; std::unique_ptr unifier_; std::vector> arrays_transpose_; std::shared_ptr out_type_; std::shared_ptr dictionary_; public: explicit Converter_Dictionary(const std::shared_ptr& chunked_array) : Converter(chunked_array), need_unification_(DictionaryChunkArrayNeedUnification(chunked_array)) { if (need_unification_) { const auto& arr_type = checked_cast(*chunked_array->type()); unifier_ = ValueOrStop(DictionaryUnifier::Make(arr_type.value_type())); int n_arrays = chunked_array->num_chunks(); arrays_transpose_.resize(n_arrays); for (int i = 0; i < n_arrays; i++) { const auto& dict_i = *checked_cast(*chunked_array->chunk(i)).dictionary(); StopIfNotOk(unifier_->Unify(dict_i, &arrays_transpose_[i])); } StopIfNotOk(unifier_->GetResult(&out_type_, &dictionary_)); } else { const auto& dict_type = checked_cast(*chunked_array->type()); const auto& indices_type = *dict_type.index_type(); switch (indices_type.id()) { case Type::UINT8: case Type::INT8: case Type::UINT16: case Type::INT16: case Type::INT32: // TODO: also add int64, uint32, uint64 downcasts, if possible break; default: cpp11::stop("Cannot convert Dictionary Array of type `%s` to R", dict_type.ToString().c_str()); } if (chunked_array->num_chunks() > 0) { // DictionaryChunkArrayNeedUnification() returned false so we can safely assume // the dictionary of the first chunk applies everywhere const auto& 
dict_array = checked_cast(*chunked_array->chunk(0)); dictionary_ = dict_array.dictionary(); } else { dictionary_ = CreateEmptyArray(dict_type.value_type()); } } } SEXP Allocate(R_xlen_t n) const { cpp11::writable::integers data(n); data.attr("levels") = GetLevels(); if (GetOrdered()) { Rf_classgets(data, arrow::r::data::classes_ordered); } else { Rf_classgets(data, arrow::r::data::classes_factor); } return data; } virtual bool Parallel() const { return false; } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { std::fill_n(INTEGER(data) + start, n, NA_INTEGER); return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { const DictionaryArray& dict_array = checked_cast(*array.get()); auto indices = dict_array.indices(); switch (indices->type_id()) { case Type::UINT8: return Ingest_some_nulls_Impl(data, array, start, n, chunk_index); case Type::INT8: return Ingest_some_nulls_Impl(data, array, start, n, chunk_index); case Type::UINT16: return Ingest_some_nulls_Impl(data, array, start, n, chunk_index); case Type::INT16: return Ingest_some_nulls_Impl(data, array, start, n, chunk_index); case Type::INT32: return Ingest_some_nulls_Impl(data, array, start, n, chunk_index); default: break; } return Status::OK(); } private: template Status Ingest_some_nulls_Impl(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { using index_type = typename arrow::TypeTraits::ArrayType::value_type; auto indices = checked_cast(*array).indices(); auto raw_indices = indices->data()->GetValues(1); auto p_data = INTEGER(data) + start; auto null_one = [&](R_xlen_t i) { p_data[i] = NA_INTEGER; return Status::OK(); }; // convert the 0-based indices from the arrow Array // to 1-based indices used in R factors if (need_unification_) { // transpose the indices before converting auto transposed = reinterpret_cast(arrays_transpose_[chunk_index]->data()); 
auto ingest_one = [&](R_xlen_t i) { p_data[i] = transposed[raw_indices[i]] + 1; return Status::OK(); }; return IngestSome(array, n, ingest_one, null_one); } else { auto ingest_one = [&](R_xlen_t i) { p_data[i] = static_cast(raw_indices[i]) + 1; return Status::OK(); }; return IngestSome(array, n, ingest_one, null_one); } } bool GetOrdered() const { return checked_cast(*chunked_array_->type()).ordered(); } SEXP GetLevels() const { // R factor levels must be type "character" so coerce `dict` to STRSXP // TODO (npr): this coercion should be optional, "dictionariesAsFactors" ;) // Alternative: preserve the logical type of the dictionary values // (e.g. if dict is timestamp, return a POSIXt R vector, not factor) if (dictionary_->type_id() != Type::STRING) { cpp11::safe[Rf_warning]("Coercing dictionary values to R character factor levels"); } SEXP vec = PROTECT(Converter::Convert(dictionary_)); SEXP strings_vec = PROTECT(Rf_coerceVector(vec, STRSXP)); UNPROTECT(2); return strings_vec; } }; class Converter_Struct : public Converter { public: explicit Converter_Struct(const std::shared_ptr& chunked_array) : Converter(chunked_array), converters() { const auto& struct_type = checked_cast(*chunked_array->type()); int nf = struct_type.num_fields(); std::shared_ptr array_as_table = ValueOrStop(arrow::Table::FromChunkedStructArray(chunked_array)); for (int i = 0; i < nf; i++) { converters.push_back(Converter::Make(array_as_table->column(i))); } } SEXP Allocate(R_xlen_t n) const { // allocate a data frame column to host each array // If possible, a column is dealt with directly with altrep auto type = checked_cast(this->chunked_array_->type().get()); auto out = arrow::r::to_r_list(converters, [n](const std::shared_ptr& converter) { SEXP out = converter->MaybeAltrep(); if (Rf_isNull(out)) { out = converter->Allocate(n); } return out; }); auto colnames = arrow::r::to_r_strings( type->fields(), [](const std::shared_ptr& field) { return field->name(); }); out.attr(symbols::row_names) 
= arrow::r::short_row_names(static_cast(n)); out.attr(R_NamesSymbol) = colnames; out.attr(R_ClassSymbol) = arrow::r::data::classes_tbl_df; return out; } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { int nf = static_cast(converters.size()); for (int i = 0; i < nf; i++) { SEXP data_i = VECTOR_ELT(data, i); // only ingest if the column is not altrep if (!altrep::is_unmaterialized_arrow_altrep(data_i)) { StopIfNotOk(converters[i]->Ingest_all_nulls(data_i, start, n)); } } return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { auto struct_array = checked_cast(array.get()); int nf = static_cast(converters.size()); // Flatten() deals with merging of nulls auto arrays = ValueOrStop(struct_array->Flatten(gc_memory_pool())); for (int i = 0; i < nf; i++) { SEXP data_i = VECTOR_ELT(data, i); // only ingest if the column is not altrep if (!altrep::is_unmaterialized_arrow_altrep(data_i)) { StopIfNotOk(converters[i]->Ingest_some_nulls(VECTOR_ELT(data, i), arrays[i], start, n, chunk_index)); } } return Status::OK(); } virtual bool Parallel() const { // this can only run in parallel if all the // inner converters can for (const auto& converter : converters) { if (!converter->Parallel()) return false; } return true; } private: std::vector> converters; }; double ms_to_seconds(int64_t ms) { return static_cast(ms) / 1000; } class Converter_Date64 : public Converter { public: explicit Converter_Date64(const std::shared_ptr& chunked_array) : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { cpp11::writable::doubles data(n); Rf_classgets(data, arrow::r::data::classes_POSIXct); return data; } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { std::fill_n(REAL(data) + start, n, NA_REAL); return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { auto p_data = 
REAL(data) + start; auto p_values = array->data()->GetValues(1); auto ingest_one = [&](R_xlen_t i) { p_data[i] = static_cast(p_values[i] / 1000); return Status::OK(); }; auto null_one = [&](R_xlen_t i) { p_data[i] = NA_REAL; return Status::OK(); }; return IngestSome(array, n, ingest_one, null_one); } }; template class Converter_Time : public Converter { public: explicit Converter_Time(const std::shared_ptr& chunked_array) : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { cpp11::writable::doubles data(n); data.attr("class") = cpp11::writable::strings({"hms", "difftime"}); // hms difftime is always stored as "seconds" data.attr("units") = cpp11::writable::strings({"secs"}); return data; } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { std::fill_n(REAL(data) + start, n, NA_REAL); return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { int multiplier = TimeUnit_multiplier(array); auto p_data = REAL(data) + start; auto p_values = array->data()->GetValues(1); auto ingest_one = [&](R_xlen_t i) { p_data[i] = static_cast(p_values[i]) / multiplier; return Status::OK(); }; auto null_one = [&](R_xlen_t i) { p_data[i] = NA_REAL; return Status::OK(); }; return IngestSome(array, n, ingest_one, null_one); } private: int TimeUnit_multiplier(const std::shared_ptr& array) const { // hms difftime is always "seconds", so multiply based on the Array's TimeUnit switch (static_cast(array->type().get())->unit()) { case TimeUnit::SECOND: return 1; case TimeUnit::MILLI: return 1000; case TimeUnit::MICRO: return 1000000; case TimeUnit::NANO: return 1000000000; default: return 0; } } }; template class Converter_Duration : public Converter { public: explicit Converter_Duration(const std::shared_ptr& chunked_array) : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { cpp11::writable::doubles data(n); data.attr("class") = "difftime"; // difftime is always 
stored as "seconds" data.attr("units") = cpp11::writable::strings({"secs"}); return data; } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { std::fill_n(REAL(data) + start, n, NA_REAL); return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { int multiplier = TimeUnit_multiplier(array); auto p_data = REAL(data) + start; auto p_values = array->data()->GetValues(1); auto ingest_one = [&](R_xlen_t i) { p_data[i] = static_cast(p_values[i]) / multiplier; return Status::OK(); }; auto null_one = [&](R_xlen_t i) { p_data[i] = NA_REAL; return Status::OK(); }; return IngestSome(array, n, ingest_one, null_one); } private: int TimeUnit_multiplier(const std::shared_ptr& array) const { // difftime is always "seconds", so multiply based on the Array's TimeUnit switch (static_cast(array->type().get())->unit()) { case TimeUnit::SECOND: return 1; case TimeUnit::MILLI: return 1000; case TimeUnit::MICRO: return 1000000; case TimeUnit::NANO: return 1000000000; default: return 0; } } }; template class Converter_Timestamp : public Converter_Time { public: explicit Converter_Timestamp(const std::shared_ptr& chunked_array) : Converter_Time(chunked_array) {} SEXP Allocate(R_xlen_t n) const { cpp11::writable::doubles data(n); Rf_classgets(data, arrow::r::data::classes_POSIXct); auto array_type = checked_cast(this->chunked_array_->type().get()); std::string tzone = array_type->timezone(); if (tzone.size() > 0) { data.attr("tzone") = tzone; } return data; } }; template class Converter_Decimal : public Converter { public: explicit Converter_Decimal(const std::shared_ptr& chunked_array) : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(REALSXP, n); } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { std::fill_n(REAL(data) + start, n, NA_REAL); return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, 
R_xlen_t start, R_xlen_t n, size_t chunk_index) const { using DecimalArray = typename TypeTraits::ArrayType; auto p_data = REAL(data) + start; const auto& decimals_arr = checked_cast(*array); auto ingest_one = [&](R_xlen_t i) { p_data[i] = std::stod(decimals_arr.FormatValue(i).c_str()); return Status::OK(); }; auto null_one = [&](R_xlen_t i) { p_data[i] = NA_REAL; return Status::OK(); }; return IngestSome(array, n, ingest_one, null_one); } }; template class Converter_List : public Converter { private: std::shared_ptr value_type_; public: explicit Converter_List(const std::shared_ptr& chunked_array, const std::shared_ptr& value_type) : Converter(chunked_array), value_type_(value_type) {} SEXP Allocate(R_xlen_t n) const { cpp11::writable::list res(n); if (std::is_same::value) { res.attr(R_ClassSymbol) = arrow::r::data::classes_arrow_list; } else if (std::is_same::value) { res.attr(R_ClassSymbol) = arrow::r::data::classes_arrow_list; } else { res.attr(R_ClassSymbol) = arrow::r::data::classes_arrow_large_list; } std::shared_ptr array = CreateEmptyArray(value_type_); // convert to an R object to store as the list' ptype res.attr(arrow::r::symbols::ptype) = Converter::Convert(array); return res; } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { // nothing to do, list contain NULL by default return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { auto list_array = checked_cast(array.get()); auto values_array = list_array->values(); auto ingest_one = [&](R_xlen_t i) { auto slice = list_array->value_slice(i); SET_VECTOR_ELT(data, i + start, Converter::Convert(slice)); return Status::OK(); }; return IngestSome(array, n, ingest_one); } bool Parallel() const { return false; } }; class Converter_FixedSizeList : public Converter { private: std::shared_ptr value_type_; int list_size_; public: explicit Converter_FixedSizeList(const std::shared_ptr& chunked_array, const 
std::shared_ptr& value_type, int list_size) : Converter(chunked_array), value_type_(value_type), list_size_(list_size) {} SEXP Allocate(R_xlen_t n) const { cpp11::writable::list res(n); Rf_classgets(res, arrow::r::data::classes_arrow_fixed_size_list); res.attr(arrow::r::symbols::list_size) = Rf_ScalarInteger(list_size_); std::shared_ptr array = CreateEmptyArray(value_type_); // convert to an R object to store as the list' ptype res.attr(arrow::r::symbols::ptype) = Converter::Convert(array); return res; } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { // nothing to do, list contain NULL by default return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { const auto& fixed_size_list_array = checked_cast(*array); auto values_array = fixed_size_list_array.values(); auto ingest_one = [&](R_xlen_t i) { auto slice = fixed_size_list_array.value_slice(i); SET_VECTOR_ELT(data, i + start, Converter::Convert(slice)); return Status::OK(); }; return IngestSome(array, n, ingest_one); } bool Parallel() const { return false; } }; class Converter_Int64 : public Converter { public: explicit Converter_Int64(const std::shared_ptr& chunked_array) : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { cpp11::writable::doubles data(n); data.attr("class") = "integer64"; return data; } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { auto p_data = reinterpret_cast(REAL(data)) + start; std::fill_n(p_data, n, NA_INT64); return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { auto p_values = array->data()->GetValues(1); if (!p_values) { return Status::Invalid("Invalid data buffer"); } auto p_data = reinterpret_cast(REAL(data)) + start; if (array->null_count()) { internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(), n); for (R_xlen_t i = 
0; i < n; i++, bitmap_reader.Next(), ++p_data) { *p_data = bitmap_reader.IsSet() ? p_values[i] : NA_INT64; } } else { std::copy_n(p_values, n, p_data); } return Status::OK(); } }; class Converter_Null : public Converter { public: explicit Converter_Null(const std::shared_ptr& chunked_array) : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { SEXP data = PROTECT(Rf_allocVector(LGLSXP, n)); std::fill_n(LOGICAL(data), n, NA_LOGICAL); Rf_classgets(data, Rf_mkString("vctrs_unspecified")); UNPROTECT(1); return data; } Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { return Status::OK(); } }; // Unlike other types, conversion of ExtensionType (chunked) arrays occurs at // R level via the ExtensionType (or subclass) R6 instance. We do this via Allocate, // since it is called once per ChunkedArray. class Converter_Extension : public Converter { public: explicit Converter_Extension(const std::shared_ptr& chunked_array) : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { auto extension_type = dynamic_cast(chunked_array_->type().get()); if (extension_type == nullptr) { Rf_error("Converter_Extension can't be used with a non-R extension type"); } return extension_type->Convert(chunked_array_); } // At this point we have already done the conversion Status Ingest_all_nulls(SEXP data, R_xlen_t start, R_xlen_t n) const { return Status::OK(); } Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n, size_t chunk_index) const { return Status::OK(); } }; bool ArraysCanFitInteger(ArrayVector arrays) { bool all_can_fit = true; auto i32 = arrow::int32(); for (const auto& array : arrays) { if (all_can_fit) { all_can_fit = arrow::IntegersCanFit(*array->data(), *i32).ok(); } } return all_can_fit; } bool GetBoolOption(const std::string& name, bool default_) { SEXP 
getOption = Rf_install("getOption");
  cpp11::sexp call = Rf_lang2(getOption, Rf_mkString(name.c_str()));
  cpp11::sexp res = Rf_eval(call, R_BaseEnv);
  if (TYPEOF(res) == LGLSXP) {
    return LOGICAL(res)[0] == TRUE;
  } else {
    return default_;
  }
}

// Converter factory: returns the Converter implementation used to turn
// `chunked_array` into an R vector, selected by the Arrow type id.
//
// Most branches depend only on the type id. Exceptions:
//  - UINT32 / UINT64: the converter also depends on ArraysCanFitInteger()
//    evaluated over all chunks. Per the "promotions to numeric vector"
//    comment below, the intent is to fall back to a numeric vector when the
//    values don't fit into int32.
//  - INT64: additionally consults the R option "arrow.int64_downcast" via
//    GetBoolOption (default true) before checking ArraysCanFitInteger().
// Some converters receive extra type metadata:
//  - byte_width for FIXED_SIZE_BINARY;
//  - value_type for LIST / LARGE_LIST / MAP;
//  - value_type and list_size for FIXED_SIZE_LIST.
//
// Type ids without a case below reach cpp11::stop(), which signals an R
// error ("cannot handle Array of type <...>") instead of returning.
std::shared_ptr Converter::Make(
    const std::shared_ptr& chunked_array) {
  const auto& type = chunked_array->type();
  switch (type->id()) {
    // direct support
    case Type::INT32:
      return std::make_shared>(chunked_array);
    case Type::DOUBLE:
      return std::make_shared>(
          chunked_array);

    // need to handle 1-bit case
    case Type::BOOL:
      return std::make_shared(chunked_array);

    case Type::BINARY:
      return std::make_shared>(
          chunked_array);
    case Type::LARGE_BINARY:
      return std::make_shared>(
          chunked_array);
    case Type::FIXED_SIZE_BINARY:
      // The converter is given the fixed per-element byte width.
      return std::make_shared(
          chunked_array, checked_cast(*type).byte_width());

    // handle memory dense strings
    case Type::STRING:
      return std::make_shared>(
          chunked_array);
    case Type::LARGE_STRING:
      return std::make_shared>(
          chunked_array);

    case Type::DICTIONARY:
      return std::make_shared(chunked_array);

    case Type::DATE32:
      return std::make_shared(chunked_array);
    case Type::DATE64:
      return std::make_shared(chunked_array);

    // promotions to integer vector
    case Type::INT8:
      return std::make_shared>(chunked_array);
    case Type::UINT8:
      return std::make_shared>(chunked_array);
    case Type::INT16:
      return std::make_shared>(chunked_array);
    case Type::UINT16:
      return std::make_shared>(chunked_array);

    // promotions to numeric vector, if they don't fit into int32
    case Type::UINT32:
      // Data-dependent choice: the result type can differ between two
      // chunked arrays of the same Arrow type. Both branches return, so
      // there is no fallthrough into the next case.
      if (ArraysCanFitInteger(chunked_array->chunks())) {
        return std::make_shared>(
            chunked_array);
      } else {
        return std::make_shared>(
            chunked_array);
      }
    case Type::UINT64:
      // Same data-dependent choice as UINT32; both branches return.
      if (ArraysCanFitInteger(chunked_array->chunks())) {
        return std::make_shared>(
            chunked_array);
      } else {
        return std::make_shared>(
            chunked_array);
      }
    case Type::HALF_FLOAT:
      return std::make_shared>(
          chunked_array);
    case Type::FLOAT:
      return std::make_shared>(
          chunked_array);

    // time32 and time64
    case Type::TIME32:
      return std::make_shared>(chunked_array);
    case Type::TIME64:
      return std::make_shared>(chunked_array);

    case Type::DURATION:
      return std::make_shared>(chunked_array);

    case Type::TIMESTAMP:
      return std::make_shared>(chunked_array);

    case Type::INT64:
      // Prefer integer if it fits, unless option arrow.int64_downcast is `false`
      // (GetBoolOption is evaluated first, so the chunk scan is skipped
      // when the option disables downcasting).
      if (GetBoolOption("arrow.int64_downcast", true) &&
          ArraysCanFitInteger(chunked_array->chunks())) {
        return std::make_shared>(chunked_array);
      } else {
        return std::make_shared(chunked_array);
      }

    case Type::DECIMAL32:
      return std::make_shared>(chunked_array);
    case Type::DECIMAL64:
      return std::make_shared>(chunked_array);
    case Type::DECIMAL128:
      return std::make_shared>(chunked_array);
    case Type::DECIMAL256:
      return std::make_shared>(chunked_array);

    // nested
    case Type::STRUCT:
      return std::make_shared(chunked_array);
    case Type::LIST:
      // List-like converters are handed the element (value) type.
      return std::make_shared>(
          chunked_array, checked_cast(type.get())->value_type());
    case Type::LARGE_LIST:
      return std::make_shared>(
          chunked_array, checked_cast(type.get())->value_type());
    case Type::FIXED_SIZE_LIST:
      // Also passes the fixed number of elements per list slot.
      return std::make_shared(
          chunked_array, checked_cast(*type).value_type(),
          checked_cast(*type).list_size());
    case Type::MAP:
      return std::make_shared>(
          chunked_array, checked_cast(*type).value_type());

    case Type::NA:
      return std::make_shared(chunked_array);

    case Type::EXTENSION:
      return std::make_shared(chunked_array);

    default:
      break;
  }

  // Only reached for type ids with no case above.
  cpp11::stop("cannot handle Array of type <%s>", type->name().c_str());
}

// Overload for a single array: wraps it with std::make_shared so it can be
// passed where a chunked array is expected (Converter::LazyConvert).
std::shared_ptr to_chunks(const std::shared_ptr& array) {
  return std::make_shared(array);
}

// Overload for inputs that are already chunked arrays: returned unchanged.
std::shared_ptr to_chunks(
    const std::shared_ptr& chunked_array) {
  return chunked_array;
}

// Converts every column of `data` into an R vector and assembles the result
// as a named list carrying the tbl_df class and row-names attributes. `data`
// is a RecordBatch or a Table (see the exported wrappers below). Each column
// goes through to_chunks() so Converter::LazyConvert always receives a
// chunked array.
//
// Conversion work is only scheduled on `tasks` by LazyConvert. Tasks run in
// parallel only when `use_threads` is true and the converter allows it. All
// tasks are awaited by tasks.Finish() before any attribute is attached, and
// StopIfNotOk() turns a failed status into an R error.
template
cpp11::writable::list to_data_frame(const std::shared_ptr& data, bool use_threads) {
  int64_t nc = data->num_columns();
  int64_t nr = data->num_rows();
  cpp11::writable::strings names(nc);
  arrow::r::RTasks tasks(use_threads);
  cpp11::writable::list tbl(nc);
  for (int i = 0; i < nc; i++) {
    names[i] = data->schema()->field(i)->name();
    // LazyConvert returns either an ALTREP vector or a pre-allocated vector
    // whose contents are filled in later by the tasks it scheduled.
    tbl[i] = Converter::LazyConvert(to_chunks(data->column(i)), tasks);
  }
  // Wait for every scheduled column conversion before touching attributes.
  StopIfNotOk(tasks.Finish());
  tbl.attr(R_NamesSymbol) = names;
  tbl.attr(R_ClassSymbol) = arrow::r::data::classes_tbl_df;
  // NOTE(review): nr is int64_t and is narrowed by this static_cast. The
  // target type is presumably int, so confirm behavior for row counts above
  // INT_MAX.
  tbl.attr(R_RowNamesSymbol) = arrow::r::short_row_names(static_cast(nr));
  return tbl;
}

} // namespace r
} // namespace arrow

// Converts a single Array to an R vector. Uses the Converter::Convert(array)
// overload, which wraps the array and converts it with use_threads = false.
// [[arrow::export]]
SEXP Array__as_vector(const std::shared_ptr& array) {
  return arrow::r::Converter::Convert(array);
}

// Converts a ChunkedArray to a single R vector. Conversion is single-threaded
// unless `use_threads` is true.
// [[arrow::export]]
SEXP ChunkedArray__as_vector(const std::shared_ptr& chunked_array,
                             bool use_threads = false) {
  return arrow::r::Converter::Convert(chunked_array, use_threads);
}

// Converts a RecordBatch to a tbl_df-classed R list via to_data_frame().
// [[arrow::export]]
cpp11::writable::list RecordBatch__to_dataframe(
    const std::shared_ptr& batch, bool use_threads) {
  return arrow::r::to_data_frame(batch, use_threads);
}

// Converts a Table to a tbl_df-classed R list via to_data_frame().
// [[arrow::export]]
cpp11::writable::list Table__to_dataframe(const std::shared_ptr& table,
                                          bool use_threads) {
  return arrow::r::to_data_frame(table, use_threads);
}