发表于: 2020-01-13 20:07:30

3 1278


啥也不说就是干!!

今天完成的事情:

1、RabbitMQ 入门学习

Linux 准备安装程序

yum install build-esstntial openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

wget https://www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.suse.noarch.rpm

rpm -ivh 安装包

AMQP 核心概念

Server 又称 Broker 接收客户端的连接,实现 AMQP 实体服务

Connection 应用程序与 Broker 的网络连接

Channel 网络信道,几乎所有的操作都在 Channel 中进行,Channel 是进行消息读写的通道。客户端可建立多个 Channel,每个 Channel 代表一个会话任务

Message 消息,服务器与应用程序之间传递的数据,由 Properties 和 Body 组成。Properties 可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body 则就是消息体内容

virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个 Virtual Host里面可以有若干个 Exchange 和 Queue,同一个 Virtual Host里面不能有同名称的 Exchange 或 Queue

Exchange:交换机,接收消息,根据路邮键转发消息到绑定的队列

Binding:Exchange 和 Queue 之间的虚拟连接,binding中可以包含 routing key

Routing key :一个路由规则,虚拟机可用它来确定如何路由一个特定的消息

Queue 也称为 Message Queue 消息队列,保存消息并将它们转发给消费者

消息流转机制:

消息到达MQ 时候,先通过 VirtualHost 找到对应名称的 Exchange,然后根据绑定的 RoutingKey 找到对应的消息队列,然后将消息放入队列中

通过 RabbitManage 创建 Exchange 和 MessageQueue

然后绑定两者之间的关系

2、RabbitMQ 与 SpringBoot 的整合

1、引入相关依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


2、对 applicaion.yml 进行配置

spring:
  rabbitmq:
  addresses: f0t1.top:5672
  username: guest
  password: guest
  virtual-host: /
  connection-timeout: 15s

address:mq 服务器的 ip+端口

username & password: ebin/rabbitmq.app 中配置的

virtual-host 自己定义

connection-timeout 设置超时时间

3、消息生产者

@Component
public class OrderSender {

@Autowired
   private RabbitTemplate rabbitTemplate;
   
   public void sendOrder(OrderInfo orderInfo) throws Exception{
CorrelationData correlationData = new CorrelationData();
       correlationData.setId(orderInfo.getMessageId());
       rabbitTemplate.convertAndSend("order-exchange",
               "order.abcd", //消息路由键
                orderInfo,             //发送内容
                correlationData);                      //消息唯一 id
   }
}

直接注入 RabbitTemplate 调用 convertAndSend 方法

参数分别为 : 交换机名称、消息路由键,消息体,消息唯一ID(一般将业务主键 id 作为消息 ID)

4、消息消费者

@Component
public class OrderRecevier {

@RabbitHandler
   @RabbitListener(
bindings = @QueueBinding(
value = @Queue(value="order-queue",durable = "true"),
                   exchange = @Exchange(name = "order-exchange",durable = "true",
                   type = "topic"),
                   key = "order.#"
           )
)
public void onOrderMessage(@Payload OrderInfo orderInfo,
                              @Headers Map<String,Object> headers,
                              Channel channel) throws Exception{
//消费者消费消息
       System.out.println("收到消息,开始消费....");
       System.out.println("订单 id:"+orderInfo.getId());

       Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
       //给 MQ 一个反馈
       channel.basicAck(deliverTag,false);

   }
}

通过注解 @RabbitHandler 中的 RabbitListener 指定该方法是用来处理消息的

其中 @RabbltListener 中 

@QueueBing 指定 @Queue @Exchange 名称跟在 RabbitManage 中创建的对应

key 指定 RoutingKey 两者的绑定关系

其实这些注解还有个强大的功能,如果 RabbitManager 中没有创建任何指定关系,那么这些注解会自动创建到 RabbitManage 后台

5、测试

先启动消费者进行监听,然后再启动生产者发送消息

@Test
public void sendMsgToMQ(){
OrderInfo orderInfo = new OrderInfo();
  orderInfo.setId("123");
  orderInfo.setName("order123");
  orderInfo.setMessageId(System.currentTimeMillis()+"$"+ UUID.randomUUID().toString());
  try {
orderSender.sendOrder(orderInfo);
  } catch (Exception e) {
e.printStackTrace();
  }
}

可以看到消息已经被消费


明天计划的事情:

将短信接口 API 用消息队列形式接入项目

遇到的问题:

收获:

初步学习了 RabbitMQ 的使用


返回列表 返回列表
评论

    分享到