博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SpringCloudBus使用Kafka实现消息总线
阅读量:6237 次
发布时间:2019-06-22

本文共 5386 字,大约阅读时间需要 17 分钟。

Kafka是分布式发布-订阅消息系统,最初由LinkedIn公司开发,之后成为之后成为Apache基金会的一部分,由 和 编写。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

在开始本文前,需要搭建kafka的环境,如果是在CentOS环境下,可以看看我前面的文章: 。其他平台下可以自行百度或Google。

在之前的环境中,需要修改server.properties文件,开启9092端口的监听:

listeners=PLAINTEXT://your.host.name:9092

SpringBoot简单整合Kafka

因为SpringCloud是基于SpringBoot的,所以在使用SpringCloudBus整合之前先用SpringBoot整合并记录下来。

创建项目

这里创建一个名为kafka-hello的SpringBoot项目,并添加以下依赖:

org.springframework.boot
spring-boot-starter-web
org.projectlombok
lombok
true
org.springframework.boot
spring-boot-starter-test
test
org.springframework.kafka
spring-kafka
1.1.1.RELEASE
com.google.code.gson
gson
2.8.2

消息实体类

@Datapublic class Message {    private Long id;//id    private String msg; //消息    private Date sendTime; //发送时间}

消息产生者

在该类中创建一个消息发送的方法,使用KafkaTemplate.send()发送消息,wqh是Kafka里的Topic。

@Component@Slf4jpublic class KafkaSender {    @Autowired    private KafkaTemplate
kafkaTemplate; private Gson gson = new GsonBuilder().create(); public void send(Long i){ Message message = new Message(); message.setId(i); message.setMsg(UUID.randomUUID().toString()); message.setSendTime(new Date()); log.info("========发送消息 "+i+" >>>>{}<<<<<==========",gson.toJson(message)); kafkaTemplate.send("wqh",gson.toJson(message)); }}

消息接收类,

在这个类中,创建consumer方法,并使用@KafkaListener注解监听指定的topic,如这里是监听wanqh和wqh两个topic。

@Component@Slf4jpublic class KafkaConsumer {    @KafkaListener(topics = {"wanqh","wqh"})    public void consumer(ConsumerRecord
consumerRecord){ //判断是否为null Optional
kafkaMessage = Optional.ofNullable(consumerRecord.value()); log.info(">>>>>>>>>> record =" + kafkaMessage); if(kafkaMessage.isPresent()){ //得到Optional实例中的值 Object message = kafkaMessage.get(); log.info(">>>>>>>>接收消息message =" + message); } }}

修改启动类

@SpringBootApplicationpublic class KafkaApplication {    @Autowired    private KafkaSender kafkaSender;    @PostConstruct    public void init(){      for (int i = 0; i < 10; i++) {        //调用消息发送类中的消息发送方法        kafkaSender.send((long) i);      }    }    public static void main(String[] args) {       SpringApplication.run(KafkaApplication.class, args);    }}

配置文件

spring.application.name=kafka-helloserver.port=8080#============== kafka ===================# 指定kafka 代理地址,可以多个spring.kafka.bootstrap-servers=192.168.18.136:9092#=============== provider  =======================spring.kafka.producer.retries=0# 每次批量发送消息的数量spring.kafka.producer.batch-size=16384spring.kafka.producer.buffer-memory=33554432# 指定消息key和消息体的编解码方式spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#=============== consumer  =======================# 指定默认消费者group idspring.kafka.consumer.group-id=test-consumer-groupspring.kafka.consumer.auto-offset-reset=earliestspring.kafka.consumer.enable-auto-commit=truespring.kafka.consumer.auto-commit-interval=100# 指定消息key和消息体的编解码方式spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

测试

直接启动该项目:

SpringCloudBus整合Kafka

前面介绍使用RabbitMQ整合SpringCloudBus实现了消息总线,并且测试了动态刷新配置文件。RabbitMQ是通过引入spring-cloud-starter-bus-amqp模块来实现消息总线。若使用Kafka实现消息总线,我们可以直接将之前添加的spring-cloud-starter-bus-amqp替换成spring-cloud-starter-bus-kafka

这里我将前面的config-client复制一份,改名config-client-kafka。传送门:

  • 所添加的依赖:
org.springframework.boot
spring-boot-starter-web
org.springframework.cloud
spring-cloud-starter-config
org.springframework.retry
spring-retry
org.springframework.boot
spring-boot-starter-aop
org.springframework.boot
spring-boot-starter-actuator
org.springframework.cloud
spring-cloud-starter-eureka
org.springframework.cloud
spring-cloud-starter-bus-kafka
  • 添加kafka的配置信息
#Kafka的服务端列表,默认localhostspring.cloud.stream.kafka.binder.brokers=192.168.18.136:9092#Kafka服务端的默认端口,当brokers属性中没有配置端口信息时,就会使用这个默认端口,默认9092spring.cloud.stream.kafka.binder.defaultBrokerPort=9092#Kafka服务端连接的ZooKeeper节点列表,默认localhostspring.cloud.stream.kafka.binder.zkNodes=192.168.18.136:2181#ZooKeeper节点的默认端口,当zkNodes属性中没有配置端口信息时,就会使用这个默认端口,默认2181spring.cloud.stream.kafka.binder.defaultZkPort=2181

测试方法与前一篇一样,不介绍了。


参考:

  • 《SpringCloud微服务实战》

项目地址:

原文[地址:

转载地址:http://rwkia.baihongyu.com/

你可能感兴趣的文章
Effective C++ Item 28 避免返回对象内部数据的引用或指针
查看>>
Activity简单几步支持向右滑动返回
查看>>
Spring 通过Java代码装配bean
查看>>
图片的base64编码通过javascript生成图片--当前URL地址的二维码应用
查看>>
sass10 demo1
查看>>
Asp.net mvc自定义Filter简单使用
查看>>
[LeetCode][Java] Binary Tree Level Order Traversal
查看>>
机器学习模板
查看>>
java thread 线程40个问题汇总
查看>>
第二部分计算机系统基础[专业课考试2]
查看>>
如何选择开源许可证
查看>>
x264代码剖析(八):encode()函数之x264_encoder_close()函数
查看>>
下半部和推后运行的工作
查看>>
(转) RabbitMQ学习之延时队列
查看>>
A Taxonomy for Performance
查看>>
C#文件运行类的VB.NET版本号
查看>>
iOS 的单例模式 dispatch_once
查看>>
tomcat 配置https
查看>>
劝学篇-荀子
查看>>
解决redis aof文件过大的问题
查看>>