消息中间件
1. 简介
1.1 概念
维基百科对消息中间件的解释:面向消息的系统(消息中间件)是在分布式系统中完成消息的发送和接收的基础软件。
消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。
异步处理、流量削峰、限流、缓冲、排队、最终一致性、消息驱动等需求的场景都可以使用消息中间件。
1.2 自定义消息中间件
使用java BlockingQueue(阻塞队列)实现生产者, 消费者
示例代码 rabbitmq-demo/demo01_blocking_queue: https://gitee.com/ixinglan/rabbitmq-demo.git
1.3 主流消息中间件及选型
在传统金融机构、银行、政府机构等有一些老系统还在使用IBM等厂商提供的商用MQ产品。
当前业界比较流行的开源消息中间件包括:ActiveMQ、RabbitMQ、RocketMQ、Kafka、ZeroMQ等,其中应用最为广泛的要数RabbitMQ、RocketMQ、Kafka这三款。
Redis在某种程度上也可以是实现类似“Queue”和“Pub/Sub”的机制,严格意义上不算消息中间件。
选取原则
首先,产品应该是开源的。开源意味着如果队列使用中遇到bug,可以很快修改,而不用等待开发者的更新。
其次,产品必须是近几年比较流行的,要有一个活跃的社区。这样遇到问题很快就可以找到解决方法。同时流行也意味着bug较少。流行的产品一般跟周边系统兼容性比较好。
最后,作为消息队列,要具备以下几个特性:
1、消息传输的可靠性:保证消息不会丢失。 2、支持集群,包括横向扩展,单点故障都可以解决。3、性能要好,要能够满足业务的性能需求。
RabbitMQ
RabbitMQ开始是用在电信业务的可靠通信的,也是少有的几款支持AMQP协议的产品之一。
优点: 1.轻量级,快速,部署使用方便 2.支持灵活的路由配置。RabbitMQ中,在生产者和队列之间有一个交换器模块。根据配置的路 由规则,生产者发送的消息可以发送到不同的队列中。路由规则很灵活,还可以自己实现。 3.RabbitMQ的客户端支持大多数的编程语言。 缺点: 1.如果有大量消息堆积在队列中,性能会急剧下降 2.RabbitMQ的性能在Kafka和RocketMQ中是最差的,每秒处理几万到几十万的消息。如果应 用要求高的性能,不要选择RabbitMQ。 3.RabbitMQ是Erlang开发的,功能扩展和二次开发代价很高。
RocketMQ
RocketMQ是一个开源的消息队列,使用java实现。借鉴了Kafka的设计并做了很多改进。
RocketMQ主要用于有序,事务,流计算,消息推送,日志流处理,binlog分发等场景。经过了历次的双11考验,性能,稳定性可可靠性没的说。
RocketMQ几乎具备了消息队列应该具备的所有特性和功能, 对电商领域的响应延迟做了很多优化。在大多数情况下,响应在毫秒级。如果应用很关注响应时间,可以使用RocketMQ。
性能比RabbitMQ高一个数量级,每秒处理几十万的消息
缺点: 跟其他系统的整合和兼容不是很好
Kafka
Kafka的可靠性,稳定性和功能特性基本满足大多数的应用场景。
跟周边系统的兼容性是数一数二的,尤其是大数据和流计算领域,几乎所有相关的开源软件都支持Kafka。
Kafka高效,可伸缩,消息持久化。支持分区、副本和容错。
Kafka是Scala和Java开发的,对批处理和异步处理做了大量的设计,因此Kafka可以得到非常高的性能。它的异步消息的发送和接收是三个中最好的,但是跟RocketMQ拉不开数量级,每秒处理几十万的消息。
如果是异步消息,并且开启了压缩,Kafka最终可以达到每秒处理2000w消息的级别。但是由于是异步的和批处理的,延迟也会高,不适合电商场景。
1.4 应用场景
消息中间件的使用场景非常广泛,比如,12306购票的排队锁座,电商秒杀,大数据实时计算等
如电商秒杀案例:
当秒杀开始前,用户在不断的刷新页面,系统应该如何应对高并发的读请求呢?
在秒杀开始时,大量并发用户瞬间向系统请求生成订单,扣减库存,系统应该如何应对高并发的写请求呢?
系统应该如何应对高并发的读请求
使用缓存策略将请求挡在上层中的缓存中 能静态化的数据尽量做到静态化 加入限流(比如对短时间之内来自某一个用户,某一个IP、某个设备的重复请求做丢弃处理)
系统应该如何应对高并发的写请求
生成订单,扣减库存,用户这些操作不经过缓存直达数据库。如果在1s内,有1万个数据连接同时到达,系统的数据库会濒临崩溃。如何解决这个问题呢?我们可以使用消息队列。
消息队列的作用: 削去秒杀场景下的峰值写流量——流量削峰 通过异步处理简化秒杀请求中的业务流程——异步处理 解耦,实现秒杀系统模块之间松耦合——解耦
削去秒杀场景下的峰值写流量
将秒杀请求暂存于消息队列,业务服务器响应用户“秒杀结果正在处理中。。。”,释放系统资源去处理其它用户的请求。
削峰填谷
削平短暂的流量高峰,消息堆积会造成请求延迟处理,但秒杀用户对于短暂延迟有一定容忍度。
秒杀商品有1000件,处理一次购买请求的时间是500ms,那么总共就需要500s的时间。这时你部署10个队列处理程序,那么秒杀请求的处理时间就是50s,也就是说用户需要等待50s才可以看到秒杀的结果,这是可以接受的。这时会并发10个请求到达数据库,并不会对数据库造成很大的压力。
通过异步处理简化秒杀请求中的业务流程
先处理主要的业务,异步处理次要的业务。如主要流程是生成订单、扣减库存;次要流程比如购买成功之后会给用户发优惠券,增加用户的积分。
此时秒杀只要处理生成订单,扣减库存的耗时,发放优惠券、增加用户积分异步去处理了
解耦
实现秒杀系统模块之间松耦合
将秒杀数据同步给数据团队,有两种思路:
使用 HTTP 或者 RPC 同步调用,即提供一个接口,实时将数据推送给数据服务。系统的耦合度高,如果其中一个服务有问题,可能会导致另一个服务不可用。
使用消息队列
将数据全部发送给消息队列,然后数据服务订阅这个消息队列,接收数据进行处理。
2. JMS规范和AMQP协议
JMS即Java消息服务(JavaMessageService)应用程序接口,是一个Java平台中关于面向消息中间件(MOM,Messag eoriented Middleware)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。与具体平台无关的API,绝大多数MOM提供商都支持。
2.1 JMS经典模式
2.1.1 JMS消息
消息是JMS中的一种类型对象,由两部分组成:报文头和消息主体。
报文头包括消息头字段和消息头属性。字段是JMS协议规定的字段,属性可以由用户按需添加。
JMS报文头全部字段:
消息主体则携带着应用程序的数据或有效负载
消息类型
1.简单文本(TextMessage)
2.可序列化的对象(ObjectMessage)
3.属性集合(MapMessage)
4.字节流(BytesMessage)
5.原始值流(StreamMessage)
6.无有效负载的消息(Message)。
2.1.2 体系架构
元素构成:
- JMS供应商产品: 即接口的实现
- JMS Client
- JMS Producer
- JMS Consumer
- JMS Message: 传递的数据对象
- JMS Queue
- JMS Topic: Pub/Sub模式
2.1.3 对象模型
- ConnectionFactory接口(连接工厂)
- Connection接口(连接)
- Destination接口(目标)
- Session接口(会话)
- MessageConsumer接口(消息消费者)
- MessageProducer接口(消息生产者)
- Message接口(消息)
2.1.4 模式
点对点或队列模型
一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列
一条消息只有一个消费者获得 生产者无需在接收者消费该消息期间处于运行状态,接收者也同样无需在消息发送时处于运行状态。 每一个成功处理的消息要么自动确认,要么由接收者手动确认。
发布/订阅模式
支持向一个特定的主题发布消息。 0或多个订阅者可能对接收特定消息主题的消息感兴趣。 发布者和订阅者彼此不知道对方。 多个消费者可以获得消息
在发布者和订阅者之间存在时间依赖性。
发布者需要建立一个主题,以便客户能够订阅。 订阅者必须保持持续的活动状态以接收消息,否则会丢失未上线时的消息。 对于持久订阅,订阅者未连接时发布的消息将在订阅者重连时重发。
2.1.5 传递方式
标记为NON_PERSISTENT的消息最多投递一次,而标记为PERSISTENT的消息将使用暂存后再转送的机理投递
如果一个JMS服务下线,持久性消息不会丢失,等该服务恢复时再传递。
默认的消息传递方式是非持久性的。使用非持久性消息可能降低内务和需要的存储器,当不需要接收所有消息时使用。
2.1.6 供应商
开源软件:
1.ApacheActiveMQ
2.RabbitMQ
3.RocketMQ
4.JBoss社区所研发的HornetQ
5.Joram
6.Coridan的MantaRay
7.TheOpenJMSGroup的OpenJMS
专有供应商:
1.BEA的BEAWebLogicServerJMS
2.TIBCOSoftware的EMS
3.GigaSpacesTechnologies的GigaSpaces
4.Softwired2006的iBus
5.IONATechnologies的IONAJMS
6.SeeBeyond的IQManager(2005年8月被SunMicrosystems并购)
7.webMethods的JMS+-
8.my-channels的Nirvana
9.SonicSoftware的SonicMQ
10.SwiftMQ的SwiftMQ
11.IBM的WebSphereMQ
2.2 JMS在集群中的问题
生产中应用基本上都是以集群部署的。在Queue模式下,消息的消费没有什么问题,因为不同节点的相同应用会抢占式地消费消息,这样还能分摊负载。
如果使用Topic广播模式?对于一个消息,不同节点的相同应用都会收到该消息,进行相应的操作,这样就重复消费了。。。
方案一:选择Queue模式,创建多个一样的Queue,每个应用消费自己的Queue。
弊端:浪费空间,生产者还需要关注下游到底有几个消费者,违反了“解耦”的初衷。
方案二:选择Topic模式,在业务上做散列,或者通过分布式锁等方式来实现不同节点间的竞争。
弊端:对业务侵入较大,不是优雅的解决方法。
ActiveMQ通过“虚拟主题”解决了这个问题。
JMS规范文档(jms-1_1-fr-spec.pdf)下载地址:https://download.oracle.com/otndocs/jcp/7195-jms-1.1-fr-spec-oth-JSpec/
JMS是JEE平台的标准消息传递API。它可以在商业和开源实现中使用。每个实现都包括一个JMS服务器,一个JMS客户端库,以及用于管理消息传递系统的其他特定于实现的组件。JMS提供程序可以是消息传递服务的独立实现,也可以是非JMS消息传递系统的桥梁。
JMS客户端API是标准化的,因此JMS应用程序可在供应商的实现之间移植。但是:
底层消息传递实现未指定,因此JMS实现之间没有互操作性。除非存在桥接技术,否则想要共享消息传递的Java应用程序必须全部使用相同的JMS实现。
如果没有供应商特定的JMS客户端库来启用互操作性,则非Java应用程序将无法访问JMS。
AMQP 0-9-1是一种消息传递协议,而不是像JMS这样的API。任何实现该协议的客户端都可以访问支持AMQP 0-9-1的代理。
协议级的互操作性允许以任何编程语言编写且在任何操作系统上运行的AMQP 0-9-1客户端都可以参与消息传递系统,而无需桥接不兼容的服务器实现。
2.3 AMQP协议剖析
协议架构
AMQP全称高级消息队列协议(Advanced Message Queuing Protocol),是一种标准,类似于JMS,兼容JMS协议。目前RabbitMQ主流支持AMQP 0-9-1,3.8.4版本支持AMQP 1.0
AMQP中的概念
- Publisher:消息发送者,将消息发送到Exchange并指定RoutingKey,以便queue可以接收到指定的消息。
- Consumer:消息消费者,从queue获取消息,一个Consumer可以订阅多个queue以从多个queue中接收消息。
- Server:一个具体的MQ服务实例,也称为Broker。
- Virtualhost:虚拟主机,一个Server下可以有多个虚拟主机,用于隔离不同项目,一个Virtual host通常包含多个Exchange、MessageQueue。
- Exchange:交换器,接收Producer发送来的消息,把消息转发到对应的MessageQueue中。
- Routingkey:路由键,用于指定消息路由规则(Exchange将消息路由到具体的queue中),通常需要和具体的Exchange类型、Binding的Routingkey结合起来使用。
- Bindings:指定了Exchange和Queue之间的绑定关系。Exchange根据消息的Routingkey和Binding配置(绑定关系、Binding、Routingkey等)来决定把消息分派到哪些具体的queue中。这依赖于Exchange类型。
- MessageQueue:实际存储消息的容器,并把消息传递给最终的Consumer。
AMQP传输层架构
概述
AMQP是一个二进制的协议,信息被组织成数据帧,有很多类型。数据帧携带协议方法和其他信息。所有数据帧都拥有基本相同的格式:帧头,负载,帧尾。数据帧负载的格式依赖于数据帧的类型。
我们假定有一个可靠的面向流的网络传输层(TCP/IP或等价的协议)。
在一个单一的socket连接中,可能有多个相互独立的控制线程,称为“channel”。每个数据帧使用通道号码编号。通过数据帧的交织,不同的通道共享一个连接。对于任意给定通道,数据帧严格按照序列传输。
我们使用小的数据类型来构造数据帧,如bit,integer,string以及字段表。数据帧的字段做了轻微的封装,不会让传输变慢或解析困难。根据协议规范机械地生成成数据帧层相对简单。
线级别的格式被设计为可伸缩和足够通用,以支持任意的高层协议(不仅是AMQP)。我们假定AMQP会扩展,改进以及随时间的其他变化,并要求wire-level格式支持这些变化。
数据类型
Integers(数值范围1-8的十进制数字):用于表示大小,数量,限制等,整数类型无符号的,可以在帧内不对齐。 Bits(统一为8个字节):用于表示开/关值。 Shortstrings:用于保存简短的文本属性,字符串个数限制为255,8个字节 Longstrings:用于保存二进制数据块。 Fieldtables:包含键值对,字段值一般为字符串,整数等。
协议协商
AMQP客户端和服务端进行协议协商。意味着当客户端连接上之后,服务端会向客户端提出一些选项,客户端必须能接收或修改。如果双方都认同协商的结果,继续进行连接的建立过程。协议协商是一个很有用的技术手段,因为它可以让我们断言假设和前置条件。
在AMQP中,我们需要协商协议的一些特殊方面:
1、真实的协议和版本。服务器可能在同一个端口支持多个协议。
2、双方的加密参数和认证方式。这是功能层的一部分。
3、数据帧最大大小,通道数量以及其他操作限制。
对限制条件的认同可能会导致双方重新分配key的缓存,避免死锁。每个发来的数据帧要么遵守认同的限制,也就是安全的,要么超过了限制,此时另一方出错,必须断开连接。出色地践行了“要么一切工作正常,要么完全不工作”的RabbitMQ哲学。
数据帧界定
TCP/IP是流协议,没有内置的机制用于界定数据帧。现有的协议从以下几个方面来解决:
1.每个连接发送单一数据帧。简单但是慢。 2.在流中添加帧的边界。简单,但是解析很慢。 3.计算数据帧的大小,在每个数据帧头部加上该数据帧大小。这简单,快速,AMQP的选择。
AMPQ客户端实现JMS客户端
RabbitMQ的JMS客户端用RabbitMQJava客户端实现,既与JMS API兼容,也与AMQP0-9-1协议兼容。
RabbitMQJMS客户端不支持某些JMS1.1功能
- JMS客户端不支持服务器会话。
- XA事务支持接口未实现。
- RabbitMQJ MS主题选择器插件支持主题选择器。队列选择器尚未实现。
- 支持RabbitMQ连接的SSL和套接字选项,但仅使用RabbitMQ客户端提供的(默认)SSL连接协议。
- RabbitMQ不支持JMS NoLocal订阅功能,该功能禁止消费者接收通过消费者自己的连接发布的消息。可以调用包含NoLocal参数的方法,但该方法将被忽略。
RabbitMQ使用amqp协议,JMS规范仅对于Java的使用作出的规定,跟其他语言无关,协议是语言无关的,只要语言实现了该协议,就可以做客户端。如此,则不同语言之间互操作性得以保证。