diff --git a/pom.xml b/pom.xml index a938fdef..89c8c303 100644 --- a/pom.xml +++ b/pom.xml @@ -343,6 +343,10 @@ true false true + + **/impl/*.java + **/*.aj + org.folio diff --git a/src/main/java/org/folio/dao/DbUtils.java b/src/main/java/org/folio/dao/DbUtils.java new file mode 100644 index 00000000..b964be1a --- /dev/null +++ b/src/main/java/org/folio/dao/DbUtils.java @@ -0,0 +1,13 @@ +package org.folio.dao; + +import static org.folio.rest.persist.PostgresClient.convertToPsqlStandard; + +public class DbUtils { + + private static final String TABLE_NAME_TEMPLATE = "%s.%s"; + + public static String getTenantTableName(String tenantId, String tableName) { + return TABLE_NAME_TEMPLATE.formatted(convertToPsqlStandard(tenantId), tableName); + } + +} diff --git a/src/main/java/org/folio/dao/audit/AuditOutboxEventLogDAO.java b/src/main/java/org/folio/dao/audit/AuditOutboxEventLogDAO.java new file mode 100644 index 00000000..a145b4ba --- /dev/null +++ b/src/main/java/org/folio/dao/audit/AuditOutboxEventLogDAO.java @@ -0,0 +1,18 @@ +package org.folio.dao.audit; + +import java.util.List; + +import org.folio.rest.jaxrs.model.OutboxEventLog; +import org.folio.rest.persist.Conn; + +import io.vertx.core.Future; + +public interface AuditOutboxEventLogDAO { + + Future> getEventLogs(Conn conn, String tenantId); + + Future saveEventLog(Conn conn, OutboxEventLog eventLog, String tenantId); + + Future deleteEventLogs(Conn conn, List eventIds, String tenantId); + +} diff --git a/src/main/java/org/folio/dao/audit/AuditOutboxEventLogPostgresDAO.java b/src/main/java/org/folio/dao/audit/AuditOutboxEventLogPostgresDAO.java new file mode 100644 index 00000000..ac85ed9b --- /dev/null +++ b/src/main/java/org/folio/dao/audit/AuditOutboxEventLogPostgresDAO.java @@ -0,0 +1,72 @@ +package org.folio.dao.audit; + +import static org.folio.dao.DbUtils.getTenantTableName; + +import java.util.List; +import java.util.UUID; + +import org.folio.rest.jaxrs.model.OutboxEventLog; +import org.folio.rest.persist.Conn; +import org.folio.rest.persist.Criteria.Criterion; +import org.folio.rest.persist.Criteria.Limit; +import org.folio.rest.persist.interfaces.Results; + +import io.vertx.core.Future; +import io.vertx.sqlclient.SqlResult; +import io.vertx.sqlclient.Tuple; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +public class AuditOutboxEventLogPostgresDAO implements AuditOutboxEventLogDAO { + + public static final String OUTBOX_TABLE_NAME = "outbox_event_log"; + private static final String BATCH_DELETE = "DELETE from %s where event_id = ANY ($1)"; + + /** + * Get all event logs from outbox table. + * + * @param conn the sql connection from transaction + * @param tenantId the tenant id + * @return future of a list of fetched event logs + */ + public Future> getEventLogs(Conn conn, String tenantId) { + log.trace("getEventLogs:: Fetching event logs from outbox table for tenantId: '{}'", tenantId); + var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME); + return conn.get(tableName, OutboxEventLog.class, new Criterion().setLimit(new Limit(1000))) + .map(Results::getResults) + .onFailure(t -> log.warn("getEventLogs:: Failed to fetch event logs for tenantId: '{}'", tenantId, t)); + } + + /** + * Saves event log to outbox table. + * + * @param conn the sql connection from transaction + * @param eventLog the event log to save + * @param tenantId the tenant id + * @return future of id of the inserted entity + */ + public Future saveEventLog(Conn conn, OutboxEventLog eventLog, String tenantId) { + log.debug("saveEventLog:: Saving event log to outbox table with eventId: '{}'", eventLog.getEventId()); + var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME); + return conn.save(tableName, eventLog) + .onFailure(t -> log.warn("saveEventLog:: Failed to save event log with id: '{}'", eventLog.getEventId(), t)); + } + + /** + * Deletes outbox logs by event ids in batch. + * + * @param conn the sql connection from transaction + * @param eventIds the event ids to delete + * @param tenantId the tenant id + * @return future of row count for deleted records + */ + public Future deleteEventLogs(Conn conn, List eventIds, String tenantId) { + log.debug("deleteEventLogs:: Deleting outbox logs by event ids in batch: '{}'", eventIds); + var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME); + var param = eventIds.stream().map(UUID::fromString).toArray(UUID[]::new); + return conn.execute(BATCH_DELETE.formatted(tableName), Tuple.of(param)) + .map(SqlResult::rowCount) + .onFailure(t -> log.warn("deleteEventLogs: Failed to delete event logs by ids: '{}'", eventIds, t)); + } + +} diff --git a/src/main/java/org/folio/dao/lock/InternalLockDAO.java b/src/main/java/org/folio/dao/lock/InternalLockDAO.java new file mode 100644 index 00000000..e1069686 --- /dev/null +++ b/src/main/java/org/folio/dao/lock/InternalLockDAO.java @@ -0,0 +1,11 @@ +package org.folio.dao.lock; + +import org.folio.rest.persist.Conn; + +import io.vertx.core.Future; + +public interface InternalLockDAO { + + Future selectWithLocking(Conn conn, String lockName, String tenantId); + +} diff --git a/src/main/java/org/folio/dao/lock/InternalLockPostgresDAO.java b/src/main/java/org/folio/dao/lock/InternalLockPostgresDAO.java new file mode 100644 index 00000000..170cf417 --- /dev/null +++ b/src/main/java/org/folio/dao/lock/InternalLockPostgresDAO.java @@ -0,0 +1,34 @@ +package org.folio.dao.lock; + +import static org.folio.dao.DbUtils.getTenantTableName; + +import org.folio.rest.persist.Conn; + +import io.vertx.core.Future; +import io.vertx.sqlclient.SqlResult; +import io.vertx.sqlclient.Tuple; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +public class InternalLockPostgresDAO implements InternalLockDAO { + + private static final String LOCK_TABLE_NAME = "internal_lock"; + private static final String SELECT_WITH_LOCKING = "SELECT * FROM %s WHERE lock_name = $1 FOR UPDATE"; + + /** + * Performs SELECT FOR UPDATE statement in order to implement locking. + * Lock released after the transaction is committed. + * + * @param conn connection with active transaction + * @param lockName the lock name + * @param tenantId the tenant id + * @return future with 1 row if lock was acquired + */ + public Future selectWithLocking(Conn conn, String lockName, String tenantId) { + log.debug("selectWithLocking:: Locking row with lockName: '{}'", lockName); + var tableName = getTenantTableName(tenantId, LOCK_TABLE_NAME); + return conn.execute(SELECT_WITH_LOCKING.formatted(tableName), Tuple.of(lockName)) + .map(SqlResult::rowCount) + .onFailure(t -> log.warn("selectWithLocking:: Unable to select row with lockName: '{}'", lockName, t)); + } +}