[adapters] Delta input: revamp error handling and retry logic. #6015
Conversation
Force-pushed 0f16560 to 03479af
mythical-fred
left a comment
One blocker: please squash the commit. Dirty history is still a hard no for ready PRs.
mythical-fred
left a comment
To be explicit: the commit is 2f741c2f with subject [ci] apply automatic fixes. Please squash it into the main commit before merge.
Force-pushed 2f741c2 to 6b18d0e
It's literally a single commit 🤷‍♂️
Force-pushed 6b18d0e to e0be55e
swanandx
left a comment
I do not understand this thoroughly, but LGTM
| `checkpoint_interval` | <p>Checkpoint interval (the number of commits after which a new checkpoint should be created) for newly created Delta tables.</p><p>The option is only available when creating the Delta table (`mode = append` and there is no existing table at the target location, or `mode = truncate`). It configures the `checkpointInterval` table property.</p><p>0 means no checkpoints are created.</p><p>Default: 10.</p> |
| `max_retries` | <p>Maximum number of retries for failed Delta Lake operations, such as writing Parquet files and committing transactions.</p><p>The connector performs retries on several levels: individual S3 operations, Delta Lake transaction commits, and overall operation retries. This setting controls the overall operation retries. When a write to the table fails because of an S3 timeout, or for any other reason not resolved by lower-level retries, the connector retries the entire operation.</p><p>When not specified, the connector retries indefinitely. When set to 0, the connector doesn't retry failed operations.</p> |
| `threads` | Number of parallel threads used by the connector. Increasing this value can improve Delta Lake write throughput by enabling concurrent writes. Default: `1`. |
| | `max_retries`| | |
this seems to be a mistake
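The `max_retries` semantics described in the table above (unset means retry forever, 0 means fail immediately) could be sketched roughly as follows. This is a hypothetical helper for illustration, not the connector's actual code; the function name and shape are assumptions:

```rust
/// Retry `op` according to the `max_retries` semantics described above:
/// `None` retries forever, `Some(0)` fails on the first error, and
/// `Some(n)` allows up to `n` retries after the initial attempt.
fn retry_op<T, E>(
    max_retries: Option<u32>,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempts: u32 = 0;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) => match max_retries {
                // Retry budget exhausted: surface the last error.
                Some(limit) if attempts >= limit => return Err(e),
                // A real connector would also set health status to
                // UNHEALTHY and back off before the next attempt.
                _ => attempts += 1,
            },
        }
    }
}
```

The real connector layers this kind of loop on top of per-operation S3 retries and delta-rs commit retries, so this setting only bounds the outermost level.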
    input_stream: &mut dyn ArrowStream,
    receiver: &mut Receiver<PipelineState>,
    transaction: &Option<Option<String>>,
) -> Result<usize, String> {
why return String here? AnyError doesn't work?
AnyError would work, but we only use it to form an error message one level up the stack, so String seems more straightforward.
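The trade-off discussed in this thread (a plain `String` error versus a richer error type) could look like the sketch below. The function names are hypothetical, and `Box<dyn Error>` stands in for `AnyError`, which is assumed to be an anyhow-style boxed error:

```rust
// Hypothetical inner function: returns Result<_, String> because the
// error is only ever used to form a message one level up the stack.
fn ingest_batch() -> Result<usize, String> {
    Err("S3 timeout while reading Delta log entry".to_string())
}

// Hypothetical caller: wraps the String into a boxed error with added
// context, which is all the extra machinery an AnyError would buy here.
fn run() -> Result<usize, Box<dyn std::error::Error>> {
    ingest_batch().map_err(|msg| format!("delta input connector error: {msg}").into())
}
```

Since no caller inspects the error structurally, the `String` keeps the inner signature simpler at no loss of information.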
result.sort();
result.dedup();
can there be unexpected duplicates in result that we fail to catch now due to calling dedup?
If yes, shall we call dedup only when `inject_failure` was used?
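For context on the snippet under discussion: `Vec::dedup` only removes *consecutive* duplicates, which is why the sort comes first. A minimal sketch (the function name is hypothetical, not the connector's):

```rust
/// Sort, then dedup: after sorting, all duplicates are adjacent,
/// so dedup removes every repeated value.
fn sort_and_dedup(mut result: Vec<u64>) -> Vec<u64> {
    result.sort();
    result.dedup();
    result
}
```

The flip side, and the reviewer's concern, is that this silently absorbs *any* duplicates, expected or not, so a genuinely unexpected duplicate would no longer be observable after this call.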
Force-pushed e0be55e to 4df22f2
The connector already had retry logic in some places, but mostly relied on delta-rs for retries. This wasn't always enough and we saw timeouts and expired token errors bubbling up.

This commit adds retry loops around all object store accesses. The loops are controlled by the new `max_retries` setting, similar to the output connector. By default, it will retry forever. The retry loops set health status to UNHEALTHY while retrying. If the pipeline is stopped and restarted during a retry, the connector resumes from the last successfully ingested table version. After exhausting retry attempts the connector fails permanently with a fatal error, which eliminates the possibility of data loss.

There is an important caveat: because retries may occur after partial progress (e.g., after partially processing a Delta log entry), the same data may be ingested more than once. This is consistent with the connector's at-least-once delivery guarantee.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Force-pushed 4df22f2 to 544e802
The connector already had retry logic in some places, but mostly relied on delta-rs for retries. This wasn't always enough and we saw timeouts and expired token errors bubbling up.
This commit adds retry loops around all object store accesses. The loops are controlled by the new `max_retries` setting, similar to the output connector. By default, it will retry forever. The retry loops set health status to UNHEALTHY while retrying.

If the pipeline is stopped and restarted during a retry, the connector resumes from the last successfully ingested table version. After exhausting retry attempts the connector fails permanently with a fatal error, which eliminates the possibility of data loss.
There is an important caveat:
Because retries may occur after partial progress (e.g., after partially processing a Delta log entry), the same data may be ingested more than once. This is consistent with the connector’s at-least-once delivery guarantee.
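The caveat above can be illustrated with a toy model. This is not the connector's code; the function and file names are hypothetical. If a Delta log entry lists several data files and the operation fails partway through, the retry re-processes the whole entry, so rows from files read before the failure are delivered twice:

```rust
/// Toy model: ingest the data files listed in one Delta log entry,
/// optionally failing before file number `fail_after`. Successfully
/// read files are pushed to `out`, modeling rows reaching the pipeline.
fn ingest_log_entry(
    files: &[&str],
    fail_after: Option<usize>,
    out: &mut Vec<String>,
) -> Result<(), String> {
    for (i, f) in files.iter().enumerate() {
        if Some(i) == fail_after {
            // Partial progress: earlier files were already ingested.
            return Err(format!("transient error while reading {f}"));
        }
        out.push(f.to_string());
    }
    Ok(())
}
```

Retrying the entry after such a failure duplicates the files ingested before the error, which is exactly the at-least-once behavior described above.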
Describe Manual Test Plan
Tested using @anandbraman's e-commerce demo pipeline.
Checklist
Breaking Changes?
Mark if you think the answer is yes for any of these components:
Describe Incompatible Changes
In the past, if the connector wasn't able to read a table version, it signaled an error and moved on to the next version. This could cause data loss. With this change, the connector will either retry forever or fail and stop producing input after exhausting retry attempts.
The second behavioral change is that the connector can now produce duplicate inputs even without a pipeline restart.