安装过程不再演示

  • ActiveMQ
  1. 引入pom依赖
	<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

  1. application.yml配置相关基础信息
spring:
  activemq:
    broker-url: tcp://localhost:61616
  jms:
    template:
      default-destination: ypf
    pub-sub-domain: true  (默认FALSE 为点对点模式,TRUE 为开启订阅模式)
  1. 编写测试方法

生产消息,把消息存入自己建的队列中

 @Autowired
    private JmsMessagingTemplate messagingTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列:"+id);
        messagingTemplate.convertAndSend("order.queue.id",id);
    }

消费消息,新建一个listener,里面写拿到消息后要做的事情

@Component
public class MessageListener {

    //监控指定队列中的消息,一有消息就消费掉
    @JmsListener(destination = "order.queue.id")
    //消费当前队列消息后,把返回值传入下个队列
    @SendTo("order.other.queue.id")
    public String receive(String id){
        System.out.println("已完成短信发送业务 id:"+id);
        return "new>>>>>>>>>>"+id;
    }
}
  • RabbitMQ
  1. 引入pom依赖

	<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

  1. application.yml配置相关基础信息
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest 
    password: guest

  1. 编写测试方法
    编写配置类
    示例配置类中配置了,RabbitMQ的 队列名称、队列交换机以及 队列和交换机绑定的操作
    @Configuration
    public class RabbitConfigDirect {
    //新建队列,起个名称direct_queue
    @Bean
    public Queue directQueue(){
    return new Queue("direct_queue");
    }

    //建一个交换机,给交换机起个名字directExchange
    @Bean
    public DirectExchange directExchange(){
    return new DirectExchange("directExchange");
    }
    //建立绑定关系,把建的队列给交换机后起个名字direct
    @Bean
    public Binding bindingDirect(){
    return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct");
    }

}

写一个测试类把消息存入RabbitMQ中

@Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列(rabbitmq direct):"+id);
        amqpTemplate.convertAndSend("directExchange","direct",id);
    }

再写一个消费者方法,取消费生产的消息

@Component
public class MessageListener {

    //queues里的队列名 可以是多个,以数组方式 填入
    @RabbitListener(queues = "direct_queue")
    public void receive(String id){
        System.out.println("已完成短信发送业务(rabbitmq direct) id:"+id);
    }
}

四级标题 注:如需使用topic模式,需要在新建交换机的时候使用TopicExchange类,队列匹配支持模糊匹配,一个消息可以被多个消费者消费

:用来表示一个单词,且该单词是必须出现的
#:用来表示任意数量
|匹配键|topic.
.*|topic.#|
|-------|-------|-------|
|topic.order.id|true|true|
|order.order.id|false| false |
|topic.sm.order.id| false |true|
|topic.sm.id| false |true|
|topic.id.order|true|true|
|topic.id| false |true|
|topic.order| false |true|

  • Kafka
  1. 先开启zookeeper注册中心
./zookeeper-server-start.sh ../config/zookeeper.properties

image.png

  1. 开启kafka服务
./kafka-server-start.sh ../config/server.properties 
  1. 使用命令创建topic
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --replication-factor 1 --partitions 1 --topic ypf

  1. springboot中使用,在pom中引入依赖
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
  1. 配置application.yml文件
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: order
  1. 编写方法进行调用
    生产消息
@Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳入处理队列(kafka):"+id);
        kafkaTemplate.send("ypf",id);
    }

消费消息

@Component
public class MessageListener {


    @KafkaListener(topics = "ypf")
    public void onMessage(ConsumerRecord<String,String> record){
        System.out.println("已完成短信发送业务(kafka) id:"+record.value());
    }
}