首页>代码>apache kafka简单生产者消费者实例>/kafka/src/main/java/com/liuyaofeng/kafka/service/KafkaConsumer.java
package com.liuyaofeng.kafka.service;
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
import java.util.Properties;  
  
import kafka.consumer.ConsumerConfig;  
import kafka.consumer.ConsumerIterator;  
import kafka.consumer.KafkaStream;  
import kafka.javaapi.consumer.ConsumerConnector;  
import kafka.serializer.StringDecoder;  
import kafka.utils.VerifiableProperties; 

@SuppressWarnings("all")
public class KafkaConsumer {
	 private final ConsumerConnector consumer;  
	  
	    private KafkaConsumer() {  
	        Properties props = new Properties();  
	        // zookeeper 配置  
	        props.put("zookeeper.connect", "127.0.0.1:2181");  
	  
	        // group 代表一个消费组  
	        props.put("group.id", "liugroup");  
	  
	        // zk连接超时  
	        props.put("zookeeper.session.timeout.ms", "4000");  
	        props.put("zookeeper.sync.time.ms", "200");  
	        props.put("rebalance.max.retries", "5");  
	        props.put("rebalance.backoff.ms", "1200");  
	          
	      
	        props.put("auto.commit.interval.ms", "1000");  
	        props.put("auto.offset.reset", "smallest");  
	        // 序列化类  
	        props.put("serializer.class", "kafka.serializer.StringEncoder");  
	  
	        ConsumerConfig config = new ConsumerConfig(props);  
	  
	        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);  
	    }  
	  
	    void consume() {  
	        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
	        topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));  
	  
	        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());  
	        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());  
	  
	        Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);  
	        KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);  
	        ConsumerIterator<String, String> it = stream.iterator();  
	        while (it.hasNext())  
	            System.out.println("******" + it.next().message() + "******");  
	    }  
	  
	    public static void main(String[] args) {  
	        new KafkaConsumer().consume();  
	    }  
}
最近下载更多
lironggang  LV38 2023年3月18日
503382513  LV10 2022年8月31日
zdm1231  LV2 2022年8月1日
chenhuahao  LV18 2019年9月11日
2663811356  LV1 2019年9月5日
倪卟懂  LV18 2019年7月22日
TwinkleQin  LV6 2019年6月26日
无上英雄  LV8 2019年6月22日
cqm0609  LV13 2019年4月30日
夕阳2266  LV10 2019年4月2日
最近浏览更多
youwuzuichen  LV10 1月4日
xiexiaoming05  LV14 2023年6月29日
zhaoka 2023年5月30日
暂无贡献等级
starmomom  LV10 2023年3月14日
cc900118  LV17 2022年12月3日
zdm1231  LV2 2022年8月1日
itcaizhe  LV9 2022年5月24日
泡芙1234  LV8 2022年4月21日
tangjj7260  LV18 2022年4月6日
yych007  LV5 2022年2月11日
顶部 客服 微信二维码 底部
>扫描二维码关注最代码为好友扫描二维码关注最代码为好友