2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/metadata-internal.cc
@@ -72,7 +72,7 @@ MetadataVersion GetMetadataVersion(flatbuf::MetadataVersion version) {
case flatbuf::MetadataVersion_V4:
// Arrow >= 0.8
return MetadataVersion::V4;
// Add cases as other versions become available
// Add cases as other versions become available
default:
return MetadataVersion::V4;
}
52 changes: 15 additions & 37 deletions python/pyarrow/pandas_compat.py
@@ -18,7 +18,6 @@
import ast
import collections
import json
import re

import numpy as np
import pandas as pd
@@ -29,13 +28,6 @@
from pyarrow.compat import PY2, zip_longest # noqa


INDEX_LEVEL_NAME_REGEX = re.compile(r'^__index_level_\d+__$')


def is_unnamed_index_level(name):
return INDEX_LEVEL_NAME_REGEX.match(name) is not None


def infer_dtype(column):
try:
return pd.api.types.infer_dtype(column)
@@ -143,7 +135,7 @@ def get_column_metadata(column, name, arrow_type):

Parameters
----------
column : pandas.Series
column : pandas.Series or pandas.Index
name : str
arrow_type : pyarrow.DataType

@@ -161,7 +153,7 @@ def get_column_metadata(column, name, arrow_type):
}
string_dtype = 'object'

if not isinstance(name, six.string_types):
if name is not None and not isinstance(name, six.string_types):
raise TypeError(
'Column name must be a string. Got column {} of type {}'.format(
name, type(name).__name__
@@ -176,23 +168,7 @@ def get_column_metadata(column, name, arrow_type):
}


def index_level_name(index, i):
"""Return the name of an index level or a default name if `index.name` is
None.

Parameters
----------
index : pandas.Index
i : int

Returns
-------
name : str
"""
if index.name is not None:
return index.name
else:
return '__index_level_{:d}__'.format(i)
index_level_name = '__index_level_{:d}__'.format


def construct_metadata(df, column_names, index_levels, preserve_index, types):
@@ -222,11 +198,11 @@ def construct_metadata(df, column_names, index_levels, preserve_index, types):
]

if preserve_index:
index_column_names = [index_level_name(level, i)
for i, level in enumerate(index_levels)]
index_column_names = list(map(
index_level_name, range(len(index_levels))
))
index_column_metadata = [
get_column_metadata(level, name=index_level_name(level, i),
arrow_type=arrow_type)
get_column_metadata(level, name=level.name, arrow_type=arrow_type)
for i, (level, arrow_type) in enumerate(
zip(index_levels, index_types)
)
@@ -317,7 +293,7 @@ def dataframe_to_arrays(df, schema, preserve_index, nthreads=1):
for i, column in enumerate(index_columns):
columns_to_convert.append(column)
convert_types.append(None)
names.append(index_level_name(column, i))
names.append(index_level_name(i))

# NOTE(wesm): If nthreads=None, then we use a heuristic to decide whether
# using a thread pool is worth it. Currently the heuristic is whether the
@@ -378,6 +354,7 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1):
import pyarrow.lib as lib

index_columns = []
columns = []
column_indexes = []
index_arrays = []
index_names = []
@@ -390,18 +367,19 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1):
if has_pandas_metadata:
pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8'))
index_columns = pandas_metadata['index_columns']
columns = pandas_metadata['columns']
column_indexes = pandas_metadata.get('column_indexes', [])
table = _add_any_metadata(table, pandas_metadata)

block_table = table

# Build up a list of index columns and names while removing those columns
# from the original table
for name in index_columns:
i = schema.get_field_index(name)
logical_index_names = [c['name'] for c in columns[-len(index_columns):]]
for raw_name, logical_name in zip(index_columns, logical_index_names):
i = schema.get_field_index(raw_name)
if i != -1:
col = table.column(i)
index_name = None if is_unnamed_index_level(name) else name
col_pandas = col.to_pandas()
values = col_pandas.values
if not values.flags.writeable:
@@ -410,9 +388,9 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1):
values = values.copy()

index_arrays.append(pd.Series(values, dtype=col_pandas.dtype))
index_names.append(index_name)
index_names.append(logical_name)
block_table = block_table.remove_column(
block_table.schema.get_field_index(name)
block_table.schema.get_field_index(raw_name)
)

# Convert an arrow table to Block from the internal pandas API
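To make the pandas_compat change above concrete, here is a minimal illustrative sketch (not part of the diff; it assumes a pyarrow build that includes this patch): index levels are now serialized under positional __index_level_{i}__ field names, while each level's logical name (including None, or a name that collides with a data column) travels only in the pandas schema metadata and is restored by table_to_blockmanager.

    import json

    import pandas as pd
    import pyarrow as pa

    # The index name deliberately collides with the data column name.
    df = pd.DataFrame({'a': [1, 2]}, index=pd.Index(['x', 'y'], name='a'))

    table = pa.Table.from_pandas(df)
    meta = json.loads(table.schema.metadata[b'pandas'].decode('utf8'))

    print(meta['index_columns'])                 # ['__index_level_0__'] -- raw field name
    print([c['name'] for c in meta['columns']])  # ['a', 'a'] -- logical names, index level last
    print(table.to_pandas().index.name)          # 'a'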
43 changes: 33 additions & 10 deletions python/pyarrow/tests/test_parquet.py
@@ -1171,7 +1171,8 @@ def test_dataset_read_pandas(tmpdir):


@parquet
def test_dataset_read_pandas_common_metadata(tmpdir):
@pytest.mark.parametrize('preserve_index', [True, False])
def test_dataset_read_pandas_common_metadata(tmpdir, preserve_index):
# ARROW-1103
import pyarrow.parquet as pq

@@ -1186,15 +1187,11 @@ def test_dataset_read_pandas_common_metadata(tmpdir):
paths = []
for i in range(nfiles):
df = _test_dataframe(size, seed=i)
df.index = pd.Index(np.arange(i * size, (i + 1) * size))
df.index.name = 'index'
df.index = pd.Index(np.arange(i * size, (i + 1) * size), name='index')

path = pjoin(dirpath, '{0}.parquet'.format(i))
path = pjoin(dirpath, '{:d}.parquet'.format(i))

df_ex_index = df.reset_index(drop=True)
df_ex_index['index'] = df.index
table = pa.Table.from_pandas(df_ex_index,
preserve_index=False)
table = pa.Table.from_pandas(df, preserve_index=preserve_index)

# Obliterate metadata
table = table.replace_schema_metadata(None)
@@ -1206,15 +1203,17 @@
paths.append(path)

# Write _metadata common file
table_for_metadata = pa.Table.from_pandas(df)
table_for_metadata = pa.Table.from_pandas(
df, preserve_index=preserve_index
)
pq.write_metadata(table_for_metadata.schema,
pjoin(dirpath, '_metadata'))

dataset = pq.ParquetDataset(dirpath)
columns = ['uint8', 'strings']
result = dataset.read_pandas(columns=columns).to_pandas()
expected = pd.concat([x[columns] for x in frames])

expected.index.name = df.index.name if preserve_index else None
tm.assert_frame_equal(result, expected)


@@ -1387,3 +1386,27 @@ def test_large_table_int32_overflow():
table = pa.Table.from_arrays([parr], names=['one'])
f = io.BytesIO()
_write_table(table, f)


def test_index_column_name_duplicate(tmpdir):
data = {
'close': {
pd.Timestamp('2017-06-30 01:31:00'): 154.99958999999998,
pd.Timestamp('2017-06-30 01:32:00'): 154.99958999999998,
},
'time': {
pd.Timestamp('2017-06-30 01:31:00'): pd.Timestamp(
'2017-06-30 01:31:00'
),
pd.Timestamp('2017-06-30 01:32:00'): pd.Timestamp(
'2017-06-30 01:32:00'
),
}
}
path = str(tmpdir / 'data.parquet')
dfx = pd.DataFrame(data).set_index('time', drop=False)
tdfx = pa.Table.from_pandas(dfx)
_write_table(tdfx, path)
arrow_table = _read_table(path)
result_df = arrow_table.to_pandas()
tm.assert_frame_equal(result_df, dfx)
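
For reference, the same duplicate-name scenario as test_index_column_name_duplicate, sketched against the public pyarrow.parquet API instead of the test helpers (the output path and the use of pd.testing.assert_frame_equal are illustrative assumptions, not taken from the diff):

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    df = pd.DataFrame({
        'close': [154.99959, 154.99959],
        'time': pd.to_datetime(['2017-06-30 01:31:00', '2017-06-30 01:32:00']),
    }).set_index('time', drop=False)

    pq.write_table(pa.Table.from_pandas(df), 'data.parquet')
    roundtripped = pq.read_table('data.parquet').to_pandas()

    # Both the 'time' data column and the 'time'-named index survive the round trip.
    pd.testing.assert_frame_equal(roundtripped, df)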