diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py index 8ef3aa123b..0e9525d5af 100644 --- a/bigframes/core/block_transforms.py +++ b/bigframes/core/block_transforms.py @@ -26,6 +26,7 @@ import bigframes.core.expression as ex import bigframes.core.ordering as ordering import bigframes.core.window_spec as windows +import bigframes.dtypes import bigframes.dtypes as dtypes import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops @@ -409,6 +410,8 @@ def rank( method: str = "average", na_option: str = "keep", ascending: bool = True, + grouping_cols: tuple[str, ...] = (), + columns: tuple[str, ...] = (), ): if method not in ["average", "min", "max", "first", "dense"]: raise ValueError( @@ -417,8 +420,8 @@ def rank( if na_option not in ["keep", "top", "bottom"]: raise ValueError("na_option must be one of 'keep', 'top', or 'bottom'") - columns = block.value_columns - labels = block.column_labels + columns = columns or tuple(col for col in block.value_columns) + labels = [block.col_id_to_label[id] for id in columns] # Step 1: Calculate row numbers for each row # Identify null values to be treated according to na_option param rownum_col_ids = [] @@ -442,9 +445,13 @@ def rank( block, rownum_id = block.apply_window_op( col if na_option == "keep" else nullity_col_id, agg_ops.dense_rank_op if method == "dense" else agg_ops.count_op, - window_spec=windows.unbound(ordering=window_ordering) + window_spec=windows.unbound( + grouping_keys=grouping_cols, ordering=window_ordering + ) if method == "dense" - else windows.rows(following=0, ordering=window_ordering), + else windows.rows( + following=0, ordering=window_ordering, grouping_keys=grouping_cols + ), skip_reproject_unsafe=(col != columns[-1]), ) rownum_col_ids.append(rownum_id) @@ -462,12 +469,32 @@ def rank( block, result_id = block.apply_window_op( rownum_col_ids[i], agg_op, - window_spec=windows.unbound(grouping_keys=(columns[i],)), + window_spec=windows.unbound(grouping_keys=(columns[i], *grouping_cols)), skip_reproject_unsafe=(i < (len(columns) - 1)), ) post_agg_rownum_col_ids.append(result_id) rownum_col_ids = post_agg_rownum_col_ids + # Pandas masks all values where any grouping column is null + # Note: we use pd.NA instead of float('nan') + if grouping_cols: + predicate = functools.reduce( + ops.and_op.as_expr, + [ops.notnull_op.as_expr(column_id) for column_id in grouping_cols], + ) + block = block.project_exprs( + [ + ops.where_op.as_expr( + ex.deref(col), + predicate, + ex.const(None), + ) + for col in rownum_col_ids + ], + labels=labels, + ) + rownum_col_ids = list(block.value_columns[-len(rownum_col_ids) :]) + # Step 3: post processing: mask null values and cast to float if method in ["min", "max", "first", "dense"]: # Pandas rank always produces Float64, so must cast for aggregation types that produce ints diff --git a/bigframes/core/compile/ibis_types.py b/bigframes/core/compile/ibis_types.py index c47c6cf07b..54a5a37736 100644 --- a/bigframes/core/compile/ibis_types.py +++ b/bigframes/core/compile/ibis_types.py @@ -397,6 +397,7 @@ def literal_to_ibis_scalar( ) # "correct" way would be to use ibis.array, but this produces invalid BQ SQL syntax return tuple(literal) + if not pd.api.types.is_list_like(literal) and pd.isna(literal): if ibis_dtype: return bigframes_vendored.ibis.null().cast(ibis_dtype) diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index f619cd72c9..126d2f4dd2 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -174,6 +174,20 @@ def median(self, numeric_only: bool = False, *, exact: bool = True) -> df.DataFr return self.quantile(0.5) return self._aggregate_all(agg_ops.median_op, numeric_only=True) + def rank( + self, method="average", ascending: bool = True, na_option: str = "keep" + ) -> df.DataFrame: + return df.DataFrame( + block_ops.rank( + self._block, + method, + na_option, + ascending, + grouping_cols=tuple(self._by_col_ids), + columns=tuple(self._selected_cols), + ) + ) + def quantile( self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False ) -> df.DataFrame: @@ -574,6 +588,20 @@ def sum(self, *args) -> series.Series: def mean(self, *args) -> series.Series: return self._aggregate(agg_ops.mean_op) + def rank( + self, method="average", ascending: bool = True, na_option: str = "keep" + ) -> series.Series: + return series.Series( + block_ops.rank( + self._block, + method, + na_option, + ascending, + grouping_cols=tuple(self._by_col_ids), + columns=(self._value_column,), + ) + ) + def median( self, *args, diff --git a/tests/system/small/test_groupby.py b/tests/system/small/test_groupby.py index cbf6e1269d..b7101c90f3 100644 --- a/tests/system/small/test_groupby.py +++ b/tests/system/small/test_groupby.py @@ -16,7 +16,7 @@ import pytest import bigframes.pandas as bpd -from tests.system.utils import assert_pandas_df_equal +from tests.system.utils import assert_pandas_df_equal, skip_legacy_pandas # ================= # DataFrame.groupby @@ -94,6 +94,72 @@ def test_dataframe_groupby_quantile(scalars_df_index, scalars_pandas_df_index, q ) +@skip_legacy_pandas +@pytest.mark.parametrize( + ("na_option", "method", "ascending"), + [ + ( + "keep", + "average", + True, + ), + ( + "top", + "min", + False, + ), + ( + "bottom", + "max", + False, + ), + ( + "top", + "first", + False, + ), + ( + "bottom", + "dense", + False, + ), + ], +) +def test_dataframe_groupby_rank( + scalars_df_index, + scalars_pandas_df_index, + na_option, + method, + ascending, +): + col_names = ["int64_too", "float64_col", "int64_col", "string_col"] + bf_result = ( + scalars_df_index[col_names] + .groupby("string_col") + .rank( + na_option=na_option, + method=method, + ascending=ascending, + ) + ).to_pandas() + pd_result = ( + ( + scalars_pandas_df_index[col_names] + .groupby("string_col") + .rank( + na_option=na_option, + method=method, + ascending=ascending, + ) + ) + .astype("float64") + .astype("Float64") + ) + pd.testing.assert_frame_equal( + pd_result, bf_result, check_dtype=False, check_index_type=False + ) + + @pytest.mark.parametrize( ("operator"), [ @@ -534,6 +600,72 @@ def test_series_groupby_agg_list(scalars_df_index, scalars_pandas_df_index): ) +@skip_legacy_pandas +@pytest.mark.parametrize( + ("na_option", "method", "ascending"), + [ + ( + "keep", + "average", + True, + ), + ( + "top", + "min", + False, + ), + ( + "bottom", + "max", + False, + ), + ( + "top", + "first", + False, + ), + ( + "bottom", + "dense", + False, + ), + ], +) +def test_series_groupby_rank( + scalars_df_index, + scalars_pandas_df_index, + na_option, + method, + ascending, +): + col_names = ["int64_col", "string_col"] + bf_result = ( + scalars_df_index[col_names] + .groupby("string_col")["int64_col"] + .rank( + na_option=na_option, + method=method, + ascending=ascending, + ) + ).to_pandas() + pd_result = ( + ( + scalars_pandas_df_index[col_names] + .groupby("string_col")["int64_col"] + .rank( + na_option=na_option, + method=method, + ascending=ascending, + ) + ) + .astype("float64") + .astype("Float64") + ) + pd.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_index_type=False + ) + + @pytest.mark.parametrize("dropna", [True, False]) def test_series_groupby_head(scalars_df_index, scalars_pandas_df_index, dropna): bf_result = ( diff --git a/third_party/bigframes_vendored/pandas/core/groupby/__init__.py b/third_party/bigframes_vendored/pandas/core/groupby/__init__.py index 1e30d827ca..31a9aa6a93 100644 --- a/third_party/bigframes_vendored/pandas/core/groupby/__init__.py +++ b/third_party/bigframes_vendored/pandas/core/groupby/__init__.py @@ -363,6 +363,77 @@ def var( """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def rank( + self, + method: str = "average", + ascending: bool = True, + na_option: str = "keep", + ): + """ + Provide the rank of values within each group. + + **Examples:** + + >>> import bigframes.pandas as bpd + >>> import numpy as np + >>> bpd.options.display.progress_bar = None + + >>> df = bpd.DataFrame( + ... { + ... "group": ["a", "a", "a", "a", "a", "b", "b", "b", "b", "b"], + ... "value": [2, 4, 2, 3, 5, 1, 2, 4, 1, 5], + ... } + ... ) + >>> df + group value + 0 a 2 + 1 a 4 + 2 a 2 + 3 a 3 + 4 a 5 + 5 b 1 + 6 b 2 + 7 b 4 + 8 b 1 + 9 b 5 + + [10 rows x 2 columns] + >>> for method in ['average', 'min', 'max', 'dense', 'first']: + ... df[f'{method}_rank'] = df.groupby('group')['value'].rank(method) + >>> df + group value average_rank min_rank max_rank dense_rank first_rank + 0 a 2 1.5 1.0 2.0 1.0 1.0 + 1 a 4 4.0 4.0 4.0 3.0 4.0 + 2 a 2 1.5 1.0 2.0 1.0 2.0 + 3 a 3 3.0 3.0 3.0 2.0 3.0 + 4 a 5 5.0 5.0 5.0 4.0 5.0 + 5 b 1 1.5 1.0 2.0 1.0 1.0 + 6 b 2 3.0 3.0 3.0 2.0 3.0 + 7 b 4 4.0 4.0 4.0 3.0 4.0 + 8 b 1 1.5 1.0 2.0 1.0 2.0 + 9 b 5 5.0 5.0 5.0 4.0 5.0 + + [10 rows x 7 columns] + + Args: + method ({'average', 'min', 'max', 'first', 'dense'}, default 'average'): + * average: average rank of group. + * min: lowest rank in group. + * max: highest rank in group. + * first: ranks assigned in order they appear in the array. + * dense: like 'min', but rank always increases by 1 between groups. + ascending (bool, default True): + False for ranks by high (1) to low (N). + na_option ({'keep', 'top', 'bottom'}, default 'keep'): + * keep: leave NA values where they are. + * top: smallest rank if ascending. + * bottom: smallest rank if descending. + + Returns: + DataFrame with ranking of values within each group + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def skew( self, *,