From 73b08c8f122607e3042872efbdff4232c4315a5a Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 22 Jan 2025 02:08:07 +0000 Subject: [PATCH 1/3] perf: Speed up DataFrame corr, cov --- bigframes/core/utils.py | 16 +++++++++++- bigframes/dataframe.py | 37 +++++++++++++++++++++++++--- tests/system/large/test_dataframe.py | 4 +-- tests/system/small/test_dataframe.py | 4 +-- 4 files changed, 52 insertions(+), 9 deletions(-) diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py index f9ca6cb5f0..25ca51e26f 100644 --- a/bigframes/core/utils.py +++ b/bigframes/core/utils.py @@ -48,7 +48,7 @@ def is_dict_like(obj: typing.Any) -> typing_extensions.TypeGuard[typing.Mapping] def combine_indices(index1: pd.Index, index2: pd.Index) -> pd.MultiIndex: - """Combines indices into multi-index while preserving dtypes, names.""" + """Combines indices into multi-index while preserving dtypes, names merging by rows 1:1""" multi_index = pd.MultiIndex.from_frame( pd.concat([index1.to_frame(index=False), index2.to_frame(index=False)], axis=1) ) @@ -57,6 +57,20 @@ def combine_indices(index1: pd.Index, index2: pd.Index) -> pd.MultiIndex: return multi_index +def cross_indices(index1: pd.Index, index2: pd.Index) -> pd.MultiIndex: + """Combines indices into multi-index while preserving dtypes, names using cross product""" + multi_index = pd.MultiIndex.from_frame( + pd.merge( + left=index1.to_frame(index=False), + right=index2.to_frame(index=False), + how="cross", + ) + ) + # to_frame will produce numbered default names, we don't want these + multi_index.names = [*index1.names, *index2.names] + return multi_index + + def index_as_tuples(index: pd.Index) -> typing.Sequence[typing.Tuple]: if isinstance(index, pd.MultiIndex): return [label for label in index] diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index b2b22fbdbf..620bfb8b41 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1261,6 +1261,29 @@ def combine( def combine_first(self, other: DataFrame): return self._apply_dataframe_binop(other, ops.fillna_op) + def _fast_stat_matrix(self, op: agg_ops.BinaryAggregateOp) -> DataFrame: + """Faster corr, cov calculations, but creates more sql text, so cannot scale to many columns""" + assert len(self.columns) * len(self.columns) < bigframes.constants.MAX_COLUMNS + orig_columns = self.columns + frame = self.copy() + # Replace column names with 0 to n - 1 to keep order + # and avoid the influence of duplicated column name + frame.columns = pandas.Index(range(len(orig_columns))) + frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE) + block = frame._block + + aggregations = [ + ex.BinaryAggregation(op, ex.deref(left_col), ex.deref(right_col)) + for left_col in block.value_columns + for right_col in block.value_columns + ] + labels = utils.cross_indices(orig_columns, orig_columns) + + block, _ = block.aggregate(aggregations=aggregations, column_labels=labels) + block = block.stack(levels=orig_columns.nlevels) + # The aggregate operation crated a index level with just 0, need to drop it + return DataFrame(block).droplevel(0) + def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFrame: if method != "pearson": raise NotImplementedError( @@ -1272,9 +1295,12 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFr ) if not numeric_only: - frame = self._raise_on_non_numeric("corr") + frame = self._raise_on_non_numeric("corr").copy() else: - frame = self._drop_non_numeric() + frame = self._drop_non_numeric().copy() + + if len(frame.columns) <= 30: + return self._fast_stat_matrix(agg_ops.CorrOp()) orig_columns = frame.columns # Replace column names with 0 to n - 1 to keep order @@ -1380,9 +1406,12 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFr def cov(self, *, numeric_only: bool = False) -> DataFrame: if not numeric_only: - frame = self._raise_on_non_numeric("corr") + frame = self._raise_on_non_numeric("corr").copy() else: - frame = self._drop_non_numeric() + frame = self._drop_non_numeric().copy() + + if len(frame.columns) <= 30: + return self._fast_stat_matrix(agg_ops.CovOp()) orig_columns = frame.columns # Replace column names with 0 to n - 1 to keep order diff --git a/tests/system/large/test_dataframe.py b/tests/system/large/test_dataframe.py index 20d383463a..396f2eb436 100644 --- a/tests/system/large/test_dataframe.py +++ b/tests/system/large/test_dataframe.py @@ -9,7 +9,7 @@ # See: https://github.com/python/cpython/issues/112282 reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.", ) -def test_corr_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered): +def test_corr_150_columns(scalars_df_numeric_150_columns_maybe_ordered): scalars_df, scalars_pandas_df = scalars_df_numeric_150_columns_maybe_ordered bf_result = scalars_df.corr(numeric_only=True).to_pandas() pd_result = scalars_pandas_df.corr(numeric_only=True) @@ -28,7 +28,7 @@ def test_corr_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered): # See: https://github.com/python/cpython/issues/112282 reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.", ) -def test_cov_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered): +def test_cov_150_columns(scalars_df_numeric_150_columns_maybe_ordered): scalars_df, scalars_pandas_df = scalars_df_numeric_150_columns_maybe_ordered bf_result = scalars_df.cov(numeric_only=True).to_pandas() pd_result = scalars_pandas_df.cov(numeric_only=True) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index e7d6ad67e1..2a1994e2de 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2183,7 +2183,7 @@ def test_combine_first( ), ], ) -def test_corr_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only): +def test_df_corr_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only): scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered bf_result = scalars_df[columns].corr(numeric_only=numeric_only).to_pandas() @@ -2197,7 +2197,7 @@ def test_corr_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only): ) -def test_corr_w_invalid_parameters(scalars_dfs): +def test_df_corr_w_invalid_parameters(scalars_dfs): columns = ["int64_too", "int64_col", "float64_col"] scalars_df, _ = scalars_dfs From cee9da94599b41a783f3d9be9891a6522c41d05b Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 23 Jan 2025 01:25:18 +0000 Subject: [PATCH 2/3] fix non-unique corr fastpath --- bigframes/dataframe.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 620bfb8b41..377b921550 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1277,12 +1277,18 @@ def _fast_stat_matrix(self, op: agg_ops.BinaryAggregateOp) -> DataFrame: for left_col in block.value_columns for right_col in block.value_columns ] - labels = utils.cross_indices(orig_columns, orig_columns) + # unique columns stops + uniq_orig_columns = utils.combine_indices( + orig_columns, pandas.Index(range(len(orig_columns))) + ) + labels = utils.cross_indices(uniq_orig_columns, uniq_orig_columns) block, _ = block.aggregate(aggregations=aggregations, column_labels=labels) - block = block.stack(levels=orig_columns.nlevels) + + block = block.stack(levels=orig_columns.nlevels + 1) # The aggregate operation crated a index level with just 0, need to drop it - return DataFrame(block).droplevel(0) + # Also, drop the last level of each index, which was created to guarantee uniqueness + return DataFrame(block).droplevel(0).droplevel(-1, axis=0).droplevel(-1, axis=1) def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFrame: if method != "pearson": @@ -1295,13 +1301,14 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFr ) if not numeric_only: - frame = self._raise_on_non_numeric("corr").copy() + frame = self._raise_on_non_numeric("corr") else: - frame = self._drop_non_numeric().copy() + frame = self._drop_non_numeric() if len(frame.columns) <= 30: - return self._fast_stat_matrix(agg_ops.CorrOp()) + return frame._fast_stat_matrix(agg_ops.CorrOp()) + frame = frame.copy() orig_columns = frame.columns # Replace column names with 0 to n - 1 to keep order # and avoid the influence of duplicated column name From dbd0b77af92a755e22114b5cebc28a954040bd34 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 23 Jan 2025 03:01:52 +0000 Subject: [PATCH 3/3] fix numeric dropping logic --- bigframes/dataframe.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 377b921550..4634e21d48 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1413,13 +1413,14 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFr def cov(self, *, numeric_only: bool = False) -> DataFrame: if not numeric_only: - frame = self._raise_on_non_numeric("corr").copy() + frame = self._raise_on_non_numeric("corr") else: - frame = self._drop_non_numeric().copy() + frame = self._drop_non_numeric() if len(frame.columns) <= 30: - return self._fast_stat_matrix(agg_ops.CovOp()) + return frame._fast_stat_matrix(agg_ops.CovOp()) + frame = frame.copy() orig_columns = frame.columns # Replace column names with 0 to n - 1 to keep order # and avoid the influence of duplicated column name