Skip to content

Commit

Permalink
Merge pull request #12167 from PasanT9/par-threapool
Browse files Browse the repository at this point in the history
Parameterize EventHub threadpool configurations
  • Loading branch information
PasanT9 authored Oct 19, 2023
2 parents 020c150 + 1a52e2f commit 7db9013
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.wso2.carbon.apimgt.cache.invalidation.internal.DataHolder;
import org.wso2.carbon.apimgt.common.jms.JMSTransportHandler;
import org.wso2.carbon.apimgt.impl.dto.EventHubConfigurationDto;
import org.wso2.carbon.apimgt.impl.dto.ThrottleProperties;
import org.wso2.carbon.apimgt.impl.jms.listener.JMSListenerShutDownService;
import org.wso2.carbon.core.ServerShutdownHandler;
import org.wso2.carbon.core.ServerStartupObserver;
Expand All @@ -38,9 +39,12 @@ public APIMgtServerStartupListener() {
EventHubConfigurationDto.EventHubReceiverConfiguration eventHubReceiverConfiguration =
DataHolder.getInstance().getAPIManagerConfigurationService().getAPIManagerConfiguration()
.getEventHubConfigurationDto().getEventHubReceiverConfiguration();
ThrottleProperties.JMSConnectionProperties.JMSTaskManagerProperties jmsTaskManagerProperties =
DataHolder.getInstance().getAPIManagerConfigurationService().getAPIManagerConfiguration()
.getThrottleProperties().getJmsConnectionProperties().getJmsTaskManagerProperties();
if (eventHubReceiverConfiguration != null) {
this.jmsTransportHandlerForEventHub =
new JMSTransportHandler(eventHubReceiverConfiguration.getJmsConnectionParameters());
this.jmsTransportHandlerForEventHub = new JMSTransportHandler(
eventHubReceiverConfiguration.getJmsConnectionParameters(), jmsTaskManagerProperties);
}
}

