Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Database triggers #4860

Open
wants to merge 57 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
6315ace
feat: init database triggers
dieriba Nov 13, 2024
3a41d0c
Merge branch 'main' into database_triggers
dieriba Nov 17, 2024
aa298c8
feat: :sparkles: wip: database_triggers
dieriba Nov 17, 2024
80cc8e0
feat: :sparkles: add database triggers front view
dieriba Nov 17, 2024
3e367e4
feat: :construction: database_triggers
dieriba Nov 17, 2024
c70ca15
feat: :construction: add definition in yaml, updated backend code and…
dieriba Nov 17, 2024
68c3a3c
feat: :construction: updated migration file, update openapi.yml, upda…
dieriba Nov 17, 2024
bc3e19b
fix: struct rust
dieriba Nov 24, 2024
9a1ad78
feat: :construction: update migrate, database trigger backend functio…
dieriba Nov 24, 2024
fa456db
feat: :construction: add resource picker front, update backend function
dieriba Nov 26, 2024
cdb272c
feat: :construction: edit inner database inner, update triggers
dieriba Nov 26, 2024
3cdf0a2
feat: :construction: database_triggers
dieriba Nov 27, 2024
fabe469
feat: :construction: update openapi yaml, prettied websocket trigger
dieriba Nov 30, 2024
64e9938
feat: :construction: database_triggers
dieriba Dec 1, 2024
cdfad9c
feat: :construction: add resource module, update variable file, worki…
dieriba Dec 2, 2024
378b3ed
feat: :bug: working sqlx query
dieriba Dec 2, 2024
7300370
feat: :construction: fix query with sqlx, added main loop
dieriba Dec 3, 2024
9751ffe
feat: :construction: add new column database_trigg
dieriba Dec 3, 2024
939e4a1
feat: :passport_control: run jobs works
dieriba Dec 3, 2024
716c18e
feat: :construction: handling slot name and replication
dieriba Dec 3, 2024
8b8a3e7
feat: restructuring triggers, decoding trigger message on work
dieriba Dec 5, 2024
a864b03
feat: :construction: database_trigger
dieriba Dec 7, 2024
18b96cc
feat: :construction:
dieriba Dec 8, 2024
23d942c
feat: :construction: converter done, work on custom script
dieriba Dec 11, 2024
da4264e
feat: :construction: multiple trigger
dieriba Dec 12, 2024
51c807a
feat: :construction: adding new argument function
dieriba Dec 12, 2024
0928c8e
feat: :construction: database_trigger
dieriba Dec 12, 2024
f8814a1
feat: :construction: add generate template for front, update script p…
dieriba Dec 16, 2024
e41965a
feat: :construction: template script fix bug, work on restructuring bac…
dieriba Dec 16, 2024
0215d5a
feat: :construction: update autogenerated script, add persistence sta…
dieriba Dec 17, 2024
d85e25f
feat: :construction: update structure client
dieriba Dec 19, 2024
5813063
feat: :construction: rewrote crud function
dieriba Dec 20, 2024
86877af
feat: :construction: added publication handler
dieriba Dec 21, 2024
69dcf6d
feat: :construction: new ui finished
dieriba Dec 21, 2024
c0f624a
feat: :construction: added slot function handler, finished front ux
dieriba Dec 22, 2024
c1b5c6d
feat: :sparkles: ux improvement done, backend logic done
dieriba Dec 22, 2024
351a6b7
feat: :bug: fix where clause
dieriba Dec 22, 2024
7cb0b6c
Merge branch 'main' into database_triggers
dieriba Dec 24, 2024
348922d
feat:
dieriba Dec 24, 2024
7011cda
feat:
dieriba Dec 25, 2024
e431b28
chore: update .sqlx and remove empty package
dieriba Dec 25, 2024
8d46b20
Merge branch 'main' into database_triggers
dieriba Dec 26, 2024
c30d032
fix: update publication and remove unnecessary print
dieriba Dec 28, 2024
f85ec3f
feat: finish converter to json, remove save button, remove unused crate
dieriba Dec 29, 2024
01b6ff3
feat: update migration for database trigger, fixed query, fixed front…
dieriba Dec 30, 2024
230034b
chore: update .sqlx
dieriba Dec 30, 2024
a0ca315
Merge branch 'main' into database_triggers
dieriba Dec 30, 2024
eaf47f5
chore: update sqlx
dieriba Dec 30, 2024
da30536
chore: .sqlx
dieriba Dec 30, 2024
66e305d
chore: add unused
dieriba Dec 30, 2024
681f193
Merge branch 'main' into database_triggers
dieriba Jan 7, 2025
4c05214
fix: use right database resource in back and front
dieriba Jan 8, 2025
d183b59
Merge branch 'main' into database_triggers
dieriba Jan 8, 2025
b44263c
fix build
HugoCasa Jan 9, 2025
c26c0f8
Merge remote-tracking branch 'origin/main' into database_triggers
HugoCasa Jan 9, 2025
81d3ec8
refactor:
dieriba Jan 10, 2025
beed5fd
fix sqlx
HugoCasa Jan 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ mysql = ["windmill-worker/mysql"]
mssql = ["windmill-worker/mssql"]
bigquery = ["windmill-worker/bigquery"]
websocket = ["windmill-api/websocket"]
database = ["windmill-api/database"]
python = ["windmill-worker/python"]
smtp = ["windmill-api/smtp", "windmill-common/smtp"]
csharp = ["windmill-worker/csharp"]
Expand Down
5 changes: 3 additions & 2 deletions backend/migrations/20241123152203_database_triggers.up.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
-- Add up migration script here
CREATE TYPE transaction AS ENUM ('Insert', 'Update', 'Delete');
CREATE TABLE database_trigger(
path VARCHAR(255) NOT NULL,
script_path VARCHAR(255) NOT NULL,
Expand All @@ -16,5 +15,7 @@ CREATE TABLE database_trigger(
replication_slot_name VARCHAR(255) NOT NULL,
publication_name VARCHAR(255) NOT NULL,
enabled BOOLEAN NOT NULL,
CONSTRAINT PK_database_trigger PRIMARY KEY (path,workspace_id)
CONSTRAINT PK_database_trigger PRIMARY KEY (path,workspace_id),
CONSTRAINT fk_database_trigger_workspace FOREIGN KEY (workspace_id)
REFERENCES workspace(id) ON DELETE CASCADE
);
13 changes: 7 additions & 6 deletions backend/windmill-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ zip = ["dep:async_zip"]
oauth2 = ["dep:async-oauth2"]
http_trigger = ["dep:matchit"]
static_frontend = ["dep:rust-embed"]
database = ["dep:rust-postgres", "dep:pg_escape", "dep:byteorder", "dep:memchr", "dep:thiserror", "dep:rust_decimal"]

[dependencies]
windmill-queue.workspace = true
Expand Down Expand Up @@ -104,13 +105,13 @@ matchit = { workspace = true, optional = true }
tokio-tungstenite = { workspace = true, optional = true}
rdkafka = { workspace = true, optional = true }
const_format.workspace = true
rust-postgres.workspace = true
pin-project.workspace = true
http.workspace = true
async-stream.workspace = true
ulid.workspace = true
pg_escape.workspace = true
byteorder.workspace = true
memchr.workspace = true
thiserror.workspace = true
rust_decimal.workspace = true
rust-postgres = { workspace = true, optional = true }
pg_escape = { workspace = true, optional = true }
byteorder = { workspace = true, optional = true }
memchr = { workspace = true, optional = true }
thiserror = { workspace = true, optional = true }
rust_decimal = { workspace = true, optional = true }
2 changes: 1 addition & 1 deletion backend/windmill-api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8005,7 +8005,7 @@ paths:
schema:
type: string

/w/{workspace}/database_triggers/get-template-script:
/w/{workspace}/database_triggers/get_template_script:
post:
summary: get template script
operationId: getTemplateScript
Expand Down
93 changes: 68 additions & 25 deletions backend/windmill-api/src/database_triggers/handler.rs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're missing auth for a bunch of these APIs
also you should use authed to get the resource with get_resource_value_interpolated_internal

Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ use windmill_common::error::Error;
use windmill_common::{
db::UserDB,
error::{self, JsonResult},
resource::get_resource,
utils::{not_found_if_none, paginate, Pagination, StripPath},
variables::get_variable_or_self,
worker::CLOUD_HOSTED,
};

use crate::{
database_triggers::mapper::{Mapper, MappingInfo},
db::{ApiAuthed, DB},
resources::get_resource_value_interpolated_internal,
};

#[derive(FromRow, Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -228,19 +227,36 @@ pub struct DatabaseTrigger {
}

