From c308b76f0e18c13cd37298602f18a479de96c1d5 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 24 Jul 2024 21:10:15 +0000 Subject: [PATCH 1/5] feat: Support to_csv/parquet/json to local files/objects --- bigframes/core/utils.py | 4 ++ bigframes/dataframe.py | 50 +++++++++----- bigframes/series.py | 39 ++++++++--- tests/system/small/test_dataframe.py | 67 ++++++++++++++++++- tests/system/small/test_series.py | 37 ++++++++++ .../bigframes_vendored/pandas/core/frame.py | 14 ++-- .../bigframes_vendored/pandas/core/generic.py | 41 +++++++----- 7 files changed, 202 insertions(+), 50 deletions(-) diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py index 97c5ef03e5..43c05c6c83 100644 --- a/bigframes/core/utils.py +++ b/bigframes/core/utils.py @@ -23,6 +23,10 @@ UNNAMED_INDEX_ID = "bigframes_unnamed_index" +def is_gcs_path(value) -> typing_extensions.TypeGuard[str]: + return isinstance(value, str) and value.startswith("gs://") + + def get_axis_number(axis: typing.Union[str, int]) -> typing.Literal[0, 1]: if axis in {0, "index", "rows"}: return 0 diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 2a3aead80a..1cfa2e0155 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -2950,15 +2950,21 @@ def from_records( ) def to_csv( - self, path_or_buf: str, sep=",", *, header: bool = True, index: bool = True - ) -> None: + self, + path_or_buf=None, + sep=",", + *, + header: bool = True, + index: bool = True, + ) -> Optional[str]: # TODO(swast): Can we support partition columns argument? # TODO(chelsealin): Support local file paths. # TODO(swast): Some warning that wildcard is recommended for large # query results? 
See: # https://cloud.google.com/bigquery/docs/exporting-data#limit_the_exported_file_size - if not path_or_buf.startswith("gs://"): - raise NotImplementedError(ERROR_IO_ONLY_GS_PATHS) + if not utils.is_gcs_path(path_or_buf): + pd_df = self.to_pandas() + return pd_df.to_csv(path_or_buf, sep=sep, header=header, index=index) if "*" not in path_or_buf: raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD) @@ -2975,22 +2981,28 @@ def to_csv( export_data_statement, api_name="dataframe-to_csv" ) self._set_internal_query_job(query_job) + return None def to_json( self, - path_or_buf: str, - orient: Literal[ - "split", "records", "index", "columns", "values", "table" - ] = "columns", + path_or_buf=None, + orient: Optional[ + Literal["split", "records", "index", "columns", "values", "table"] + ] = None, *, lines: bool = False, index: bool = True, - ) -> None: + ) -> Optional[str]: # TODO(swast): Can we support partition columns argument? - # TODO(chelsealin): Support local file paths. - if not path_or_buf.startswith("gs://"): - raise NotImplementedError(ERROR_IO_ONLY_GS_PATHS) - + if not utils.is_gcs_path(path_or_buf): + pd_df = self.to_pandas() + return pd_df.to_json( + path_or_buf, + orient=orient, + lines=lines, + index=index, + default_handler=str, + ) if "*" not in path_or_buf: raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD) @@ -3019,6 +3031,7 @@ def to_json( export_data_statement, api_name="dataframe-to_json" ) self._set_internal_query_job(query_job) + return None def to_gbq( self, @@ -3117,19 +3130,19 @@ def __array__(self, dtype=None) -> numpy.ndarray: def to_parquet( self, - path: str, + path=None, *, compression: Optional[Literal["snappy", "gzip"]] = "snappy", index: bool = True, - ) -> None: + ) -> Optional[bytes]: # TODO(swast): Can we support partition columns argument? # TODO(chelsealin): Support local file paths. # TODO(swast): Some warning that wildcard is recommended for large # query results? 
See: # https://cloud.google.com/bigquery/docs/exporting-data#limit_the_exported_file_size - if not path.startswith("gs://"): - raise NotImplementedError(ERROR_IO_ONLY_GS_PATHS) - + if not utils.is_gcs_path(path): + pd_df = self.to_pandas() + return pd_df.to_parquet(path, compression=compression, index=index) if "*" not in path: raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD) @@ -3153,6 +3166,7 @@ def to_parquet( export_data_statement, api_name="dataframe-to_parquet" ) self._set_internal_query_job(query_job) + return None def to_dict( self, diff --git a/bigframes/series.py b/bigframes/series.py index 8fdafe25e7..5b73582e05 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1652,9 +1652,22 @@ def to_frame(self, name: blocks.Label = None) -> bigframes.dataframe.DataFrame: return bigframes.dataframe.DataFrame(block) def to_csv( - self, path_or_buf: str, sep=",", *, header: bool = True, index: bool = True - ) -> None: - return self.to_frame().to_csv(path_or_buf, sep=sep, header=header, index=index) + self, + path_or_buf, + sep=",", + *, + header: bool = True, + index: bool = True, + ) -> Optional[str]: + if utils.is_gcs_path(path_or_buf): + return self.to_frame().to_csv( + path_or_buf, sep=sep, header=header, index=index + ) + else: + pd_series = self.to_pandas() + return pd_series.to_csv( + path_or_buf=path_or_buf, sep=sep, header=header, index=index + ) def to_dict(self, into: type[dict] = dict) -> typing.Mapping: return typing.cast(dict, self.to_pandas().to_dict(into)) # type: ignore @@ -1664,17 +1677,21 @@ def to_excel(self, excel_writer, sheet_name="Sheet1", **kwargs) -> None: def to_json( self, - path_or_buf: str, - orient: typing.Literal[ - "split", "records", "index", "columns", "values", "table" - ] = "columns", + path_or_buf, + orient: Optional[typing.Literal["split", "records", "index", "table"]] = None, *, lines: bool = False, index: bool = True, - ) -> None: - return self.to_frame().to_json( - path_or_buf=path_or_buf, orient=orient, 
lines=lines, index=index - ) + ) -> Optional[str]: + if utils.is_gcs_path(path_or_buf): + return self.to_frame().to_json( + path_or_buf=path_or_buf, orient=orient, lines=lines, index=index + ) + else: + pd_series = self.to_pandas() + return pd_series.to_json( + path_or_buf=path_or_buf, orient=orient, lines=lines, index=index + ) def to_latex( self, buf=None, columns=None, header=True, index=True, **kwargs diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 67792b3a1d..5f33f1cd27 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -4125,6 +4125,71 @@ def test_df_to_latex(scalars_df_index, scalars_pandas_df_index): assert bf_result == pd_result +def test_df_to_json_local_str(scalars_df_index, scalars_pandas_df_index): + bf_result = scalars_df_index.to_json() + # default_handler for arrow types that have no default conversion + pd_result = scalars_pandas_df_index.to_json(default_handler=str) + + assert bf_result == pd_result + + +def test_df_to_json_local_file(scalars_df_index, scalars_pandas_df_index): + with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file: + scalars_df_index.to_json(bf_result_file, orient="table") + # default_handler for arrow types that have no default conversion + scalars_pandas_df_index.to_json( + pd_result_file, orient="table", default_handler=str + ) + + bf_result = bf_result_file.read() + pd_result = pd_result_file.read() + + assert bf_result == pd_result + + +def test_df_to_csv_local_str(scalars_df_index, scalars_pandas_df_index): + bf_result = scalars_df_index.to_csv() + # default_handler for arrow types that have no default conversion + pd_result = scalars_pandas_df_index.to_csv() + + assert bf_result == pd_result + + +def test_df_to_csv_local_file(scalars_df_index, scalars_pandas_df_index): + with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file: + 
scalars_df_index.to_csv(bf_result_file) + scalars_pandas_df_index.to_csv(pd_result_file) + + bf_result = bf_result_file.read() + pd_result = pd_result_file.read() + + assert bf_result == pd_result + + +def test_df_to_parquet_local_bytes(scalars_df_index, scalars_pandas_df_index): + # GEOGRAPHY not supported in parquet export. + unsupported = ["geography_col"] + + bf_result = scalars_df_index.drop(columns=unsupported).to_parquet() + # default_handler for arrow types that have no default conversion + pd_result = scalars_pandas_df_index.drop(columns=unsupported).to_parquet() + + assert bf_result == pd_result + + +def test_df_to_parquet_local_file(scalars_df_index, scalars_pandas_df_index): + # GEOGRAPHY not supported in parquet export. + unsupported = ["geography_col"] + with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file: + scalars_df_index.drop(columns=unsupported).to_parquet(bf_result_file) + scalars_pandas_df_index.drop(columns=unsupported).to_parquet(pd_result_file) + + bf_result = bf_result_file.read() + pd_result = pd_result_file.read() + + assert bf_result == pd_result + + def test_df_to_records(scalars_df_index, scalars_pandas_df_index): unsupported = ["numeric_col"] bf_result = scalars_df_index.drop(columns=unsupported).to_records() @@ -4166,7 +4231,7 @@ def test_df_to_pickle(scalars_df_index, scalars_pandas_df_index): scalars_df_index.to_pickle(bf_result_file) scalars_pandas_df_index.to_pickle(pd_result_file) bf_result = bf_result_file.read() - pd_result = bf_result_file.read() + pd_result = pd_result_file.read() assert bf_result == pd_result diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 10fcec63ce..7c48fb8ad7 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -2753,6 +2753,43 @@ def test_to_latex(scalars_df_index, scalars_pandas_df_index): assert bf_result == pd_result +def test_series_to_json_local_str(scalars_df_index, 
scalars_pandas_df_index): + bf_result = scalars_df_index.int64_col.to_json() + pd_result = scalars_pandas_df_index.int64_col.to_json() + + assert bf_result == pd_result + + +def test_series_to_json_local_file(scalars_df_index, scalars_pandas_df_index): + with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file: + scalars_df_index.int64_col.to_json(bf_result_file) + scalars_pandas_df_index.int64_col.to_json(pd_result_file) + + bf_result = bf_result_file.read() + pd_result = pd_result_file.read() + + assert bf_result == pd_result + + +def test_series_to_csv_local_str(scalars_df_index, scalars_pandas_df_index): + bf_result = scalars_df_index.int64_col.to_csv() + # default_handler for arrow types that have no default conversion + pd_result = scalars_pandas_df_index.int64_col.to_csv() + + assert bf_result == pd_result + + +def test_series_to_csv_local_file(scalars_df_index, scalars_pandas_df_index): + with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file: + scalars_df_index.int64_col.to_csv(bf_result_file) + scalars_pandas_df_index.int64_col.to_csv(pd_result_file) + + bf_result = bf_result_file.read() + pd_result = pd_result_file.read() + + assert bf_result == pd_result + + def test_to_dict(scalars_df_index, scalars_pandas_df_index): bf_result = scalars_df_index["int64_too"].to_dict() diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index f8088f8060..7048d9c6dd 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -476,11 +476,11 @@ def to_gbq( def to_parquet( self, - path: str, + path: Optional[str], *, compression: Optional[Literal["snappy", "gzip"]] = "snappy", index: bool = True, - ) -> None: + ) -> Optional[bytes]: """Write a DataFrame to the binary Parquet format. 
This function writes the dataframe as a `parquet file @@ -496,9 +496,13 @@ def to_parquet( >>> df.to_parquet(path=gcs_bucket) Args: - path (str): + path (str, path object, file-like object, or None, default None): + String, path object (implementing ``os.PathLike[str]``), or file-like + object implementing a binary ``write()`` function. If None, the result is + returned as bytes. If a string or path, it will be used as Root Directory + path when writing a partitioned dataset. Destination URI(s) of Cloud Storage files(s) to store the extracted dataframe - in format of ``gs:///``. + should be formatted ``gs:///``. If the data size is more than 1GB, you must use a wildcard to export the data into multiple files and the size of the files varies. @@ -511,7 +515,7 @@ def to_parquet( If ``False``, they will not be written to the file. Returns: - None. + bytes if no path argument is provided else None """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) diff --git a/third_party/bigframes_vendored/pandas/core/generic.py b/third_party/bigframes_vendored/pandas/core/generic.py index 95302e51b2..6734fb6aa9 100644 --- a/third_party/bigframes_vendored/pandas/core/generic.py +++ b/third_party/bigframes_vendored/pandas/core/generic.py @@ -210,14 +210,14 @@ def empty(self) -> bool: def to_json( self, - path_or_buf: str, - orient: Literal[ - "split", "records", "index", "columns", "values", "table" - ] = "columns", + path_or_buf, + orient: Optional[ + Literal["split", "records", "index", "columns", "values", "table"] + ] = None, *, index: bool = True, lines: bool = False, - ) -> None: + ) -> Optional[str]: """Convert the object to a JSON string, written to Cloud Storage. Note NaN's and None will be converted to null and datetime objects @@ -227,16 +227,18 @@ def to_json( Only ``orient='records'`` and ``lines=True`` is supported so far. 
Args: - path_or_buf (str): - A destination URI of Cloud Storage files(s) to store the extracted + path_or_buf (str, path object, file-like object, or None, default None): + String, path object (implementing os.PathLike[str]), or file-like + object implementing a write() function. If None, the result is + returned as a string. + + Can be a destination URI of Cloud Storage files(s) to store the extracted dataframe in format of ``gs:///``. Must contain a wildcard `*` character. If the data size is more than 1GB, you must use a wildcard to export the data into multiple files and the size of the files varies. - - None, file-like objects or local file paths not yet supported. orient ({`split`, `records`, `index`, `columns`, `values`, `table`}, default 'columns): Indication of expected JSON string format. @@ -271,17 +273,25 @@ def to_json( list-like. Returns: - None: String output not yet supported. + None or str: If path_or_buf is None, returns the resulting json format as a + string. Otherwise returns None. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def to_csv(self, path_or_buf: str, *, index: bool = True) -> None: + def to_csv(self, path_or_buf, *, index: bool = True) -> Optional[str]: """Write object to a comma-separated values (csv) file on Cloud Storage. Args: - path_or_buf (str): - A destination URI of Cloud Storage files(s) to store the extracted dataframe - in format of ``gs:///``. + path_or_buf (str, path object, file-like object, or None, default None): + String, path object (implementing os.PathLike[str]), or file-like + object implementing a write() function. If None, the result is + returned as a string. If a non-binary file object is passed, it should + be opened with `newline=''`, disabling universal newlines. If a binary + file object is passed, `mode` might need to contain a `'b'`. + + Alternatively, a destination URI of Cloud Storage files(s) to store the + extracted dataframe in format of + ``gs:///``. 
If the data size is more than 1GB, you must use a wildcard to export the data into multiple files and the size of the files varies. @@ -293,7 +303,8 @@ def to_csv(self, path_or_buf: str, *, index: bool = True) -> None: If True, write row names (index). Returns: - None: String output not yet supported. + None or str: If path_or_buf is None, returns the resulting csv format as a + string. Otherwise returns None. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) From 523426ecc125538e5367230e40a96e0dca58a5f3 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 25 Jul 2024 17:24:05 +0000 Subject: [PATCH 2/5] add default path None for series to_json/csv --- bigframes/series.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigframes/series.py b/bigframes/series.py index 5b73582e05..b902776ccd 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1653,7 +1653,7 @@ def to_frame(self, name: blocks.Label = None) -> bigframes.dataframe.DataFrame: def to_csv( self, - path_or_buf, + path_or_buf=None, sep=",", *, header: bool = True, @@ -1677,7 +1677,7 @@ def to_excel(self, excel_writer, sheet_name="Sheet1", **kwargs) -> None: def to_json( self, - path_or_buf, + path_or_buf=None, orient: Optional[typing.Literal["split", "records", "index", "table"]] = None, *, lines: bool = False, From 8c7a7cb38734eb85663a6ed76a8b832e876c57c5 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 26 Jul 2024 23:02:11 +0000 Subject: [PATCH 3/5] fix to_json type annotation --- bigframes/series.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/bigframes/series.py b/bigframes/series.py index b902776ccd..3f78a6a9cc 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1678,7 +1678,9 @@ def to_excel(self, excel_writer, sheet_name="Sheet1", **kwargs) -> None: def to_json( self, path_or_buf=None, orient: Optional[
typing.Literal["split", "records", "index", "columns", "values", "table"] + ] = None, *, lines: bool = False, index: bool = True, @@ -1690,7 +1692,7 @@ def to_json( else: pd_series = self.to_pandas() return pd_series.to_json( - path_or_buf=path_or_buf, orient=orient, lines=lines, index=index + path_or_buf=path_or_buf, orient=orient, lines=lines, index=index # type: ignore ) def to_latex( From ea86de5d50be2bb7a1f11d260b945ba28be4616e Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 29 Jul 2024 19:48:58 +0000 Subject: [PATCH 4/5] skip to_json_local_file for legacy pandas --- tests/system/small/test_series.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 7c48fb8ad7..fe6e001797 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -2760,6 +2760,7 @@ def test_series_to_json_local_str(scalars_df_index, scalars_pandas_df_index): assert bf_result == pd_result +@skip_legacy_pandas def test_series_to_json_local_file(scalars_df_index, scalars_pandas_df_index): with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file: scalars_df_index.int64_col.to_json(bf_result_file) From 99defbd5807087125e76c86678c6561ecbcfa5a1 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 30 Jul 2024 00:41:16 +0000 Subject: [PATCH 5/5] no legacy pandas for test_df_to_json_local_file --- tests/system/small/test_dataframe.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 5f33f1cd27..3a7eff621f 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -4133,6 +4133,7 @@ def test_df_to_json_local_str(scalars_df_index, scalars_pandas_df_index): assert bf_result == pd_result +@skip_legacy_pandas def test_df_to_json_local_file(scalars_df_index, scalars_pandas_df_index): with tempfile.TemporaryFile() as bf_result_file, 
tempfile.TemporaryFile() as pd_result_file: scalars_df_index.to_json(bf_result_file, orient="table")