diff --git a/CHANGELOG.md b/CHANGELOG.md index d50c029..70264bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,21 @@ =========================== +## [1.0.0](https://github.com/buession/buession-canal/releases/tag/v1.0.0) (2024-05-06) + +### 🔨依赖升级 + +- [依赖库版本升级和安全漏洞修复](https://github.com/buession/buession-parent/releases/tag/v2.3.3) + + +### ⭐ 新特性 + +- **buession-canal-springboot:** 增加 canal 的 spring boot 支持 + + +--- + + ## [0.0.2](https://github.com/buession/buession-canal/releases/tag/v0.0.2) (2023-12-27) ### 🔨依赖升级 diff --git a/buession-canal-annotation/pom.xml b/buession-canal-annotation/pom.xml index 6fe10e6..299a213 100644 --- a/buession-canal-annotation/pom.xml +++ b/buession-canal-annotation/pom.xml @@ -7,7 +7,7 @@ com.buession.canal buession-canal-parent ../buession-canal-parent - 0.0.2 + 1.0.0 buession-canal-annotation https://canal.buession.com/ diff --git a/buession-canal-client/pom.xml b/buession-canal-client/pom.xml index beb2d90..c0815fa 100644 --- a/buession-canal-client/pom.xml +++ b/buession-canal-client/pom.xml @@ -7,7 +7,7 @@ com.buession.canal buession-canal-parent ../buession-canal-parent - 0.0.2 + 1.0.0 buession-canal-client https://canal.buession.com/ diff --git a/buession-canal-core/pom.xml b/buession-canal-core/pom.xml index 95f8751..d908509 100644 --- a/buession-canal-core/pom.xml +++ b/buession-canal-core/pom.xml @@ -7,7 +7,7 @@ com.buession.canal buession-canal-parent ../buession-canal-parent - 0.0.2 + 1.0.0 buession-canal-core https://canal.buession.com/ diff --git a/buession-canal-parent/pom.xml b/buession-canal-parent/pom.xml index 20e3ca1..23b5580 100644 --- a/buession-canal-parent/pom.xml +++ b/buession-canal-parent/pom.xml @@ -7,13 +7,13 @@ com.buession parent - 2.3.2 + 2.3.3 com.buession.canal buession-canal-parent https://canal.buession.com/ Buession Canal Framework Parent - 0.0.2 + 1.0.0 pom @@ -58,10 +58,12 @@ ../buession-canal-core ../buession-canal-client ../buession-canal-spring + ../buession-canal-springboot - 2.3.2 + 2.3.3 + 2.3.3 1.1.7 @@ -112,15 +114,6 @@ buession-canal-parent - - - src/main/resources - false - - **/*.* - - - org.apache.maven.plugins diff --git a/buession-canal-spring/pom.xml b/buession-canal-spring/pom.xml index 38aa30c..56173ca 100644 --- a/buession-canal-spring/pom.xml +++ b/buession-canal-spring/pom.xml @@ -7,7 +7,7 @@ com.buession.canal buession-canal-parent ../buession-canal-parent - 0.0.2 + 1.0.0 buession-canal-spring https://canal.buession.com/ diff --git a/buession-canal-springboot/pom.xml b/buession-canal-springboot/pom.xml new file mode 100644 index 0000000..b74d015 --- /dev/null +++ b/buession-canal-springboot/pom.xml @@ -0,0 +1,123 @@ + + + Buession Canal SpringBoot + 4.0.0 + + com.buession.canal + buession-canal-parent + ../buession-canal-parent + 1.0.0 + + buession-canal-springboot + https://canal.buession.com/ + Buession Canal SpringBoot + jar + + + buession.com Inc. + http://www.buession.com/ + + + + + yong.teng + yong.teng + webmaster@buession.com + + Project Manager + Developer + + + + + + + The Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + A business-friendly OSS license + + + + + scm:git:https://github.com/buession/buession-canal.git + scm:git:https://github.com/buession/buession-canal.git + https://github.com/buession/buession-canal + + + + github + https://github.com/buession/buession-canal/issues + + + + + com.buession.canal + buession-canal-client + ${project.version} + + + com.buession.canal + buession-canal-spring + ${project.version} + + + + com.buession.springboot + buession-springboot-boot + ${buession.springboot.version} + + + + org.slf4j + slf4j-api + + + + + buession-canal-springboot + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + org.apache.maven.plugins + maven-source-plugin + + + org.apache.maven.plugins + maven-javadoc-plugin + + + org.apache.maven.plugins + maven-resources-plugin + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + + + org.apache.maven.plugins + maven-release-plugin + + + org.apache.maven.plugins + maven-gpg-plugin + + + org.sonatype.plugins + nexus-staging-maven-plugin + + + + \ No newline at end of file diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/BaseInstanceConfiguration.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/BaseInstanceConfiguration.java new file mode 100644 index 0000000..92cf4d1 --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/BaseInstanceConfiguration.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.canal.springboot; + +import com.buession.canal.core.Configuration; + +import java.time.Duration; + +/** + * 实例基本配置 + * + * @author Yong.Teng + * @since 1.0.0 + */ +public abstract class BaseInstanceConfiguration extends Configuration { + + /** + * 构造函数 + */ + public BaseInstanceConfiguration() { + setTimeout(Duration.ofSeconds(10)); + setBatchSize(10); + } + +} diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/MqBaseInstanceConfiguration.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/MqBaseInstanceConfiguration.java new file mode 100644 index 0000000..a34e100 --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/MqBaseInstanceConfiguration.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.canal.springboot; + +/** + * MQ 实例基本配置 + * + * @author Yong.Teng + * @since 1.0.0 + */ +public abstract class MqBaseInstanceConfiguration extends BaseInstanceConfiguration { + + private boolean flatMessage; + + /** + * 构造函数 + */ + public MqBaseInstanceConfiguration() { + super(); + } + + public boolean isFlatMessage() { + return flatMessage; + } + + public void setFlatMessage(boolean flatMessage) { + this.flatMessage = flatMessage; + } + +} diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/ThreadConfig.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/ThreadConfig.java new file mode 100644 index 0000000..f7599d9 --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/ThreadConfig.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.canal.springboot; + +import com.buession.core.concurrent.ThreadPolicy; + +import java.time.Duration; + +/** + * 线程池配置 + * + * @author Yong.Teng + * @since 1.0.0 + */ +public class ThreadConfig { + + /** + * 线程名称前缀 + */ + private String namePrefix = "canal-execute"; + + /** + * 线程池核心线程大小 + */ + private Integer corePoolSize; + + /** + * 线程池最大线程数量 + */ + private Integer maximumPoolSize; + + /** + * 空闲线程存活时间 + */ + private Duration keepAliveTime = Duration.ZERO; + + /** + * 饱和策略 + */ + private ThreadPolicy policy = ThreadPolicy.DISCARD; + + /** + * 返回线程名称前缀 + * + * @return 线程名称前缀 + */ + public String getNamePrefix() { + return namePrefix; + } + + /** + * 设置线程名称前缀 + * + * @param namePrefix + * 线程名称前缀 + */ + public void setNamePrefix(String namePrefix) { + this.namePrefix = namePrefix; + } + + /** + * 返回线程池核心线程大小 + * + * @return 线程池核心线程大小 + */ + public Integer getCorePoolSize() { + return corePoolSize; + } + + /** + * 设置线程池核心线程大小 + * + * @param corePoolSize + * 线程池核心线程大小 + */ + public void setCorePoolSize(Integer corePoolSize) { + this.corePoolSize = corePoolSize; + } + + /** + * 返回线程池最大线程数量 + * + * @return 线程池最大线程数量 + */ + public Integer getMaximumPoolSize() { + return maximumPoolSize; + } + + /** + * 设置线程池最大线程数量 + * + * @param maximumPoolSize + * 线程池最大线程数量 + */ + public void setMaximumPoolSize(Integer maximumPoolSize) { + this.maximumPoolSize = maximumPoolSize; + } + + /** + * 返回空闲线程存活时间 + * + * @return 空闲线程存活时间 + */ + public Duration getKeepAliveTime() { + return keepAliveTime; + } + + /** + * 设置空闲线程存活时间 + * + * @param keepAliveTime + * 空闲线程存活时间 + */ + public void setKeepAliveTime(Duration keepAliveTime) { + this.keepAliveTime = keepAliveTime; + } + + /** + * 返回饱和策略 + * + * @return 饱和策略 + */ + public ThreadPolicy getPolicy() { + return policy; + } + + /** + * 设置饱和策略 + * + * @param policy + * 饱和策略 + */ + public void setPolicy(ThreadPolicy policy) { + this.policy = policy; + } + +} diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/AbstractAdapterProperties.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/AbstractAdapterProperties.java new file mode 100644 index 0000000..8e8a324 --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/AbstractAdapterProperties.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.canal.springboot.autoconfigure; + +import com.buession.canal.springboot.BaseInstanceConfiguration; + +import java.util.HashMap; +import java.util.Map; + +/** + * Canal 适配器配置基类 + * + * @author Yong.Teng + * @since 1.0.0 + */ +abstract class AbstractAdapterProperties + implements AdapterProperties { + + /** + * 实例清单 + */ + private Map instances = new HashMap<>(); + + /** + * 返回实例清单 + * + * @return 实例清单 + */ + @Override + public Map getInstances() { + return instances; + } + + /** + * 设置实例清单 + * + * @param instances + * 实例清单 + */ + public void setInstances(Map instances) { + this.instances = instances; + } + +} diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/AbstractMqAdapterProperties.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/AbstractMqAdapterProperties.java new file mode 100644 index 0000000..cf4edc2 --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/AbstractMqAdapterProperties.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.canal.springboot.autoconfigure; + +import com.buession.canal.springboot.MqBaseInstanceConfiguration; + +/** + * @author Yong.Teng + * @since 1.0.0 + */ +abstract class AbstractMqAdapterProperties + extends AbstractAdapterProperties { + +} diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/AdapterClientConfiguration.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/AdapterClientConfiguration.java new file mode 100644 index 0000000..c75ab37 --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/AdapterClientConfiguration.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.canal.springboot.autoconfigure; + +import com.buession.canal.client.adapter.AdapterClient; +import com.buession.canal.client.adapter.KafkaAdapterClient; +import com.buession.canal.client.adapter.PulsarMQAdapterClient; +import com.buession.canal.client.adapter.RabbitMQAdapterClient; +import com.buession.canal.client.adapter.RocketMQAdapterClient; +import com.buession.canal.client.adapter.TcpAdapterClient; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Configuration; + +import java.util.Set; +import java.util.stream.Collectors; + +/** + * @author Yong.Teng + * @since 1.0.0 + */ +@Configuration(proxyBeanMethods = false) +@EnableConfigurationProperties(CanalProperties.class) +public class AdapterClientConfiguration { + + @FunctionalInterface + interface CanalAdapterClientBuilder { + + C newInstance(final String destination, final IC instance); + + } + + interface IAdapterClientConfiguration { + + /** + * 初始化 {@link AdapterClient} 列表 bean + * + * @return {@link AdapterClient} 列表 bean + */ + Set createAdapterClients(); + + } + + static abstract class AbstractAdapterClientConfiguration implements IAdapterClientConfiguration { + + protected CanalProperties canalProperties; + + AbstractAdapterClientConfiguration(final CanalProperties canalProperties) { + this.canalProperties = canalProperties; + } + + protected Set createCanalAdapterClients( + final AdapterProperties properties, final CanalAdapterClientBuilder builder) { + return properties.getInstances().entrySet().stream().map((e)->builder.newInstance(e.getKey(), e.getValue())) + .collect(Collectors.toSet()); + } + + } + + @Configuration(proxyBeanMethods = false) + @EnableConfigurationProperties(CanalProperties.class) + @Conditional(AdapterClientConfiguredCondition.KafkaCanalAdapterClientConfiguredCondition.class) + @ConditionalOnMissingBean(IAdapterClientConfiguration.class) + @ConfigurationProperties(prefix = CanalProperties.PREFIX + ".kafka") + static class KafkaAdapterClientConfiguration extends AbstractAdapterClientConfiguration { + + public KafkaAdapterClientConfiguration(final CanalProperties canalProperties) { + super(canalProperties); + } + + @Bean + @Override + public Set createAdapterClients() { + final KafkaProperties kafka = canalProperties.getKafka(); + return createCanalAdapterClients(kafka, (topic, instance)->new KafkaAdapterClient(kafka.getServers(), topic, + instance.getGroupId(), instance.getPartition(), instance, instance.isFlatMessage())); + } + + } + + @Configuration(proxyBeanMethods = false) + @EnableConfigurationProperties(CanalProperties.class) + @Conditional(AdapterClientConfiguredCondition.PulsarCanalAdapterClientConfiguredCondition.class) + @ConditionalOnMissingBean(IAdapterClientConfiguration.class) + @ConfigurationProperties(prefix = CanalProperties.PREFIX + ".pulsar") + static class PulsarAdapterClientConfiguration extends AbstractAdapterClientConfiguration { + + public PulsarAdapterClientConfiguration(final CanalProperties canalProperties) { + super(canalProperties); + } + + @Bean + @Override + public Set createAdapterClients() { + final PulsarProperties pulsar = canalProperties.getPulsar(); + return createCanalAdapterClients(pulsar, (topic, instance)->new PulsarMQAdapterClient( + pulsar.getServiceUrl(), pulsar.getRoleToken(), topic, instance.getSubscriptName(), + pulsar.getGetBatchTimeout(), pulsar.getBatchProcessTimeout(), pulsar.getRedeliveryDelay(), + pulsar.getAckTimeout(), pulsar.isRetry(), pulsar.isRetryDLQUpperCase(), + pulsar.getMaxRedeliveryCount(), instance, instance.isFlatMessage())); + } + + } + + @Configuration(proxyBeanMethods = false) + @EnableConfigurationProperties(CanalProperties.class) + @Conditional(AdapterClientConfiguredCondition.RabbitCanalAdapterClientConfiguredCondition.class) + @ConditionalOnMissingBean(IAdapterClientConfiguration.class) + @ConfigurationProperties(prefix = CanalProperties.PREFIX + ".rabbit") + static class RabbitAdapterClientConfiguration extends AbstractAdapterClientConfiguration { + + public RabbitAdapterClientConfiguration(final CanalProperties canalProperties) { + super(canalProperties); + } + + @Bean + @Override + public Set createAdapterClients() { + final RabbitProperties rabbit = canalProperties.getRabbit(); + return createCanalAdapterClients(rabbit, + (queueName, instance)->new RabbitMQAdapterClient(rabbit.getServer(), + rabbit.getVirtualHost(), rabbit.getUsername(), rabbit.getPassword(), queueName, + instance, instance.isFlatMessage())); + } + + } + + @Configuration(proxyBeanMethods = false) + @EnableConfigurationProperties(CanalProperties.class) + @Conditional(AdapterClientConfiguredCondition.RocketCanalAdapterClientConfiguredCondition.class) + @ConditionalOnMissingBean(IAdapterClientConfiguration.class) + @ConfigurationProperties(prefix = CanalProperties.PREFIX + ".rocket") + static class RocketAdapterClientConfiguration extends AbstractAdapterClientConfiguration { + + public RocketAdapterClientConfiguration(final CanalProperties canalProperties) { + super(canalProperties); + } + + @Bean + @Override + public Set createAdapterClients() { + final RocketProperties rocket = canalProperties.getRocket(); + return createCanalAdapterClients(rocket, (topic, instance)->new RocketMQAdapterClient( + rocket.getNameServer(), topic, instance.getGroupId(), rocket.getEnableMessageTrace(), + rocket.getCustomizedTraceTopic(), rocket.getAccessChannel(), instance, + instance.isFlatMessage())); + } + + } + + @Configuration(proxyBeanMethods = false) + @EnableConfigurationProperties(CanalProperties.class) + @Conditional(AdapterClientConfiguredCondition.TcpCanalAdapterClientConfiguredCondition.class) + @ConditionalOnMissingBean(IAdapterClientConfiguration.class) + @ConfigurationProperties(prefix = CanalProperties.PREFIX + ".tcp") + static class TcpAdapterClientConfiguration extends AbstractAdapterClientConfiguration { + + public TcpAdapterClientConfiguration(final CanalProperties canalProperties) { + super(canalProperties); + } + + @Bean + @Override + public Set createAdapterClients() { + final TcpProperties tcp = canalProperties.getTcp(); + return createCanalAdapterClients(tcp, + (destination, instance)->new TcpAdapterClient(tcp.getServer(), + tcp.getZkServers(), destination, tcp.getUsername(), tcp.getPassword(), instance)); + } + + } + +} diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/AdapterClientConfiguredCondition.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/AdapterClientConfiguredCondition.java new file mode 100644 index 0000000..94361a3 --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/AdapterClientConfiguredCondition.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.canal.springboot.autoconfigure; + +import com.buession.canal.core.CanalMode; +import com.buession.canal.springboot.BaseInstanceConfiguration; +import com.buession.springboot.boot.autoconfigure.condition.BaseOnPropertyExistCondition; +import org.springframework.boot.context.properties.bind.Bindable; + +/** + * @author Yong.Teng + * @since 1.0.0 + */ +interface AdapterClientConfiguredCondition { + + abstract class AbstractCanalAdapterClientConfiguredCondition