Expand Down
4 changes: 4 additions & 0 deletions components/apimgt/org.wso2.carbon.apimgt.common.jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.wso2.carbon.apimgt</groupId>
<artifactId>org.wso2.carbon.apimgt.impl</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.apimgt.common.jms.factory.JMSConnectionFactory;
import org.wso2.carbon.apimgt.common.jms.factory.JMSTaskManagerFactory;
import org.wso2.carbon.apimgt.impl.dto.ThrottleProperties;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -50,7 +51,17 @@ public class JMSTransportHandler {
private boolean stopIssued = false;
private static final Object lock = new Object();

public JMSTransportHandler(Properties jmsConnectionProperties) {
/**
* Constructs a JMSTransportHandler with specified JMS connection properties and JMS task manager properties.
*
* @param jmsConnectionProperties The JMS connection properties to utilize.
* If empty, default properties will be loaded.
* @param jmsTaskManagerProperties The properties for JMS task management.
* If null, default values will be applied.
*/
public JMSTransportHandler(Properties jmsConnectionProperties,
ThrottleProperties.JMSConnectionProperties
.JMSTaskManagerProperties jmsTaskManagerProperties) {

Properties properties;
Hashtable<String, String> parameters = new Hashtable<>();
Expand All @@ -71,6 +82,47 @@ public JMSTransportHandler(Properties jmsConnectionProperties) {
parameters.put(name, properties.getProperty(name));
}
jmsConnectionFactory = new JMSConnectionFactory(parameters, ListenerConstants.CONNECTION_FACTORY_NAME);
if (jmsTaskManagerProperties != null) {
extractTaskManagerProperties(jmsTaskManagerProperties);
}
}

/**
* Extracts task manager properties from the provided JMSTaskManagerProperties object.
*
* @param jmsTaskManagerProperties The JMSTaskManagerProperties object containing task manager configuration.
*/
private void extractTaskManagerProperties(
ThrottleProperties.JMSConnectionProperties.JMSTaskManagerProperties jmsTaskManagerProperties) {

int retrievedMinThreadPoolSize = jmsTaskManagerProperties.getMinThreadPoolSize();
if (retrievedMinThreadPoolSize > 0) {
minThreadPoolSize = retrievedMinThreadPoolSize;
} else {
log.warn("Invalid min_thread_pool_size detected. Default value " + minThreadPoolSize + " will be used.");
}

int retrievedMaxThreadPoolSize = jmsTaskManagerProperties.getMaxThreadPoolSize();
if (retrievedMaxThreadPoolSize > 0) {
maxThreadPoolSize = retrievedMaxThreadPoolSize;
} else {
log.warn("Invalid max_thread_pool_size detected. Default value " + maxThreadPoolSize + " will be used.");
}

int retrievedKeepAliveTimeInMillis = jmsTaskManagerProperties.getKeepAliveTimeInMillis();
if (retrievedKeepAliveTimeInMillis > 0) {
keepAliveTimeInMillis = retrievedKeepAliveTimeInMillis;
} else {
log.warn("Invalid keep_alive_time_in_millis detected. Default value " + keepAliveTimeInMillis
+ " will be used.");
}

int retrievedJobQueueSize = jmsTaskManagerProperties.getJobQueueSize();
if (retrievedJobQueueSize > 0) {
jobQueueSize = retrievedJobQueueSize;
} else {
log.warn("Invalid job_queue_size detected. Default value " + jobQueueSize + " will be used.");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,16 @@ public GatewayStartupListener() {
ThrottleProperties.JMSConnectionProperties jmsConnectionProperties =
throttleProperties.getJmsConnectionProperties();
this.jmsTransportHandlerForTrafficManager =
new JMSTransportHandler(jmsConnectionProperties.getJmsConnectionProperties());
new JMSTransportHandler(jmsConnectionProperties.getJmsConnectionProperties(), null);
EventHubConfigurationDto.EventHubReceiverConfiguration eventHubReceiverConfiguration =
ServiceReferenceHolder.getInstance().getAPIManagerConfiguration().getEventHubConfigurationDto()
.getEventHubReceiverConfiguration();
ThrottleProperties.JMSConnectionProperties.JMSTaskManagerProperties jmsTaskManagerProperties =
ServiceReferenceHolder.getInstance().getAPIManagerConfiguration().getThrottleProperties()
.getJmsConnectionProperties().getJmsTaskManagerProperties();
if (eventHubReceiverConfiguration != null) {
this.jmsTransportHandlerForEventHub =
new JMSTransportHandler(eventHubReceiverConfiguration.getJmsConnectionParameters());
this.jmsTransportHandlerForEventHub = new JMSTransportHandler(
eventHubReceiverConfiguration.getJmsConnectionParameters(), jmsTaskManagerProperties);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1382,7 +1382,7 @@ private void setThrottleProperties(OMElement element) {
OMElement jobQueueSizeElement = jmsTaskManagerElement
.getFirstChildWithName(new QName
(APIConstants.AdvancedThrottleConstants.JOB_QUEUE_SIZE));
if (keepAliveTimeInMillisElement != null) {
if (jobQueueSizeElement != null) {
jmsTaskManagerProperties.setJobQueueSize(Integer.parseInt(jobQueueSizeElement.getText()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.wso2.carbon.apimgt.impl.APIConstants;
import org.wso2.carbon.apimgt.impl.APIManagerConfiguration;
import org.wso2.carbon.apimgt.impl.dto.EventHubConfigurationDto;
import org.wso2.carbon.apimgt.impl.dto.ThrottleProperties;
import org.wso2.carbon.apimgt.impl.jms.listener.JMSListenerShutDownService;
import org.wso2.carbon.apimgt.jms.listener.internal.ServiceReferenceHolder;
import org.wso2.carbon.core.ServerShutdownHandler;
Expand All @@ -44,9 +45,13 @@ public JMSListenerStartupShutdownListener() {
EventHubConfigurationDto.EventHubReceiverConfiguration eventHubReceiverConfiguration =
ServiceReferenceHolder.getInstance().getAPIMConfiguration().getEventHubConfigurationDto()
.getEventHubReceiverConfiguration();
ThrottleProperties.JMSConnectionProperties.JMSTaskManagerProperties jmsTaskManagerProperties =
ServiceReferenceHolder.getInstance().getAPIMConfiguration().getThrottleProperties()
.getJmsConnectionProperties().getJmsTaskManagerProperties();

if (eventHubReceiverConfiguration != null) {
this.jmsTransportHandlerForEventHub =
new JMSTransportHandler(eventHubReceiverConfiguration.getJmsConnectionParameters());
this.jmsTransportHandlerForEventHub = new JMSTransportHandler(
eventHubReceiverConfiguration.getJmsConnectionParameters(), jmsTaskManagerProperties);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.wso2.carbon.apimgt.common.jms.JMSTransportHandler;
import org.wso2.carbon.apimgt.impl.APIConstants;
import org.wso2.carbon.apimgt.impl.dto.EventHubConfigurationDto;
import org.wso2.carbon.apimgt.impl.dto.ThrottleProperties;
import org.wso2.carbon.apimgt.impl.jms.listener.JMSListenerShutDownService;
import org.wso2.carbon.apimgt.throttle.policy.deployer.internal.ServiceReferenceHolder;
import org.wso2.carbon.core.ServerShutdownHandler;
Expand All @@ -41,9 +42,12 @@ public ThrottlePolicyStartupListener() {
EventHubConfigurationDto.EventHubReceiverConfiguration eventHubReceiverConfiguration =
ServiceReferenceHolder.getInstance().getAPIMConfiguration().getEventHubConfigurationDto()
.getEventHubReceiverConfiguration();
ThrottleProperties.JMSConnectionProperties.JMSTaskManagerProperties jmsTaskManagerProperties =
ServiceReferenceHolder.getInstance().getAPIMConfiguration().getThrottleProperties()
.getJmsConnectionProperties().getJmsTaskManagerProperties();
if (eventHubReceiverConfiguration != null) {
this.jmsTransportHandlerForEventHub =
new JMSTransportHandler(eventHubReceiverConfiguration.getJmsConnectionParameters());
this.jmsTransportHandlerForEventHub = new JMSTransportHandler(
eventHubReceiverConfiguration.getJmsConnectionParameters(), jmsTaskManagerProperties);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,37 @@
{% if apim.throttling.jms.start_delay is defined %}
<InitDelay>{{apim.throttling.jms.start_delay}}</InitDelay>
{% endif %}
{% if (apim.event_hub.listener.min_thread_pool_size is defined)
or (apim.event_hub.listener.max_thread_pool_size is defined)
or (apim.event_hub.listener.keep_alive_time_in_millis is defined)
or (apim.event_hub.listener.job_queue_size is defined)
or (apim.throttling.jms.min_thread_pool_size is defined)
or (apim.throttling.jms.max_thread_pool_size is defined)
or (apim.throttling.jms.keep_alive_time_in_millis is defined)
or (apim.throttling.jms.job_queue_size is defined) %}
<JMSTaskManager>
{% if apim.event_hub.listener.min_thread_pool_size is defined %}
<MinThreadPoolSize>{{apim.event_hub.listener.min_thread_pool_size}}</MinThreadPoolSize>
{% elif apim.throttling.jms.min_thread_pool_size is defined %}
<MinThreadPoolSize>{{apim.throttling.jms.min_thread_pool_size}}</MinThreadPoolSize>
{% endif %}
{% if apim.event_hub.listener.max_thread_pool_size is defined %}
<MaxThreadPoolSize>{{apim.event_hub.listener.max_thread_pool_size}}</MaxThreadPoolSize>
{% elif apim.throttling.jms.max_thread_pool_size is defined %}
<MaxThreadPoolSize>{{apim.throttling.jms.max_thread_pool_size}}</MaxThreadPoolSize>
{% endif %}
{% if apim.event_hub.listener.keep_alive_time_in_millis is defined %}
<KeepAliveTimeInMillis>{{apim.event_hub.listener.keep_alive_time_in_millis}}</KeepAliveTimeInMillis>
{% elif apim.throttling.jms.keep_alive_time_in_millis is defined %}
<KeepAliveTimeInMillis>{{apim.throttling.jms.keep_alive_time_in_millis}}</KeepAliveTimeInMillis>
{% endif %}
{% if apim.event_hub.listener.job_queue_size is defined %}
<JobQueueSize>{{apim.event_hub.listener.job_queue_size}}</JobQueueSize>
{% elif apim.throttling.jms.job_queue_size is defined %}
<JobQueueSize>{{apim.throttling.jms.job_queue_size}}</JobQueueSize>
{% endif %}
<JMSTaskManager>
{% endif %}
<JMSConnectionParameters>
<transport.jms.ConnectionFactoryJNDIName>{{apim.throttling.jms.conn_jndi_name}}</transport.jms.ConnectionFactoryJNDIName>
<transport.jms.DestinationType>{{apim.throttling.jms.destination_type}}</transport.jms.DestinationType>
Expand Down

0 comments on commit 7db9013

Please sign in to comment.