第一代Spring Cloud核心组件
说明:上⾯提到⽹关组件Zuul性能⼀般,未来将退出SpringCloud⽣态圈,所以我们直接讲解GateWay,我们就把GateWay划分到第⼀代Spring Cloud核⼼组件这⼀部分了。
从形式上来说,Feign⼀个顶三,Feign=RestTemplate+Ribbon+Hystrix
一. Eureka服务注册中心
1.1 关于注册中心
注意:服务注册中⼼本质上是为了解耦服务提供者和服务消费者。
对于任何⼀个微服务,原则上都应存在或者⽀持多个提供者(⽐如简历微服务部署多个实例),这是由微服务的分布式属性决定的。 更进⼀步,为了⽀持弹性扩缩容特性,⼀个微服务的提供者的数量和分布往往是动态变化的,也是⽆法预先确定的。因此,原本在单体应⽤阶段常⽤的静态LB机制就不再适⽤了,需要引⼊额外的组件来管理微服务提供者的注册与发现,⽽这个组件就是服务注册中⼼。
-
一般原理
分布式微服务架构中,服务注册中⼼⽤于存储服务提供者地址信息、服务发布相关的属性信息,消费者通过主动查询和被动通知的⽅式获取服务提供者的地址信息,⽽不再需要通过硬编码⽅式得到提供者的地址信息。消费者只需要知道当前系统发布了那些服务,⽽不需要知道服务具体存在于什么位置,这就是透明化路由。
1)服务提供者启动
2)服务提供者将相关服务信息主动注册到注册中⼼
3)服务消费者获取服务注册信息:
- push模式:服务消费者订阅服务(当服务提供者有变化时,注册中⼼也会主动推送更新后的服务清单给消费者 - pull模式:服务消费者可以主动拉取可⽤的服务提供者清单
4)服务消费者直接调⽤服务提供者
另外,注册中⼼也需要完成服务提供者的健康监控,当发现服务提供者失效时需要及时剔除;
-
主流服务中⼼对⽐
-
Zookeeper Zookeeper它是⼀个分布式服务框架,是ApacheHadoop的⼀个⼦项⽬,它主要是⽤来解决分布式应⽤中经常遇到的⼀些数据管理问题,如:统⼀命名服务、状态同步服务、集群管理、分布式应⽤配置项的管理等。
简单来说zookeeper本质=存储+监听通知。 Zookeeper⽤来做服务注册中⼼,主要是因为它具有节点变更通知功能,只要客户端监听相关服务节点,服务节点的所有变更,都能及时的通知到监听客户端,这样作为调⽤⽅只要使⽤Zookeeper的客户端就能实现服务节点的订阅和变更通知功能了,⾮常⽅便。另外,Zookeeper可⽤性也可以,因为只要半数以上的选举节点存活,整个集群就是可⽤的
-
Eureka 由Netflix开源,并被Pivatal集成到SpringCloud体系中,它是基于RestfulAPI ⻛格开发的服务注册与发现组件。
-
Consul Consul是由HashiCorp基于Go语⾔开发的⽀持多数据中⼼分布式⾼可⽤的服务发布和注册服务软件,采⽤Raft算法保证服务的⼀致性,且⽀持健康检查。
-
Nacos Nacos是⼀个更易于构建云原⽣应⽤的动态服务发现、配置管理和服务管理平台。简单来说Nacos就是注册中⼼+配置中⼼的组合,帮助我们解决微服务开发必会涉及到的服务注册与发现,服务配置,服务管理等问题。Nacos是SpringCloudAlibaba核⼼组件之⼀,负责服务注册与发现,还有配置。
-
1.2 Eureka
-
基础架构
-
交互流程及原理
Eureka通过⼼跳检测、健康检查和客户端缓存等机制,提⾼系统的灵活性、可伸缩性和可⽤性。
1.3 Eureka应用及高可用集群
1)单实例EurekaServer—>访问管理界⾯—>EurekaServer集群
2)服务提供者(简历微服务注册到集群)
3)服务消费者(⾃动投递微服务注册到集群/从EurekaServer集群获取服务信息)
4)完成调⽤
- 接下来我们直接搭建一个多例Ha高可用集群
1. 父工程引入spring-cloud-dependencied, 见代码示例
2. 父工程引入jaxb的jar,因为Jdk9之后默认没有加载该模块,EurekaServer使⽤到,所以需要⼿动导⼊,否则EurekaServer服务⽆法启动
3. 创建两个子工程cloud-eureka-server-8761, cloud-eureka-server-8762
4. 在两个子工程中引入eureka-server依赖
<dependencies>
<!--Eureka server依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
</dependencies>
5. 修改本机host属性,这样我们在配置文件中的hostname就可以用自定义名称表示,将127.0.0.1映射成一个不同名称的集群
127.0.0.1 CloudEurekaServerA
127.0.0.1 CloudEurekaServerB
6. 添加applicaton.yml配置文件, 见代码示例
7. 添加SpringBoot启动类
8. 启动后,访问http://cloudeurekaservera:8762/ 即可看到两个都注册了
-
微服务提供者—>注册到EurekaServer集群
注册简历微服务(简历服务部署两个实例,分别占⽤8080、8081端⼝)
1. 父工程引入spring-cloud-commons依赖 2. pom 文件中引入spring-cloud-starter-netflix-eureka-client 3. applicaiton.yml 添加eureka server高可用集群地址相关配置 4. 启动类添加注解 @EnableEurekaClient // 开启Eureka Client(Eureka独有) // 开启注册中心客户端 (通用型注解,比如注册到Eureka、Nacos等) // 说明:从SpringCloud的Edgware版本开始,不加注解也ok,但是建议大家加上 @EnableDiscoveryClient 5. 分别启动后,即可看到注册服务
Tips:
1)从SpringCloudEdgware版本开始,@EnableDiscoveryClient或 @EnableEurekaClient可省略。只需加上相关依赖,并进⾏相应配置,即可将微服务注册到服务发现组件上。
2)@EnableDiscoveryClient和@EnableEurekaClient⼆者的功能是⼀样的。但是如果选⽤的是eureka服务器,那么就推荐@EnableEurekaClient,如果是其他的注册中⼼,那么推荐使⽤@EnableDiscoveryClient,考虑到通⽤性,后期我们可以使⽤@EnableDiscoveryClient
-
微服务消费者—>注册到EurekaServer集群
1. 添加spring-cloud-starter-netflix-eureka-client依赖 2. application.yml 配置,与提供才配置一样 3. 启动类添加 @EnableDiscoveryClient
-
服务消费者调⽤服务提供者(通过Eureka)
见示例代码
1.4 eureka细节详解
-
元数据
标准元数据:主机名、IP地址、端⼝号等信息,这些信息都会被发布在服务注册表中,⽤于服务之间的调⽤。
⾃定义元数据:可以使⽤eureka.instance.metadata-map配置,符合KEY/VALUE的存储格式。这些元数据可以在远程客户端中访问。
我们可以在程序中可以使⽤DiscoveryClient获取指定微服务的所有元数据信息
-
客户端
服务提供者(也是Eureka客户端)要向EurekaServer注册服务,并完成服务续约等⼯作
服务注册详解(服务提供者)
1)当我们导⼊了eureka-client依赖坐标,配置Eureka服务注册中⼼地址
2)服务在启动时会向注册中⼼发起注册请求,携带服务元数据信息
3)Eureka注册中⼼会把服务的信息保存在Map中。
服务续约详解(服务提供者)
服务每隔30秒会向注册中⼼续约(⼼跳)⼀次(也称为报活),如果没有续约,租约在90秒后到期,然后服务会被失效。每隔30秒的续约操作我们称之为⼼跳检测, 默认配置,不需要调整
如果想修改:
#向Eureka服务中⼼集群注册服务 eureka: instance: #租约续约间隔时间,默认30秒 lease-renewal-interval-in-seconds:30 #租约到期,服务时效时间,默认值90秒,服务超过90秒没有发⽣⼼跳,EurekaServer会将服务从列表移除 lease-expiration-duration-in-seconds:90
获取服务列表详解(服务消费者)
每隔30秒服务会从注册中⼼中拉取⼀份服务列表,这个时间可以通过配置修改。往往不需要我们调整
#向Eureka服务中⼼集群注册服务 eureka: client: #每隔多久拉取⼀次服务列表 registry-fetch-interval-seconds:30
-
服务端
服务下线
1)当服务正常关闭操作时,会发送服务下线的REST请求给EurekaServer。
2)服务中⼼接受到请求后,将该服务置为下线状态
失效剔除
EurekaServer会定时(间隔值是eureka.server.eviction-interval-timer-in-ms,默认60s)进⾏检查,如果发现实例在在⼀定时间(此值由客户端设置的eureka.instance.lease-expiration-duration-in-seconds定义,默认值为90s)内没有收到⼼跳,则会注销此实例。
EurekaServer会定时(间隔值是eureka.server.eviction-interval-timer-in-ms,默认60s)进⾏检查,如果发现实例在在⼀定时间(此值由客户端设置的 eureka.instance.lease-expiration-duration-in-seconds定义,默认值为90s)内没有收到⼼跳,则会注销此实例。 ⾃我保护
服务提供者—>注册中⼼
定期的续约(服务提供者和注册中⼼通信),假如服务提供者和注册中⼼之间的⽹络有点问题,不代表服务提供者不可⽤,不代表服务消费者⽆法访问服务提供者
如果在15分钟内超过85%的客户端节点都没有正常的⼼跳,那么Eureka就认为客户端与注册中⼼出现了⽹络故障EurekaServer⾃动进⼊⾃我保护机制。
为什么会有⾃我保护机制?
默认情况下,如果EurekaServer在⼀定时间内(默认90秒)没有接收到某个微服务实例的⼼跳,EurekaServer将会移除该实例。但是当⽹络分区故障发⽣时,微服务与EurekaServer之间⽆法正常通信,⽽微服务本身是正常运⾏的,此时不应该移除这个微服务,所以引⼊了⾃我保护机制。
当处于⾃我保护模式时
1)不会剔除任何服务实例(可能是服务提供者和EurekaServer之间⽹络问题),保证了⼤多数服务依然可⽤
2)EurekaServer仍然能够接受新服务的注册和查询请求,但是不会被同步到其它节点上,保证当前节点依然可⽤,当⽹络稳定时,当前EurekaServer新的注册信息会被同步到其它节点中。
3)在EurekaServer⼯程中通过eureka.server.enable-self-preservation配置可⽤关停⾃我保护,默认值是打开
eureka: server: enable-self-preservation:false#关闭⾃我保护模式(缺省为打开)
1.5 源码剖析
略过……有时间可以从以下分析: 启动过程, 服务接口暴露策略, 服务注册接口, 服务续约接口, 注册服务, 下架服务, 心跳续约等
二. Ribbon负载均衡
2.1 负载均衡
负载均衡⼀般分为服务器端负载均衡和客户端负载均衡
服务器端负载均衡: ⽐如Nginx、F5这些,请求到达服务器之后由这些负载均衡器根据⼀定的算法将请求路由到⽬标服务器处理。
所谓客户端负载均衡: ⽐如我们要说的Ribbon,服务消费者客户端会有⼀个服务器地址列表,调⽤⽅在请求前通过⼀定的负载均衡算法选择⼀个服务器进⾏访问,负载均衡算法的执⾏是在请求客户端进⾏。
Ribbon是Netflix发布的负载均衡器。Eureka⼀般配合Ribbon进⾏使⽤,Ribbon利⽤从Eureka中读取到服务信息,在调⽤服务提供者提供的服务时,会根据⼀定的算法进⾏负载。
2.2 高级应用
不需要引⼊额外的Jar坐标,因为在服务消费者中我们引⼊过eureka-client,它会引⼊Ribbon相关Jar
-
代码中使⽤如下,在RestTemplate上添加对应注解即可
@Bean //Ribbon负载均衡 @LoadBalanced public RestTemplate getRestTemplate() { return new RestTemplate(); }
-
服务提供都提供测试接口: /port 返回端口号,见示例代码
-
客户端测试调用: ip端口号换成服务名
/** * 使用Ribbon负载均衡 * * @param * @return */ @GetMapping("/ribbon") public Integer ribbon() { // 使用ribbon不需要我们自己获取服务实例然后选择一个那么去访问了(自己的负载均衡) String url = "http://demo-service-resume/resume/port/"; // 指定服务名 Integer forObject = restTemplate.getForObject(url, Integer.class); return forObject; }
Tips:
测试发现, 返回的端口号8081,8080在动态变化, 说明进行了负载均衡
2.3 ribbon负载均衡策略
Ribbon内置了多种负载均衡策略,内部负责复杂均衡的顶级接⼝为com.netflix.loadbalancer.IRul
策略 | 描述 |
---|---|
RoundRobinRule:轮询策略 | 默认超过10次获取到的server都不可⽤,会返回⼀个空的server |
RandomRule:随机策略 | 如果随机到的server为null或者不可⽤的话,会while不停的循环选取 |
RetryRule:重试策略 | ⼀定时限内循环重试。默认继承RoundRobinRule,也⽀持⾃定义注⼊,RetryRule会在每次选取之后,对选举的server进⾏判断,是否为null,是否alive,并且在500ms 内会不停的选取判断。⽽RoundRobinRule失效的策略是超过10次,RandomRule是没有失效时间的概念,只要serverList没都挂。 |
BestAvailableRule:最⼩连接数策略 | 遍历serverList,选取出可⽤的且连接数最⼩的⼀个server。该算法⾥⾯有⼀个LoadBalancerStats 的成员变量,会存储所有server的运⾏状况和连接数。如果选取到的server为null,那么会调⽤RoundRobinRule重新选取。1(1)2(1)3(1) |
AvailabilityFilteringRule:可⽤过滤策略 | 扩展了轮询策略,会先通过默认的轮询选取⼀个server,再去判断该server是否超时可⽤,当前连接数是否超限,都成功再返回。 |
ZoneAvoidanceRule:区域权衡策略(默认策略) | 扩展了轮询策略,继承了2个过滤器:ZoneAvoidancePredicate和AvailabilityPredicate,除了过滤超时和链接数过多的server,还会过滤掉不符合要求的zone区域⾥⾯的所有节点,AWS–ZONE在⼀个区域/机房内的服务实例中轮询 |
修改负载策略:
#针对的被调⽤⽅微服务名称,不加就是全局⽣效
demo-service-resume:
ribbon:
NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule #负载策略调整
2.4 核心源码剖析
- 工作原理:
-
重点:Ribbon给restTemplate添加了⼀个拦截器
-
细节结构
1)获取服务实例列表 2)从列表中选择⼀个server
图中核⼼是负载均衡管理器LoadBalancer(总的协调者,相当于⼤脑,为了做事情,协调四肢),围绕它周围的多有IRule、IPing等
IRule:是在选择实例的时候的负载均衡策略对象
IPing:是⽤来向服务发起⼼跳检测的,通过⼼跳检测来判断该服务是否可⽤
ServerListFilter:根据⼀些规则过滤传⼊的服务实例列表
ServerListUpdater:定义了⼀系列的对服务列表的更新操作
略过…..后续有时间再分析, 可以从以下分析: 加载时机, RoundRobinRule轮询策略, RandomRule随机策略入手
三. Hystrix熔断器
3.1 微服务雪崩效应
扇⼊:代表着该微服务被调⽤的次数,扇⼊⼤,说明该模块复⽤性好
扇出:该微服务调⽤其他微服务的个数
扇出⼤,说明业务逻辑复杂扇⼊⼤是⼀个好事,扇出⼤不⼀定是好事
3.2 雪崩效应解决方案
-
服务熔断 熔断机制是应对雪崩效应的⼀种微服务链路保护机制。我们在各种场景下都会接触到熔断这两个字。⾼压电路中,如果某个地⽅的电压过⾼,熔断器就会熔断,对电路进⾏保护。股票交易中,如果股票指数过⾼,也会采⽤熔断机制,暂停股票的交易。同样,在微服务架构中,熔断机制也是起着类似的作⽤。当扇出链路的某个微服务不可⽤或者响应时间太⻓时,熔断该节点微服务的调⽤,进⾏服务的降级,快速返回错误的响应信息。当检测到该节点微服务调⽤响应正常后,恢复调⽤链路。
注意:
1)服务熔断重点在“断”,切断对下游服务的调⽤
2)服务熔断和服务降级往往是⼀起使⽤的,Hystrix就是这样。
-
服务降级 通俗讲就是整体资源不够⽤了,先将⼀些不关紧的服务停掉(调⽤我的时候,给你返回⼀个预留的值,也叫做兜底数据),待渡过难关⾼峰过去,再把那些服务打开。
服务降级⼀般是从整体考虑,就是当某个服务熔断之后,服务器将不再被调⽤,此刻客户端可以⾃⼰准备⼀个本地的fallback回调,返回⼀个缺省值,这样做,虽然服务⽔平下降,但好⽍可⽤,⽐直接挂掉要强。
-
服务限流
服务降级是当服务出问题或者影响到核⼼流程的性能时,暂时将服务屏蔽掉,待⾼峰或者问题解决后再打开;但是有些场景并不能⽤服务降级来解决,⽐如秒杀业务这样的核⼼功能,这个时候可以结合服务限流来限制这些场景的并发/请求量
限制总并发数(⽐如数据库连接池、线程池)
限制瞬时并发数(如nginx限制瞬时并发连接数)
限制时间窗⼝内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率)
限制远程接⼝调⽤速率、限制MQ的消费速率等
3.3 Hystrix简介
[来⾃官⽹]Hystrix(豪猪—–>刺),宣⾔“defendyourapp”是由Netflix开源的⼀个延迟和容错库,⽤于隔离访问远程系统、服务或者第三⽅库,防⽌级联失败,从⽽提升系统的可⽤性与容错性。Hystrix主要通过以下⼏点实现延迟和容错。
- 包裹请求:使⽤HystrixCommand包裹对依赖的调⽤逻辑。⾃动投递微服务⽅法(@HystrixCommand添加Hystrix控制)——调⽤简历微服务
- 跳闸机制:当某服务的错误率超过⼀定的阈值时,Hystrix可以跳闸,停⽌请求该服务⼀段时间。
- 资源隔离:Hystrix为每个依赖都维护了⼀个⼩型的线程池(舱壁模式)(或者信号量)。如果该线程池已满,发往该依赖的请求就被⽴即拒绝,⽽不是排队等待,从⽽加速失败判定。
- 监控:Hystrix可以近乎实时地监控运⾏指标和配置的变化,例如成功、失败、超时、以及被拒绝的请求等。
- 回退机制:当请求失败、超时、被拒绝,或当断路器打开时,执⾏回退逻辑。回退逻辑由开发⼈员⾃⾏提供,例如返回⼀个缺省值。
- ⾃我修复:断路器打开⼀段时间后,会⾃动进⼊“半开”状态。
3.4 熔断应用
⽬的:简历微服务⻓时间没有响应,服务消费者—>⾃动投递微服务快速失败给⽤户提示
-
服务消费者⼯程(⾃动投递微服务)中引⼊Hystrix依赖坐标(也可以添加在⽗⼯程中)
<!--熔断器Hystrix--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency>
-
服务消费者⼯程(⾃动投递微服务)的启动类中添加熔断器开启注解@EnableCircuitBreaker
-
定义服务降级处理⽅法,并在业务⽅法上使⽤@HystrixCommand的fallbackMethod属性关联到服务降级处理⽅法
降级(兜底)⽅法必须和被降级⽅法相同的⽅法签名(相同参数列表、相同返回值)
-
可以在类上使⽤@DefaultProperties注解统⼀指定整个类中共⽤的降级(兜底)⽅法
-
服务提供者端(简历微服务)模拟请求超时(线程休眠3s),只修改8080实例,8081不修改,对⽐观察
因为我们使用了ribbon负载, 所以会出现一次正常,一次fallback的情况
3.5 hystrix舱壁模式(线程池隔离策略)
如果不进⾏任何设置,所有熔断⽅法使⽤⼀个Hystrix线程池(10个线程),那么这样的话会导致问题,这个问题并不是扇出链路微服务不可⽤导致的,⽽是我们的线程机制导致的,如果⽅法A的请求把10个线程都⽤了,⽅法2请求处理的时候压根都没法去访问B,因为没有线程可⽤,并不是B服务不可⽤。
为了避免问题服务请求过多导致正常服务⽆法访问,Hystrix不是采⽤增加线程数,⽽是单独的为每⼀个控制⽅法创建⼀个线程池的⽅式,这种模式叫做“舱壁模式",也是线程隔离的⼿段。
可以通过以下方式观察线程信息
1.我们可以通过 jps命令查看java进程
2.使用jstack命令查看指定进程中的线程信息,如:jstack pid|grep hystrix
3.6 hystrix工作流程与高级应用
1)当调⽤出现问题时,开启⼀个时间窗(10s)
2)在这个时间窗内,统计调⽤次数是否达到最⼩请求数?
如果没有达到,则重置统计信息,回到第1步
如果达到了,则统计失败的请求数占所有请求数的百分⽐,是否达到阈值?
如果达到,则跳闸(不再请求对应服务)
如果没有达到,则重置统计信息,回到第1步
3)如果跳闸,则会开启⼀个活动窗⼝(默认5s),每隔5s,Hystrix会让⼀个请求通过,到达那个问题服务,看是否调⽤成功,如果成功,重置断路器回到第1步,如果失败,回到第3步
// hystrix高级配置,定制工作过程细节
// 统计时间窗口定义
@HystrixProperty(name = "metrics.rollingStats.timeInMilliseconds",value = "8000"),
// 统计时间窗口内的最小请求数
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold",value = "2"),
// 统计时间窗口内的错误数量百分比阈值
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage",value = "50"),
// 自我修复时的活动窗口长度
@HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds",value = "3000")
也可在配置文件中配置
#配置熔断策略:
hystrix:
command:
default:
circuitBreaker:
#强制打开熔断器,如果该属性设置为true,强制断路器进⼊打开状态,将会拒绝所有的请求。默认false关闭的
forceOpen: false
#触发熔断错误⽐例阈值,默认值50%
errorThresholdPercentage: 50
#熔断后休眠时⻓,默认值5秒
sleepWindowInMilliseconds: 3000
#熔断触发最⼩请求次数,默认值是20
requestVolumeThreshold: 2
execution:
isolation:
thread:
#熔断超时设置,默认为1秒
timeoutInMilliseconds: 2000
基于springboot的健康检查观察跳闸状态(⾃动投递微服务暴露健康检查细节)
# springboot中暴露健康检查等断点接口
management:
endpoints:
web:
exposure:
include: "*"
# 暴露健康接口的细节
endpoint:
health:
show-details: always
访问健康检查接⼝:http://localhost:8090/actuator/health
正常
"hystrix": {
"status": "UP"
}
也有跳闸状态, 窗口内自我修复等状态
3.7 HystrixDashboard断路监控仪表盘
正常状态是UP,跳闸是⼀种状态CIRCUIT_OPEN,可以通过/health查看,前提是⼯程中需要引⼊SpringBoot的actuator(健康监控),它提供了很多监控所需的接⼝,可以对应⽤系统进⾏配置查看、相关功能统计等。
如果我们想看到Hystrix相关数据,⽐如有多少请求、多少成功、多少失败、多少降级等,那么引⼊SpringBoot健康监控之后,访问/actuator/hystrix.stream接⼝可以获取到监控的⽂字信息,但是不直观,所以Hystrix官⽅还提供了基于图形化的 DashBoard(仪表板)监控平台。Hystrix仪表板可以显示每个断路器(被@HystrixCommand注解的⽅法)的状态。
-
新建工程 cloud-hystrix-dashboard
-
导入依赖 见代码示例
-
启动类添加@EnableHystrixDashboard激活仪表盘
-
配置application.yml
-
在被监测的微服务中注册监控servlet(⾃动投递微服务,监控数据就是来⾃于这个微服务),即在autodeliver启动类中添加
被监控微服务发布之后,可以直接访问监控servlet,但是得到的数据并不直观,后期可以结合仪表盘更友好的展示
-
访问测试http://localhost:9000/hystrix
-
输入监控目标http://localhost:8090/actuator/hystrix.stream
百分⽐,10s内错误请求百分⽐
实⼼圆:
- ⼤⼩:代表请求流量的⼤⼩,流量越⼤球越⼤
- 颜⾊:代表请求处理的健康状态,从绿⾊到红⾊递减,绿⾊代表健康,红⾊就代表很不健康
曲线波动图:记录了2分钟内该⽅法上流量的变化波动图,判断流量上升或者下降的趋势
3.8 HystrixTurbine聚合监控
之前,我们针对的是⼀个微服务实例的Hystrix数据查询分析,在微服务架构下,⼀个微服务的实例往往是多个(集群化)
⽐如⾃动投递微服务
实例1(hystrix)ip1:port1/actuator/hystrix.stream
实例2(hystrix)ip2:port2/actuator/hystrix.stream
实例3(hystrix)ip3:port3/actuator/hystrix.stream
按照已有的⽅法,我们就可以结合dashboard仪表盘每次输⼊⼀个监控数据流url,进去查看
⼿⼯操作能否被⾃动功能替代?HystrixTurbine聚合(聚合各个实例上的hystrix监控数据)监控
Turbine(涡轮)
思考:微服务架构下,⼀个微服务往往部署多个实例,如果每次只能查看单个实例的监控,就需要经常切换很不⽅便,在这样的场景下,我们可以使⽤Hystrix
Turbine进⾏聚合监控,它可以把相关微服务的监控数据聚合在⼀起,便于查看。
-
创建工程 cloud-hystrix-turbine-9001
-
引入依赖 spring-cloud-starter-netflix-turbine
-
将需要进⾏Hystrix监控的多个微服务配置起来,在⼯程application.yml中开启Turbine及进⾏相关配置
-
在当前项⽬启动类上添加注解@EnableTurbine,开启仪表盘以及Turbine聚合
-
浏览器访问Turbine项⽬,http://localhost:9001/turbine.stream,就可以看到监控数据了
-
我们通过dashboard的⻚⾯查看数据更直观,把刚才的地址输⼊dashboard地址栏
3.8 核心源码分析
略过…. 感兴趣可以研究
四. Feign远程调用组件
服务消费者调⽤服务提供者的时候使⽤RestTemplate技术
4.1 feign简介
Feign是Netflix开发的⼀个轻量级RESTful的HTTP服务客户端(⽤它来发起请求,远程调⽤的),是以Java接⼝注解的⽅式调⽤Http请求,⽽不⽤像Java中通过封装HTTP请求报⽂的⽅式直接调⽤,Feign被⼴泛应⽤在SpringCloud的解决⽅案中。
类似于Dubbo,服务消费者拿到服务提供者的接⼝,然后像调⽤本地接⼝⽅法⼀样去调⽤,实际发出的是远程的请求。
- Feign可帮助我们更加便捷,优雅的调⽤HTTPAPI:不需要我们去拼接url然后呢调⽤restTemplate的api,在SpringCloud中,使⽤Feign⾮常简单,创建⼀个接⼝(在消费者–服务调⽤⽅这⼀端),并在接⼝上添加⼀些注解,代码就完成了
- SpringCloud对Feign进⾏了增强,使Feign⽀持了SpringMVC注解(OpenFeign)
本质:封装了Http调⽤流程,更符合⾯向接⼝化的编程习惯,类似于Dubbo的服务调⽤
4.2 feign配置应用
在服务调⽤者⼯程(消费)创建接⼝(添加注解)
效果: Feign=RestTemplate+Ribbon+Hystrix
-
创建工程 demo-service-autodeliver-8091
-
引入依赖 spring-cloud-starter-openfeign
-
启动类使⽤注解@EnableFeignClients
注意:此时去掉Hystrix熔断的⽀持注解@EnableCircuitBreaker即可包括引⼊的依赖,因为Feign会⾃动引⼊
-
创建Feign接⼝
1)@FeignClient注解的name属性⽤于指定要调⽤的服务提供者名称,和服务提供者yml⽂件中spring.application.name保持⼀致
2)接⼝中的接⼝⽅法,就好⽐是远程服务提供者Controller中的Hander⽅法(只不过如同本地调⽤了),那么在进⾏参数绑定的时,可以使⽤@PathVariable、@RequestParam、@RequestHeader等,这也是OpenFeign 对SpringMVC注解的⽀持,但是需要注意value必须设置,否则会抛出异常
-
使⽤接⼝中⽅法完成远程调⽤(注⼊接⼝即可,实际注⼊的是接⼝的实现)
4.3 feign对负载均衡的支持
Feign本身已经集成了Ribbon依赖和⾃动配置,因此我们不需要额外引⼊依赖,可以通过ribbon.xx来进⾏全局配置,也可以通过服务名.ribbon.xx来对指定服务进⾏细节配置配置(参考之前,此处略)
Feign默认的请求处理超时时⻓1s,有时候我们的业务确实执⾏的需要⼀定时间,那么这个时候,我们就需要调整请求处理超时时⻓,Feign⾃⼰有超时设置,如果配置Ribbon的超时,则会以Ribbon的为准
lagou-service-resume:
ribbon:
#请求连接超时时间
ConnectTimeout: 2000
#请求处理超时时间
##########################################Feign超时时长设置
ReadTimeout: 3000
#对所有操作都进行重试
OkToRetryOnAllOperations: true
####根据如上配置,当访问到故障请求的时候,它会再尝试访问一次当前实例(次数由MaxAutoRetries配置),
####如果不行,就换一个实例进行访问,如果还不行,再换一次实例访问(更换次数由MaxAutoRetriesNextServer配置),
####如果依然不行,返回失败信息。
MaxAutoRetries: 0 #对当前选中实例重试次数,不包括第一次调用
MaxAutoRetriesNextServer: 0 #切换实例的重试次数
NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RoundRobinRule #负载策略调整
4.4 feign对熔断器的支持
-
在Feign客户端⼯程配置⽂件(application.yml)中开启Feign对熔断器的⽀持
#开启Feign的熔断功能 feign: hystrix: enabled: true
Feign的超时时⻓设置那其实就上⾯Ribbon的超时时⻓设置
Hystrix超时设置(就按照之前Hystrix设置的⽅式就OK了)
Tips:
1)开启Hystrix之后,Feign中的⽅法都会被进⾏⼀个管理了,⼀旦出现问题就进⼊对应的回退逻辑处理
2)针对超时这⼀点,当前有两个超时时间设置(Feign/hystrix),熔断的时候是根据这两个时间的最⼩值来进⾏的,即处理时⻓超过最短的那个超时时间了就熔断进⼊回退降级逻辑
-
⾃定义FallBack处理类(需要实现FeignClient接⼝)
-
在@FeignClient注解中关联2)中⾃定义的处理类
@FeignClient(value = "demo-service-resume", fallback = ResumeFallback.class, path = "/resume")
4.5 feign对请求压缩和响应压缩的支持
Feign⽀持对请求和响应进⾏GZIP压缩,以减少通信过程中的性能损耗。通过下⾯的参数即可开启请求与响应的压缩功能
feign:
compression:
request:
enabled: true #开启请求压缩
mime-types: text/html,application/xml,application/json #设置压缩的数据类型,此处也是默认值
min-request-size:2048 #设置触发压缩的⼤⼩下限,此处也是默认值
response:
enabled:true #开启响应压缩
4.6 feign的日志级别配置
Feign是http请求客户端,类似于咱们的浏览器,它在请求和接收响应的时候,可以打印出⽐较详细的⼀些⽇志信息(响应头,状态码等等)
如果我们想看到Feign请求时的⽇志,我们可以进⾏配置,默认情况下Feign的⽇志没有开启。
-
开启Feign⽇志功能及级别
/** * Feign的⽇志级别(Feign请求过程信息) * <p> * NONE:默认的,不显示任何⽇志----性能最好 * BASIC:仅记录请求⽅法、URL、响应状态码以及执⾏时间----⽣产问题追踪 * HEADERS:在BASIC级别的基础上,记录请求和响应的header * FULL:记录请求和响应的header、body和元数据----适⽤于开发及测试环境定位问题 */
-
配置log⽇志级别为debug
4.7 feign源码剖析
略过………
五. GateWay网关组件
⽹关(翻译过来就叫做GateWay):微服务架构中的重要组成部分
局域⽹中就有⽹关这个概念,局域⽹接收或者发送数据出去通过这个⽹关,⽐如⽤Vmware虚拟机软件搭建虚拟机集群的时候,往往我们需要选择IP段中的⼀个IP作为⽹关地址。
我们学习的GateWay–>SpringCloudGateWay(它只是众多⽹关解决⽅案中的⼀种)
5.1 gateway简介
SpringCloudGateWay是SpringCloud的⼀个全新项⽬,⽬标是取代NetflixZuul,它基于Spring5.0+SpringBoot2.0+WebFlux(基于⾼性能的Reactor模式响应式通信框架Netty,异步⾮阻塞模型)等技术开发,性能⾼于Zuul,官⽅测试,GateWay是Zuul的1.6倍,旨在为微服务架构提供⼀种简单有效的统⼀的API路由管理⽅式。
SpringCloudGateWay不仅提供统⼀的路由⽅式(反向代理)并且基于Filter(定义过滤器对请求过滤,完成⼀些功能)链的⽅式提供了⽹关基本的功能,例如:鉴权、流量控制、熔断、路径重写、⽇志监控等。
网关在架构中的位置
5.2 gateway核心概念
Zuul1.x阻塞式IO, 2.x基于Netty
Spring Cloud GateWay天⽣就是异步⾮阻塞的,基于Reactor模型
⼀个请求—>⽹关根据⼀定的条件匹配—匹配成功之后可以将请求转发到指定的服务地址;⽽在这个过程中,我们可以进⾏⼀些⽐较具体的控制(限流、⽇志、⿊⽩名单)
-
路由 route
⽹关最基础的部分,也是⽹关⽐较基础的⼯作单元。路由由⼀个ID、⼀个⽬标URL(最终路由到的地址)、⼀系列的断⾔(匹配条件判断)和Filter过滤器(精细化控制)组成。如果断⾔为true,则匹配该路由
-
断言 predicates
参考了Java8中的断⾔java.util.function.Predicate,开发⼈员可以匹配Http请求中的所有内容(包括请求头、请求参数等)(类似于 nginx中的location匹配⼀样),如果断⾔与请求相匹配则路由
-
过滤器 filter
⼀个标准的SpringwebFilter,使⽤过滤器,可以在请求之前或者之后执⾏业务逻辑
其中,Predicates断⾔就是我们的匹配条件,⽽Filter就可以理解为⼀个⽆所不能的拦截器,有了这两个元素,结合⽬标URL,就可以实现⼀个具体的路由转发。
5.3 gateway工作过程
客户端向SpringCloudGateWay发出请求,然后在GateWayHandlerMapping中找到与请求相匹配的路由,将其发送到GateWayWebHandler;Handler再通过指定的过滤器链来将请求发送到我们实际的服务执⾏业务逻辑,然后返回。过滤器之间⽤虚线分开是因为过滤器可能会在发送代理请求之前(pre)或者之后(post)执⾏业务逻辑。
Filter在“pre”类型过滤器中可以做参数校验、权限校验、流量监控、⽇志输出、协议转换等,在“post”类型的过滤器中可以做响应内容、响应头的修改、⽇志的输出、流量监控等。
GateWay核⼼逻辑:路由转发+执⾏过滤器链
5.4 gateway应用
使⽤⽹关对⾃动投递微服务进⾏代理(添加在它的上游,相当于隐藏了具体微服务的信息,对外暴露的是⽹关)
- 创建工程 cloud-gateway-server-9002
- 导入依赖 GateWay不需要使⽤web模块,它引⼊的是WebFlux(类似于SpringMVC)
- Application.yml配置文件
- 测试地址,即请求网关: http://localhost:9002/autodeliver/checkState/1545132
5.5 gateway路由规则详解
SpringCloudGateWay帮我们内置了很多Predicates功能,实现了各种路由匹配规则(通过Header、请求参数等作为条件)匹配到对应的路由。
-
时间点后匹配
spring: cloud: gateway: routes: -id: after_route uri: https://example.org predicates: -After=2017-01-20T17:42:47.789-07:00[America/Denver]
-
时间点前匹配
spring: cloud: gateway: routes: -id:before_route uri:https://example.org predicates: -Before=2017-01-20T17:42:47.789-07:00[America/Denver]
-
时间区间匹配
spring: cloud: gateway: routes: -id: between_route uri: https://example.org predicates: -Between=2017-01-20T17:42:47.789-07:00[America/Denver],2017-01-21T17:42:47.789-07:00[America/Denver]
-
指定Cookie正则匹配指定值
spring: cloud: gateway: routes: - id: cookie_route uri: https://example.org predicates: - Cookie=chocolate, ch.p
-
指定Header正则匹配指定值
spring: cloud: gateway: routes: - id: header_route uri: https://example.org predicates: - Header=X-Request-Id, \d+
-
请求Host匹配指定值
spring: cloud: gateway: routes: - id: host_route uri: https://example.org predicates: - Host=**.somehost.org,**.anotherhost.org
-
请求Method匹配指定请求⽅式
spring: cloud: gateway: routes: - id: method_route uri: https://example.org predicates: - Method=GET,POST
-
请求路径正则匹配
spring: cloud: gateway: routes: - id: path_route uri: https://example.org predicates: - Path=/red/{segment},/blue/{segment}
-
请求包含某参数
spring: cloud: gateway: routes: - id: query_route uri: https://example.org predicates: - Query=green
-
请求包含某参数并且参数值匹配正则表达式
spring: cloud: gateway: routes: - id: query_route uri: https://example.org predicates: - Query=red, gree.
-
远程地址匹配
spring: cloud: gateway: routes: - id: remoteaddr_route uri: https://example.org predicates: - RemoteAddr=192.168.1.1/24
5.6 动态路由详解
GateWay⽀持⾃动从注册中⼼中获取服务列表并访问,即所谓的动态路由
动态路由设置时,uri以 lb: //开头(lb代表从注册中⼼获取服务),后⾯是需要转发到的服务名称
即示例代码里配置的
5.7 gateway过滤器
从过滤器⽣命周期(影响时机点)的⻆度来说,主要有两个pre和post
生命周期时间点 | 作用 |
---|---|
Pre | 这种过滤器在请求被路由之前调⽤。我们可利⽤这种过滤器实现身份验证、在集群中选择请求的微服务、记录调试信息等。 |
Post | 这种过滤器在路由到微服务以后执⾏。这种过滤器可⽤来为响应添加标准的HTTPHeader、收集统计信息和指标、将响应从微服务发送给客户端等。 |
从过滤器类型的⻆度,SpringCloudGateWay的过滤器分为GateWayFilter和GlobalFilter两种
过滤器类型 | 影响范围 |
---|---|
GateWayFilter | 应⽤到单个路由路由上 |
GlobalFilter | 应⽤到所有的路由上 |
如GatewayFilter可以去掉url中的占位后转发路由,⽐如
- id: service-resume-router uri: lb://demo-service-resume predicates: - Path=/resume/** filters: #可以去掉resume之后转发 - StripPrefix=1
GlobalFilter全局过滤器是程序员使⽤⽐较多的过滤器,如示例代码里的ip 黑名单过滤器
5.8 gateway高可用
⽹关作为⾮常核⼼的⼀个部件,如果挂掉,那么所有请求都可能⽆法路由处理,因此我们需要做GateWay的⾼可⽤。
GateWay的⾼可⽤很简单:可以启动多个GateWay实例来实现⾼可⽤,在GateWay 的上游使⽤Nginx等负载均衡设备进⾏负载转发以达到⾼可⽤的⽬的。
启动多个GateWay实例(假如说两个,⼀个端⼝9002,⼀个端⼝9003),剩下的就是使⽤Nginx等完成负载代理即可。示例如下:
#配置多个GateWay实例
upstream gateway{
server127.0.0.1:9002;
server127.0.0.1:9003;}
location/{
proxy_passhttp://gateway;
}
六. Config分布式配置中心
6.1 应用场景
往往,我们使⽤配置⽂件管理⼀些配置信息,⽐如application.yml
单体应⽤架构: 配置信息的管理、维护并不会显得特别麻烦,⼿动操作就可以,因为就⼀个⼯程;
微服务架构: 因为我们的分布式集群环境中可能有很多个微服务,我们不可能⼀个⼀个去修改配置然后重启⽣效,在⼀定场景下我们还需要在运⾏期间动态调整配置信息,⽐如:根据各个微服务的负载情况,动态调整数据源连接池⼤⼩,我们希望配置内容发⽣变化的时候,微服务可以⾃动更新。
场景总结如下:
- 集中配置管理,⼀个微服务架构中可能有成百上千个微服务,所以集中配置管理是很重要的(⼀次修改、到处⽣效)
- 不同环境不同配置,⽐如数据源配置在不同环境(开发dev,测试test,⽣产prod)中是不同的
- 运⾏期间可动态调整。例如,可根据各个微服务的负载情况,动态调整数据源连接池⼤⼩等配置修改后可⾃动更新
- 如配置内容发⽣变化,微服务可以⾃动更新配置
那么,我们就需要对配置⽂件进⾏集中式管理,这也是分布式配置中⼼的作⽤。
6.2 Spring Cloud Config
SpringCloudConfig是⼀个分布式配置管理⽅案,包含了Server端和Client端两个部分。
-
Server端:
提供配置⽂件的存储、以接⼝的形式将配置⽂件的内容提供出去,通过使⽤@EnableConfigServer注解在Springboot应⽤中⾮常简单的嵌⼊
-
Client端:
通过接⼝获取配置数据并初始化⾃⼰的应⽤
ConfigServer是集中式的配置服务,⽤于集中管理应⽤程序各个环境下的配置。默认使⽤Git存储配置⽂件内容,也可以SVN。
服务端配置使用如下:
-
git仓库,gitee 创建配置文件仓库 spring-cloud-config-repo,名称随便命名
-
上传yml配置⽂件,命名规则如下
{application}-{profile}.yml或者{application}-{profile}.properties
其中,application为应⽤名称,profile指的是环境(⽤于区分开发环境,测试环境、⽣产环境等)
-
构建ConfigServer统⼀配置中⼼: cloud-config-server-9006
见示例工程
-
配置文件中配置连接"配置中心"的连接信息, 见示例工程
-
启动后,即可通过链接访问到我们的配置文件, 例:http://localhost:9006/master/application.yml
构建客户端如下 :
-
新建工程 demo-service-resume-8082
-
添加依赖 spring-cloud-config-client
-
application.yml修改为bootstrap.yml配置⽂件
bootstrap.yml是系统级别的,优先级⽐application.yml⾼,应⽤启动时会检查这个配置⽂件,在这个配置⽂件中指定配置中⼼的服务地址,会⾃动拉取所有应⽤配置并且启⽤。
spring: cloud: # config客户端配置,和ConfigServer通信,并告知ConfigServer希望获取的配置信息在哪个文件中 config: name: application #配置文件名称 profile: dev #后缀名称 label: master #分支名称 uri: http://localhost:9006 #ConfigServer配置中心地址
-
启动即可从配置服务器拉取配置生效
6.3 Config配置手动刷新
不⽤重启微服务,只需要⼿动的做⼀些其他的操作(访问⼀个地址/refresh)刷新,之后再访问即可
此时,客户端取到了配置中⼼的值,但当我们修改gitee上⾯的值时,服务端(ConfigServer)能实时获取最新的值,但客户端(ConfigClient)读的是缓存,⽆法实时获取最新值。SpringCloud已经为我们解决了这个问题,那就是客户端使⽤post去触发refresh,获取最新数据。
- Client客户端添加依赖springboot-starter-actuator
- Client客户端bootstrap.yml中添加配置(暴露通信端点)
- Client客户端使⽤到配置信息的类上添加@RefreshScope, 比如在controller上
- ⼿动向Client客户端发起POST请求,http://localhost:8082/actuator/refresh,刷新配置信息
6.4 config配置自动更新
实现⼀次通知处处⽣效
在微服务架构中,我们可以结合消息总线(Bus)实现分布式配置的⾃动更新(SpringCloudConfig+SpringCloudBus)
-
消息总线bus
所谓消息总线Bus,即我们经常会使⽤MQ消息代理构建⼀个共⽤的Topic,通过这个Topic连接各个微服务实例,MQ⼴播的消息会被所有在注册中⼼的微服务实例监听和消费。换⾔之就是通过⼀个主题连接各个微服务,打通脉络。
SpringCloudBus(基于MQ的,⽀持RabbitMq/Kafka)是SpringCloud中的消息总线⽅案,SpringCloudConfig+SpringCloudBus结合可以实现配置信息的⾃动更新。
-
SpringCloudConfig+SpringCloudBus实现⾃动刷新
MQ消息代理,我们还选择使⽤RabbitMQ,ConfigServer和ConfigClient都添加都消息总线的⽀持以及与RabbitMq的连接信息
-
Config server 添加总线支持,即引入依赖 spring-cloud-starter-bus-amqp
-
Config server添加配置
-
微服务暴露端口
include: bus-refresh 或者 include: "*"
-
重启各个服务, 向配置中心发送post请求: http://ip:port/actuator/bus-refresh, 各个客户端即可自动刷新
-
七. SpringCloudStream消息驱动组件
SpringCloudStream消息驱动组件帮助我们更快速,更⽅便,更友好的去构建消息驱动微服务的。
7.1 stream解决的痛点问题
MQ消息中间件⼴泛应⽤在应⽤解耦合、异步消息处理、流量削峰等场景中
不同的MQ消息中间件内部机制包括使⽤⽅式都会有所不同,⽐如RabbitMQ中有Exchange(交换机/交换器)这⼀概念,kafka有Topic、Partition分区这些概念,MQ消息中间件的差异性不利于我们上层的开发应⽤,当我们的系统希望从原有的RabbitMQ切换到Kafka时,我们会发现⽐较困难,很多要操作可能重来(因为应⽤程序和具体的某⼀款MQ消息中间件耦合在⼀起了)。
SpringCloudStream进⾏了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样)。如此⼀来,我们学习、开发、维护MQ都会变得轻松。⽬前SpringCloudStream⽀持RabbitMQ和Kafka。
本质:屏蔽掉了底层不同MQ消息中间件之间的差异,统⼀了MQ的编程模型,降低了学习、开发、维护MQ的成本
7.2 stream重要概念
SpringCloudStream是⼀个构建消息驱动微服务的框架。应⽤程序通过inputs(相当于消息消费者consumer)或者outputs(相当于消息⽣产者producer)来与SpringCloudStream中的binder对象交互,⽽Binder对象是⽤来屏蔽底层MQ细节的,它负责与具体的消息中间件交互。
说⽩了:对于我们来说,只需要知道如何使⽤SpringCloudStream与Binder对象交互即可
Binder绑定器
Binder绑定器是SpringCloudStream中⾮常核⼼的概念,就是通过它来屏蔽底层不同MQ消息中间件的细节差异,当需要更换为其他消息中间件时,我们需要做的就是更换对应的Binder绑定器⽽不需要修改任何应⽤逻辑(Binder绑定器的实现是框架内置的,SpringCloudStream⽬前⽀持Rabbit、Kafka两种消息队列)
7.3 传统MQ模型与Stream消息驱动模型
7.4 stream消息通信方式及编程模型
-
消息通信方式
Stream中的消息通信⽅式遵循了发布—订阅模式。
在SpringCloudStream中的消息通信⽅式遵循了发布-订阅模式,当⼀条消息被投递到消息中间件之后,它会通过共享的Topic主题进⾏⼴播,消息消费者在订阅的主题中收到它并触发⾃身的业务逻辑处理。这⾥所提到的Topic主题是Spring CloudStream中的⼀个抽象概念,⽤来代表发布共享消息给消费者的地⽅。在不同的消息中间件中,Topic可能对应着不同的概念,⽐如:在RabbitMQ中的它对应了Exchange、在Kakfa中则对应了Kafka中的Topic。
-
编程注解
如下的注解⽆⾮在做⼀件事,把我们结构图中那些组成部分上下关联起来,打通通道(这样的话⽣产者的message数据才能进⼊mq,mq中数据才能进⼊消费者⼯程)。
注解 描述 @Input(在消费者⼯程中使⽤) 注解标识输⼊通道,通过该输⼊通道接收到的消息进⼊应⽤程序 @Output(在⽣产者⼯程中使⽤) 注解标识输出通道,发布的消息将通过该通道离开应⽤程序 @StreamListener(在消费者⼯程中使⽤,监听message的到来) 监听队列,⽤于消费者的队列的消息的接收(有消息监听…..) @EnableBinding 把Channel和Exchange(对于RabbitMQ)绑定在⼀起 我们先安装一下rabiitmq: 官方安装教程
mac通过homebrew安装: https://www.rabbitmq.com/install-homebrew.html
下面我们创建三个工程,基于rabbitMq:
cloud-stream-producer-9090,作为⽣产者端发消息 cloud-stream-consumer-9091,作为消费者端接收消息 cloud-stream-consumer-9092,作为消费者端接收消息
生产者,消费者: 见示例代码工程
7.5 stream高级之自定义消息通道
Stream内置了两种接⼝: Source和Sink分别定义了binding为“output”的输出流和“input”的输⼊流
我们也可以⾃定义各种输⼊输出流(通道),但实际我们可以在我们的服务中使⽤多个binder、多个输⼊通道和输出通道,然⽽默认就带了⼀个input的输⼊通道和⼀个output的输出通道,怎么办?
我们是可以⾃定义消息通道的,学着Source和Sink的样⼦,给你的通道定义个⾃⼰的名字,多个输⼊通道和输出通道是可以写在⼀个类中的。
-
定义接口
interface CustomChannel { String INPUT_LOG = "inputLog"; String OUTPUT_LOG = "outputLog"; @Input(INPUT_LOG) SubscribableChannel inputLog(); @Output(OUTPUT_LOG) MessageChannel outputLog(); }
-
使用
- 在 @EnableBinding 注解中,绑定⾃定义的接⼝
- 使⽤ @StreamListener 做监听的时候,需要指定 CustomChannel.INPUT_LOG
bindings: inputLog: destination: xxxExchange outputLog: destination: xxxExchange
7.6 stream高级之消息分组
如上我们的情况,消费者端有两个(消费同⼀个MQ的同⼀个主题),但是呢我们的业务场景中希望这个主题的⼀个Message只能被⼀个消费者端消费处理,此时我们就可以使⽤消息分组
解决的问题:能解决消息重复消费问题
我们仅仅需要在服务消费者端设置 spring.cloud.stream.bindings.input.group 属性,多个消费者实例配置为同⼀个group名称(在同⼀个group中的多个消费者只有⼀个可以获取到消息并消费)。