From 02efba23bd43375f841ade7fba1cd5607776e10a Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Wed, 12 Feb 2025 19:26:00 +0000 Subject: [PATCH 1/3] feat: JSON dtype support for read_pandas and Series constructor --- bigframes/core/compile/compiler.py | 15 +++++- bigframes/core/utils.py | 22 ++++++++ bigframes/dtypes.py | 1 + bigframes/session/_io/pandas.py | 3 ++ bigframes/session/loader.py | 14 ++++-- tests/system/small/bigquery/test_json.py | 64 ++++++++++++------------ tests/system/small/test_dataframe_io.py | 15 ++++++ tests/system/small/test_series.py | 22 +++++++- 8 files changed, 117 insertions(+), 39 deletions(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 0b508b01ab..35323d55d7 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -20,6 +20,7 @@ import bigframes_vendored.ibis.backends.bigquery as ibis_bigquery import bigframes_vendored.ibis.expr.api as ibis_api +import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes import bigframes_vendored.ibis.expr.types as ibis_types import google.cloud.bigquery import pandas as pd @@ -29,7 +30,6 @@ import bigframes.core.compile.concat as concat_impl import bigframes.core.compile.explode import bigframes.core.compile.ibis_types -import bigframes.core.compile.scalar_op_compiler import bigframes.core.compile.scalar_op_compiler as compile_scalar import bigframes.core.compile.schema_translator import bigframes.core.expression as ex @@ -37,6 +37,7 @@ import bigframes.core.nodes as nodes import bigframes.core.ordering as bf_ordering import bigframes.core.rewrite as rewrites +import bigframes.dtypes if typing.TYPE_CHECKING: import bigframes.core @@ -224,6 +225,18 @@ def compile_read_table_unordered( ibis_table = self.read_table_as_unordered_ibis( source, scan_cols=[col.source_id for col in scan.items] ) + + # TODO(b/395912450): Remove workaround solution once b/374784249 got resolved. + for scan_item in scan.items: + if ( + scan_item.dtype == bigframes.dtypes.JSON_DTYPE + and ibis_table[scan_item.source_id].type() == ibis_dtypes.string + ): + json_column = compile_scalar.parse_json( + ibis_table[scan_item.source_id] + ).name(scan_item.source_id) + ibis_table = ibis_table.mutate(json_column) + return compiled.UnorderedIR( ibis_table, tuple( diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py index 0198f12537..502a40d92d 100644 --- a/bigframes/core/utils.py +++ b/bigframes/core/utils.py @@ -24,6 +24,7 @@ import pandas.api.types as pdtypes import typing_extensions +import bigframes.dtypes as dtypes import bigframes.exceptions as bfe UNNAMED_COLUMN_ID = "bigframes_unnamed_column" @@ -226,3 +227,24 @@ def replace_timedeltas_with_micros(dataframe: pd.DataFrame) -> List[str]: updated_columns.append(dataframe.index.name) return updated_columns + + +def replace_json_with_string(dataframe: pd.DataFrame) -> List[str]: + """ + Due to a BigQuery IO limitation with loading JSON from Parquet files (b/374784249), + we're using a workaround: storing JSON as strings and then parsing them into JSON + objects. + TODO(b/395912450): Remove workaround solution once b/374784249 got resolved. + """ + updated_columns = [] + + for col in dataframe.columns: + if dataframe[col].dtype == dtypes.JSON_DTYPE: + dataframe[col] = dataframe[col].astype(dtypes.STRING_DTYPE) + updated_columns.append(col) + + if dataframe.index.dtype == dtypes.JSON_DTYPE: + dataframe.index = dataframe.index.astype(dtypes.STRING_DTYPE) + updated_columns.append(dataframe.index.name) + + return updated_columns diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index eed45e1dde..9083e13bc8 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -301,6 +301,7 @@ def is_object_like(type_: Union[ExpressionType, str]) -> bool: return type_ in ("object", "O") or ( getattr(type_, "kind", None) == "O" and getattr(type_, "storage", None) != "pyarrow" + and getattr(type_, "name", None) != "dbjson" ) diff --git a/bigframes/session/_io/pandas.py b/bigframes/session/_io/pandas.py index 532a909430..cc4fdafe57 100644 --- a/bigframes/session/_io/pandas.py +++ b/bigframes/session/_io/pandas.py @@ -39,6 +39,7 @@ class DataFrameAndLabels: index_labels: Collection ordering_col: str timedelta_cols: List[str] + json_cols: List[str] def _arrow_to_pandas_arrowdtype( @@ -165,6 +166,7 @@ def pandas_to_bq_compatible(pandas_dataframe: pandas.DataFrame) -> DataFrameAndL pandas_dataframe_copy[ordering_col] = np.arange(pandas_dataframe_copy.shape[0]) timedelta_cols = utils.replace_timedeltas_with_micros(pandas_dataframe_copy) + json_cols = utils.replace_json_with_string(pandas_dataframe_copy) return DataFrameAndLabels( df=pandas_dataframe_copy, @@ -172,4 +174,5 @@ def pandas_to_bq_compatible(pandas_dataframe: pandas.DataFrame) -> DataFrameAndL index_labels=idx_labels, ordering_col=ordering_col, timedelta_cols=timedelta_cols, + json_cols=json_cols, ) diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index b7550583e5..e58a1ececd 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -177,8 +177,11 @@ def read_pandas_load_job( destination_table = self._bqclient.get_table(load_table_destination) col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = { - col: bigframes.dtypes.TIMEDELTA_DTYPE - for col in df_and_labels.timedelta_cols + **{ + col: bigframes.dtypes.TIMEDELTA_DTYPE + for col in df_and_labels.timedelta_cols + }, + **{col: bigframes.dtypes.JSON_DTYPE for col in df_and_labels.json_cols}, } array_value = core.ArrayValue.from_table( table=destination_table, @@ -236,8 +239,11 @@ def read_pandas_streaming( ) col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = { - col: bigframes.dtypes.TIMEDELTA_DTYPE - for col in df_and_labels.timedelta_cols + **{ + col: bigframes.dtypes.TIMEDELTA_DTYPE + for col in df_and_labels.timedelta_cols + }, + **{col: bigframes.dtypes.JSON_DTYPE for col in df_and_labels.json_cols}, } array_value = ( core.ArrayValue.from_table( diff --git a/tests/system/small/bigquery/test_json.py b/tests/system/small/bigquery/test_json.py index 8f97856eea..492c0cf9b6 100644 --- a/tests/system/small/bigquery/test_json.py +++ b/tests/system/small/bigquery/test_json.py @@ -12,8 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json - +import db_dtypes # type: ignore import geopandas as gpd # type: ignore import pandas as pd import pyarrow as pa @@ -24,19 +23,6 @@ import bigframes.pandas as bpd -def _get_series_from_json(json_data): - # Note: converts None to sql "null" and not to json none. - values = [ - f"JSON '{json.dumps(data)}'" if data is not None else "NULL" - for data in json_data - ] - sql = " UNION ALL ".join( - [f"SELECT {id} AS id, {value} AS data" for id, value in enumerate(values)] - ) - df = bpd.read_gbq(sql).set_index("id").sort_index() - return df["data"] - - @pytest.mark.parametrize( ("json_path", "expected_json"), [ @@ -45,10 +31,11 @@ def _get_series_from_json(json_data): ], ) def test_json_set_at_json_path(json_path, expected_json): - s = _get_series_from_json([{"a": {"b": {"c": "tester", "d": []}}}]) + original_json = [{"a": {"b": {"c": "tester", "d": []}}}] + s = bpd.Series(original_json, dtype=db_dtypes.JSONDtype()) actual = bbq.json_set(s, json_path_value_pairs=[(json_path, 10)]) - expected = _get_series_from_json(expected_json) + expected = bpd.Series(expected_json, dtype=db_dtypes.JSONDtype()) pd.testing.assert_series_equal( actual.to_pandas(), expected.to_pandas(), @@ -65,10 +52,11 @@ def test_json_set_at_json_path(json_path, expected_json): ], ) def test_json_set_at_json_value_type(json_value, expected_json): - s = _get_series_from_json([{"a": {"b": "dev"}}, {"a": {"b": [1, 2]}}]) + original_json = [{"a": {"b": "dev"}}, {"a": {"b": [1, 2]}}] + s = bpd.Series(original_json, dtype=db_dtypes.JSONDtype()) actual = bbq.json_set(s, json_path_value_pairs=[("$.a.b", json_value)]) - expected = _get_series_from_json(expected_json) + expected = bpd.Series(expected_json, dtype=db_dtypes.JSONDtype()) pd.testing.assert_series_equal( actual.to_pandas(), expected.to_pandas(), @@ -76,13 +64,14 @@ def test_json_set_at_json_value_type(json_value, expected_json): def test_json_set_w_more_pairs(): - s = _get_series_from_json([{"a": 2}, {"b": 5}, {"c": 1}]) + original_json = [{"a": 2}, {"b": 5}, {"c": 1}] + s = bpd.Series(original_json, dtype=db_dtypes.JSONDtype()) actual = bbq.json_set( s, json_path_value_pairs=[("$.a", 1), ("$.b", 2), ("$.a", [3, 4, 5])] ) - expected = _get_series_from_json( - [{"a": 3, "b": 2}, {"a": 4, "b": 2}, {"a": 5, "b": 2, "c": 1}] - ) + + expected_json = [{"a": 3, "b": 2}, {"a": 4, "b": 2}, {"a": 5, "b": 2, "c": 1}] + expected = bpd.Series(expected_json, dtype=db_dtypes.JSONDtype()) pd.testing.assert_series_equal( actual.to_pandas(), expected.to_pandas(), @@ -90,16 +79,16 @@ def test_json_set_w_more_pairs(): def test_json_set_w_invalid_json_path_value_pairs(): + s = bpd.Series([{"a": 10}], dtype=db_dtypes.JSONDtype()) with pytest.raises(ValueError): - bbq.json_set( - _get_series_from_json([{"a": 10}]), json_path_value_pairs=[("$.a", 1, 100)] # type: ignore - ) + bbq.json_set(s, json_path_value_pairs=[("$.a", 1, 100)]) # type: ignore def test_json_set_w_invalid_value_type(): + s = bpd.Series([{"a": 10}], dtype=db_dtypes.JSONDtype()) with pytest.raises(TypeError): bbq.json_set( - _get_series_from_json([{"a": 10}]), + s, json_path_value_pairs=[ ( "$.a", @@ -117,9 +106,12 @@ def test_json_set_w_invalid_series_type(): def test_json_extract_from_json(): - s = _get_series_from_json([{"a": {"b": [1, 2]}}, {"a": {"c": 1}}, {"a": {"b": 0}}]) + s = bpd.Series( + [{"a": {"b": [1, 2]}}, {"a": {"c": 1}}, {"a": {"b": 0}}], + dtype=db_dtypes.JSONDtype(), + ) actual = bbq.json_extract(s, "$.a.b").to_pandas() - expected = _get_series_from_json([[1, 2], None, 0]).to_pandas() + expected = bpd.Series([[1, 2], None, 0], dtype=db_dtypes.JSONDtype()).to_pandas() pd.testing.assert_series_equal( actual, expected, @@ -127,9 +119,12 @@ def test_json_extract_from_json(): def test_json_extract_from_string(): - s = bpd.Series(['{"a": {"b": [1, 2]}}', '{"a": {"c": 1}}', '{"a": {"b": 0}}']) + s = bpd.Series( + ['{"a": {"b": [1, 2]}}', '{"a": {"c": 1}}', '{"a": {"b": 0}}'], + dtype=pd.StringDtype(storage="pyarrow"), + ) actual = bbq.json_extract(s, "$.a.b") - expected = bpd.Series(["[1,2]", None, "0"]) + expected = bpd.Series(["[1,2]", None, "0"], dtype=pd.StringDtype(storage="pyarrow")) pd.testing.assert_series_equal( actual.to_pandas(), expected.to_pandas(), @@ -142,8 +137,9 @@ def test_json_extract_w_invalid_series_type(): def test_json_extract_array_from_json(): - s = _get_series_from_json( - [{"a": ["ab", "2", "3 xy"]}, {"a": []}, {"a": ["4", "5"]}, {}] + s = bpd.Series( + [{"a": ["ab", "2", "3 xy"]}, {"a": []}, {"a": ["4", "5"]}, {}], + dtype=db_dtypes.JSONDtype(), ) actual = bbq.json_extract_array(s, "$.a") @@ -160,6 +156,8 @@ def test_json_extract_array_from_json(): """ df = bpd.read_gbq(sql).set_index("id").sort_index() expected = df["data"] + expected.index.name = None + expected.name = None pd.testing.assert_series_equal( actual.to_pandas(), diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index 10637b2395..f84ee811a3 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -658,6 +658,21 @@ def test_to_gbq_w_invalid_destination_table(scalars_df_index): scalars_df_index.to_gbq("table_id") +def test_to_gbq_w_json(bigquery_client): + """Test the `to_gbq` API can get a JSON column.""" + s1 = bpd.Series([1, 2, 3, 4]) + s2 = bpd.Series( + ["a", 1, False, ["a", {"b": 1}], {"c": [1, 2, 3]}], dtype=db_dtypes.JSONDtype() + ) + + df = bpd.DataFrame({"id": s1, "json_col": s2}) + destination_table = df.to_gbq() + table = bigquery_client.get_table(destination_table) + + assert table.schema[1].name == "json_col" + assert table.schema[1].field_type == "JSON" + + @pytest.mark.parametrize( ("index"), [True, False], diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index cdda7c753d..3d3baff21a 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -237,7 +237,7 @@ def test_series_construct_geodata(): pytest.param(pd.StringDtype(storage="pyarrow"), id="string"), ], ) -def test_series_construct_w_dtype_for_int(dtype): +def test_series_construct_w_dtype(dtype): data = [1, 2, 3] expected = pd.Series(data, dtype=dtype) expected.index = expected.index.astype("Int64") @@ -302,6 +302,26 @@ def test_series_construct_w_dtype_for_array_struct(): ) +def test_series_construct_w_dtype_for_json(): + data = [ + 1, + "str", + False, + ["a", {"b": 1}, None], + None, + {"a": {"b": [1, 2, 3], "c": True}}, + ] + s = bigframes.pandas.Series(data, dtype=db_dtypes.JSONDtype()) + + assert s[0] == 1 + assert s[1] == "str" + assert s[2] is False + assert s[3][0] == "a" + assert s[3][1]["b"] == 1 + assert pd.isna(s[4]) + assert s[5]["a"] == {"b": [1, 2, 3], "c": True} + + def test_series_keys(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs bf_result = scalars_df["int64_col"].keys().to_pandas() From 1088d635054f4c1beaaa6b5324b4eba591d214dc Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Wed, 12 Feb 2025 20:27:36 +0000 Subject: [PATCH 2/3] add read_pandas tests --- tests/system/small/test_session.py | 72 ++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index a4acb72117..0c8da52774 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -22,6 +22,7 @@ import warnings import bigframes_vendored.pandas.io.gbq as vendored_pandas_gbq +import db_dtypes # type: ignore import google import google.cloud.bigquery as bigquery import numpy as np @@ -747,6 +748,77 @@ def test_read_pandas_timedelta_index(session, write_engine): pd.testing.assert_index_equal(actual_result, expected_index) +@pytest.mark.parametrize( + ("write_engine"), + [ + pytest.param("default"), + pytest.param("bigquery_load"), + pytest.param("bigquery_streaming"), + pytest.param("bigquery_inline", marks=pytest.mark.xfail(raises=ValueError)), + ], +) +def test_read_pandas_json_dataframes(session, write_engine): + json_data = [ + 1, + None, + ["1", "3", "5"], + {"a": 1, "b": ["x", "y"], "c": {"z": False, "x": []}}, + ] + expected_df = pd.DataFrame( + {"my_col": pd.Series(json_data, dtype=db_dtypes.JSONDtype())} + ) + + actual_result = session.read_pandas( + expected_df, write_engine=write_engine + ).to_pandas() + + if write_engine == "bigquery_streaming": + expected_df.index = pd.Index([pd.NA] * 4, dtype="Int64") + pd.testing.assert_frame_equal(actual_result, expected_df, check_index_type=False) + + +@pytest.mark.parametrize( + "write_engine", + ["default", "bigquery_load"], +) +def test_read_pandas_json_series(session, write_engine): + json_data = [ + 1, + None, + ["1", "3", "5"], + {"a": 1, "b": ["x", "y"], "c": {"z": False, "x": []}}, + ] + expected_series = pd.Series(json_data, dtype=db_dtypes.JSONDtype()) + + actual_result = session.read_pandas( + expected_series, write_engine=write_engine + ).to_pandas() + pd.testing.assert_series_equal( + actual_result, expected_series, check_index_type=False + ) + + +@pytest.mark.parametrize( + ("write_engine"), + [ + pytest.param("default"), + pytest.param("bigquery_load"), + ], +) +def test_read_pandas_json_index(session, write_engine): + json_data = [ + 1, + None, + ["1", "3", "5"], + {"a": 1, "b": ["x", "y"], "c": {"z": False, "x": []}}, + ] + expected_index = pd.Index(json_data, dtype=db_dtypes.JSONDtype()) + actual_result = session.read_pandas( + expected_index, write_engine=write_engine + ).to_pandas() + pd.testing.assert_index_equal(actual_result, expected_index) + + @utils.skip_legacy_pandas @pytest.mark.parametrize( ("write_engine",), From 593e35b271889c8dad6deec213884c7652324559 Mon Sep 17 00:00:00 2001 From: Chelsea Lin Date: Wed, 12 Feb 2025 23:01:09 +0000 Subject: [PATCH 3/3] address comments --- bigframes/core/compile/compiler.py | 4 ++-- bigframes/session/_io/pandas.py | 13 ++++++++----- bigframes/session/loader.py | 19 ++----------------- 3 files changed, 12 insertions(+), 24 deletions(-) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 35323d55d7..ed95dacf74 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -25,6 +25,7 @@ import google.cloud.bigquery import pandas as pd +from bigframes import dtypes from bigframes.core import utils import bigframes.core.compile.compiled as compiled import bigframes.core.compile.concat as concat_impl @@ -37,7 +38,6 @@ import bigframes.core.nodes as nodes import bigframes.core.ordering as bf_ordering import bigframes.core.rewrite as rewrites -import bigframes.dtypes if typing.TYPE_CHECKING: import bigframes.core @@ -229,7 +229,7 @@ def compile_read_table_unordered( # TODO(b/395912450): Remove workaround solution once b/374784249 got resolved. for scan_item in scan.items: if ( - scan_item.dtype == bigframes.dtypes.JSON_DTYPE + scan_item.dtype == dtypes.JSON_DTYPE and ibis_table[scan_item.source_id].type() == ibis_dtypes.string ): json_column = compile_scalar.parse_json( diff --git a/bigframes/session/_io/pandas.py b/bigframes/session/_io/pandas.py index cc4fdafe57..a1549238b3 100644 --- a/bigframes/session/_io/pandas.py +++ b/bigframes/session/_io/pandas.py @@ -14,7 +14,8 @@ from __future__ import annotations import dataclasses -from typing import Collection, List, Union +import typing +from typing import Collection, Union import bigframes_vendored.constants as constants import db_dtypes # type: ignore @@ -38,8 +39,7 @@ class DataFrameAndLabels: column_labels: Collection index_labels: Collection ordering_col: str - timedelta_cols: List[str] - json_cols: List[str] + col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] def _arrow_to_pandas_arrowdtype( @@ -167,12 +167,15 @@ def pandas_to_bq_compatible(pandas_dataframe: pandas.DataFrame) -> DataFrameAndL timedelta_cols = utils.replace_timedeltas_with_micros(pandas_dataframe_copy) json_cols = utils.replace_json_with_string(pandas_dataframe_copy) + col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = { + **{col: bigframes.dtypes.TIMEDELTA_DTYPE for col in timedelta_cols}, + **{col: bigframes.dtypes.JSON_DTYPE for col in json_cols}, + } return DataFrameAndLabels( df=pandas_dataframe_copy, column_labels=col_labels, index_labels=idx_labels, ordering_col=ordering_col, - timedelta_cols=timedelta_cols, - json_cols=json_cols, + col_type_overrides=col_type_overrides, ) diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index e58a1ececd..7204a14870 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -176,18 +176,11 @@ def read_pandas_load_job( self._start_generic_job(load_job) destination_table = self._bqclient.get_table(load_table_destination) - col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = { - **{ - col: bigframes.dtypes.TIMEDELTA_DTYPE - for col in df_and_labels.timedelta_cols - }, - **{col: bigframes.dtypes.JSON_DTYPE for col in df_and_labels.json_cols}, - } array_value = core.ArrayValue.from_table( table=destination_table, # TODO (b/394156190): Generate this directly from original pandas df. schema=schemata.ArraySchema.from_bq_table( - destination_table, col_type_overrides + destination_table, df_and_labels.col_type_overrides ), session=self._session, offsets_col=ordering_col, @@ -237,19 +230,11 @@ def read_pandas_streaming( raise ValueError( f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}" ) - - col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = { - **{ - col: bigframes.dtypes.TIMEDELTA_DTYPE - for col in df_and_labels.timedelta_cols - }, - **{col: bigframes.dtypes.JSON_DTYPE for col in df_and_labels.json_cols}, - } array_value = ( core.ArrayValue.from_table( table=destination_table, schema=schemata.ArraySchema.from_bq_table( - destination_table, col_type_overrides + destination_table, df_and_labels.col_type_overrides ), session=self._session, # Don't set the offsets column because we want to group by it.