Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import bigframes.core.expression as ex
import bigframes.core.ordering as ordering
import bigframes.core.window_spec as windows
import bigframes.dtypes
import bigframes.dtypes as dtypes
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
Expand Down Expand Up @@ -409,6 +410,8 @@ def rank(
method: str = "average",
na_option: str = "keep",
ascending: bool = True,
grouping_cols: tuple[str, ...] = (),
columns: tuple[str, ...] = (),
):
if method not in ["average", "min", "max", "first", "dense"]:
raise ValueError(
Expand All @@ -417,8 +420,8 @@ def rank(
if na_option not in ["keep", "top", "bottom"]:
raise ValueError("na_option must be one of 'keep', 'top', or 'bottom'")

columns = block.value_columns
labels = block.column_labels
columns = columns or tuple(col for col in block.value_columns)
labels = [block.col_id_to_label[id] for id in columns]
# Step 1: Calculate row numbers for each row
# Identify null values to be treated according to na_option param
rownum_col_ids = []
Expand All @@ -442,9 +445,13 @@ def rank(
block, rownum_id = block.apply_window_op(
col if na_option == "keep" else nullity_col_id,
agg_ops.dense_rank_op if method == "dense" else agg_ops.count_op,
window_spec=windows.unbound(ordering=window_ordering)
window_spec=windows.unbound(
grouping_keys=grouping_cols, ordering=window_ordering
)
if method == "dense"
else windows.rows(following=0, ordering=window_ordering),
else windows.rows(
following=0, ordering=window_ordering, grouping_keys=grouping_cols
),
skip_reproject_unsafe=(col != columns[-1]),
)
rownum_col_ids.append(rownum_id)
Expand All @@ -462,12 +469,32 @@ def rank(
block, result_id = block.apply_window_op(
rownum_col_ids[i],
agg_op,
window_spec=windows.unbound(grouping_keys=(columns[i],)),
window_spec=windows.unbound(grouping_keys=(columns[i], *grouping_cols)),
skip_reproject_unsafe=(i < (len(columns) - 1)),
)
post_agg_rownum_col_ids.append(result_id)
rownum_col_ids = post_agg_rownum_col_ids

# Pandas masks all values where any grouping column is null
# Note: we use pd.NA instead of float('nan')
if grouping_cols:
predicate = functools.reduce(
ops.and_op.as_expr,
[ops.notnull_op.as_expr(column_id) for column_id in grouping_cols],
)
block = block.project_exprs(
[
ops.where_op.as_expr(
ex.deref(col),
predicate,
ex.const(None),
)
for col in rownum_col_ids
],
labels=labels,
)
rownum_col_ids = list(block.value_columns[-len(rownum_col_ids) :])

# Step 3: post processing: mask null values and cast to float
if method in ["min", "max", "first", "dense"]:
# Pandas rank always produces Float64, so must cast for aggregation types that produce ints
Expand Down
1 change: 1 addition & 0 deletions bigframes/core/compile/ibis_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ def literal_to_ibis_scalar(
)
# "correct" way would be to use ibis.array, but this produces invalid BQ SQL syntax
return tuple(literal)

if not pd.api.types.is_list_like(literal) and pd.isna(literal):
if ibis_dtype:
return bigframes_vendored.ibis.null().cast(ibis_dtype)
Expand Down
28 changes: 28 additions & 0 deletions bigframes/core/groupby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,20 @@ def median(self, numeric_only: bool = False, *, exact: bool = True) -> df.DataFr
return self.quantile(0.5)
return self._aggregate_all(agg_ops.median_op, numeric_only=True)

def rank(
    self, method="average", ascending: bool = True, na_option: str = "keep"
) -> df.DataFrame:
    """Rank values within each group for the selected value columns.

    Delegates to the shared block-level rank implementation, partitioning
    by the grouping columns so ranks restart per group.
    """
    ranked_block = block_ops.rank(
        self._block,
        method=method,
        na_option=na_option,
        ascending=ascending,
        grouping_cols=tuple(self._by_col_ids),
        columns=tuple(self._selected_cols),
    )
    return df.DataFrame(ranked_block)

def quantile(
self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False
) -> df.DataFrame:
Expand Down Expand Up @@ -574,6 +588,20 @@ def sum(self, *args) -> series.Series:
def mean(self, *args) -> series.Series:
return self._aggregate(agg_ops.mean_op)

def rank(
    self, method="average", ascending: bool = True, na_option: str = "keep"
) -> series.Series:
    """Rank values within each group for the grouped series column.

    Delegates to the shared block-level rank implementation, partitioning
    by the grouping columns so ranks restart per group.
    """
    ranked_block = block_ops.rank(
        self._block,
        method=method,
        na_option=na_option,
        ascending=ascending,
        grouping_cols=tuple(self._by_col_ids),
        columns=(self._value_column,),
    )
    return series.Series(ranked_block)

def median(
self,
*args,
Expand Down
134 changes: 133 additions & 1 deletion tests/system/small/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import pytest

import bigframes.pandas as bpd
from tests.system.utils import assert_pandas_df_equal
from tests.system.utils import assert_pandas_df_equal, skip_legacy_pandas

# =================
# DataFrame.groupby
Expand Down Expand Up @@ -94,6 +94,72 @@ def test_dataframe_groupby_quantile(scalars_df_index, scalars_pandas_df_index, q
)


@skip_legacy_pandas
@pytest.mark.parametrize(
    ("na_option", "method", "ascending"),
    [
        ("keep", "average", True),
        ("top", "min", False),
        ("bottom", "max", False),
        ("top", "first", False),
        ("bottom", "dense", False),
    ],
)
def test_dataframe_groupby_rank(
    scalars_df_index,
    scalars_pandas_df_index,
    na_option,
    method,
    ascending,
):
    # Compare BigFrames groupby rank against pandas on the same columns.
    col_names = ["int64_too", "float64_col", "int64_col", "string_col"]
    bf_grouped = scalars_df_index[col_names].groupby("string_col")
    bf_result = bf_grouped.rank(
        na_option=na_option,
        method=method,
        ascending=ascending,
    ).to_pandas()
    pd_grouped = scalars_pandas_df_index[col_names].groupby("string_col")
    pd_ranked = pd_grouped.rank(
        na_option=na_option,
        method=method,
        ascending=ascending,
    )
    # pandas produces float64; round-trip to the nullable Float64 dtype
    # to match what BigFrames emits.
    pd_result = pd_ranked.astype("float64").astype("Float64")
    pd.testing.assert_frame_equal(
        pd_result, bf_result, check_dtype=False, check_index_type=False
    )


@pytest.mark.parametrize(
("operator"),
[
Expand Down Expand Up @@ -534,6 +600,72 @@ def test_series_groupby_agg_list(scalars_df_index, scalars_pandas_df_index):
)


@skip_legacy_pandas
@pytest.mark.parametrize(
    ("na_option", "method", "ascending"),
    [
        ("keep", "average", True),
        ("top", "min", False),
        ("bottom", "max", False),
        ("top", "first", False),
        ("bottom", "dense", False),
    ],
)
def test_series_groupby_rank(
    scalars_df_index,
    scalars_pandas_df_index,
    na_option,
    method,
    ascending,
):
    # Compare BigFrames series-groupby rank against pandas on one column.
    col_names = ["int64_col", "string_col"]
    bf_grouped = scalars_df_index[col_names].groupby("string_col")["int64_col"]
    bf_result = bf_grouped.rank(
        na_option=na_option,
        method=method,
        ascending=ascending,
    ).to_pandas()
    pd_grouped = scalars_pandas_df_index[col_names].groupby("string_col")[
        "int64_col"
    ]
    pd_ranked = pd_grouped.rank(
        na_option=na_option,
        method=method,
        ascending=ascending,
    )
    # pandas produces float64; round-trip to the nullable Float64 dtype
    # to match what BigFrames emits.
    pd_result = pd_ranked.astype("float64").astype("Float64")
    pd.testing.assert_series_equal(
        pd_result, bf_result, check_dtype=False, check_index_type=False
    )


@pytest.mark.parametrize("dropna", [True, False])
def test_series_groupby_head(scalars_df_index, scalars_pandas_df_index, dropna):
bf_result = (
Expand Down
71 changes: 71 additions & 0 deletions third_party/bigframes_vendored/pandas/core/groupby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,77 @@ def var(
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def rank(
    self,
    method: str = "average",
    ascending: bool = True,
    na_option: str = "keep",
):
    """
    Provide the rank of values within each group.

    **Examples:**

        >>> import bigframes.pandas as bpd
        >>> import numpy as np
        >>> bpd.options.display.progress_bar = None

        >>> df = bpd.DataFrame(
        ...     {
        ...         "group": ["a", "a", "a", "a", "a", "b", "b", "b", "b", "b"],
        ...         "value": [2, 4, 2, 3, 5, 1, 2, 4, 1, 5],
        ...     }
        ... )
        >>> df
          group  value
        0     a      2
        1     a      4
        2     a      2
        3     a      3
        4     a      5
        5     b      1
        6     b      2
        7     b      4
        8     b      1
        9     b      5
        <BLANKLINE>
        [10 rows x 2 columns]
        >>> for method in ['average', 'min', 'max', 'dense', 'first']:
        ...     df[f'{method}_rank'] = df.groupby('group')['value'].rank(method)
        >>> df
          group  value  average_rank  min_rank  max_rank  dense_rank  first_rank
        0     a      2           1.5       1.0       2.0         1.0         1.0
        1     a      4           4.0       4.0       4.0         3.0         4.0
        2     a      2           1.5       1.0       2.0         1.0         2.0
        3     a      3           3.0       3.0       3.0         2.0         3.0
        4     a      5           5.0       5.0       5.0         4.0         5.0
        5     b      1           1.5       1.0       2.0         1.0         1.0
        6     b      2           3.0       3.0       3.0         2.0         3.0
        7     b      4           4.0       4.0       4.0         3.0         4.0
        8     b      1           1.5       1.0       2.0         1.0         2.0
        9     b      5           5.0       5.0       5.0         4.0         5.0
        <BLANKLINE>
        [10 rows x 7 columns]

    Args:
        method ({'average', 'min', 'max', 'first', 'dense'}, default 'average'):
            * average: average rank of group.
            * min: lowest rank in group.
            * max: highest rank in group.
            * first: ranks assigned in order they appear in the array.
            * dense: like 'min', but rank always increases by 1 between groups.
        ascending (bool, default True):
            False for ranks by high (1) to low (N).
        na_option ({'keep', 'top', 'bottom'}, default 'keep'):
            * keep: leave NA values where they are.
            * top: smallest rank if ascending.
            * bottom: smallest rank if descending.

    Returns:
        DataFrame or Series: Object with the ranking of values within each
        group, of the same type as the caller.
    """
    raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def skew(
self,
*,
Expand Down