[connectors] Add custom columns to PG output. #6047
| /// These connector can write user-defined values, configured using the `set_extra_columns` command, | ||
| /// to these columns. | ||
| #[serde(default, skip_serializing_if = "Vec::is_empty")] | ||
| pub extra_columns: Vec<String>, |
This is a connector behavior/config change, but the PR adds no integration coverage. Per our connector rule, changes like this need an end-to-end test that exercises the new path against Postgres rather than only unit/manual testing. Please add an integration test that drives extra_columns through actual writes (including the runtime update command), and make the test schema cover the full Feldera SQL type matrix instead of just a narrow subset.
| Ok(HttpResponse::Ok().json(state.controller()?.output_endpoint_status(&path)?)) | ||
| } | ||
|
|
||
| #[post("/output_endpoints/{endpoint_name}/command")] |
This introduces a new user-visible connector API surface (/command) on top of the new extra_columns config, but the PR explicitly leaves both docs and changelog unchecked. That is too stealthy for a connector feature. Please document the new config and command semantics in docs.feldera.com -- including the NULL-before-assignment behavior and the fact that assigned values are not checkpointed across restart/failover -- and add a changelog entry once the feature is meant to ship.
| /// The idea is that connectors that support custom commands create separate command | ||
| /// handler objects that implement this trait and are returned by | ||
| /// `OutputEndpoint::command_handler`. | ||
| pub trait CommandHandler: Send + Sync { |
Hey, Turing-complete connectors on the horizon!
| request_body( | ||
| content = Object, | ||
| content_type = "text/json", | ||
| description = "Command to send to the output connector"), |
why only output?
could this be useful for input connectors?
Sure, but none of them support any commands yet, so this can wait until it's actually needed. In the meantime we'll see if output connector commands are useful.
| /// The names of the extra columns in the Postgres table that are not part of the view schema. | ||
| /// | ||
| /// These connector can write user-defined values, configured using the `set_extra_columns` command, | ||
| /// to these columns. |
The word "command" is very general. It needs a qualifier, at least "connector command". Once documentation exists, this should probably link to it.
| fn output_endpoint_command( | ||
| &self, | ||
| endpoint_name: &str, | ||
| command: serde_json::Value, |
so the command includes the "verb" and the data as well
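To make the "verb plus data" shape concrete, here is a minimal sketch of how a single-key request such as `{"set_extra_columns": {"extra_column1": "foo"}}` could be mapped to a typed command. The `PostgresCommand` enum and `parse_command` function are hypothetical names used for illustration only, and the request is modelled as an already-decoded map so the sketch needs nothing beyond the standard library; the PR itself passes a `serde_json::Value`.

```rust
use std::collections::BTreeMap;

/// Hypothetical typed form of a connector command.
/// The variant is the "verb"; its payload is the data.
#[derive(Debug, PartialEq)]
pub enum PostgresCommand {
    /// Assign new values to a subset of the extra columns.
    SetExtraColumns(BTreeMap<String, String>),
}

/// Interpret a decoded request of the form `{ verb: { column: value } }`.
/// Exactly one verb is expected per request (an assumption of this sketch).
pub fn parse_command(
    request: BTreeMap<String, BTreeMap<String, String>>,
) -> Result<PostgresCommand, String> {
    if request.len() != 1 {
        return Err(format!("expected exactly one verb, found {}", request.len()));
    }
    // The length check above guarantees there is exactly one entry.
    let (verb, payload) = request.into_iter().next().expect("one entry");
    match verb.as_str() {
        "set_extra_columns" => Ok(PostgresCommand::SetExtraColumns(payload)),
        other => Err(format!("unknown connector command '{other}'")),
    }
}
```

Parsing into an enum early keeps unknown verbs out of the connector logic, at the cost of one more type to maintain per connector.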
| fn encode_cursor( | ||
| &mut self, | ||
| cursor: &mut dyn SerCursor, | ||
| extra_columns: BTreeMap<String, Option<String>>, |
Wouldn't a more general API which gets a closure that can mutate the buffer be more appropriate? I can't imagine stuffing here more and more arguments that may be needed by future commands.
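One possible shape for the closure-based API suggested here, as a rough sketch only: the encoder hands each encoded record to a caller-supplied hook instead of taking an `extra_columns` argument. `JsonEncoder`, `encode_record`, and `output` are hypothetical stand-ins, not the actual Feldera encoder types, and the record is passed in as a pre-serialized string to keep the example self-contained.

```rust
/// Hypothetical stand-in for the encoder under discussion.
pub struct JsonEncoder {
    buf: Vec<u8>,
}

impl JsonEncoder {
    pub fn new() -> Self {
        Self { buf: Vec::new() }
    }

    /// Encode one record, then let `post_process` mutate that record's bytes
    /// before they are committed to the output buffer.
    pub fn encode_record(
        &mut self,
        record_json: &str,
        post_process: &mut dyn FnMut(&mut Vec<u8>),
    ) {
        // Give the hook only the bytes of the current record, so it cannot
        // touch records that were encoded earlier.
        let mut record = record_json.as_bytes().to_vec();
        post_process(&mut record);
        self.buf.append(&mut record);
        self.buf.push(b'\n'); // newline-delimited output, chosen for the sketch
    }

    /// Everything encoded so far.
    pub fn output(&self) -> &[u8] {
        &self.buf
    }
}
```

With this shape, a future command only needs to supply a different hook; the encoder signature stays fixed. The trade-off is that the hook still operates on raw bytes, which is the concern raised in the next comment thread.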
| return Ok(()); | ||
| } | ||
|
|
||
| buf.pop(); // Remove the trailing '}' |
An assertion that it really is } would be useful
Also, the fact this works on a char stream is somewhat unpleasant, a higher-level abstraction (a record?) would be better, but maybe that's too expensive or impossible at this layer.
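A minimal sketch of the suggested assertion, assuming the encoded record is a compact JSON object held in a `Vec<u8>`. The function names `append_extra_columns` and `push_json_string` are hypothetical, and the escaping helper covers only quotes, backslashes, and control characters; the real code would presumably reuse an existing JSON serializer.

```rust
use std::collections::BTreeMap;

/// Append extra columns to a JSON object that is already serialized in `buf`.
/// `None` values are written as JSON `null`.
pub fn append_extra_columns(buf: &mut Vec<u8>, extra: &BTreeMap<String, Option<String>>) {
    if extra.is_empty() {
        return;
    }
    // Check the invariant before mutating, so a violation leaves `buf` intact.
    assert_eq!(buf.last(), Some(&b'}'), "encoded record must end with a closing brace");
    buf.pop(); // Remove the trailing '}'

    // An empty object "{}" is now just "{" and needs no leading comma.
    // This assumes compact encoding with no whitespace inside the braces.
    let mut need_comma = buf.last() != Some(&b'{');
    for (name, value) in extra {
        if need_comma {
            buf.push(b',');
        }
        need_comma = true;
        push_json_string(buf, name);
        buf.push(b':');
        match value {
            Some(v) => push_json_string(buf, v),
            None => buf.extend_from_slice(b"null"),
        }
    }
    buf.push(b'}');
}

/// Write `s` as a JSON string literal with minimal escaping.
fn push_json_string(buf: &mut Vec<u8>, s: &str) {
    buf.push(b'"');
    for c in s.chars() {
        match c {
            '"' => buf.extend_from_slice(b"\\\""),
            '\\' => buf.extend_from_slice(b"\\\\"),
            c if (c as u32) < 0x20 => {
                buf.extend_from_slice(format!("\\u{:04x}", c as u32).as_bytes())
            }
            c => {
                let mut tmp = [0u8; 4];
                buf.extend_from_slice(c.encode_utf8(&mut tmp).as_bytes());
            }
        }
    }
    buf.push(b'"');
}
```

In a hot path, `debug_assert_eq!` may be preferable so the check is compiled out of release builds.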
| let value_sql_names: HashSet<String> = value_schema | ||
| .fields | ||
| .iter() | ||
| .map(|f| f.name.name().to_lowercase()) |
lowercase?
This ignores quoting, but I suppose it's safe
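For reference, a small sketch of quoting-aware normalization following PostgreSQL's rules: unquoted identifiers fold to lowercase, while double-quoted identifiers keep their case and use `""` to escape a quote. `normalize_identifier` is a hypothetical helper; whether `f.name.name()` in the PR already returns a normalized name is not something this sketch assumes either way.

```rust
/// Normalize a SQL identifier the way PostgreSQL resolves it:
/// - `"Quoted"` keeps its case; the surrounding quotes are stripped and
///   each doubled quote `""` becomes a single `"`.
/// - `Unquoted` is folded to lowercase.
pub fn normalize_identifier(ident: &str) -> String {
    let is_quoted = ident.len() >= 2 && ident.starts_with('"') && ident.ends_with('"');
    if is_quoted {
        ident[1..ident.len() - 1].replace("\"\"", "\"")
    } else {
        ident.to_lowercase()
    }
}
```

Under these rules `Extra` and `"Extra"` name different columns, so a plain `to_lowercase()` comparison could report a clash between a view column and an extra column that PostgreSQL would treat as distinct. That errs on the side of rejecting a valid configuration rather than accepting a broken one, which is consistent with the "I suppose it's safe" remark.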
This commit addresses a feature request from a user to have the Postgres
connector write additional columns containing user-defined values, which can
change at runtime.
It consists of:
- A new `extra_columns` connector config option that lists extra user-defined
columns. The columns are currently assumed to be of type `VARCHAR`.
- A mechanism to invoke custom connector-specific commands on output
connectors: `POST /views/<view_name>/connectors/<connector_name>/command`.
The command takes and returns arbitrary JSON objects. The semantics of the
operation is determined by the connector.
- The `set_extra_columns` command for the Postgres output connector. The command
updates values of a subset of extra columns. Outputs produced by the
connector after receiving the command will contain the newly assigned values.
Example assigning an extra column value:
```
curl -X POST 'http://localhost:8080/v0/pipelines/extra_columns/views/v/connectors/unnamed-0/command' -H 'Content-Type: application/json' -d '{"set_extra_columns": {"extra_column1": "foo"}}'
```
Caveats:
- Until extra_columns are assigned values, the connector will write NULLs to
these columns.
- The current values of extra_columns are not checkpointed, and need to be
assigned afresh after a restart or failover.
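The two caveats can be modelled with a small in-memory state holder, sketched below. `ExtraColumns` and its methods are hypothetical names, and rejecting updates to columns that are not listed in `extra_columns` is an assumption of this sketch rather than documented behavior.

```rust
use std::collections::BTreeMap;

/// In-memory values of the configured extra columns.
/// `None` means "not assigned yet" and is written to Postgres as NULL.
pub struct ExtraColumns {
    values: BTreeMap<String, Option<String>>,
}

impl ExtraColumns {
    /// Start with every configured column unassigned. Because nothing is
    /// checkpointed, this is also the state after a restart or failover.
    pub fn new(configured: &[&str]) -> Self {
        Self {
            values: configured.iter().map(|c| (c.to_string(), None)).collect(),
        }
    }

    /// Apply a `set_extra_columns`-style update to a subset of columns.
    pub fn set(&mut self, updates: &BTreeMap<String, String>) -> Result<(), String> {
        // Validate first so that a bad request changes nothing.
        if let Some(unknown) = updates.keys().find(|k| !self.values.contains_key(*k)) {
            return Err(format!("'{unknown}' is not a configured extra column"));
        }
        for (name, value) in updates {
            self.values.insert(name.clone(), Some(value.clone()));
        }
        Ok(())
    }

    /// Values to attach to every row written from now on.
    pub fn current(&self) -> &BTreeMap<String, Option<String>> {
        &self.values
    }
}
```

A client that depends on these values would need to resend its last `set_extra_columns` command after every restart, since a freshly constructed `ExtraColumns` holds only `None`.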
My initial attempt at this added a mechanism to modify connector configuration
at runtime, which has the advantage that the last values assigned to extra_columns
get checkpointed. However, the semantics of this became very messy, with
two different sources of truth for connector config after a restart (SQL code and
the checkpointed state).
This commit doesn't include docs. This isn't a particularly elegant feature and
I'd like to see some user validation before we make it official.
Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
8ea9252 to f0f0dc0
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
This commit addresses a feature request from a user to have the Postgres connector write additional columns containing user-defined values to the destination Postgres table. The values can change at runtime.
It consists of:
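- A new `extra_columns` connector config option that lists extra user-defined columns. The columns are currently assumed to be of type `VARCHAR`.
- A mechanism to invoke custom connector-specific commands on output connectors: `POST /views/<view_name>/connectors/<connector_name>/command`. The command takes and returns arbitrary JSON objects. The semantics of the operation is determined by the connector.
- The `set_extra_columns` command for the Postgres output connector. The command updates values of a subset of extra columns. Outputs produced by the connector after receiving the command will contain the newly assigned values.
Example assigning an extra column value:
`curl -X POST 'http://localhost:8080/v0/pipelines/extra_columns/views/v/connectors/unnamed-0/command' -H 'Content-Type: application/json' -d '{"set_extra_columns": {"extra_column1": "foo"}}'`
Caveats:
- Until extra_columns are assigned values, the connector will write NULLs to these columns.
- The current values of extra_columns are not checkpointed, and need to be assigned afresh after a restart or failover.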
My initial attempt at this added a mechanism to modify connector configuration at runtime, which has the advantage that the last values assigned to extra_columns get checkpointed. However, the semantics of this became very messy, with two different sources of truth for connector config after a restart (SQL code and the checkpointed state).
This commit doesn't include docs. This isn't a particularly elegant feature and I'd like to see some user validation before we make it official.
Describe Manual Test Plan
Checked that commands similar to the one listed above work for a simple local pipeline.