> extends BaseOnPropertyExistCondition

+ implements AdapterClientConfiguredCondition { + + public AbstractCanalAdapterClientConfiguredCondition(final Bindable

registration, final CanalMode mode) { + super(CanalProperties.PREFIX + '.' + mode.getName(), "registered canal adapter clients", registration, + "Canal " + mode.getName() + "adapter client configured condition"); + } + + } + + class KafkaCanalAdapterClientConfiguredCondition + extends AbstractCanalAdapterClientConfiguredCondition { + + private final static Bindable REGISTRATION = Bindable.of(KafkaProperties.class); + + KafkaCanalAdapterClientConfiguredCondition() { + super(REGISTRATION, CanalMode.KAFKA); + } + + } + + class RabbitCanalAdapterClientConfiguredCondition + extends AbstractCanalAdapterClientConfiguredCondition { + + private final static Bindable REGISTRATION = Bindable.of(RabbitProperties.class); + + RabbitCanalAdapterClientConfiguredCondition() { + super(REGISTRATION, CanalMode.RABBIT_MQ); + } + + } + + class PulsarCanalAdapterClientConfiguredCondition + extends AbstractCanalAdapterClientConfiguredCondition { + + private final static Bindable REGISTRATION = Bindable.of(PulsarProperties.class); + + PulsarCanalAdapterClientConfiguredCondition() { + super(REGISTRATION, CanalMode.PULSAR_MQ); + } + + } + + class RocketCanalAdapterClientConfiguredCondition + extends AbstractCanalAdapterClientConfiguredCondition { + + private final static Bindable REGISTRATION = Bindable.of(RocketProperties.class); + + RocketCanalAdapterClientConfiguredCondition() { + super(REGISTRATION, CanalMode.ROCKET_MQ); + } + + } + + class TcpCanalAdapterClientConfiguredCondition + extends AbstractCanalAdapterClientConfiguredCondition { + + private final static Bindable REGISTRATION = Bindable.of(TcpProperties.class); + + TcpCanalAdapterClientConfiguredCondition() { + super(REGISTRATION, CanalMode.TCP); + } + + } + +} diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/AdapterProperties.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/AdapterProperties.java new file mode 100644 index 0000000..b776faf --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/AdapterProperties.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.canal.springboot.autoconfigure; + +import com.buession.canal.core.Configuration; + +import java.util.Map; + +/** + * Canal 适配器配置 + * + * @author Yong.Teng + * @since 1.0.0 + */ +@FunctionalInterface +interface AdapterProperties { + + /** + * 返回实例清单 + * + * @return 实例清单 + */ + Map getInstances(); + +} diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/CanalConfiguration.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/CanalConfiguration.java new file mode 100644 index 0000000..b04f82b --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/CanalConfiguration.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.canal.springboot.autoconfigure; + +import com.buession.canal.client.CanalContext; +import com.buession.canal.client.DefaultCanalContext; +import com.buession.canal.client.adapter.AdapterClient; +import com.buession.canal.client.dispatcher.Dispatcher; +import com.buession.canal.spring.client.factory.CanalClientFactoryBean; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.util.Set; +import java.util.concurrent.ExecutorService; + +/** + * @author Yong.Teng + * @since 1.0.0 + */ +@Configuration(proxyBeanMethods = false) +@EnableConfigurationProperties(CanalProperties.class) +@Import({ThreadPoolConfiguration.class, AdapterClientConfiguration.class}) +public class CanalConfiguration { + + @Bean(destroyMethod = "destroy") + public CanalClientFactoryBean createCanalClientFactoryBean( + ObjectProvider> canalAdapterClients, ObjectProvider dispatcher, + @Qualifier("canalExecutorService") ObjectProvider executorService) { + final CanalClientFactoryBean canalClientFactoryBean = new CanalClientFactoryBean(); + + CanalContext context = new DefaultCanalContext(canalAdapterClients.getIfAvailable()); + canalClientFactoryBean.setContext(context); + canalClientFactoryBean.setDispatcher(dispatcher.getIfAvailable()); + canalClientFactoryBean.setExecutor(executorService.getIfAvailable()); + + return canalClientFactoryBean; + } + +} diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/CanalProperties.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/CanalProperties.java new file mode 100644 index 0000000..be4736f --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/CanalProperties.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.canal.springboot.autoconfigure; + +import com.buession.canal.springboot.ThreadConfig; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Canal 配置 + * + * @author Yong.Teng + * @since 1.0.0 + */ +@ConfigurationProperties(CanalProperties.PREFIX) +public class CanalProperties { + + public final static String PREFIX = "spring.canal"; + + /** + * 线程池配置 + */ + private ThreadConfig thread = new ThreadConfig(); + + /** + * Kafka 实例配置 + */ + private KafkaProperties kafka; + + /** + * RabbitMQ 实例配置 + */ + private RabbitProperties rabbit; + + /** + * RocketMQ 实例配置 + */ + private RocketProperties rocket; + + /** + * PulsarMQ 实例配置 + */ + private PulsarProperties pulsar; + + /** + * TCP 实例配置 + */ + private TcpProperties tcp; + + /** + * 返回线程池配置 + * + * @return 线程池配置 + */ + public ThreadConfig getThread() { + return thread; + } + + /** + * 设置线程池配置 + * + * @param thread + * 线程池配置 + */ + public void setThread(ThreadConfig thread) { + this.thread = thread; + } + + /** + * 返回 Kafka 实例配置 + * + * @return Kafka 实例配置 + */ + public KafkaProperties getKafka() { + return kafka; + } + + /** + * 设置 Kafka 实例配置 + * + * @param kafka + * Kafka 实例配置 + */ + public void setKafka(KafkaProperties kafka) { + this.kafka = kafka; + } + + /** + * 返回 RabbitMQ 实例配置 + * + * @return RabbitMQ 实例配置 + */ + public RabbitProperties getRabbit() { + return rabbit; + } + + /** + * 设置 RabbitMQ 实例配置 + * + * @param rabbit + * RabbitMQ 实例配置 + */ + public void setRabbit(RabbitProperties rabbit) { + this.rabbit = rabbit; + } + + /** + * 返回 RocketMQ 实例配置 + * + * @return RocketMQ 实例配置 + */ + public RocketProperties getRocket() { + return rocket; + } + + /** + * 设置 RocketMQ 实例配置 + * + * @param rocket + * RocketMQ 实例配置 + */ + public void setRocket(RocketProperties rocket) { + this.rocket = rocket; + } + + /** + * 返回 PulsarMQ 实例配置 + * + * @return PulsarMQ 实例配置 + */ + public PulsarProperties getPulsar() { + return pulsar; + } + + /** + * 设置 PulsarMQ 实例配置 + * + * @param pulsar + * PulsarMQ 实例配置 + */ + public void setPulsar(PulsarProperties pulsar) { + this.pulsar = pulsar; + } + + /** + * 返回 TCP 实例配置 + * + * @return TCP 实例配置 + */ + public TcpProperties getTcp() { + return tcp; + } + + /** + * 设置 TCP 实例配置 + * + * @param tcp + * TCP 实例配置 + */ + public void setTcp(TcpProperties tcp) { + this.tcp = tcp; + } + +} diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/KafkaProperties.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/KafkaProperties.java new file mode 100644 index 0000000..0662a61 --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/KafkaProperties.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.canal.springboot.autoconfigure; + +import com.buession.canal.springboot.MqBaseInstanceConfiguration; + +/** + * Kafka 适配器配置 + * + * @author Yong.Teng + * @since 1.0.0 + */ +public class KafkaProperties extends AbstractMqAdapterProperties { + + /** + * Kafka 主机地址 + */ + private String servers; + + /** + * 返回 Kafka 主机地址 + * + * @return Kafka 主机地址 + */ + public String getServers() { + return servers; + } + + /** + * 设置 Kafka 主机地址 + * + * @param servers + * Kafka 主机地址 + */ + public void setServers(String servers) { + this.servers = servers; + } + + public final static class Instance extends MqBaseInstanceConfiguration { + + /** + * Group Id + */ + private String groupId; + + /** + * 分区 + */ + private Integer partition; + + public Instance() { + super(); + } + + /** + * 返回 Group Id + * + * @return Group Id + */ + public String getGroupId() { + return groupId; + } + + /** + * 设置 Group Id + * + * @param groupId + * Group Id + */ + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + /** + * 返回分区 + * + * @return 分区 + */ + public Integer getPartition() { + return partition; + } + + /** + * 设置分区 + * + * @param partition + * 分区 + */ + public void setPartition(Integer partition) { + this.partition = partition; + } + + } + +} diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/PulsarProperties.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/PulsarProperties.java new file mode 100644 index 0000000..2c39fcd --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/PulsarProperties.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.canal.springboot.autoconfigure; + +import com.buession.canal.client.adapter.PulsarMQAdapterClient; +import com.buession.canal.springboot.MqBaseInstanceConfiguration; + +/** + * PulsarMQ 适配器配置 + * + * @author Yong.Teng + * @since 1.0.0 + */ +public class PulsarProperties extends AbstractMqAdapterProperties { + + /** + * PulsarMQ 服务地址 + */ + private String serviceUrl; + + /** + * Role Token + */ + private String roleToken; + + /** + * - + */ + private int getBatchTimeout = PulsarMQAdapterClient.DEFAULT_GET_BATCH_TIMEOUT; + + /** + * - + */ + private int batchProcessTimeout = PulsarMQAdapterClient.DEFAULT_BATCH_PROCESS_TIMEOUT; + + /** + * - + */ + private int redeliveryDelay = PulsarMQAdapterClient.DEFAULT_REDELIVERY_DELAY; + + /** + * - + */ + private int ackTimeout = PulsarMQAdapterClient.DEFAULT_ACK_TIMEOUT; + + /** + * 是否重试 + */ + private boolean retry = true; + + /** + * - + */ + private boolean retryDLQUpperCase = true; + + /** + * - + */ + private Integer maxRedeliveryCount; + + /** + * 返回 PulsarMQ 服务地址 + * + * @return PulsarMQ 服务地址 + */ + public String getServiceUrl() { + return serviceUrl; + } + + /** + * 设置 PulsarMQ 服务地址 + * + * @param serviceUrl + * PulsarMQ 服务地址 + */ + public void setServiceUrl(String serviceUrl) { + this.serviceUrl = serviceUrl; + } + + /** + * 返回 Role Token + * + * @return Role Token + */ + public String getRoleToken() { + return roleToken; + } + + /** + * 设置 Role Token + * + * @param roleToken + * Role Token + */ + public void setRoleToken(String roleToken) { + this.roleToken = roleToken; + } + + public int getGetBatchTimeout() { + return getBatchTimeout; + } + + public void setGetBatchTimeout(int getBatchTimeout) { + this.getBatchTimeout = getBatchTimeout; + } + + public int getBatchProcessTimeout() { + return batchProcessTimeout; + } + + public void setBatchProcessTimeout(int batchProcessTimeout) { + this.batchProcessTimeout = batchProcessTimeout; + } + + public int getRedeliveryDelay() { + return redeliveryDelay; + } + + public void setRedeliveryDelay(int redeliveryDelay) { + this.redeliveryDelay = redeliveryDelay; + } + + public int getAckTimeout() { + return ackTimeout; + } + + public void setAckTimeout(int ackTimeout) { + this.ackTimeout = ackTimeout; + } + + /** + * 返回是否重试 + * + * @return true / false + */ + public boolean isRetry() { + return retry; + } + + /** + * 设置是否重试 + * + * @param retry + * true / false + */ + public void setRetry(boolean retry) { + this.retry = retry; + } + + public boolean isRetryDLQUpperCase() { + return retryDLQUpperCase; + } + + public void setRetryDLQUpperCase(boolean retryDLQUpperCase) { + this.retryDLQUpperCase = retryDLQUpperCase; + } + + public Integer getMaxRedeliveryCount() { + return maxRedeliveryCount; + } + + public void setMaxRedeliveryCount(Integer maxRedeliveryCount) { + this.maxRedeliveryCount = maxRedeliveryCount; + } + + public final static class Instance extends MqBaseInstanceConfiguration { + + /** + * 订阅名称 + */ + private String subscriptName; + + public Instance() { + super(); + } + + /** + * 返回订阅名称 + * + * @return 订阅名称 + */ + public String getSubscriptName() { + return subscriptName; + } + + /** + * 设置订阅名称 + * + * @param subscriptName + * 订阅名称 + */ + public void setSubscriptName(String subscriptName) { + this.subscriptName = subscriptName; + } + + } + +} diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/RabbitProperties.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/RabbitProperties.java new file mode 100644 index 0000000..eacf96b --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/RabbitProperties.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.canal.springboot.autoconfigure; + +import com.buession.canal.springboot.MqBaseInstanceConfiguration; + +/** + * RabbitMQ 适配器配置 + * + * @author Yong.Teng + * @since 1.0.0 + */ +public class RabbitProperties extends AbstractMqAdapterProperties { + + /** + * RabbitMQ 主机地址 + */ + private String server; + + /** + * Virtual Host + */ + private String virtualHost; + + /** + * 用户名 + */ + private String username; + + /** + * 密码 + */ + private String password; + + /** + * 返回 RabbitMQ 主机地址 + * + * @return RabbitMQ 主机地址 + */ + public String getServer() { + return server; + } + + /** + * 设置 RabbitMQ 主机地址 + * + * @param server + * RabbitMQ 主机地址 + */ + public void setServer(String server) { + this.server = server; + } + + /** + * 返回 Virtual Host + * + * @return Virtual Host + */ + public String getVirtualHost() { + return virtualHost; + } + + /** + * 设置 Virtual Host + * + * @param virtualHost + * Virtual Host + */ + public void setVirtualHost(String virtualHost) { + this.virtualHost = virtualHost; + } + + /** + * 返回用户名 + * + * @return 用户名 + */ + public String getUsername() { + return username; + } + + /** + * 设置用户名 + * + * @param username + * 用户名 + */ + public void setUsername(String username) { + this.username = username; + } + + /** + * 返回密码 + * + * @return 密码 + */ + public String getPassword() { + return password; + } + + /** + * 设置密码 + * + * @param password + * 密码 + */ + public void setPassword(String password) { + this.password = password; + } + + public final static class Instance extends MqBaseInstanceConfiguration { + + public Instance() { + super(); + } + + } + +} diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/RocketProperties.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/RocketProperties.java new file mode 100644 index 0000000..92d4bed --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/RocketProperties.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.canal.springboot.autoconfigure; + +import com.buession.canal.springboot.MqBaseInstanceConfiguration; + +/** + * RocketMQ 适配器配置 + * + * @author Yong.Teng + * @since 1.0.0 + */ +public class RocketProperties extends AbstractMqAdapterProperties { + + /** + * RocketMQ NameServer 地址 + */ + private String nameServer; + + /** + * 是否启用消息跟踪 + */ + private Boolean enableMessageTrace; + + /** + * 消息轨迹数据 Topic + */ + private String customizedTraceTopic; + + /** + * - + */ + private String accessChannel; + + /** + * 返回 RocketMQ NameServer 地址 + * + * @return RocketMQ NameServer 地址 + */ + public String getNameServer() { + return nameServer; + } + + /** + * 设置 RocketMQ NameServer 地址 + * + * @param nameServer + * RocketMQ NameServer 地址 + */ + public void setNameServer(String nameServer) { + this.nameServer = nameServer; + } + + /** + * 返回是否启用消息跟踪 + * + * @return true / false + */ + public Boolean getEnableMessageTrace() { + return enableMessageTrace; + } + + /** + * 设置是否启用消息跟踪 + * + * @param enableMessageTrace + * true / false + */ + public void setEnableMessageTrace(Boolean enableMessageTrace) { + this.enableMessageTrace = enableMessageTrace; + } + + /** + * 返回消息轨迹数据 Topic + * + * @return 消息轨迹数据 Topic + */ + public String getCustomizedTraceTopic() { + return customizedTraceTopic; + } + + /** + * 设置消息轨迹数据 Topic + * + * @param customizedTraceTopic + * 消息轨迹数据 Topic + */ + public void setCustomizedTraceTopic(String customizedTraceTopic) { + this.customizedTraceTopic = customizedTraceTopic; + } + + public String getAccessChannel() { + return accessChannel; + } + + public void setAccessChannel(String accessChannel) { + this.accessChannel = accessChannel; + } + + public final static class Instance extends MqBaseInstanceConfiguration { + + /** + * Group ID + */ + private String groupId; + + /** + * 名称空间 + */ + private String namespace; + + public Instance() { + super(); + } + + /** + * 返回 Group ID + * + * @return Group ID + */ + public String getGroupId() { + return groupId; + } + + /** + * 设置 Group ID + * + * @param groupId + * Group ID + */ + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + /** + * 返回名称空间 + * + * @return 名称空间 + */ + public String getNamespace() { + return namespace; + } + + /** + * 设置名称空间 + * + * @param namespace + * 名称空间 + */ + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + } + +} diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/TcpProperties.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/TcpProperties.java new file mode 100644 index 0000000..3113719 --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/TcpProperties.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.canal.springboot.autoconfigure; + +import com.buession.canal.springboot.BaseInstanceConfiguration; + +/** + * Kafka 适配器配置 + * + * @author Yong.Teng + * @since 1.0.0 + */ +public class TcpProperties extends AbstractAdapterProperties { + + /** + * 主机地址 + */ + private String server; + + /** + * Zookeeper 主机地址 + */ + private String zkServers; + + /** + * 用户名 + */ + private String username; + + /** + * 密码 + */ + private String password; + + /** + * 返回主机地址 + * + * @return 主机地址 + */ + public String getServer() { + return server; + } + + /** + * 设置主机地址 + * + * @param server + * 主机地址 + */ + public void setServer(String server) { + this.server = server; + } + + /** + * 返回 Zookeeper 主机地址 + * + * @return Zookeeper 主机地址 + */ + public String getZkServers() { + return zkServers; + } + + /** + * 设置 Zookeeper 主机地址 + * + * @param zkServers + * Zookeeper 主机地址 + */ + public void setZkServers(String zkServers) { + this.zkServers = zkServers; + } + + /** + * 返回用户名 + * + * @return 用户名 + */ + public String getUsername() { + return username; + } + + /** + * 设置用户名 + * + * @param username + * 用户名 + */ + public void setUsername(String username) { + this.username = username; + } + + /** + * 返回密码 + * + * @return 密码 + */ + public String getPassword() { + return password; + } + + /** + * 设置密码 + * + * @param password + * 密码 + */ + public void setPassword(String password) { + this.password = password; + } + + public final static class Instance extends BaseInstanceConfiguration { + + public Instance() { + } + + } + +} diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/ThreadPoolConfiguration.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/ThreadPoolConfiguration.java new file mode 100644 index 0000000..516d7d0 --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/ThreadPoolConfiguration.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.canal.springboot.autoconfigure; + +import com.buession.canal.core.concurrent.DefaultCanalThreadPoolExecutor; +import com.buession.canal.springboot.ThreadConfig; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * @author Yong.Teng + * @since 1.0.0 + */ +@Configuration(proxyBeanMethods = false) +@EnableConfigurationProperties({CanalProperties.class}) +public class ThreadPoolConfiguration { + + private final ThreadConfig thread; + + public ThreadPoolConfiguration(CanalProperties canalProperties) { + this.thread = canalProperties.getThread(); + } + + @Bean(name = "canalExecutorService", destroyMethod = "shutdown") + public ExecutorService executorService() { + RejectedExecutionHandler rejectedExecutionHandler; + + switch(thread.getPolicy()){ + case ABORT: + rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); + break; + case CALLER_RUNS: + rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); + break; + case DISCARD: + rejectedExecutionHandler = new ThreadPoolExecutor.DiscardPolicy(); + break; + case DISCARD_OLDEST: + rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy(); + break; + default: + rejectedExecutionHandler = new ThreadPoolExecutor.DiscardPolicy(); + break; + } + + Integer corePoolSize = thread.getCorePoolSize(); + Integer maximumPoolSize = thread.getMaximumPoolSize(); + + if(corePoolSize == null || maximumPoolSize == null){ + int coreSize = Runtime.getRuntime().availableProcessors(); + if(corePoolSize == null){ + corePoolSize = coreSize << 1; + } + if(maximumPoolSize == null){ + maximumPoolSize = coreSize << 1; + } + } + + return new DefaultCanalThreadPoolExecutor(thread.getNamePrefix(), corePoolSize, maximumPoolSize, + thread.getKeepAliveTime().toMillis(), rejectedExecutionHandler); + } + +} diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/package-info.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/package-info.java new file mode 100644 index 0000000..ac7f947 --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/autoconfigure/package-info.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +/** + * @author Yong.Teng + * @since 1.0.0 + */ +package com.buession.canal.springboot.autoconfigure; \ No newline at end of file diff --git a/buession-canal-springboot/src/main/java/com/buession/canal/springboot/package-info.java b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/package-info.java new file mode 100644 index 0000000..1d993f5 --- /dev/null +++ b/buession-canal-springboot/src/main/java/com/buession/canal/springboot/package-info.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +/** + * @author Yong.Teng + * @since 1.0.0 + */ +package com.buession.canal.springboot; \ No newline at end of file diff --git a/buession-canal-springboot/src/main/resources/META-INF/spring.factories b/buession-canal-springboot/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..ca2df33 --- /dev/null +++ b/buession-canal-springboot/src/main/resources/META-INF/spring.factories @@ -0,0 +1,4 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + com.buession.canal.springboot.autoconfigure.ThreadPoolConfiguration, \ + com.buession.canal.springboot.autoconfigure.AdapterClientConfiguration, \ + com.buession.canal.springboot.autoconfigure.CanalConfiguration \ No newline at end of file