springboot整合消息队列(ActiveMQ、RabbitMQ、Kafka)
安装过程不再演示
- ActiveMQ
- 引入pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
- application.yml配置相关基础信息
spring:
activemq:
broker-url: tcp://localhost:61616
jms:
template:
default-destination: ypf
pub-sub-domain: true (默认FALSE 为点对点模式,TRUE 为开启订阅模式)
- 编写测试方法
生产消息,把消息存入自己建的队列中
@Autowired
private JmsMessagingTemplate messagingTemplate;
@Override
public void sendMessage(String id) {
System.out.println("待发送短信的订单已纳入处理队列:"+id);
messagingTemplate.convertAndSend("order.queue.id",id);
}
消费消息,新建一个listener,里面写拿到消息后要做的事情
@Component
public class MessageListener {
//监控指定队列中的消息,一有消息就消费掉
@JmsListener(destination = "order.queue.id")
//消费当前队列消息后,把返回值传入下个队列
@SendTo("order.other.queue.id")
public String receive(String id){
System.out.println("已完成短信发送业务 id:"+id);
return "new>>>>>>>>>>"+id;
}
}
- RabbitMQ
- 引入pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- application.yml配置相关基础信息
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
-
编写测试方法
编写配置类
示例配置类中配置了,RabbitMQ的 队列名称、队列交换机以及 队列和交换机绑定的操作
@Configuration
public class RabbitConfigDirect {
//新建队列,起个名称direct_queue
@Bean
public Queue directQueue(){
return new Queue("direct_queue");
}//建一个交换机,给交换机起个名字directExchange
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
//建立绑定关系,把建的队列给交换机后起个名字direct
@Bean
public Binding bindingDirect(){
return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct");
}
}
写一个测试类把消息存入RabbitMQ中
@Autowired
private AmqpTemplate amqpTemplate;
@Override
public void sendMessage(String id) {
System.out.println("待发送短信的订单已纳入处理队列(rabbitmq direct):"+id);
amqpTemplate.convertAndSend("directExchange","direct",id);
}
再写一个消费者方法,取消费生产的消息
@Component
public class MessageListener {
//queues里的队列名 可以是多个,以数组方式 填入
@RabbitListener(queues = "direct_queue")
public void receive(String id){
System.out.println("已完成短信发送业务(rabbitmq direct) id:"+id);
}
}
四级标题 注:如需使用topic模式,需要在新建交换机的时候使用TopicExchange类,队列匹配支持模糊匹配,一个消息可以被多个消费者消费
:用来表示一个单词,且该单词是必须出现的
#:用来表示任意数量
|匹配键|topic..*|topic.#|
|-------|-------|-------|
|topic.order.id|true|true|
|order.order.id|false| false |
|topic.sm.order.id| false |true|
|topic.sm.id| false |true|
|topic.id.order|true|true|
|topic.id| false |true|
|topic.order| false |true|
- Kafka
- 先开启zookeeper注册中心
./zookeeper-server-start.sh ../config/zookeeper.properties

- 开启kafka服务
./kafka-server-start.sh ../config/server.properties
- 使用命令创建topic
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --replication-factor 1 --partitions 1 --topic ypf
- springboot中使用,在pom中引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- 配置application.yml文件
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: order
- 编写方法进行调用
生产消息
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Override
public void sendMessage(String id) {
System.out.println("待发送短信的订单已纳入处理队列(kafka):"+id);
kafkaTemplate.send("ypf",id);
}
消费消息
@Component
public class MessageListener {
@KafkaListener(topics = "ypf")
public void onMessage(ConsumerRecord<String,String> record){
System.out.println("已完成短信发送业务(kafka) id:"+record.value());
}
}
- 感谢你赐予我前进的力量

