Skip to content

Commit

Permalink
add new demo of kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
zq2599 committed May 11, 2017
1 parent d2da35e commit 4dc04f8
Show file tree
Hide file tree
Showing 28 changed files with 1,090 additions and 3 deletions.
150 changes: 150 additions & 0 deletions kafka_consumer/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bolingcavalry</groupId>
<artifactId>kafkaconsumer</artifactId>
<packaging>war</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>kafkaconsumer</name>
<url>http://maven.apache.org</url>
<properties>
<!-- spring版本号 -->
<spring.version>4.0.2.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<!-- 表示开发的时候引入,发布的时候不会加载此包 -->
<scope>test</scope>
</dependency>
<!-- spring核心包 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- 导入java ee jar 包 -->
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
</dependency>
<!-- JSTL标签类 -->
<dependency>
<groupId>jstl</groupId>
<artifactId>jstl</artifactId>
<version>1.2</version>
</dependency>
<!-- 映入JSON -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<!-- 上传组件包 -->
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.9</version>
</dependency>

<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1</version>
</dependency>

</dependencies>
<build>
<finalName>${project.artifactId}</finalName>

<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<!-- 是否替换资源中的属性-->
<filtering>false</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<version>2.2</version>
<configuration>
<url>http://localhost:8082/manager/text</url>
<server>tomcat7</server>
<path>/${project.artifactId}</path>
<update>true</update>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.bolingcavalry.controller;

import com.bolingcavalry.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RequestMapping;

import javax.servlet.http.HttpServletRequest;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;


@Controller
public class KafkaController {

private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

@Autowired
private KafkaService kafkaService;

/**
* 加入一些公共信息,这样在tomcat集群的时候可以确定响应来自哪台机器
* @param model
*/
private void addCommon(String topic, Model model){
if(null==model){
return;
}

if(!StringUtils.isEmpty(topic)) {
model.addAttribute("topic", topic);
}

model.addAttribute("time", sdf.format(new Date()));
Map<String, String> map = System.getenv();
model.addAttribute("serviceSource", map.get("TOMCAT_SERVER_ID"));
}


@RequestMapping("/poststart")
public String poststart(HttpServletRequest request, Model model) {
String topic = request.getParameter("topic");;
kafkaService.startConsume(topic);
addCommon(topic, model);
return "start_finish";
}


//start of entry

@RequestMapping("/start")
public String start(HttpServletRequest request, Model model) {
addCommon(null, model);
return "start";
}

//end of entry
}
6 changes: 6 additions & 0 deletions kafka_consumer/src/main/java/com/bolingcavalry/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.bolingcavalry;

public interface Constants {
String BROKER_HOST = "kafkahost";
String ZK_HOST = "zkhost";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.bolingcavalry.service;


import com.bolingcavalry.Constants;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
* @author willzhao
* @version V1.0
* @Description: 单例模式的kafka底层基础服务
* @email [email protected]
* @Date 17/4/22 下午8:35
*/
public class KafkaConsumer {

final static String zkConnect = Constants.ZK_HOST + ":2181";
final static String groupId = "group1";

/**
* 单例
*/
private volatile static KafkaConsumer instance = null;

/**
* 禁止被外部实例化
*/
private KafkaConsumer(){
super();
}

public static KafkaConsumer getInstance(){
if(null==instance) {
synchronized (KafkaConsumer.class){
if(null==instance){
instance = new KafkaConsumer();
}
}
}

return instance;
}

/**
* 启动一个consumer
* @param topic
*/
public void startConsume(String topic){
Properties props = new Properties();
props.put("zookeeper.connect", zkConnect);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));


Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
final ConsumerIterator<byte[], byte[]> it = stream.iterator();

Runnable executor = new Runnable() {
@Override
public void run() {
while (it.hasNext()) {
System.out.println("************** receive:" + new String(it.next().message()));
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};

new Thread(executor).start();
}




}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.bolingcavalry.service;

/**
* @author willzhao
* @version V1.0
* @Description: Kafka基础服务的封装
* @email [email protected]
* @Date 17/4/23 下午5:03
*/
public interface KafkaService {
/**
* 启动消费者
* @param topic
*/
void startConsume(String topic);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.KafkaConsumer;
import com.bolingcavalry.service.KafkaService;
import org.springframework.stereotype.Service;

import java.util.HashSet;
import java.util.Set;

/**
* @author willzhao
* @version V1.0
* @Description: kafka服务的实现
* @email [email protected]
* @Date 17/4/23 下午5:06
*/
@Service
public class KafkaServiceImpl implements KafkaService{

static Set<String> runningTopicSet = new HashSet<String>();

@Override
public void startConsume(String topic) {
if(runningTopicSet.contains(topic)){
System.out.println("topic [" + topic + "]'s consumer is running");
return;
}

//如果该topic对应的consumer没有启动,就立即启动
synchronized (runningTopicSet){
if(!runningTopicSet.contains(topic)){
System.out.println("start topic [" + topic + "]");
runningTopicSet.add(topic);
KafkaConsumer.getInstance().startConsume(topic);
}
}

}
}
Loading

0 comments on commit 4dc04f8

Please sign in to comment.