diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 2e34687d6c7..c06c3398519 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -82,6 +82,7 @@ * [Type System](reference/type-system.md) * [Data sources](reference/data-sources/README.md) * [Overview](reference/data-sources/overview.md) + * [Table formats](reference/data-sources/table-formats.md) * [File](reference/data-sources/file.md) * [Snowflake](reference/data-sources/snowflake.md) * [BigQuery](reference/data-sources/bigquery.md) diff --git a/docs/reference/data-sources/spark.md b/docs/reference/data-sources/spark.md index 99d5902667a..8967e8bd181 100644 --- a/docs/reference/data-sources/spark.md +++ b/docs/reference/data-sources/spark.md @@ -4,6 +4,8 @@ Spark data sources are tables or files that can be loaded from some Spark store (e.g. Hive or in-memory). They can also be specified by a SQL query. +**New in Feast:** SparkSource now supports advanced table formats including **Apache Iceberg**, **Delta Lake**, and **Apache Hudi**, enabling ACID transactions, time travel, and schema evolution capabilities. See the [Table Formats guide](table-formats.md) for detailed documentation. + ## Disclaimer The Spark data source does not achieve full test coverage. @@ -11,6 +13,8 @@ Please do not assume complete stability. ## Examples +### Basic Examples + Using a table reference from SparkSession (for example, either in-memory or a Hive Metastore): ```python @@ -51,8 +55,77 @@ my_spark_source = SparkSource( ) ``` +### Table Format Examples + +SparkSource supports advanced table formats for modern data lakehouse architectures. For detailed documentation, configuration options, and best practices, see the **[Table Formats guide](table-formats.md)**. + +#### Apache Iceberg + +```python +from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import SparkSource +from feast.table_format import IcebergFormat + +iceberg_format = IcebergFormat( + catalog="my_catalog", + namespace="my_database" +) + +my_spark_source = SparkSource( + name="user_features", + path="my_catalog.my_database.user_table", + table_format=iceberg_format, + timestamp_field="event_timestamp" +) +``` + +#### Delta Lake + +```python +from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import SparkSource +from feast.table_format import DeltaFormat + +delta_format = DeltaFormat() + +my_spark_source = SparkSource( + name="transaction_features", + path="s3://my-bucket/delta-tables/transactions", + table_format=delta_format, + timestamp_field="transaction_timestamp" +) +``` + +#### Apache Hudi + +```python +from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import SparkSource +from feast.table_format import HudiFormat + +hudi_format = HudiFormat( + table_type="COPY_ON_WRITE", + record_key="user_id", + precombine_field="updated_at" +) + +my_spark_source = SparkSource( + name="user_profiles", + path="s3://my-bucket/hudi-tables/user_profiles", + table_format=hudi_format, + timestamp_field="event_timestamp" +) +``` + +For advanced configuration including time travel, incremental queries, and performance tuning, see the **[Table Formats guide](table-formats.md)**. + +## Configuration Options + The full set of configuration options is available [here](https://rtd.feast.dev/en/master/#feast.infra.offline_stores.contrib.spark_offline_store.spark_source.SparkSource). 
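+
+Once configured, a table-format-backed source is consumed like any other `SparkSource`. The sketch below is illustrative only (the entity, field names, and TTL are assumptions, not documented defaults); it shows the Iceberg-backed source from the examples above feeding a feature view:
+
+```python
+from datetime import timedelta
+
+from feast import Entity, FeatureView, Field
+from feast.types import Float32
+
+user = Entity(name="user", join_keys=["user_id"])
+
+user_features_view = FeatureView(
+    name="user_features_view",
+    entities=[user],
+    ttl=timedelta(days=1),
+    schema=[Field(name="lifetime_spend", dtype=Float32)],
+    source=my_spark_source,  # e.g. the Iceberg-backed SparkSource defined above
+)
+```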
+### Table Format Options + +- **IcebergFormat**: See [Table Formats - Iceberg](table-formats.md#apache-iceberg) +- **DeltaFormat**: See [Table Formats - Delta Lake](table-formats.md#delta-lake) +- **HudiFormat**: See [Table Formats - Hudi](table-formats.md#apache-hudi) + ## Supported Types Spark data sources support all eight primitive types and their corresponding array types. diff --git a/docs/reference/data-sources/table-formats.md b/docs/reference/data-sources/table-formats.md new file mode 100644 index 00000000000..f00e6f80c5f --- /dev/null +++ b/docs/reference/data-sources/table-formats.md @@ -0,0 +1,357 @@ +# Table Formats + +## Overview + +Table formats are metadata and transaction layers built on top of data storage formats (like Parquet). They provide advanced capabilities for managing large-scale data lakes, including ACID transactions, time travel, schema evolution, and efficient data management. + +Feast supports modern table formats to enable data lakehouse architectures with your feature store. + +## Supported Table Formats + +### Apache Iceberg + +[Apache Iceberg](https://iceberg.apache.org/) is an open table format designed for huge analytic datasets. It provides: +- **ACID transactions**: Atomic commits with snapshot isolation +- **Time travel**: Query data as of any snapshot +- **Schema evolution**: Add, drop, rename, or reorder columns safely +- **Hidden partitioning**: Partitioning is transparent to users +- **Performance**: Advanced pruning and filtering + +#### Basic Configuration + +```python +from feast.table_format import IcebergFormat + +iceberg_format = IcebergFormat( + catalog="my_catalog", + namespace="my_database" +) +``` + +#### Configuration Options + +| Parameter | Type | Description | +|-----------|------|-------------| +| `catalog` | `str` (optional) | Iceberg catalog name | +| `namespace` | `str` (optional) | Namespace/schema within the catalog | +| `properties` | `dict` (optional) | Additional Iceberg configuration properties | + +#### Common Properties + +```python +iceberg_format = IcebergFormat( + catalog="spark_catalog", + namespace="production", + properties={ + # Snapshot selection + "snapshot-id": "123456789", + "as-of-timestamp": "1609459200000", # Unix timestamp in ms + + # Performance tuning + "read.split.target-size": "134217728", # 128 MB splits + "read.parquet.vectorization.enabled": "true", + + # Advanced configuration + "io-impl": "org.apache.iceberg.hadoop.HadoopFileIO", + "warehouse": "s3://my-bucket/warehouse" + } +) +``` + +#### Time Travel Example + +```python +# Read from a specific snapshot +iceberg_format = IcebergFormat( + catalog="spark_catalog", + namespace="lakehouse" +) +iceberg_format.set_property("snapshot-id", "7896524153287651133") + +# Or read as of a timestamp +iceberg_format.set_property("as-of-timestamp", "1609459200000") +``` + +### Delta Lake + +[Delta Lake](https://delta.io/) is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads. 
It provides: +- **ACID transactions**: Serializable isolation for reads and writes +- **Time travel**: Access and revert to earlier versions +- **Schema enforcement**: Prevent bad data from corrupting tables +- **Unified batch and streaming**: Process data incrementally +- **Audit history**: Full history of all changes + +#### Basic Configuration + +```python +from feast.table_format import DeltaFormat + +delta_format = DeltaFormat() +``` + +#### Configuration Options + +| Parameter | Type | Description | +|-----------|------|-------------| +| `checkpoint_location` | `str` (optional) | Location for Delta transaction log checkpoints | +| `properties` | `dict` (optional) | Additional Delta configuration properties | + +#### Common Properties + +```python +delta_format = DeltaFormat( + checkpoint_location="s3://my-bucket/checkpoints", + properties={ + # Time travel + "versionAsOf": "5", + "timestampAsOf": "2024-01-01 00:00:00", + + # Performance optimization + "delta.autoOptimize.optimizeWrite": "true", + "delta.autoOptimize.autoCompact": "true", + + # Data skipping + "delta.dataSkippingNumIndexedCols": "32", + + # Z-ordering + "delta.autoOptimize.zOrderCols": "event_timestamp" + } +) +``` + +#### Time Travel Example + +```python +# Read from a specific version +delta_format = DeltaFormat() +delta_format.set_property("versionAsOf", "10") + +# Or read as of a timestamp +delta_format = DeltaFormat() +delta_format.set_property("timestampAsOf", "2024-01-15 12:00:00") +``` + +### Apache Hudi + +[Apache Hudi](https://hudi.apache.org/) (Hadoop Upserts Deletes and Incrementals) is a data lake storage framework for simplifying incremental data processing. It provides: +- **Upserts and deletes**: Efficient record-level updates +- **Incremental queries**: Process only changed data +- **Time travel**: Query historical versions +- **Multiple table types**: Optimize for read vs. 
write workloads +- **Change data capture**: Track data changes over time + +#### Basic Configuration + +```python +from feast.table_format import HudiFormat + +hudi_format = HudiFormat( + table_type="COPY_ON_WRITE", + record_key="user_id", + precombine_field="updated_at" +) +``` + +#### Configuration Options + +| Parameter | Type | Description | +|-----------|------|-------------| +| `table_type` | `str` (optional) | `COPY_ON_WRITE` or `MERGE_ON_READ` | +| `record_key` | `str` (optional) | Field(s) that uniquely identify a record | +| `precombine_field` | `str` (optional) | Field used to determine the latest version | +| `properties` | `dict` (optional) | Additional Hudi configuration properties | + +#### Table Types + +**COPY_ON_WRITE (COW)** +- Stores data in columnar format (Parquet) +- Updates create new file versions +- Best for **read-heavy workloads** +- Lower query latency + +```python +hudi_format = HudiFormat( + table_type="COPY_ON_WRITE", + record_key="id", + precombine_field="timestamp" +) +``` + +**MERGE_ON_READ (MOR)** +- Uses columnar + row-based formats +- Updates written to delta logs +- Best for **write-heavy workloads** +- Lower write latency + +```python +hudi_format = HudiFormat( + table_type="MERGE_ON_READ", + record_key="id", + precombine_field="timestamp" +) +``` + +#### Common Properties + +```python +hudi_format = HudiFormat( + table_type="COPY_ON_WRITE", + record_key="user_id", + precombine_field="updated_at", + properties={ + # Query type + "hoodie.datasource.query.type": "snapshot", # or "incremental" + + # Incremental queries + "hoodie.datasource.read.begin.instanttime": "20240101000000", + "hoodie.datasource.read.end.instanttime": "20240102000000", + + # Indexing + "hoodie.index.type": "BLOOM", + + # Compaction (for MOR tables) + "hoodie.compact.inline": "true", + "hoodie.compact.inline.max.delta.commits": "5", + + # Clustering + "hoodie.clustering.inline": "true" + } +) +``` + +#### Incremental Query Example + +```python +# Process only new/changed data +hudi_format = HudiFormat( + table_type="COPY_ON_WRITE", + record_key="id", + precombine_field="timestamp", + properties={ + "hoodie.datasource.query.type": "incremental", + "hoodie.datasource.read.begin.instanttime": "20240101000000", + "hoodie.datasource.read.end.instanttime": "20240102000000" + } +) +``` + +## Table Format vs File Format + +It's important to understand the distinction: + +| Aspect | File Format | Table Format | +|--------|-------------|--------------| +| **What it is** | Physical encoding of data | Metadata and transaction layer | +| **Examples** | Parquet, Avro, ORC, CSV | Iceberg, Delta Lake, Hudi | +| **Handles** | Data serialization | ACID, versioning, schema evolution | +| **Layer** | Storage layer | Metadata layer | + +### Can be used together + +```python +# Table format (metadata layer) built on top of file format (storage layer) +from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import SparkSource +from feast.table_format import IcebergFormat + +iceberg = IcebergFormat(catalog="my_catalog", namespace="db") + +source = SparkSource( + name="features", + path="catalog.db.table", + file_format="parquet", # Underlying storage format + table_format=iceberg, # Table metadata format + timestamp_field="event_timestamp" +) +``` + +## Benefits of Table Formats + +### Reliability +- **ACID transactions**: Ensure data consistency across concurrent operations +- **Automatic retries**: Handle transient failures gracefully +- **Schema validation**: Prevent incompatible 
schema changes +- **Data quality**: Constraints and validation rules + +### Performance +- **Data skipping**: Read only relevant files based on metadata +- **Partition pruning**: Skip entire partitions based on predicates +- **Compaction**: Merge small files for better performance +- **Columnar pruning**: Read only necessary columns +- **Indexing**: Advanced indexing for fast lookups + +### Flexibility +- **Schema evolution**: Add, remove, or modify columns without rewriting data +- **Time travel**: Access historical data states for auditing or debugging +- **Incremental processing**: Process only changed data efficiently +- **Multiple readers/writers**: Concurrent access without conflicts + +## Choosing the Right Table Format + +| Use Case | Recommended Format | Why | +|----------|-------------------|-----| +| Large-scale analytics with frequent schema changes | **Iceberg** | Best schema evolution, hidden partitioning, mature ecosystem | +| Streaming + batch workloads | **Delta Lake** | Unified architecture, strong integration with Spark, good docs | +| CDC and upsert-heavy workloads | **Hudi** | Efficient record-level updates, incremental queries | +| Read-heavy analytics | **Iceberg or Delta** | Excellent query performance | +| Write-heavy transactional | **Hudi (MOR)** | Optimized for fast writes | +| Multi-engine support | **Iceberg** | Widest engine support (Spark, Flink, Trino, etc.) | + +## Best Practices + +### 1. Choose Appropriate Partitioning +```python +# Iceberg - hidden partitioning +iceberg_format.set_property("partition-spec", "days(event_timestamp)") + +# Delta - explicit partitioning in data source +# Hudi - configure via properties +hudi_format.set_property("hoodie.datasource.write.partitionpath.field", "date") +``` + +### 2. Enable Optimization Features +```python +# Delta auto-optimize +delta_format.set_property("delta.autoOptimize.optimizeWrite", "true") +delta_format.set_property("delta.autoOptimize.autoCompact", "true") + +# Hudi compaction +hudi_format.set_property("hoodie.compact.inline", "true") +``` + +### 3. Manage Table History +```python +# Regularly clean up old snapshots/versions +# For Iceberg: Use expire_snapshots() procedure +# For Delta: Use VACUUM command +# For Hudi: Configure retention policies +``` + +### 4. Monitor Metadata Size +- Table formats maintain metadata for all operations +- Monitor metadata size and clean up old versions +- Configure retention policies based on your needs + +### 5. 
Test Schema Evolution +```python +# Always test schema changes in non-production first +# Ensure backward compatibility +# Use proper migration procedures +``` + +## Data Source Support + +Currently, table formats are supported with: +- [Spark data source](spark.md) - Full support for Iceberg, Delta, and Hudi + +Future support planned for: +- BigQuery (Iceberg) +- Snowflake (Iceberg) +- Other data sources + +## See Also + +- [Spark Data Source](spark.md) +- [Apache Iceberg Documentation](https://iceberg.apache.org/docs/latest/) +- [Delta Lake Documentation](https://docs.delta.io/latest/index.html) +- [Apache Hudi Documentation](https://hudi.apache.org/docs/overview) +- [Python API Reference - TableFormat](https://rtd.feast.dev/en/master/#feast.table_format) \ No newline at end of file diff --git a/protos/feast/core/DataFormat.proto b/protos/feast/core/DataFormat.proto index 0a32089b0f3..98464fa3c2b 100644 --- a/protos/feast/core/DataFormat.proto +++ b/protos/feast/core/DataFormat.proto @@ -27,12 +27,59 @@ message FileFormat { // Defines options for the Parquet data format message ParquetFormat {} - // Defines options for delta data format - message DeltaFormat {} - oneof format { ParquetFormat parquet_format = 1; + // Deprecated: Delta Lake is a table format, not a file format. + // Use TableFormat.DeltaFormat instead for Delta Lake support. + TableFormat.DeltaFormat delta_format = 2 [deprecated = true]; + } +} + +message TableFormat { + // Defines options for Apache Iceberg table format + message IcebergFormat { + // Optional catalog name for the Iceberg table + string catalog = 1; + + // Optional namespace (schema/database) within the catalog + string namespace = 2; + + // Additional properties for Iceberg configuration + // Examples: warehouse location, snapshot-id, as-of-timestamp, etc. + map properties = 3; + } + + // Defines options for Delta Lake table format + message DeltaFormat { + // Optional checkpoint location for Delta transaction logs + string checkpoint_location = 1; + + // Additional properties for Delta configuration + // Examples: auto-optimize settings, vacuum settings, etc. + map properties = 2; + } + + // Defines options for Apache Hudi table format + message HudiFormat { + // Type of Hudi table (COPY_ON_WRITE or MERGE_ON_READ) + string table_type = 1; + + // Field(s) that uniquely identify a record + string record_key = 2; + + // Field used to determine the latest version of a record + string precombine_field = 3; + + // Additional properties for Hudi configuration + // Examples: compaction strategy, indexing options, etc. + map properties = 4; + } + + // Specifies the table format and format-specific options + oneof format { + IcebergFormat iceberg_format = 1; DeltaFormat delta_format = 2; + HudiFormat hudi_format = 3; } } diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index b27767f527c..b91296dca31 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -36,7 +36,7 @@ message DataSource { reserved 6 to 10; // Type of Data Source. - // Next available id: 12 + // Next available id: 13 enum SourceType { INVALID = 0; BATCH_FILE = 1; @@ -231,6 +231,9 @@ message DataSource { // Date Format of date partition column (e.g. %Y-%m-%d) string date_partition_column_format = 5; + + // Table Format (e.g. iceberg, delta, hudi) + TableFormat table_format = 6; } // Defines configuration for custom third-party data sources. 
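Before the Python SDK changes below, a minimal sketch of how the new proto messages compose. Values are illustrative, and it assumes the regenerated `DataFormat_pb2` / `DataSource_pb2` bindings included later in this diff:

```python
from feast.protos.feast.core.DataFormat_pb2 import TableFormat
from feast.protos.feast.core.DataSource_pb2 import DataSource

# Build an Iceberg TableFormat with a catalog, namespace, and one property entry.
iceberg = TableFormat(
    iceberg_format=TableFormat.IcebergFormat(
        catalog="my_catalog",
        namespace="my_database",
        properties={"snapshot-id": "123456789"},
    )
)

# SparkOptions.table_format is a message field, so it is set with CopyFrom.
spark_options = DataSource.SparkOptions(
    path="my_catalog.my_database.user_table",
    file_format="parquet",
)
spark_options.table_format.CopyFrom(iceberg)

assert spark_options.HasField("table_format")
assert spark_options.table_format.WhichOneof("format") == "iceberg_format"
```
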
diff --git a/sdk/python/feast/data_format.py b/sdk/python/feast/data_format.py index 301dfb81302..409c1500f88 100644 --- a/sdk/python/feast/data_format.py +++ b/sdk/python/feast/data_format.py @@ -17,6 +17,7 @@ from feast.protos.feast.core.DataFormat_pb2 import FileFormat as FileFormatProto from feast.protos.feast.core.DataFormat_pb2 import StreamFormat as StreamFormatProto +from feast.protos.feast.core.DataFormat_pb2 import TableFormat as TableFormatProto class FileFormat(ABC): @@ -70,11 +71,12 @@ def __str__(self): class DeltaFormat(FileFormat): """ - Defines delta data format + Defines delta data format (deprecated - use TableFormat.DeltaFormat instead) """ def to_proto(self): - return FileFormatProto(delta_format=FileFormatProto.DeltaFormat()) + # Reference TableFormat.DeltaFormat since DeltaFormat is now nested there + return FileFormatProto(delta_format=TableFormatProto.DeltaFormat()) def __str__(self): return "delta" diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py index bd4fb1ac817..6f2af7054b4 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py @@ -14,17 +14,17 @@ ) from feast.repo_config import RepoConfig from feast.saved_dataset import SavedDatasetStorage +from feast.table_format import TableFormat, table_format_from_proto from feast.type_map import spark_to_feast_value_type from feast.value_type import ValueType logger = logging.getLogger(__name__) -class SparkSourceFormat(Enum): +class SparkFileSourceFormat(Enum): csv = "csv" json = "json" parquet = "parquet" - delta = "delta" avro = "avro" @@ -42,6 +42,7 @@ def __init__( query: Optional[str] = None, path: Optional[str] = None, file_format: Optional[str] = None, + table_format: Optional[TableFormat] = None, created_timestamp_column: Optional[str] = None, field_mapping: Optional[Dict[str, str]] = None, description: Optional[str] = "", @@ -58,7 +59,9 @@ def __init__( table: The name of a Spark table. query: The query to be executed in Spark. path: The path to file data. - file_format: The format of the file data. + file_format: The underlying file format (parquet, avro, csv, json). + table_format: The table metadata format (iceberg, delta, hudi, etc.). + Optional and separate from file_format. created_timestamp_column: Timestamp column indicating when the row was created, used for deduplicating rows. field_mapping: A dictionary mapping of column names in this data @@ -70,7 +73,7 @@ def __init__( timestamp_field: Event timestamp field used for point-in-time joins of feature values. date_partition_column: The column to partition the data on for faster - retrieval. This is useful for large tables and will limit the number ofi + retrieval. This is useful for large tables and will limit the number of """ # If no name, use the table as the default name. if name is None and table is None: @@ -102,6 +105,7 @@ def __init__( path=path, file_format=file_format, date_partition_column_format=date_partition_column_format, + table_format=table_format, ) @property @@ -132,6 +136,13 @@ def file_format(self): """ return self.spark_options.file_format + @property + def table_format(self): + """ + Returns the table format of this feature data source. 
+ """ + return self.spark_options.table_format + @property def date_partition_column_format(self): """ @@ -151,6 +162,7 @@ def from_proto(data_source: DataSourceProto) -> Any: query=spark_options.query, path=spark_options.path, file_format=spark_options.file_format, + table_format=spark_options.table_format, date_partition_column_format=spark_options.date_partition_column_format, date_partition_column=data_source.date_partition_column, timestamp_field=data_source.timestamp_field, @@ -219,7 +231,7 @@ def get_table_query_string(self) -> str: if spark_session is None: raise AssertionError("Could not find an active spark session.") try: - df = spark_session.read.format(self.file_format).load(self.path) + df = self._load_dataframe_from_path(spark_session) except Exception: logger.exception( "Spark read of file source failed.\n" + traceback.format_exc() @@ -230,6 +242,24 @@ def get_table_query_string(self) -> str: return f"`{tmp_table_name}`" + def _load_dataframe_from_path(self, spark_session): + """Load DataFrame from path, considering both file format and table format.""" + + if self.table_format is None: + # No table format specified, use standard file reading with file_format + return spark_session.read.format(self.file_format).load(self.path) + + # Build reader with table format and options + reader = spark_session.read.format(self.table_format.format_type.value) + + # Add table format specific options + for key, value in self.table_format.properties.items(): + reader = reader.option(key, value) + + # For catalog-based table formats like Iceberg, the path is actually a table name + # For file-based formats, it's still a file path + return reader.load(self.path) + def __eq__(self, other): base_eq = super().__eq__(other) if not base_eq: @@ -245,7 +275,7 @@ def __hash__(self): class SparkOptions: - allowed_formats = [format.value for format in SparkSourceFormat] + allowed_formats = [format.value for format in SparkFileSourceFormat] def __init__( self, @@ -254,6 +284,7 @@ def __init__( path: Optional[str], file_format: Optional[str], date_partition_column_format: Optional[str] = "%Y-%m-%d", + table_format: Optional[TableFormat] = None, ): # Check that only one of the ways to load a spark dataframe can be used. We have # to treat empty string and null the same due to proto (de)serialization. @@ -262,11 +293,14 @@ def __init__( "Exactly one of params(table, query, path) must be specified." ) if path: - if not file_format: + # If table_format is specified, file_format is optional (table format determines the reader) + # If no table_format, file_format is required for basic file reading + if not table_format and not file_format: raise ValueError( - "If 'path' is specified, then 'file_format' is required." + "If 'path' is specified without 'table_format', then 'file_format' is required." 
) - if file_format not in self.allowed_formats: + # Only validate file_format if it's provided (it's optional with table_format) + if file_format and file_format not in self.allowed_formats: raise ValueError( f"'file_format' should be one of {self.allowed_formats}" ) @@ -276,6 +310,7 @@ def __init__( self._path = path self._file_format = file_format self._date_partition_column_format = date_partition_column_format + self._table_format = table_format @property def table(self): @@ -317,6 +352,14 @@ def date_partition_column_format(self): def date_partition_column_format(self, date_partition_column_format): self._date_partition_column_format = date_partition_column_format + @property + def table_format(self): + return self._table_format + + @table_format.setter + def table_format(self, table_format): + self._table_format = table_format + @classmethod def from_proto(cls, spark_options_proto: DataSourceProto.SparkOptions): """ @@ -326,12 +369,18 @@ def from_proto(cls, spark_options_proto: DataSourceProto.SparkOptions): Returns: Returns a SparkOptions object based on the spark_options protobuf """ + # Parse table_format if present + table_format = None + if spark_options_proto.HasField("table_format"): + table_format = table_format_from_proto(spark_options_proto.table_format) + spark_options = cls( table=spark_options_proto.table, query=spark_options_proto.query, path=spark_options_proto.path, file_format=spark_options_proto.file_format, date_partition_column_format=spark_options_proto.date_partition_column_format, + table_format=table_format, ) return spark_options @@ -342,6 +391,10 @@ def to_proto(self) -> DataSourceProto.SparkOptions: Returns: SparkOptionsProto protobuf """ + table_format_proto = None + if self.table_format: + table_format_proto = self.table_format.to_proto() + spark_options_proto = DataSourceProto.SparkOptions( table=self.table, query=self.query, @@ -350,6 +403,9 @@ def to_proto(self) -> DataSourceProto.SparkOptions: date_partition_column_format=self.date_partition_column_format, ) + if table_format_proto: + spark_options_proto.table_format.CopyFrom(table_format_proto) + return spark_options_proto @@ -364,12 +420,14 @@ def __init__( query: Optional[str] = None, path: Optional[str] = None, file_format: Optional[str] = None, + table_format: Optional[TableFormat] = None, ): self.spark_options = SparkOptions( table=table, query=query, path=path, file_format=file_format, + table_format=table_format, ) @staticmethod @@ -380,6 +438,7 @@ def from_proto(storage_proto: SavedDatasetStorageProto) -> SavedDatasetStorage: query=spark_options.query, path=spark_options.path, file_format=spark_options.file_format, + table_format=spark_options.table_format, ) def to_proto(self) -> SavedDatasetStorageProto: @@ -391,4 +450,5 @@ def to_data_source(self) -> DataSource: query=self.spark_options.query, path=self.spark_options.path, file_format=self.spark_options.file_format, + table_format=self.spark_options.table_format, ) diff --git a/sdk/python/feast/protos/feast/core/DataFormat_pb2.py b/sdk/python/feast/protos/feast/core/DataFormat_pb2.py index a3883dcec3b..b90958cb325 100644 --- a/sdk/python/feast/protos/feast/core/DataFormat_pb2.py +++ b/sdk/python/feast/protos/feast/core/DataFormat_pb2.py @@ -14,7 +14,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1b\x66\x65\x61st/core/DataFormat.proto\x12\nfeast.core\"\xb2\x01\n\nFileFormat\x12>\n\x0eparquet_format\x18\x01 \x01(\x0b\x32$.feast.core.FileFormat.ParquetFormatH\x00\x12:\n\x0c\x64\x65lta_format\x18\x02 
\x01(\x0b\x32\".feast.core.FileFormat.DeltaFormatH\x00\x1a\x0f\n\rParquetFormat\x1a\r\n\x0b\x44\x65ltaFormatB\x08\n\x06\x66ormat\"\xb7\x02\n\x0cStreamFormat\x12:\n\x0b\x61vro_format\x18\x01 \x01(\x0b\x32#.feast.core.StreamFormat.AvroFormatH\x00\x12<\n\x0cproto_format\x18\x02 \x01(\x0b\x32$.feast.core.StreamFormat.ProtoFormatH\x00\x12:\n\x0bjson_format\x18\x03 \x01(\x0b\x32#.feast.core.StreamFormat.JsonFormatH\x00\x1a!\n\x0bProtoFormat\x12\x12\n\nclass_path\x18\x01 \x01(\t\x1a!\n\nAvroFormat\x12\x13\n\x0bschema_json\x18\x01 \x01(\t\x1a!\n\nJsonFormat\x12\x13\n\x0bschema_json\x18\x01 \x01(\tB\x08\n\x06\x66ormatBT\n\x10\x66\x65\x61st.proto.coreB\x0f\x44\x61taFormatProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1b\x66\x65\x61st/core/DataFormat.proto\x12\nfeast.core\"\xa8\x01\n\nFileFormat\x12>\n\x0eparquet_format\x18\x01 \x01(\x0b\x32$.feast.core.FileFormat.ParquetFormatH\x00\x12?\n\x0c\x64\x65lta_format\x18\x02 \x01(\x0b\x32#.feast.core.TableFormat.DeltaFormatB\x02\x18\x01H\x00\x1a\x0f\n\rParquetFormatB\x08\n\x06\x66ormat\"\xf9\x05\n\x0bTableFormat\x12?\n\x0eiceberg_format\x18\x01 \x01(\x0b\x32%.feast.core.TableFormat.IcebergFormatH\x00\x12;\n\x0c\x64\x65lta_format\x18\x02 \x01(\x0b\x32#.feast.core.TableFormat.DeltaFormatH\x00\x12\x39\n\x0bhudi_format\x18\x03 \x01(\x0b\x32\".feast.core.TableFormat.HudiFormatH\x00\x1a\xb1\x01\n\rIcebergFormat\x12\x0f\n\x07\x63\x61talog\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12I\n\nproperties\x18\x03 \x03(\x0b\x32\x35.feast.core.TableFormat.IcebergFormat.PropertiesEntry\x1a\x31\n\x0fPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\xa6\x01\n\x0b\x44\x65ltaFormat\x12\x1b\n\x13\x63heckpoint_location\x18\x01 \x01(\t\x12G\n\nproperties\x18\x02 \x03(\x0b\x32\x33.feast.core.TableFormat.DeltaFormat.PropertiesEntry\x1a\x31\n\x0fPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\xc9\x01\n\nHudiFormat\x12\x12\n\ntable_type\x18\x01 \x01(\t\x12\x12\n\nrecord_key\x18\x02 \x01(\t\x12\x18\n\x10precombine_field\x18\x03 \x01(\t\x12\x46\n\nproperties\x18\x04 \x03(\x0b\x32\x32.feast.core.TableFormat.HudiFormat.PropertiesEntry\x1a\x31\n\x0fPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x08\n\x06\x66ormat\"\xb7\x02\n\x0cStreamFormat\x12:\n\x0b\x61vro_format\x18\x01 \x01(\x0b\x32#.feast.core.StreamFormat.AvroFormatH\x00\x12<\n\x0cproto_format\x18\x02 \x01(\x0b\x32$.feast.core.StreamFormat.ProtoFormatH\x00\x12:\n\x0bjson_format\x18\x03 \x01(\x0b\x32#.feast.core.StreamFormat.JsonFormatH\x00\x1a!\n\x0bProtoFormat\x12\x12\n\nclass_path\x18\x01 \x01(\t\x1a!\n\nAvroFormat\x12\x13\n\x0bschema_json\x18\x01 \x01(\t\x1a!\n\nJsonFormat\x12\x13\n\x0bschema_json\x18\x01 \x01(\tB\x08\n\x06\x66ormatBT\n\x10\x66\x65\x61st.proto.coreB\x0f\x44\x61taFormatProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -22,18 +22,38 @@ if _descriptor._USE_C_DESCRIPTORS == False: _globals['DESCRIPTOR']._options = None _globals['DESCRIPTOR']._serialized_options = b'\n\020feast.proto.coreB\017DataFormatProtoZ/github.com/feast-dev/feast/go/protos/feast/core' + _globals['_FILEFORMAT'].fields_by_name['delta_format']._options = None + _globals['_FILEFORMAT'].fields_by_name['delta_format']._serialized_options = b'\030\001' + 
_globals['_TABLEFORMAT_ICEBERGFORMAT_PROPERTIESENTRY']._options = None + _globals['_TABLEFORMAT_ICEBERGFORMAT_PROPERTIESENTRY']._serialized_options = b'8\001' + _globals['_TABLEFORMAT_DELTAFORMAT_PROPERTIESENTRY']._options = None + _globals['_TABLEFORMAT_DELTAFORMAT_PROPERTIESENTRY']._serialized_options = b'8\001' + _globals['_TABLEFORMAT_HUDIFORMAT_PROPERTIESENTRY']._options = None + _globals['_TABLEFORMAT_HUDIFORMAT_PROPERTIESENTRY']._serialized_options = b'8\001' _globals['_FILEFORMAT']._serialized_start=44 - _globals['_FILEFORMAT']._serialized_end=222 - _globals['_FILEFORMAT_PARQUETFORMAT']._serialized_start=182 - _globals['_FILEFORMAT_PARQUETFORMAT']._serialized_end=197 - _globals['_FILEFORMAT_DELTAFORMAT']._serialized_start=199 - _globals['_FILEFORMAT_DELTAFORMAT']._serialized_end=212 - _globals['_STREAMFORMAT']._serialized_start=225 - _globals['_STREAMFORMAT']._serialized_end=536 - _globals['_STREAMFORMAT_PROTOFORMAT']._serialized_start=423 - _globals['_STREAMFORMAT_PROTOFORMAT']._serialized_end=456 - _globals['_STREAMFORMAT_AVROFORMAT']._serialized_start=458 - _globals['_STREAMFORMAT_AVROFORMAT']._serialized_end=491 - _globals['_STREAMFORMAT_JSONFORMAT']._serialized_start=493 - _globals['_STREAMFORMAT_JSONFORMAT']._serialized_end=526 + _globals['_FILEFORMAT']._serialized_end=212 + _globals['_FILEFORMAT_PARQUETFORMAT']._serialized_start=187 + _globals['_FILEFORMAT_PARQUETFORMAT']._serialized_end=202 + _globals['_TABLEFORMAT']._serialized_start=215 + _globals['_TABLEFORMAT']._serialized_end=976 + _globals['_TABLEFORMAT_ICEBERGFORMAT']._serialized_start=416 + _globals['_TABLEFORMAT_ICEBERGFORMAT']._serialized_end=593 + _globals['_TABLEFORMAT_ICEBERGFORMAT_PROPERTIESENTRY']._serialized_start=544 + _globals['_TABLEFORMAT_ICEBERGFORMAT_PROPERTIESENTRY']._serialized_end=593 + _globals['_TABLEFORMAT_DELTAFORMAT']._serialized_start=596 + _globals['_TABLEFORMAT_DELTAFORMAT']._serialized_end=762 + _globals['_TABLEFORMAT_DELTAFORMAT_PROPERTIESENTRY']._serialized_start=544 + _globals['_TABLEFORMAT_DELTAFORMAT_PROPERTIESENTRY']._serialized_end=593 + _globals['_TABLEFORMAT_HUDIFORMAT']._serialized_start=765 + _globals['_TABLEFORMAT_HUDIFORMAT']._serialized_end=966 + _globals['_TABLEFORMAT_HUDIFORMAT_PROPERTIESENTRY']._serialized_start=544 + _globals['_TABLEFORMAT_HUDIFORMAT_PROPERTIESENTRY']._serialized_end=593 + _globals['_STREAMFORMAT']._serialized_start=979 + _globals['_STREAMFORMAT']._serialized_end=1290 + _globals['_STREAMFORMAT_PROTOFORMAT']._serialized_start=1177 + _globals['_STREAMFORMAT_PROTOFORMAT']._serialized_end=1210 + _globals['_STREAMFORMAT_AVROFORMAT']._serialized_start=1212 + _globals['_STREAMFORMAT_AVROFORMAT']._serialized_end=1245 + _globals['_STREAMFORMAT_JSONFORMAT']._serialized_start=1247 + _globals['_STREAMFORMAT_JSONFORMAT']._serialized_end=1280 # @@protoc_insertion_point(module_scope) diff --git a/sdk/python/feast/protos/feast/core/DataFormat_pb2.pyi b/sdk/python/feast/protos/feast/core/DataFormat_pb2.pyi index 1f904e9886a..193fb82a776 100644 --- a/sdk/python/feast/protos/feast/core/DataFormat_pb2.pyi +++ b/sdk/python/feast/protos/feast/core/DataFormat_pb2.pyi @@ -17,7 +17,9 @@ See the License for the specific language governing permissions and limitations under the License. """ import builtins +import collections.abc import google.protobuf.descriptor +import google.protobuf.internal.containers import google.protobuf.message import sys @@ -42,32 +44,174 @@ class FileFormat(google.protobuf.message.Message): self, ) -> None: ... 
+ PARQUET_FORMAT_FIELD_NUMBER: builtins.int + DELTA_FORMAT_FIELD_NUMBER: builtins.int + @property + def parquet_format(self) -> global___FileFormat.ParquetFormat: ... + @property + def delta_format(self) -> global___TableFormat.DeltaFormat: + """Deprecated: Delta Lake is a table format, not a file format. + Use TableFormat.DeltaFormat instead for Delta Lake support. + """ + def __init__( + self, + *, + parquet_format: global___FileFormat.ParquetFormat | None = ..., + delta_format: global___TableFormat.DeltaFormat | None = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["delta_format", b"delta_format", "format", b"format", "parquet_format", b"parquet_format"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["delta_format", b"delta_format", "format", b"format", "parquet_format", b"parquet_format"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions.Literal["format", b"format"]) -> typing_extensions.Literal["parquet_format", "delta_format"] | None: ... + +global___FileFormat = FileFormat + +class TableFormat(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + class IcebergFormat(google.protobuf.message.Message): + """Defines options for Apache Iceberg table format""" + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + class PropertiesEntry(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + KEY_FIELD_NUMBER: builtins.int + VALUE_FIELD_NUMBER: builtins.int + key: builtins.str + value: builtins.str + def __init__( + self, + *, + key: builtins.str = ..., + value: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ... + + CATALOG_FIELD_NUMBER: builtins.int + NAMESPACE_FIELD_NUMBER: builtins.int + PROPERTIES_FIELD_NUMBER: builtins.int + catalog: builtins.str + """Optional catalog name for the Iceberg table""" + namespace: builtins.str + """Optional namespace (schema/database) within the catalog""" + @property + def properties(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: + """Additional properties for Iceberg configuration + Examples: warehouse location, snapshot-id, as-of-timestamp, etc. + """ + def __init__( + self, + *, + catalog: builtins.str = ..., + namespace: builtins.str = ..., + properties: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["catalog", b"catalog", "namespace", b"namespace", "properties", b"properties"]) -> None: ... + class DeltaFormat(google.protobuf.message.Message): - """Defines options for delta data format""" + """Defines options for Delta Lake table format""" DESCRIPTOR: google.protobuf.descriptor.Descriptor + class PropertiesEntry(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + KEY_FIELD_NUMBER: builtins.int + VALUE_FIELD_NUMBER: builtins.int + key: builtins.str + value: builtins.str + def __init__( + self, + *, + key: builtins.str = ..., + value: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ... 
+ + CHECKPOINT_LOCATION_FIELD_NUMBER: builtins.int + PROPERTIES_FIELD_NUMBER: builtins.int + checkpoint_location: builtins.str + """Optional checkpoint location for Delta transaction logs""" + @property + def properties(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: + """Additional properties for Delta configuration + Examples: auto-optimize settings, vacuum settings, etc. + """ def __init__( self, + *, + checkpoint_location: builtins.str = ..., + properties: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["checkpoint_location", b"checkpoint_location", "properties", b"properties"]) -> None: ... - PARQUET_FORMAT_FIELD_NUMBER: builtins.int + class HudiFormat(google.protobuf.message.Message): + """Defines options for Apache Hudi table format""" + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + class PropertiesEntry(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + KEY_FIELD_NUMBER: builtins.int + VALUE_FIELD_NUMBER: builtins.int + key: builtins.str + value: builtins.str + def __init__( + self, + *, + key: builtins.str = ..., + value: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]) -> None: ... + + TABLE_TYPE_FIELD_NUMBER: builtins.int + RECORD_KEY_FIELD_NUMBER: builtins.int + PRECOMBINE_FIELD_FIELD_NUMBER: builtins.int + PROPERTIES_FIELD_NUMBER: builtins.int + table_type: builtins.str + """Type of Hudi table (COPY_ON_WRITE or MERGE_ON_READ)""" + record_key: builtins.str + """Field(s) that uniquely identify a record""" + precombine_field: builtins.str + """Field used to determine the latest version of a record""" + @property + def properties(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]: + """Additional properties for Hudi configuration + Examples: compaction strategy, indexing options, etc. + """ + def __init__( + self, + *, + table_type: builtins.str = ..., + record_key: builtins.str = ..., + precombine_field: builtins.str = ..., + properties: collections.abc.Mapping[builtins.str, builtins.str] | None = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["precombine_field", b"precombine_field", "properties", b"properties", "record_key", b"record_key", "table_type", b"table_type"]) -> None: ... + + ICEBERG_FORMAT_FIELD_NUMBER: builtins.int DELTA_FORMAT_FIELD_NUMBER: builtins.int + HUDI_FORMAT_FIELD_NUMBER: builtins.int @property - def parquet_format(self) -> global___FileFormat.ParquetFormat: ... + def iceberg_format(self) -> global___TableFormat.IcebergFormat: ... @property - def delta_format(self) -> global___FileFormat.DeltaFormat: ... + def delta_format(self) -> global___TableFormat.DeltaFormat: ... + @property + def hudi_format(self) -> global___TableFormat.HudiFormat: ... def __init__( self, *, - parquet_format: global___FileFormat.ParquetFormat | None = ..., - delta_format: global___FileFormat.DeltaFormat | None = ..., + iceberg_format: global___TableFormat.IcebergFormat | None = ..., + delta_format: global___TableFormat.DeltaFormat | None = ..., + hudi_format: global___TableFormat.HudiFormat | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["delta_format", b"delta_format", "format", b"format", "parquet_format", b"parquet_format"]) -> builtins.bool: ... 
- def ClearField(self, field_name: typing_extensions.Literal["delta_format", b"delta_format", "format", b"format", "parquet_format", b"parquet_format"]) -> None: ... - def WhichOneof(self, oneof_group: typing_extensions.Literal["format", b"format"]) -> typing_extensions.Literal["parquet_format", "delta_format"] | None: ... + def HasField(self, field_name: typing_extensions.Literal["delta_format", b"delta_format", "format", b"format", "hudi_format", b"hudi_format", "iceberg_format", b"iceberg_format"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["delta_format", b"delta_format", "format", b"format", "hudi_format", b"hudi_format", "iceberg_format", b"iceberg_format"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions.Literal["format", b"format"]) -> typing_extensions.Literal["iceberg_format", "delta_format", "hudi_format"] | None: ... -global___FileFormat = FileFormat +global___TableFormat = TableFormat class StreamFormat(google.protobuf.message.Message): """Defines the data format encoding features/entity data in data streams""" diff --git a/sdk/python/feast/protos/feast/core/DataSource_pb2.py b/sdk/python/feast/protos/feast/core/DataSource_pb2.py index cb06cca5c10..f3086233584 100644 --- a/sdk/python/feast/protos/feast/core/DataSource_pb2.py +++ b/sdk/python/feast/protos/feast/core/DataSource_pb2.py @@ -19,7 +19,7 @@ from feast.protos.feast.core import Feature_pb2 as feast_dot_core_dot_Feature__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1b\x66\x65\x61st/core/DataSource.proto\x12\nfeast.core\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1b\x66\x65\x61st/core/DataFormat.proto\x1a\x17\x66\x65\x61st/types/Value.proto\x1a\x18\x66\x65\x61st/core/Feature.proto\"\xd9\x17\n\nDataSource\x12\x0c\n\x04name\x18\x14 \x01(\t\x12\x0f\n\x07project\x18\x15 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x17 \x01(\t\x12.\n\x04tags\x18\x18 \x03(\x0b\x32 .feast.core.DataSource.TagsEntry\x12\r\n\x05owner\x18\x19 \x01(\t\x12/\n\x04type\x18\x01 \x01(\x0e\x32!.feast.core.DataSource.SourceType\x12?\n\rfield_mapping\x18\x02 \x03(\x0b\x32(.feast.core.DataSource.FieldMappingEntry\x12\x17\n\x0ftimestamp_field\x18\x03 \x01(\t\x12\x1d\n\x15\x64\x61te_partition_column\x18\x04 \x01(\t\x12 \n\x18\x63reated_timestamp_column\x18\x05 \x01(\t\x12\x1e\n\x16\x64\x61ta_source_class_type\x18\x11 \x01(\t\x12,\n\x0c\x62\x61tch_source\x18\x1a \x01(\x0b\x32\x16.feast.core.DataSource\x12/\n\x04meta\x18\x32 \x01(\x0b\x32!.feast.core.DataSource.SourceMeta\x12:\n\x0c\x66ile_options\x18\x0b \x01(\x0b\x32\".feast.core.DataSource.FileOptionsH\x00\x12\x42\n\x10\x62igquery_options\x18\x0c \x01(\x0b\x32&.feast.core.DataSource.BigQueryOptionsH\x00\x12<\n\rkafka_options\x18\r \x01(\x0b\x32#.feast.core.DataSource.KafkaOptionsH\x00\x12@\n\x0fkinesis_options\x18\x0e \x01(\x0b\x32%.feast.core.DataSource.KinesisOptionsH\x00\x12\x42\n\x10redshift_options\x18\x0f \x01(\x0b\x32&.feast.core.DataSource.RedshiftOptionsH\x00\x12I\n\x14request_data_options\x18\x12 \x01(\x0b\x32).feast.core.DataSource.RequestDataOptionsH\x00\x12\x44\n\x0e\x63ustom_options\x18\x10 \x01(\x0b\x32*.feast.core.DataSource.CustomSourceOptionsH\x00\x12\x44\n\x11snowflake_options\x18\x13 \x01(\x0b\x32\'.feast.core.DataSource.SnowflakeOptionsH\x00\x12:\n\x0cpush_options\x18\x16 \x01(\x0b\x32\".feast.core.DataSource.PushOptionsH\x00\x12<\n\rspark_options\x18\x1b \x01(\x0b\x32#.feast.core.DataSource.SparkOptionsH\x00\x12<\n\rtrino_options\x18\x1e 
\x01(\x0b\x32#.feast.core.DataSource.TrinoOptionsH\x00\x12>\n\x0e\x61thena_options\x18# \x01(\x0b\x32$.feast.core.DataSource.AthenaOptionsH\x00\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x33\n\x11\x46ieldMappingEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\xf5\x01\n\nSourceMeta\x12:\n\x16\x65\x61rliestEventTimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x38\n\x14latestEventTimestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x35\n\x11\x63reated_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12:\n\x16last_updated_timestamp\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x1a\x65\n\x0b\x46ileOptions\x12+\n\x0b\x66ile_format\x18\x01 \x01(\x0b\x32\x16.feast.core.FileFormat\x12\x0b\n\x03uri\x18\x02 \x01(\t\x12\x1c\n\x14s3_endpoint_override\x18\x03 \x01(\t\x1a/\n\x0f\x42igQueryOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x1a,\n\x0cTrinoOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x1a\xae\x01\n\x0cKafkaOptions\x12\x1f\n\x17kafka_bootstrap_servers\x18\x01 \x01(\t\x12\r\n\x05topic\x18\x02 \x01(\t\x12\x30\n\x0emessage_format\x18\x03 \x01(\x0b\x32\x18.feast.core.StreamFormat\x12<\n\x19watermark_delay_threshold\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x1a\x66\n\x0eKinesisOptions\x12\x0e\n\x06region\x18\x01 \x01(\t\x12\x13\n\x0bstream_name\x18\x02 \x01(\t\x12/\n\rrecord_format\x18\x03 \x01(\x0b\x32\x18.feast.core.StreamFormat\x1aQ\n\x0fRedshiftOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0e\n\x06schema\x18\x03 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x04 \x01(\t\x1aT\n\rAthenaOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x03 \x01(\t\x12\x13\n\x0b\x64\x61ta_source\x18\x04 \x01(\t\x1aX\n\x10SnowflakeOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0e\n\x06schema\x18\x03 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x04 \x01(\tJ\x04\x08\x05\x10\x06\x1au\n\x0cSparkOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0c\n\x04path\x18\x03 \x01(\t\x12\x13\n\x0b\x66ile_format\x18\x04 \x01(\t\x12$\n\x1c\x64\x61te_partition_column_format\x18\x05 \x01(\t\x1a,\n\x13\x43ustomSourceOptions\x12\x15\n\rconfiguration\x18\x01 \x01(\x0c\x1a\xf7\x01\n\x12RequestDataOptions\x12Z\n\x11\x64\x65precated_schema\x18\x02 \x03(\x0b\x32?.feast.core.DataSource.RequestDataOptions.DeprecatedSchemaEntry\x12)\n\x06schema\x18\x03 \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x1aT\n\x15\x44\x65precatedSchemaEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12*\n\x05value\x18\x02 \x01(\x0e\x32\x1b.feast.types.ValueType.Enum:\x02\x38\x01J\x04\x08\x01\x10\x02\x1a\x13\n\x0bPushOptionsJ\x04\x08\x01\x10\x02\"\xf8\x01\n\nSourceType\x12\x0b\n\x07INVALID\x10\x00\x12\x0e\n\nBATCH_FILE\x10\x01\x12\x13\n\x0f\x42\x41TCH_SNOWFLAKE\x10\x08\x12\x12\n\x0e\x42\x41TCH_BIGQUERY\x10\x02\x12\x12\n\x0e\x42\x41TCH_REDSHIFT\x10\x05\x12\x10\n\x0cSTREAM_KAFKA\x10\x03\x12\x12\n\x0eSTREAM_KINESIS\x10\x04\x12\x11\n\rCUSTOM_SOURCE\x10\x06\x12\x12\n\x0eREQUEST_SOURCE\x10\x07\x12\x0f\n\x0bPUSH_SOURCE\x10\t\x12\x0f\n\x0b\x42\x41TCH_TRINO\x10\n\x12\x0f\n\x0b\x42\x41TCH_SPARK\x10\x0b\x12\x10\n\x0c\x42\x41TCH_ATHENA\x10\x0c\x42\t\n\x07optionsJ\x04\x08\x06\x10\x0b\"=\n\x0e\x44\x61taSourceList\x12+\n\x0b\x64\x61tasources\x18\x01 
\x03(\x0b\x32\x16.feast.core.DataSourceBT\n\x10\x66\x65\x61st.proto.coreB\x0f\x44\x61taSourceProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1b\x66\x65\x61st/core/DataSource.proto\x12\nfeast.core\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1b\x66\x65\x61st/core/DataFormat.proto\x1a\x17\x66\x65\x61st/types/Value.proto\x1a\x18\x66\x65\x61st/core/Feature.proto\"\x89\x18\n\nDataSource\x12\x0c\n\x04name\x18\x14 \x01(\t\x12\x0f\n\x07project\x18\x15 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x17 \x01(\t\x12.\n\x04tags\x18\x18 \x03(\x0b\x32 .feast.core.DataSource.TagsEntry\x12\r\n\x05owner\x18\x19 \x01(\t\x12/\n\x04type\x18\x01 \x01(\x0e\x32!.feast.core.DataSource.SourceType\x12?\n\rfield_mapping\x18\x02 \x03(\x0b\x32(.feast.core.DataSource.FieldMappingEntry\x12\x17\n\x0ftimestamp_field\x18\x03 \x01(\t\x12\x1d\n\x15\x64\x61te_partition_column\x18\x04 \x01(\t\x12 \n\x18\x63reated_timestamp_column\x18\x05 \x01(\t\x12\x1e\n\x16\x64\x61ta_source_class_type\x18\x11 \x01(\t\x12,\n\x0c\x62\x61tch_source\x18\x1a \x01(\x0b\x32\x16.feast.core.DataSource\x12/\n\x04meta\x18\x32 \x01(\x0b\x32!.feast.core.DataSource.SourceMeta\x12:\n\x0c\x66ile_options\x18\x0b \x01(\x0b\x32\".feast.core.DataSource.FileOptionsH\x00\x12\x42\n\x10\x62igquery_options\x18\x0c \x01(\x0b\x32&.feast.core.DataSource.BigQueryOptionsH\x00\x12<\n\rkafka_options\x18\r \x01(\x0b\x32#.feast.core.DataSource.KafkaOptionsH\x00\x12@\n\x0fkinesis_options\x18\x0e \x01(\x0b\x32%.feast.core.DataSource.KinesisOptionsH\x00\x12\x42\n\x10redshift_options\x18\x0f \x01(\x0b\x32&.feast.core.DataSource.RedshiftOptionsH\x00\x12I\n\x14request_data_options\x18\x12 \x01(\x0b\x32).feast.core.DataSource.RequestDataOptionsH\x00\x12\x44\n\x0e\x63ustom_options\x18\x10 \x01(\x0b\x32*.feast.core.DataSource.CustomSourceOptionsH\x00\x12\x44\n\x11snowflake_options\x18\x13 \x01(\x0b\x32\'.feast.core.DataSource.SnowflakeOptionsH\x00\x12:\n\x0cpush_options\x18\x16 \x01(\x0b\x32\".feast.core.DataSource.PushOptionsH\x00\x12<\n\rspark_options\x18\x1b \x01(\x0b\x32#.feast.core.DataSource.SparkOptionsH\x00\x12<\n\rtrino_options\x18\x1e \x01(\x0b\x32#.feast.core.DataSource.TrinoOptionsH\x00\x12>\n\x0e\x61thena_options\x18# \x01(\x0b\x32$.feast.core.DataSource.AthenaOptionsH\x00\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x33\n\x11\x46ieldMappingEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\xf5\x01\n\nSourceMeta\x12:\n\x16\x65\x61rliestEventTimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x38\n\x14latestEventTimestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x35\n\x11\x63reated_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12:\n\x16last_updated_timestamp\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x1a\x65\n\x0b\x46ileOptions\x12+\n\x0b\x66ile_format\x18\x01 \x01(\x0b\x32\x16.feast.core.FileFormat\x12\x0b\n\x03uri\x18\x02 \x01(\t\x12\x1c\n\x14s3_endpoint_override\x18\x03 \x01(\t\x1a/\n\x0f\x42igQueryOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x1a,\n\x0cTrinoOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x1a\xae\x01\n\x0cKafkaOptions\x12\x1f\n\x17kafka_bootstrap_servers\x18\x01 \x01(\t\x12\r\n\x05topic\x18\x02 \x01(\t\x12\x30\n\x0emessage_format\x18\x03 \x01(\x0b\x32\x18.feast.core.StreamFormat\x12<\n\x19watermark_delay_threshold\x18\x04 
\x01(\x0b\x32\x19.google.protobuf.Duration\x1a\x66\n\x0eKinesisOptions\x12\x0e\n\x06region\x18\x01 \x01(\t\x12\x13\n\x0bstream_name\x18\x02 \x01(\t\x12/\n\rrecord_format\x18\x03 \x01(\x0b\x32\x18.feast.core.StreamFormat\x1aQ\n\x0fRedshiftOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0e\n\x06schema\x18\x03 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x04 \x01(\t\x1aT\n\rAthenaOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x03 \x01(\t\x12\x13\n\x0b\x64\x61ta_source\x18\x04 \x01(\t\x1aX\n\x10SnowflakeOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0e\n\x06schema\x18\x03 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x04 \x01(\tJ\x04\x08\x05\x10\x06\x1a\xa4\x01\n\x0cSparkOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0c\n\x04path\x18\x03 \x01(\t\x12\x13\n\x0b\x66ile_format\x18\x04 \x01(\t\x12$\n\x1c\x64\x61te_partition_column_format\x18\x05 \x01(\t\x12-\n\x0ctable_format\x18\x06 \x01(\x0b\x32\x17.feast.core.TableFormat\x1a,\n\x13\x43ustomSourceOptions\x12\x15\n\rconfiguration\x18\x01 \x01(\x0c\x1a\xf7\x01\n\x12RequestDataOptions\x12Z\n\x11\x64\x65precated_schema\x18\x02 \x03(\x0b\x32?.feast.core.DataSource.RequestDataOptions.DeprecatedSchemaEntry\x12)\n\x06schema\x18\x03 \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x1aT\n\x15\x44\x65precatedSchemaEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12*\n\x05value\x18\x02 \x01(\x0e\x32\x1b.feast.types.ValueType.Enum:\x02\x38\x01J\x04\x08\x01\x10\x02\x1a\x13\n\x0bPushOptionsJ\x04\x08\x01\x10\x02\"\xf8\x01\n\nSourceType\x12\x0b\n\x07INVALID\x10\x00\x12\x0e\n\nBATCH_FILE\x10\x01\x12\x13\n\x0f\x42\x41TCH_SNOWFLAKE\x10\x08\x12\x12\n\x0e\x42\x41TCH_BIGQUERY\x10\x02\x12\x12\n\x0e\x42\x41TCH_REDSHIFT\x10\x05\x12\x10\n\x0cSTREAM_KAFKA\x10\x03\x12\x12\n\x0eSTREAM_KINESIS\x10\x04\x12\x11\n\rCUSTOM_SOURCE\x10\x06\x12\x12\n\x0eREQUEST_SOURCE\x10\x07\x12\x0f\n\x0bPUSH_SOURCE\x10\t\x12\x0f\n\x0b\x42\x41TCH_TRINO\x10\n\x12\x0f\n\x0b\x42\x41TCH_SPARK\x10\x0b\x12\x10\n\x0c\x42\x41TCH_ATHENA\x10\x0c\x42\t\n\x07optionsJ\x04\x08\x06\x10\x0b\"=\n\x0e\x44\x61taSourceList\x12+\n\x0b\x64\x61tasources\x18\x01 \x03(\x0b\x32\x16.feast.core.DataSourceBT\n\x10\x66\x65\x61st.proto.coreB\x0f\x44\x61taSourceProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -34,7 +34,7 @@ _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._options = None _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._serialized_options = b'8\001' _globals['_DATASOURCE']._serialized_start=189 - _globals['_DATASOURCE']._serialized_end=3222 + _globals['_DATASOURCE']._serialized_end=3270 _globals['_DATASOURCE_TAGSENTRY']._serialized_start=1436 _globals['_DATASOURCE_TAGSENTRY']._serialized_end=1479 _globals['_DATASOURCE_FIELDMAPPINGENTRY']._serialized_start=1481 @@ -57,18 +57,18 @@ _globals['_DATASOURCE_ATHENAOPTIONS']._serialized_end=2428 _globals['_DATASOURCE_SNOWFLAKEOPTIONS']._serialized_start=2430 _globals['_DATASOURCE_SNOWFLAKEOPTIONS']._serialized_end=2518 - _globals['_DATASOURCE_SPARKOPTIONS']._serialized_start=2520 - _globals['_DATASOURCE_SPARKOPTIONS']._serialized_end=2637 - _globals['_DATASOURCE_CUSTOMSOURCEOPTIONS']._serialized_start=2639 - _globals['_DATASOURCE_CUSTOMSOURCEOPTIONS']._serialized_end=2683 - _globals['_DATASOURCE_REQUESTDATAOPTIONS']._serialized_start=2686 - _globals['_DATASOURCE_REQUESTDATAOPTIONS']._serialized_end=2933 - 
_globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._serialized_start=2843 - _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._serialized_end=2927 - _globals['_DATASOURCE_PUSHOPTIONS']._serialized_start=2935 - _globals['_DATASOURCE_PUSHOPTIONS']._serialized_end=2954 - _globals['_DATASOURCE_SOURCETYPE']._serialized_start=2957 - _globals['_DATASOURCE_SOURCETYPE']._serialized_end=3205 - _globals['_DATASOURCELIST']._serialized_start=3224 - _globals['_DATASOURCELIST']._serialized_end=3285 + _globals['_DATASOURCE_SPARKOPTIONS']._serialized_start=2521 + _globals['_DATASOURCE_SPARKOPTIONS']._serialized_end=2685 + _globals['_DATASOURCE_CUSTOMSOURCEOPTIONS']._serialized_start=2687 + _globals['_DATASOURCE_CUSTOMSOURCEOPTIONS']._serialized_end=2731 + _globals['_DATASOURCE_REQUESTDATAOPTIONS']._serialized_start=2734 + _globals['_DATASOURCE_REQUESTDATAOPTIONS']._serialized_end=2981 + _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._serialized_start=2891 + _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._serialized_end=2975 + _globals['_DATASOURCE_PUSHOPTIONS']._serialized_start=2983 + _globals['_DATASOURCE_PUSHOPTIONS']._serialized_end=3002 + _globals['_DATASOURCE_SOURCETYPE']._serialized_start=3005 + _globals['_DATASOURCE_SOURCETYPE']._serialized_end=3253 + _globals['_DATASOURCELIST']._serialized_start=3272 + _globals['_DATASOURCELIST']._serialized_end=3333 # @@protoc_insertion_point(module_scope) diff --git a/sdk/python/feast/protos/feast/core/DataSource_pb2.pyi b/sdk/python/feast/protos/feast/core/DataSource_pb2.pyi index 668d83525cf..7876e1adc98 100644 --- a/sdk/python/feast/protos/feast/core/DataSource_pb2.pyi +++ b/sdk/python/feast/protos/feast/core/DataSource_pb2.pyi @@ -66,7 +66,7 @@ class DataSource(google.protobuf.message.Message): class SourceType(_SourceType, metaclass=_SourceTypeEnumTypeWrapper): """Type of Data Source. - Next available id: 12 + Next available id: 13 """ INVALID: DataSource.SourceType.ValueType # 0 @@ -369,6 +369,7 @@ class DataSource(google.protobuf.message.Message): PATH_FIELD_NUMBER: builtins.int FILE_FORMAT_FIELD_NUMBER: builtins.int DATE_PARTITION_COLUMN_FORMAT_FIELD_NUMBER: builtins.int + TABLE_FORMAT_FIELD_NUMBER: builtins.int table: builtins.str """Table name""" query: builtins.str @@ -379,6 +380,9 @@ class DataSource(google.protobuf.message.Message): """Format of files at `path` (e.g. parquet, avro, etc)""" date_partition_column_format: builtins.str """Date Format of date partition column (e.g. %Y-%m-%d)""" + @property + def table_format(self) -> feast.core.DataFormat_pb2.TableFormat: + """Table Format (e.g. iceberg, delta, hudi)""" def __init__( self, *, @@ -387,8 +391,10 @@ class DataSource(google.protobuf.message.Message): path: builtins.str = ..., file_format: builtins.str = ..., date_partition_column_format: builtins.str = ..., + table_format: feast.core.DataFormat_pb2.TableFormat | None = ..., ) -> None: ... - def ClearField(self, field_name: typing_extensions.Literal["date_partition_column_format", b"date_partition_column_format", "file_format", b"file_format", "path", b"path", "query", b"query", "table", b"table"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["table_format", b"table_format"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["date_partition_column_format", b"date_partition_column_format", "file_format", b"file_format", "path", b"path", "query", b"query", "table", b"table", "table_format", b"table_format"]) -> None: ... 
class CustomSourceOptions(google.protobuf.message.Message): """Defines configuration for custom third-party data sources.""" diff --git a/sdk/python/feast/table_format.py b/sdk/python/feast/table_format.py new file mode 100644 index 00000000000..829d8a6e19e --- /dev/null +++ b/sdk/python/feast/table_format.py @@ -0,0 +1,564 @@ +# Copyright 2020 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from abc import ABC, abstractmethod +from enum import Enum +from typing import TYPE_CHECKING, Dict, Optional + +if TYPE_CHECKING: + from feast.protos.feast.core.DataFormat_pb2 import TableFormat as TableFormatProto + + +class TableFormatType(Enum): + """Enum for supported table formats""" + + DELTA = "delta" + ICEBERG = "iceberg" + HUDI = "hudi" + + +class TableFormat(ABC): + """ + Abstract base class for table formats. + + Table formats encapsulate metadata and configuration specific to different + table storage formats like Iceberg, Delta Lake, Hudi, etc. They provide + a unified interface for configuring table-specific properties that are + used when reading from or writing to these advanced table formats. + + This base class defines the contract that all table format implementations + must follow, including serialization/deserialization capabilities and + property management. + + Attributes: + format_type (TableFormatType): The type of table format (iceberg, delta, hudi). + properties (Dict[str, str]): Dictionary of format-specific properties. + + Examples: + Table formats are typically used with data sources to specify + advanced table metadata and reading options: + + >>> from feast.table_format import IcebergFormat + >>> iceberg_format = IcebergFormat( + ... catalog="my_catalog", + ... namespace="my_namespace" + ... ) + >>> iceberg_format.set_property("snapshot-id", "123456789") + """ + + def __init__( + self, format_type: TableFormatType, properties: Optional[Dict[str, str]] = None + ): + self.format_type = format_type + self.properties = properties or {} + + @abstractmethod + def to_dict(self) -> Dict: + """Convert table format to dictionary representation""" + pass + + @classmethod + @abstractmethod + def from_dict(cls, data: Dict) -> "TableFormat": + """Create table format from dictionary representation""" + pass + + def get_property(self, key: str, default: Optional[str] = None) -> Optional[str]: + """Get a table format property""" + return self.properties.get(key, default) + + def set_property(self, key: str, value: str) -> None: + """Set a table format property""" + self.properties[key] = value + + +class IcebergFormat(TableFormat): + """ + Apache Iceberg table format configuration. + + Iceberg is an open table format for huge analytic datasets. This class provides + configuration for Iceberg-specific properties including catalog configuration, + namespace settings, and table-level properties for reading and writing Iceberg tables. + + Args: + catalog (Optional[str]): Name of the Iceberg catalog to use. 
The catalog manages + table metadata and provides access to tables. + namespace (Optional[str]): Namespace (schema/database) within the catalog where + the table is located. + properties (Optional[Dict[str, str]]): Properties for configuring Iceberg + catalog and table operations (e.g., warehouse location, snapshot-id, + as-of-timestamp, file format, compression, partitioning). + + Attributes: + catalog (str): The Iceberg catalog name. + namespace (str): The namespace within the catalog. + properties (Dict[str, str]): Iceberg configuration properties. + + Examples: + Basic Iceberg configuration: + + >>> iceberg_format = IcebergFormat( + ... catalog="my_catalog", + ... namespace="my_database" + ... ) + + Advanced configuration with properties: + + >>> iceberg_format = IcebergFormat( + ... catalog="spark_catalog", + ... namespace="lakehouse", + ... properties={ + ... "warehouse": "s3://my-bucket/warehouse", + ... "catalog-impl": "org.apache.iceberg.spark.SparkCatalog", + ... "format-version": "2", + ... "write.parquet.compression-codec": "snappy" + ... } + ... ) + + Reading from a specific snapshot: + + >>> iceberg_format = IcebergFormat(catalog="my_catalog", namespace="db") + >>> iceberg_format.set_property("snapshot-id", "123456789") + + Time travel queries: + + >>> iceberg_format.set_property("as-of-timestamp", "1648684800000") + """ + + def __init__( + self, + catalog: Optional[str] = None, + namespace: Optional[str] = None, + properties: Optional[Dict[str, str]] = None, + ): + super().__init__(TableFormatType.ICEBERG, properties) + self.catalog = catalog + self.namespace = namespace + + # Add catalog and namespace to properties if provided + if catalog: + self.properties["iceberg.catalog"] = catalog + if namespace: + self.properties["iceberg.namespace"] = namespace + + def to_dict(self) -> Dict: + return { + "format_type": self.format_type.value, + "catalog": self.catalog, + "namespace": self.namespace, + "properties": self.properties, + } + + @classmethod + def from_dict(cls, data: Dict) -> "IcebergFormat": + return cls( + catalog=data.get("catalog"), + namespace=data.get("namespace"), + properties=data.get("properties", {}), + ) + + def to_proto(self) -> "TableFormatProto": + """Convert to protobuf TableFormat message""" + from feast.protos.feast.core.DataFormat_pb2 import ( + TableFormat as TableFormatProto, + ) + + iceberg_proto = TableFormatProto.IcebergFormat( + catalog=self.catalog or "", + namespace=self.namespace or "", + properties=self.properties, + ) + return TableFormatProto(iceberg_format=iceberg_proto) + + @classmethod + def from_proto(cls, proto: "TableFormatProto") -> "IcebergFormat": + """Create from protobuf TableFormat message""" + iceberg_proto = proto.iceberg_format + return cls( + catalog=iceberg_proto.catalog if iceberg_proto.catalog else None, + namespace=iceberg_proto.namespace if iceberg_proto.namespace else None, + properties=dict(iceberg_proto.properties), + ) + + +class DeltaFormat(TableFormat): + """ + Delta Lake table format configuration. + + Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark + and big data workloads. This class provides configuration for Delta-specific properties + including table properties, checkpoint locations, and versioning options. + + Args: + checkpoint_location (Optional[str]): Location for storing Delta transaction logs + and checkpoints. Required for streaming operations. 
+ properties (Optional[Dict[str, str]]): Properties for configuring Delta table + behavior (e.g., auto-optimize, vacuum settings, data skipping). + + Attributes: + checkpoint_location (str): Path to checkpoint storage location. + properties (Dict[str, str]): Delta table configuration properties. + + Examples: + Basic Delta configuration: + + >>> delta_format = DeltaFormat() + + Configuration with table properties: + + >>> delta_format = DeltaFormat( + ... properties={ + ... "delta.autoOptimize.optimizeWrite": "true", + ... "delta.autoOptimize.autoCompact": "true", + ... "delta.tuneFileSizesForRewrites": "true" + ... } + ... ) + + Streaming configuration with checkpoint: + + >>> delta_format = DeltaFormat( + ... checkpoint_location="s3://my-bucket/checkpoints/my_table" + ... ) + + Time travel - reading specific version: + + >>> delta_format = DeltaFormat() + >>> delta_format.set_property("versionAsOf", "5") + + Time travel - reading at specific timestamp: + + >>> delta_format.set_property("timestampAsOf", "2023-01-01 00:00:00") + """ + + def __init__( + self, + checkpoint_location: Optional[str] = None, + properties: Optional[Dict[str, str]] = None, + ): + super().__init__(TableFormatType.DELTA, properties) + self.checkpoint_location = checkpoint_location + + # Add checkpoint location to properties if provided + if checkpoint_location: + self.properties["delta.checkpointLocation"] = checkpoint_location + + def to_dict(self) -> Dict: + return { + "format_type": self.format_type.value, + "checkpoint_location": self.checkpoint_location, + "properties": self.properties, + } + + @classmethod + def from_dict(cls, data: Dict) -> "DeltaFormat": + return cls( + checkpoint_location=data.get("checkpoint_location"), + properties=data.get("properties", {}), + ) + + def to_proto(self) -> "TableFormatProto": + """Convert to protobuf TableFormat message""" + from feast.protos.feast.core.DataFormat_pb2 import ( + TableFormat as TableFormatProto, + ) + + delta_proto = TableFormatProto.DeltaFormat( + checkpoint_location=self.checkpoint_location or "", + properties=self.properties, + ) + return TableFormatProto(delta_format=delta_proto) + + @classmethod + def from_proto(cls, proto: "TableFormatProto") -> "DeltaFormat": + """Create from protobuf TableFormat message""" + delta_proto = proto.delta_format + return cls( + checkpoint_location=delta_proto.checkpoint_location + if delta_proto.checkpoint_location + else None, + properties=dict(delta_proto.properties), + ) + + +class HudiFormat(TableFormat): + """ + Apache Hudi table format configuration. + + Apache Hudi is a data management framework used to simplify incremental data processing + and data pipeline development. This class provides configuration for Hudi-specific + properties including table type, record keys, and write operations. + + Args: + table_type (Optional[str]): Type of Hudi table. Options are: + - "COPY_ON_WRITE": Stores data in columnar format (Parquet) and rewrites entire files + - "MERGE_ON_READ": Stores data using combination of columnar and row-based formats + record_key (Optional[str]): Field(s) that uniquely identify a record. Can be a single + field or comma-separated list for composite keys. + precombine_field (Optional[str]): Field used to determine the latest version of a record + when multiple updates exist (usually a timestamp or version field). + properties (Optional[Dict[str, str]]): Additional Hudi table properties for + configuring compaction, indexing, and other Hudi features. 
+ + Attributes: + table_type (str): The Hudi table type (COPY_ON_WRITE or MERGE_ON_READ). + record_key (str): The record key field(s). + precombine_field (str): The field used for record deduplication. + properties (Dict[str, str]): Additional Hudi configuration properties. + + Examples: + Basic Hudi configuration: + + >>> hudi_format = HudiFormat( + ... table_type="COPY_ON_WRITE", + ... record_key="user_id", + ... precombine_field="timestamp" + ... ) + + Configuration with composite record key: + + >>> hudi_format = HudiFormat( + ... table_type="MERGE_ON_READ", + ... record_key="user_id,event_type", + ... precombine_field="event_timestamp" + ... ) + + Advanced configuration with table properties: + + >>> hudi_format = HudiFormat( + ... table_type="COPY_ON_WRITE", + ... record_key="id", + ... precombine_field="updated_at", + ... properties={ + ... "hoodie.compaction.strategy": "org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy", + ... "hoodie.index.type": "BLOOM", + ... "hoodie.bloom.index.parallelism": "100" + ... } + ... ) + + Reading incremental data: + + >>> hudi_format = HudiFormat(table_type="COPY_ON_WRITE") + >>> hudi_format.set_property("hoodie.datasource.query.type", "incremental") + >>> hudi_format.set_property("hoodie.datasource.read.begin.instanttime", "20230101000000") + """ + + def __init__( + self, + table_type: Optional[str] = None, # COPY_ON_WRITE or MERGE_ON_READ + record_key: Optional[str] = None, + precombine_field: Optional[str] = None, + properties: Optional[Dict[str, str]] = None, + ): + super().__init__(TableFormatType.HUDI, properties) + self.table_type = table_type + self.record_key = record_key + self.precombine_field = precombine_field + + # Add Hudi-specific properties if provided + if table_type: + self.properties["hoodie.datasource.write.table.type"] = table_type + if record_key: + self.properties["hoodie.datasource.write.recordkey.field"] = record_key + if precombine_field: + self.properties["hoodie.datasource.write.precombine.field"] = ( + precombine_field + ) + + def to_dict(self) -> Dict: + return { + "format_type": self.format_type.value, + "table_type": self.table_type, + "record_key": self.record_key, + "precombine_field": self.precombine_field, + "properties": self.properties, + } + + @classmethod + def from_dict(cls, data: Dict) -> "HudiFormat": + return cls( + table_type=data.get("table_type"), + record_key=data.get("record_key"), + precombine_field=data.get("precombine_field"), + properties=data.get("properties", {}), + ) + + def to_proto(self) -> "TableFormatProto": + """Convert to protobuf TableFormat message""" + from feast.protos.feast.core.DataFormat_pb2 import ( + TableFormat as TableFormatProto, + ) + + hudi_proto = TableFormatProto.HudiFormat( + table_type=self.table_type or "", + record_key=self.record_key or "", + precombine_field=self.precombine_field or "", + properties=self.properties, + ) + return TableFormatProto(hudi_format=hudi_proto) + + @classmethod + def from_proto(cls, proto: "TableFormatProto") -> "HudiFormat": + """Create from protobuf TableFormat message""" + hudi_proto = proto.hudi_format + return cls( + table_type=hudi_proto.table_type if hudi_proto.table_type else None, + record_key=hudi_proto.record_key if hudi_proto.record_key else None, + precombine_field=hudi_proto.precombine_field + if hudi_proto.precombine_field + else None, + properties=dict(hudi_proto.properties), + ) + + +def create_table_format(format_type: TableFormatType, **kwargs) -> TableFormat: + """ + Factory function to 
create appropriate TableFormat instance based on type. + + This is a convenience function that creates the correct TableFormat subclass + based on the provided format type, passing through any additional keyword arguments + to the constructor. + + Args: + format_type (TableFormatType): The type of table format to create. + **kwargs: Additional keyword arguments passed to the format constructor. + + Returns: + TableFormat: An instance of the appropriate TableFormat subclass. + + Raises: + ValueError: If an unsupported format_type is provided. + + Examples: + Create an Iceberg format: + + >>> iceberg_format = create_table_format( + ... TableFormatType.ICEBERG, + ... catalog="my_catalog", + ... namespace="my_db" + ... ) + + Create a Delta format: + + >>> delta_format = create_table_format( + ... TableFormatType.DELTA, + ... checkpoint_location="s3://bucket/checkpoints" + ... ) + """ + if format_type == TableFormatType.ICEBERG: + return IcebergFormat(**kwargs) + elif format_type == TableFormatType.DELTA: + return DeltaFormat(**kwargs) + elif format_type == TableFormatType.HUDI: + return HudiFormat(**kwargs) + else: + raise ValueError(f"Unknown table format type: {format_type}") + + +def table_format_from_dict(data: Dict) -> TableFormat: + """ + Create TableFormat instance from dictionary representation. + + This function deserializes a dictionary (typically from JSON or protobuf) + back into the appropriate TableFormat instance. The dictionary must contain + a 'format_type' field that indicates which format class to instantiate. + + Args: + data (Dict): Dictionary containing table format configuration. Must include + 'format_type' field with value 'iceberg', 'delta', or 'hudi'. + + Returns: + TableFormat: An instance of the appropriate TableFormat subclass. + + Raises: + ValueError: If format_type is not recognized. + KeyError: If format_type field is missing from data. + + Examples: + Deserialize an Iceberg format: + + >>> data = { + ... "format_type": "iceberg", + ... "catalog": "my_catalog", + ... "namespace": "my_db" + ... } + >>> iceberg_format = table_format_from_dict(data) + """ + if "format_type" not in data: + raise KeyError("Missing 'format_type' field in data") + format_type = data["format_type"] + + if format_type == TableFormatType.ICEBERG.value: + return IcebergFormat.from_dict(data) + elif format_type == TableFormatType.DELTA.value: + return DeltaFormat.from_dict(data) + elif format_type == TableFormatType.HUDI.value: + return HudiFormat.from_dict(data) + else: + raise ValueError(f"Unknown table format type: {format_type}") + + +def table_format_from_json(json_str: str) -> TableFormat: + """ + Create TableFormat instance from JSON string. + + This is a convenience function that parses a JSON string and creates + the appropriate TableFormat instance. Useful for loading table format + configurations from files or network requests. + + Args: + json_str (str): JSON string containing table format configuration. + + Returns: + TableFormat: An instance of the appropriate TableFormat subclass. + + Raises: + json.JSONDecodeError: If the JSON string is invalid. + ValueError: If format_type is not recognized. + KeyError: If format_type field is missing. 
+ + Examples: + Load from JSON string: + + >>> json_config = '{"format_type": "delta", "checkpoint_location": "s3://bucket/checkpoints"}' + >>> delta_format = table_format_from_json(json_config) + """ + data = json.loads(json_str) + return table_format_from_dict(data) + + +def table_format_from_proto(proto: "TableFormatProto") -> TableFormat: + """ + Create TableFormat instance from protobuf TableFormat message. + + Args: + proto: TableFormat protobuf message + + Returns: + TableFormat: An instance of the appropriate TableFormat subclass. + + Raises: + ValueError: If the proto doesn't contain a recognized format. + """ + + which_format = proto.WhichOneof("format") + + if which_format == "iceberg_format": + return IcebergFormat.from_proto(proto) + elif which_format == "delta_format": + return DeltaFormat.from_proto(proto) + elif which_format == "hudi_format": + return HudiFormat.from_proto(proto) + else: + raise ValueError(f"Unknown table format in proto: {which_format}") diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/spark_offline_store/test_spark_table_format_integration.py b/sdk/python/tests/unit/infra/offline_stores/contrib/spark_offline_store/test_spark_table_format_integration.py new file mode 100644 index 00000000000..639f478fb28 --- /dev/null +++ b/sdk/python/tests/unit/infra/offline_stores/contrib/spark_offline_store/test_spark_table_format_integration.py @@ -0,0 +1,303 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import ( + SparkOptions, + SparkSource, +) +from feast.table_format import ( + DeltaFormat, + HudiFormat, + IcebergFormat, +) + + +class TestSparkSourceWithTableFormat: + """Test SparkSource integration with TableFormat.""" + + def test_spark_source_with_iceberg_table_format(self): + """Test SparkSource with IcebergFormat.""" + iceberg_format = IcebergFormat( + catalog="my_catalog", + namespace="my_db", + ) + iceberg_format.set_property("snapshot-id", "123456789") + + spark_source = SparkSource( + name="iceberg_features", + path="my_catalog.my_db.my_table", + table_format=iceberg_format, + ) + + assert spark_source.table_format == iceberg_format + assert spark_source.table_format.catalog == "my_catalog" + assert spark_source.table_format.get_property("snapshot-id") == "123456789" + assert spark_source.path == "my_catalog.my_db.my_table" + + def test_spark_source_with_delta_table_format(self): + """Test SparkSource with DeltaFormat.""" + delta_format = DeltaFormat() + delta_format.set_property("versionAsOf", "1") + + spark_source = SparkSource( + name="delta_features", + path="s3://bucket/delta-table", + table_format=delta_format, + ) + + assert spark_source.table_format == delta_format + assert spark_source.table_format.get_property("versionAsOf") == "1" + assert spark_source.path == "s3://bucket/delta-table" + + def test_spark_source_with_hudi_table_format(self): + """Test SparkSource with HudiFormat.""" + hudi_format = HudiFormat( + table_type="COPY_ON_WRITE", + record_key="id", + ) + hudi_format.set_property("hoodie.datasource.query.type", "snapshot") + + spark_source = SparkSource( + name="hudi_features", + path="s3://bucket/hudi-table", + table_format=hudi_format, + ) + + assert spark_source.table_format == hudi_format + assert spark_source.table_format.table_type == "COPY_ON_WRITE" + assert ( + spark_source.table_format.get_property("hoodie.datasource.query.type") + == "snapshot" + ) + + def test_spark_source_without_table_format(self): + """Test 
SparkSource without table format (traditional file reading).""" + spark_source = SparkSource( + name="parquet_features", + path="s3://bucket/data.parquet", + file_format="parquet", + ) + + assert spark_source.table_format is None + assert spark_source.file_format == "parquet" + assert spark_source.path == "s3://bucket/data.parquet" + + def test_spark_source_both_file_and_table_format(self): + """Test SparkSource with both file_format and table_format.""" + iceberg_format = IcebergFormat() + + spark_source = SparkSource( + name="mixed_features", + path="s3://bucket/iceberg-table", + file_format="parquet", # Underlying file format + table_format=iceberg_format, # Table metadata format + ) + + assert spark_source.table_format == iceberg_format + assert spark_source.file_format == "parquet" + + def test_spark_source_validation_with_table_format(self): + """Test SparkSource validation with table_format.""" + iceberg_format = IcebergFormat() + + # Should work: path with table_format, no file_format + spark_source = SparkSource( + name="iceberg_table", + path="catalog.db.table", + table_format=iceberg_format, + ) + assert spark_source.table_format == iceberg_format + + # Should work: path with both table_format and file_format + spark_source = SparkSource( + name="iceberg_table_with_file_format", + path="s3://bucket/data", + file_format="parquet", + table_format=iceberg_format, + ) + assert spark_source.table_format == iceberg_format + assert spark_source.file_format == "parquet" + + def test_spark_source_validation_without_table_format(self): + """Test SparkSource validation without table_format.""" + # Should work: path with file_format, no table_format + spark_source = SparkSource( + name="parquet_file", + path="s3://bucket/data.parquet", + file_format="parquet", + ) + assert spark_source.file_format == "parquet" + assert spark_source.table_format is None + + # Should fail: path without file_format or table_format + with pytest.raises( + ValueError, + match="If 'path' is specified without 'table_format', then 'file_format' is required", + ): + SparkSource( + name="invalid_source", + path="s3://bucket/data", + ) + + @patch( + "feast.infra.offline_stores.contrib.spark_offline_store.spark.get_spark_session_or_start_new_with_repoconfig" + ) + def test_load_dataframe_from_path_with_table_format(self, mock_get_spark_session): + """Test _load_dataframe_from_path with table formats.""" + mock_spark_session = MagicMock() + mock_get_spark_session.getActiveSession.return_value = mock_spark_session + + mock_reader = MagicMock() + mock_spark_session.read.format.return_value = mock_reader + mock_reader.option.return_value = mock_reader + mock_reader.load.return_value = MagicMock() + + # Test Iceberg with options + iceberg_format = IcebergFormat() + iceberg_format.set_property("snapshot-id", "123456789") + iceberg_format.set_property("read.split.target-size", "134217728") + + spark_source = SparkSource( + name="iceberg_test", + path="catalog.db.table", + table_format=iceberg_format, + ) + + spark_source._load_dataframe_from_path(mock_spark_session) + + # Verify format was set to iceberg + mock_spark_session.read.format.assert_called_with("iceberg") + + # Verify options were set + mock_reader.option.assert_any_call("snapshot-id", "123456789") + mock_reader.option.assert_any_call("read.split.target-size", "134217728") + + # Verify load was called with the path + mock_reader.load.assert_called_with("catalog.db.table") + + @patch( + 
"feast.infra.offline_stores.contrib.spark_offline_store.spark.get_spark_session_or_start_new_with_repoconfig" + ) + def test_load_dataframe_from_path_without_table_format( + self, mock_get_spark_session + ): + """Test _load_dataframe_from_path without table formats.""" + mock_spark_session = MagicMock() + mock_get_spark_session.getActiveSession.return_value = mock_spark_session + + mock_reader = MagicMock() + mock_spark_session.read.format.return_value = mock_reader + mock_reader.load.return_value = MagicMock() + + spark_source = SparkSource( + name="parquet_test", + path="s3://bucket/data.parquet", + file_format="parquet", + ) + + spark_source._load_dataframe_from_path(mock_spark_session) + + # Verify format was set to parquet (file format) + mock_spark_session.read.format.assert_called_with("parquet") + + # Verify load was called with the path + mock_reader.load.assert_called_with("s3://bucket/data.parquet") + + +class TestSparkOptionsWithTableFormat: + """Test SparkOptions serialization with TableFormat.""" + + def test_spark_options_protobuf_serialization_with_table_format(self): + """Test SparkOptions protobuf serialization/deserialization with table format.""" + iceberg_format = IcebergFormat( + catalog="test_catalog", + namespace="test_namespace", + ) + iceberg_format.set_property("snapshot-id", "123456789") + + spark_options = SparkOptions( + table=None, + query=None, + path="catalog.db.table", + file_format=None, + table_format=iceberg_format, + ) + + # Test serialization to proto + proto = spark_options.to_proto() + assert proto.path == "catalog.db.table" + assert proto.file_format == "" # Should be empty when not provided + + # Verify table_format is serialized as proto TableFormat + assert proto.HasField("table_format") + assert proto.table_format.HasField("iceberg_format") + assert proto.table_format.iceberg_format.catalog == "test_catalog" + assert proto.table_format.iceberg_format.namespace == "test_namespace" + assert ( + proto.table_format.iceberg_format.properties["snapshot-id"] == "123456789" + ) + + # Test deserialization from proto + restored_options = SparkOptions.from_proto(proto) + assert restored_options.path == "catalog.db.table" + assert restored_options.file_format == "" + assert isinstance(restored_options.table_format, IcebergFormat) + assert restored_options.table_format.catalog == "test_catalog" + assert restored_options.table_format.namespace == "test_namespace" + assert restored_options.table_format.get_property("snapshot-id") == "123456789" + + def test_spark_options_protobuf_serialization_without_table_format(self): + """Test SparkOptions protobuf serialization/deserialization without table format.""" + spark_options = SparkOptions( + table=None, + query=None, + path="s3://bucket/data.parquet", + file_format="parquet", + table_format=None, + ) + + # Test serialization to proto + proto = spark_options.to_proto() + assert proto.path == "s3://bucket/data.parquet" + assert proto.file_format == "parquet" + assert not proto.HasField("table_format") # Should not have table_format field + + # Test deserialization from proto + restored_options = SparkOptions.from_proto(proto) + assert restored_options.path == "s3://bucket/data.parquet" + assert restored_options.file_format == "parquet" + assert restored_options.table_format is None + + def test_spark_source_protobuf_roundtrip_with_table_format(self): + """Test complete SparkSource protobuf roundtrip with table format.""" + delta_format = DeltaFormat() + delta_format.set_property("versionAsOf", "1") + + 
original_source = SparkSource( + name="delta_test", + path="s3://bucket/delta-table", + table_format=delta_format, + timestamp_field="event_timestamp", + created_timestamp_column="created_at", + description="Test delta source", + ) + + # Serialize to proto + proto = original_source._to_proto_impl() + + # Deserialize from proto + restored_source = SparkSource.from_proto(proto) + + assert restored_source.name == original_source.name + assert restored_source.path == original_source.path + assert restored_source.timestamp_field == original_source.timestamp_field + assert ( + restored_source.created_timestamp_column + == original_source.created_timestamp_column + ) + assert restored_source.description == original_source.description + + # Verify table_format is properly restored + assert isinstance(restored_source.table_format, DeltaFormat) + assert restored_source.table_format.get_property("versionAsOf") == "1" diff --git a/sdk/python/tests/unit/test_table_format.py b/sdk/python/tests/unit/test_table_format.py new file mode 100644 index 00000000000..908d491e940 --- /dev/null +++ b/sdk/python/tests/unit/test_table_format.py @@ -0,0 +1,323 @@ +import json + +import pytest + +from feast.table_format import ( + DeltaFormat, + HudiFormat, + IcebergFormat, + TableFormatType, + create_table_format, + table_format_from_dict, + table_format_from_json, +) + + +class TestTableFormat: + """Test core TableFormat classes and functionality.""" + + def test_iceberg_table_format_creation(self): + """Test IcebergFormat creation and properties.""" + iceberg_format = IcebergFormat( + catalog="my_catalog", + namespace="my_namespace", + properties={"catalog.uri": "s3://bucket/warehouse", "format-version": "2"}, + ) + + assert iceberg_format.format_type == TableFormatType.ICEBERG + assert iceberg_format.catalog == "my_catalog" + assert iceberg_format.namespace == "my_namespace" + assert iceberg_format.get_property("iceberg.catalog") == "my_catalog" + assert iceberg_format.get_property("iceberg.namespace") == "my_namespace" + assert iceberg_format.get_property("catalog.uri") == "s3://bucket/warehouse" + assert iceberg_format.get_property("format-version") == "2" + + def test_iceberg_table_format_minimal(self): + """Test IcebergFormat with minimal config.""" + iceberg_format = IcebergFormat() + + assert iceberg_format.format_type == TableFormatType.ICEBERG + assert iceberg_format.catalog is None + assert iceberg_format.namespace is None + assert len(iceberg_format.properties) == 0 + + def test_delta_table_format_creation(self): + """Test DeltaFormat creation and properties.""" + delta_format = DeltaFormat( + checkpoint_location="s3://bucket/checkpoints", + properties={"delta.autoOptimize.optimizeWrite": "true"}, + ) + + assert delta_format.format_type == TableFormatType.DELTA + assert delta_format.checkpoint_location == "s3://bucket/checkpoints" + assert ( + delta_format.get_property("delta.checkpointLocation") + == "s3://bucket/checkpoints" + ) + assert delta_format.get_property("delta.autoOptimize.optimizeWrite") == "true" + + def test_hudi_table_format_creation(self): + """Test HudiFormat creation and properties.""" + hudi_format = HudiFormat( + table_type="COPY_ON_WRITE", + record_key="id", + precombine_field="timestamp", + properties={ + "hoodie.compaction.strategy": "org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy" + }, + ) + + assert hudi_format.format_type == TableFormatType.HUDI + assert hudi_format.table_type == "COPY_ON_WRITE" + assert hudi_format.record_key == "id" + assert 
hudi_format.precombine_field == "timestamp" + assert ( + hudi_format.get_property("hoodie.datasource.write.table.type") + == "COPY_ON_WRITE" + ) + assert ( + hudi_format.get_property("hoodie.datasource.write.recordkey.field") == "id" + ) + assert ( + hudi_format.get_property("hoodie.datasource.write.precombine.field") + == "timestamp" + ) + + def test_table_format_property_methods(self): + """Test property getter/setter methods.""" + iceberg_format = IcebergFormat() + + # Test setting and getting properties + iceberg_format.set_property("snapshot-id", "123456789") + assert iceberg_format.get_property("snapshot-id") == "123456789" + + # Test default value + assert iceberg_format.get_property("non-existent-key", "default") == "default" + assert iceberg_format.get_property("non-existent-key") is None + + def test_table_format_serialization(self): + """Test table format serialization to/from dict.""" + # Test Iceberg + iceberg_format = IcebergFormat( + catalog="test_catalog", + namespace="test_namespace", + properties={"key1": "value1", "key2": "value2"}, + ) + + iceberg_dict = iceberg_format.to_dict() + iceberg_restored = IcebergFormat.from_dict(iceberg_dict) + + assert iceberg_restored.format_type == iceberg_format.format_type + assert iceberg_restored.catalog == iceberg_format.catalog + assert iceberg_restored.namespace == iceberg_format.namespace + assert iceberg_restored.properties == iceberg_format.properties + + # Test Delta + delta_format = DeltaFormat( + checkpoint_location="s3://bucket/checkpoints", + properties={"key": "value"}, + ) + + delta_dict = delta_format.to_dict() + delta_restored = DeltaFormat.from_dict(delta_dict) + + assert delta_restored.format_type == delta_format.format_type + assert delta_restored.properties == delta_format.properties + assert delta_restored.checkpoint_location == delta_format.checkpoint_location + + # Test Hudi + hudi_format = HudiFormat( + table_type="MERGE_ON_READ", + record_key="uuid", + precombine_field="ts", + ) + + hudi_dict = hudi_format.to_dict() + hudi_restored = HudiFormat.from_dict(hudi_dict) + + assert hudi_restored.format_type == hudi_format.format_type + assert hudi_restored.table_type == hudi_format.table_type + assert hudi_restored.record_key == hudi_format.record_key + assert hudi_restored.precombine_field == hudi_format.precombine_field + + def test_factory_function(self): + """Test create_table_format factory function.""" + # Test Iceberg + iceberg_format = create_table_format( + TableFormatType.ICEBERG, + catalog="test_catalog", + namespace="test_ns", + ) + assert isinstance(iceberg_format, IcebergFormat) + assert iceberg_format.catalog == "test_catalog" + assert iceberg_format.namespace == "test_ns" + + # Test Delta + delta_format = create_table_format( + TableFormatType.DELTA, + checkpoint_location="s3://test", + ) + assert isinstance(delta_format, DeltaFormat) + assert delta_format.checkpoint_location == "s3://test" + + # Test Hudi + hudi_format = create_table_format( + TableFormatType.HUDI, + table_type="COPY_ON_WRITE", + ) + assert isinstance(hudi_format, HudiFormat) + assert hudi_format.table_type == "COPY_ON_WRITE" + + # Test invalid format type + with pytest.raises(ValueError, match="Unknown table format type"): + create_table_format("invalid_format") + + def test_table_format_from_dict(self): + """Test table_format_from_dict function.""" + # Test Iceberg + iceberg_dict = { + "format_type": "iceberg", + "catalog": "test_catalog", + "namespace": "test_namespace", + "properties": {"key1": "value1", "key2": "value2"}, + } + 
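        # The "format_type" key is what table_format_from_dict dispatches on to
        # pick the matching subclass (IcebergFormat here).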
iceberg_format = table_format_from_dict(iceberg_dict) + assert isinstance(iceberg_format, IcebergFormat) + assert iceberg_format.catalog == "test_catalog" + + # Test Delta + delta_dict = { + "format_type": "delta", + "properties": {"key": "value"}, + "checkpoint_location": "s3://bucket/checkpoints", + } + delta_format = table_format_from_dict(delta_dict) + assert isinstance(delta_format, DeltaFormat) + assert delta_format.checkpoint_location == "s3://bucket/checkpoints" + + # Test Hudi + hudi_dict = { + "format_type": "hudi", + "table_type": "MERGE_ON_READ", + "record_key": "id", + "precombine_field": "ts", + "properties": {}, + } + hudi_format = table_format_from_dict(hudi_dict) + assert isinstance(hudi_format, HudiFormat) + assert hudi_format.table_type == "MERGE_ON_READ" + + # Test invalid format type + with pytest.raises(ValueError, match="Unknown table format type"): + table_format_from_dict({"format_type": "invalid"}) + + def test_table_format_from_json(self): + """Test table_format_from_json function.""" + iceberg_dict = { + "format_type": "iceberg", + "catalog": "test_catalog", + "namespace": "test_namespace", + "properties": {}, + } + json_str = json.dumps(iceberg_dict) + iceberg_format = table_format_from_json(json_str) + + assert isinstance(iceberg_format, IcebergFormat) + assert iceberg_format.catalog == "test_catalog" + assert iceberg_format.namespace == "test_namespace" + + def test_table_format_error_handling(self): + """Test error handling in table format operations.""" + + # Test invalid format type - create mock enum value + class MockFormat: + value = "invalid_format" + + with pytest.raises(ValueError, match="Unknown table format type"): + create_table_format(MockFormat()) + + # Test invalid format type in from_dict + with pytest.raises(ValueError, match="Unknown table format type"): + table_format_from_dict({"format_type": "invalid"}) + + # Test missing format_type + with pytest.raises(KeyError): + table_format_from_dict({}) + + # Test invalid JSON + with pytest.raises(json.JSONDecodeError): + table_format_from_json("invalid json") + + def test_table_format_property_edge_cases(self): + """Test edge cases for table format properties.""" + iceberg_format = IcebergFormat() + + # Test property overwriting + iceberg_format.set_property("snapshot-id", "123") + assert iceberg_format.get_property("snapshot-id") == "123" + iceberg_format.set_property("snapshot-id", "456") + assert iceberg_format.get_property("snapshot-id") == "456" + + # Test empty properties + delta_format = DeltaFormat(properties=None) + assert len(delta_format.properties) == 0 + + # Test None values in constructors + hudi_format = HudiFormat( + table_type=None, + record_key=None, + precombine_field=None, + properties=None, + ) + assert hudi_format.table_type is None + assert hudi_format.record_key is None + assert hudi_format.precombine_field is None + + def test_hudi_format_comprehensive(self): + """Test comprehensive Hudi format functionality.""" + # Test with all properties + hudi_format = HudiFormat( + table_type="COPY_ON_WRITE", + record_key="id,uuid", + precombine_field="ts", + properties={"custom.prop": "value"}, + ) + + assert ( + hudi_format.get_property("hoodie.datasource.write.table.type") + == "COPY_ON_WRITE" + ) + assert ( + hudi_format.get_property("hoodie.datasource.write.recordkey.field") + == "id,uuid" + ) + assert ( + hudi_format.get_property("hoodie.datasource.write.precombine.field") == "ts" + ) + assert hudi_format.get_property("custom.prop") == "value" + + # Test serialization roundtrip 
with complex data + serialized = hudi_format.to_dict() + restored = HudiFormat.from_dict(serialized) + assert restored.table_type == hudi_format.table_type + assert restored.record_key == hudi_format.record_key + assert restored.precombine_field == hudi_format.precombine_field + + def test_table_format_with_special_characters(self): + """Test table formats with special characters and edge values.""" + # Test with unicode and special characters + iceberg_format = IcebergFormat( + catalog="测试目录", # Chinese + namespace="тест_ns", # Cyrillic + properties={"special.key": "value with spaces & symbols!@#$%^&*()"}, + ) + + # Serialization roundtrip should preserve special characters + serialized = iceberg_format.to_dict() + restored = IcebergFormat.from_dict(serialized) + assert restored.catalog == "测试目录" + assert restored.namespace == "тест_ns" + assert ( + restored.properties["special.key"] + == "value with spaces & symbols!@#$%^&*()" + )
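
    # --- Hedged sketch (illustrative addition, not part of the change set above) ---
    # to_proto()/table_format_from_proto() round-tripping is otherwise only exercised
    # indirectly through SparkOptions; a direct check of the oneof dispatch could look
    # like the method below. It assumes the regenerated DataFormat_pb2.TableFormat
    # message is available and imports table_format_from_proto locally.
    def test_table_format_proto_roundtrip_sketch(self):
        """Sketch: IcebergFormat -> proto -> table_format_from_proto roundtrip."""
        from feast.table_format import table_format_from_proto

        original = IcebergFormat(catalog="test_catalog", namespace="test_ns")
        original.set_property("snapshot-id", "123456789")

        proto = original.to_proto()
        # table_format_from_proto dispatches on the proto's "format" oneof.
        assert proto.WhichOneof("format") == "iceberg_format"

        restored = table_format_from_proto(proto)
        assert isinstance(restored, IcebergFormat)
        assert restored.catalog == "test_catalog"
        assert restored.namespace == "test_ns"
        assert restored.get_property("snapshot-id") == "123456789"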