Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logging checks added. #39

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ public DefaultSuccessMAP handleValidated(InfrastructureComponentMAP messageAndPa
try {
//Message is either ConnectorUpdateMessage or ConnectorUnavailableMessage
if (msg instanceof ConnectorUpdateMessage) {
logger.info("Connector Notification Message: "+msg.toRdf());
//Updating existing Connector or registering new Connector
if(messageAndPayload.getPayload().isPresent()) {
if(messageAndPayload.getMessage().getProperties() != null) {
logger.info("Checking the type of request sent");
//POST is not idempotent. Making sure that, in case POST is used, the connector does not exist yet
if (messageAndPayload.getMessage().getProperties().containsKey("https://w3id.org/idsa/core/method")) {
String method = messageAndPayload.getMessage().getProperties().get("https://w3id.org/idsa/core/method").toString().replace("\"", "").replace("^^http://www.w3.org/2001/XMLSchema#string", "").toLowerCase();
Expand All @@ -83,13 +85,16 @@ public DefaultSuccessMAP handleValidated(InfrastructureComponentMAP messageAndPa
|| repositoryFacade.graphIsActive(((ConnectorUpdateMessage) messageAndPayload.getMessage()).getAffectedConnector().toString()))
{
//TOO_MANY_RESULTS is not exactly an ideal term... No better choice available though
logger.error("The connector you are posting already exists. To overwrite it please use a PUT request.");
throw new RejectMessageException(RejectionReason.TOO_MANY_RESULTS, new Exception("The connector you are posting already exists. Use PUT instead to overwrite it."));
}
}
}
}
//Rewrite URL to match our REST scheme

rewrittenUri = infrastructureComponentStatusHandler.updated(messageAndPayload.getPayload().get());
logger.info("Updated URI is" + rewrittenUri);
}
else
{
Expand All @@ -98,13 +103,15 @@ public DefaultSuccessMAP handleValidated(InfrastructureComponentMAP messageAndPa
}
else if (msg instanceof ConnectorUnavailableMessage) {
//To unregister, no payload is required
logger.info("Issuer Connector " + messageAndPayload.getMessage().getIssuerConnector());
infrastructureComponentStatusHandler.unavailable(messageAndPayload.getMessage().getIssuerConnector());
}
}
catch (Exception e) {
if(e instanceof RejectMessageException)
{
//If it already is a RejectMessageException, throw it as-is
logger.error("Rejected Message", e);
throw (RejectMessageException) e;
}
//For some reason, ConnectExceptions sometimes do not provide an exception message.
Expand All @@ -113,24 +120,30 @@ else if (msg instanceof ConnectorUnavailableMessage) {
if(e.getMessage() == null)
{
try {

e = new Exception(e.getCause().getMessage());
logger.error("Failed to process message.", e);
}
catch (NullPointerException ignored)
{
e = new Exception(e.getClass().getName() + " with empty message.");
logger.error(e.getClass().getName() + " with empty message.");
}
}
//Some unknown error has occurred, returning an internal error
logger.error("Internal Recipient Error "+ RejectionReason.INTERNAL_RECIPIENT_ERROR);
throw new RejectMessageException(RejectionReason.INTERNAL_RECIPIENT_ERROR, e);
}

try {
//Let the connector know that the update was successful

DefaultSuccessMAP returnValue = new DefaultSuccessMAP(infrastructureComponent.getId(),
infrastructureComponent.getOutboundModelVersion(),
messageAndPayload.getMessage().getId(),
securityTokenProvider.getSecurityTokenAsDAT(),
responseSenderUri);

if(rewrittenUri != null)
{
//Attach the rewritten URI to the response, so that the recipient knows under which address the resource can be found
Expand All @@ -140,7 +153,8 @@ else if (msg instanceof ConnectorUnavailableMessage) {
}
//Thrown in case the broker is unable to obtain its own security token from the DAPS
catch (TokenRetrievalException e)
{
{
logger.error("Message rejected due to internal error");
throw new RejectMessageException(RejectionReason.INTERNAL_RECIPIENT_ERROR, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package de.fraunhofer.iais.eis.ids.broker.core.common.persistence;

import de.fraunhofer.iais.eis.*;
import de.fraunhofer.iais.eis.ids.component.core.ContractRejectMessageException;

import de.fraunhofer.iais.eis.ids.component.core.RejectMessageException;
import de.fraunhofer.iais.eis.ids.component.core.SecurityTokenProvider;
import de.fraunhofer.iais.eis.ids.component.core.TokenRetrievalException;
Expand All @@ -15,6 +17,11 @@
import java.util.Arrays;
import java.util.Collection;

import javax.xml.datatype.XMLGregorianCalendar;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is a message handler for messages about the status of resources,
* such as ResourceAvailableMessages, ResourceUpdateMessages, and ResourceUnavailableMessages
Expand All @@ -26,7 +33,7 @@ public class ResourceMessageHandler extends ValidatingMessageHandler<ResourceMAP
private final SecurityTokenProvider securityTokenProvider;
private final URI responseSenderAgent;
private final RepositoryFacade repositoryFacade;

private final Logger logger = LoggerFactory.getLogger(ResourceMessageHandler.class);
/**
* Constructor
* @param resourceStatusHandler The component which then takes care of persisting the changes
Expand All @@ -52,14 +59,25 @@ public ResourceMessageHandler(ResourceStatusHandler resourceStatusHandler, Infra
*/
@Override
public DefaultSuccessMAP handleValidated(ResourceMAP messageAndPayload) throws RejectMessageException {
XMLGregorianCalendar issueAt = messageAndPayload.getMessage().getIssued();

ResourceNotificationMessage msg = (ResourceNotificationMessage) messageAndPayload.getMessage();
MessageLogger.logMessage(messageAndPayload, true, "affectedResource");
URI rewrittenUri = null;
try {
if (msg instanceof ResourceUpdateMessage) {
//ResourceUpdateMessages have the affected Resource in their payload
if (msg.getAffectedResource() != null && messageAndPayload.getPayload().isPresent()) {


if (msg.getAffectedResource() != null && messageAndPayload.getPayload().isPresent()) {

// logger.info(issueAt.getYear()+":"+issueAt.getMonth()+ ":"+ issueAt.getDay()+":"+issueAt.getHour()+":"+ issueAt.getMinute());
// logger.info("Component ID :"+ infrastructureComponent.getId());
// logger.info("Maintainer :"+ infrastructureComponent.getMaintainer());
// logger.info("Incoming message with ID :"+ messageAndPayload.getMessage().getId());
// logger.info("Issuer Connector :"+ messageAndPayload.getMessage().getIssuerConnector());
// logger.info("Sender Connector :"+ messageAndPayload.getMessage().getSenderAgent());

//TODO: Check if method is POST and, if so, if Resource already exists
if(messageAndPayload.getMessage().getProperties() != null) {
//POST is not idempotent. Making sure that, in case POST is used, the connector does not exist yet
Expand All @@ -70,11 +88,14 @@ public DefaultSuccessMAP handleValidated(ResourceMAP messageAndPayload) throws R
//Check if resource exists yet
if (resourceStatusHandler.resourceExists(((ResourceUpdateMessage) messageAndPayload.getMessage()).getAffectedResource())
|| resourceStatusHandler.resourceExists(ResourcePersistenceAndIndexing.tryGetRewrittenResourceUri(messageAndPayload.getMessage().getIssuerConnector(), ((ResourceUpdateMessage) messageAndPayload.getMessage()).getAffectedResource()))) {
throw new RejectMessageException(RejectionReason.TOO_MANY_RESULTS, new Exception("The resource you are trying to post already exists. To update it, use PUT instead."));
logger.error("The resource you are trying to post already exists. To update it, use PUT instead.");
throw new RejectMessageException(RejectionReason.TOO_MANY_RESULTS, new Exception("The resource you are trying to post already exists. To update it, use PUT instead."));
}
}
catch (RejectMessageException ignored)
catch (RejectMessageException e)
{
logger.info("New resource registered");

//RejectMessageException is thrown by ResourcePersistenceAndIndexing.tryGetRewrittenResourceUri, in case the resource does not exist
//This may very well happen here, particularly if the resources is posted correctly (i.e. didn't exist before)
}
Expand All @@ -84,6 +105,7 @@ public DefaultSuccessMAP handleValidated(ResourceMAP messageAndPayload) throws R
rewrittenUri = resourceStatusHandler.updated(messageAndPayload.getPayload().get(), msg.getIssuerConnector());
} else {
//If no payload present, Resource cannot be updated

throw new RejectMessageException(RejectionReason.BAD_PARAMETERS, new NullPointerException("Affected Resource is null or payload is missing"));
}
} else if (msg instanceof ResourceUnavailableMessage) {
Expand Down Expand Up @@ -112,6 +134,10 @@ public DefaultSuccessMAP handleValidated(ResourceMAP messageAndPayload) throws R
try {
//No Exception occurred. Send MessageProcessedNotificationMessage
DefaultSuccessMAP returnValue = new DefaultSuccessMAP(infrastructureComponent.getId(), infrastructureComponent.getOutboundModelVersion(), messageAndPayload.getMessage().getId(), securityTokenProvider.getSecurityTokenAsDAT(), responseSenderAgent);
//Log incoming message via a central logger function.
// logger.info("Component ID :"+ infrastructureComponent.getId().toString());
// logger.info("Maintainer :"+ infrastructureComponent.getMaintainer());
// logger.info("Incoming message with ID :"+ messageAndPayload.getMessage().getId());
if(rewrittenUri != null)
{
//Attach the rewritten URI to the response, so that the recipient knows under which address the resource can be found
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public void refreshIndex() {
List<String> activeGraphs = repositoryFacade.getActiveGraphs();
if(activeGraphs.isEmpty()) //Nothing to index. Return here to make sure that in case no active graphs exist, inactive ones are also ignored
{
logger.warn("No graphs to index");
return;
}

Expand Down Expand Up @@ -254,6 +255,7 @@ private InfrastructureComponent replaceIds(InfrastructureComponent infrastructur
replacedIds = new HashMap<>();
//TODO: Ideally, use relative URIs: "./ + hashCode" instead, but Serializer (Jena) fails on that. We don't really want to store the full URI here, as that makes the broker un-portable
if (infrastructureComponent.getId() == null) {
logger.error("Connector did not provide a URI");
throw new RejectMessageException(RejectionReason.MALFORMED_MESSAGE, new NullPointerException("Connector did not provide a URI"));
}

Expand Down Expand Up @@ -319,8 +321,11 @@ public URI updated(InfrastructureComponent infrastructureComponent) throws IOExc
//Replace URIs in this infrastructureComponent with URIs matching our scheme. This is required for a RESTful API
//TODO: Do the same for resources (or at ParIS, for participants)
try {
logger.info("Replacing URIs of Id "+ infrastructureComponent.getId().toString() + " and maintainer"+ infrastructureComponent.getMaintainer().toString() + " of component" + infrastructureComponent);
infrastructureComponent = replaceIds(infrastructureComponent);

} catch (URISyntaxException e) {
logger.warn("Replacing URIs of "+infrastructureComponent+" failed.");
throw new IOException(e);
}
if (!existed) {
Expand All @@ -334,19 +339,24 @@ public URI updated(InfrastructureComponent infrastructureComponent) throws IOExc
//If the connector was passive before, the document was deleted from the index, so we need to recreate it
if (wasActive) { //Connector exists in index - update it
try {
logger.info("Updating "+infrastructureComponent+" in index");
indexing.update(infrastructureComponent);
} catch (Exception e) {
if (e.getMessage().contains("document_missing_exception")) {
indexing.add(infrastructureComponent);
logger.info("Adding "+infrastructureComponent+" in index");
indexing.add(infrastructureComponent);
} else {
logger.error("ElasticsearchStatusException caught with message " + e.getMessage());
throw new RejectMessageException(RejectionReason.INTERNAL_RECIPIENT_ERROR, e);
}
}
} else { //Connector does not exist in index - create it
indexing.add(infrastructureComponent);
logger.info("Creating new "+ infrastructureComponent+" as it was not available earlier.");
indexing.add(infrastructureComponent);
}
//return the (rewritten) URI of the infrastructure component
logger.info("The rewritten URI of the " + infrastructureComponent.getId().toString());

return infrastructureComponent.getId();
}

Expand Down Expand Up @@ -386,11 +396,14 @@ public void unavailable(URI issuerConnector) throws IOException, RejectMessageEx
URI rewrittenConnectorUri = rewriteConnectorUri(issuerConnector);
if (repositoryFacade.graphIsActive(rewrittenConnectorUri.toString())) {
repositoryFacade.changePassivationOfGraph(rewrittenConnectorUri.toString(), false);
logger.info("Connector deleted from the index successfully.");
} else {
logger.error("The connector you are trying to remove was not found");
throw new RejectMessageException(RejectionReason.NOT_FOUND, new NullPointerException("The connector you are trying to remove was not found"));
}

//Remove the passivated graph from indexing. Upon re-activating, this will be undone
logger.info("Removing passivated graph " + rewrittenConnectorUri+" from indexing ");
indexing.delete(rewrittenConnectorUri);
}

Expand Down