Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,22 +304,25 @@ def online_write_batch(
for redis_key_bin, prev_event_time, (_, values, timestamp, _) in zip(
keys, prev_event_timestamps, data
):
event_time_seconds = int(utils.make_tzaware(timestamp).timestamp())

# ignore if event_timestamp is before the event features that are currently in the feature store
# Convert incoming timestamp to millisecond-aware datetime
aware_ts = utils.make_tzaware(timestamp)
# Build protobuf timestamp with nanos
ts = Timestamp()
ts.FromDatetime(aware_ts)
# New timestamp in nanoseconds
new_total_nanos = ts.seconds * 1_000_000_000 + ts.nanos
# Compare against existing timestamp (nanosecond precision)
if prev_event_time:
prev_ts = Timestamp()
prev_ts.ParseFromString(prev_event_time)
if prev_ts.seconds and event_time_seconds <= prev_ts.seconds:
# TODO: somehow signal that it's not overwriting the current record?
prev_total_nanos = prev_ts.seconds * 1_000_000_000 + prev_ts.nanos
# Skip only if older OR exact same instant
if prev_total_nanos and new_total_nanos <= prev_total_nanos:
if progress:
progress(1)
continue

ts = Timestamp()
ts.seconds = event_time_seconds
entity_hset = dict()
entity_hset[ts_key] = ts.SerializeToString()
# Store full timestamp (seconds + nanos)
entity_hset = {ts_key: ts.SerializeToString()}

for feature_name, val in values.items():
f_key = _mmh3(f"{feature_view}:{feature_name}")
Expand Down Expand Up @@ -456,5 +459,7 @@ def _get_features_for_entity(
if not res:
return None, None
else:
timestamp = datetime.fromtimestamp(res_ts.seconds, tz=timezone.utc)
# reconstruct full timestamp including nanos
total_seconds = res_ts.seconds + res_ts.nanos / 1_000_000_000.0
timestamp = datetime.fromtimestamp(total_seconds, tz=timezone.utc)
return timestamp, res
Loading