Expected Behavior
feature_store.get_historical_features(df, features=fs_columns).to_df()
where feature_store is a feature store backed by the file offline store, fs_columns is a list of feature column names, and df is a Pandas data frame, should work.
Current Behavior
It currently raises an error inside Dask:
E NotImplementedError: dd.DataFrame.apply only supports axis=1
E Try: df.apply(func, axis=1)
Stacktrace:
../../.cache/pypoetry/virtualenvs/w3-search-letor-SCEBvDm1-py3.9/lib/python3.9/site-packages/feast/infra/offline_stores/offline_store.py:81: in to_df
features_df = self._to_df_internal()
../../.cache/pypoetry/virtualenvs/w3-search-letor-SCEBvDm1-py3.9/lib/python3.9/site-packages/feast/usage.py:280: in wrapper
raise exc.with_traceback(traceback)
../../.cache/pypoetry/virtualenvs/w3-search-letor-SCEBvDm1-py3.9/lib/python3.9/site-packages/feast/usage.py:269: in wrapper
return func(*args, **kwargs)
../../.cache/pypoetry/virtualenvs/w3-search-letor-SCEBvDm1-py3.9/lib/python3.9/site-packages/feast/infra/offline_stores/file.py:75: in _to_df_internal
df = self.evaluation_function().compute()
../../.cache/pypoetry/virtualenvs/w3-search-letor-SCEBvDm1-py3.9/lib/python3.9/site-packages/feast/infra/offline_stores/file.py:231: in evaluate_historical_retrieval
df_to_join = _normalize_timestamp(
../../.cache/pypoetry/virtualenvs/w3-search-letor-SCEBvDm1-py3.9/lib/python3.9/site-packages/feast/infra/offline_stores/file.py:530: in _normalize_timestamp
df_to_join[timestamp_field] = df_to_join[timestamp_field].apply(
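For what it's worth, the failing .apply call appears to hit a general Dask restriction: dd.DataFrame.apply only implements axis=1. A standalone sketch, independent of Feast and only illustrating that restriction, reproduces the same NotImplementedError:
import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"timestamp": pd.to_datetime(["2022-04-29", "2022-04-27"])})
ddf = dd.from_pandas(pdf, npartitions=1)

try:
    # Default axis=0 is not implemented for Dask DataFrames.
    ddf.apply(lambda col: col)
except NotImplementedError as exc:
    print(exc)  # "dd.DataFrame.apply only supports axis=1 ..."

# Per-row application with axis=1 is the supported form.
ddf.apply(lambda row: row, axis=1, meta=pdf).compute()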
Steps to reproduce
Here is my feature store definition:
import datetime

import numpy as np
import pandas as pd

from feast import FeatureStore, RepoConfig, FileSource, FeatureView, ValueType, Entity, Feature
from feast.infra.offline_stores.file import FileOfflineStoreConfig
from google.protobuf.duration_pb2 import Duration

# tmp_path is pytest's tmp_path fixture (a pathlib.Path)
source_path = tmp_path / "source.parquet"
timestamp = datetime.datetime(year=2022, month=4, day=29, tzinfo=datetime.timezone.utc)
df = pd.DataFrame(
{
"entity": [0, 1, 2, 3, 4],
"f1": [1.0, 1.1, 1.2, 1.3, 1.4],
"f2": ["a", "b", "c", "d", "e"],
"timestamp": [
timestamp,
# this one should not be fetched as it is too far into the past
timestamp - datetime.timedelta(days=2),
timestamp,
timestamp,
timestamp,
],
}
)
df.to_parquet(source_path)
source = FileSource(
path=str(source_path),
event_timestamp_column="timestamp",
created_timestamp_column="timestamp",
)
entity = Entity(
name="entity",
value_type=ValueType.INT64,
description="Entity",
)
view = FeatureView(
name="view",
entities=["entity"],
ttl=Duration(seconds=86400 * 1),
features=[
Feature(name="f1", dtype=ValueType.FLOAT),
Feature(name="f2", dtype=ValueType.STRING),
],
online=True,
batch_source=source,
tags={},
)
config = RepoConfig(
registry=str(tmp_path / "registry.db"),
project="hello",
provider="local",
offline_store=FileOfflineStoreConfig(),
)
store = FeatureStore(config=config)
store.apply([entity, view])
expected = pd.DataFrame(
{
"event_timestamp": timestamp,
"entity": [0, 1, 2, 3, 5],
"someval": [0.0, 0.1, 0.2, 0.3, 0.5],
"f1": [1.0, np.nan, 1.2, 1.3, np.nan],
"f2": ["a", np.nan, "c", "d", np.nan],
}
)
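The snippet above stops before the retrieval itself; here is a minimal sketch of the entity data frame and the call that triggers the error. The entity data frame and feature references are assumptions reconstructed from the expected output above, not part of my original code:
# Assumed entity data frame and retrieval call, reconstructed from the
# expected output above; not part of the original snippet.
entity_df = pd.DataFrame(
    {
        "event_timestamp": [timestamp] * 5,
        "entity": [0, 1, 2, 3, 5],
        "someval": [0.0, 0.1, 0.2, 0.3, 0.5],
    }
)

actual = store.get_historical_features(
    entity_df, features=["view:f1", "view:f2"]
).to_df()

# With the bug present, the call above raises before we get here.
pd.testing.assert_frame_equal(actual, expected)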
Specifications
- Version: 0.21.3
- Platform: Linux
- Subsystem: Python 3.9
Possible Solution
This works fine in version 0.18.1 at least, but I think it fails for any version > 0.20.
It might have something to do with the added Dask requirement; maybe the required version is insufficient? I used to use 2022.2 before, but the requirement is now 2022.1.1. This is just a guess, really.
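A quick way to narrow this down (just a suggestion, I have not verified that the version combination is the cause) is to print the installed versions alongside the failure:
import dask
import feast

# Report the exact combination that reproduces the failure.
print("feast:", feast.__version__)
print("dask:", dask.__version__)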