From d50e77b5b469fc708b85fa2fe5558d680d570193 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 17 May 2024 00:04:46 +0000 Subject: [PATCH 1/6] fix: Properly identify non-unique index in non-pk tables --- bigframes/session/_io/bigquery/read_gbq_table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index 87083529ce..4067cb7712 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -170,7 +170,7 @@ def are_index_cols_unique( # If index_cols contain the primary_keys, the query engine assumes they are # provide a unique index. primary_keys = frozenset(_get_primary_keys(table)) - if primary_keys <= frozenset(index_cols): + if (len(primary_keys) > 0) and primary_keys <= frozenset(index_cols): return True # TODO(b/337925142): Avoid a "SELECT *" subquery here by ensuring From 4d612956a5a276595ce56df3ac11a229293f61ef Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 17 May 2024 00:28:13 +0000 Subject: [PATCH 2/6] fix previously unused code --- bigframes/core/compile/compiled.py | 2 +- bigframes/core/sql.py | 17 ++++++++++++++--- .../session/_io/bigquery/read_gbq_table.py | 3 +-- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 1c2217c25a..552061f612 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -897,7 +897,7 @@ def to_sql( output_columns = [ col_id_overrides.get(col, col) for col in baked_ir.column_ids ] - sql = bigframes.core.sql.select_from(output_columns, sql) + sql = bigframes.core.sql.select_from_subquery(output_columns, sql) # Single row frames may not have any ordering columns if len(baked_ir._ordering.all_ordering_columns) > 0: diff --git a/bigframes/core/sql.py b/bigframes/core/sql.py index cacf86d234..7e6704fd7e 100644 --- a/bigframes/core/sql.py +++ b/bigframes/core/sql.py @@ -111,7 +111,7 @@ def infix_op(opname: str, left_arg: str, right_arg: str): ### Writing SELECT expressions -def select_from(columns: Iterable[str], subquery: str, distinct: bool = False): +def select_from_subquery(columns: Iterable[str], subquery: str, distinct: bool = False): selection = ", ".join(map(identifier, columns)) distinct_clause = "DISTINCT " if distinct else "" @@ -120,16 +120,27 @@ def select_from(columns: Iterable[str], subquery: str, distinct: bool = False): ) +def select_from_table_ref( + columns: Iterable[str], table_ref: bigquery.TableReference, distinct: bool = False +): + selection = ", ".join(map(identifier, columns)) + distinct_clause = "DISTINCT " if distinct else "" + + return textwrap.dedent( + f"SELECT {distinct_clause}{selection}\nFROM {table_reference(table_ref)}" + ) + + def select_table(table_ref: bigquery.TableReference): return textwrap.dedent(f"SELECT * FROM {table_reference(table_ref)}") def is_distinct_sql(columns: Iterable[str], table_sql: str) -> str: is_unique_sql = f"""WITH full_table AS ( - {select_from(columns, table_sql)} + {select_from_table_ref(columns, table_sql)} ), distinct_table AS ( - {select_from(columns, table_sql, distinct=True)} + {select_from_table_ref(columns, table_sql, distinct=True)} ) SELECT (SELECT COUNT(*) FROM full_table) AS `total_count`, diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index 4067cb7712..210436f5e5 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -175,8 +175,7 @@ def are_index_cols_unique( # TODO(b/337925142): Avoid a "SELECT *" subquery here by ensuring # table_expression only selects just index_cols. - table_sql = ibis_client.compile(table) - is_unique_sql = bigframes.core.sql.is_distinct_sql(index_cols, table_sql) + is_unique_sql = bigframes.core.sql.is_distinct_sql(index_cols, table.reference) job_config = bigquery.QueryJobConfig() job_config.labels["bigframes-api"] = api_name results = bqclient.query_and_wait(is_unique_sql, job_config=job_config) From 6e8b62e8e1a74ab45ca91dc0f7a14436775a16e9 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 17 May 2024 00:47:03 +0000 Subject: [PATCH 3/6] fix unit tests --- bigframes/core/sql.py | 6 +++--- tests/unit/session/test_session.py | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/bigframes/core/sql.py b/bigframes/core/sql.py index 7e6704fd7e..c1e319b860 100644 --- a/bigframes/core/sql.py +++ b/bigframes/core/sql.py @@ -135,12 +135,12 @@ def select_table(table_ref: bigquery.TableReference): return textwrap.dedent(f"SELECT * FROM {table_reference(table_ref)}") -def is_distinct_sql(columns: Iterable[str], table_sql: str) -> str: +def is_distinct_sql(columns: Iterable[str], table_ref: bigquery.TableReference) -> str: is_unique_sql = f"""WITH full_table AS ( - {select_from_table_ref(columns, table_sql)} + {select_from_table_ref(columns, table_ref)} ), distinct_table AS ( - {select_from_table_ref(columns, table_sql, distinct=True)} + {select_from_table_ref(columns, table_ref, distinct=True)} ) SELECT (SELECT COUNT(*) FROM full_table) AS `total_count`, diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index bea858e037..111bfe0e94 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -200,6 +200,7 @@ def test_default_index_warning_raised_by_read_gbq(table): bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) bqclient.project = "test-project" bqclient.get_table.return_value = table + bqclient.query_and_wait.return_value = ({"total_count": 3, "distinct_count": 2},) session = resources.create_bigquery_session(bqclient=bqclient) table._properties["location"] = session._location @@ -222,6 +223,7 @@ def test_default_index_warning_not_raised_by_read_gbq_index_col_sequential_int64 bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) bqclient.project = "test-project" bqclient.get_table.return_value = table + bqclient.query_and_wait.return_value = ({"total_count": 4, "distinct_count": 3},) session = resources.create_bigquery_session(bqclient=bqclient) table._properties["location"] = session._location From 387286c82e6ca3a3bc816e2ae884e5f28ae11d09 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 17 May 2024 00:50:59 +0000 Subject: [PATCH 4/6] fix one more unit test --- tests/unit/session/test_session.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 111bfe0e94..4f5daebc87 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -179,6 +179,9 @@ def get_table_mock(table_ref): return table session.bqclient.get_table = get_table_mock + session.bqclient.query_and_wait.return_value = ( + {"total_count": 3, "distinct_count": 2}, + ) with pytest.warns(UserWarning, match=re.escape("use_cache=False")): df = session.read_gbq("my-project.my_dataset.my_table") From 69dce63c6529cb3d129752851ac6bf488839a8f3 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 17 May 2024 01:57:16 +0000 Subject: [PATCH 5/6] fix index uniqueness under no index --- bigframes/session/__init__.py | 1 - bigframes/session/_io/bigquery/read_gbq_table.py | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 89637644cf..ab53de8041 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -779,7 +779,6 @@ def _read_gbq_table( # check. is_index_unique = bf_read_gbq_table.are_index_cols_unique( bqclient=self.bqclient, - ibis_client=self.ibis_client, table=table, index_cols=index_cols, api_name=api_name, diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index 210436f5e5..92e007c804 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -162,11 +162,12 @@ def get_ibis_time_travel_table( def are_index_cols_unique( bqclient: bigquery.Client, - ibis_client: ibis.BaseBackend, table: bigquery.table.Table, index_cols: List[str], api_name: str, ) -> bool: + if len(index_cols) == 0: + return False # If index_cols contain the primary_keys, the query engine assumes they are # provide a unique index. primary_keys = frozenset(_get_primary_keys(table)) From fe8b7bdc1debb9c0ba1ef23db8cfc6e4130b9ea3 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 17 May 2024 16:26:53 +0000 Subject: [PATCH 6/6] add unit test --- tests/unit/session/test_read_gbq_table.py | 71 +++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/tests/unit/session/test_read_gbq_table.py b/tests/unit/session/test_read_gbq_table.py index 5afd6bb135..52c86cd1e4 100644 --- a/tests/unit/session/test_read_gbq_table.py +++ b/tests/unit/session/test_read_gbq_table.py @@ -15,8 +15,11 @@ """Unit tests for read_gbq_table helper functions.""" import datetime +import unittest.mock as mock +import google.cloud.bigquery import google.cloud.bigquery as bigquery +import pytest import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table @@ -45,3 +48,71 @@ def test_get_ibis_time_travel_table_doesnt_timetravel_anonymous_datasets(): # Need fully-qualified table name. assert "my-test-project" in sql + + +@pytest.mark.parametrize( + ("index_cols", "primary_keys", "values_distinct", "expected"), + ( + (["col1", "col2"], ["col1", "col2", "col3"], False, False), + (["col1", "col2", "col3"], ["col1", "col2", "col3"], True, True), + ( + ["col2", "col3", "col1"], + [ + "col3", + "col2", + ], + True, + True, + ), + (["col1", "col2"], [], False, False), + ([], ["col1", "col2", "col3"], False, False), + ([], [], False, False), + ), +) +def test_are_index_cols_unique(index_cols, primary_keys, values_distinct, expected): + """If a primary key is set on the table, we use that as the index column + by default, no error should be raised in this case. + + See internal issue 335727141. + """ + table = google.cloud.bigquery.Table.from_api_repr( + { + "tableReference": { + "projectId": "my-project", + "datasetId": "my_dataset", + "tableId": "my_table", + }, + "clustering": { + "fields": ["col1", "col2"], + }, + }, + ) + table.schema = ( + google.cloud.bigquery.SchemaField("col1", "INT64"), + google.cloud.bigquery.SchemaField("col2", "INT64"), + google.cloud.bigquery.SchemaField("col3", "INT64"), + google.cloud.bigquery.SchemaField("col4", "INT64"), + ) + + # TODO(b/305264153): use setter for table_constraints in client library + # when available. + table._properties["tableConstraints"] = { + "primaryKey": { + "columns": primary_keys, + }, + } + bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) + bqclient.project = "test-project" + bqclient.get_table.return_value = table + + bqclient.query_and_wait.return_value = ( + {"total_count": 3, "distinct_count": 3 if values_distinct else 2}, + ) + session = resources.create_bigquery_session( + bqclient=bqclient, table_schema=table.schema + ) + table._properties["location"] = session._location + + result = bf_read_gbq_table.are_index_cols_unique(bqclient, table, index_cols, "") + + assert result == expected