From 90b60b7862cff4df9ee04ce13959a6f7f80c4b8a Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 4 Feb 2025 21:39:33 +0000 Subject: [PATCH] perf: Simplify merge join key coalescing --- bigframes/core/blocks.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 43f605dc03..b1f4ed35cc 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2077,14 +2077,12 @@ def merge( result_columns = [] matching_join_labels = [] - coalesced_ids = [] - for left_id, right_id in zip(left_join_ids, right_join_ids): - joined_expr, coalesced_id = joined_expr.project_to_id( - ops.coalesce_op.as_expr( - get_column_left[left_id], get_column_right[right_id] - ), - ) - coalesced_ids.append(coalesced_id) + left_post_join_ids = tuple(get_column_left[id] for id in left_join_ids) + right_post_join_ids = tuple(get_column_right[id] for id in right_join_ids) + + joined_expr, coalesced_ids = coalesce_columns( + joined_expr, left_post_join_ids, right_post_join_ids, how=how, drop=False + ) for col_id in self.value_columns: if col_id in left_join_ids: @@ -2102,7 +2100,6 @@ def merge( result_columns.append(get_column_left[col_id]) for col_id in other.value_columns: if col_id in right_join_ids: - key_part = right_join_ids.index(col_id) if other.col_id_to_label[matching_right_id] in matching_join_labels: pass else: @@ -2928,26 +2925,31 @@ def resolve_label_id(label: Label) -> str: ) +# TODO: Rewrite just to return expressions def coalesce_columns( expr: core.ArrayValue, left_ids: typing.Sequence[str], right_ids: typing.Sequence[str], how: str, + drop: bool = True, ) -> Tuple[core.ArrayValue, Sequence[str]]: result_ids = [] for left_id, right_id in zip(left_ids, right_ids): if how == "left" or how == "inner" or how == "cross": result_ids.append(left_id) - expr = expr.drop_columns([right_id]) + if drop: + expr = expr.drop_columns([right_id]) elif how == "right": result_ids.append(right_id) - expr = expr.drop_columns([left_id]) + if drop: + expr = expr.drop_columns([left_id]) elif how == "outer": coalesced_id = guid.generate_guid() expr, coalesced_id = expr.project_to_id( ops.coalesce_op.as_expr(left_id, right_id) ) - expr = expr.drop_columns([left_id, right_id]) + if drop: + expr = expr.drop_columns([left_id, right_id]) result_ids.append(coalesced_id) else: raise ValueError(f"Unexpected join type: {how}. {constants.FEEDBACK_LINK}")