- 首先调试zookeeper与kafka正常配置,并达到集群功能,这个是前提条件。
- 建立Java工程,拷贝kafka的lib文件到工程目录下。
- Lib下有些asc文件,一定要去除,否则java编译错误
- 提示poll方法错误,最后找到原因为本机安装了jre7和jre8,默认加载jre7类包,改为jre8包后,系统调试通过。
- listeners必须要配置成Ip地址的形式 例如: listeners=PLAINTEXT://192.168.5.132:9092,我就是这个地方配置出错。
- Producer类如下:
package kafkamanager;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;public class producer { public producer() { // TODO Auto-generated constructor stub } public static void main(String[] args) throws InterruptedException { // TODO Auto-generated method stub Properties props = new Properties(); props.put("bootstrap.servers", "192.168.5.132:9092,192.168.5.133:9092,192.168.5.135:9092"); //broker消息确认的模式,有三种:默认1 //0:不进行消息接收确认,即Client端发送完成后不会等待Broker的确认 //1:由Leader确认,Leader接收到消息后会立即返回确认信息 //all:集群完整确认,Leader会等待所有in-sync的follower节点都确认收到消息后,再返回确认信息 props.put("acks", "all"); //发送失败时Producer端的重试次数,默认为0 props.put("retries", 0); //当同时有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。如果设置为0,则每条消息都独立发送。默认为16384字节 props.put("batch.size", 16384); //发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的情况下, //配置linger.ms能够让Producer在发送消息前等待一定时间,以积累更多的消息打包发送,达到节省网络资源的目的。默认为0 props.put("linger.ms", 1); //消息缓冲池大小。尚未被发送的消息会保存在Producer的内存中,如果消息产生的速度大于消息发送的速度, //那么缓冲池满后发送消息的请求会被阻塞。默认33554432字节(32MB) props.put("buffer.memory", 33554432); //消息key/value的序列器Class,根据key和value的类型决定 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); System.out.println("Start send !!!!!!"); Producerproducer = new KafkaProducer (props); for (int i = 0; i < 5; i++) { //producer.send(new ProducerRecord ("sss", "HelloTest! "+ Integer.toString(i))); producer.send(new ProducerRecord ("ooo", Integer.toString(i), Integer.toString(i))); java.lang.Thread.sleep(2); } producer.close(); System.out.println("Start send end !!!!!!"); System.exit(0); }}
7.consumer类如下
package kafkamanager;import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;public class consumer implements Runnable { private final KafkaConsumerconsumer; private final String topic; private static final String GROUPID = "groupA"; public consumer(String topicName) { Properties props = new Properties(); //********** 一定要注意:server.properties 的配置: listeners=PLAINTEXT://192.168.5.132:9092,这个地方配置成机器名接收不到消息。 props.put("bootstrap.servers", "192.168.5.132:9092"); //Consumer的group id,同一个group下的多个Consumer不会拉取到重复的消息, //不同group下的Consumer则会保证拉取到每一条消息。注意,同一个group下的consumer数量不能超过分区数。 //必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据 props.put("group.id", GROUPID); //是否自动提交已拉取消息的offset。提交offset即视为该消息已经成功被消费,该组下的Consumer无法再拉取到该消息(除非手动修改offset)。默认为true props.put("enable.auto.commit", "false"); //自动提交offset的间隔毫秒数,默认5000。 //本 例中采用的是自动提交offset,Kafka client会启动一个线程定期将offset提交至broker。假设在自动提交的间隔内发生故障(比如整个JVM进程死掉),那么有一部分消息是会被 重复消费的。要避免这一问题,可使用手动提交offset的方式。构造consumer时将enable.auto.commit设为false,并在代 码中用consumer.commitSync()来手动提交。 props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); ////此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空), //consumer应该从哪个offset开始消费.latest表示接受接收最大的offset(即最新消息), //earliest表示最小offset,即从topic的开始位置消费所有消息.最好设为earliest,这样新的分组,能从最开始进行处理 //原由是Kafka新的消费者,默认情况下会从最后一条消费进行消费,就是开始消费的时候,会从新增加的消息开始处理,即从我开始添加的1000条以后,才会开始处理。 //所以必须要设置auto.offset.reset设置新加入的消费者,从头条开始处理消费。当然有些情况,可能需要从最新的开始处理。 //props.put("auto.offset.reset", "earliest"); props.put("auto.offset.reset", "latest"); //props.put("auto.offset.reset", "none"); //props.put("max.poll.records", 2);//设置最新拉取数据条数 //props.put("key.deserializer", StringDeserializer.class.getName()); //props.put("value.deserializer", StringDeserializer.class.getName()); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer (props); this.topic = topicName; this.consumer.subscribe(Arrays.asList(topic)); } @Override public void run() { int messageNo = 1; System.out.println("---------开始消费---------"); while (true) { //poll方法即是从Broker拉取消息,在poll之前首先要用subscribe方法订阅一个Topic //如 果Topic有多个partition,KafkaConsumer会在多个partition间以轮询方式实现负载均衡 //。如果启动了多个 Consumer线程,Kafka也能够通过zookeeper实现多个Consumer间的调度,保证同一组下的Consumer不会重复消费消息。 //注 意,Consumer数量不能超过partition数,超出部分的Consumer无法拉取到任何数据。 ConsumerRecords records = consumer.poll(100);//拉取超时毫秒数,如果没有新的消息可供拉取,consumer会等待指定的毫秒数,到达超时时间后会直接返回一个空的结果集 for (ConsumerRecord record : records) { System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ",key:" + record.key() + ", message: " + record.value()); } //提交已经拉取出来的offset,如果是手动模式下面,必须拉取之后提交,否则以后会拉取重复消息 consumer.commitSync(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //break; } } public static void main(String args[]) { consumer test1 = new consumer("ooo"); Thread thread1 = new Thread(test1); thread1.start(); }}
8.该实例已调通,希望对初学者有帮助。