pub async fn get_database_resource(
authed: ApiAuthed,
user_db: UserDB,
db: &DB,
database_resource_path: &str,
w_id: &str,
) -> Result<Database, Error> {
let mut resource = get_resource::<Database>(db, database_resource_path, w_id)
.await
.map_err(Error::SqlErr)?;

if !resource.value.password.is_empty() {
resource.value.password = get_variable_or_self(resource.value.password, db, w_id).await?;
}
let resource = get_resource_value_interpolated_internal(
&authed,
Some(user_db),
&db,
&w_id,
&database_resource_path,
None,
"",
)
.await
.map_err(|_| Error::NotFound("Database resource do not exist".to_string()))?;

let resource = match resource {
Some(resource) => serde_json::from_value::<Database>(resource).map_err(Error::SerdeJson)?,
None => {
return {
Err(Error::NotFound(
"Database resource do not exist".to_string(),
))
}
}
};

Ok(resource.value)
Ok(resource)
}

#[derive(Deserialize, Serialize)]
Expand Down Expand Up @@ -453,10 +469,13 @@ pub struct SlotList {
}

pub async fn list_slot_name(
authed: ApiAuthed,
Extension(user_db): Extension<UserDB>,
Extension(db): Extension<DB>,
Path((w_id, database_resource_path)): Path<(String, String)>,
) -> error::Result<Json<Vec<SlotList>>> {
let database = get_database_resource(&db, &database_resource_path, &w_id).await?;
let database =
get_database_resource(authed, user_db, &db, &database_resource_path, &w_id).await?;

let mut connection = get_raw_postgres_connection(&database).await?;

Expand Down Expand Up @@ -485,11 +504,14 @@ pub struct Slot {
}

pub async fn create_slot(
authed: ApiAuthed,
Extension(user_db): Extension<UserDB>,
Extension(db): Extension<DB>,
Path((w_id, database_resource_path)): Path<(String, String)>,
Json(Slot { name }): Json<Slot>,
) -> error::Result<String> {
let database = get_database_resource(&db, &database_resource_path, &w_id).await?;
let database =
get_database_resource(authed, user_db, &db, &database_resource_path, &w_id).await?;

let mut connection = get_raw_postgres_connection(&database).await?;

Expand All @@ -508,11 +530,14 @@ pub async fn create_slot(
}

pub async fn drop_slot_name(
authed: ApiAuthed,
Extension(user_db): Extension<UserDB>,
Extension(db): Extension<DB>,
Path((w_id, database_resource_path)): Path<(String, String)>,
Json(Slot { name }): Json<Slot>,
) -> error::Result<String> {
let database = get_database_resource(&db, &database_resource_path, &w_id).await?;
let database =
get_database_resource(authed, user_db, &db, &database_resource_path, &w_id).await?;

let mut connection = get_raw_postgres_connection(&database).await?;

Expand All @@ -527,10 +552,13 @@ struct PublicationName {
}

pub async fn list_database_publication(
authed: ApiAuthed,
Extension(user_db): Extension<UserDB>,
Extension(db): Extension<DB>,
Path((w_id, database_resource_path)): Path<(String, String)>,
) -> error::Result<Json<Vec<String>>> {
let database = get_database_resource(&db, &database_resource_path, &w_id).await?;
let database =
get_database_resource(authed, user_db, &db, &database_resource_path, &w_id).await?;

let mut connection = get_raw_postgres_connection(&database).await?;

Expand All @@ -550,10 +578,13 @@ pub async fn list_database_publication(
}

pub async fn get_publication_info(
authed: ApiAuthed,
Extension(user_db): Extension<UserDB>,
Extension(db): Extension<DB>,
Path((w_id, publication_name, database_resource_path)): Path<(String, String, String)>,
) -> error::Result<Json<PublicationData>> {
let database = get_database_resource(&db, &database_resource_path, &w_id).await?;
let database =
get_database_resource(authed, user_db, &db, &database_resource_path, &w_id).await?;

let mut connection = get_raw_postgres_connection(&database).await?;

Expand Down Expand Up @@ -649,13 +680,16 @@ async fn new_publication(
}

pub async fn create_publication(
authed: ApiAuthed,
Extension(user_db): Extension<UserDB>,
Extension(db): Extension<DB>,
Path((w_id, publication_name, database_resource_path)): Path<(String, String, String)>,
Json(publication_data): Json<PublicationData>,
) -> error::Result<String> {
let PublicationData { table_to_track, transaction_to_track } = publication_data;

let database = get_database_resource(&db, &database_resource_path, &w_id).await?;
let database =
get_database_resource(authed, user_db, &db, &database_resource_path, &w_id).await?;

let mut connection = get_raw_postgres_connection(&database).await?;

Expand Down Expand Up @@ -686,10 +720,14 @@ async fn drop_publication(
}

pub async fn delete_publication(
Path((w_id, publication_name, database_resource_path)): Path<(String, String, String)>,
authed: ApiAuthed,
Extension(user_db): Extension<UserDB>,
Extension(db): Extension<DB>,
Path((w_id, publication_name, database_resource_path)): Path<(String, String, String)>,
) -> error::Result<String> {
let database = get_database_resource(&db, &database_resource_path, &w_id).await?;
let database =
get_database_resource(authed, user_db, &db, &database_resource_path, &w_id).await?;

let mut connection = get_raw_postgres_connection(&database).await?;

drop_publication(&publication_name, &mut connection).await?;
Expand All @@ -701,12 +739,16 @@ pub async fn delete_publication(
}

pub async fn alter_publication(
Path((w_id, publication_name, database_resource_path)): Path<(String, String, String)>,
authed: ApiAuthed,
Extension(user_db): Extension<UserDB>,
Extension(db): Extension<DB>,
Path((w_id, publication_name, database_resource_path)): Path<(String, String, String)>,
Json(publication_data): Json<PublicationData>,
) -> error::Result<String> {
let PublicationData { table_to_track, transaction_to_track } = publication_data;
let database = get_database_resource(&db, &database_resource_path, &w_id).await?;
let database =
get_database_resource(authed, user_db, &db, &database_resource_path, &w_id).await?;

let mut connection = get_raw_postgres_connection(&database).await?;

let (all_table, _) =
Expand Down Expand Up @@ -1124,6 +1166,8 @@ pub async fn set_enabled(
}

pub async fn get_template_script(
authed: ApiAuthed,
Extension(user_db): Extension<UserDB>,
Extension(db): Extension<DB>,
Path(w_id): Path<String>,
Json(template_script): Json<TemplateScript>,
Expand All @@ -1132,16 +1176,15 @@ pub async fn get_template_script(

let TemplateScript { database_resource_path, relations, language } = template_script;
if relations.is_none() {
return Err(error::Error::BadRequest(
return Err(Error::BadRequest(
"You must at least choose schema to fetch table from".to_string(),
));
}

let resource = get_resource::<Database>(&db, &database_resource_path, &w_id)
.await
.map_err(|_| Error::NotFound("Database resource do not exist".to_string()))?;
let database =
get_database_resource(authed, user_db, &db, &database_resource_path, &w_id).await?;

let mut pg_connection = get_raw_postgres_connection(&resource.value).await?;
let mut pg_connection = get_raw_postgres_connection(&database).await?;

#[derive(Debug, FromRow, Deserialize)]
struct ColumnInfo {
Expand Down
30 changes: 24 additions & 6 deletions backend/windmill-api/src/database_triggers/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::{collections::HashMap, pin::Pin};

use crate::{
database_triggers::{
handler::get_database_resource,
relation::RelationConverter,
replication_message::{
LogicalReplicationMessage::{Begin, Commit, Delete, Insert, Relation, Type, Update},
Expand All @@ -19,7 +18,7 @@ use pg_escape::{quote_identifier, quote_literal};
use rand::seq::SliceRandom;
use rust_postgres::{Client, Config, CopyBothDuplex, NoTls, SimpleQueryMessage};
use serde_json::to_value;
use windmill_common::{worker::to_raw_value, INSTANCE_NAME};
use windmill_common::{variables::get_variable_or_self, worker::to_raw_value, INSTANCE_NAME};

use super::{
handler::{Database, DatabaseTrigger},
Expand Down Expand Up @@ -210,13 +209,32 @@ async fn listen_to_transactions(
db: DB,
mut killpill_rx: tokio::sync::broadcast::Receiver<()>,
) -> Result<(), Error> {
let resource = get_database_resource(
&db,
let resource = sqlx::query_scalar!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should also use get_resource_value_interpolated here. You can use fetch_api_authed with the trigger editor/email fields

"SELECT value from resource WHERE path = $1 AND workspace_id = $2",
&database_trigger.database_resource_path,
&database_trigger.workspace_id,
&database_trigger.workspace_id
)
.fetch_optional(&db)
.await
.map_err(Error::Common)?;
.map_err(|e| Error::Common(windmill_common::error::Error::SqlErr(e)))?
.flatten();

let mut resource = match resource {
Some(resource) => serde_json::from_value::<Database>(resource)
.map_err(|e| Error::Common(windmill_common::error::Error::SerdeJson(e)))?,
None => {
return {
Err(Error::Common(windmill_common::error::Error::NotFound(
"Database resource do not exist".to_string(),
)))
}
}
};

resource.password =
get_variable_or_self(resource.password, &db, &database_trigger.workspace_id)
.await
.map_err(Error::Common)?;

let client = PostgresSimpleClient::new(&resource).await?;

Expand Down
21 changes: 15 additions & 6 deletions backend/windmill-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ mod auth;
mod capture;
mod concurrency_groups;
mod configs;
#[cfg(feature = "database")]
mod database_triggers;
mod db;
mod drafts;
Expand Down Expand Up @@ -287,8 +288,11 @@ pub async fn run_server(
let kafka_killpill_rx = rx.resubscribe();
kafka_triggers_ee::start_kafka_consumers(db.clone(), kafka_killpill_rx).await;
}
let db_killpill_rx = rx.resubscribe();
database_triggers::start_database(db.clone(), db_killpill_rx).await;
#[cfg(feature = "database")]
{
let db_killpill_rx = rx.resubscribe();
database_triggers::start_database(db.clone(), db_killpill_rx).await;
}
}

// build our application with a route
Expand Down Expand Up @@ -358,10 +362,15 @@ pub async fn run_server(
Router::new()
})
.nest("/kafka_triggers", kafka_triggers_service)
.nest(
"/database_triggers",
database_triggers::workspaced_service(),
),
.nest("/database_triggers", {
#[cfg(feature = "database")]
{
database_triggers::workspaced_service()
}

#[cfg(not(feature = "database"))]
Router::new()
}),
)
.nest("/workspaces", workspaces::global_service())
.nest(
Expand Down
1 change: 0 additions & 1 deletion backend/windmill-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ pub mod tracing_init;
pub mod users;
pub mod utils;
pub mod variables;
pub mod resource;
pub mod worker;
pub mod workspaces;

Expand Down
Loading
Loading