Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(publishing-events): change Kafka event publishing keys for holdings and items. #1108

Merged
merged 8 commits into from
Nov 5, 2024
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* Add Subject source and Subject type to schema ([MODINVSTOR-1205](https://folio-org.atlassian.net/browse/MODINVSTOR-1205))
* Add codes to Subject sources ([MODINVSTOR-1264](https://folio-org.atlassian.net/browse/MODINVSTOR-1264))
* Implement publication period migration, create new InstanceWithoutPubPeriod schema for request/response API ([MODINVSTOR-1271](https://folio-org.atlassian.net/browse/MODINVSTOR-1271))
* Change Kafka event publishing keys for holdings and items ([MODINVSTOR-1281](https://folio-org.atlassian.net/browse/MODINVSTOR-1281))

### Bug fixes
* Unintended update of instance records \_version (optimistic locking) whenever any of its holdings or items are created, updated or deleted. ([MODINVSTOR-1186](https://folio-org.atlassian.net/browse/MODINVSTOR-1186))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static io.vertx.core.Future.succeededFuture;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.logging.log4j.LogManager.getLogger;
import static org.folio.rest.support.ResponseUtil.isCreateSuccessResponse;
Expand Down Expand Up @@ -131,20 +130,20 @@ protected List<Triple<String, E, E>> mapOldRecordsToNew(List<Pair<String, D>> ol
var oldRecordPair = idToOldRecordPairMap.get(getId(newRecordPair.getValue()));
return triple(newRecordPair.getKey(), convertDomainToEvent(oldRecordPair.getKey(), oldRecordPair.getValue()),
convertDomainToEvent(newRecordPair.getKey(), newRecordPair.getValue()));
}).collect(toList());
}).toList();
}

private Future<Void> publishRecordsCreated(Collection<D> records) {
return convertDomainsToEvents(records).compose(domainEventService::publishRecordsCreated);
}

private Future<List<Pair<String, E>>> convertDomainsToEvents(Collection<D> domains) {
protected Future<List<Pair<String, E>>> convertDomainsToEvents(Collection<D> domains) {
return getRecordIds(domains).map(pairs -> pairs.stream()
.map(pair -> pair(pair.getKey(), convertDomainToEvent(pair.getKey(), pair.getValue())))
.collect(toList()));
.toList());
}

private Future<List<Triple<String, E, E>>> convertDomainsToEvents(Collection<D> newRecords,
protected Future<List<Triple<String, E, E>>> convertDomainsToEvents(Collection<D> newRecords,
Collection<D> oldRecords) {

return getRecordIds(oldRecords).compose(oldRecordsInstanceIds -> getRecordIds(newRecords).map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected Future<List<Pair<String, HoldingsRecord>>> getRecordIds(
Collection<HoldingsRecord> holdingsRecords) {

return succeededFuture(holdingsRecords.stream()
.map(hr -> pair(hr.getInstanceId(), hr))
.map(hr -> pair(hr.getId(), hr))
.toList());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package org.folio.services.domainevent;

import static io.vertx.core.Future.succeededFuture;
import static java.util.stream.Collectors.toMap;
import static org.apache.logging.log4j.LogManager.getLogger;
import static org.folio.InventoryKafkaTopic.ITEM;
import static org.folio.InventoryKafkaTopic.REINDEX_RECORDS;
import static org.folio.rest.support.ResponseUtil.isDeleteSuccessResponse;
import static org.folio.rest.tools.utils.TenantTool.tenantId;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
Expand All @@ -20,6 +24,7 @@
import org.folio.rest.jaxrs.model.HoldingsRecord;
import org.folio.rest.jaxrs.model.Item;
import org.folio.rest.jaxrs.model.PublishReindexRecords;
import org.folio.rest.support.CollectionUtil;

public class ItemDomainEventPublisher extends AbstractDomainEventPublisher<Item, ItemWithInstanceId> {
private static final Logger log = getLogger(ItemDomainEventPublisher.class);
Expand All @@ -42,7 +47,7 @@ public Future<Void> publishUpdated(Item newItem, Item oldItem, HoldingsRecord ne
ItemWithInstanceId oldItemWithId = new ItemWithInstanceId(oldItem, oldHoldings.getInstanceId());
ItemWithInstanceId newItemWithId = new ItemWithInstanceId(newItem, newHoldings.getInstanceId());

return domainEventService.publishRecordUpdated(newHoldings.getInstanceId(), oldItemWithId, newItemWithId);
return domainEventService.publishRecordUpdated(newItem.getId(), oldItemWithId, newItemWithId);
}

public Future<Void> publishUpdated(HoldingsRecord oldHoldings, HoldingsRecord newHoldings, List<Item> oldItems) {
Expand All @@ -67,9 +72,18 @@ public Future<Void> publishReindexItems(String key, List<Map<String, Object>> it
}

@Override
public void publishRemoved(String instanceId, String itemRaw) {
String instanceIdAndItemRaw = "{\"instanceId\":\"" + instanceId + "\"," + itemRaw.substring(1);
domainEventService.publishRecordRemoved(instanceId, instanceIdAndItemRaw);
public Handler<Response> publishRemoved(Item removedRecord) {
return response -> {
if (!isDeleteSuccessResponse(response)) {
log.warn("Item record removal failed, no event will be sent");
return;
}
getRecordIds(List.of(removedRecord))
.map(CollectionUtil::getFirst)
.map(Pair::getKey)
.compose(instanceId -> domainEventService.publishRecordRemoved(
removedRecord.getId(), convertDomainToEvent(instanceId, removedRecord)));
};
}

@Override
Expand All @@ -85,11 +99,33 @@ protected ItemWithInstanceId convertDomainToEvent(String instanceId, Item item)
return new ItemWithInstanceId(item, instanceId);
}

@Override
protected List<Triple<String, ItemWithInstanceId, ItemWithInstanceId>> mapOldRecordsToNew(
List<Pair<String, Item>> oldRecords, List<Pair<String, Item>> newRecords) {

var idToOldRecordPairMap = oldRecords.stream().collect(toMap(pair -> getId(pair.getValue()), pair -> pair));

return newRecords.stream().map(newRecordPair -> {
var oldRecordPair = idToOldRecordPairMap.get(getId(newRecordPair.getValue()));
return triple(newRecordPair.getValue().getId(), convertDomainToEvent(
oldRecordPair.getKey(), oldRecordPair.getValue()),
convertDomainToEvent(newRecordPair.getKey(), newRecordPair.getValue()));
}).toList();
}

@Override
protected String getId(Item item) {
return item.getId();
}

@Override
protected Future<List<Pair<String, ItemWithInstanceId>>> convertDomainsToEvents(Collection<Item> domains) {
return getRecordIds(domains)
.map(pairs -> pairs.stream()
.map(pair -> pair(pair.getValue().getId(), convertDomainToEvent(pair.getKey(), pair.getValue())))
.toList());
}

private List<Triple<String, ItemWithInstanceId, ItemWithInstanceId>> mapOldItemsToNew(
HoldingsRecord oldHoldings, HoldingsRecord newHoldings, Collection<Item> oldItems, Collection<Item> newItems) {

Expand Down
16 changes: 14 additions & 2 deletions src/main/java/org/folio/services/holding/HoldingsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import static org.folio.services.batch.BatchOperationContextFactory.buildBatchOperationContext;
import static org.folio.validator.HridValidators.refuseWhenHridChanged;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
Expand All @@ -29,6 +31,7 @@
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.folio.dbschema.ObjectMapperTool;
import org.folio.persist.HoldingsRepository;
import org.folio.persist.InstanceRepository;
import org.folio.rest.jaxrs.model.HoldingsRecord;
Expand All @@ -51,6 +54,8 @@

public class HoldingsService {
private static final Logger log = getLogger(HoldingsService.class);
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperTool.getMapper();

private final Context vertxContext;
private final Map<String, String> okapiHeaders;
private final PostgresClient postgresClient;
Expand Down Expand Up @@ -146,8 +151,15 @@ public Future<Response> deleteHoldings(String cql) {
// https://sonarcloud.io/organizations/folio-org/rules?open=java%3AS1602&rule_key=java%3AS1602
return holdingsRepository.delete(cql)
.onSuccess(rowSet -> vertxContext.runOnContext(runLater ->
rowSet.iterator().forEachRemaining(row ->
domainEventPublisher.publishRemoved(row.getString(0), row.getString(1))
rowSet.iterator().forEachRemaining(row -> {
try {
var holdingId = OBJECT_MAPPER.readTree(row.getString(1)).get("id").textValue();
domainEventPublisher.publishRemoved(holdingId, row.getString(1));
} catch (JsonProcessingException ex) {
log.error(String.format("deleteHoldings:: Failed to parse json : %s", ex.getMessage()), ex);
throw new IllegalArgumentException(ex.getCause());
}
}
)
))
.map(Response.noContent().build());
Expand Down
25 changes: 23 additions & 2 deletions src/main/java/org/folio/services/item/ItemService.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static java.util.Objects.isNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.logging.log4j.LogManager.getLogger;
import static org.folio.dbschema.ObjectMapperTool.readValue;
import static org.folio.rest.impl.HoldingsStorageApi.HOLDINGS_RECORD_TABLE;
import static org.folio.rest.impl.ItemStorageApi.ITEM_TABLE;
Expand All @@ -24,6 +25,8 @@
import static org.folio.validator.HridValidators.refuseWhenHridChanged;
import static org.folio.validator.NotesValidators.refuseLongNotes;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
Expand All @@ -42,6 +45,8 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.ws.rs.core.Response;
import org.apache.logging.log4j.Logger;
import org.folio.dbschema.ObjectMapperTool;
import org.folio.okapi.common.XOkapiHeaders;
import org.folio.persist.HoldingsRepository;
import org.folio.persist.ItemRepository;
Expand All @@ -63,10 +68,16 @@
import org.folio.validator.NotesValidators;

public class ItemService {

private static final Logger log = getLogger(ItemService.class);
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperTool.getMapper();
private static final Pattern KEY_ALREADY_EXISTS_PATTERN = Pattern.compile(
": Key \\(([^=]+)\\)=\\((.*)\\) already exists.$");
private static final Pattern KEY_NOT_PRESENT_PATTERN = Pattern.compile(
": Key \\(([^=]+)\\)=\\((.*)\\) is not present in table \"(.*)\".$");
private static final String INSTANCE_ID_WITH_ITEM_JSON = """
{"instanceId": "%s",%s
""";

private final HridManager hridManager;
private final ItemEffectiveValuesService effectiveValuesService;
Expand Down Expand Up @@ -207,8 +218,18 @@ public Future<Response> deleteItems(String cql) {
// https://sonarcloud.io/organizations/folio-org/rules?open=java%3AS1602&rule_key=java%3AS1602
return itemRepository.delete(cql)
.onSuccess(rowSet -> vertxContext.runOnContext(runLater ->
rowSet.iterator().forEachRemaining(row ->
domainEventService.publishRemoved(row.getString(0), row.getString(1))
rowSet.iterator().forEachRemaining(row -> {
try {
var instanceIdAndItemRaw = INSTANCE_ID_WITH_ITEM_JSON.formatted(
row.getString(0), row.getString(1).substring(1));
var itemId = OBJECT_MAPPER.readTree(row.getString(1)).get("id").textValue();

domainEventService.publishRemoved(itemId, instanceIdAndItemRaw);
} catch (JsonProcessingException ex) {
log.error(String.format("deleteItems:: Failed to parse json : %s", ex.getMessage()), ex);
throw new IllegalArgumentException(ex.getCause());
}
}
)
))
.map(Response.noContent().build());
Expand Down
3 changes: 1 addition & 2 deletions src/test/java/org/folio/rest/api/HoldingsStorageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2534,8 +2534,7 @@ public void canUsePutToCreateHoldingsWhenHridIsSupplied() {

// Make sure a create event published vs update event
holdingsMessageChecks.createdMessagePublished(holdingsFromGet);
holdingsMessageChecks.noHoldingsUpdatedMessagePublished(
instanceId.toString(), holdingsId.toString());
holdingsMessageChecks.noHoldingsUpdatedMessagePublished(holdingsId.toString());

log.info("Finished canUsePutToCreateAHoldingsWhenHRIDIsSupplied");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,19 @@ public Collection<EventMessage> getMessagesForReindexRecords(List<String> ids) {
.toList();
}

public Collection<EventMessage> getMessagesForHoldings(String instanceId, String holdingsId) {
public Collection<EventMessage> getMessagesForHoldings(String holdingsId) {
return collectedHoldingsMessages.messagesByGroupKey(instanceAndIdKey(holdingsId, holdingsId));
}

public Collection<EventMessage> getMessagesForDeleteAllHoldings(String instanceId, String holdingsId) {
return collectedHoldingsMessages.messagesByGroupKey(instanceAndIdKey(instanceId, holdingsId));
}

public Collection<EventMessage> getMessagesForItem(String instanceId, String itemId) {
public Collection<EventMessage> getMessagesForItem(String itemId) {
return collectedItemMessages.messagesByGroupKey(instanceAndIdKey(itemId, itemId));
}

public Collection<EventMessage> getMessagesForItemWithInstanceIdKey(String instanceId, String itemId) {
return collectedItemMessages.messagesByGroupKey(instanceAndIdKey(instanceId, itemId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,31 @@ private static String getId(JsonObject holdings) {
return holdings.getString("id");
}

private static String getInstanceId(JsonObject holdings) {
return holdings.getString("instanceId");
}

public void createdMessagePublished(JsonObject holdings) {
final var holdingsId = getId(holdings);
final var instanceId = getInstanceId(holdings);

awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId),
awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId),
eventMessageMatchers.hasCreateEventMessageFor(holdings));
}

public void createdMessagePublished(JsonObject holdings, String tenantIdExpected, String okapiUrlExpected) {
final var holdingsId = getId(holdings);
final var instanceId = getInstanceId(holdings);

awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId),
awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId),
eventMessageMatchers.hasCreateEventMessageFor(holdings, tenantIdExpected, okapiUrlExpected));
}

public void updatedMessagePublished(JsonObject oldHoldings,
JsonObject newHoldings) {

final var holdingsId = getId(newHoldings);
final var instanceId = getInstanceId(newHoldings);

oldHoldings.remove("holdingsItems");
oldHoldings.remove("bareHoldingsItems");
newHoldings.remove("holdingsItems");
newHoldings.remove("bareHoldingsItems");

awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId),
awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId),
eventMessageMatchers.hasUpdateEventMessageFor(oldHoldings, newHoldings));
}

Expand All @@ -65,36 +58,33 @@ public void updatedMessagePublished(JsonObject oldHoldings,
String okapiUrlExpected) {

final var holdingsId = getId(newHoldings);
final var instanceId = getInstanceId(newHoldings);

oldHoldings.remove("holdingsItems");
oldHoldings.remove("bareHoldingsItems");
newHoldings.remove("holdingsItems");
newHoldings.remove("bareHoldingsItems");

awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId),
awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId),
eventMessageMatchers.hasUpdateEventMessageFor(oldHoldings, newHoldings, okapiUrlExpected));
}

public void noHoldingsUpdatedMessagePublished(String instanceId,
String holdingsId) {
public void noHoldingsUpdatedMessagePublished(String holdingsId) {

awaitDuring(1, SECONDS)
.until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId),
.until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId),
eventMessageMatchers.hasNoUpdateEventMessage());
}

public void deletedMessagePublished(JsonObject holdings) {
final var holdingsId = getId(holdings);
final var instanceId = getInstanceId(holdings);

awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(instanceId, holdingsId),
awaitAtMost().until(() -> kafkaConsumer.getMessagesForHoldings(holdingsId),
eventMessageMatchers.hasDeleteEventMessageFor(holdings));
}

public void allHoldingsDeletedMessagePublished() {
awaitAtMost()
.until(() -> kafkaConsumer.getMessagesForHoldings(NULL_ID, null),
.until(() -> kafkaConsumer.getMessagesForDeleteAllHoldings(NULL_ID, null),
eventMessageMatchers.hasDeleteAllEventMessage());
}
}
Loading