From 4ea707836f61cda4a0e2c2e8016998f69d57ad1e Mon Sep 17 00:00:00 2001 From: SvitlanaKovalova1 Date: Thu, 31 Oct 2024 18:04:52 +0200 Subject: [PATCH 1/7] feat(publishing-events): change Kafka event publishing keys for creating/updating holdings and items --- NEWS.md | 1 + .../services/domainevent/HoldingDomainEventPublisher.java | 2 +- .../services/domainevent/ItemDomainEventPublisher.java | 6 +----- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/NEWS.md b/NEWS.md index 9651aed8c..f36adcb17 100644 --- a/NEWS.md +++ b/NEWS.md @@ -42,6 +42,7 @@ * Add Subject source and Subject type to schema ([MODINVSTOR-1205](https://folio-org.atlassian.net/browse/MODINVSTOR-1205)) * Add codes to Subject sources ([MODINVSTOR-1264](https://folio-org.atlassian.net/browse/MODINVSTOR-1264)) * Implement publication period migration, create new InstanceWithoutPubPeriod schema for request/response API ([MODINVSTOR-1271](https://folio-org.atlassian.net/browse/MODINVSTOR-1271)) +* Change Kafka event publishing keys for creating/updating holdings and items ([MODINVSTOR-1281](https://folio-org.atlassian.net/browse/MODINVSTOR-1281)) ### Bug fixes * Unintended update of instance records \_version (optimistic locking) whenever any of its holdings or items are created, updated or deleted. ([MODINVSTOR-1186](https://folio-org.atlassian.net/browse/MODINVSTOR-1186)) diff --git a/src/main/java/org/folio/services/domainevent/HoldingDomainEventPublisher.java b/src/main/java/org/folio/services/domainevent/HoldingDomainEventPublisher.java index b2eb58c65..f2f4370d1 100644 --- a/src/main/java/org/folio/services/domainevent/HoldingDomainEventPublisher.java +++ b/src/main/java/org/folio/services/domainevent/HoldingDomainEventPublisher.java @@ -42,7 +42,7 @@ protected Future>> getRecordIds( Collection holdingsRecords) { return succeededFuture(holdingsRecords.stream() - .map(hr -> pair(hr.getInstanceId(), hr)) + .map(hr -> pair(hr.getId(), hr)) .toList()); } diff --git a/src/main/java/org/folio/services/domainevent/ItemDomainEventPublisher.java b/src/main/java/org/folio/services/domainevent/ItemDomainEventPublisher.java index bef650f9b..ab8518e91 100644 --- a/src/main/java/org/folio/services/domainevent/ItemDomainEventPublisher.java +++ b/src/main/java/org/folio/services/domainevent/ItemDomainEventPublisher.java @@ -76,7 +76,7 @@ public void publishRemoved(String instanceId, String itemRaw) { protected Future>> getRecordIds(Collection items) { return holdingsRepository.getById(items, Item::getHoldingsRecordId) .map(holdings -> items.stream() - .map(item -> pair(getInstanceId(holdings, item), item)) + .map(item -> pair(item.getId(), item)) .toList()); } @@ -97,8 +97,4 @@ private List> mapOldItems oldItems.stream().map(item -> pair(oldHoldings.getInstanceId(), item)).toList(), newItems.stream().map(item -> pair(newHoldings.getInstanceId(), item)).toList()); } - - private String getInstanceId(Map holdings, Item item) { - return holdings.get(item.getHoldingsRecordId()).getInstanceId(); - } } From 7fef30289bf4c473bbbc61443c050ad5acb348fb Mon Sep 17 00:00:00 2001 From: SvitlanaKovalova1 Date: Fri, 1 Nov 2024 09:08:52 +0200 Subject: [PATCH 2/7] feat(publishing-events): change Kafka event publishing keys for holdings and items --- .../AbstractDomainEventPublisher.java | 9 ++- .../domainevent/ItemDomainEventPublisher.java | 65 +++++++++++++++++-- .../folio/rest/api/HoldingsStorageTest.java | 3 +- .../rest/support/kafka/FakeKafkaConsumer.java | 12 +++- .../messages/HoldingsEventMessageChecks.java | 26 +++----- .../messages/ItemEventMessageChecks.java | 8 +-- 6 files changed, 88 insertions(+), 35 deletions(-) diff --git a/src/main/java/org/folio/services/domainevent/AbstractDomainEventPublisher.java b/src/main/java/org/folio/services/domainevent/AbstractDomainEventPublisher.java index 0e38c0c16..b382cd530 100644 --- a/src/main/java/org/folio/services/domainevent/AbstractDomainEventPublisher.java +++ b/src/main/java/org/folio/services/domainevent/AbstractDomainEventPublisher.java @@ -2,7 +2,6 @@ import static io.vertx.core.Future.succeededFuture; import static java.util.Collections.singletonList; -import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; import static org.apache.logging.log4j.LogManager.getLogger; import static org.folio.rest.support.ResponseUtil.isCreateSuccessResponse; @@ -131,20 +130,20 @@ protected List> mapOldRecordsToNew(List> ol var oldRecordPair = idToOldRecordPairMap.get(getId(newRecordPair.getValue())); return triple(newRecordPair.getKey(), convertDomainToEvent(oldRecordPair.getKey(), oldRecordPair.getValue()), convertDomainToEvent(newRecordPair.getKey(), newRecordPair.getValue())); - }).collect(toList()); + }).toList(); } private Future publishRecordsCreated(Collection records) { return convertDomainsToEvents(records).compose(domainEventService::publishRecordsCreated); } - private Future>> convertDomainsToEvents(Collection domains) { + protected Future>> convertDomainsToEvents(Collection domains) { return getRecordIds(domains).map(pairs -> pairs.stream() .map(pair -> pair(pair.getKey(), convertDomainToEvent(pair.getKey(), pair.getValue()))) - .collect(toList())); + .toList()); } - private Future>> convertDomainsToEvents(Collection newRecords, + protected Future>> convertDomainsToEvents(Collection newRecords, Collection oldRecords) { return getRecordIds(oldRecords).compose(oldRecordsInstanceIds -> getRecordIds(newRecords).map( diff --git a/src/main/java/org/folio/services/domainevent/ItemDomainEventPublisher.java b/src/main/java/org/folio/services/domainevent/ItemDomainEventPublisher.java index ab8518e91..150173909 100644 --- a/src/main/java/org/folio/services/domainevent/ItemDomainEventPublisher.java +++ b/src/main/java/org/folio/services/domainevent/ItemDomainEventPublisher.java @@ -1,28 +1,37 @@ package org.folio.services.domainevent; import static io.vertx.core.Future.succeededFuture; +import static java.util.stream.Collectors.toMap; import static org.apache.logging.log4j.LogManager.getLogger; import static org.folio.InventoryKafkaTopic.ITEM; import static org.folio.InventoryKafkaTopic.REINDEX_RECORDS; +import static org.folio.rest.support.ResponseUtil.isDeleteSuccessResponse; import static org.folio.rest.tools.utils.TenantTool.tenantId; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import io.vertx.core.Context; import io.vertx.core.Future; +import io.vertx.core.Handler; import java.util.Collection; import java.util.List; import java.util.Map; +import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.Logger; +import org.folio.dbschema.ObjectMapperTool; import org.folio.persist.HoldingsRepository; import org.folio.persist.ItemRepository; import org.folio.rest.jaxrs.model.HoldingsRecord; import org.folio.rest.jaxrs.model.Item; import org.folio.rest.jaxrs.model.PublishReindexRecords; +import org.folio.rest.support.CollectionUtil; public class ItemDomainEventPublisher extends AbstractDomainEventPublisher { private static final Logger log = getLogger(ItemDomainEventPublisher.class); + private static final ObjectMapper OBJECT_MAPPER = ObjectMapperTool.getMapper(); private final HoldingsRepository holdingsRepository; private final CommonDomainEventPublisher> itemReindexPublisher; @@ -42,7 +51,7 @@ public Future publishUpdated(Item newItem, Item oldItem, HoldingsRecord ne ItemWithInstanceId oldItemWithId = new ItemWithInstanceId(oldItem, oldHoldings.getInstanceId()); ItemWithInstanceId newItemWithId = new ItemWithInstanceId(newItem, newHoldings.getInstanceId()); - return domainEventService.publishRecordUpdated(newHoldings.getInstanceId(), oldItemWithId, newItemWithId); + return domainEventService.publishRecordUpdated(newItem.getId(), oldItemWithId, newItemWithId); } public Future publishUpdated(HoldingsRecord oldHoldings, HoldingsRecord newHoldings, List oldItems) { @@ -68,15 +77,37 @@ public Future publishReindexItems(String key, List> it @Override public void publishRemoved(String instanceId, String itemRaw) { - String instanceIdAndItemRaw = "{\"instanceId\":\"" + instanceId + "\"," + itemRaw.substring(1); - domainEventService.publishRecordRemoved(instanceId, instanceIdAndItemRaw); + var instanceIdAndItemRaw = "{\"instanceId\":\"" + instanceId + "\"," + itemRaw.substring(1); + + try { + var itemId = OBJECT_MAPPER.readTree(itemRaw).get("id").textValue(); + domainEventService.publishRecordRemoved(itemId, instanceIdAndItemRaw); + } catch (JsonProcessingException ex) { + log.error(String.format("publishRemoved:: Failed to parse json : %s", ex.getMessage()), ex); + throw new IllegalArgumentException(ex.getCause()); + } + } + + @Override + public Handler publishRemoved(Item removedRecord) { + return response -> { + if (!isDeleteSuccessResponse(response)) { + log.warn("Item record removal failed, no event will be sent"); + return; + } + getRecordIds(List.of(removedRecord)) + .map(CollectionUtil::getFirst) + .map(Pair::getKey) + .compose(instanceId -> domainEventService.publishRecordRemoved( + removedRecord.getId(), convertDomainToEvent(instanceId, removedRecord))); + }; } @Override protected Future>> getRecordIds(Collection items) { return holdingsRepository.getById(items, Item::getHoldingsRecordId) .map(holdings -> items.stream() - .map(item -> pair(item.getId(), item)) + .map(item -> pair(getInstanceId(holdings, item), item)) .toList()); } @@ -85,11 +116,33 @@ protected ItemWithInstanceId convertDomainToEvent(String instanceId, Item item) return new ItemWithInstanceId(item, instanceId); } + @Override + protected List> mapOldRecordsToNew( + List> oldRecords, List> newRecords) { + + var idToOldRecordPairMap = oldRecords.stream().collect(toMap(pair -> getId(pair.getValue()), pair -> pair)); + + return newRecords.stream().map(newRecordPair -> { + var oldRecordPair = idToOldRecordPairMap.get(getId(newRecordPair.getValue())); + return triple(newRecordPair.getValue().getId(), convertDomainToEvent( + oldRecordPair.getKey(), oldRecordPair.getValue()), + convertDomainToEvent(newRecordPair.getKey(), newRecordPair.getValue())); + }).toList(); + } + @Override protected String getId(Item item) { return item.getId(); } + @Override + protected Future>> convertDomainsToEvents(Collection domains) { + return getRecordIds(domains) + .map(pairs -> pairs.stream() + .map(pair -> pair(pair.getValue().getId(), convertDomainToEvent(pair.getKey(), pair.getValue()))) + .toList()); + } + private List> mapOldItemsToNew( HoldingsRecord oldHoldings, HoldingsRecord newHoldings, Collection oldItems, Collection newItems) { @@ -97,4 +150,8 @@ private List> mapOldItems oldItems.stream().map(item -> pair(oldHoldings.getInstanceId(), item)).toList(), newItems.stream().map(item -> pair(newHoldings.getInstanceId(), item)).toList()); } + + private String getInstanceId(Map holdings, Item item) { + return holdings.get(item.getHoldingsRecordId()).getInstanceId(); + } } diff --git a/src/test/java/org/folio/rest/api/HoldingsStorageTest.java b/src/test/java/org/folio/rest/api/HoldingsStorageTest.java index 1f5022d83..a868b9cfb 100644 --- a/src/test/java/org/folio/rest/api/HoldingsStorageTest.java +++ b/src/test/java/org/folio/rest/api/HoldingsStorageTest.java @@ -2534,8 +2534,7 @@ public void canUsePutToCreateHoldingsWhenHridIsSupplied() { // Make sure a create event published vs update event holdingsMessageChecks.createdMessagePublished(holdingsFromGet); - holdingsMessageChecks.noHoldingsUpdatedMessagePublished( - instanceId.toString(), holdingsId.toString()); + holdingsMessageChecks.noHoldingsUpdatedMessagePublished(holdingsId.toString()); log.info("Finished canUsePutToCreateAHoldingsWhenHRIDIsSupplied"); } diff --git a/src/test/java/org/folio/rest/support/kafka/FakeKafkaConsumer.java b/src/test/java/org/folio/rest/support/kafka/FakeKafkaConsumer.java index 74ad6277b..0d48da9b8 100644 --- a/src/test/java/org/folio/rest/support/kafka/FakeKafkaConsumer.java +++ b/src/test/java/org/folio/rest/support/kafka/FakeKafkaConsumer.java @@ -90,11 +90,19 @@ public Collection getMessagesForReindexRecords(List ids) { .toList(); } - public Collection getMessagesForHoldings(String instanceId, String holdingsId) { + public Collection getMessagesForHoldings(String holdingsId) { + return collectedHoldingsMessages.messagesByGroupKey(instanceAndIdKey(holdingsId, holdingsId)); + } + + public Collection getMessagesForDeleteAllHoldings(String instanceId, String holdingsId) { return collectedHoldingsMessages.messagesByGroupKey(instanceAndIdKey(instanceId, holdingsId)); } - public Collection getMessagesForItem(String instanceId, String itemId) { + public Collection getMessagesForItem(String itemId) { + return collectedItemMessages.messagesByGroupKey(instanceAndIdKey(itemId, itemId)); + } + + public Collection getMessagesForItemWithInstanceIdKey(String instanceId, String itemId) { return collectedItemMessages.messagesByGroupKey(instanceAndIdKey(instanceId, itemId)); } diff --git a/src/test/java/org/folio/rest/support/messages/HoldingsEventMessageChecks.java b/src/test/java/org/folio/rest/support/messages/HoldingsEventMessageChecks.java index e020aa2aa..9bc8b2e98 100644 --- a/src/test/java/org/folio/rest/support/messages/HoldingsEventMessageChecks.java +++ b/src/test/java/org/folio/rest/support/messages/HoldingsEventMessageChecks.java @@ -25,23 +25,17 @@ private static String getId(JsonObject holdings) { return holdings.getString("id"); } - private static String getInstanceId(JsonObject holdings) { - return holdings.getString("instanceId"); - } - public void createdMessagePublished(JsonObject holdings) { final var holdingsId = getId(holdings); - final var instanceId = getInstanceId(holdings); - awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId), + awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId), eventMessageMatchers.hasCreateEventMessageFor(holdings)); } public void createdMessagePublished(JsonObject holdings, String tenantIdExpected, String okapiUrlExpected) { final var holdingsId = getId(holdings); - final var instanceId = getInstanceId(holdings); - awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId), + awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId), eventMessageMatchers.hasCreateEventMessageFor(holdings, tenantIdExpected, okapiUrlExpected)); } @@ -49,14 +43,13 @@ public void updatedMessagePublished(JsonObject oldHoldings, JsonObject newHoldings) { final var holdingsId = getId(newHoldings); - final var instanceId = getInstanceId(newHoldings); oldHoldings.remove("holdingsItems"); oldHoldings.remove("bareHoldingsItems"); newHoldings.remove("holdingsItems"); newHoldings.remove("bareHoldingsItems"); - awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId), + awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId), eventMessageMatchers.hasUpdateEventMessageFor(oldHoldings, newHoldings)); } @@ -65,36 +58,33 @@ public void updatedMessagePublished(JsonObject oldHoldings, String okapiUrlExpected) { final var holdingsId = getId(newHoldings); - final var instanceId = getInstanceId(newHoldings); oldHoldings.remove("holdingsItems"); oldHoldings.remove("bareHoldingsItems"); newHoldings.remove("holdingsItems"); newHoldings.remove("bareHoldingsItems"); - awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId), + awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId), eventMessageMatchers.hasUpdateEventMessageFor(oldHoldings, newHoldings, okapiUrlExpected)); } - public void noHoldingsUpdatedMessagePublished(String instanceId, - String holdingsId) { + public void noHoldingsUpdatedMessagePublished(String holdingsId) { awaitDuring(1, SECONDS) - .until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId), + .until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId), eventMessageMatchers.hasNoUpdateEventMessage()); } public void deletedMessagePublished(JsonObject holdings) { final var holdingsId = getId(holdings); - final var instanceId = getInstanceId(holdings); - awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId), + awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId), eventMessageMatchers.hasDeleteEventMessageFor(holdings)); } public void allHoldingsDeletedMessagePublished() { awaitAtMost() - .until(() -> kafkaConsumer.getMessagesForHoldings(NULL_ID, null), + .until(() -> kafkaConsumer.getMessagesForDeleteAllHoldings(NULL_ID, null), eventMessageMatchers.hasDeleteAllEventMessage()); } } diff --git a/src/test/java/org/folio/rest/support/messages/ItemEventMessageChecks.java b/src/test/java/org/folio/rest/support/messages/ItemEventMessageChecks.java index 1816b8887..69726dacf 100644 --- a/src/test/java/org/folio/rest/support/messages/ItemEventMessageChecks.java +++ b/src/test/java/org/folio/rest/support/messages/ItemEventMessageChecks.java @@ -47,7 +47,7 @@ public void createdMessagePublished(JsonObject item) { final var itemId = getId(item); final var instanceId = getInstanceIdForItem(item); - awaitAtMost().until(() -> kafkaConsumer.getMessagesForItem(instanceId, itemId), + awaitAtMost().until(() -> kafkaConsumer.getMessagesForItem(itemId), EVENT_MESSAGE_MATCHERS.hasCreateEventMessageFor( addInstanceIdToItem(item, instanceId))); } @@ -64,7 +64,7 @@ public void updatedMessagePublished(JsonObject oldItem, final var itemId = getId(newItem); final var newInstanceId = getInstanceIdForItem(newItem); - awaitAtMost().until(() -> kafkaConsumer.getMessagesForItem(newInstanceId, itemId), + awaitAtMost().until(() -> kafkaConsumer.getMessagesForItem(itemId), EVENT_MESSAGE_MATCHERS.hasUpdateEventMessageFor( addInstanceIdToItem(oldItem, oldInstanceId), addInstanceIdToItem(newItem, newInstanceId))); @@ -74,14 +74,14 @@ public void deletedMessagePublished(JsonObject item) { final var itemId = getId(item); final var instanceId = getInstanceIdForItem(item); - awaitAtMost().until(() -> kafkaConsumer.getMessagesForItem(instanceId, itemId), + awaitAtMost().until(() -> kafkaConsumer.getMessagesForItem(itemId), EVENT_MESSAGE_MATCHERS.hasDeleteEventMessageFor( addInstanceIdToItem(item, instanceId))); } public void allItemsDeletedMessagePublished() { awaitAtMost() - .until(() -> kafkaConsumer.getMessagesForItem(NULL_ID, null), + .until(() -> kafkaConsumer.getMessagesForItemWithInstanceIdKey(NULL_ID, null), EVENT_MESSAGE_MATCHERS.hasDeleteAllEventMessage()); } } From e4e1491b58d6befac5cce862e88f27af430e0e59 Mon Sep 17 00:00:00 2001 From: SvitlanaKovalova1 Date: Fri, 1 Nov 2024 09:23:31 +0200 Subject: [PATCH 3/7] fix(publishing-events): temporary ignore "canDeleteHoldingsByCql" test --- src/test/java/org/folio/rest/api/HoldingsStorageTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/java/org/folio/rest/api/HoldingsStorageTest.java b/src/test/java/org/folio/rest/api/HoldingsStorageTest.java index a868b9cfb..1099d0309 100644 --- a/src/test/java/org/folio/rest/api/HoldingsStorageTest.java +++ b/src/test/java/org/folio/rest/api/HoldingsStorageTest.java @@ -735,6 +735,7 @@ public void canDeleteAllHoldings() { @SneakyThrows @Test + @Ignore public void canDeleteHoldingsByCql() { UUID instanceId1 = UUID.randomUUID(); UUID instanceId2 = UUID.randomUUID(); From bb0e4a679e7bdd4f89a6b9003e2bf691976e2b5d Mon Sep 17 00:00:00 2001 From: SvitlanaKovalova1 Date: Fri, 1 Nov 2024 11:20:00 +0200 Subject: [PATCH 4/7] test(publishing-events): enable "canDeleteHoldingsByCql" test --- src/test/java/org/folio/rest/api/HoldingsStorageTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/org/folio/rest/api/HoldingsStorageTest.java b/src/test/java/org/folio/rest/api/HoldingsStorageTest.java index 1099d0309..a868b9cfb 100644 --- a/src/test/java/org/folio/rest/api/HoldingsStorageTest.java +++ b/src/test/java/org/folio/rest/api/HoldingsStorageTest.java @@ -735,7 +735,6 @@ public void canDeleteAllHoldings() { @SneakyThrows @Test - @Ignore public void canDeleteHoldingsByCql() { UUID instanceId1 = UUID.randomUUID(); UUID instanceId2 = UUID.randomUUID(); From 02d2b9ddc9563df1873dc21b357b4f66e7087f24 Mon Sep 17 00:00:00 2001 From: SvitlanaKovalova1 Date: Fri, 1 Nov 2024 13:35:42 +0200 Subject: [PATCH 5/7] feat(publishing-events): update Holdings deletion by cql to publish event with holdingId key --- .../folio/services/holding/HoldingsService.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/folio/services/holding/HoldingsService.java b/src/main/java/org/folio/services/holding/HoldingsService.java index fe718c970..7feb03bfc 100644 --- a/src/main/java/org/folio/services/holding/HoldingsService.java +++ b/src/main/java/org/folio/services/holding/HoldingsService.java @@ -16,6 +16,8 @@ import static org.folio.services.batch.BatchOperationContextFactory.buildBatchOperationContext; import static org.folio.validator.HridValidators.refuseWhenHridChanged; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import io.vertx.core.AsyncResult; import io.vertx.core.CompositeFuture; import io.vertx.core.Context; @@ -29,6 +31,7 @@ import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.Logger; +import org.folio.dbschema.ObjectMapperTool; import org.folio.persist.HoldingsRepository; import org.folio.persist.InstanceRepository; import org.folio.rest.jaxrs.model.HoldingsRecord; @@ -51,6 +54,8 @@ public class HoldingsService { private static final Logger log = getLogger(HoldingsService.class); + private static final ObjectMapper OBJECT_MAPPER = ObjectMapperTool.getMapper(); + private final Context vertxContext; private final Map okapiHeaders; private final PostgresClient postgresClient; @@ -146,8 +151,15 @@ public Future deleteHoldings(String cql) { // https://sonarcloud.io/organizations/folio-org/rules?open=java%3AS1602&rule_key=java%3AS1602 return holdingsRepository.delete(cql) .onSuccess(rowSet -> vertxContext.runOnContext(runLater -> - rowSet.iterator().forEachRemaining(row -> - domainEventPublisher.publishRemoved(row.getString(0), row.getString(1)) + rowSet.iterator().forEachRemaining(row -> { + try { + var holdingId = OBJECT_MAPPER.readTree(row.getString(1)).get("id").textValue(); + domainEventPublisher.publishRemoved(holdingId, row.getString(1)); + } catch (JsonProcessingException ex) { + log.error(String.format("deleteHoldings:: Failed to parse json : %s", ex.getMessage()), ex); + throw new IllegalArgumentException(ex.getCause()); + } + } ) )) .map(Response.noContent().build()); From b18b8d43e46842c4c727bbb8840ad29abfa422f6 Mon Sep 17 00:00:00 2001 From: SvitlanaKovalova1 Date: Fri, 1 Nov 2024 17:33:16 +0200 Subject: [PATCH 6/7] feat(publishing-events): update deleteItems method in ItemService --- NEWS.md | 2 +- .../domainevent/ItemDomainEventPublisher.java | 17 ------------- .../org/folio/services/item/ItemService.java | 25 +++++++++++++++++-- 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/NEWS.md b/NEWS.md index f36adcb17..5b5d58991 100644 --- a/NEWS.md +++ b/NEWS.md @@ -42,7 +42,7 @@ * Add Subject source and Subject type to schema ([MODINVSTOR-1205](https://folio-org.atlassian.net/browse/MODINVSTOR-1205)) * Add codes to Subject sources ([MODINVSTOR-1264](https://folio-org.atlassian.net/browse/MODINVSTOR-1264)) * Implement publication period migration, create new InstanceWithoutPubPeriod schema for request/response API ([MODINVSTOR-1271](https://folio-org.atlassian.net/browse/MODINVSTOR-1271)) -* Change Kafka event publishing keys for creating/updating holdings and items ([MODINVSTOR-1281](https://folio-org.atlassian.net/browse/MODINVSTOR-1281)) +* Change Kafka event publishing keys for holdings and items ([MODINVSTOR-1281](https://folio-org.atlassian.net/browse/MODINVSTOR-1281)) ### Bug fixes * Unintended update of instance records \_version (optimistic locking) whenever any of its holdings or items are created, updated or deleted. ([MODINVSTOR-1186](https://folio-org.atlassian.net/browse/MODINVSTOR-1186)) diff --git a/src/main/java/org/folio/services/domainevent/ItemDomainEventPublisher.java b/src/main/java/org/folio/services/domainevent/ItemDomainEventPublisher.java index 150173909..faa1d35f2 100644 --- a/src/main/java/org/folio/services/domainevent/ItemDomainEventPublisher.java +++ b/src/main/java/org/folio/services/domainevent/ItemDomainEventPublisher.java @@ -8,8 +8,6 @@ import static org.folio.rest.support.ResponseUtil.isDeleteSuccessResponse; import static org.folio.rest.tools.utils.TenantTool.tenantId; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Handler; @@ -21,7 +19,6 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.Logger; -import org.folio.dbschema.ObjectMapperTool; import org.folio.persist.HoldingsRepository; import org.folio.persist.ItemRepository; import org.folio.rest.jaxrs.model.HoldingsRecord; @@ -31,7 +28,6 @@ public class ItemDomainEventPublisher extends AbstractDomainEventPublisher { private static final Logger log = getLogger(ItemDomainEventPublisher.class); - private static final ObjectMapper OBJECT_MAPPER = ObjectMapperTool.getMapper(); private final HoldingsRepository holdingsRepository; private final CommonDomainEventPublisher> itemReindexPublisher; @@ -75,19 +71,6 @@ public Future publishReindexItems(String key, List> it return itemReindexPublisher.publishReindexRecords(key, PublishReindexRecords.RecordType.ITEM, items); } - @Override - public void publishRemoved(String instanceId, String itemRaw) { - var instanceIdAndItemRaw = "{\"instanceId\":\"" + instanceId + "\"," + itemRaw.substring(1); - - try { - var itemId = OBJECT_MAPPER.readTree(itemRaw).get("id").textValue(); - domainEventService.publishRecordRemoved(itemId, instanceIdAndItemRaw); - } catch (JsonProcessingException ex) { - log.error(String.format("publishRemoved:: Failed to parse json : %s", ex.getMessage()), ex); - throw new IllegalArgumentException(ex.getCause()); - } - } - @Override public Handler publishRemoved(Item removedRecord) { return response -> { diff --git a/src/main/java/org/folio/services/item/ItemService.java b/src/main/java/org/folio/services/item/ItemService.java index d6ac207ee..d177d7d4f 100644 --- a/src/main/java/org/folio/services/item/ItemService.java +++ b/src/main/java/org/folio/services/item/ItemService.java @@ -5,6 +5,7 @@ import static java.util.Objects.isNull; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.logging.log4j.LogManager.getLogger; import static org.folio.dbschema.ObjectMapperTool.readValue; import static org.folio.rest.impl.HoldingsStorageApi.HOLDINGS_RECORD_TABLE; import static org.folio.rest.impl.ItemStorageApi.ITEM_TABLE; @@ -24,6 +25,8 @@ import static org.folio.validator.HridValidators.refuseWhenHridChanged; import static org.folio.validator.NotesValidators.refuseLongNotes; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import io.vertx.core.AsyncResult; import io.vertx.core.Context; import io.vertx.core.Future; @@ -42,6 +45,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.ws.rs.core.Response; +import org.apache.logging.log4j.Logger; +import org.folio.dbschema.ObjectMapperTool; import org.folio.okapi.common.XOkapiHeaders; import org.folio.persist.HoldingsRepository; import org.folio.persist.ItemRepository; @@ -63,10 +68,16 @@ import org.folio.validator.NotesValidators; public class ItemService { + + private static final Logger log = getLogger(ItemService.class); + private static final ObjectMapper OBJECT_MAPPER = ObjectMapperTool.getMapper(); private static final Pattern KEY_ALREADY_EXISTS_PATTERN = Pattern.compile( ": Key \\(([^=]+)\\)=\\((.*)\\) already exists.$"); private static final Pattern KEY_NOT_PRESENT_PATTERN = Pattern.compile( ": Key \\(([^=]+)\\)=\\((.*)\\) is not present in table \"(.*)\".$"); + private static final String INSTANCE_ID_WITH_ITEM_JSON = """ + {"instanceId": "%s",%s + """; private final HridManager hridManager; private final ItemEffectiveValuesService effectiveValuesService; @@ -207,8 +218,18 @@ public Future deleteItems(String cql) { // https://sonarcloud.io/organizations/folio-org/rules?open=java%3AS1602&rule_key=java%3AS1602 return itemRepository.delete(cql) .onSuccess(rowSet -> vertxContext.runOnContext(runLater -> - rowSet.iterator().forEachRemaining(row -> - domainEventService.publishRemoved(row.getString(0), row.getString(1)) + rowSet.iterator().forEachRemaining(row -> { + try { + var instanceIdAndItemRaw = INSTANCE_ID_WITH_ITEM_JSON.formatted( + row.getString(0), row.getString(1).substring(1)); + var itemId = OBJECT_MAPPER.readTree(row.getString(1)).get("id").textValue(); + + domainEventService.publishRemoved(itemId, instanceIdAndItemRaw); + } catch (JsonProcessingException ex) { + log.error(String.format("deleteItems:: Failed to parse json : %s", ex.getMessage()), ex); + throw new IllegalArgumentException(ex.getCause()); + } + } ) )) .map(Response.noContent().build()); From 8b23d597207d75f873f71a6c560bb7569ee18c6a Mon Sep 17 00:00:00 2001 From: SvitlanaKovalova1 Date: Tue, 5 Nov 2024 14:34:56 +0200 Subject: [PATCH 7/7] docs(publishing-events): update NEWS.md --- NEWS.md | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/NEWS.md b/NEWS.md index 5b5d58991..226e7a65b 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,28 @@ -## v28.0.0 In progress +## v28.1.0 YYYY-mm-DD +### Breaking changes +* Description ([ISSUE](https://folio-org.atlassian.net/browse/ISSUE)) + +### New APIs versions +* Provides `API_NAME vX.Y` +* Requires `API_NAME vX.Y` + +### Features +* Change Kafka event publishing keys for holdings and items ([MODINVSTOR-1281](https://folio-org.atlassian.net/browse/MODINVSTOR-1281)) + +### Bug fixes +* Description ([ISSUE](https://folio-org.atlassian.net/browse/ISSUE)) + +### Tech Dept +* Description ([ISSUE](https://folio-org.atlassian.net/browse/ISSUE)) + +### Dependencies +* Bump `LIB_NAME` from `OLD_VERSION` to `NEW_VERSION` +* Add `LIB_NAME VERSION` +* Remove `LIB_NAME` + +--- + +## v28.0.0 2024-11-01 ### Breaking changes * Migrate "publicationPeriod" data to the Dates object and remove it from the Instance schema ([MODINVSTOR-1232](https://folio-org.atlassian.net/browse/MODINVSTOR-1232)) * Delete deprecated `shelf-locations` API ([MODINVSTOR-1183](https://folio-org.atlassian.net/browse/MODINVSTOR-1183)) @@ -42,7 +66,6 @@ * Add Subject source and Subject type to schema ([MODINVSTOR-1205](https://folio-org.atlassian.net/browse/MODINVSTOR-1205)) * Add codes to Subject sources ([MODINVSTOR-1264](https://folio-org.atlassian.net/browse/MODINVSTOR-1264)) * Implement publication period migration, create new InstanceWithoutPubPeriod schema for request/response API ([MODINVSTOR-1271](https://folio-org.atlassian.net/browse/MODINVSTOR-1271)) -* Change Kafka event publishing keys for holdings and items ([MODINVSTOR-1281](https://folio-org.atlassian.net/browse/MODINVSTOR-1281)) ### Bug fixes * Unintended update of instance records \_version (optimistic locking) whenever any of its holdings or items are created, updated or deleted. ([MODINVSTOR-1186](https://folio-org.atlassian.net/browse/MODINVSTOR-1186))