
get_historical_features fails with dask error for file offline store #2865

@elshize

Description

Expected Behavior

feature_store.get_historical_features(df, features=fs_columns).to_df()

should work, where feature_store is a feature store backed by the file offline store, fs_columns is a list of feature names, and df is a Pandas entity data frame.
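
(Side note: the features argument takes feature references rather than raw column names; the names below are only illustrative.)

# Hypothetical feature references of the form "<feature_view>:<feature>".
fs_columns = [
    "view:f1",
    "view:f2",
]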

Current Behavior

It currently raises an error inside of dask:

E           NotImplementedError: dd.DataFrame.apply only supports axis=1
E             Try: df.apply(func, axis=1)

Stacktrace:

../../.cache/pypoetry/virtualenvs/w3-search-letor-SCEBvDm1-py3.9/lib/python3.9/site-packages/feast/infra/offline_stores/offline_store.py:81: in to_df
    features_df = self._to_df_internal()
../../.cache/pypoetry/virtualenvs/w3-search-letor-SCEBvDm1-py3.9/lib/python3.9/site-packages/feast/usage.py:280: in wrapper
    raise exc.with_traceback(traceback)
../../.cache/pypoetry/virtualenvs/w3-search-letor-SCEBvDm1-py3.9/lib/python3.9/site-packages/feast/usage.py:269: in wrapper
    return func(*args, **kwargs)
../../.cache/pypoetry/virtualenvs/w3-search-letor-SCEBvDm1-py3.9/lib/python3.9/site-packages/feast/infra/offline_stores/file.py:75: in _to_df_internal
    df = self.evaluation_function().compute()
../../.cache/pypoetry/virtualenvs/w3-search-letor-SCEBvDm1-py3.9/lib/python3.9/site-packages/feast/infra/offline_stores/file.py:231: in evaluate_historical_retrieval
    df_to_join = _normalize_timestamp(
../../.cache/pypoetry/virtualenvs/w3-search-letor-SCEBvDm1-py3.9/lib/python3.9/site-packages/feast/infra/offline_stores/file.py:530: in _normalize_timestamp
    df_to_join[timestamp_field] = df_to_join[timestamp_field].apply(
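
The last frame shows an .apply() call on df_to_join[timestamp_field]. As far as I can tell, this NotImplementedError is only raised by dask's DataFrame.apply (Series.apply has no axis argument), so whatever that selection returns must be a dd.DataFrame rather than a dd.Series. One unverified guess: my source uses the same "timestamp" column for both event_timestamp_column and created_timestamp_column, and selecting a duplicated column name yields a DataFrame. The snippet below only demonstrates the dask restriction itself, not Feast's internals:

import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
ddf = dd.from_pandas(pdf, npartitions=1)

# dask only implements row-wise apply; the pandas-style default axis=0
# raises exactly the error seen in the traceback above:
#   NotImplementedError: dd.DataFrame.apply only supports axis=1
ddf.apply(lambda row: row)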

Steps to reproduce

Here is my feature store definition:

import datetime

import numpy as np
import pandas as pd

from feast import FeatureStore, RepoConfig, FileSource, FeatureView, ValueType, Entity, Feature
from feast.infra.offline_stores.file import FileOfflineStoreConfig
from google.protobuf.duration_pb2 import Duration

# tmp_path here is pytest's tmp_path fixture; any writable pathlib.Path works
source_path = tmp_path / "source.parquet"
timestamp = datetime.datetime(year=2022, month=4, day=29, tzinfo=datetime.timezone.utc)
df = pd.DataFrame(
    {
        "entity": [0, 1, 2, 3, 4],
        "f1": [1.0, 1.1, 1.2, 1.3, 1.4],
        "f2": ["a", "b", "c", "d", "e"],
        "timestamp": [
            timestamp,
            # this one should not be fetched as it is too far into the past
            timestamp - datetime.timedelta(days=2),
            timestamp,
            timestamp,
            timestamp,
        ],
    }
)
df.to_parquet(source_path)
source = FileSource(
    path=str(source_path),
    event_timestamp_column="timestamp",
    created_timestamp_column="timestamp",
)
entity = Entity(
    name="entity",
    value_type=ValueType.INT64,
    description="Entity",
)

view = FeatureView(
    name="view",
    entities=["entity"],
    ttl=Duration(seconds=86400 * 1),
    features=[
        Feature(name="f1", dtype=ValueType.FLOAT),
        Feature(name="f2", dtype=ValueType.STRING),
    ],
    online=True,
    batch_source=source,
    tags={},
)

config = RepoConfig(
    registry=str(tmp_path / "registry.db"),
    project="hello",
    provider="local",
    offline_store=FileOfflineStoreConfig(),
)

store = FeatureStore(config=config)
store.apply([entity, view])

expected = pd.DataFrame(
    {
        "event_timestamp": timestamp,
        "entity": [0, 1, 2, 3, 5],
        "someval": [0.0, 0.1, 0.2, 0.3, 0.5],
        "f1": [1.0, np.nan, 1.2, 1.3, np.nan],
        "f2": ["a", np.nan, "c", "d", np.nan],
    }
)
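
The snippet above stops just before the failing call. A hedged sketch of the missing last step, with the entity dataframe reverse-engineered from the expected frame (the exact frame and feature list I used are not reproduced verbatim here):

# Hypothetical entity dataframe matching the "expected" result above.
entity_df = pd.DataFrame(
    {
        "event_timestamp": [timestamp] * 5,
        "entity": [0, 1, 2, 3, 5],
        "someval": [0.0, 0.1, 0.2, 0.3, 0.5],
    }
)

# This is the call that raises the NotImplementedError from dask.
actual = store.get_historical_features(
    entity_df,
    features=["view:f1", "view:f2"],
).to_df()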

Specifications

  • Version: 0.21.3
  • Platform: Linux
  • Subsystem: Python 3.9

Possible Solution

This works fine in version 0.18.1 (at least), but I think it fails for any version > 0.20.

It might have something to do with the added dask requirement; maybe the pinned version is insufficient? I used to use dask 2022.2, but the requirement is now 2022.1.1. But this is just a guess, really.
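
If it helps narrow this down, the versions actually resolved in the failing environment can be checked like so (using only standard version lookups):

import importlib.metadata

import dask

# Compare the installed versions against the 2022.1.1 dask pin mentioned above.
print("feast:", importlib.metadata.version("feast"))
print("dask:", dask.__version__)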
