diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 99405dc2ca0..d39a2e3a16c 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -95,6 +95,7 @@ class MilvusOnlineStoreConfig(FeastConfigBaseModel, VectorStoreConfig): metric_type: Optional[str] = "COSINE" embedding_dim: Optional[int] = 128 vector_enabled: Optional[bool] = True + text_search_enabled: Optional[bool] = False nlist: Optional[int] = 128 username: Optional[StrictStr] = "" password: Optional[StrictStr] = "" @@ -492,7 +493,19 @@ def retrieve_online_documents_v2( Optional[Dict[str, ValueProto]], ] ]: - assert embedding is not None, "Key Word Search not yet implemented for Milvus" + """ + Retrieve documents using vector similarity search or keyword search in Milvus. + Args: + config: Feast configuration object + table: FeatureView object as the table to search + requested_features: List of requested features to retrieve + embedding: Query embedding to search for (optional) + top_k: Number of items to return + distance_metric: Distance metric to use (optional) + query_string: The query string to search for using keyword search (optional) + Returns: + List of tuples containing the event timestamp, entity key, and feature values + """ entity_name_feast_primitive_type_map = { k.name: k.dtype for k in table.entity_columns } @@ -502,10 +515,8 @@ def retrieve_online_documents_v2( if not config.online_store.vector_enabled: raise ValueError("Vector search is not enabled in the online store config") - search_params = { - "metric_type": distance_metric or config.online_store.metric_type, - "params": {"nprobe": 10}, - } + if embedding is None and query_string is None: + raise ValueError("Either embedding or query_string must be provided") composite_key_name = _get_composite_key_name(table) @@ -520,25 +531,118 @@ def retrieve_online_documents_v2( ), ( f"field(s) [{[field for field in output_fields if field not in [f['name'] for f in collection['fields']]]}] not found in collection schema" ) - # Note we choose the first vector field as the field to search on. Not ideal but it's something. + + # Find the vector search field if we need it ann_search_field = None - for field in collection["fields"]: - if ( - field["type"] in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR] - and field["name"] in output_fields - ): - ann_search_field = field["name"] - break + if embedding is not None: + for field in collection["fields"]: + if ( + field["type"] in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR] + and field["name"] in output_fields + ): + ann_search_field = field["name"] + break self.client.load_collection(collection_name) - results = self.client.search( - collection_name=collection_name, - data=[embedding], - anns_field=ann_search_field, - search_params=search_params, - limit=top_k, - output_fields=output_fields, - ) + + if ( + embedding is not None + and query_string is not None + and config.online_store.vector_enabled + ): + string_field_list = [ + f.name + for f in table.features + if isinstance(f.dtype, PrimitiveFeastType) + and f.dtype.to_value_type() == ValueType.STRING + ] + + if not string_field_list: + raise ValueError( + "No string fields found in the feature view for text search in hybrid mode" + ) + + # Create a filter expression for text search + filter_expressions = [] + for field in string_field_list: + if field in output_fields: + filter_expressions.append(f"{field} LIKE '%{query_string}%'") + + # Combine filter expressions with OR + filter_expr = " OR ".join(filter_expressions) if filter_expressions else "" + + # Vector search with text filter + search_params = { + "metric_type": distance_metric or config.online_store.metric_type, + "params": {"nprobe": 10}, + } + + # For hybrid search, use filter parameter instead of expr + results = self.client.search( + collection_name=collection_name, + data=[embedding], + anns_field=ann_search_field, + search_params=search_params, + limit=top_k, + output_fields=output_fields, + filter=filter_expr if filter_expr else None, + ) + + elif embedding is not None and config.online_store.vector_enabled: + # Vector search only + search_params = { + "metric_type": distance_metric or config.online_store.metric_type, + "params": {"nprobe": 10}, + } + + results = self.client.search( + collection_name=collection_name, + data=[embedding], + anns_field=ann_search_field, + search_params=search_params, + limit=top_k, + output_fields=output_fields, + ) + + elif query_string is not None: + string_field_list = [ + f.name + for f in table.features + if isinstance(f.dtype, PrimitiveFeastType) + and f.dtype.to_value_type() == ValueType.STRING + ] + + if not string_field_list: + raise ValueError( + "No string fields found in the feature view for text search" + ) + + filter_expressions = [] + for field in string_field_list: + if field in output_fields: + filter_expressions.append(f"{field} LIKE '%{query_string}%'") + + filter_expr = " OR ".join(filter_expressions) + + if not filter_expr: + raise ValueError( + "No text fields found in requested features for search" + ) + + query_results = self.client.query( + collection_name=collection_name, + filter=filter_expr, + output_fields=output_fields, + limit=top_k, + ) + + results = [ + [{"entity": entity, "distance": -1.0}] for entity in query_results + ] + else: + raise ValueError( + "Either vector_enabled must be True for embedding search or query_string must be provided for keyword search" + ) result_list = [] for hits in results: @@ -559,7 +663,7 @@ def retrieve_online_documents_v2( # entity_key_proto = None if field in ["created_ts", "event_ts"]: res_ts = datetime.fromtimestamp(field_value / 1e6) - elif field == ann_search_field: + elif field == ann_search_field and embedding is not None: serialized_embedding = _serialize_vector_to_float_list( embedding ) diff --git a/sdk/python/tests/unit/online_store/test_online_retrieval.py b/sdk/python/tests/unit/online_store/test_online_retrieval.py index e7fca47bb55..409a729ceee 100644 --- a/sdk/python/tests/unit/online_store/test_online_retrieval.py +++ b/sdk/python/tests/unit/online_store/test_online_retrieval.py @@ -1484,3 +1484,178 @@ def test_milvus_native_from_feast_data() -> None: # Clean up the collection client.drop_collection(collection_name=COLLECTION_NAME) + + +def test_milvus_keyword_search() -> None: + """ + Test retrieving documents from the Milvus online store using keyword search. + """ + random.seed(42) + n = 10 # number of samples + vector_length = 10 + runner = CliRunner() + with runner.local_repo( + example_repo_py=get_example_repo("example_rag_feature_repo.py"), + offline_store="file", + online_store="milvus", + apply=False, + teardown=False, + ) as store: + from datetime import timedelta + + from feast import Entity, FeatureView, Field, FileSource + from feast.types import Array, Float32, Int64, String, UnixTimestamp + + rag_documents_source = FileSource( + path="data/embedded_documents.parquet", + timestamp_field="event_timestamp", + created_timestamp_column="created_timestamp", + ) + + item = Entity( + name="item_id", + join_keys=["item_id"], + value_type=ValueType.INT64, + ) + author = Entity( + name="author_id", + join_keys=["author_id"], + value_type=ValueType.STRING, + ) + + document_embeddings = FeatureView( + name="text_documents", + entities=[item, author], + schema=[ + Field( + name="vector", + dtype=Array(Float32), + vector_index=True, + vector_search_metric="COSINE", + ), + Field(name="item_id", dtype=Int64), + Field(name="author_id", dtype=String), + Field(name="content", dtype=String), + Field(name="title", dtype=String), + Field(name="created_timestamp", dtype=UnixTimestamp), + Field(name="event_timestamp", dtype=UnixTimestamp), + ], + source=rag_documents_source, + ttl=timedelta(hours=24), + ) + + store.apply([rag_documents_source, item, document_embeddings]) + + # Write some data with specific text content for keyword search + document_embeddings_fv = store.get_feature_view(name="text_documents") + provider = store._get_provider() + + contents = [ + "Feast is an open source feature store for machine learning", + "Feature stores solve the problem of coordinating features for training and serving", + "Milvus is a vector database that can be used with Feast", + "Keyword search uses BM25 algorithm for relevance ranking", + "Vector search uses embeddings for semantic similarity", + "Python is a popular programming language for machine learning", + "Feast supports multiple storage backends for online and offline use cases", + "Online stores are used for low-latency feature serving", + "Offline stores are used for batch feature retrieval during training", + "Feast enables data scientists to define, manage, and share features", + ] + + titles = [ + "Introduction to Feast", + "Feature Store Benefits", + "Using Milvus with Feast", + "Keyword Search Fundamentals", + "Vector Search Overview", + "Python for ML", + "Feast Storage Options", + "Online Serving with Feast", + "Offline Training Support", + "Feast for Data Scientists", + ] + + item_keys = [ + EntityKeyProto( + join_keys=["item_id", "author_id"], + entity_values=[ + ValueProto(int64_val=i), + ValueProto(string_val=f"author_{i}"), + ], + ) + for i in range(n) + ] + data = [] + for i, item_key in enumerate(item_keys): + data.append( + ( + item_key, + { + "vector": ValueProto( + float_list_val=FloatListProto( + val=np.random.random(vector_length) + ) + ), + "content": ValueProto(string_val=contents[i]), + "title": ValueProto(string_val=titles[i]), + }, + _utc_now(), + _utc_now(), + ) + ) + + provider.online_write_batch( + config=store.config, + table=document_embeddings_fv, + data=data, + progress=None, + ) + + # Test keyword search for "Milvus" + result_milvus = store.retrieve_online_documents_v2( + features=[ + "text_documents:content", + "text_documents:title", + ], + query_string="Milvus", + top_k=3, + ).to_dict() + + # Verify that documents containing "Milvus" are returned + assert len(result_milvus["content"]) > 0 + assert any("Milvus" in content for content in result_milvus["content"]) + + # Test keyword search for "machine learning" + result_ml = store.retrieve_online_documents_v2( + features=[ + "text_documents:content", + "text_documents:title", + ], + query_string="machine learning", + top_k=3, + ).to_dict() + + # Verify that documents containing "machine learning" are returned + assert len(result_ml["content"]) > 0 + assert any( + "machine learning" in content.lower() for content in result_ml["content"] + ) + + # Test hybrid search (vector + keyword) + query_embedding = np.random.random(vector_length).tolist() + result_hybrid = store.retrieve_online_documents_v2( + features=[ + "text_documents:content", + "text_documents:title", + "text_documents:vector", + ], + query=query_embedding, + query_string="Feast", + top_k=3, + ).to_dict() + + # Verify hybrid search results + assert len(result_hybrid["content"]) > 0 + assert any("Feast" in content for content in result_hybrid["content"]) + assert len(result_hybrid["vector"]) > 0