From 1b84312087e170c72d798fea8d14c6c966203804 Mon Sep 17 00:00:00 2001 From: Ashi1993 Date: Wed, 6 Nov 2024 15:41:24 +0530 Subject: [PATCH 1/7] Adding real time event notification --- .../common/util/HTTPClientUtils.java | 12 +- .../internal/EventNotificationComponent.java | 7 +- .../internal/EventNotificationDataHolder.java | 20 +- .../model/RealtimeEventNotification.java | 60 +++++ ...timeEventNotificationRequestGenerator.java | 43 ++++ .../EventNotificationProducerService.java | 104 +++++++++ ...ealtimeEventNotificationLoaderService.java | 104 +++++++++ ...timeEventNotificationRequestGenerator.java | 46 ++++ ...ealtimeEventNotificationSenderService.java | 213 ++++++++++++++++++ ...EventNotificationConsumerJobActivator.java | 91 ++++++++ .../job/EventNotificationConsumerJob.java | 90 ++++++++ ...EventNotificationConsumerJobScheduler.java | 78 +++++++ .../util/EventNotificationServiceUtil.java | 24 +- 13 files changed, 864 insertions(+), 28 deletions(-) create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/model/RealtimeEventNotification.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/DefaultRealtimeEventNotificationRequestGenerator.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationRequestGenerator.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/activator/PeriodicalEventNotificationConsumerJobActivator.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/job/EventNotificationConsumerJob.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/scheduler/PeriodicalEventNotificationConsumerJobScheduler.java diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/HTTPClientUtils.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/HTTPClientUtils.java index 633ea10c..b32578a4 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/HTTPClientUtils.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/HTTPClientUtils.java @@ -89,13 +89,13 @@ public static CloseableHttpClient getHttpsClient() throws FinancialServicesExcep } /** - * Get closeable https client to send realtime event notifications. + * Get closeable https client with given max Total and max per route values. * * @return Closeable https client * @throws FinancialServicesException FinancialServicesException exception */ - @Generated(message = "Ignoring since method contains no logics") - public static CloseableHttpClient getRealtimeEventNotificationHttpsClient() throws FinancialServicesException { + @Generated(message = "Ignoring because ServerConfiguration cannot be mocked") + public static CloseableHttpClient getHttpsClient(int maxTotal, int maxPerRoute) throws FinancialServicesException { SSLConnectionSocketFactory sslsf = createSSLConnectionSocketFactory(); @@ -109,10 +109,8 @@ public static CloseableHttpClient getRealtimeEventNotificationHttpsClient() thro new PoolingHttpClientConnectionManager(); // configuring default maximum connections - connectionManager.setMaxTotal(FinancialServicesConfigParser.getInstance() - .getRealtimeEventNotificationMaxRetries() + 1); - connectionManager.setDefaultMaxPerRoute(FinancialServicesConfigParser.getInstance() - .getRealtimeEventNotificationMaxRetries() + 1); + connectionManager.setMaxTotal(maxTotal); + connectionManager.setDefaultMaxPerRoute(maxPerRoute); return HttpClients.custom().setConnectionManager(connectionManager).build(); } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java index 547d5f15..fb924777 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java @@ -29,6 +29,8 @@ import org.wso2.carbon.identity.oauth2.OAuth2Service; import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser; import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigurationService; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationLoaderService; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.activator.PeriodicalEventNotificationConsumerJobActivator; /** * The Component class for activating event notification osgi service. @@ -52,9 +54,8 @@ protected void activate(ComponentContext context) { * Initialize the quartz job for consuming the realtime event notifications * Initialize the thread for producing the open state realtime event notifications */ - //TODO: -// new Thread(new RealtimeEventNotificationLoaderService()).start(); -// new PeriodicalEventNotificationConsumerJobActivator().activate(); + new Thread(new RealtimeEventNotificationLoaderService()).start(); + new PeriodicalEventNotificationConsumerJobActivator().activate(); } } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java index d16bb2f7..b92a32ef 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java @@ -21,6 +21,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigurationService; +import org.wso2.financial.services.accelerator.event.notifications.service.model.RealtimeEventNotification; + +import java.util.concurrent.LinkedBlockingQueue; /** * Data holder for Open Banking Event Notifications. @@ -28,12 +31,11 @@ public class EventNotificationDataHolder { private static Log log = LogFactory.getLog(EventNotificationDataHolder.class); private static volatile EventNotificationDataHolder instance; -// private volatile LinkedBlockingQueue realtimeEventNotificationQueue; + private volatile LinkedBlockingQueue realtimeEventNotificationQueue; private FinancialServicesConfigurationService configService; private EventNotificationDataHolder() { - //TODO -// this.realtimeEventNotificationQueue = new LinkedBlockingQueue<>(); + this.realtimeEventNotificationQueue = new LinkedBlockingQueue<>(); } /** @@ -52,9 +54,9 @@ public static synchronized EventNotificationDataHolder getInstance() { return instance; } -// public LinkedBlockingQueue getRealtimeEventNotificationQueue() { -// return realtimeEventNotificationQueue; -// } + public LinkedBlockingQueue getRealtimeEventNotificationQueue() { + return realtimeEventNotificationQueue; + } public FinancialServicesConfigurationService getFinancialServicesConfigurationService() { @@ -67,7 +69,7 @@ public void setFinancialServicesConfigurationService( this.configService = configService; } -// public void setRealtimeEventNotificationQueue(LinkedBlockingQueue queue) { -// this.realtimeEventNotificationQueue = queue; -// } + public void setRealtimeEventNotificationQueue(LinkedBlockingQueue queue) { + this.realtimeEventNotificationQueue = queue; + } } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/model/RealtimeEventNotification.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/model/RealtimeEventNotification.java new file mode 100644 index 00000000..92ed3483 --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/model/RealtimeEventNotification.java @@ -0,0 +1,60 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service.model; + +import org.json.JSONObject; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationRequestGenerator; +import org.wso2.financial.services.accelerator.event.notifications.service.util.EventNotificationServiceUtil; + +/** + * Model class for real time event notifications. + */ +public class RealtimeEventNotification { + private String callbackUrl = null; + private String securityEventToken = null; // Security Event Token to hold the Event Notification Data + private Notification notification = null; + + public void setCallbackUrl(String callbackUrl) { + this.callbackUrl = callbackUrl; + } + + public void setSecurityEventToken(String notification) { + this.securityEventToken = notification; + } + + public void setNotification(Notification notification) { + this.notification = notification; + } + + public String getCallbackUrl() { + return callbackUrl; + } + + public JSONObject getJsonPayload() { + RealtimeEventNotificationRequestGenerator eventNotificationRequestGenerator = + EventNotificationServiceUtil.getRealtimeEventNotificationRequestGenerator(); + return eventNotificationRequestGenerator + .getRealtimeEventNotificationPayload(notification, securityEventToken); + } + + public String getNotificationId() { + return notification.getNotificationId(); + } + +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/DefaultRealtimeEventNotificationRequestGenerator.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/DefaultRealtimeEventNotificationRequestGenerator.java new file mode 100644 index 00000000..a6198eb9 --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/DefaultRealtimeEventNotificationRequestGenerator.java @@ -0,0 +1,43 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service.realtime.service; + +import org.json.JSONObject; +import org.wso2.financial.services.accelerator.event.notifications.service.model.Notification; + +import java.util.HashMap; +import java.util.Map; + +/** + * Default class for realtime event notification request generation. + * This is to generate the realtime event notification request payload and headers. + */ +public class DefaultRealtimeEventNotificationRequestGenerator implements RealtimeEventNotificationRequestGenerator { + + @Override + public JSONObject getRealtimeEventNotificationPayload(Notification notificationDTO, String eventSET) { + return new JSONObject("{\"notificationId\": " + notificationDTO.getNotificationId() + ", \"SET\": " + + eventSET + "}"); + } + + @Override + public Map getAdditionalHeaders() { + return new HashMap<>(); + } +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java new file mode 100644 index 00000000..f09bf18f --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java @@ -0,0 +1,104 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service.realtime.service; + +import com.nimbusds.jose.JOSEException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.identity.oauth2.IdentityOAuth2Exception; +import org.wso2.financial.services.accelerator.event.notifications.service.EventNotificationGenerator; +import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException; +import org.wso2.financial.services.accelerator.event.notifications.service.internal.EventNotificationDataHolder; +import org.wso2.financial.services.accelerator.event.notifications.service.model.EventSubscription; +import org.wso2.financial.services.accelerator.event.notifications.service.model.Notification; +import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationEvent; +import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationResponse; +import org.wso2.financial.services.accelerator.event.notifications.service.model.RealtimeEventNotification; +import org.wso2.financial.services.accelerator.event.notifications.service.util.EventNotificationServiceUtil; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * This thread is used to produce the event notification and put it into the realtime event notification queue. + */ +public class EventNotificationProducerService implements Runnable { + private static final Log log = LogFactory.getLog(EventNotificationProducerService.class); + private final Notification notification; + private final List notificationEvents; + + public EventNotificationProducerService( + Notification notification, List notificationEvents) { + this.notification = notification; + this.notificationEvents = notificationEvents; + } + + @Override + public void run() { + + try { + List subscriptionList = EventNotificationServiceUtil.getEventSubscriptionService() + .getEventSubscriptionsByClientId(notification.getClientId()); + if (subscriptionList.isEmpty()) { + throw new FSEventNotificationException("No subscriptions found for the client ID: " + + notification.getClientId()); + } + + LinkedBlockingQueue queue = EventNotificationDataHolder.getInstance(). + getRealtimeEventNotificationQueue(); + EventNotificationGenerator eventNotificationGenerator = EventNotificationServiceUtil. + getEventNotificationGenerator(); + + for (EventSubscription subscription : subscriptionList) { + + List allowedEvents = new ArrayList<>(); + notificationEvents.forEach(notificationEvent -> { + if (subscription.getEventTypes().contains(notificationEvent.getEventType())) { + allowedEvents.add(notificationEvent); + } + }); + + if (allowedEvents.isEmpty()) { + continue; + } + + RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification(); + realtimeEventNotification.setNotification(notification); + realtimeEventNotification.setCallbackUrl(subscription.getCallbackUrl()); + + NotificationResponse notificationResponse = eventNotificationGenerator.generateEventNotificationBody( + notification, allowedEvents); + realtimeEventNotification.setSecurityEventToken(eventNotificationGenerator.generateEventNotification( + NotificationResponse.getJsonNode(notificationResponse))); + + queue.put(realtimeEventNotification); // put the notification into the queue + } + } catch (InterruptedException e) { + log.error("Error when adding the Realtime Notification with notification ID " + + notification.getNotificationId().replaceAll("[\r\n]", "") + + " into the RealtimeEventNotification Queue", e); + } catch (FSEventNotificationException e) { + log.error("Error when generating the event notification", e); + } catch (IOException | JOSEException | IdentityOAuth2Exception e) { + log.error("Error while processing event notification JSON object", e); + } + } +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java new file mode 100644 index 00000000..89fc705a --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java @@ -0,0 +1,104 @@ +/** + * Copyright (c) 2023, WSO2 LLC. (https://www.wso2.com). All Rights Reserved. + * + * This software is the property of WSO2 LLC. and its suppliers, if any. + * Dissemination of any information or reproduction of any material contained + * herein in any form is strictly forbidden, unless permitted by WSO2 expressly. + * You may not alter or remove any copyright or other notice from copies of this content. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service.realtime.service; + +import com.nimbusds.jose.JOSEException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.identity.oauth2.IdentityOAuth2Exception; +import org.wso2.financial.services.accelerator.common.util.DatabaseUtils; +import org.wso2.financial.services.accelerator.event.notifications.service.EventNotificationGenerator; +import org.wso2.financial.services.accelerator.event.notifications.service.constants.EventNotificationConstants; +import org.wso2.financial.services.accelerator.event.notifications.service.dao.EventNotificationDAO; +import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException; +import org.wso2.financial.services.accelerator.event.notifications.service.internal.EventNotificationDataHolder; +import org.wso2.financial.services.accelerator.event.notifications.service.model.EventSubscription; +import org.wso2.financial.services.accelerator.event.notifications.service.model.Notification; +import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationEvent; +import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationResponse; +import org.wso2.financial.services.accelerator.event.notifications.service.model.RealtimeEventNotification; +import org.wso2.financial.services.accelerator.event.notifications.service.persistence.EventNotificationStoreInitializer; +import org.wso2.financial.services.accelerator.event.notifications.service.util.EventNotificationServiceUtil; + +import java.io.IOException; +import java.sql.Connection; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * This service is used to add open state event notifications to the realtime event notification queue. + * This service is called whenever the server starts. + */ +public class RealtimeEventNotificationLoaderService implements Runnable { + private static final Log log = LogFactory.getLog(RealtimeEventNotificationLoaderService.class); + + @Override + public void run() { + // Get all open state event notifications from the database and add them to the queue + Connection connection = DatabaseUtils.getDBConnection(); + try { + LinkedBlockingQueue queue = EventNotificationDataHolder.getInstance(). + getRealtimeEventNotificationQueue(); + EventNotificationDAO pollingDAO = EventNotificationStoreInitializer.getEventNotificationDAO(); + EventNotificationGenerator eventNotificationGenerator = EventNotificationServiceUtil. + getEventNotificationGenerator(); + List openNotifications = pollingDAO.getNotificationsByStatus(connection, + EventNotificationConstants.OPEN); + + for (Notification notification : openNotifications) { + //Get events by notificationId + List notificationEvents = pollingDAO. + getEventsByNotificationID(connection, notification.getNotificationId()); + DatabaseUtils.commitTransaction(connection); + + List subscriptionList = EventNotificationServiceUtil.getEventSubscriptionService() + .getEventSubscriptionsByClientId(notification.getClientId()); + if (subscriptionList.isEmpty()) { + throw new FSEventNotificationException("No subscriptions found for the client ID: " + + notification.getClientId()); + } + + for (EventSubscription subscription : subscriptionList) { + List allowedEvents = new ArrayList<>(); + notificationEvents.forEach(notificationEvent -> { + if (subscription.getEventTypes().contains(notificationEvent.getEventType())) { + allowedEvents.add(notificationEvent); + } + }); + + if (allowedEvents.isEmpty()) { + continue; + } + + NotificationResponse responseNotification = eventNotificationGenerator. + generateEventNotificationBody(notification, allowedEvents); + RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification(); + realtimeEventNotification.setCallbackUrl(subscription.getCallbackUrl()); + realtimeEventNotification.setSecurityEventToken(eventNotificationGenerator. + generateEventNotification(NotificationResponse.getJsonNode(responseNotification))); + realtimeEventNotification.setNotification(notification); + queue.put(realtimeEventNotification); // put the notification into the queue + } + } + } catch (InterruptedException e) { + DatabaseUtils.rollbackTransaction(connection); + log.error("Error when adding the Realtime Notification into the RealtimeEventNotification Queue", e); + } catch (FSEventNotificationException e) { + DatabaseUtils.rollbackTransaction(connection); + log.error("Error when generating the event notification", e); + } catch (IOException | JOSEException | IdentityOAuth2Exception e) { + DatabaseUtils.rollbackTransaction(connection); + log.error("Error while processing event notification JSON object", e); + } finally { + DatabaseUtils.closeConnection(connection); + } + } +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationRequestGenerator.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationRequestGenerator.java new file mode 100644 index 00000000..8509a2dc --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationRequestGenerator.java @@ -0,0 +1,46 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service.realtime.service; + +import org.json.JSONObject; +import org.wso2.financial.services.accelerator.event.notifications.service.model.Notification; + +import java.util.Map; + +/** + * Interface for event notification request metadata generation. For custom class extensions the class name + * is to be referred from the realtime_event_notification_request_generator in deployment.toml + */ +public interface RealtimeEventNotificationRequestGenerator { + /** + * This method is to generate realtime event notification payload. To generate custom values + * for the body this method should be extended. + * + * @return String payload + */ + JSONObject getRealtimeEventNotificationPayload(Notification notificationDTO, String eventSET); + + /** + * This method is to generate realtime event notification request headers. To generate custom values + * for the body this method should be extended. + * + * @return Map headers + */ + Map getAdditionalHeaders(); +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java new file mode 100644 index 00000000..50d42e38 --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java @@ -0,0 +1,213 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service.realtime.service; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.json.JSONObject; +import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser; +import org.wso2.financial.services.accelerator.common.exception.FinancialServicesException; +import org.wso2.financial.services.accelerator.common.util.DatabaseUtils; +import org.wso2.financial.services.accelerator.common.util.HTTPClientUtils; +import org.wso2.financial.services.accelerator.event.notifications.service.constants.EventNotificationConstants; +import org.wso2.financial.services.accelerator.event.notifications.service.dao.EventNotificationDAO; +import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException; +import org.wso2.financial.services.accelerator.event.notifications.service.persistence.EventNotificationStoreInitializer; +import org.wso2.financial.services.accelerator.event.notifications.service.util.EventNotificationServiceUtil; + +import java.io.IOException; +import java.net.URI; +import java.sql.Connection; +import java.time.Duration; +import java.time.LocalTime; +import java.util.Map; + +/** + * This method is used to send the HTTP requests to the TPP provided callback URL. + * Exponential backoff and Circuit breaker based retry policy is used to retry failed POST requests. + */ +public class RealtimeEventNotificationSenderService implements Runnable { + + private static final Log log = LogFactory.getLog(RealtimeEventNotificationSenderService.class); + + private static final FinancialServicesConfigParser configParser = FinancialServicesConfigParser.getInstance(); + private static final int MAX_RETRIES = configParser.getRealtimeEventNotificationMaxRetries(); + private static final int INITIAL_BACKOFF_TIME_IN_SECONDS = + configParser.getRealtimeEventNotificationInitialBackoffTimeInSeconds(); + private static final String BACKOFF_FUNCTION = + configParser.getRealtimeEventNotificationBackoffFunction().replaceAll("[\r\n]", ""); + private static final int CIRCUIT_BREAKER_OPEN_TIMEOUT_IN_SECONDS = + configParser.getRealtimeEventNotificationCircuitBreakerOpenTimeoutInSeconds(); + private static final int TIMEOUT_IN_SECONDS = configParser.getRealtimeEventNotificationTimeoutInSeconds(); + + private CloseableHttpClient httpClient; + private RealtimeEventNotificationRequestGenerator httpRequestGenerator; + private String notificationId; + private String callbackUrl; + private JSONObject payloadJson; + + public RealtimeEventNotificationSenderService(String callbackUrl, JSONObject payloadJson, + String notificationId) { + int maxRetryCount = FinancialServicesConfigParser.getInstance() + .getRealtimeEventNotificationMaxRetries() + 1; + + try { + this.httpClient = HTTPClientUtils.getHttpsClient(maxRetryCount, maxRetryCount); + } catch (FinancialServicesException e) { + log.error("Failed to initialize the HTTP client for the realtime event notification", e); + } + + this.httpRequestGenerator = EventNotificationServiceUtil.getRealtimeEventNotificationRequestGenerator(); + this.notificationId = notificationId; + this.callbackUrl = callbackUrl; + this.payloadJson = payloadJson; + } + + public void run() { + try { + postWithRetry(); + } catch (FSEventNotificationException e) { + log.error("Failed to send the Real-time event notification with notificationId: " + + notificationId.replaceAll("[\r\n]", ""), e); + } + } + + /** + * This method is used to send the HTTP requests to the TPP provided callback URL. + * Exponential backoff and Circuit breaker based retry policy is used to retry failed POST requests. + * + * @throws FSEventNotificationException + */ + private void postWithRetry() throws FSEventNotificationException { + EventNotificationDAO pollingDAO = + EventNotificationStoreInitializer.getEventNotificationDAO(); + int retryCount = 0; + long backoffTimeMs = INITIAL_BACKOFF_TIME_IN_SECONDS * 1000L; + boolean circuitBreakerOpen = false; + LocalTime startTime = LocalTime.now(); + Connection connection = DatabaseUtils.getDBConnection(); + + while (retryCount <= MAX_RETRIES && !circuitBreakerOpen) { + try { + // This if closure will execute only if the initial POST request is failed. + // This includes the retry policy and will execute according to the configurations. + if (retryCount > 0) { + if (log.isDebugEnabled()) { + log.debug("HTTP request Retry #" + retryCount + " - waiting for " + + backoffTimeMs + " ms before trying again"); + } + Thread.sleep(backoffTimeMs); + + switch (BACKOFF_FUNCTION) { + case "CONSTANT": + // Backoff time will not be changed + // Retries will happen in constant time frames + break; + case "LINEAR": + // Backoff time will be doubled after each retry + // nextWaitingTime = 2 x previousWaitingTime + backoffTimeMs *= 2; + break; + case "EX": + // Backoff time will be increased exponentially + // nextWaitingTime = startWaitingTime x e^(retryCount) + backoffTimeMs = (long) + (INITIAL_BACKOFF_TIME_IN_SECONDS + * 1000 * Math.exp(retryCount)); + break; + default: + log.error("Invalid backoff function for the realtime event notification retry policy: " + + BACKOFF_FUNCTION); + throw new IllegalArgumentException( + "Invalid backoff function for the realtime event notification retry policy: " + + BACKOFF_FUNCTION); + } + } + + HttpPost httpPost = new HttpPost(URI.create(callbackUrl)); + + for (Map.Entry entry : httpRequestGenerator.getAdditionalHeaders().entrySet()) { + String headerName = entry.getKey(); + String headerValue = entry.getValue(); + httpPost.setHeader(headerName, headerValue); + } + + httpPost.setEntity(new StringEntity(String.valueOf(payloadJson), ContentType.APPLICATION_JSON)); + RequestConfig requestConfig = RequestConfig.custom() + .setConnectTimeout(TIMEOUT_IN_SECONDS * 1000) + .setConnectionRequestTimeout(TIMEOUT_IN_SECONDS * 1000) + .setSocketTimeout(TIMEOUT_IN_SECONDS * 1000) + .build(); + httpPost.setConfig(requestConfig); + + HttpResponse response = httpClient.execute(httpPost); + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == HttpStatus.SC_ACCEPTED) { + if (log.isDebugEnabled()) { + log.debug("Real-time event notification with notificationId: " + + notificationId.replaceAll("[\r\n]", "") + " sent successfully"); + } + pollingDAO.updateNotificationStatusById(connection, notificationId, EventNotificationConstants.ACK); + DatabaseUtils.commitTransaction(DatabaseUtils.getDBConnection()); + return; + } else { + if (log.isDebugEnabled()) { + log.debug("Real-time event notification with notificationId: " + + notificationId.replaceAll("[\r\n]", "") + + " sent failed with status code: " + statusCode); + } + } + + // Circuit breaker will be opened if the retrying time exceeds the configured circuit breaker timeout. + if (Duration.between(startTime, LocalTime.now()).toMillis() + > CIRCUIT_BREAKER_OPEN_TIMEOUT_IN_SECONDS * 1000) { + circuitBreakerOpen = true; + if (log.isDebugEnabled()) { + log.debug("Circuit breaker open for the realtime event notification with notificationId: " + + notificationId.replaceAll("[\r\n]", "")); + } + } + retryCount++; + + // If the circuit breaker is opened or the maximum retry count is exceeded, + // the notification status will be updated as ERROR. + pollingDAO.updateNotificationStatusById(connection, notificationId, EventNotificationConstants.ERROR); + DatabaseUtils.commitTransaction(DatabaseUtils.getDBConnection()); + } catch (IOException | InterruptedException e) { + log.error("Real-time event notification with notificationId: " + + notificationId.replaceAll("[\r\n]", "") + " sent failed", e); + } catch (FSEventNotificationException e) { + log.error("Real-time event notification with notificationId: " + + notificationId.replaceAll("[\r\n]", "") + " sent failed", e); + DatabaseUtils.rollbackTransaction(DatabaseUtils.getDBConnection()); + } finally { + DatabaseUtils.closeConnection(DatabaseUtils.getDBConnection()); + } + + } + + } +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/activator/PeriodicalEventNotificationConsumerJobActivator.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/activator/PeriodicalEventNotificationConsumerJobActivator.java new file mode 100644 index 00000000..6d450e01 --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/activator/PeriodicalEventNotificationConsumerJobActivator.java @@ -0,0 +1,91 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.activator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.quartz.CronExpression; +import org.quartz.JobDetail; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.Trigger; +import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser; +import org.wso2.financial.services.accelerator.common.util.Generated; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.job.EventNotificationConsumerJob; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.scheduler.PeriodicalEventNotificationConsumerJobScheduler; + +import java.text.ParseException; +import java.util.Date; + +import static org.quartz.JobBuilder.newJob; +import static org.quartz.TriggerBuilder.newTrigger; + +/** + * Scheduled Task definition and trigger to perform realtime event notification sending based on the cron string. + */ +@Generated(message = "Excluding from code coverage") +public class PeriodicalEventNotificationConsumerJobActivator { + + private static Log log = LogFactory.getLog(PeriodicalEventNotificationConsumerJobActivator.class); + private static final String PERIODIC_CRON_EXPRESSION = FinancialServicesConfigParser + .getInstance().getRealtimeEventNotificationSchedulerCronExpression().replaceAll("[\r\n]", ""); + + public void activate() { + int cronInSeconds = 60; + + try { + CronExpression cron = new CronExpression(PERIODIC_CRON_EXPRESSION); + + Date nextValidTime = cron.getNextValidTimeAfter(new Date()); + Date secondValidTime = cron.getNextValidTimeAfter(nextValidTime); + + cronInSeconds = (int) (secondValidTime.getTime() - nextValidTime.getTime()) / 1000; + + } catch (ParseException e) { + log.error("Error while parsing the event notification scheduler cron expression : " + + PERIODIC_CRON_EXPRESSION, e); + } + + JobDetail job = newJob(EventNotificationConsumerJob.class) + .withIdentity("RealtimeEventNotificationJob", "group2") + .build(); + + Trigger trigger = newTrigger() + .withIdentity("periodicalEvenNotificationTrigger", "group2") + .withSchedule(SimpleScheduleBuilder.simpleSchedule() + .withIntervalInSeconds(cronInSeconds) + .repeatForever()) + .build(); + + try { + Scheduler scheduler = PeriodicalEventNotificationConsumerJobScheduler.getInstance().getScheduler(); + // this check is to remove already stored jobs in clustered mode. + if (scheduler.checkExists(job.getKey())) { + scheduler.deleteJob(job.getKey()); + } + + scheduler.scheduleJob(job, trigger); + log.info("Periodical Realtime Event Notification sender Started with cron : " + + PERIODIC_CRON_EXPRESSION); + } catch (SchedulerException e) { + log.error("Error while starting Periodical Realtime Event Notification sender", e); + } + } +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/job/EventNotificationConsumerJob.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/job/EventNotificationConsumerJob.java new file mode 100644 index 00000000..8f16605a --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/job/EventNotificationConsumerJob.java @@ -0,0 +1,90 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.job; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.json.JSONObject; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser; +import org.wso2.financial.services.accelerator.common.util.Generated; +import org.wso2.financial.services.accelerator.event.notifications.service.internal.EventNotificationDataHolder; +import org.wso2.financial.services.accelerator.event.notifications.service.model.RealtimeEventNotification; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationSenderService; + +import java.util.ArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Scheduled Task to send realtime event notifications to callback Urls. + * This task is scheduled to run periodically. + * This task consumes all the notifications in the queue and send them to the callback urls. + */ +@Generated(message = "Excluding from code coverage") +@DisallowConcurrentExecution +public class EventNotificationConsumerJob implements Job { + + private static final Log log = LogFactory.getLog(EventNotificationConsumerJob.class); + private static final int THREAD_POOL_SIZE = FinancialServicesConfigParser + .getInstance().getEventNotificationThreadPoolSize(); + + @Override + public void execute(JobExecutionContext jobExecutionContext) { + ArrayList notifications = consumeNotifications(); + // send notifications to the callback urls + int threads = Math.min(notifications.size(), THREAD_POOL_SIZE); + int threadPoolSize = Math.max(threads, 2); + + ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize); + + for (RealtimeEventNotification notification : notifications) { + String callbackUrl = notification.getCallbackUrl(); + JSONObject payload = notification.getJsonPayload(); + Runnable worker = new RealtimeEventNotificationSenderService(callbackUrl, + payload, notification.getNotificationId()); + executor.execute(worker); + } + + executor.shutdown(); + } + + private static ArrayList consumeNotifications() { + + LinkedBlockingQueue queue = EventNotificationDataHolder.getInstance() + .getRealtimeEventNotificationQueue(); + ArrayList notifications = new ArrayList<>(); + + // consume all notifications in the queue + int key = 0; + while (!queue.isEmpty() && key < THREAD_POOL_SIZE) { + key++; + try { + RealtimeEventNotification notification = queue.take(); + notifications.add(notification); + } catch (InterruptedException ex) { + log.error("Error while consuming notifications from the event notification queue", ex); + } + } + return notifications; + } +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/scheduler/PeriodicalEventNotificationConsumerJobScheduler.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/scheduler/PeriodicalEventNotificationConsumerJobScheduler.java new file mode 100644 index 00000000..0c63162d --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/scheduler/PeriodicalEventNotificationConsumerJobScheduler.java @@ -0,0 +1,78 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.impl.StdSchedulerFactory; +import org.wso2.financial.services.accelerator.common.util.Generated; + +/** + * Periodic realtime event notification job scheduler class. + * This class initialize the scheduler and schedule configured jobs and triggers. + */ +@Generated(message = "Excluding from code coverage") +public class PeriodicalEventNotificationConsumerJobScheduler { + private static volatile PeriodicalEventNotificationConsumerJobScheduler instance; + private static volatile Scheduler scheduler; + private static Log log = LogFactory.getLog(PeriodicalEventNotificationConsumerJobScheduler.class); + + private PeriodicalEventNotificationConsumerJobScheduler() { + initScheduler(); + } + + public static synchronized PeriodicalEventNotificationConsumerJobScheduler getInstance() { + + if (instance == null) { + synchronized (PeriodicalEventNotificationConsumerJobScheduler.class) { + if (instance == null) { + instance = new PeriodicalEventNotificationConsumerJobScheduler(); + } + } + } + return instance; + } + + private void initScheduler() { + + if (instance != null) { + return; + } + synchronized (PeriodicalEventNotificationConsumerJobScheduler.class) { + try { + scheduler = StdSchedulerFactory.getDefaultScheduler(); + scheduler.start(); + } catch (SchedulerException e) { + log.error("Exception while initializing the Real-time Event notification scheduler", e); + } + } + } + + /** + * Returns the scheduler. + * + * @return Scheduler scheduler. + */ + public Scheduler getScheduler() { + return scheduler; + } +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java index 94427176..251a98ae 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java @@ -32,8 +32,10 @@ import org.wso2.financial.services.accelerator.common.util.Generated; import org.wso2.financial.services.accelerator.consent.mgt.service.impl.ConsentCoreServiceImpl; import org.wso2.financial.services.accelerator.event.notifications.service.EventNotificationGenerator; +import org.wso2.financial.services.accelerator.event.notifications.service.EventSubscriptionService; import org.wso2.financial.services.accelerator.event.notifications.service.constants.EventNotificationConstants; import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationRequestGenerator; import java.util.Optional; @@ -62,15 +64,14 @@ public static EventNotificationGenerator getEventNotificationGenerator() { * * @return RealtimeEventNotificationRequestGenerator */ - //TODO -// public static RealtimeEventNotificationRequestGenerator getRealtimeEventNotificationRequestGenerator() { -// -// RealtimeEventNotificationRequestGenerator realtimeEventNotificationRequestGenerator = -// (RealtimeEventNotificationRequestGenerator) FinancialServicesUtils -// .getClassInstanceFromFQN(FinancialServicesConfigParser.getInstance(). -// getRealtimeEventNotificationRequestGenerator()); -// return realtimeEventNotificationRequestGenerator; -// } + public static RealtimeEventNotificationRequestGenerator getRealtimeEventNotificationRequestGenerator() { + + RealtimeEventNotificationRequestGenerator realtimeEventNotificationRequestGenerator = + (RealtimeEventNotificationRequestGenerator) FinancialServicesUtils + .getClassInstanceFromFQN(FinancialServicesConfigParser.getInstance(). + getRealtimeEventNotificationRequestGenerator()); + return realtimeEventNotificationRequestGenerator; + } /** * Method to modify event notification payload with custom eventValues. @@ -153,4 +154,9 @@ public static String getErrorDTO(String error, String errorDescription) { eventNotificationError.put(EventNotificationConstants.ERROR_DESCRIPTION_FIELD, errorDescription); return eventNotificationError.toString(); } + + public static EventSubscriptionService getEventSubscriptionService() { + return new EventSubscriptionService(); + } + } From 3ce236496ffbab7816ad8aa3592dde5e563eba5c Mon Sep 17 00:00:00 2001 From: Ashi1993 Date: Mon, 11 Nov 2024 12:04:59 +0530 Subject: [PATCH 2/7] Adding realtime event notification --- .../accelerator/common/util/JWTUtils.java | 10 +- .../pom.xml | 5 + .../service/EventCreationService.java | 11 +- .../service/EventPollingService.java | 2 +- .../service/EventSubscriptionService.java | 2 +- .../service/RealtimeNotificationService.java | 139 ++++++++++++++++++ .../internal/EventNotificationComponent.java | 53 +------ .../internal/EventNotificationDataHolder.java | 11 -- .../EventNotificationProducerService.java | 25 ++-- ...ealtimeEventNotificationLoaderService.java | 44 +++--- ...timeEventNotificationRequestGenerator.java | 2 +- ...ealtimeEventNotificationSenderService.java | 21 +-- ...EventNotificationConsumerJobActivator.java | 10 +- .../util/EventNotificationServiceUtil.java | 5 + 14 files changed, 210 insertions(+), 130 deletions(-) create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/RealtimeNotificationService.java diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/JWTUtils.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/JWTUtils.java index 234fc03f..1b6bace0 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/JWTUtils.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/JWTUtils.java @@ -373,8 +373,14 @@ public static KeyStore getTrustStore() throws ConsentManagementException { */ public static String signJWTWithDefaultKey(String body) throws Exception { KeyStoreManager keyStoreManager = KeyStoreManager.getInstance(-1234); - Key privateKey = keyStoreManager.getDefaultPrivateKey(); - return generateJWT(body, privateKey); + KeyStore primaryKeyStore = keyStoreManager.getPrimaryKeyStore(); + if (primaryKeyStore != null) { + Key privateKey = keyStoreManager.getDefaultPrivateKey(); + return generateJWT(body, privateKey); + } else { + throw new FinancialServicesRuntimeException("Error while retrieving the Primary Keystore"); + } + } /** diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/pom.xml b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/pom.xml index 4c3f4f99..9802ff14 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/pom.xml +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/pom.xml @@ -197,6 +197,8 @@ org.apache.commons.lang3;version="${commons-lang3.version}", org.apache.commons.logging;version="${commons.logging.version}", org.json;version="${org.json.version}", + + com.fasterxml.jackson.annotation;version="${jackson.databinding.version}", com.fasterxml.jackson.databind;version="${jackson.databinding.version}", com.nimbusds.jose;version="${org.wso2.orbit.nimbus.version.range}", @@ -205,7 +207,10 @@ org.wso2.financial.services.accelerator.common.*;version="${project.version}", org.wso2.financial.services.accelerator.consent.mgt.dao.*;version="${project.version}", org.wso2.financial.services.accelerator.consent.mgt.service.*;version="${project.version}", + + + * !org.wso2.financial.services.accelerator.event.notifications.service.internal, org.wso2.financial.services.accelerator.event.notifications.service.*;version="${project.version}", diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventCreationService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventCreationService.java index acefc46a..852aeeb3 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventCreationService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventCreationService.java @@ -21,6 +21,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.json.JSONObject; +import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser; import org.wso2.financial.services.accelerator.common.util.DatabaseUtils; import org.wso2.financial.services.accelerator.event.notifications.service.constants.EventNotificationConstants; import org.wso2.financial.services.accelerator.event.notifications.service.dao.EventNotificationDAO; @@ -29,6 +30,7 @@ import org.wso2.financial.services.accelerator.event.notifications.service.model.Notification; import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationEvent; import org.wso2.financial.services.accelerator.event.notifications.service.persistence.EventNotificationStoreInitializer; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.EventNotificationProducerService; import java.sql.Connection; import java.util.ArrayList; @@ -36,7 +38,7 @@ import java.util.UUID; /** - * This is the event creation service class. + * Event creation service class. */ public class EventCreationService { @@ -65,10 +67,9 @@ public String publishEventNotification(NotificationCreationDTO notificationCreat DatabaseUtils.commitTransaction(connection); //TODO: - // Check whether the real time event notification is enabled. -// if (FinancialServicesConfigParser.getInstance().isRealtimeEventNotificationEnabled()) { -// new Thread(new EventNotificationProducerService(notification, eventsList)).start(); -// } + if (FinancialServicesConfigParser.getInstance().isRealtimeEventNotificationEnabled()) { + new Thread(new EventNotificationProducerService(notification, eventsList)).start(); + } return eventResponse; } catch (FSEventNotificationException e) { DatabaseUtils.rollbackTransaction(connection); diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventPollingService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventPollingService.java index e2513844..bcda12d5 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventPollingService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventPollingService.java @@ -42,7 +42,7 @@ import java.util.Map; /** - * This is the event polling service. + * Event polling service. */ public class EventPollingService { diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventSubscriptionService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventSubscriptionService.java index 4a4f6499..75d714b6 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventSubscriptionService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventSubscriptionService.java @@ -33,7 +33,7 @@ import java.util.List; /** - * This is the event subscription service class. + * Event subscription service class. */ public class EventSubscriptionService { private static final Log log = LogFactory.getLog(EventSubscriptionService.class); diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/RealtimeNotificationService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/RealtimeNotificationService.java new file mode 100644 index 00000000..6e6e46b4 --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/RealtimeNotificationService.java @@ -0,0 +1,139 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.json.JSONException; +import org.wso2.financial.services.accelerator.common.util.DatabaseUtils; +import org.wso2.financial.services.accelerator.event.notifications.service.constants.EventNotificationConstants; +import org.wso2.financial.services.accelerator.event.notifications.service.dao.EventNotificationDAO; +import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException; +import org.wso2.financial.services.accelerator.event.notifications.service.model.Notification; +import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationEvent; +import org.wso2.financial.services.accelerator.event.notifications.service.persistence.EventNotificationStoreInitializer; + +import java.sql.Connection; +import java.util.List; + +/** + * Realtime Notification service class. + */ +public class RealtimeNotificationService { + + private static final Log log = LogFactory.getLog(RealtimeNotificationService.class); + + /** + * Method to retrieve notification by status. + * + * @param status Notification status to retrieve + * @return List of notifications by status + * @throws FSEventNotificationException Exception when retrieving notifications by status + */ + public List getNotificationsByStatus(String status) + throws FSEventNotificationException { + + EventNotificationDAO eventNotificationDAO = EventNotificationStoreInitializer.getEventNotificationDAO(); + + Connection connection = DatabaseUtils.getDBConnection(); + + try { + List events = eventNotificationDAO. + getNotificationsByStatus(connection, status); + if (log.isDebugEnabled()) { + log.debug(String.format("Event Notifications with %s status retrieved successfully.", + status.replaceAll("[\r\n]", ""))); + } + DatabaseUtils.commitTransaction(connection); + return events; + } catch (FSEventNotificationException e) { + log.error("Error while retrieving event notification.", e); + DatabaseUtils.rollbackTransaction(connection); + throw new FSEventNotificationException(EventNotificationConstants.ERROR_STORING_EVENT_SUBSCRIPTION, e); + } finally { + DatabaseUtils.closeConnection(connection); + } + } + + /** + * Method to retrieve notifications by NotificationID. + * + * @param notificationId Notification ID to retrieve + * @return List of notifications by notification ID + * @throws FSEventNotificationException Exception when retrieving notifications by notification ID + */ + public List getEventsByNotificationID(String notificationId) + throws FSEventNotificationException { + + EventNotificationDAO eventNotificationDAO = EventNotificationStoreInitializer.getEventNotificationDAO(); + + Connection connection = DatabaseUtils.getDBConnection(); + + try { + //store event subscription data in the database + List notificationEvents = eventNotificationDAO. + getEventsByNotificationID(connection, notificationId); + if (log.isDebugEnabled()) { + log.debug(String.format("Event Notifications with notification id %s retrieved successfully.", + notificationId.replaceAll("[\r\n]", ""))); + } + DatabaseUtils.commitTransaction(connection); + return notificationEvents; + } catch (FSEventNotificationException e) { + log.error("Error while retrieving event notification.", e); + DatabaseUtils.rollbackTransaction(connection); + throw new FSEventNotificationException(EventNotificationConstants.ERROR_STORING_EVENT_SUBSCRIPTION, e); + } finally { + DatabaseUtils.closeConnection(connection); + } + } + + /** + * Method to update the notification status by ID, allowed values are. + * OPEN,ACK and ERR + * + * @param notificationId Notification ID to update + * @param notificationStatus Notification status to update + * @throws FSEventNotificationException Exception when updating notification status by ID + */ + public void updateNotificationStatusById(String notificationId, String notificationStatus) + throws FSEventNotificationException { + + Connection connection = DatabaseUtils.getDBConnection(); + + EventNotificationDAO eventNotificationDAO = EventNotificationStoreInitializer.getEventNotificationDAO(); + + try { + //update the stored event notification + eventNotificationDAO.updateNotificationStatusById(connection, notificationId, notificationStatus); + + log.debug("Event Notification updated successfully."); + DatabaseUtils.commitTransaction(connection); + } catch (JSONException e) { + log.error("Error while Parsing the stored request Object", e); + throw new FSEventNotificationException("Error while Parsing the stored request Object", e); + } catch (FSEventNotificationException e) { + log.error("Error while updating event notification.", e); + DatabaseUtils.rollbackTransaction(connection); + throw new FSEventNotificationException(e.getMessage(), e); + } finally { + DatabaseUtils.closeConnection(connection); + } + } +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java index fb924777..63530205 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java @@ -23,14 +23,8 @@ import org.osgi.service.component.ComponentContext; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; -import org.osgi.service.component.annotations.Reference; -import org.osgi.service.component.annotations.ReferenceCardinality; -import org.osgi.service.component.annotations.ReferencePolicy; -import org.wso2.carbon.identity.oauth2.OAuth2Service; import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser; -import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigurationService; import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationLoaderService; -import org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.activator.PeriodicalEventNotificationConsumerJobActivator; /** * The Component class for activating event notification osgi service. @@ -43,9 +37,7 @@ public class EventNotificationComponent { @Activate protected void activate(ComponentContext context) { - if (log.isDebugEnabled()) { - log.debug("Event Notification Service Component Activated"); - } + log.debug("Event Notification Service Component Activated"); // Check if realtime event notification enabled if (FinancialServicesConfigParser.getInstance().isRealtimeEventNotificationEnabled()) { @@ -54,48 +46,9 @@ protected void activate(ComponentContext context) { * Initialize the quartz job for consuming the realtime event notifications * Initialize the thread for producing the open state realtime event notifications */ + log.debug("Event Notification####"); new Thread(new RealtimeEventNotificationLoaderService()).start(); - new PeriodicalEventNotificationConsumerJobActivator().activate(); +// new PeriodicalEventNotificationConsumerJobActivator().activate(); } } - - /** - * Setters for the descendent OSGI services of the EventNotificationComponent. - * This is added to run the EventNotification OSGI component after the Common module - * @param configService OpenBankingConfigurationService - */ - @Reference( - service = FinancialServicesConfigurationService.class, - cardinality = ReferenceCardinality.MANDATORY, - policy = ReferencePolicy.DYNAMIC, - unbind = "unsetConfigService" - ) - public void setConfigService(FinancialServicesConfigurationService configService) { - EventNotificationDataHolder.getInstance().setFinancialServicesConfigurationService(configService); - } - - public void unsetConfigService(FinancialServicesConfigurationService configService) { - EventNotificationDataHolder.getInstance().setFinancialServicesConfigurationService(null); - } - - /** - * Setters for the descendent OSGI services of the EventNotificationComponent. - * This is added to run the EventNotification OSGI component after the OAuth2Service - */ - @Reference( - service = OAuth2Service.class, - cardinality = ReferenceCardinality.MANDATORY, - policy = ReferencePolicy.DYNAMIC, - unbind = "unsetOAuth2Service" - ) - - /** - * Setters for the descendent OSGI services of the EventNotificationComponent. - * @param oAuth2Service OAuth2Service - */ - public void setOAuth2Service(OAuth2Service oAuth2Service) { - } - - public void unsetOAuth2Service(OAuth2Service oAuth2Service) { - } } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java index b92a32ef..bfa2b7c2 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java @@ -58,17 +58,6 @@ public LinkedBlockingQueue getRealtimeEventNotificati return realtimeEventNotificationQueue; } - public FinancialServicesConfigurationService getFinancialServicesConfigurationService() { - - return configService; - } - - public void setFinancialServicesConfigurationService( - FinancialServicesConfigurationService configService) { - - this.configService = configService; - } - public void setRealtimeEventNotificationQueue(LinkedBlockingQueue queue) { this.realtimeEventNotificationQueue = queue; } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java index f09bf18f..2ff5902e 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java @@ -19,6 +19,7 @@ package org.wso2.financial.services.accelerator.event.notifications.service.realtime.service; import com.nimbusds.jose.JOSEException; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.carbon.identity.oauth2.IdentityOAuth2Exception; @@ -57,7 +58,7 @@ public void run() { try { List subscriptionList = EventNotificationServiceUtil.getEventSubscriptionService() .getEventSubscriptionsByClientId(notification.getClientId()); - if (subscriptionList.isEmpty()) { + if (CollectionUtils.isEmpty(subscriptionList)) { throw new FSEventNotificationException("No subscriptions found for the client ID: " + notification.getClientId()); } @@ -76,20 +77,18 @@ public void run() { } }); - if (allowedEvents.isEmpty()) { - continue; - } - - RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification(); - realtimeEventNotification.setNotification(notification); - realtimeEventNotification.setCallbackUrl(subscription.getCallbackUrl()); + if (!allowedEvents.isEmpty()) { + RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification(); + realtimeEventNotification.setNotification(notification); + realtimeEventNotification.setCallbackUrl(subscription.getCallbackUrl()); - NotificationResponse notificationResponse = eventNotificationGenerator.generateEventNotificationBody( - notification, allowedEvents); - realtimeEventNotification.setSecurityEventToken(eventNotificationGenerator.generateEventNotification( - NotificationResponse.getJsonNode(notificationResponse))); + NotificationResponse notificationResponse = eventNotificationGenerator + .generateEventNotificationBody(notification, allowedEvents); + realtimeEventNotification.setSecurityEventToken(eventNotificationGenerator + .generateEventNotification(NotificationResponse.getJsonNode(notificationResponse))); - queue.put(realtimeEventNotification); // put the notification into the queue + queue.put(realtimeEventNotification); // put the notification into the queue + } } } catch (InterruptedException e) { log.error("Error when adding the Realtime Notification with notification ID " + diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java index 89fc705a..d4e069f3 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java @@ -13,10 +13,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.carbon.identity.oauth2.IdentityOAuth2Exception; -import org.wso2.financial.services.accelerator.common.util.DatabaseUtils; import org.wso2.financial.services.accelerator.event.notifications.service.EventNotificationGenerator; +import org.wso2.financial.services.accelerator.event.notifications.service.RealtimeNotificationService; import org.wso2.financial.services.accelerator.event.notifications.service.constants.EventNotificationConstants; -import org.wso2.financial.services.accelerator.event.notifications.service.dao.EventNotificationDAO; import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException; import org.wso2.financial.services.accelerator.event.notifications.service.internal.EventNotificationDataHolder; import org.wso2.financial.services.accelerator.event.notifications.service.model.EventSubscription; @@ -24,11 +23,9 @@ import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationEvent; import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationResponse; import org.wso2.financial.services.accelerator.event.notifications.service.model.RealtimeEventNotification; -import org.wso2.financial.services.accelerator.event.notifications.service.persistence.EventNotificationStoreInitializer; import org.wso2.financial.services.accelerator.event.notifications.service.util.EventNotificationServiceUtil; import java.io.IOException; -import java.sql.Connection; import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; @@ -38,26 +35,26 @@ * This service is called whenever the server starts. */ public class RealtimeEventNotificationLoaderService implements Runnable { + private static final Log log = LogFactory.getLog(RealtimeEventNotificationLoaderService.class); @Override public void run() { // Get all open state event notifications from the database and add them to the queue - Connection connection = DatabaseUtils.getDBConnection(); try { LinkedBlockingQueue queue = EventNotificationDataHolder.getInstance(). getRealtimeEventNotificationQueue(); - EventNotificationDAO pollingDAO = EventNotificationStoreInitializer.getEventNotificationDAO(); EventNotificationGenerator eventNotificationGenerator = EventNotificationServiceUtil. getEventNotificationGenerator(); - List openNotifications = pollingDAO.getNotificationsByStatus(connection, - EventNotificationConstants.OPEN); + RealtimeNotificationService realtimeNotificationService = EventNotificationServiceUtil + .getRealtimeNotificationService(); + List openNotifications = realtimeNotificationService + .getNotificationsByStatus(EventNotificationConstants.OPEN); for (Notification notification : openNotifications) { //Get events by notificationId - List notificationEvents = pollingDAO. - getEventsByNotificationID(connection, notification.getNotificationId()); - DatabaseUtils.commitTransaction(connection); + List notificationEvents = realtimeNotificationService. + getEventsByNotificationID(notification.getNotificationId()); List subscriptionList = EventNotificationServiceUtil.getEventSubscriptionService() .getEventSubscriptionsByClientId(notification.getClientId()); @@ -74,31 +71,24 @@ public void run() { } }); - if (allowedEvents.isEmpty()) { - continue; + if (!allowedEvents.isEmpty()) { + NotificationResponse responseNotification = eventNotificationGenerator. + generateEventNotificationBody(notification, allowedEvents); + RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification(); + realtimeEventNotification.setCallbackUrl(subscription.getCallbackUrl()); + realtimeEventNotification.setSecurityEventToken(eventNotificationGenerator. + generateEventNotification(NotificationResponse.getJsonNode(responseNotification))); + realtimeEventNotification.setNotification(notification); + queue.put(realtimeEventNotification); // put the notification into the queue } - - NotificationResponse responseNotification = eventNotificationGenerator. - generateEventNotificationBody(notification, allowedEvents); - RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification(); - realtimeEventNotification.setCallbackUrl(subscription.getCallbackUrl()); - realtimeEventNotification.setSecurityEventToken(eventNotificationGenerator. - generateEventNotification(NotificationResponse.getJsonNode(responseNotification))); - realtimeEventNotification.setNotification(notification); - queue.put(realtimeEventNotification); // put the notification into the queue } } } catch (InterruptedException e) { - DatabaseUtils.rollbackTransaction(connection); log.error("Error when adding the Realtime Notification into the RealtimeEventNotification Queue", e); } catch (FSEventNotificationException e) { - DatabaseUtils.rollbackTransaction(connection); log.error("Error when generating the event notification", e); } catch (IOException | JOSEException | IdentityOAuth2Exception e) { - DatabaseUtils.rollbackTransaction(connection); log.error("Error while processing event notification JSON object", e); - } finally { - DatabaseUtils.closeConnection(connection); } } } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationRequestGenerator.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationRequestGenerator.java index 8509a2dc..703f645a 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationRequestGenerator.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationRequestGenerator.java @@ -40,7 +40,7 @@ public interface RealtimeEventNotificationRequestGenerator { * This method is to generate realtime event notification request headers. To generate custom values * for the body this method should be extended. * - * @return Map headers + * @return Map of headers */ Map getAdditionalHeaders(); } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java index 50d42e38..c119bc1d 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java @@ -30,17 +30,14 @@ import org.json.JSONObject; import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser; import org.wso2.financial.services.accelerator.common.exception.FinancialServicesException; -import org.wso2.financial.services.accelerator.common.util.DatabaseUtils; import org.wso2.financial.services.accelerator.common.util.HTTPClientUtils; +import org.wso2.financial.services.accelerator.event.notifications.service.RealtimeNotificationService; import org.wso2.financial.services.accelerator.event.notifications.service.constants.EventNotificationConstants; -import org.wso2.financial.services.accelerator.event.notifications.service.dao.EventNotificationDAO; import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException; -import org.wso2.financial.services.accelerator.event.notifications.service.persistence.EventNotificationStoreInitializer; import org.wso2.financial.services.accelerator.event.notifications.service.util.EventNotificationServiceUtil; import java.io.IOException; import java.net.URI; -import java.sql.Connection; import java.time.Duration; import java.time.LocalTime; import java.util.Map; @@ -102,13 +99,12 @@ public void run() { * @throws FSEventNotificationException */ private void postWithRetry() throws FSEventNotificationException { - EventNotificationDAO pollingDAO = - EventNotificationStoreInitializer.getEventNotificationDAO(); + RealtimeNotificationService realtimeNotificationService = EventNotificationServiceUtil + .getRealtimeNotificationService(); int retryCount = 0; long backoffTimeMs = INITIAL_BACKOFF_TIME_IN_SECONDS * 1000L; boolean circuitBreakerOpen = false; LocalTime startTime = LocalTime.now(); - Connection connection = DatabaseUtils.getDBConnection(); while (retryCount <= MAX_RETRIES && !circuitBreakerOpen) { try { @@ -170,8 +166,8 @@ private void postWithRetry() throws FSEventNotificationException { log.debug("Real-time event notification with notificationId: " + notificationId.replaceAll("[\r\n]", "") + " sent successfully"); } - pollingDAO.updateNotificationStatusById(connection, notificationId, EventNotificationConstants.ACK); - DatabaseUtils.commitTransaction(DatabaseUtils.getDBConnection()); + realtimeNotificationService.updateNotificationStatusById(notificationId, + EventNotificationConstants.ACK); return; } else { if (log.isDebugEnabled()) { @@ -194,17 +190,14 @@ private void postWithRetry() throws FSEventNotificationException { // If the circuit breaker is opened or the maximum retry count is exceeded, // the notification status will be updated as ERROR. - pollingDAO.updateNotificationStatusById(connection, notificationId, EventNotificationConstants.ERROR); - DatabaseUtils.commitTransaction(DatabaseUtils.getDBConnection()); + realtimeNotificationService.updateNotificationStatusById(notificationId, + EventNotificationConstants.ERROR); } catch (IOException | InterruptedException e) { log.error("Real-time event notification with notificationId: " + notificationId.replaceAll("[\r\n]", "") + " sent failed", e); } catch (FSEventNotificationException e) { log.error("Real-time event notification with notificationId: " + notificationId.replaceAll("[\r\n]", "") + " sent failed", e); - DatabaseUtils.rollbackTransaction(DatabaseUtils.getDBConnection()); - } finally { - DatabaseUtils.closeConnection(DatabaseUtils.getDBConnection()); } } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/activator/PeriodicalEventNotificationConsumerJobActivator.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/activator/PeriodicalEventNotificationConsumerJobActivator.java index 6d450e01..cb61616c 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/activator/PeriodicalEventNotificationConsumerJobActivator.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/activator/PeriodicalEventNotificationConsumerJobActivator.java @@ -44,14 +44,14 @@ public class PeriodicalEventNotificationConsumerJobActivator { private static Log log = LogFactory.getLog(PeriodicalEventNotificationConsumerJobActivator.class); - private static final String PERIODIC_CRON_EXPRESSION = FinancialServicesConfigParser - .getInstance().getRealtimeEventNotificationSchedulerCronExpression().replaceAll("[\r\n]", ""); public void activate() { int cronInSeconds = 60; + String periodicCronExpression = FinancialServicesConfigParser.getInstance() + .getRealtimeEventNotificationSchedulerCronExpression().replaceAll("[\r\n]", ""); try { - CronExpression cron = new CronExpression(PERIODIC_CRON_EXPRESSION); + CronExpression cron = new CronExpression(periodicCronExpression); Date nextValidTime = cron.getNextValidTimeAfter(new Date()); Date secondValidTime = cron.getNextValidTimeAfter(nextValidTime); @@ -60,7 +60,7 @@ public void activate() { } catch (ParseException e) { log.error("Error while parsing the event notification scheduler cron expression : " - + PERIODIC_CRON_EXPRESSION, e); + + periodicCronExpression, e); } JobDetail job = newJob(EventNotificationConsumerJob.class) @@ -83,7 +83,7 @@ public void activate() { scheduler.scheduleJob(job, trigger); log.info("Periodical Realtime Event Notification sender Started with cron : " - + PERIODIC_CRON_EXPRESSION); + + periodicCronExpression); } catch (SchedulerException e) { log.error("Error while starting Periodical Realtime Event Notification sender", e); } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java index 251a98ae..eba932f1 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java @@ -33,6 +33,7 @@ import org.wso2.financial.services.accelerator.consent.mgt.service.impl.ConsentCoreServiceImpl; import org.wso2.financial.services.accelerator.event.notifications.service.EventNotificationGenerator; import org.wso2.financial.services.accelerator.event.notifications.service.EventSubscriptionService; +import org.wso2.financial.services.accelerator.event.notifications.service.RealtimeNotificationService; import org.wso2.financial.services.accelerator.event.notifications.service.constants.EventNotificationConstants; import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException; import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationRequestGenerator; @@ -159,4 +160,8 @@ public static EventSubscriptionService getEventSubscriptionService() { return new EventSubscriptionService(); } + public static RealtimeNotificationService getRealtimeNotificationService() { + return new RealtimeNotificationService(); + } + } From 8f40d67292cb1bf40e911bef2b07755684a84137 Mon Sep 17 00:00:00 2001 From: Ashi1993 Date: Mon, 11 Nov 2024 17:06:49 +0530 Subject: [PATCH 3/7] Adding realtime event notification --- .../internal/EventNotificationComponent.java | 56 +++++++++++++++---- .../internal/EventNotificationDataHolder.java | 10 ++++ 2 files changed, 56 insertions(+), 10 deletions(-) diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java index 63530205..01a3e674 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java @@ -6,7 +6,7 @@ * in compliance with the License. * You may obtain a copy of the License at *

- * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -23,8 +23,14 @@ import org.osgi.service.component.ComponentContext; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.component.annotations.ReferenceCardinality; +import org.osgi.service.component.annotations.ReferencePolicy; +import org.wso2.carbon.identity.oauth2.OAuth2Service; import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser; +import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigurationService; import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationLoaderService; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.activator.PeriodicalEventNotificationConsumerJobActivator; /** * The Component class for activating event notification osgi service. @@ -33,22 +39,52 @@ name = "org.wso2.financial.services.accelerator.event.notifications.service.internal.EventNotificationComponent", immediate = true) public class EventNotificationComponent { - private static Log log = LogFactory.getLog(EventNotificationComponent.class); + private static final Log log = LogFactory.getLog(EventNotificationComponent.class); @Activate - protected void activate(ComponentContext context) { - log.debug("Event Notification Service Component Activated"); + protected void activate(ComponentContext context) throws Exception { + log.info("Event Notification Service Component Activated"); // Check if realtime event notification enabled if (FinancialServicesConfigParser.getInstance().isRealtimeEventNotificationEnabled()) { - /* - * Initialize the blocking queue for storing the realtime event notifications - * Initialize the quartz job for consuming the realtime event notifications - * Initialize the thread for producing the open state realtime event notifications - */ log.debug("Event Notification####"); new Thread(new RealtimeEventNotificationLoaderService()).start(); -// new PeriodicalEventNotificationConsumerJobActivator().activate(); + new PeriodicalEventNotificationConsumerJobActivator().activate(); } } + + /** + * Setters for the descendent OSGI services of the EventNotificationComponent. + * This is added to run the EventNotification OSGI component after the Common module + */ + @Reference( + service = FinancialServicesConfigurationService.class, + cardinality = ReferenceCardinality.MANDATORY, + policy = ReferencePolicy.DYNAMIC, + unbind = "unsetConfigService" + ) + public void setConfigService(FinancialServicesConfigurationService configurationService) { + EventNotificationDataHolder.getInstance().setConfigService(configurationService); + } + + public void unsetConfigService(FinancialServicesConfigurationService configurationService) { + EventNotificationDataHolder.getInstance().setConfigService(null); + } + + /** + * Setters for the descendent OSGI services of the EventNotificationComponent. + * This is added to run the EventNotification OSGI component after the OAuth2Service + */ + @Reference( + service = OAuth2Service.class, + cardinality = ReferenceCardinality.MANDATORY, + policy = ReferencePolicy.DYNAMIC, + unbind = "unsetOAuth2Service" + ) + + public void setOAuth2Service(OAuth2Service oAuth2Service) { + } + + public void unsetOAuth2Service(OAuth2Service oAuth2Service) { + } } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java index bfa2b7c2..63d08354 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java @@ -61,4 +61,14 @@ public LinkedBlockingQueue getRealtimeEventNotificati public void setRealtimeEventNotificationQueue(LinkedBlockingQueue queue) { this.realtimeEventNotificationQueue = queue; } + + public FinancialServicesConfigurationService getConfigService() { + + return configService; + } + + public void setConfigService(FinancialServicesConfigurationService configService) { + + this.configService = configService; + } } From c2997c3468bb17faf167d0b51d35e5bceddc8b33 Mon Sep 17 00:00:00 2001 From: Ashi1993 Date: Wed, 4 Dec 2024 14:45:22 +0530 Subject: [PATCH 4/7] Adding realtime notification code --- .../accelerator/common/util/HTTPClientUtils.java | 4 ++-- .../notifications/service/EventCreationService.java | 1 - .../service/RealtimeNotificationService.java | 3 +++ .../service/internal/EventNotificationComponent.java | 12 +++++++++--- .../service/EventNotificationProducerService.java | 1 + .../RealtimeEventNotificationSenderService.java | 2 -- 6 files changed, 15 insertions(+), 8 deletions(-) diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/HTTPClientUtils.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/HTTPClientUtils.java index b32578a4..59cb2767 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/HTTPClientUtils.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/HTTPClientUtils.java @@ -66,7 +66,7 @@ public class HTTPClientUtils { * @return Closeable https client * @throws FinancialServicesException FinancialServicesException exception */ - @Generated(message = "Ignoring because ServerConfiguration cannot be mocked") + @Generated(message = "Ignoring since method contains no logics") public static CloseableHttpClient getHttpsClient() throws FinancialServicesException { SSLConnectionSocketFactory sslsf = createSSLConnectionSocketFactory(); @@ -94,7 +94,7 @@ public static CloseableHttpClient getHttpsClient() throws FinancialServicesExcep * @return Closeable https client * @throws FinancialServicesException FinancialServicesException exception */ - @Generated(message = "Ignoring because ServerConfiguration cannot be mocked") + @Generated(message = "Ignoring since method contains no logics") public static CloseableHttpClient getHttpsClient(int maxTotal, int maxPerRoute) throws FinancialServicesException { SSLConnectionSocketFactory sslsf = createSSLConnectionSocketFactory(); diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventCreationService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventCreationService.java index 852aeeb3..cd7e4d87 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventCreationService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventCreationService.java @@ -66,7 +66,6 @@ public String publishEventNotification(NotificationCreationDTO notificationCreat eventResponse = eventCreationDAO.persistEventNotification(connection, notification, eventsList); DatabaseUtils.commitTransaction(connection); - //TODO: if (FinancialServicesConfigParser.getInstance().isRealtimeEventNotificationEnabled()) { new Thread(new EventNotificationProducerService(notification, eventsList)).start(); } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/RealtimeNotificationService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/RealtimeNotificationService.java index 6e6e46b4..43dc3fea 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/RealtimeNotificationService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/RealtimeNotificationService.java @@ -67,6 +67,7 @@ public List getNotificationsByStatus(String status) DatabaseUtils.rollbackTransaction(connection); throw new FSEventNotificationException(EventNotificationConstants.ERROR_STORING_EVENT_SUBSCRIPTION, e); } finally { + log.debug(EventNotificationConstants.DATABASE_CONNECTION_CLOSE_LOG_MSG); DatabaseUtils.closeConnection(connection); } } @@ -100,6 +101,7 @@ public List getEventsByNotificationID(String notificationId) DatabaseUtils.rollbackTransaction(connection); throw new FSEventNotificationException(EventNotificationConstants.ERROR_STORING_EVENT_SUBSCRIPTION, e); } finally { + log.debug(EventNotificationConstants.DATABASE_CONNECTION_CLOSE_LOG_MSG); DatabaseUtils.closeConnection(connection); } } @@ -133,6 +135,7 @@ public void updateNotificationStatusById(String notificationId, String notificat DatabaseUtils.rollbackTransaction(connection); throw new FSEventNotificationException(e.getMessage(), e); } finally { + log.debug(EventNotificationConstants.DATABASE_CONNECTION_CLOSE_LOG_MSG); DatabaseUtils.closeConnection(connection); } } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java index d23a9bf0..53b6ff8a 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java @@ -39,15 +39,21 @@ name = "org.wso2.financial.services.accelerator.event.notifications.service.internal.EventNotificationComponent", immediate = true) public class EventNotificationComponent { + private static final Log log = LogFactory.getLog(EventNotificationComponent.class); @Activate protected void activate(ComponentContext context) throws Exception { - log.info("Event Notification Service Component Activated"); + log.debug("Event Notification Service Component Activated"); // Check if realtime event notification enabled if (FinancialServicesConfigParser.getInstance().isRealtimeEventNotificationEnabled()) { - log.info("Realtime Event Notification Service Activated"); + /* + * Initialize the blocking queue for storing the realtime event notifications + * Initialize the quartz job for consuming the realtime event notifications + * Initialize the thread for producing the open state realtime event notifications + */ + log.debug("Realtime Event Notification Service Activated"); new Thread(new RealtimeEventNotificationLoaderService()).start(); new PeriodicalEventNotificationConsumerJobActivator().activate(); } @@ -56,6 +62,7 @@ protected void activate(ComponentContext context) throws Exception { /** * Setters for the descendent OSGI services of the EventNotificationComponent. * This is added to run the EventNotification OSGI component after the Common module + * @param configurationService FinancialServicesConfigurationService */ @Reference( service = FinancialServicesConfigurationService.class, @@ -81,7 +88,6 @@ public void unsetConfigService(FinancialServicesConfigurationService configurati policy = ReferencePolicy.DYNAMIC, unbind = "unsetOAuth2Service" ) - public void setOAuth2Service(OAuth2Service oAuth2Service) { } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java index 2ff5902e..e86462f5 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java @@ -48,6 +48,7 @@ public class EventNotificationProducerService implements Runnable { public EventNotificationProducerService( Notification notification, List notificationEvents) { + this.notification = notification; this.notificationEvents = notificationEvents; } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java index c119bc1d..6957423e 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java @@ -199,8 +199,6 @@ private void postWithRetry() throws FSEventNotificationException { log.error("Real-time event notification with notificationId: " + notificationId.replaceAll("[\r\n]", "") + " sent failed", e); } - } - } } From e4069662724a674173f0200ab53a26c54be887e7 Mon Sep 17 00:00:00 2001 From: ashirwadadayarathne Date: Mon, 9 Dec 2024 14:09:30 +0530 Subject: [PATCH 5/7] fix review comments --- financial-services-accelerator/accelerators/fs-is/pom.xml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/financial-services-accelerator/accelerators/fs-is/pom.xml b/financial-services-accelerator/accelerators/fs-is/pom.xml index e53ceba6..c33a60a5 100644 --- a/financial-services-accelerator/accelerators/fs-is/pom.xml +++ b/financial-services-accelerator/accelerators/fs-is/pom.xml @@ -48,6 +48,7 @@ **/commons-beanutils-1.9.4.jar **/hibernate-validator-6.0.20.Final.jar **/validation-api-2.0.1.Final.jar + **/quartz-2.3.2.jar @@ -117,6 +118,13 @@ regex="org.wso2.financial.services.accelerator.consent.mgt.dao-(\d.*?)\.jar$"/> + + + + + From ba4ed70ea820b5c5b7ede32adbf04320e3e584f2 Mon Sep 17 00:00:00 2001 From: ashirwadadayarathne Date: Mon, 9 Dec 2024 15:42:30 +0530 Subject: [PATCH 6/7] Adding realtime notification fixes --- .../pom.xml | 6 +----- .../realtime/service/EventNotificationProducerService.java | 3 ++- .../service/RealtimeEventNotificationLoaderService.java | 3 ++- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/pom.xml b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/pom.xml index 9802ff14..edc8d0a5 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/pom.xml +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/pom.xml @@ -197,8 +197,6 @@ org.apache.commons.lang3;version="${commons-lang3.version}", org.apache.commons.logging;version="${commons.logging.version}", org.json;version="${org.json.version}", - - com.fasterxml.jackson.annotation;version="${jackson.databinding.version}", com.fasterxml.jackson.databind;version="${jackson.databinding.version}", com.nimbusds.jose;version="${org.wso2.orbit.nimbus.version.range}", @@ -206,9 +204,7 @@ org.wso2.carbon.identity.oauth2.*;version="${identity.inbound.auth.oauth.version.range}", org.wso2.financial.services.accelerator.common.*;version="${project.version}", org.wso2.financial.services.accelerator.consent.mgt.dao.*;version="${project.version}", - org.wso2.financial.services.accelerator.consent.mgt.service.*;version="${project.version}", - - + org.wso2.financial.services.accelerator.consent.mgt.service.*;version="${project.version}" * diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java index e86462f5..54692e26 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java @@ -20,6 +20,7 @@ import com.nimbusds.jose.JOSEException; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.carbon.identity.oauth2.IdentityOAuth2Exception; @@ -78,7 +79,7 @@ public void run() { } }); - if (!allowedEvents.isEmpty()) { + if (!allowedEvents.isEmpty() && StringUtils.isNotEmpty(subscription.getCallbackUrl())) { RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification(); realtimeEventNotification.setNotification(notification); realtimeEventNotification.setCallbackUrl(subscription.getCallbackUrl()); diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java index d4e069f3..01fd2e64 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java @@ -10,6 +10,7 @@ package org.wso2.financial.services.accelerator.event.notifications.service.realtime.service; import com.nimbusds.jose.JOSEException; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.carbon.identity.oauth2.IdentityOAuth2Exception; @@ -71,7 +72,7 @@ public void run() { } }); - if (!allowedEvents.isEmpty()) { + if (!allowedEvents.isEmpty() && StringUtils.isNotEmpty(subscription.getCallbackUrl())) { NotificationResponse responseNotification = eventNotificationGenerator. generateEventNotificationBody(notification, allowedEvents); RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification(); From 00babc11f6c634ec7023e46cba1a28b73d4377e7 Mon Sep 17 00:00:00 2001 From: ashirwadadayarathne Date: Wed, 11 Dec 2024 15:30:14 +0530 Subject: [PATCH 7/7] fixed review comments --- .../service/RealtimeNotificationService.java | 6 ++- .../constants/EventNotificationConstants.java | 38 +++++++++++++++++++ ...ealtimeEventNotificationSenderService.java | 9 ++--- 3 files changed, 45 insertions(+), 8 deletions(-) diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/RealtimeNotificationService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/RealtimeNotificationService.java index 43dc3fea..162b172d 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/RealtimeNotificationService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/RealtimeNotificationService.java @@ -114,7 +114,8 @@ public List getEventsByNotificationID(String notificationId) * @param notificationStatus Notification status to update * @throws FSEventNotificationException Exception when updating notification status by ID */ - public void updateNotificationStatusById(String notificationId, String notificationStatus) + public void updateNotificationStatusById(String notificationId, + EventNotificationConstants.EventNotificationStatusEnum notificationStatus) throws FSEventNotificationException { Connection connection = DatabaseUtils.getDBConnection(); @@ -123,7 +124,8 @@ public void updateNotificationStatusById(String notificationId, String notificat try { //update the stored event notification - eventNotificationDAO.updateNotificationStatusById(connection, notificationId, notificationStatus); + eventNotificationDAO.updateNotificationStatusById(connection, notificationId, + notificationStatus.toString()); log.debug("Event Notification updated successfully."); DatabaseUtils.commitTransaction(connection); diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/constants/EventNotificationConstants.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/constants/EventNotificationConstants.java index 78943682..7d4caf2e 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/constants/EventNotificationConstants.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/constants/EventNotificationConstants.java @@ -18,6 +18,10 @@ package org.wso2.financial.services.accelerator.event.notifications.service.constants; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + /** * Event Notification Constants. */ @@ -31,6 +35,40 @@ public class EventNotificationConstants { public static final String ERROR = "ERR"; public static final String OPEN = "OPEN"; + /** + * Specifies the Schema Names of Debtor Account. + */ + public enum EventNotificationStatusEnum { + + ACK("ACK"), + + ERROR("ERR"), + + OPEN("OPEN"); + + private final String value; + + EventNotificationStatusEnum(String value) { + this.value = value; + } + + @Override + public String toString() { + return String.valueOf(value); + } + + public static EventNotificationStatusEnum fromValue(String text) { + + List valueList = Arrays.asList(EventNotificationStatusEnum.values()); + Optional accountOpt = valueList + .stream() + .filter(i -> String.valueOf(i.value).equals(text)) + .findAny(); + + return accountOpt.orElse(null); + } + } + //Response Status public static final String NOT_FOUND = "NOTFOUND"; public static final String OK = "OK"; diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java index 6957423e..455d481d 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java @@ -167,7 +167,7 @@ private void postWithRetry() throws FSEventNotificationException { notificationId.replaceAll("[\r\n]", "") + " sent successfully"); } realtimeNotificationService.updateNotificationStatusById(notificationId, - EventNotificationConstants.ACK); + EventNotificationConstants.EventNotificationStatusEnum.ACK); return; } else { if (log.isDebugEnabled()) { @@ -191,11 +191,8 @@ private void postWithRetry() throws FSEventNotificationException { // If the circuit breaker is opened or the maximum retry count is exceeded, // the notification status will be updated as ERROR. realtimeNotificationService.updateNotificationStatusById(notificationId, - EventNotificationConstants.ERROR); - } catch (IOException | InterruptedException e) { - log.error("Real-time event notification with notificationId: " + - notificationId.replaceAll("[\r\n]", "") + " sent failed", e); - } catch (FSEventNotificationException e) { + EventNotificationConstants.EventNotificationStatusEnum.ERROR); + } catch (IOException | InterruptedException | FSEventNotificationException e) { log.error("Real-time event notification with notificationId: " + notificationId.replaceAll("[\r\n]", "") + " sent failed", e); }