[connectors] Add custom columns to PG output.#6047

Merged
ryzhyk merged 2 commits into main from
postgres_extra_columns
Apr 16, 2026

Conversation

@ryzhyk
Contributor

@ryzhyk ryzhyk commented Apr 15, 2026

This commit addresses a feature request from a user to have the Postgres connector write additional columns containing user-defined values to the destination Postgres table. The values can change at runtime.

It consists of:

  • A new extra_columns connector config option that lists extra user-defined columns. The columns are currently assumed to be of type VARCHAR.
  • A mechanism to invoke custom connector-specific commands on output connectors: POST /views/<view_name>/connectors/<connector_name>/command. The command takes and returns arbitrary JSON objects. The semantics of the operation are determined by the connector.
  • The set_extra_columns command for the Postgres output connector. The command updates values of a subset of extra columns. Outputs produced by the connector after receiving the command will contain the newly assigned values.

Example assigning an extra column value:

curl -X POST 'http://localhost:8080/v0/pipelines/extra_columns/views/v/connectors/unnamed-0/command' -H 'Content-Type: application/json' -d '{"set_extra_columns": {"extra_column1": "foo"}}'

Caveats:

  • Until extra_columns are assigned values, the connector will write NULLs to these columns.
  • The current values of extra_columns are not checkpointed, and need to be assigned afresh after a restart or failover.
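
The caveats above can be illustrated with a minimal sketch of the in-memory state. This is not the connector's code: the `ExtraColumns` type, its methods, and the choice to reject unknown column names are assumptions made for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory state for the extra columns of one connector.
#[derive(Debug)]
pub struct ExtraColumns {
    // Column name -> current value; `None` is written as NULL.
    values: BTreeMap<String, Option<String>>,
}

impl ExtraColumns {
    /// Start with every configured column unassigned (NULL).
    pub fn new(configured: &[&str]) -> Self {
        let values = configured.iter().map(|c| (c.to_string(), None)).collect();
        Self { values }
    }

    /// Apply a `set_extra_columns`-style update to a subset of columns.
    /// Rejecting unknown columns is an assumption of this sketch.
    pub fn set(&mut self, updates: &[(&str, &str)]) -> Result<(), String> {
        // Validate first so that a failed command leaves the state untouched.
        if let Some((name, _)) = updates.iter().find(|(n, _)| !self.values.contains_key(*n)) {
            return Err(format!("unknown extra column: {name}"));
        }
        for (name, value) in updates {
            self.values.insert(name.to_string(), Some(value.to_string()));
        }
        Ok(())
    }

    /// Values attached to outputs produced from now on.
    pub fn current(&self) -> &BTreeMap<String, Option<String>> {
        &self.values
    }
}
```

Because the state lives only in memory, constructing it again after a restart yields all-NULL columns until the command is reissued, which matches the second caveat.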

My initial attempt at this added a mechanism to modify connector configuration at runtime, which has the advantage that the last values assigned to extra_columns get checkpointed. However, the semantics of this became very messy, with two different sources of truth for connector config after a restart (SQL code and the checkpointed state).

This commit doesn't include docs. This isn't a particularly elegant feature and I'd like to see some user validation before we make it official.

Describe Manual Test Plan

Checked that commands similar to the one listed above work for a simple local pipeline.

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

Describe Incompatible Changes

@ryzhyk ryzhyk requested a review from abhizer April 15, 2026 15:54
@ryzhyk ryzhyk added the connectors Issues related to the adapters/connectors crate label Apr 15, 2026

@mythical-fred mythical-fred left a comment

Two blockers, see inline.

/// This connector can write user-defined values, configured using the `set_extra_columns` command,
/// to these columns.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub extra_columns: Vec<String>,

This is a connector behavior/config change, but the PR adds no integration coverage. Per our connector rule, changes like this need an end-to-end test that exercises the new path against Postgres rather than only unit/manual testing. Please add an integration test that drives extra_columns through actual writes (including the runtime update command), and make the test schema cover the full Feldera SQL type matrix instead of just a narrow subset.

Ok(HttpResponse::Ok().json(state.controller()?.output_endpoint_status(&path)?))
}

#[post("/output_endpoints/{endpoint_name}/command")]

This introduces a new user-visible connector API surface (/command) on top of the new extra_columns config, but the PR explicitly leaves both docs and changelog unchecked. That is too stealthy for a connector feature. Please document the new config and command semantics in docs.feldera.com -- including the NULL-before-assignment behavior and the fact that assigned values are not checkpointed across restart/failover -- and add a changelog entry once the feature is meant to ship.

/// The idea is that connectors that support custom commands create separate command
/// handler objects that implement this trait and are returned by
/// `OutputEndpoint::command_handler`.
pub trait CommandHandler: Send + Sync {
Contributor

Hey, Turing-complete connectors on the horizon!
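
To make the doc comment above concrete, here is a rough sketch of a separate handler object that shares state with the endpoint that created it. Everything in it is a simplification for illustration: the `handle` signature, the `Endpoint` and `ExtraColumnsHandler` types, and the verb-plus-pairs argument shape are assumptions, whereas the PR passes JSON values in and out.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

type Shared = Arc<Mutex<BTreeMap<String, Option<String>>>>;

/// Hypothetical, simplified trait: a verb plus key/value arguments.
pub trait CommandHandler: Send + Sync {
    fn handle(&self, verb: &str, args: &[(&str, &str)]) -> Result<(), String>;
}

/// Handler object handed out by the endpoint; it shares state with the writer.
pub struct ExtraColumnsHandler {
    values: Shared,
}

impl CommandHandler for ExtraColumnsHandler {
    fn handle(&self, verb: &str, args: &[(&str, &str)]) -> Result<(), String> {
        match verb {
            "set_extra_columns" => {
                let mut values = self.values.lock().map_err(|e| e.to_string())?;
                for (name, value) in args {
                    values.insert(name.to_string(), Some(value.to_string()));
                }
                Ok(())
            }
            other => Err(format!("unsupported command: {other}")),
        }
    }
}

/// Hypothetical endpoint that owns the state and creates handlers on demand.
pub struct Endpoint {
    values: Shared,
}

impl Endpoint {
    pub fn new() -> Self {
        Self { values: Shared::default() }
    }

    pub fn command_handler(&self) -> Box<dyn CommandHandler> {
        Box::new(ExtraColumnsHandler { values: Arc::clone(&self.values) })
    }

    /// What the writer would attach to the next output.
    pub fn snapshot(&self) -> BTreeMap<String, Option<String>> {
        self.values.lock().unwrap().clone()
    }
}
```

Keeping the handler separate means the HTTP layer can hold it without borrowing the endpoint itself, at the cost of a lock around the shared values.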

request_body(
    content = Object,
    content_type = "text/json",
    description = "Command to send to the output connector"),
Contributor

why only output?
could this be useful for input connectors?

Contributor Author

Sure, but none of them support any commands yet, so this can wait until it's actually needed. In the meantime we'll see if output connector commands are useful.

/// The names of the extra columns in the Postgres table that are not part of the view schema.
///
/// This connector can write user-defined values, configured using the `set_extra_columns` command,
/// to these columns.
Contributor

The word "command" is very general. You have to qualify it, at least "connector command". When there is documentation there should be a link to it probably.

fn output_endpoint_command(
    &self,
    endpoint_name: &str,
    command: serde_json::Value,
Contributor

so the command includes the "verb" and the data as well

Contributor Author

yes

fn encode_cursor(
    &mut self,
    cursor: &mut dyn SerCursor,
    extra_columns: BTreeMap<String, Option<String>>,
Contributor

Wouldn't a more general API which gets a closure that can mutate the buffer be more appropriate? I can't imagine stuffing here more and more arguments that may be needed by future commands.
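
One way to read this suggestion is sketched below. It is not the connector's API: the `Encoder` type, the `encode_with` method, and the use of pre-serialized strings as records are stand-ins chosen to keep the example self-contained.

```rust
/// Hypothetical encoder that emits records as JSON text.
struct Encoder {
    out: Vec<String>,
}

impl Encoder {
    /// Encode each record, then let `post_process` mutate the serialized
    /// buffer before it is emitted. The encoder does not need to know about
    /// extra columns or any future command-driven adjustment.
    fn encode_with<F>(&mut self, records: &[&str], mut post_process: F)
    where
        F: FnMut(&mut String),
    {
        for record in records {
            // Stand-in for real serialization of a cursor row.
            let mut buf = record.to_string();
            post_process(&mut buf);
            self.out.push(buf);
        }
    }
}
```

A caller could then pass a closure that appends the extra columns, so a new command would need a new closure rather than a new parameter on the encoder.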

return Ok(());
}

buf.pop(); // Remove the trailing '}'
Contributor

An assertion that it really is } would be useful

Contributor

Also, the fact this works on a char stream is somewhat unpleasant, a higher-level abstraction (a record?) would be better, but maybe that's too expensive or impossible at this layer.
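
A sketch of what the checked splice might look like follows. It assumes the buffer is a `Vec<u8>` holding exactly one compact JSON object with no trailing whitespace. The helper names `json_escape` and `append_extra_columns` are hypothetical, and returning an error rather than asserting is a design choice of this sketch.

```rust
use std::collections::BTreeMap;

/// Escape a string for use inside a JSON string literal (minimal sketch).
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Splice extra columns into a buffer holding one serialized JSON object.
/// Leaves the buffer untouched and returns an error if it does not end in '}'.
fn append_extra_columns(
    buf: &mut Vec<u8>,
    extra: &BTreeMap<String, Option<String>>,
) -> Result<(), String> {
    if extra.is_empty() {
        return Ok(());
    }
    if buf.last() != Some(&b'}') {
        return Err("expected buffer to end with '}'".to_string());
    }
    buf.pop(); // Remove the trailing '}' (checked above).
    // An empty object "{}" needs no separator before the first field.
    let mut need_comma = buf.last() != Some(&b'{');
    for (name, value) in extra {
        if need_comma {
            buf.push(b',');
        }
        need_comma = true;
        buf.extend_from_slice(format!("\"{}\":", json_escape(name)).as_bytes());
        match value {
            Some(v) => buf.extend_from_slice(format!("\"{}\"", json_escape(v)).as_bytes()),
            None => buf.extend_from_slice(b"null"),
        }
    }
    buf.push(b'}');
    Ok(())
}
```

Building a structured record first and serializing it once would avoid the byte-level splice, but, as noted above, that may be too costly at this layer.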

let value_sql_names: HashSet<String> = value_schema
    .fields
    .iter()
    .map(|f| f.name.name().to_lowercase())
Contributor

lowercase?
This ignores quoting, but I suppose it's safe
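
For comparison, a quoting-aware variant could look like the sketch below. Postgres folds unquoted identifiers to lowercase and preserves the case of double-quoted ones. The function names are hypothetical, and using the set to detect clashes between extra columns and view columns is only one plausible reading of the snippet above.

```rust
use std::collections::HashSet;

/// Fold a SQL identifier the way Postgres does: unquoted names become
/// lowercase, double-quoted names keep their case ("" unescapes to ").
fn fold_identifier(ident: &str) -> String {
    match ident.strip_prefix('"').and_then(|s| s.strip_suffix('"')) {
        Some(inner) => inner.replace("\"\"", "\""),
        None => ident.to_lowercase(),
    }
}

/// Return the extra columns whose folded names collide with a view column.
fn conflicting_columns(view_columns: &[&str], extra_columns: &[&str]) -> Vec<String> {
    let view: HashSet<String> = view_columns.iter().map(|c| fold_identifier(c)).collect();
    extra_columns
        .iter()
        .map(|c| fold_identifier(c))
        .filter(|c| view.contains(c))
        .collect()
}
```

Lowercasing everything, as the PR does, can only report extra clashes compared with this variant, which is presumably why it is safe.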

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
@ryzhyk ryzhyk force-pushed the postgres_extra_columns branch from 8ea9252 to f0f0dc0 Compare April 16, 2026 04:08
@ryzhyk ryzhyk enabled auto-merge April 16, 2026 04:08
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
@ryzhyk ryzhyk added this pull request to the merge queue Apr 16, 2026
Merged via the queue into main with commit 42775ce Apr 16, 2026
1 check passed
@ryzhyk ryzhyk deleted the postgres_extra_columns branch April 16, 2026 06:05
Labels

connectors Issues related to the adapters/connectors crate

4 participants