+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.financial.services.accelerator.event.notifications.service;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.json.JSONException;
+import org.wso2.financial.services.accelerator.common.util.DatabaseUtils;
+import org.wso2.financial.services.accelerator.event.notifications.service.constants.EventNotificationConstants;
+import org.wso2.financial.services.accelerator.event.notifications.service.dao.EventNotificationDAO;
+import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException;
+import org.wso2.financial.services.accelerator.event.notifications.service.model.Notification;
+import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationEvent;
+import org.wso2.financial.services.accelerator.event.notifications.service.persistence.EventNotificationStoreInitializer;
+
+import java.sql.Connection;
+import java.util.List;
+
+/**
+ * Realtime Notification service class.
+ */
+public class RealtimeNotificationService {
+
+ private static final Log log = LogFactory.getLog(RealtimeNotificationService.class);
+
+ /**
+ * Method to retrieve notification by status.
+ *
+ * @param status Notification status to retrieve
+ * @return List of notifications by status
+ * @throws FSEventNotificationException Exception when retrieving notifications by status
+ */
+ public List
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -29,6 +29,8 @@
import org.wso2.carbon.identity.oauth2.OAuth2Service;
import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser;
import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigurationService;
+import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationLoaderService;
+import org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.activator.PeriodicalEventNotificationConsumerJobActivator;
/**
* The Component class for activating event notification osgi service.
@@ -37,10 +39,11 @@
name = "org.wso2.financial.services.accelerator.event.notifications.service.internal.EventNotificationComponent",
immediate = true)
public class EventNotificationComponent {
- private static Log log = LogFactory.getLog(EventNotificationComponent.class);
+
+ private static final Log log = LogFactory.getLog(EventNotificationComponent.class);
@Activate
- protected void activate(ComponentContext context) {
+ protected void activate(ComponentContext context) throws Exception {
log.debug("Event Notification Service Component Activated");
// Check if realtime event notification enabled
@@ -50,16 +53,16 @@ protected void activate(ComponentContext context) {
* Initialize the quartz job for consuming the realtime event notifications
* Initialize the thread for producing the open state realtime event notifications
*/
- //TODO:
-// new Thread(new RealtimeEventNotificationLoaderService()).start();
-// new PeriodicalEventNotificationConsumerJobActivator().activate();
+ log.debug("Realtime Event Notification Service Activated");
+ new Thread(new RealtimeEventNotificationLoaderService()).start();
+ new PeriodicalEventNotificationConsumerJobActivator().activate();
}
}
/**
* Setters for the descendent OSGI services of the EventNotificationComponent.
* This is added to run the EventNotification OSGI component after the Common module
- * @param configService OpenBankingConfigurationService
+ * @param configurationService FinancialServicesConfigurationService
*/
@Reference(
service = FinancialServicesConfigurationService.class,
@@ -67,12 +70,12 @@ protected void activate(ComponentContext context) {
policy = ReferencePolicy.DYNAMIC,
unbind = "unsetConfigService"
)
- public void setConfigService(FinancialServicesConfigurationService configService) {
- EventNotificationDataHolder.getInstance().setFinancialServicesConfigurationService(configService);
+ public void setConfigService(FinancialServicesConfigurationService configurationService) {
+ EventNotificationDataHolder.getInstance().setConfigService(configurationService);
}
- public void unsetConfigService(FinancialServicesConfigurationService configService) {
- EventNotificationDataHolder.getInstance().setFinancialServicesConfigurationService(null);
+ public void unsetConfigService(FinancialServicesConfigurationService configurationService) {
+ EventNotificationDataHolder.getInstance().setConfigService(null);
}
/**
@@ -85,11 +88,6 @@ public void unsetConfigService(FinancialServicesConfigurationService configServi
policy = ReferencePolicy.DYNAMIC,
unbind = "unsetOAuth2Service"
)
-
- /**
- * Setters for the descendent OSGI services of the EventNotificationComponent.
- * @param oAuth2Service OAuth2Service
- */
public void setOAuth2Service(OAuth2Service oAuth2Service) {
}
diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java
index d16bb2f7..63d08354 100644
--- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java
+++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java
@@ -21,6 +21,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigurationService;
+import org.wso2.financial.services.accelerator.event.notifications.service.model.RealtimeEventNotification;
+
+import java.util.concurrent.LinkedBlockingQueue;
/**
* Data holder for Open Banking Event Notifications.
@@ -28,12 +31,11 @@
public class EventNotificationDataHolder {
private static Log log = LogFactory.getLog(EventNotificationDataHolder.class);
private static volatile EventNotificationDataHolder instance;
-// private volatile LinkedBlockingQueue
+ * WSO2 LLC. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.financial.services.accelerator.event.notifications.service.model;
+
+import org.json.JSONObject;
+import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationRequestGenerator;
+import org.wso2.financial.services.accelerator.event.notifications.service.util.EventNotificationServiceUtil;
+
+/**
+ * Model class for real time event notifications.
+ */
+public class RealtimeEventNotification {
+ private String callbackUrl = null;
+ private String securityEventToken = null; // Security Event Token to hold the Event Notification Data
+ private Notification notification = null;
+
+ public void setCallbackUrl(String callbackUrl) {
+ this.callbackUrl = callbackUrl;
+ }
+
+ public void setSecurityEventToken(String notification) {
+ this.securityEventToken = notification;
+ }
+
+ public void setNotification(Notification notification) {
+ this.notification = notification;
+ }
+
+ public String getCallbackUrl() {
+ return callbackUrl;
+ }
+
+ public JSONObject getJsonPayload() {
+ RealtimeEventNotificationRequestGenerator eventNotificationRequestGenerator =
+ EventNotificationServiceUtil.getRealtimeEventNotificationRequestGenerator();
+ return eventNotificationRequestGenerator
+ .getRealtimeEventNotificationPayload(notification, securityEventToken);
+ }
+
+ public String getNotificationId() {
+ return notification.getNotificationId();
+ }
+
+}
diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/DefaultRealtimeEventNotificationRequestGenerator.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/DefaultRealtimeEventNotificationRequestGenerator.java
new file mode 100644
index 00000000..a6198eb9
--- /dev/null
+++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/DefaultRealtimeEventNotificationRequestGenerator.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com).
+ *
+ * WSO2 LLC. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.financial.services.accelerator.event.notifications.service.realtime.service;
+
+import org.json.JSONObject;
+import org.wso2.financial.services.accelerator.event.notifications.service.model.Notification;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Default class for realtime event notification request generation.
+ * This is to generate the realtime event notification request payload and headers.
+ */
+public class DefaultRealtimeEventNotificationRequestGenerator implements RealtimeEventNotificationRequestGenerator {
+
+ @Override
+ public JSONObject getRealtimeEventNotificationPayload(Notification notificationDTO, String eventSET) {
+ return new JSONObject("{\"notificationId\": " + notificationDTO.getNotificationId() + ", \"SET\": "
+ + eventSET + "}");
+ }
+
+ @Override
+ public Map
+ * WSO2 LLC. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.financial.services.accelerator.event.notifications.service.realtime.service;
+
+import com.nimbusds.jose.JOSEException;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.identity.oauth2.IdentityOAuth2Exception;
+import org.wso2.financial.services.accelerator.event.notifications.service.EventNotificationGenerator;
+import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException;
+import org.wso2.financial.services.accelerator.event.notifications.service.internal.EventNotificationDataHolder;
+import org.wso2.financial.services.accelerator.event.notifications.service.model.EventSubscription;
+import org.wso2.financial.services.accelerator.event.notifications.service.model.Notification;
+import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationEvent;
+import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationResponse;
+import org.wso2.financial.services.accelerator.event.notifications.service.model.RealtimeEventNotification;
+import org.wso2.financial.services.accelerator.event.notifications.service.util.EventNotificationServiceUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This thread is used to produce the event notification and put it into the realtime event notification queue.
+ */
+public class EventNotificationProducerService implements Runnable {
+ private static final Log log = LogFactory.getLog(EventNotificationProducerService.class);
+ private final Notification notification;
+ private final List
+ * WSO2 LLC. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.financial.services.accelerator.event.notifications.service.realtime.service;
+
+import org.json.JSONObject;
+import org.wso2.financial.services.accelerator.event.notifications.service.model.Notification;
+
+import java.util.Map;
+
+/**
+ * Interface for event notification request metadata generation. For custom class extensions the class name
+ * is to be referred from the realtime_event_notification_request_generator in deployment.toml
+ */
+public interface RealtimeEventNotificationRequestGenerator {
+ /**
+ * This method is to generate realtime event notification payload. To generate custom values
+ * for the body this method should be extended.
+ *
+ * @return String payload
+ */
+ JSONObject getRealtimeEventNotificationPayload(Notification notificationDTO, String eventSET);
+
+ /**
+ * This method is to generate realtime event notification request headers. To generate custom values
+ * for the body this method should be extended.
+ *
+ * @return Map of headers
+ */
+ Map
+ * WSO2 LLC. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.financial.services.accelerator.event.notifications.service.realtime.service;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.json.JSONObject;
+import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser;
+import org.wso2.financial.services.accelerator.common.exception.FinancialServicesException;
+import org.wso2.financial.services.accelerator.common.util.HTTPClientUtils;
+import org.wso2.financial.services.accelerator.event.notifications.service.RealtimeNotificationService;
+import org.wso2.financial.services.accelerator.event.notifications.service.constants.EventNotificationConstants;
+import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException;
+import org.wso2.financial.services.accelerator.event.notifications.service.util.EventNotificationServiceUtil;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.Duration;
+import java.time.LocalTime;
+import java.util.Map;
+
+/**
+ * This method is used to send the HTTP requests to the TPP provided callback URL.
+ * Exponential backoff and Circuit breaker based retry policy is used to retry failed POST requests.
+ */
+public class RealtimeEventNotificationSenderService implements Runnable {
+
+ private static final Log log = LogFactory.getLog(RealtimeEventNotificationSenderService.class);
+
+ private static final FinancialServicesConfigParser configParser = FinancialServicesConfigParser.getInstance();
+ private static final int MAX_RETRIES = configParser.getRealtimeEventNotificationMaxRetries();
+ private static final int INITIAL_BACKOFF_TIME_IN_SECONDS =
+ configParser.getRealtimeEventNotificationInitialBackoffTimeInSeconds();
+ private static final String BACKOFF_FUNCTION =
+ configParser.getRealtimeEventNotificationBackoffFunction().replaceAll("[\r\n]", "");
+ private static final int CIRCUIT_BREAKER_OPEN_TIMEOUT_IN_SECONDS =
+ configParser.getRealtimeEventNotificationCircuitBreakerOpenTimeoutInSeconds();
+ private static final int TIMEOUT_IN_SECONDS = configParser.getRealtimeEventNotificationTimeoutInSeconds();
+
+ private CloseableHttpClient httpClient;
+ private RealtimeEventNotificationRequestGenerator httpRequestGenerator;
+ private String notificationId;
+ private String callbackUrl;
+ private JSONObject payloadJson;
+
+ public RealtimeEventNotificationSenderService(String callbackUrl, JSONObject payloadJson,
+ String notificationId) {
+ int maxRetryCount = FinancialServicesConfigParser.getInstance()
+ .getRealtimeEventNotificationMaxRetries() + 1;
+
+ try {
+ this.httpClient = HTTPClientUtils.getHttpsClient(maxRetryCount, maxRetryCount);
+ } catch (FinancialServicesException e) {
+ log.error("Failed to initialize the HTTP client for the realtime event notification", e);
+ }
+
+ this.httpRequestGenerator = EventNotificationServiceUtil.getRealtimeEventNotificationRequestGenerator();
+ this.notificationId = notificationId;
+ this.callbackUrl = callbackUrl;
+ this.payloadJson = payloadJson;
+ }
+
+ public void run() {
+ try {
+ postWithRetry();
+ } catch (FSEventNotificationException e) {
+ log.error("Failed to send the Real-time event notification with notificationId: "
+ + notificationId.replaceAll("[\r\n]", ""), e);
+ }
+ }
+
+ /**
+ * This method is used to send the HTTP requests to the TPP provided callback URL.
+ * Exponential backoff and Circuit breaker based retry policy is used to retry failed POST requests.
+ *
+ * @throws FSEventNotificationException
+ */
+ private void postWithRetry() throws FSEventNotificationException {
+ RealtimeNotificationService realtimeNotificationService = EventNotificationServiceUtil
+ .getRealtimeNotificationService();
+ int retryCount = 0;
+ long backoffTimeMs = INITIAL_BACKOFF_TIME_IN_SECONDS * 1000L;
+ boolean circuitBreakerOpen = false;
+ LocalTime startTime = LocalTime.now();
+
+ while (retryCount <= MAX_RETRIES && !circuitBreakerOpen) {
+ try {
+ // This if closure will execute only if the initial POST request is failed.
+ // This includes the retry policy and will execute according to the configurations.
+ if (retryCount > 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("HTTP request Retry #" + retryCount + " - waiting for "
+ + backoffTimeMs + " ms before trying again");
+ }
+ Thread.sleep(backoffTimeMs);
+
+ switch (BACKOFF_FUNCTION) {
+ case "CONSTANT":
+ // Backoff time will not be changed
+ // Retries will happen in constant time frames
+ break;
+ case "LINEAR":
+ // Backoff time will be doubled after each retry
+ // nextWaitingTime = 2 x previousWaitingTime
+ backoffTimeMs *= 2;
+ break;
+ case "EX":
+ // Backoff time will be increased exponentially
+ // nextWaitingTime = startWaitingTime x e^(retryCount)
+ backoffTimeMs = (long)
+ (INITIAL_BACKOFF_TIME_IN_SECONDS
+ * 1000 * Math.exp(retryCount));
+ break;
+ default:
+ log.error("Invalid backoff function for the realtime event notification retry policy: "
+ + BACKOFF_FUNCTION);
+ throw new IllegalArgumentException(
+ "Invalid backoff function for the realtime event notification retry policy: "
+ + BACKOFF_FUNCTION);
+ }
+ }
+
+ HttpPost httpPost = new HttpPost(URI.create(callbackUrl));
+
+ for (Map.Entry
+ * WSO2 LLC. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.activator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.quartz.CronExpression;
+import org.quartz.JobDetail;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.SimpleScheduleBuilder;
+import org.quartz.Trigger;
+import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser;
+import org.wso2.financial.services.accelerator.common.util.Generated;
+import org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.job.EventNotificationConsumerJob;
+import org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.scheduler.PeriodicalEventNotificationConsumerJobScheduler;
+
+import java.text.ParseException;
+import java.util.Date;
+
+import static org.quartz.JobBuilder.newJob;
+import static org.quartz.TriggerBuilder.newTrigger;
+
+/**
+ * Scheduled Task definition and trigger to perform realtime event notification sending based on the cron string.
+ */
+@Generated(message = "Excluding from code coverage")
+public class PeriodicalEventNotificationConsumerJobActivator {
+
+ private static Log log = LogFactory.getLog(PeriodicalEventNotificationConsumerJobActivator.class);
+
+ public void activate() {
+ int cronInSeconds = 60;
+ String periodicCronExpression = FinancialServicesConfigParser.getInstance()
+ .getRealtimeEventNotificationSchedulerCronExpression().replaceAll("[\r\n]", "");
+
+ try {
+ CronExpression cron = new CronExpression(periodicCronExpression);
+
+ Date nextValidTime = cron.getNextValidTimeAfter(new Date());
+ Date secondValidTime = cron.getNextValidTimeAfter(nextValidTime);
+
+ cronInSeconds = (int) (secondValidTime.getTime() - nextValidTime.getTime()) / 1000;
+
+ } catch (ParseException e) {
+ log.error("Error while parsing the event notification scheduler cron expression : "
+ + periodicCronExpression, e);
+ }
+
+ JobDetail job = newJob(EventNotificationConsumerJob.class)
+ .withIdentity("RealtimeEventNotificationJob", "group2")
+ .build();
+
+ Trigger trigger = newTrigger()
+ .withIdentity("periodicalEvenNotificationTrigger", "group2")
+ .withSchedule(SimpleScheduleBuilder.simpleSchedule()
+ .withIntervalInSeconds(cronInSeconds)
+ .repeatForever())
+ .build();
+
+ try {
+ Scheduler scheduler = PeriodicalEventNotificationConsumerJobScheduler.getInstance().getScheduler();
+ // this check is to remove already stored jobs in clustered mode.
+ if (scheduler.checkExists(job.getKey())) {
+ scheduler.deleteJob(job.getKey());
+ }
+
+ scheduler.scheduleJob(job, trigger);
+ log.info("Periodical Realtime Event Notification sender Started with cron : "
+ + periodicCronExpression);
+ } catch (SchedulerException e) {
+ log.error("Error while starting Periodical Realtime Event Notification sender", e);
+ }
+ }
+}
diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/job/EventNotificationConsumerJob.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/job/EventNotificationConsumerJob.java
new file mode 100644
index 00000000..8f16605a
--- /dev/null
+++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/job/EventNotificationConsumerJob.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com).
+ *
+ * WSO2 LLC. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.job;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.json.JSONObject;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser;
+import org.wso2.financial.services.accelerator.common.util.Generated;
+import org.wso2.financial.services.accelerator.event.notifications.service.internal.EventNotificationDataHolder;
+import org.wso2.financial.services.accelerator.event.notifications.service.model.RealtimeEventNotification;
+import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationSenderService;
+
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Scheduled Task to send realtime event notifications to callback Urls.
+ * This task is scheduled to run periodically.
+ * This task consumes all the notifications in the queue and send them to the callback urls.
+ */
+@Generated(message = "Excluding from code coverage")
+@DisallowConcurrentExecution
+public class EventNotificationConsumerJob implements Job {
+
+ private static final Log log = LogFactory.getLog(EventNotificationConsumerJob.class);
+ private static final int THREAD_POOL_SIZE = FinancialServicesConfigParser
+ .getInstance().getEventNotificationThreadPoolSize();
+
+ @Override
+ public void execute(JobExecutionContext jobExecutionContext) {
+ ArrayList
+ * WSO2 LLC. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.impl.StdSchedulerFactory;
+import org.wso2.financial.services.accelerator.common.util.Generated;
+
+/**
+ * Periodic realtime event notification job scheduler class.
+ * This class initialize the scheduler and schedule configured jobs and triggers.
+ */
+@Generated(message = "Excluding from code coverage")
+public class PeriodicalEventNotificationConsumerJobScheduler {
+ private static volatile PeriodicalEventNotificationConsumerJobScheduler instance;
+ private static volatile Scheduler scheduler;
+ private static Log log = LogFactory.getLog(PeriodicalEventNotificationConsumerJobScheduler.class);
+
+ private PeriodicalEventNotificationConsumerJobScheduler() {
+ initScheduler();
+ }
+
+ public static synchronized PeriodicalEventNotificationConsumerJobScheduler getInstance() {
+
+ if (instance == null) {
+ synchronized (PeriodicalEventNotificationConsumerJobScheduler.class) {
+ if (instance == null) {
+ instance = new PeriodicalEventNotificationConsumerJobScheduler();
+ }
+ }
+ }
+ return instance;
+ }
+
+ private void initScheduler() {
+
+ if (instance != null) {
+ return;
+ }
+ synchronized (PeriodicalEventNotificationConsumerJobScheduler.class) {
+ try {
+ scheduler = StdSchedulerFactory.getDefaultScheduler();
+ scheduler.start();
+ } catch (SchedulerException e) {
+ log.error("Exception while initializing the Real-time Event notification scheduler", e);
+ }
+ }
+ }
+
+ /**
+ * Returns the scheduler.
+ *
+ * @return Scheduler scheduler.
+ */
+ public Scheduler getScheduler() {
+ return scheduler;
+ }
+}
diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java
index b3916f14..496afb29 100644
--- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java
+++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java
@@ -31,8 +31,11 @@
import org.wso2.financial.services.accelerator.common.util.Generated;
import org.wso2.financial.services.accelerator.consent.mgt.service.impl.ConsentCoreServiceImpl;
import org.wso2.financial.services.accelerator.event.notifications.service.EventNotificationGenerator;
+import org.wso2.financial.services.accelerator.event.notifications.service.EventSubscriptionService;
+import org.wso2.financial.services.accelerator.event.notifications.service.RealtimeNotificationService;
import org.wso2.financial.services.accelerator.event.notifications.service.constants.EventNotificationConstants;
import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException;
+import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationRequestGenerator;
import java.util.Optional;
@@ -61,15 +64,14 @@ public static EventNotificationGenerator getEventNotificationGenerator() {
*
* @return RealtimeEventNotificationRequestGenerator
*/
- //TODO
-// public static RealtimeEventNotificationRequestGenerator getRealtimeEventNotificationRequestGenerator() {
-//
-// RealtimeEventNotificationRequestGenerator realtimeEventNotificationRequestGenerator =
-// (RealtimeEventNotificationRequestGenerator) FinancialServicesUtils
-// .getClassInstanceFromFQN(FinancialServicesConfigParser.getInstance().
-// getRealtimeEventNotificationRequestGenerator());
-// return realtimeEventNotificationRequestGenerator;
-// }
+ public static RealtimeEventNotificationRequestGenerator getRealtimeEventNotificationRequestGenerator() {
+
+ RealtimeEventNotificationRequestGenerator realtimeEventNotificationRequestGenerator =
+ (RealtimeEventNotificationRequestGenerator) FinancialServicesUtils
+ .getClassInstanceFromFQN(FinancialServicesConfigParser.getInstance().
+ getRealtimeEventNotificationRequestGenerator());
+ return realtimeEventNotificationRequestGenerator;
+ }
/**
* Method to get event JSON from eventInformation payload string.
@@ -141,4 +143,13 @@ public static String getErrorDTO(String error, String errorDescription) {
eventNotificationError.put(EventNotificationConstants.ERROR_DESCRIPTION_FIELD, errorDescription);
return eventNotificationError.toString();
}
+
+ public static EventSubscriptionService getEventSubscriptionService() {
+ return new EventSubscriptionService();
+ }
+
+ public static RealtimeNotificationService getRealtimeNotificationService() {
+ return new RealtimeNotificationService();
+ }
+
}