Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion bigframes/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def is_dict_like(obj: typing.Any) -> typing_extensions.TypeGuard[typing.Mapping]


def combine_indices(index1: pd.Index, index2: pd.Index) -> pd.MultiIndex:
"""Combines indices into multi-index while preserving dtypes, names."""
"""Combines indices into multi-index while preserving dtypes, names merging by rows 1:1"""
multi_index = pd.MultiIndex.from_frame(
pd.concat([index1.to_frame(index=False), index2.to_frame(index=False)], axis=1)
)
Expand All @@ -61,6 +61,20 @@ def combine_indices(index1: pd.Index, index2: pd.Index) -> pd.MultiIndex:
return multi_index


def cross_indices(index1: pd.Index, index2: pd.Index) -> pd.MultiIndex:
    """Build the cartesian product of two indices as a multi-index.

    Dtypes of the input levels are preserved, and the result's level names
    are the concatenation of ``index1.names`` and ``index2.names``.
    """
    left_frame = index1.to_frame(index=False)
    right_frame = index2.to_frame(index=False)
    product_frame = pd.merge(left=left_frame, right=right_frame, how="cross")
    result = pd.MultiIndex.from_frame(product_frame)
    # from_frame adopts the frame's autogenerated column labels; restore the
    # names of the original indices instead.
    result.names = [*index1.names, *index2.names]
    return result


def index_as_tuples(index: pd.Index) -> typing.Sequence[typing.Tuple]:
if isinstance(index, pd.MultiIndex):
return [label for label in index]
Expand Down
37 changes: 37 additions & 0 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,35 @@ def combine(
def combine_first(self, other: DataFrame):
return self._apply_dataframe_binop(other, ops.fillna_op)

    def _fast_stat_matrix(self, op: agg_ops.BinaryAggregateOp) -> DataFrame:
        """Compute a full pairwise stat matrix (corr/cov) in a single aggregation.

        Faster than the per-column path, but emits O(n^2) aggregation
        expressions in the generated SQL, so callers must gate on a small
        column count before using it.

        Args:
            op: Binary aggregation (e.g. CorrOp, CovOp) applied to every
                ordered pair of value columns.

        Returns:
            DataFrame: Square matrix labeled by the original columns on both
            axes.
        """
        # Internal invariant (callers already gate on column count): the n^2
        # result columns must stay under the engine's column limit.
        assert len(self.columns) * len(self.columns) < bigframes.constants.MAX_COLUMNS
        orig_columns = self.columns
        frame = self.copy()
        # Replace column names with 0 to n - 1 to keep order
        # and avoid the influence of duplicated column name
        frame.columns = pandas.Index(range(len(orig_columns)))
        frame = frame.astype(bigframes.dtypes.FLOAT_DTYPE)
        block = frame._block

        # One aggregation per ordered (left, right) column pair -> n^2 outputs.
        aggregations = [
            ex.BinaryAggregation(op, ex.deref(left_col), ex.deref(right_col))
            for left_col in block.value_columns
            for right_col in block.value_columns
        ]
        # Append a positional level (0..n-1) to the original labels so each
        # label is unique even when column names contain duplicates; the cross
        # product then maps 1:1 onto the aggregation pairs above.
        uniq_orig_columns = utils.combine_indices(
            orig_columns, pandas.Index(range(len(orig_columns)))
        )
        labels = utils.cross_indices(uniq_orig_columns, uniq_orig_columns)

        block, _ = block.aggregate(aggregations=aggregations, column_labels=labels)

        # Pivot the "right-hand" label levels into the row axis, reshaping the
        # flat n^2 result into an n x n matrix.
        # NOTE(review): assumes Block.stack(levels=k) moves the last k column
        # levels to rows — confirm against Block.stack.
        block = block.stack(levels=orig_columns.nlevels + 1)
        # The aggregate operation created an index level with just 0; drop it.
        # Also drop the last level of each axis, which was only added above to
        # guarantee label uniqueness.
        return DataFrame(block).droplevel(0).droplevel(-1, axis=0).droplevel(-1, axis=1)

def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFrame:
if method != "pearson":
raise NotImplementedError(
Expand All @@ -1285,6 +1314,10 @@ def corr(self, method="pearson", min_periods=None, numeric_only=False) -> DataFr
else:
frame = self._drop_non_numeric()

if len(frame.columns) <= 30:
return frame._fast_stat_matrix(agg_ops.CorrOp())

frame = frame.copy()
orig_columns = frame.columns
# Replace column names with 0 to n - 1 to keep order
# and avoid the influence of duplicated column name
Expand Down Expand Up @@ -1393,6 +1426,10 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame:
else:
frame = self._drop_non_numeric()

if len(frame.columns) <= 30:
return frame._fast_stat_matrix(agg_ops.CovOp())

frame = frame.copy()
orig_columns = frame.columns
# Replace column names with 0 to n - 1 to keep order
# and avoid the influence of duplicated column name
Expand Down
4 changes: 2 additions & 2 deletions tests/system/large/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# See: https://github.com/python/cpython/issues/112282
reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.",
)
def test_corr_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered):
def test_corr_150_columns(scalars_df_numeric_150_columns_maybe_ordered):
scalars_df, scalars_pandas_df = scalars_df_numeric_150_columns_maybe_ordered
bf_result = scalars_df.corr(numeric_only=True).to_pandas()
pd_result = scalars_pandas_df.corr(numeric_only=True)
Expand All @@ -28,7 +28,7 @@ def test_corr_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered):
# See: https://github.com/python/cpython/issues/112282
reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.",
)
def test_cov_w_numeric_only(scalars_df_numeric_150_columns_maybe_ordered):
def test_cov_150_columns(scalars_df_numeric_150_columns_maybe_ordered):
scalars_df, scalars_pandas_df = scalars_df_numeric_150_columns_maybe_ordered
bf_result = scalars_df.cov(numeric_only=True).to_pandas()
pd_result = scalars_pandas_df.cov(numeric_only=True)
Expand Down
4 changes: 2 additions & 2 deletions tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2214,7 +2214,7 @@ def test_combine_first(
),
],
)
def test_corr_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only):
def test_df_corr_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only):
scalars_df, scalars_pandas_df = scalars_dfs_maybe_ordered

bf_result = scalars_df[columns].corr(numeric_only=numeric_only).to_pandas()
Expand All @@ -2228,7 +2228,7 @@ def test_corr_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only):
)


def test_corr_w_invalid_parameters(scalars_dfs):
def test_df_corr_w_invalid_parameters(scalars_dfs):
columns = ["int64_too", "int64_col", "float64_col"]
scalars_df, _ = scalars_dfs

Expand Down