diff --git a/python/lsst/pipe/tasks/parquetTable.py b/python/lsst/pipe/tasks/parquetTable.py deleted file mode 100644 index 81d9391f9..000000000 --- a/python/lsst/pipe/tasks/parquetTable.py +++ /dev/null @@ -1,333 +0,0 @@ -# This file is part of pipe_tasks. -# -# Developed for the LSST Data Management System. -# This product includes software developed by the LSST Project -# (https://www.lsst.org). -# See the COPYRIGHT file at the top-level directory of this distribution -# for details of code ownership. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -__all__ = ["ParquetTable", "MultilevelParquetTable"] - -""" -Implementation of thin wrappers to pyarrow.ParquetFile. -""" - -import re -import json -from itertools import product -import pyarrow -import pyarrow.parquet -import numpy as np -import pandas as pd - -from deprecated.sphinx import deprecated - - -@deprecated(reason="The ParquetTable interface is from Gen2 i/o and will be removed after v26.", - version="v25", category=FutureWarning) -class ParquetTable(object): - """Thin wrapper to pyarrow's ParquetFile object - - Call `toDataFrame` method to get a `pandas.DataFrame` object, - optionally passing specific columns. - - The main purpose of having this wrapper rather than directly - using `pyarrow.ParquetFile` is to make it nicer to load - selected subsets of columns, especially from dataframes with multi-level - column indices. - - Instantiated with either a path to a parquet file or a dataFrame - - Parameters - ---------- - filename : str, optional - Path to Parquet file. - dataFrame : dataFrame, optional - """ - - def __init__(self, filename=None, dataFrame=None): - self.filename = filename - if filename is not None: - self._pf = pyarrow.parquet.ParquetFile(filename) - self._df = None - self._pandasMd = None - elif dataFrame is not None: - self._df = dataFrame - self._pf = None - else: - raise ValueError("Either filename or dataFrame must be passed.") - - self._columns = None - self._columnIndex = None - - def write(self, filename): - """Write pandas dataframe to parquet - - Parameters - ---------- - filename : str - Path to which to write. - """ - if self._df is None: - raise ValueError("df property must be defined to write.") - table = pyarrow.Table.from_pandas(self._df) - pyarrow.parquet.write_table(table, filename) - - @property - def pandasMd(self): - if self._pf is None: - raise AttributeError("This property is only accessible if ._pf is set.") - if self._pandasMd is None: - self._pandasMd = json.loads(self._pf.metadata.metadata[b"pandas"]) - return self._pandasMd - - @property - def columnIndex(self): - """Columns as a pandas Index - """ - if self._columnIndex is None: - self._columnIndex = self._getColumnIndex() - return self._columnIndex - - def _getColumnIndex(self): - if self._df is not None: - return self._df.columns - else: - return pd.Index(self.columns) - - @property - def columns(self): - """List of column names (or column index if df is set) - - This may either be a list of column names, or a - pandas.Index object describing the column index, depending - on whether the ParquetTable object is wrapping a ParquetFile - or a DataFrame. - """ - if self._columns is None: - self._columns = self._getColumns() - return self._columns - - def _getColumns(self): - if self._df is not None: - return self._sanitizeColumns(self._df.columns) - else: - return self._pf.metadata.schema.names - - def _sanitizeColumns(self, columns): - return [c for c in columns if c in self.columnIndex] - - def toDataFrame(self, columns=None): - """Get table (or specified columns) as a pandas DataFrame - - Parameters - ---------- - columns : list, optional - Desired columns. If `None`, then all columns will be - returned. - """ - if self._pf is None: - if columns is None: - return self._df - else: - return self._df[columns] - - if columns is None: - return self._pf.read().to_pandas() - - df = self._pf.read(columns=columns, use_pandas_metadata=True).to_pandas() - return df - - -@deprecated(reason="The MultilevelParquetTable interface is from Gen2 i/o and will be removed after v26.", - version="v25", category=FutureWarning) -class MultilevelParquetTable(ParquetTable): - """Wrapper to access dataframe with multi-level column index from Parquet - - This subclass of `ParquetTable` to handle the multi-level is necessary - because there is not a convenient way to request specific table subsets - by level via Parquet through pyarrow, as there is with a `pandas.DataFrame`. - - Additionally, pyarrow stores multilevel index information in a very strange - way. Pandas stores it as a tuple, so that one can access a single column - from a pandas dataframe as `df[('ref', 'HSC-G', 'coord_ra')]`. However, for - some reason pyarrow saves these indices as "stringified" tuples, such that - in order to read thissame column from a table written to Parquet, you would - have to do the following: - - pf = pyarrow.ParquetFile(filename) - df = pf.read(columns=["('ref', 'HSC-G', 'coord_ra')"]) - - See also https://github.com/apache/arrow/issues/1771, where we've raised - this issue. - - As multilevel-indexed dataframes can be very useful to store data like - multiple filters' worth of data in the same table, this case deserves a - wrapper to enable easier access; - that's what this object is for. For example, - - parq = MultilevelParquetTable(filename) - columnDict = {'dataset':'meas', - 'filter':'HSC-G', - 'column':['coord_ra', 'coord_dec']} - df = parq.toDataFrame(columns=columnDict) - - will return just the coordinate columns; the equivalent of calling - `df['meas']['HSC-G'][['coord_ra', 'coord_dec']]` on the total dataframe, - but without having to load the whole frame into memory---this reads just - those columns from disk. You can also request a sub-table; e.g., - - parq = MultilevelParquetTable(filename) - columnDict = {'dataset':'meas', - 'filter':'HSC-G'} - df = parq.toDataFrame(columns=columnDict) - - and this will be the equivalent of `df['meas']['HSC-G']` on the total dataframe. - - Parameters - ---------- - filename : str, optional - Path to Parquet file. - dataFrame : dataFrame, optional - """ - - def __init__(self, *args, **kwargs): - super(MultilevelParquetTable, self).__init__(*args, **kwargs) - - self._columnLevelNames = None - - @property - def columnLevelNames(self): - if self._columnLevelNames is None: - self._columnLevelNames = { - level: list(np.unique(np.array(self.columns)[:, i])) - for i, level in enumerate(self.columnLevels) - } - return self._columnLevelNames - - @property - def columnLevels(self): - """Names of levels in column index - """ - return self.columnIndex.names - - def _getColumnIndex(self): - if self._df is not None: - return super()._getColumnIndex() - else: - levelNames = [f["name"] for f in self.pandasMd["column_indexes"]] - return pd.MultiIndex.from_tuples(self.columns, names=levelNames) - - def _getColumns(self): - if self._df is not None: - return super()._getColumns() - else: - columns = self._pf.metadata.schema.names - n = len(self.pandasMd["column_indexes"]) - pattern = re.compile(", ".join(["'(.*)'"] * n)) - matches = [re.search(pattern, c) for c in columns] - return [m.groups() for m in matches if m is not None] - - def toDataFrame(self, columns=None, droplevels=True): - """Get table (or specified columns) as a pandas DataFrame - - To get specific columns in specified sub-levels: - - parq = MultilevelParquetTable(filename) - columnDict = {'dataset':'meas', - 'filter':'HSC-G', - 'column':['coord_ra', 'coord_dec']} - df = parq.toDataFrame(columns=columnDict) - - Or, to get an entire subtable, leave out one level name: - - parq = MultilevelParquetTable(filename) - columnDict = {'dataset':'meas', - 'filter':'HSC-G'} - df = parq.toDataFrame(columns=columnDict) - - Parameters - ---------- - columns : list or dict, optional - Desired columns. If `None`, then all columns will be - returned. If a list, then the names of the columns must - be *exactly* as stored by pyarrow; that is, stringified tuples. - If a dictionary, then the entries of the dictionary must - correspond to the level names of the column multi-index - (that is, the `columnLevels` attribute). Not every level - must be passed; if any level is left out, then all entries - in that level will be implicitly included. - droplevels : bool - If True drop levels of column index that have just one entry - - """ - if columns is None: - if self._pf is None: - return self._df - else: - return self._pf.read().to_pandas() - - if isinstance(columns, dict): - columns = self._colsFromDict(columns) - - if self._pf is None: - try: - df = self._df[columns] - except (AttributeError, KeyError): - newColumns = [c for c in columns if c in self.columnIndex] - if not newColumns: - raise ValueError("None of the requested columns ({}) are available!".format(columns)) - df = self._df[newColumns] - else: - pfColumns = self._stringify(columns) - try: - df = self._pf.read(columns=pfColumns, use_pandas_metadata=True).to_pandas() - except (AttributeError, KeyError): - newColumns = [c for c in columns if c in self.columnIndex] - if not newColumns: - raise ValueError("None of the requested columns ({}) are available!".format(columns)) - pfColumns = self._stringify(newColumns) - df = self._pf.read(columns=pfColumns, use_pandas_metadata=True).to_pandas() - - if droplevels: - # Drop levels of column index that have just one entry - levelsToDrop = [n for lev, n in zip(df.columns.levels, df.columns.names) if len(lev) == 1] - - # Prevent error when trying to drop *all* columns - if len(levelsToDrop) == len(df.columns.names): - levelsToDrop.remove(df.columns.names[-1]) - - df.columns = df.columns.droplevel(levelsToDrop) - - return df - - def _colsFromDict(self, colDict): - new_colDict = {} - for i, lev in enumerate(self.columnLevels): - if lev in colDict: - if isinstance(colDict[lev], str): - new_colDict[lev] = [colDict[lev]] - else: - new_colDict[lev] = colDict[lev] - else: - new_colDict[lev] = self.columnIndex.levels[i] - - levelCols = [new_colDict[lev] for lev in self.columnLevels] - cols = product(*levelCols) - return list(cols) - - def _stringify(self, cols): - return [str(c) for c in cols] diff --git a/tests/test_parquet.py b/tests/test_parquet.py deleted file mode 100644 index f773e448d..000000000 --- a/tests/test_parquet.py +++ /dev/null @@ -1,229 +0,0 @@ -# This file is part of pipe_tasks. -# -# Developed for the LSST Data Management System. -# This product includes software developed by the LSST Project -# (http://www.lsst.org). -# See the COPYRIGHT file at the top-level directory of this distribution -# for details of code ownership. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import warnings -import unittest -import copy -import functools -import pandas as pd -from pandas.testing import assert_frame_equal - -import lsst.utils.tests - -import pyarrow as pa -import pyarrow.parquet as pq - -from lsst.pipe.tasks.parquetTable import ParquetTable, MultilevelParquetTable - - -def setup_module(module): - lsst.utils.tests.init() - - -class ParquetTableTestCase(unittest.TestCase): - """Test case for ParquetTable - """ - - def simulateDF(self): - """Create a simple test DataFrame - """ - df = pd.DataFrame({ - "coord_ra": [3.77654137, 3.77643059, 3.77621148, 3.77611944, 3.77610396], - "coord_dec": [0.01127624, 0.01127787, 0.01127543, 0.01127543, 0.01127543], - "slot_Centroid_flag": [True, True, True, True, True], - "slot_Centroid_x": [16208., 16344., 16613., 16726., 16745.], - "slot_Centroid_y": [15905., 15907., 15904., 15904., 15904.], - "slot_PsfFlux_apCorr": [0.98636465, 0.98437287, 0.97212515, 0.97179828, 0.97182371], - "slot_PsfFlux_apCorrSigma": [0., 0., 0., 0., 0.], - "slot_PsfFlux_flag": [True, True, True, True, True], - "slot_PsfFlux_instFlux": [0.28106412, 1.98260751, 0.08900771, 1.11375753, 1.3835924], - "slot_PsfFlux_instFluxSigma": [0.22967081, 0.25409701, 0.2120654, 0.23031162, 0.24262261], - "calib_psfUsed": [False, False, False, False, False], - "detect_isPatchInner": [False, False, False, False, False], - "detect_isPrimary": [False, False, False, False, False], - "detect_isTractInner": [True, True, True, True, True]}) - return df - - def setUp(self): - self.df = self.simulateDF() - with lsst.utils.tests.getTempFilePath('*.parq') as filename: - table = pa.Table.from_pandas(self.df) - pq.write_table(table, filename) - self.parq, self.dfParq = self.getParq(filename, self.df) - - def tearDown(self): - del self.df - del self.parq - - def getParq(self, filename, df): - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - fromFile, fromDF = ParquetTable(filename), ParquetTable(dataFrame=df) - - return fromFile, fromDF - - def testRoundTrip(self): - self.assertTrue(self.parq.toDataFrame().equals(self.df)) - - def testColumns(self): - columns = ['coord_ra', 'coord_dec'] - self.assertTrue(self.parq.toDataFrame(columns=columns).equals(self.df[columns])) - - # TO DO: DM-21976 Confirm this is the behavior we want - # Quietly ignore nonsense columns - self.assertTrue(self.parq.toDataFrame(columns=columns + ['hello']).equals(self.df[columns])) - - -class MultilevelParquetTableTestCase(ParquetTableTestCase): - """Test case for MultilevelParquetTable - """ - - def simulateDF(self): - self.datasets = ['meas', 'ref'] - self.filters = ['G', 'R'] - self.columns = ['coord_ra', 'coord_dec'] - simpleDF = super(MultilevelParquetTableTestCase, self).simulateDF() - dfFilterDSCombos = [] - for ds in self.datasets: - for filterName in self.filters: - df = copy.copy(simpleDF) - df.reindex(sorted(df.columns), axis=1) - df['dataset'] = 'meas' - df['filter'] = filterName - df.columns = pd.MultiIndex.from_tuples([(ds, filterName, c) for c in df.columns], - names=('dataset', 'filter', 'column')) - dfFilterDSCombos.append(df) - - return functools.reduce(lambda d1, d2: d1.join(d2), dfFilterDSCombos) - - def getParq(self, filename, df): - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - fromFile = MultilevelParquetTable(filename) - fromDf = MultilevelParquetTable(dataFrame=df) - return fromFile, fromDf - - def testProperties(self): - self.assertTrue(all([x == y for x, y in zip(self.parq.columnLevels, self.df.columns.names)])) - self.assertEqual(len(self.parq.columns), len(self.df.columns)) - - self.assertTrue(all([x == y for x, y in zip(self.dfParq.columnLevels, self.df.columns.names)])) - self.assertEqual(len(self.dfParq.columns), len(self.df.columns)) - - def testColumns(self): - df = self.df - parq = self.parq - - # Case A, each level has multiple values - datasets_A = self.datasets - filters_A = self.filters - columns_A = self.columns - columnDict_A = {'dataset': datasets_A, - 'filter': filters_A, - 'column': columns_A - } - colTuples_A = [(self.datasets[0], self.filters[0], self.columns[0]), - (self.datasets[0], self.filters[0], self.columns[1]), - (self.datasets[0], self.filters[1], self.columns[0]), - (self.datasets[0], self.filters[1], self.columns[1]), - (self.datasets[1], self.filters[0], self.columns[0]), - (self.datasets[1], self.filters[0], self.columns[1]), - (self.datasets[1], self.filters[1], self.columns[0]), - (self.datasets[1], self.filters[1], self.columns[1])] - df_A = df[colTuples_A] - assert_frame_equal(parq.toDataFrame(columns=columnDict_A), df_A) - - # Case A1, add a bogus column and test that it gets ignored - datasets_A1 = self.datasets - filters_A1 = self.filters - columns_A1 = self.columns + ['garbage'] - columnDict_A1 = {'dataset': datasets_A1, - 'filter': filters_A1, - 'column': columns_A1} - colTuples_A1 = [(self.datasets[0], self.filters[0], self.columns[0]), - (self.datasets[0], self.filters[0], self.columns[1]), - (self.datasets[0], self.filters[1], self.columns[0]), - (self.datasets[0], self.filters[1], self.columns[1]), - (self.datasets[1], self.filters[0], self.columns[0]), - (self.datasets[1], self.filters[0], self.columns[1]), - (self.datasets[1], self.filters[1], self.columns[0]), - (self.datasets[1], self.filters[1], self.columns[1])] - df_A1 = df[colTuples_A1] - assert_frame_equal(parq.toDataFrame(columns=columnDict_A1), df_A1) - - # Case B: One level has only a single value - datasets_B = self.datasets[0] - filters_B = self.filters - columns_B = self.columns - columnDict_B = {'dataset': datasets_B, - 'filter': filters_B, - 'column': columns_B} - colTuples_B = [(self.datasets[0], self.filters[0], self.columns[0]), - (self.datasets[0], self.filters[0], self.columns[1]), - (self.datasets[0], self.filters[1], self.columns[0]), - (self.datasets[0], self.filters[1], self.columns[1])] - df_B = df[colTuples_B] - df_B.columns = df_B.columns.droplevel('dataset') - assert_frame_equal(parq.toDataFrame(columns=columnDict_B), df_B) - assert_frame_equal(df_B, parq.toDataFrame(columns=colTuples_B)) - - # When explicit columns are not provided, comparison requires - # first getting the column index in sorted order. Apparently this - # happens by default in parq.toDataFrame(); to be honest, I'm not - # exactly sure how/why. - - # Case C: Two levels have a single value; third is not provided - datasets_C = self.datasets[0] - filters_C = self.filters[0] - columnDict_C = {'dataset': datasets_C, - 'filter': filters_C} - df_C = df[datasets_C][filters_C].sort_index(axis=1) - - self.assertTrue(parq.toDataFrame(columns=columnDict_C).equals(df_C)) - - # Case D: Only one level (first level) is provided - dataset_D = self.datasets[0] - columnDict_D = {'dataset': dataset_D} - df_D = df[dataset_D].sort_index(axis=1) - self.assertTrue(parq.toDataFrame(columns=columnDict_D).equals(df_D)) - - # Case E: Only one level (second level) is provided - filters_E = self.filters[1] - columnDict_E = {'filter': filters_E} - # get second level of multi-index column using .xs() - df_E = df.xs(filters_E, level=1, axis=1).sort_index(axis=1) - self.assertTrue(parq.toDataFrame(columns=columnDict_E).equals(df_E)) - - # Case when all requested columns don't exist - columnDictNonsense = {'dataset': 'meas', 'filter': 'G', 'column': ('hello')} - self.assertRaises(ValueError, parq.toDataFrame, columns=columnDictNonsense) - - # Case when some requested columns don't exist. - # TO DO: DM-21976 Confirm this is the behavior we want - # Quietly ignore nonsense columns - columnDictSomeNonsense = {'dataset': 'meas', 'filter': 'G', 'column': ('coord_ra', 'hello')} - dfGood = pd.DataFrame(df['meas']['G']['coord_ra']) - self.assertTrue(parq.toDataFrame(columns=columnDictSomeNonsense).equals(dfGood)) - - -if __name__ == "__main__": - lsst.utils.tests.init() - unittest.main()