Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.
5 changes: 5 additions & 0 deletions bigframes/core/compile/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import google.cloud.bigquery
import pandas as pd

from bigframes.core import utils
import bigframes.core.compile.compiled as compiled
import bigframes.core.compile.concat as concat_impl
import bigframes.core.compile.explode
Expand Down Expand Up @@ -173,6 +174,10 @@ def compile_readlocal(self, node: nodes.ReadLocalNode):
io.BytesIO(node.feather_bytes),
columns=[item.source_id for item in node.scan_list.items],
)

# Convert timedeltas to microseconds for compatibility with BigQuery
_ = utils.replace_timedeltas_with_micros(array_as_pd)

offsets = node.offsets_col.sql if node.offsets_col else None
return compiled.UnorderedIR.from_pandas(
array_as_pd, node.scan_list, offsets=offsets
Expand Down
3 changes: 3 additions & 0 deletions bigframes/core/local_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ def arrow_type_replacements(type: pa.DataType) -> pa.DataType:
if pa.types.is_time64(type):
# This is potentially lossy, but BigFrames doesn't support ns
return pa.time64("us")
if pa.types.is_duration(type):
# This is potentially lossy, but BigFrames doesn't support ns
return pa.duration("us")
if pa.types.is_decimal128(type):
return pa.decimal128(38, 9)
if pa.types.is_decimal256(type):
Expand Down
8 changes: 6 additions & 2 deletions bigframes/core/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@ class ArraySchema:
items: typing.Tuple[SchemaItem, ...]

@classmethod
def from_bq_table(cls, table: google.cloud.bigquery.Table):
def from_bq_table(
cls,
table: google.cloud.bigquery.Table,
column_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {},
):
items = tuple(
SchemaItem(name, dtype)
SchemaItem(name, column_type_overrides.get(name, dtype))
for name, dtype in bigframes.dtypes.bf_type_from_type_kind(
table.schema
).items()
Expand Down
27 changes: 27 additions & 0 deletions bigframes/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import bigframes_vendored.pandas.io.common as vendored_pandas_io_common
import pandas as pd
import pandas.api.types as pdtypes
import typing_extensions

import bigframes.exceptions as bfe
Expand Down Expand Up @@ -184,3 +185,29 @@ def wrapper(*args, **kwargs):
return wrapper

return decorator


def timedelta_to_micros(td: pd.Timedelta) -> int:
    """Express ``td`` as a whole number of microseconds.

    ``pd.Timedelta.value`` is the total duration in nanoseconds; floor-dividing
    by 1000 drops the sub-microsecond remainder.

    NOTE(review): floor division rounds negative values toward negative
    infinity (e.g. -1500ns -> -2us) rather than truncating toward zero —
    confirm that is the intended rounding for negative timedeltas.
    """
    micros, _ = divmod(td.value, 1000)
    return micros


def replace_timedeltas_with_micros(dataframe: pd.DataFrame) -> List[str]:
    """Convert timedelta columns (and a timedelta index) to integer microseconds, in place.

    Every timedelta64 column of ``dataframe`` is overwritten with integer
    microsecond values (sub-microsecond precision is dropped); a timedelta64
    index is converted the same way.

    Returns:
        The labels of the converted columns. If the index was converted, its
        name is appended too — note this may be ``None`` for an unnamed index.
    """
    converted: List[str] = []

    for label, dtype in dataframe.dtypes.items():
        if pdtypes.is_timedelta64_dtype(dtype):
            dataframe[label] = dataframe[label].apply(timedelta_to_micros)
            converted.append(label)

    if pdtypes.is_timedelta64_dtype(dataframe.index.dtype):
        dataframe.index = dataframe.index.map(timedelta_to_micros)
        converted.append(dataframe.index.name)

    return converted
5 changes: 5 additions & 0 deletions bigframes/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,11 +409,16 @@ def dtype_for_etype(etype: ExpressionType) -> Dtype:
def arrow_dtype_to_bigframes_dtype(arrow_dtype: pa.DataType) -> Dtype:
if arrow_dtype in _ARROW_TO_BIGFRAMES:
return _ARROW_TO_BIGFRAMES[arrow_dtype]

if pa.types.is_list(arrow_dtype):
return pd.ArrowDtype(arrow_dtype)

if pa.types.is_struct(arrow_dtype):
return pd.ArrowDtype(arrow_dtype)

if pa.types.is_duration(arrow_dtype):
return pd.ArrowDtype(arrow_dtype)

# BigFrames doesn't distinguish between string and large_string because the
# largest string (2 GB) is already larger than the largest BigQuery row.
if pa.types.is_string(arrow_dtype) or pa.types.is_large_string(arrow_dtype):
Expand Down
1 change: 1 addition & 0 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
pandas.ArrowDtype(pa.timestamp("us", tz="UTC")),
pandas.ArrowDtype(pa.decimal128(38, 9)),
pandas.ArrowDtype(pa.decimal256(76, 38)),
pandas.ArrowDtype(pa.duration("us")),
)


Expand Down
6 changes: 5 additions & 1 deletion bigframes/session/_io/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from __future__ import annotations

import dataclasses
from typing import Collection, Union
from typing import Collection, List, Union

import bigframes_vendored.constants as constants
import db_dtypes # type: ignore
Expand All @@ -38,6 +38,7 @@ class DataFrameAndLabels:
column_labels: Collection
index_labels: Collection
ordering_col: str
timedelta_cols: List[str]
Comment thread
sycai marked this conversation as resolved.


def _arrow_to_pandas_arrowdtype(
Expand Down Expand Up @@ -163,9 +164,12 @@ def pandas_to_bq_compatible(pandas_dataframe: pandas.DataFrame) -> DataFrameAndL
pandas_dataframe_copy.columns = pandas.Index(new_col_ids)
pandas_dataframe_copy[ordering_col] = np.arange(pandas_dataframe_copy.shape[0])

timedelta_cols = utils.replace_timedeltas_with_micros(pandas_dataframe_copy)

return DataFrameAndLabels(
df=pandas_dataframe_copy,
column_labels=col_labels,
index_labels=idx_labels,
ordering_col=ordering_col,
timedelta_cols=timedelta_cols,
)
18 changes: 15 additions & 3 deletions bigframes/session/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,16 @@ def read_pandas_load_job(
self._start_generic_job(load_job)

destination_table = self._bqclient.get_table(load_table_destination)
col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {
col: bigframes.dtypes.TIMEDETLA_DTYPE
for col in df_and_labels.timedelta_cols
}
array_value = core.ArrayValue.from_table(
table=destination_table,
# TODO: Generate this directly from original pandas df.
schema=schemata.ArraySchema.from_bq_table(destination_table),
# TODO (b/394156190): Generate this directly from original pandas df.
schema=schemata.ArraySchema.from_bq_table(
destination_table, col_type_overrides
),
session=self._session,
offsets_col=ordering_col,
).drop_columns([ordering_col])
Expand Down Expand Up @@ -229,10 +235,16 @@ def read_pandas_streaming(
f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
)

col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {
col: bigframes.dtypes.TIMEDETLA_DTYPE
for col in df_and_labels.timedelta_cols
}
array_value = (
core.ArrayValue.from_table(
table=destination_table,
schema=schemata.ArraySchema.from_bq_table(destination_table),
schema=schemata.ArraySchema.from_bq_table(
destination_table, col_type_overrides
),
session=self._session,
# Don't set the offsets column because we want to group by it.
)
Expand Down
56 changes: 56 additions & 0 deletions tests/system/small/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,62 @@ def test_read_pandas_tokyo(
assert len(expected) == result.total_rows


@pytest.mark.parametrize(
    "write_engine",
    ["default", "bigquery_inline", "bigquery_load", "bigquery_streaming"],
)
def test_read_pandas_timedelta_dataframes(session, write_engine):
    # Round-trip a timedelta column through read_pandas and back to pandas.
    expected_df = pd.DataFrame({"my_col": pd.to_timedelta([1, 2, 3], unit="d")})

    bf_df = session.read_pandas(expected_df, write_engine=write_engine)
    actual_result = bf_df.to_pandas().astype("timedelta64[ns]")

    # The streaming path comes back with a null Int64 index, so adjust the
    # expectation accordingly before comparing.
    if write_engine == "bigquery_streaming":
        expected_df.index = pd.Index([pd.NA] * 3, dtype="Int64")
    pd.testing.assert_frame_equal(actual_result, expected_df, check_index_type=False)


@pytest.mark.parametrize(
    "write_engine",
    ["default", "bigquery_inline", "bigquery_load", "bigquery_streaming"],
)
def test_read_pandas_timedelta_series(session, write_engine):
    # Round-trip a timedelta Series through read_pandas and back to pandas.
    expected_series = pd.Series(pd.to_timedelta([1, 2, 3], unit="d"))

    bf_series = session.read_pandas(expected_series, write_engine=write_engine)
    actual_result = bf_series.to_pandas().astype("timedelta64[ns]")

    # The streaming path comes back with a null Int64 index, so adjust the
    # expectation accordingly before comparing.
    if write_engine == "bigquery_streaming":
        expected_series.index = pd.Index([pd.NA] * 3, dtype="Int64")
    pd.testing.assert_series_equal(
        actual_result, expected_series, check_index_type=False
    )


@pytest.mark.parametrize(
    "write_engine",
    ["default", "bigquery_inline", "bigquery_load"],
)
def test_read_pandas_timedelta_index(session, write_engine):
    # pd.to_timedelta over a list yields a TimedeltaIndex, which read_pandas
    # should round-trip intact.
    expected_index = pd.to_timedelta([1, 2, 3], unit="d")

    bf_index = session.read_pandas(expected_index, write_engine=write_engine)
    actual_result = bf_index.to_pandas().astype("timedelta64[ns]")

    pd.testing.assert_index_equal(actual_result, expected_index)


@utils.skip_legacy_pandas
@pytest.mark.parametrize(
("write_engine",),
Expand Down