diff --git a/components/apimgt/org.wso2.carbon.apimgt.cache.invalidation/src/main/java/org/wso2/carbon/apimgt/cache/invalidation/APIMgtServerStartupListener.java b/components/apimgt/org.wso2.carbon.apimgt.cache.invalidation/src/main/java/org/wso2/carbon/apimgt/cache/invalidation/APIMgtServerStartupListener.java index 10e956403091..58797c082f9a 100644 --- a/components/apimgt/org.wso2.carbon.apimgt.cache.invalidation/src/main/java/org/wso2/carbon/apimgt/cache/invalidation/APIMgtServerStartupListener.java +++ b/components/apimgt/org.wso2.carbon.apimgt.cache.invalidation/src/main/java/org/wso2/carbon/apimgt/cache/invalidation/APIMgtServerStartupListener.java @@ -21,6 +21,7 @@ import org.wso2.carbon.apimgt.cache.invalidation.internal.DataHolder; import org.wso2.carbon.apimgt.common.jms.JMSTransportHandler; import org.wso2.carbon.apimgt.impl.dto.EventHubConfigurationDto; +import org.wso2.carbon.apimgt.impl.dto.ThrottleProperties; import org.wso2.carbon.apimgt.impl.jms.listener.JMSListenerShutDownService; import org.wso2.carbon.core.ServerShutdownHandler; import org.wso2.carbon.core.ServerStartupObserver; @@ -38,9 +39,12 @@ public APIMgtServerStartupListener() { EventHubConfigurationDto.EventHubReceiverConfiguration eventHubReceiverConfiguration = DataHolder.getInstance().getAPIManagerConfigurationService().getAPIManagerConfiguration() .getEventHubConfigurationDto().getEventHubReceiverConfiguration(); + ThrottleProperties.JMSConnectionProperties.JMSTaskManagerProperties jmsTaskManagerProperties = + DataHolder.getInstance().getAPIManagerConfigurationService().getAPIManagerConfiguration() + .getThrottleProperties().getJmsConnectionProperties().getJmsTaskManagerProperties(); if (eventHubReceiverConfiguration != null) { - this.jmsTransportHandlerForEventHub = - new JMSTransportHandler(eventHubReceiverConfiguration.getJmsConnectionParameters()); + this.jmsTransportHandlerForEventHub = new JMSTransportHandler( + eventHubReceiverConfiguration.getJmsConnectionParameters(), jmsTaskManagerProperties); } } diff --git a/components/apimgt/org.wso2.carbon.apimgt.common.jms/pom.xml b/components/apimgt/org.wso2.carbon.apimgt.common.jms/pom.xml index fef2c5459438..593cf526566e 100644 --- a/components/apimgt/org.wso2.carbon.apimgt.common.jms/pom.xml +++ b/components/apimgt/org.wso2.carbon.apimgt.common.jms/pom.xml @@ -34,6 +34,10 @@ runtime test + + org.wso2.carbon.apimgt + org.wso2.carbon.apimgt.impl + diff --git a/components/apimgt/org.wso2.carbon.apimgt.common.jms/src/main/java/org/wso2/carbon/apimgt/common/jms/JMSTransportHandler.java b/components/apimgt/org.wso2.carbon.apimgt.common.jms/src/main/java/org/wso2/carbon/apimgt/common/jms/JMSTransportHandler.java index 32a933484c58..f643f8423f5d 100644 --- a/components/apimgt/org.wso2.carbon.apimgt.common.jms/src/main/java/org/wso2/carbon/apimgt/common/jms/JMSTransportHandler.java +++ b/components/apimgt/org.wso2.carbon.apimgt.common.jms/src/main/java/org/wso2/carbon/apimgt/common/jms/JMSTransportHandler.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.wso2.carbon.apimgt.common.jms.factory.JMSConnectionFactory; import org.wso2.carbon.apimgt.common.jms.factory.JMSTaskManagerFactory; +import org.wso2.carbon.apimgt.impl.dto.ThrottleProperties; import java.io.IOException; import java.io.InputStream; @@ -50,7 +51,17 @@ public class JMSTransportHandler { private boolean stopIssued = false; private static final Object lock = new Object(); - public JMSTransportHandler(Properties jmsConnectionProperties) { + /** + * Constructs a JMSTransportHandler with specified JMS connection properties and JMS task manager properties. + * + * @param jmsConnectionProperties The JMS connection properties to utilize. + * If empty, default properties will be loaded. + * @param jmsTaskManagerProperties The properties for JMS task management. + * If null, default values will be applied. + */ + public JMSTransportHandler(Properties jmsConnectionProperties, + ThrottleProperties.JMSConnectionProperties + .JMSTaskManagerProperties jmsTaskManagerProperties) { Properties properties; Hashtable parameters = new Hashtable<>(); @@ -71,6 +82,47 @@ public JMSTransportHandler(Properties jmsConnectionProperties) { parameters.put(name, properties.getProperty(name)); } jmsConnectionFactory = new JMSConnectionFactory(parameters, ListenerConstants.CONNECTION_FACTORY_NAME); + if (jmsTaskManagerProperties != null) { + extractTaskManagerProperties(jmsTaskManagerProperties); + } + } + + /** + * Extracts task manager properties from the provided JMSTaskManagerProperties object. + * + * @param jmsTaskManagerProperties The JMSTaskManagerProperties object containing task manager configuration. + */ + private void extractTaskManagerProperties( + ThrottleProperties.JMSConnectionProperties.JMSTaskManagerProperties jmsTaskManagerProperties) { + + int retrievedMinThreadPoolSize = jmsTaskManagerProperties.getMinThreadPoolSize(); + if (retrievedMinThreadPoolSize > 0) { + minThreadPoolSize = retrievedMinThreadPoolSize; + } else { + log.warn("Invalid min_thread_pool_size detected. Default value " + minThreadPoolSize + " will be used."); + } + + int retrievedMaxThreadPoolSize = jmsTaskManagerProperties.getMaxThreadPoolSize(); + if (retrievedMaxThreadPoolSize > 0) { + maxThreadPoolSize = retrievedMaxThreadPoolSize; + } else { + log.warn("Invalid max_thread_pool_size detected. Default value " + maxThreadPoolSize + " will be used."); + } + + int retrievedKeepAliveTimeInMillis = jmsTaskManagerProperties.getKeepAliveTimeInMillis(); + if (retrievedKeepAliveTimeInMillis > 0) { + keepAliveTimeInMillis = retrievedKeepAliveTimeInMillis; + } else { + log.warn("Invalid keep_alive_time_in_millis detected. Default value " + keepAliveTimeInMillis + + " will be used."); + } + + int retrievedJobQueueSize = jmsTaskManagerProperties.getJobQueueSize(); + if (retrievedJobQueueSize > 0) { + jobQueueSize = retrievedJobQueueSize; + } else { + log.warn("Invalid job_queue_size detected. Default value " + jobQueueSize + " will be used."); + } } /** diff --git a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/listeners/GatewayStartupListener.java b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/listeners/GatewayStartupListener.java index 6f6d2bc0b3ed..3eb3d3d8e0b6 100644 --- a/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/listeners/GatewayStartupListener.java +++ b/components/apimgt/org.wso2.carbon.apimgt.gateway/src/main/java/org/wso2/carbon/apimgt/gateway/listeners/GatewayStartupListener.java @@ -99,13 +99,16 @@ public GatewayStartupListener() { ThrottleProperties.JMSConnectionProperties jmsConnectionProperties = throttleProperties.getJmsConnectionProperties(); this.jmsTransportHandlerForTrafficManager = - new JMSTransportHandler(jmsConnectionProperties.getJmsConnectionProperties()); + new JMSTransportHandler(jmsConnectionProperties.getJmsConnectionProperties(), null); EventHubConfigurationDto.EventHubReceiverConfiguration eventHubReceiverConfiguration = ServiceReferenceHolder.getInstance().getAPIManagerConfiguration().getEventHubConfigurationDto() .getEventHubReceiverConfiguration(); + ThrottleProperties.JMSConnectionProperties.JMSTaskManagerProperties jmsTaskManagerProperties = + ServiceReferenceHolder.getInstance().getAPIManagerConfiguration().getThrottleProperties() + .getJmsConnectionProperties().getJmsTaskManagerProperties(); if (eventHubReceiverConfiguration != null) { - this.jmsTransportHandlerForEventHub = - new JMSTransportHandler(eventHubReceiverConfiguration.getJmsConnectionParameters()); + this.jmsTransportHandlerForEventHub = new JMSTransportHandler( + eventHubReceiverConfiguration.getJmsConnectionParameters(), jmsTaskManagerProperties); } } diff --git a/components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/APIManagerConfiguration.java b/components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/APIManagerConfiguration.java index c9ecbba5bf44..4c43df271c36 100644 --- a/components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/APIManagerConfiguration.java +++ b/components/apimgt/org.wso2.carbon.apimgt.impl/src/main/java/org/wso2/carbon/apimgt/impl/APIManagerConfiguration.java @@ -1382,7 +1382,7 @@ private void setThrottleProperties(OMElement element) { OMElement jobQueueSizeElement = jmsTaskManagerElement .getFirstChildWithName(new QName (APIConstants.AdvancedThrottleConstants.JOB_QUEUE_SIZE)); - if (keepAliveTimeInMillisElement != null) { + if (jobQueueSizeElement != null) { jmsTaskManagerProperties.setJobQueueSize(Integer.parseInt(jobQueueSizeElement.getText())); } } diff --git a/components/apimgt/org.wso2.carbon.apimgt.jms.listener/src/main/java/org/wso2/carbon/apimgt/jms/listener/utils/JMSListenerStartupShutdownListener.java b/components/apimgt/org.wso2.carbon.apimgt.jms.listener/src/main/java/org/wso2/carbon/apimgt/jms/listener/utils/JMSListenerStartupShutdownListener.java index ac366a56744f..54aba5d0a760 100644 --- a/components/apimgt/org.wso2.carbon.apimgt.jms.listener/src/main/java/org/wso2/carbon/apimgt/jms/listener/utils/JMSListenerStartupShutdownListener.java +++ b/components/apimgt/org.wso2.carbon.apimgt.jms.listener/src/main/java/org/wso2/carbon/apimgt/jms/listener/utils/JMSListenerStartupShutdownListener.java @@ -25,6 +25,7 @@ import org.wso2.carbon.apimgt.impl.APIConstants; import org.wso2.carbon.apimgt.impl.APIManagerConfiguration; import org.wso2.carbon.apimgt.impl.dto.EventHubConfigurationDto; +import org.wso2.carbon.apimgt.impl.dto.ThrottleProperties; import org.wso2.carbon.apimgt.impl.jms.listener.JMSListenerShutDownService; import org.wso2.carbon.apimgt.jms.listener.internal.ServiceReferenceHolder; import org.wso2.carbon.core.ServerShutdownHandler; @@ -44,9 +45,13 @@ public JMSListenerStartupShutdownListener() { EventHubConfigurationDto.EventHubReceiverConfiguration eventHubReceiverConfiguration = ServiceReferenceHolder.getInstance().getAPIMConfiguration().getEventHubConfigurationDto() .getEventHubReceiverConfiguration(); + ThrottleProperties.JMSConnectionProperties.JMSTaskManagerProperties jmsTaskManagerProperties = + ServiceReferenceHolder.getInstance().getAPIMConfiguration().getThrottleProperties() + .getJmsConnectionProperties().getJmsTaskManagerProperties(); + if (eventHubReceiverConfiguration != null) { - this.jmsTransportHandlerForEventHub = - new JMSTransportHandler(eventHubReceiverConfiguration.getJmsConnectionParameters()); + this.jmsTransportHandlerForEventHub = new JMSTransportHandler( + eventHubReceiverConfiguration.getJmsConnectionParameters(), jmsTaskManagerProperties); } } diff --git a/components/apimgt/org.wso2.carbon.apimgt.throttle.policy.deployer/src/main/java/org/wso2/carbon/apimgt/throttle/policy/deployer/utils/ThrottlePolicyStartupListener.java b/components/apimgt/org.wso2.carbon.apimgt.throttle.policy.deployer/src/main/java/org/wso2/carbon/apimgt/throttle/policy/deployer/utils/ThrottlePolicyStartupListener.java index 05d6c8c68904..2abb880614e2 100644 --- a/components/apimgt/org.wso2.carbon.apimgt.throttle.policy.deployer/src/main/java/org/wso2/carbon/apimgt/throttle/policy/deployer/utils/ThrottlePolicyStartupListener.java +++ b/components/apimgt/org.wso2.carbon.apimgt.throttle.policy.deployer/src/main/java/org/wso2/carbon/apimgt/throttle/policy/deployer/utils/ThrottlePolicyStartupListener.java @@ -22,6 +22,7 @@ import org.wso2.carbon.apimgt.common.jms.JMSTransportHandler; import org.wso2.carbon.apimgt.impl.APIConstants; import org.wso2.carbon.apimgt.impl.dto.EventHubConfigurationDto; +import org.wso2.carbon.apimgt.impl.dto.ThrottleProperties; import org.wso2.carbon.apimgt.impl.jms.listener.JMSListenerShutDownService; import org.wso2.carbon.apimgt.throttle.policy.deployer.internal.ServiceReferenceHolder; import org.wso2.carbon.core.ServerShutdownHandler; @@ -41,9 +42,12 @@ public ThrottlePolicyStartupListener() { EventHubConfigurationDto.EventHubReceiverConfiguration eventHubReceiverConfiguration = ServiceReferenceHolder.getInstance().getAPIMConfiguration().getEventHubConfigurationDto() .getEventHubReceiverConfiguration(); + ThrottleProperties.JMSConnectionProperties.JMSTaskManagerProperties jmsTaskManagerProperties = + ServiceReferenceHolder.getInstance().getAPIMConfiguration().getThrottleProperties() + .getJmsConnectionProperties().getJmsTaskManagerProperties(); if (eventHubReceiverConfiguration != null) { - this.jmsTransportHandlerForEventHub = - new JMSTransportHandler(eventHubReceiverConfiguration.getJmsConnectionParameters()); + this.jmsTransportHandlerForEventHub = new JMSTransportHandler( + eventHubReceiverConfiguration.getJmsConnectionParameters(), jmsTaskManagerProperties); } } diff --git a/features/apimgt/org.wso2.carbon.apimgt.core.feature/src/main/resources/conf_templates/templates/repository/conf/api-manager.xml.j2 b/features/apimgt/org.wso2.carbon.apimgt.core.feature/src/main/resources/conf_templates/templates/repository/conf/api-manager.xml.j2 index 5c1b9df0fa59..edc725d2c1cd 100644 --- a/features/apimgt/org.wso2.carbon.apimgt.core.feature/src/main/resources/conf_templates/templates/repository/conf/api-manager.xml.j2 +++ b/features/apimgt/org.wso2.carbon.apimgt.core.feature/src/main/resources/conf_templates/templates/repository/conf/api-manager.xml.j2 @@ -931,6 +931,37 @@ {% if apim.throttling.jms.start_delay is defined %} {{apim.throttling.jms.start_delay}} {% endif %} + {% if (apim.event_hub.listener.min_thread_pool_size is defined) + or (apim.event_hub.listener.max_thread_pool_size is defined) + or (apim.event_hub.listener.keep_alive_time_in_millis is defined) + or (apim.event_hub.listener.job_queue_size is defined) + or (apim.throttling.jms.min_thread_pool_size is defined) + or (apim.throttling.jms.max_thread_pool_size is defined) + or (apim.throttling.jms.keep_alive_time_in_millis is defined) + or (apim.throttling.jms.job_queue_size is defined) %} + + {% if apim.event_hub.listener.min_thread_pool_size is defined %} + {{apim.event_hub.listener.min_thread_pool_size}} + {% elif apim.throttling.jms.min_thread_pool_size is defined %} + {{apim.throttling.jms.min_thread_pool_size}} + {% endif %} + {% if apim.event_hub.listener.max_thread_pool_size is defined %} + {{apim.event_hub.listener.max_thread_pool_size}} + {% elif apim.throttling.jms.max_thread_pool_size is defined %} + {{apim.throttling.jms.max_thread_pool_size}} + {% endif %} + {% if apim.event_hub.listener.keep_alive_time_in_millis is defined %} + {{apim.event_hub.listener.keep_alive_time_in_millis}} + {% elif apim.throttling.jms.keep_alive_time_in_millis is defined %} + {{apim.throttling.jms.keep_alive_time_in_millis}} + {% endif %} + {% if apim.event_hub.listener.job_queue_size is defined %} + {{apim.event_hub.listener.job_queue_size}} + {% elif apim.throttling.jms.job_queue_size is defined %} + {{apim.throttling.jms.job_queue_size}} + {% endif %} + + {% endif %} {{apim.throttling.jms.conn_jndi_name}} {{apim.throttling.jms.destination_type}}