-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[MODINVOSTO-187] Add outbox event log and internal lock repos
- Loading branch information
1 parent
61ba6df
commit aa606e7
Showing
6 changed files
with
152 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package org.folio.dao; | ||
|
||
import static org.folio.rest.persist.PostgresClient.convertToPsqlStandard; | ||
|
||
public class DbUtils { | ||
|
||
private static final String TABLE_NAME_TEMPLATE = "%s.%s"; | ||
|
||
public static String getTenantTableName(String tenantId, String tableName) { | ||
return TABLE_NAME_TEMPLATE.formatted(convertToPsqlStandard(tenantId), tableName); | ||
} | ||
|
||
} |
18 changes: 18 additions & 0 deletions
18
src/main/java/org/folio/dao/audit/AuditOutboxEventLogDAO.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package org.folio.dao.audit; | ||
|
||
import java.util.List; | ||
|
||
import org.folio.rest.jaxrs.model.OutboxEventLog; | ||
import org.folio.rest.persist.Conn; | ||
|
||
import io.vertx.core.Future; | ||
|
||
public interface AuditOutboxEventLogDAO { | ||
|
||
Future<List<OutboxEventLog>> getEventLogs(Conn conn, String tenantId); | ||
|
||
Future<String> saveEventLog(Conn conn, OutboxEventLog eventLog, String tenantId); | ||
|
||
Future<Integer> deleteEventLogs(Conn conn, List<String> eventIds, String tenantId); | ||
|
||
} |
72 changes: 72 additions & 0 deletions
72
src/main/java/org/folio/dao/audit/AuditOutboxEventLogPostgresDAO.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package org.folio.dao.audit; | ||
|
||
import static org.folio.dao.DbUtils.getTenantTableName; | ||
|
||
import java.util.List; | ||
import java.util.UUID; | ||
|
||
import org.folio.rest.jaxrs.model.OutboxEventLog; | ||
import org.folio.rest.persist.Conn; | ||
import org.folio.rest.persist.Criteria.Criterion; | ||
import org.folio.rest.persist.Criteria.Limit; | ||
import org.folio.rest.persist.interfaces.Results; | ||
|
||
import io.vertx.core.Future; | ||
import io.vertx.sqlclient.SqlResult; | ||
import io.vertx.sqlclient.Tuple; | ||
import lombok.extern.log4j.Log4j2; | ||
|
||
@Log4j2 | ||
public class AuditOutboxEventLogPostgresDAO implements AuditOutboxEventLogDAO { | ||
|
||
public static final String OUTBOX_TABLE_NAME = "outbox_event_log"; | ||
private static final String BATCH_DELETE = "DELETE from %s where event_id = ANY ($1)"; | ||
|
||
/** | ||
* Get all event logs from outbox table. | ||
* | ||
* @param conn the sql connection from transaction | ||
* @param tenantId the tenant id | ||
* @return future of a list of fetched event logs | ||
*/ | ||
public Future<List<OutboxEventLog>> getEventLogs(Conn conn, String tenantId) { | ||
log.trace("getEventLogs:: Fetching event logs from outbox table for tenantId: '{}'", tenantId); | ||
var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME); | ||
return conn.get(tableName, OutboxEventLog.class, new Criterion().setLimit(new Limit(1000))) | ||
.map(Results::getResults) | ||
.onFailure(t -> log.warn("getEventLogs:: Failed to fetch event logs for tenantId: '{}'", tenantId, t)); | ||
} | ||
|
||
/** | ||
* Saves event log to outbox table. | ||
* | ||
* @param conn the sql connection from transaction | ||
* @param eventLog the event log to save | ||
* @param tenantId the tenant id | ||
* @return future of id of the inserted entity | ||
*/ | ||
public Future<String> saveEventLog(Conn conn, OutboxEventLog eventLog, String tenantId) { | ||
log.debug("saveEventLog:: Saving event log to outbox table with eventId: '{}'", eventLog.getEventId()); | ||
var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME); | ||
return conn.save(tableName, eventLog) | ||
.onFailure(t -> log.warn("saveEventLog:: Failed to save event log with id: '{}'", eventLog.getEventId(), t)); | ||
} | ||
|
||
/** | ||
* Deletes outbox logs by event ids in batch. | ||
* | ||
* @param conn the sql connection from transaction | ||
* @param eventIds the event ids to delete | ||
* @param tenantId the tenant id | ||
* @return future of row count for deleted records | ||
*/ | ||
public Future<Integer> deleteEventLogs(Conn conn, List<String> eventIds, String tenantId) { | ||
log.debug("deleteEventLogs:: Deleting outbox logs by event ids in batch: '{}'", eventIds); | ||
var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME); | ||
var param = eventIds.stream().map(UUID::fromString).toArray(UUID[]::new); | ||
return conn.execute(BATCH_DELETE.formatted(tableName), Tuple.of(param)) | ||
.map(SqlResult::rowCount) | ||
.onFailure(t -> log.warn("deleteEventLogs: Failed to delete event logs by ids: '{}'", eventIds, t)); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
package org.folio.dao.lock; | ||
|
||
import org.folio.rest.persist.Conn; | ||
|
||
import io.vertx.core.Future; | ||
|
||
public interface InternalLockDAO { | ||
|
||
Future<Integer> selectWithLocking(Conn conn, String lockName, String tenantId); | ||
|
||
} |
34 changes: 34 additions & 0 deletions
34
src/main/java/org/folio/dao/lock/InternalLockPostgresDAO.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
package org.folio.dao.lock; | ||
|
||
import static org.folio.dao.DbUtils.getTenantTableName; | ||
|
||
import org.folio.rest.persist.Conn; | ||
|
||
import io.vertx.core.Future; | ||
import io.vertx.sqlclient.SqlResult; | ||
import io.vertx.sqlclient.Tuple; | ||
import lombok.extern.log4j.Log4j2; | ||
|
||
@Log4j2 | ||
public class InternalLockPostgresDAO implements InternalLockDAO { | ||
|
||
private static final String LOCK_TABLE_NAME = "internal_lock"; | ||
private static final String SELECT_WITH_LOCKING = "SELECT * FROM %s WHERE lock_name = $1 FOR UPDATE"; | ||
|
||
/** | ||
* Performs SELECT FOR UPDATE statement in order to implement locking. | ||
* Lock released after the transaction is committed. | ||
* | ||
* @param conn connection with active transaction | ||
* @param lockName the lock name | ||
* @param tenantId the tenant id | ||
* @return future with 1 row if lock was acquired | ||
*/ | ||
public Future<Integer> selectWithLocking(Conn conn, String lockName, String tenantId) { | ||
log.debug("selectWithLocking:: Locking row with lockName: '{}'", lockName); | ||
var tableName = getTenantTableName(tenantId, LOCK_TABLE_NAME); | ||
return conn.execute(SELECT_WITH_LOCKING.formatted(tableName), Tuple.of(lockName)) | ||
.map(SqlResult::rowCount) | ||
.onFailure(t -> log.warn("selectWithLocking:: Unable to select row with lockName: '{}'", lockName, t)); | ||
} | ||
} |