Skip to content

Commit

Permalink
CTR needs to support boot property styles for the tablePrefix property (
Browse files Browse the repository at this point in the history
#5857)

* CTR needs to support boot property styles for the tablePrefix property

* Add RelaxedNames support for identifying prefixes for Task Explorer schemas

Also resolved a bug where if a user specified the prefix on CTR for the apps instead of the apps themselves those props would be ignored
  • Loading branch information
cppwfs authored Aug 1, 2024
1 parent aa863b5 commit 8f6a793
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package org.springframework.cloud.dataflow.composedtaskrunner;

import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import org.slf4j.Logger;
Expand All @@ -31,8 +34,10 @@
import org.springframework.boot.autoconfigure.transaction.TransactionManagerCustomizers;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
import org.springframework.cloud.dataflow.core.RelaxedNames;
import org.springframework.cloud.dataflow.core.database.support.MultiSchemaTaskExecutionDaoFactoryBean;
import org.springframework.cloud.dataflow.core.dsl.TaskParser;
import org.springframework.cloud.dataflow.rest.util.DeploymentPropertiesUtils;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.cloud.task.listener.TaskExecutionListener;
import org.springframework.cloud.task.repository.TaskExplorer;
Expand Down Expand Up @@ -102,17 +107,28 @@ private static void addTaskExplorer(
String taskName
) {
logger.debug("addTaskExplorer:{}", taskName);
String propertyName = String.format("app.%s.spring.cloud.task.tablePrefix", taskName);
String prefix = properties.getComposedTaskAppProperties().get(propertyName);
if (prefix == null) {
prefix = env.getProperty(propertyName);
}
List<String> propertyNames = new ArrayList<>();
RelaxedNames relaxedNames = RelaxedNames.forCamelCase("tablePrefix");
relaxedNames.forEach(tablePrefix -> propertyNames.add(
String.format("app.%s.spring.cloud.task.%s", taskName, tablePrefix)));
Map<String, String> taskDeploymentProperties =
DeploymentPropertiesUtils.parse(properties.getComposedTaskProperties());
String prefix = propertyNames.stream()
.map(propertyName -> {
String prefixOfComposedTaskProperties = taskDeploymentProperties.get(propertyName);
if(prefixOfComposedTaskProperties == null) {
prefixOfComposedTaskProperties = properties.getComposedTaskAppProperties().get(propertyName);
}
return prefixOfComposedTaskProperties == null ? env.getProperty(propertyName) : prefixOfComposedTaskProperties;
})
.filter(Objects::nonNull)
.findFirst().orElse(null);
if (prefix != null) {
TaskExecutionDaoFactoryBean factoryBean = new MultiSchemaTaskExecutionDaoFactoryBean(dataSource, prefix);
logger.debug("taskExplorerContainer:adding:{}:{}", taskName, prefix);
logger.info("taskExplorerContainer:adding:{}:{}", taskName, prefix);
explorers.put(taskName, new SimpleTaskExplorer(factoryBean));
} else {
logger.warn("Cannot find {} in {} ", propertyName, properties.getComposedTaskAppProperties());
logger.warn("Cannot find {} in {} ", propertyNames, properties.getComposedTaskAppProperties());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.junit.jupiter.api.Test;

import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
Expand All @@ -29,6 +30,8 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.EmbeddedDataSourceConfiguration;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.common.security.CommonSecurityAutoConfiguration;
import org.springframework.cloud.dataflow.composedtaskrunner.configuration.DataFlowTestConfiguration;
import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
Expand All @@ -53,6 +56,7 @@
StepBeanDefinitionRegistrar.class})
@TestPropertySource(properties = {"graph=AAA && BBB && CCC", "max-wait-time=1000", "spring.cloud.task.name=foo"})
@EnableAutoConfiguration(exclude = {CommonSecurityAutoConfiguration.class})
@ExtendWith(OutputCaptureExtension.class)
public class ComposedTaskRunnerConfigurationNoPropertiesTests {

@Autowired
Expand All @@ -69,7 +73,7 @@ public class ComposedTaskRunnerConfigurationNoPropertiesTests {

@Test
@DirtiesContext
public void testComposedConfiguration() throws Exception {
public void testComposedConfiguration(CapturedOutput outputCapture) throws Exception {
JobExecution jobExecution = this.jobRepository.createJobExecution(
"ComposedTest", new JobParameters());
TaskletStep ctrStep = context.getBean("AAA_0", TaskletStep.class);
Expand All @@ -85,5 +89,12 @@ public void testComposedConfiguration() throws Exception {
Collections.emptyMap(),
Arrays.asList("--spring.cloud.task.parent-execution-id=1", "--spring.cloud.task.parent-schema-target=boot2")
);

String logEntries = outputCapture.toString();
assertThat(logEntries).contains("Cannot find [app.AAA.spring.cloud.task.table-prefix, " +
"app.AAA.spring.cloud.task.table_prefix, app.AAA.spring.cloud.task.tablePrefix, " +
"app.AAA.spring.cloud.task.tableprefix, app.AAA.spring.cloud.task.TABLE-PREFIX, " +
"app.AAA.spring.cloud.task.TABLE_PREFIX, app.AAA.spring.cloud.task.TABLEPREFIX]");
assertThat(logEntries).doesNotContain("taskExplorerContainer:adding:");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.junit.jupiter.api.Test;

import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
Expand All @@ -31,6 +32,8 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.EmbeddedDataSourceConfiguration;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.common.security.CommonSecurityAutoConfiguration;
import org.springframework.cloud.dataflow.composedtaskrunner.configuration.DataFlowTestConfiguration;
import org.springframework.cloud.dataflow.composedtaskrunner.properties.ComposedTaskProperties;
Expand Down Expand Up @@ -60,6 +63,7 @@
"transaction-isolation-level=ISOLATION_READ_COMMITTED","spring.cloud.task.closecontext-enabled=true",
"dataflow-server-uri=https://bar", "spring.cloud.task.name=ComposedTest","max-start-wait-time=1011"})
@EnableAutoConfiguration(exclude = { CommonSecurityAutoConfiguration.class})
@ExtendWith(OutputCaptureExtension.class)
public class ComposedTaskRunnerConfigurationWithPropertiesTests {

@Autowired
Expand All @@ -78,12 +82,15 @@ public class ComposedTaskRunnerConfigurationWithPropertiesTests {
ApplicationContext context;

protected static final String COMPOSED_TASK_PROPS = "app.ComposedTest-AAA.format=yyyy, "
+ "app.ComposedTest-BBB.format=mm, "
+ "deployer.ComposedTest-AAA.memory=2048m";
+ "app.ComposedTest-AAA.spring.cloud.task.table-prefix=BOOT3_,"
+ "app.ComposedTest-BBB.spring.cloud.task.tableprefix=BOOT3_,"
+ "app.ComposedTest-CCC.spring.cloud.task.tablePrefix=BOOT3_,"
+ "app.ComposedTest-BBB.format=mm, "
+ "deployer.ComposedTest-AAA.memory=2048m";

@Test
@DirtiesContext
public void testComposedConfiguration() throws Exception {
public void testComposedConfiguration(CapturedOutput outputCapture) throws Exception {
assertThat(composedTaskProperties.isSkipTlsCertificateVerification()).isFalse();

JobExecution jobExecution = this.jobRepository.createJobExecution(
Expand All @@ -97,6 +104,8 @@ public void testComposedConfiguration() throws Exception {
Map<String, String> props = new HashMap<>(1);
props.put("format", "yyyy");
props.put("memory", "2048m");
props.put("spring.cloud.task.table-prefix", "BOOT3_");

assertThat(composedTaskProperties.getComposedTaskProperties()).isEqualTo(COMPOSED_TASK_PROPS);
assertThat(composedTaskProperties.getMaxWaitTime()).isEqualTo(1010);
assertThat(composedTaskProperties.getMaxStartWaitTime()).isEqualTo(1011);
Expand All @@ -110,6 +119,14 @@ public void testComposedConfiguration() throws Exception {
args.add("--spring.cloud.task.parent-execution-id=1");
args.add("--spring.cloud.task.parent-schema-target=boot2");
Assert.notNull(job.getJobParametersIncrementer(), "JobParametersIncrementer must not be null.");

verify(taskOperations).launch("ComposedTest-AAA", props, args);

String logEntries = outputCapture.toString();
assertThat(logEntries).contains("taskExplorerContainer:adding:ComposedTest-AAA:BOOT3_");

assertThat(logEntries).contains("taskExplorerContainer:adding:ComposedTest-BBB:BOOT3_");
assertThat(logEntries).contains("taskExplorerContainer:adding:ComposedTest-CCC:BOOT3_");

}
}

0 comments on commit 8f6a793

Please sign in to comment.