diff --git a/python/lsst/pipe/tasks/parquetTable.py b/python/lsst/pipe/tasks/parquetTable.py
deleted file mode 100644
index 81d9391f90..0000000000
--- a/python/lsst/pipe/tasks/parquetTable.py
+++ /dev/null
@@ -1,333 +0,0 @@
-# This file is part of pipe_tasks.
-#
-# Developed for the LSST Data Management System.
-# This product includes software developed by the LSST Project
-# (https://www.lsst.org).
-# See the COPYRIGHT file at the top-level directory of this distribution
-# for details of code ownership.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <https://www.gnu.org/licenses/>.
-
-__all__ = ["ParquetTable", "MultilevelParquetTable"]
-
-"""
-Implementation of thin wrappers to pyarrow.ParquetFile.
-"""
-
-import re
-import json
-from itertools import product
-import pyarrow
-import pyarrow.parquet
-import numpy as np
-import pandas as pd
-
-from deprecated.sphinx import deprecated
-
-
-@deprecated(reason="The ParquetTable interface is from Gen2 i/o and will be removed after v26.",
- version="v25", category=FutureWarning)
-class ParquetTable(object):
- """Thin wrapper to pyarrow's ParquetFile object
-
- Call `toDataFrame` method to get a `pandas.DataFrame` object,
- optionally passing specific columns.
-
- The main purpose of having this wrapper rather than directly
- using `pyarrow.ParquetFile` is to make it nicer to load
- selected subsets of columns, especially from dataframes with multi-level
- column indices.
-
-    Instantiated with either a path to a Parquet file or a DataFrame.
-
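-    For example, a minimal sketch (the filename here is illustrative):
-
-        parq = ParquetTable(filename="objectTable.parq")
-        df = parq.toDataFrame(columns=["coord_ra", "coord_dec"])
-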
- Parameters
- ----------
- filename : str, optional
- Path to Parquet file.
-    dataFrame : pandas.DataFrame, optional
-        Existing DataFrame to wrap instead of reading from a file.
-    """
-
- def __init__(self, filename=None, dataFrame=None):
- self.filename = filename
- if filename is not None:
- self._pf = pyarrow.parquet.ParquetFile(filename)
- self._df = None
- self._pandasMd = None
- elif dataFrame is not None:
- self._df = dataFrame
- self._pf = None
- else:
- raise ValueError("Either filename or dataFrame must be passed.")
-
- self._columns = None
- self._columnIndex = None
-
- def write(self, filename):
- """Write pandas dataframe to parquet
-
- Parameters
- ----------
- filename : str
- Path to which to write.
- """
- if self._df is None:
- raise ValueError("df property must be defined to write.")
- table = pyarrow.Table.from_pandas(self._df)
- pyarrow.parquet.write_table(table, filename)
-
- @property
- def pandasMd(self):
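-        """Pandas metadata dict stored with the Parquet file.
-
-        Lazily parsed from the JSON under the ``b"pandas"`` key of the file
-        metadata; only available when wrapping a `pyarrow.parquet.ParquetFile`.
-        """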
- if self._pf is None:
- raise AttributeError("This property is only accessible if ._pf is set.")
- if self._pandasMd is None:
- self._pandasMd = json.loads(self._pf.metadata.metadata[b"pandas"])
- return self._pandasMd
-
- @property
- def columnIndex(self):
- """Columns as a pandas Index
- """
- if self._columnIndex is None:
- self._columnIndex = self._getColumnIndex()
- return self._columnIndex
-
- def _getColumnIndex(self):
- if self._df is not None:
- return self._df.columns
- else:
- return pd.Index(self.columns)
-
- @property
- def columns(self):
- """List of column names (or column index if df is set)
-
- This may either be a list of column names, or a
- pandas.Index object describing the column index, depending
- on whether the ParquetTable object is wrapping a ParquetFile
- or a DataFrame.
- """
- if self._columns is None:
- self._columns = self._getColumns()
- return self._columns
-
- def _getColumns(self):
- if self._df is not None:
- return self._sanitizeColumns(self._df.columns)
- else:
- return self._pf.metadata.schema.names
-
- def _sanitizeColumns(self, columns):
- return [c for c in columns if c in self.columnIndex]
-
- def toDataFrame(self, columns=None):
- """Get table (or specified columns) as a pandas DataFrame
-
- Parameters
- ----------
- columns : list, optional
- Desired columns. If `None`, then all columns will be
- returned.
- """
- if self._pf is None:
- if columns is None:
- return self._df
- else:
- return self._df[columns]
-
- if columns is None:
- return self._pf.read().to_pandas()
-
- df = self._pf.read(columns=columns, use_pandas_metadata=True).to_pandas()
- return df
-
-
-@deprecated(reason="The MultilevelParquetTable interface is from Gen2 i/o and will be removed after v26.",
- version="v25", category=FutureWarning)
-class MultilevelParquetTable(ParquetTable):
- """Wrapper to access dataframe with multi-level column index from Parquet
-
-    This subclass of `ParquetTable` is necessary to handle the multi-level
-    column index, because there is no convenient way to request specific
-    table subsets by level through pyarrow's Parquet interface, as there is
-    with a `pandas.DataFrame`.
-
- Additionally, pyarrow stores multilevel index information in a very strange
- way. Pandas stores it as a tuple, so that one can access a single column
- from a pandas dataframe as `df[('ref', 'HSC-G', 'coord_ra')]`. However, for
- some reason pyarrow saves these indices as "stringified" tuples, such that
-    in order to read this same column from a table written to Parquet, you would
- have to do the following:
-
- pf = pyarrow.ParquetFile(filename)
- df = pf.read(columns=["('ref', 'HSC-G', 'coord_ra')"])
-
- See also https://github.com/apache/arrow/issues/1771, where we've raised
- this issue.
-
-    Because multilevel-indexed dataframes are very useful for storing, e.g.,
-    multiple filters' worth of data in the same table, this case deserves a
-    wrapper to enable easier access; that is what this object provides.
-    For example,
-
- parq = MultilevelParquetTable(filename)
- columnDict = {'dataset':'meas',
- 'filter':'HSC-G',
- 'column':['coord_ra', 'coord_dec']}
- df = parq.toDataFrame(columns=columnDict)
-
- will return just the coordinate columns; the equivalent of calling
- `df['meas']['HSC-G'][['coord_ra', 'coord_dec']]` on the total dataframe,
- but without having to load the whole frame into memory---this reads just
- those columns from disk. You can also request a sub-table; e.g.,
-
- parq = MultilevelParquetTable(filename)
- columnDict = {'dataset':'meas',
- 'filter':'HSC-G'}
- df = parq.toDataFrame(columns=columnDict)
-
- and this will be the equivalent of `df['meas']['HSC-G']` on the total dataframe.
-
- Parameters
- ----------
- filename : str, optional
- Path to Parquet file.
-    dataFrame : pandas.DataFrame, optional
-        Existing DataFrame to wrap instead of reading from a file.
-    """
-
- def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
- self._columnLevelNames = None
-
- @property
- def columnLevelNames(self):
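-        """Dict mapping each column-index level name to its unique entries."""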
- if self._columnLevelNames is None:
- self._columnLevelNames = {
- level: list(np.unique(np.array(self.columns)[:, i]))
- for i, level in enumerate(self.columnLevels)
- }
- return self._columnLevelNames
-
- @property
- def columnLevels(self):
- """Names of levels in column index
- """
- return self.columnIndex.names
-
- def _getColumnIndex(self):
- if self._df is not None:
- return super()._getColumnIndex()
- else:
- levelNames = [f["name"] for f in self.pandasMd["column_indexes"]]
- return pd.MultiIndex.from_tuples(self.columns, names=levelNames)
-
- def _getColumns(self):
- if self._df is not None:
- return super()._getColumns()
- else:
- columns = self._pf.metadata.schema.names
- n = len(self.pandasMd["column_indexes"])
- pattern = re.compile(", ".join(["'(.*)'"] * n))
- matches = [re.search(pattern, c) for c in columns]
- return [m.groups() for m in matches if m is not None]
-
- def toDataFrame(self, columns=None, droplevels=True):
- """Get table (or specified columns) as a pandas DataFrame
-
- To get specific columns in specified sub-levels:
-
- parq = MultilevelParquetTable(filename)
- columnDict = {'dataset':'meas',
- 'filter':'HSC-G',
- 'column':['coord_ra', 'coord_dec']}
- df = parq.toDataFrame(columns=columnDict)
-
- Or, to get an entire subtable, leave out one level name:
-
- parq = MultilevelParquetTable(filename)
- columnDict = {'dataset':'meas',
- 'filter':'HSC-G'}
- df = parq.toDataFrame(columns=columnDict)
-
- Parameters
- ----------
- columns : list or dict, optional
- Desired columns. If `None`, then all columns will be
- returned. If a list, then the names of the columns must
- be *exactly* as stored by pyarrow; that is, stringified tuples.
- If a dictionary, then the entries of the dictionary must
- correspond to the level names of the column multi-index
- (that is, the `columnLevels` attribute). Not every level
- must be passed; if any level is left out, then all entries
- in that level will be implicitly included.
-        droplevels : bool, optional
-            If True, drop levels of the column index that have only one entry.
-
- """
- if columns is None:
- if self._pf is None:
- return self._df
- else:
- return self._pf.read().to_pandas()
-
- if isinstance(columns, dict):
- columns = self._colsFromDict(columns)
-
- if self._pf is None:
- try:
- df = self._df[columns]
- except (AttributeError, KeyError):
- newColumns = [c for c in columns if c in self.columnIndex]
- if not newColumns:
- raise ValueError("None of the requested columns ({}) are available!".format(columns))
- df = self._df[newColumns]
- else:
- pfColumns = self._stringify(columns)
- try:
- df = self._pf.read(columns=pfColumns, use_pandas_metadata=True).to_pandas()
- except (AttributeError, KeyError):
- newColumns = [c for c in columns if c in self.columnIndex]
- if not newColumns:
- raise ValueError("None of the requested columns ({}) are available!".format(columns))
- pfColumns = self._stringify(newColumns)
- df = self._pf.read(columns=pfColumns, use_pandas_metadata=True).to_pandas()
-
- if droplevels:
- # Drop levels of column index that have just one entry
- levelsToDrop = [n for lev, n in zip(df.columns.levels, df.columns.names) if len(lev) == 1]
-
- # Prevent error when trying to drop *all* columns
- if len(levelsToDrop) == len(df.columns.names):
- levelsToDrop.remove(df.columns.names[-1])
-
- df.columns = df.columns.droplevel(levelsToDrop)
-
- return df
-
- def _colsFromDict(self, colDict):
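-        # Expand a dict of per-level selections into explicit column tuples:
-        # any level missing from colDict is filled with all of its entries,
-        # and the result is the Cartesian product across levels. With levels
-        # ('dataset', 'filter', 'column'), for example (values illustrative),
-        # {'filter': 'HSC-G', 'column': 'coord_ra'} expands to
-        # [('meas', 'HSC-G', 'coord_ra'), ('ref', 'HSC-G', 'coord_ra'), ...].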
- new_colDict = {}
- for i, lev in enumerate(self.columnLevels):
- if lev in colDict:
- if isinstance(colDict[lev], str):
- new_colDict[lev] = [colDict[lev]]
- else:
- new_colDict[lev] = colDict[lev]
- else:
- new_colDict[lev] = self.columnIndex.levels[i]
-
- levelCols = [new_colDict[lev] for lev in self.columnLevels]
- cols = product(*levelCols)
- return list(cols)
-
- def _stringify(self, cols):
- return [str(c) for c in cols]
diff --git a/tests/test_parquet.py b/tests/test_parquet.py
deleted file mode 100644
index f773e448da..0000000000
--- a/tests/test_parquet.py
+++ /dev/null
@@ -1,229 +0,0 @@
-# This file is part of pipe_tasks.
-#
-# Developed for the LSST Data Management System.
-# This product includes software developed by the LSST Project
-# (http://www.lsst.org).
-# See the COPYRIGHT file at the top-level directory of this distribution
-# for details of code ownership.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <https://www.gnu.org/licenses/>.
-
-import warnings
-import unittest
-import copy
-import functools
-import pandas as pd
-from pandas.testing import assert_frame_equal
-
-import lsst.utils.tests
-
-import pyarrow as pa
-import pyarrow.parquet as pq
-
-from lsst.pipe.tasks.parquetTable import ParquetTable, MultilevelParquetTable
-
-
-def setup_module(module):
- lsst.utils.tests.init()
-
-
-class ParquetTableTestCase(unittest.TestCase):
- """Test case for ParquetTable
- """
-
- def simulateDF(self):
- """Create a simple test DataFrame
- """
- df = pd.DataFrame({
- "coord_ra": [3.77654137, 3.77643059, 3.77621148, 3.77611944, 3.77610396],
- "coord_dec": [0.01127624, 0.01127787, 0.01127543, 0.01127543, 0.01127543],
- "slot_Centroid_flag": [True, True, True, True, True],
- "slot_Centroid_x": [16208., 16344., 16613., 16726., 16745.],
- "slot_Centroid_y": [15905., 15907., 15904., 15904., 15904.],
- "slot_PsfFlux_apCorr": [0.98636465, 0.98437287, 0.97212515, 0.97179828, 0.97182371],
- "slot_PsfFlux_apCorrSigma": [0., 0., 0., 0., 0.],
- "slot_PsfFlux_flag": [True, True, True, True, True],
- "slot_PsfFlux_instFlux": [0.28106412, 1.98260751, 0.08900771, 1.11375753, 1.3835924],
- "slot_PsfFlux_instFluxSigma": [0.22967081, 0.25409701, 0.2120654, 0.23031162, 0.24262261],
- "calib_psfUsed": [False, False, False, False, False],
- "detect_isPatchInner": [False, False, False, False, False],
- "detect_isPrimary": [False, False, False, False, False],
- "detect_isTractInner": [True, True, True, True, True]})
- return df
-
- def setUp(self):
- self.df = self.simulateDF()
- with lsst.utils.tests.getTempFilePath('*.parq') as filename:
- table = pa.Table.from_pandas(self.df)
- pq.write_table(table, filename)
- self.parq, self.dfParq = self.getParq(filename, self.df)
-
- def tearDown(self):
- del self.df
- del self.parq
-
- def getParq(self, filename, df):
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- fromFile, fromDF = ParquetTable(filename), ParquetTable(dataFrame=df)
-
- return fromFile, fromDF
-
- def testRoundTrip(self):
- self.assertTrue(self.parq.toDataFrame().equals(self.df))
-
- def testColumns(self):
- columns = ['coord_ra', 'coord_dec']
- self.assertTrue(self.parq.toDataFrame(columns=columns).equals(self.df[columns]))
-
- # TO DO: DM-21976 Confirm this is the behavior we want
- # Quietly ignore nonsense columns
- self.assertTrue(self.parq.toDataFrame(columns=columns + ['hello']).equals(self.df[columns]))
-
-
-class MultilevelParquetTableTestCase(ParquetTableTestCase):
- """Test case for MultilevelParquetTable
- """
-
- def simulateDF(self):
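-        """Create a test DataFrame whose columns form a
-        ('dataset', 'filter', 'column') MultiIndex, built by joining
-        per-dataset, per-filter copies of the simple test frame."""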
- self.datasets = ['meas', 'ref']
- self.filters = ['G', 'R']
- self.columns = ['coord_ra', 'coord_dec']
- simpleDF = super(MultilevelParquetTableTestCase, self).simulateDF()
- dfFilterDSCombos = []
- for ds in self.datasets:
- for filterName in self.filters:
- df = copy.copy(simpleDF)
-                df = df.reindex(sorted(df.columns), axis=1)
-                df['dataset'] = ds
- df['filter'] = filterName
- df.columns = pd.MultiIndex.from_tuples([(ds, filterName, c) for c in df.columns],
- names=('dataset', 'filter', 'column'))
- dfFilterDSCombos.append(df)
-
- return functools.reduce(lambda d1, d2: d1.join(d2), dfFilterDSCombos)
-
- def getParq(self, filename, df):
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- fromFile = MultilevelParquetTable(filename)
- fromDf = MultilevelParquetTable(dataFrame=df)
- return fromFile, fromDf
-
- def testProperties(self):
- self.assertTrue(all([x == y for x, y in zip(self.parq.columnLevels, self.df.columns.names)]))
- self.assertEqual(len(self.parq.columns), len(self.df.columns))
-
- self.assertTrue(all([x == y for x, y in zip(self.dfParq.columnLevels, self.df.columns.names)]))
- self.assertEqual(len(self.dfParq.columns), len(self.df.columns))
-
- def testColumns(self):
- df = self.df
- parq = self.parq
-
- # Case A, each level has multiple values
- datasets_A = self.datasets
- filters_A = self.filters
- columns_A = self.columns
- columnDict_A = {'dataset': datasets_A,
- 'filter': filters_A,
- 'column': columns_A
- }
- colTuples_A = [(self.datasets[0], self.filters[0], self.columns[0]),
- (self.datasets[0], self.filters[0], self.columns[1]),
- (self.datasets[0], self.filters[1], self.columns[0]),
- (self.datasets[0], self.filters[1], self.columns[1]),
- (self.datasets[1], self.filters[0], self.columns[0]),
- (self.datasets[1], self.filters[0], self.columns[1]),
- (self.datasets[1], self.filters[1], self.columns[0]),
- (self.datasets[1], self.filters[1], self.columns[1])]
- df_A = df[colTuples_A]
- assert_frame_equal(parq.toDataFrame(columns=columnDict_A), df_A)
-
- # Case A1, add a bogus column and test that it gets ignored
- datasets_A1 = self.datasets
- filters_A1 = self.filters
- columns_A1 = self.columns + ['garbage']
- columnDict_A1 = {'dataset': datasets_A1,
- 'filter': filters_A1,
- 'column': columns_A1}
- colTuples_A1 = [(self.datasets[0], self.filters[0], self.columns[0]),
- (self.datasets[0], self.filters[0], self.columns[1]),
- (self.datasets[0], self.filters[1], self.columns[0]),
- (self.datasets[0], self.filters[1], self.columns[1]),
- (self.datasets[1], self.filters[0], self.columns[0]),
- (self.datasets[1], self.filters[0], self.columns[1]),
- (self.datasets[1], self.filters[1], self.columns[0]),
- (self.datasets[1], self.filters[1], self.columns[1])]
- df_A1 = df[colTuples_A1]
- assert_frame_equal(parq.toDataFrame(columns=columnDict_A1), df_A1)
-
- # Case B: One level has only a single value
- datasets_B = self.datasets[0]
- filters_B = self.filters
- columns_B = self.columns
- columnDict_B = {'dataset': datasets_B,
- 'filter': filters_B,
- 'column': columns_B}
- colTuples_B = [(self.datasets[0], self.filters[0], self.columns[0]),
- (self.datasets[0], self.filters[0], self.columns[1]),
- (self.datasets[0], self.filters[1], self.columns[0]),
- (self.datasets[0], self.filters[1], self.columns[1])]
- df_B = df[colTuples_B]
- df_B.columns = df_B.columns.droplevel('dataset')
- assert_frame_equal(parq.toDataFrame(columns=columnDict_B), df_B)
- assert_frame_equal(df_B, parq.toDataFrame(columns=colTuples_B))
-
- # When explicit columns are not provided, comparison requires
- # first getting the column index in sorted order. Apparently this
- # happens by default in parq.toDataFrame(); to be honest, I'm not
- # exactly sure how/why.
-
- # Case C: Two levels have a single value; third is not provided
- datasets_C = self.datasets[0]
- filters_C = self.filters[0]
- columnDict_C = {'dataset': datasets_C,
- 'filter': filters_C}
- df_C = df[datasets_C][filters_C].sort_index(axis=1)
-
- self.assertTrue(parq.toDataFrame(columns=columnDict_C).equals(df_C))
-
- # Case D: Only one level (first level) is provided
- dataset_D = self.datasets[0]
- columnDict_D = {'dataset': dataset_D}
- df_D = df[dataset_D].sort_index(axis=1)
- self.assertTrue(parq.toDataFrame(columns=columnDict_D).equals(df_D))
-
- # Case E: Only one level (second level) is provided
- filters_E = self.filters[1]
- columnDict_E = {'filter': filters_E}
- # get second level of multi-index column using .xs()
- df_E = df.xs(filters_E, level=1, axis=1).sort_index(axis=1)
- self.assertTrue(parq.toDataFrame(columns=columnDict_E).equals(df_E))
-
- # Case when all requested columns don't exist
-        columnDictNonsense = {'dataset': 'meas', 'filter': 'G', 'column': ('hello',)}
- self.assertRaises(ValueError, parq.toDataFrame, columns=columnDictNonsense)
-
- # Case when some requested columns don't exist.
- # TO DO: DM-21976 Confirm this is the behavior we want
- # Quietly ignore nonsense columns
- columnDictSomeNonsense = {'dataset': 'meas', 'filter': 'G', 'column': ('coord_ra', 'hello')}
- dfGood = pd.DataFrame(df['meas']['G']['coord_ra'])
- self.assertTrue(parq.toDataFrame(columns=columnDictSomeNonsense).equals(dfGood))
-
-
-if __name__ == "__main__":
- lsst.utils.tests.init()
- unittest.main()