博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Eclipse+kafka集群 实例源码
阅读量:6423 次
发布时间:2019-06-23

本文共 6483 字,大约阅读时间需要 21 分钟。

  1. 首先调试zookeeper与kafka正常配置,并达到集群功能,这个是前提条件。
  2. 建立Java工程,拷贝kafka的lib文件到工程目录下。
  3. Lib下有些asc文件,一定要去除,否则java编译错误
  4. 提示poll方法错误,最后找到原因为本机安装了jre7和jre8,默认加载jre7类包,改为jre8包后,系统调试通过。
  5. listeners必须要配置成Ip地址的形式 例如: listeners=PLAINTEXT://192.168.5.132:9092,我就是这个地方配置出错。
  6. Producer类如下:
package kafkamanager;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;public class producer {    public producer() {        // TODO Auto-generated constructor stub    }    public static void main(String[] args) throws InterruptedException {        // TODO Auto-generated method stub                             Properties props = new Properties();            props.put("bootstrap.servers", "192.168.5.132:9092,192.168.5.133:9092,192.168.5.135:9092");                        //broker消息确认的模式,有三种:默认1            //0:不进行消息接收确认,即Client端发送完成后不会等待Broker的确认            //1:由Leader确认,Leader接收到消息后会立即返回确认信息            //all:集群完整确认,Leader会等待所有in-sync的follower节点都确认收到消息后,再返回确认信息            props.put("acks", "all");            //发送失败时Producer端的重试次数,默认为0            props.put("retries", 0);            //当同时有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。如果设置为0,则每条消息都独立发送。默认为16384字节            props.put("batch.size", 16384);            //发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的情况下,            //配置linger.ms能够让Producer在发送消息前等待一定时间,以积累更多的消息打包发送,达到节省网络资源的目的。默认为0            props.put("linger.ms", 1);            //消息缓冲池大小。尚未被发送的消息会保存在Producer的内存中,如果消息产生的速度大于消息发送的速度,            //那么缓冲池满后发送消息的请求会被阻塞。默认33554432字节(32MB)            props.put("buffer.memory", 33554432);             //消息key/value的序列器Class,根据key和value的类型决定            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");            System.out.println("Start send !!!!!!");            Producer
producer = new KafkaProducer
(props); for (int i = 0; i < 5; i++) { //producer.send(new ProducerRecord
("sss", "HelloTest! "+ Integer.toString(i))); producer.send(new ProducerRecord
("ooo", Integer.toString(i), Integer.toString(i))); java.lang.Thread.sleep(2); } producer.close(); System.out.println("Start send end !!!!!!"); System.exit(0); }}
View Code

 

  7.consumer类如下

package kafkamanager;import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;public class consumer implements Runnable {    private final KafkaConsumer
consumer; private final String topic; private static final String GROUPID = "groupA"; public consumer(String topicName) { Properties props = new Properties(); //********** 一定要注意:server.properties 的配置: listeners=PLAINTEXT://192.168.5.132:9092,这个地方配置成机器名接收不到消息。 props.put("bootstrap.servers", "192.168.5.132:9092"); //Consumer的group id,同一个group下的多个Consumer不会拉取到重复的消息, //不同group下的Consumer则会保证拉取到每一条消息。注意,同一个group下的consumer数量不能超过分区数。 //必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据 props.put("group.id", GROUPID); //是否自动提交已拉取消息的offset。提交offset即视为该消息已经成功被消费,该组下的Consumer无法再拉取到该消息(除非手动修改offset)。默认为true props.put("enable.auto.commit", "false"); //自动提交offset的间隔毫秒数,默认5000。 //本 例中采用的是自动提交offset,Kafka client会启动一个线程定期将offset提交至broker。假设在自动提交的间隔内发生故障(比如整个JVM进程死掉),那么有一部分消息是会被 重复消费的。要避免这一问题,可使用手动提交offset的方式。构造consumer时将enable.auto.commit设为false,并在代 码中用consumer.commitSync()来手动提交。 props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); ////此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空), //consumer应该从哪个offset开始消费.latest表示接受接收最大的offset(即最新消息), //earliest表示最小offset,即从topic的开始位置消费所有消息.最好设为earliest,这样新的分组,能从最开始进行处理 //原由是Kafka新的消费者,默认情况下会从最后一条消费进行消费,就是开始消费的时候,会从新增加的消息开始处理,即从我开始添加的1000条以后,才会开始处理。 //所以必须要设置auto.offset.reset设置新加入的消费者,从头条开始处理消费。当然有些情况,可能需要从最新的开始处理。 //props.put("auto.offset.reset", "earliest"); props.put("auto.offset.reset", "latest"); //props.put("auto.offset.reset", "none"); //props.put("max.poll.records", 2);//设置最新拉取数据条数 //props.put("key.deserializer", StringDeserializer.class.getName()); //props.put("value.deserializer", StringDeserializer.class.getName()); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer
(props); this.topic = topicName; this.consumer.subscribe(Arrays.asList(topic)); } @Override public void run() { int messageNo = 1; System.out.println("---------开始消费---------"); while (true) { //poll方法即是从Broker拉取消息,在poll之前首先要用subscribe方法订阅一个Topic //如 果Topic有多个partition,KafkaConsumer会在多个partition间以轮询方式实现负载均衡 //。如果启动了多个 Consumer线程,Kafka也能够通过zookeeper实现多个Consumer间的调度,保证同一组下的Consumer不会重复消费消息。 //注 意,Consumer数量不能超过partition数,超出部分的Consumer无法拉取到任何数据。 ConsumerRecords
records = consumer.poll(100);//拉取超时毫秒数,如果没有新的消息可供拉取,consumer会等待指定的毫秒数,到达超时时间后会直接返回一个空的结果集 for (ConsumerRecord
record : records) { System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ",key:" + record.key() + ", message: " + record.value()); } //提交已经拉取出来的offset,如果是手动模式下面,必须拉取之后提交,否则以后会拉取重复消息 consumer.commitSync(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //break; } } public static void main(String args[]) { consumer test1 = new consumer("ooo"); Thread thread1 = new Thread(test1); thread1.start(); }}
View Code

  8.该实例已调通,希望对初学者有帮助。

 

  

转载于:https://www.cnblogs.com/jianzi/p/10026265.html

你可能感兴趣的文章
Java Collection: List、Set、 Map、 HashMap、 Hashtable、 Vector
查看>>
T-SQL查询进阶--流程控制语句
查看>>
Excel VBA小试
查看>>
备份Toad中保存的数据库连接用户名和密码
查看>>
ASP.NET中 Repeater 的使用前台绑定
查看>>
微信公众平台模拟群发技术
查看>>
C语言学习之指针详解
查看>>
学习使用Bing Maps Silverlight Control(一):准备和新建
查看>>
讲一讲什么叫阻塞非阻塞同步异步
查看>>
选择器补遗
查看>>
C# 实体集合和实体转换成相应的string、XDocument、XElement、XDocument
查看>>
轻松记住大端小端的含义(附对大端和小端的解释)
查看>>
dreamweaver中的 map怎么调用?_制作热点图像区域
查看>>
代码19
查看>>
Win10系列:UWP界面布局进阶5
查看>>
ABP Zero 本地化语言的初始化和扩展
查看>>
转Hibernate 一对多关联的CRUD__@ManyToOne(cascade=(CascadeType.ALL))
查看>>
FCT需求分析
查看>>
开门人和关门人(杭电1234)
查看>>
万能adapter
查看>>