Skip to content

[adapters] Delta input: revamp error handling and retry logic.#6015

Merged
ryzhyk merged 1 commit intomainfrom
delta_input_retry
Apr 12, 2026
Merged

[adapters] Delta input: revamp error handling and retry logic.#6015
ryzhyk merged 1 commit intomainfrom
delta_input_retry

Conversation

@ryzhyk
Copy link
Copy Markdown
Contributor

@ryzhyk ryzhyk commented Apr 10, 2026

The connector already had retry logic in some places, but mostly relied on delta-rs for retries. This wasn't always enough and we saw timeouts and expired token errors bubbling up.

This commit adds retry loops around all object store accesses. The loops are controlled by the new max_retries setting, similar to the output connector. By default, it will retry forever. The retry loops set health status to UNHEALTHY while retrying.

If the pipeline is stopped and restarted during a retry, the connector resumes from the last successfully ingested table version. After exhausting retry attempts the connector fails permanently with a fatal error, which eliminates the possibility of data loss.

There is an important caveat:

Because retries may occur after partial progress (e.g., after partially processing a Delta log entry), the same data may be ingested more than once. This is consistent with the connector’s at-least-once delivery guarantee.

Describe Manual Test Plan

Tested using @anandbraman 's e-commerce demo pipeline.

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

Describe Incompatible Changes

In the past if the connector wasn't able to read a table version, it signaled an error and moved to the next version. This could cause data loss. With this change the connector will either retry forever or fail and stop producing input after exhausting retry attempts.

The second behavioral change is that the connector can now produce duplicate inputs even without a pipeline restart.

@ryzhyk ryzhyk requested a review from swanandx April 10, 2026 00:40
@ryzhyk ryzhyk added the connectors Issues related to the adapters/connectors crate label Apr 10, 2026
@ryzhyk ryzhyk force-pushed the delta_input_retry branch from 0f16560 to 03479af Compare April 10, 2026 00:48
Copy link
Copy Markdown

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One blocker: please squash the commit. Dirty history is still a hard no for ready PRs.

Copy link
Copy Markdown

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be explicit: the commit is 2f741c2f with subject [ci] apply automatic fixes. Please squash it into the main commit before merge.

@ryzhyk ryzhyk force-pushed the delta_input_retry branch from 2f741c2 to 6b18d0e Compare April 10, 2026 03:45
@ryzhyk
Copy link
Copy Markdown
Contributor Author

ryzhyk commented Apr 10, 2026

One blocker: please squash the commit. Dirty history is still a hard no for ready PRs.

It's literally a single commit 🤷‍♂️

Copy link
Copy Markdown

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@ryzhyk ryzhyk force-pushed the delta_input_retry branch from 6b18d0e to e0be55e Compare April 10, 2026 07:12
Copy link
Copy Markdown
Contributor

@swanandx swanandx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not understand this thoroughly, but LGTM

| `checkpoint_interval` | <p>Checkpoint interval (i.e., the number of commits after which a new checkpoint should be created) for newly created Delta tables.</p><p>The option is only available when creating the Delta table (`mode = append` and there is no existing table at the target location or `mode = truncate`). It configures the `checkpointInterval` table property, which determines the number of commits after which a new checkpoint should be created.</p><p>0 means no checkpoints are created.</p><p>Default: 10.</p>|
| `max_retries`|<p>Maximum number of retries for failed Delta Lake operations like writing Parquet files and committing transactions.</p><p>The connector performs retries on several levels: individual S3 operations, Delta Lake transaction commits, and overall operation retries. This setting controls the overall operation retries. When a write to the table fails, because of an S3 timeout or any other reason that was not resolved by lower-level retries, the connector will retry the entire operation.</p><p>When not specified, the connector performs infinite retries. When set to 0, the connector doesn't retry failed operations.</p>|
| `threads` | Number of parallel threads used by the connector. Increasing this value can improve Delta Lake write throughput by enabling concurrent writes. Default: `1`. |
| `max_retries`| |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems to be a mistake

Suggested change
| `max_retries`| |

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops

input_stream: &mut dyn ArrowStream,
receiver: &mut Receiver<PipelineState>,
transaction: &Option<Option<String>>,
) -> Result<usize, String> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why return String here? AnyError doesn't work?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AnyError would work, but we only use it to form an error message one level up the stack, so String seems more straightforward.

Comment on lines +147 to +148
result.sort();
result.dedup();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can there be unexpected duplicates in result that we fail to catch now due to calling dedup?

if yes, shall we call dedup based on inject_failure was used?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a good idea.

@ryzhyk ryzhyk force-pushed the delta_input_retry branch from e0be55e to 4df22f2 Compare April 10, 2026 15:41
The connector already had retry logic in some places, but mostly relied on
delta-rs for retries. This wasn't always enough and we saw timeouts and expired
token errors bubbling up.

This commit adds retry loops around all object store accesses. The loops are
controlled by the new `max_retries` setting, similar to the output connector.
By default, it will retry forever. The retry loops set health status to UNHEALTHY
while retrying.

If the pipeline is stopped and restarted during a retry, the connector resumes
from the last successfully ingested table version.  After exhausting retry
attempts the connector fails permanently with a fatal error, which eliminates
the possibility of data loss.

There is an important caveat:

Because retries may occur after partial progress (e.g., after partially
processing a Delta log entry), the same data may be ingested more than once.
This is consistent with the connector’s at-least-once delivery guarantee.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
@ryzhyk ryzhyk force-pushed the delta_input_retry branch from 4df22f2 to 544e802 Compare April 10, 2026 15:52
@ryzhyk ryzhyk added this pull request to the merge queue Apr 12, 2026
Merged via the queue into main with commit c0f7eae Apr 12, 2026
38 checks passed
@ryzhyk ryzhyk deleted the delta_input_retry branch April 12, 2026 21:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

connectors Issues related to the adapters/connectors crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants