皮皮网

【积分看图源码】【珠海网站源码】【策略先锋源码】webflux源码分析

来源:域名出租源码 时间:2025-01-17 09:26:37

1.使用Gateway作为SpringCloud网关
2.Reactor-Netty基本抽象类介绍
3.第6章 Reactor Netty中的码分消息处理逻辑 01
4.springbootcloud组件
5.springboot教程菜鸟(springboot入门教程)
6.Reactive Spring实战 -- 理解Reactor的设计与实现

webflux源码分析

使用Gateway作为SpringCloud网关

       本着能用原生就用原生的原则,我们这里使用SpringGateway来作为云服务的码分网关

       配置

       从官网的介绍来看,spring网关拥有的码分功能有,路由(配置,码分过滤,码分重写等),码分积分看图源码熔断以及流量控制

       首先引入包

       动态路由

       路由的码分配置比较简单,有两种方法:使用配置文件和代码注入,码分我们这里简单展示下两种方法

       或者使用

       路由配置中id、码分uri、码分order、码分predicates.path/host没什么好说的码分,根据需求配置即可,码分filters相关参数,码分这里最好还是码分参考源码相关部分或者Spring Cloud Gateway比较全面,比如常用的前缀切割

       这里我们以常用的两种filter,流量控制和熔断降级举例

       流量控制

       通常我们需要限流来保证服务的可用性,保护一些不太稳定的服务不会因为高并发的请求而挂掉,这里我们一般在网关层做流量控制,减少实际进入的请求达到平波峰的目的

       计数器算法

       如果某个服务会在请求中数量达到时候挂掉,请求平均时间为2s,我们给一段时间一个请求量的限制,比如2秒次,每次请求进入就减少计数,每2s开始时重新计数,这样就能保证服务请求中数量在以内。但是对于抢购类接口,可能前ms请求数量就用完了,后面所有请求都被拒绝,即请求突刺现象,这样的用户体验是非常差的所以我们需要尽可能在所有的时间内保证接口的可用性(计数器算法就像DRAM中的集中式刷新一样不太能被接受),而且短时间内大量请求运行在相同代码段是非常危险的,在设计不好的情况很可能会出现数据库死锁等等问题

       漏桶算法

       我们需要让请求尽可能地能进行来,就需要平波峰填波谷,就上例而言,2s内最大请求为,也就是珠海网站源码每个请求占用的时间比例为ms,我们设计一个容量为的桶(队列)每ms向接口发一个请求,可以让服务中请求数量不超过的情况下,每ms都能接受一个新的请求,这样就缓解了请求突刺现象。但是这里还有一个问题,对于抢购类接口,个容量可能ms就用完了,在第ms可能还会有个请求抢1个位置,个请求会被取消,这样也是相对来说不能被接受的

       令牌桶算法

       令牌桶算法就是目前spring cloud gateway采用的算法,这里采用的用户时间换用户失败的策略,假设我们认为用户的平均忍耐时间为8秒,接口超过8秒一些用户就要骂街了,减去实际执行的2秒,也就是说我们的可以利用6秒的时间容纳更多的请求。依上文而言每ms去调用这个端口,那么也就是说桶的设计可以更大,在桶里放上令牌,每个请求需要在桶里面拿到令牌才能调用,这里的桶容量就是6s/ms为个。但是我们的执行速度是不变的,也就是结果是,在请求多的情况下用户的执行时间在8秒左右,而在请求少的情况下执行速度在2s左右,这样就缓解了短时间内大量请求导致大量失败的问题了。这里比较重要的参数有两个,第一个是桶请求容量 defaultBurstCapacity,第二个是每秒执行的请求速度(也就是桶的填充速率)defaultReplenishRate

       在这个例子中defaultBurstCapacity=而defaultReplenishRate=,这两个参数我们会在下方配置

       这里我们需要引用redis包,再说明一下,本站使用的是jdk的版本,其他版本的配置和引用可能会稍有变化,需要调整

       覆写KeyResolver的实现类

       流量控制,这里同样有代码实现和配置文件实现,由于目前idea对于复杂配置文件的支持不太好,如果使用配置文件方式会疯狂报红,策略先锋源码但是如果全部使用代码的话会不方便实现动态路由,因为gateway是先加载配置再处理代码的。所以这里我们路由使用配置,filter之类复杂的使用代码实现,下面是简单示例

       这样全服务层面的接口流量控制就完成了,具体的哪些服务使用流量控制,具体控制参数的配置,自行稍作修改即可

       测试流量控制的话,可以将令牌回复量和令牌总容量调至比较低的水平,然后再浏览器直接curl接口,比如令牌回复量和容量为1,则单秒内curl即可触发浏览器提示,线上大令牌容量测试能需要多线程curl了,这里参考官方文档给的lua脚本

       ip限流

       如果我们需要对某个ip进行限流,比如防止脚本抢货,我们这里需要KeyResolver的实现不再使用exchange.getRequest().getURI().getPath() ,而是使用 exchange.getRequest().getRemoteAddress() 。但是这里还有一个问题,我们请求是经过层层转发的,nginx,docker等,所以我们可能并不能拿到原始的请求地址,所以这里我们需要在最外层,比如nginx中将原始地址存到header或者cookie当中,这里给出简单示例

       当然还有其他类似X-Forwarded-For的字段不再本文主要探讨范围就不多拓展了,在nginx中配置记录初始远程地址到header后,我们这里需要在程序中取出来,如果你这里使用的标准的X-Real-IP的字段去存储,那么只需要

       即可获取真实地址,如果你这里自定义了一个header的key那么需要在exchange.getRequest().getHeaders()里面自己找出来了

       最后我们这里给出对同一个接口同时配置两种限流的示例

       我在ip限流这里修改了返回的code由改为了,方便测试,这里我们将ip的限流参数设置为(2,2),将path的限流参数设置为(1,)然后不断请求接口就发现一开始返回错误,后续path令牌桶用完后返回错误,hotspotnew源码分析即设置成功

       补充

       如果这里你不希望返回,并且要求返回一个用户可读的带有json信息结果,那么比较好的业务处理方式是前端完成。如果是对外接口的话,那么我们这里就只能重写RateLimiter的实现了,不再使用RedisRateLimiter的类,而是自己去继承RateLimiter接口去实现,

       参考 SpringCloudGateway限流后,默认返回的改造:改跳转或增加响应body,这篇文章已经很详细,这里就不赘述了

       熔断降级

       熔断降级,即某个接口调用失败时使用其他接口代替,来保证整体服务对外的可用性

       首先需要引入熔断包

       circuitbreaker-reactor-resilience4j 熔断的相关配置分为两个部分,熔断逻辑本身的配置以及在集成到gateway中时候,网关的配置,熔断的重要的配置有,触发熔断的接口,代替接口,熔断超时时间(当然还有其他的,比如自定义熔断HttpStatus等等,详细参数参考 Spring Cloud Circuit Breaker以及resilience4j官网)

       这里熔断触发接口和代替接口配置位于gateway中,这里我们使用代码实现,位置参考前述

       这里setName的目的是和熔断包中的配置产生对应关系,下方为熔断包的配置,这里定义默认超时时间(也就是没有匹配到name的超时时间)为s,your_breaker_id的超时时间为3s

       最后

       到这里网关的基本功能就差不多了,自定义的一些业务功能配置,比如header,cookie,以及调用方ip的处理逻辑等等其实都是在网关层处理的,可以参考 Spring Cloud Gateway WebFilter Factories以及Writing Custom Spring Cloud Gateway Filters,但是这种配置基本都没什么坑,这里就不谈了

       网关由于不经常作为业务逻辑被重构,所以网络上的资料相对比较少,我这里使用的又是最新的版本还是蛮多和前版本不一样的地方,尤其是自助攒机源码webflux的一些东西,很多问题需要看源码才能解决,非常的消耗意志力。这里建议小伙伴们如果是业务使用的这种资料相对较少的架构,最好还是不要使用最新版本的比较好,毕竟万一遇到坑,踩个一两天是很正常的事情,而这种在业务场景可能就没那么容易接受了

Reactor-Netty基本抽象类介绍

       概述

       之前已经把reactor3看的差不多了,在学会webflux之前还需要了解Reactor-Netty的相关知识,然后才能看懂webflux,然后才能看懂Gateway.

LoopResource

       首先先学习几个基本的类才能看懂Reactor-Netty在干什么.我们先来看LoopResource类.官方说这个类是一个EventLoopGroup 的 selector并且关联了 Channel的工厂

* An { @link EventLoopGroup} selector with associated* { @link io.netty.channel.Channel} factories.

       我们来看一下LoopResource提供的一些方法

static LoopResources create(String prefix) { if (Objects.requireNonNull(prefix, "prefix").isEmpty()) { throw new IllegalArgumentException("Cannot use empty prefix"); } return new DefaultLoopResources(prefix, DEFAULT_IO_SELECT_COUNT, DEFAULT_IO_WORKER_COUNT, true);}

        我们来看看DefaultLoopResource内部实现

        其实内部就是缓存了一堆的EventLoopGroup

ChannelPipelineConfigurer

        这个类的作用就是Channel创建好之后,在读取数据之前的初始化工作,我们看几个实现类 HttpServerChannelInitializer

ChannelGroup

       官方解释: 一个线程安全的集合,里面装的是打开的Channel,并且提供了很多操作Channel的方法,关闭的Channel会自动被group剔除.一个Channel可以属于多个Group

       先来看看唯一一个实现类DefaultChannelGroup的源码

        可以看到内部就是两个Map维护服务端和客户端的Channel,然后还有一个监听器.接下来看看添加Channel的方法再来看看是如何自动把过期Channel移除的,channel关闭之后会出发listener,listener会调用remove方法

        其实就是很简单的从map中移除数据的逻辑

ConnectionObserver

       从字面上看就是连接的观察者.是一个Connection的生命周期观察器.核心方法是 onStateChange.子类很多,等看源码的时候看到具体的再看源码.我们先来看ConnectionObserver定义的几个状态

TransportConfig

       一个配置的抽象类,里面保存了一些属性

        我们上面介绍的那些类都被保存在了这个Config里面.来看看其中一些比较重要的子类

ServerTransportConfig

        可以看到这个子类里面提供了两个ConnectionObserver我们分别来看一看

ServerTransportDoOnconnectionServerTransportDoOn原文:/post/

第6章 Reactor Netty中的消息处理逻辑

       在深入探讨Reactor Netty中的消息处理逻辑时,我们首先回顾了ChannelOperationsHandler作为WebFlux与Netty之间的重要桥梁,以及如何通过Channel生命周期的核心功能实现对Netty客户端与服务器端连接的封装。关注服务器端层面上的浏览器通过HTTP请求访问应用程序所涉及的Channel及其消息传递机制,这一章旨在逐步揭示其中的组织细节。

       在理解Reactor Netty中如何利用ChannelOperationsHandler进行通信的基础上,我们重点分析了ConnectionObserver与channelOperation的相互配合。特别地,我们探讨了ConnectionObserver如何在服务器端通过其onStateChange方法,根据不同的ConnectionObserver.State状态对Channel执行相应操作。以PooledConnectionProvider#acquire的源码为例,展示了如何在获取Channel时预先配置ConnectionObserver和channelOperationFactory,进而根据应用场景进行灵活的定制与配置。

       为了深入了解ConnectionObserver与channelOperation在服务器端的协同工作,我们首先通过回顾PooledConnectionProvider的配置机制,了解到在服务器端配置ConnectionObserver的途径。具体来说,利用reactor.netty.

       Upgrade:websocket

       Connection:Upgrade

       Sec-WebSocket-Key:x3JJHMbDL1EzLkh9GBhXDw==

       Sec-WebSocket-Protocol:chat,superchat

       Sec-WebSocket-Version:

       Origin:

       熟悉HTTP的童鞋可能发现了,这段类似HTTP协议的握手请求中,多了几个东西。我会顺便讲解下作用。

       2.1Upgrade和Connection

       Upgrade:websocket

       Connection:Upgrade

       这个就是Websocket的核心了,告诉Apache、Tomcat、Nginx等服务器:注意啦,我发起的是Websocket协议,快点帮我找到对应的助理处理~不是那个老土的HTTP。

       2.2Sec-WebSocket

       Sec-WebSocket-Key:x3JJHMbDL1EzLkh9GBhXDw==

       Sec-WebSocket-Protocol:chat,superchat

       Sec-WebSocket-Version:

       首先,?Sec-WebSocket-Key是一个Baseencode的值,这个是浏览器随机生成的,告诉服务器:你妹,不要忽悠窝,我要验证尼是不是真的是Websocket助理。

       然后,?Sec_WebSocket-Protocol是一个用户定义的字符串,用来区分同URL下,不同的服务所需要的协议。简单理解:今晚我要服务A,别搞错啦~

       最后,?Sec-WebSocket-Version是告诉服务器所使用的WebSocketDraft(协议版本),在最初的时候,Websocket协议还在Draft阶段,各种奇奇怪怪的协议都有,而且还有很多期奇奇怪怪不同的东西,什么Firefox和Chrome用的不是一个版本之类的,当初Websocket协议太多可是一个大难题。。不过现在还好,已经定下来啦~大家都使用的一个东西~脱水:服务员,我要的是岁的噢→_→

       然后服务器会返回下列东西,表示已经接受到请求,成功建立Websocket啦!

       HTTP/1.SwitchingProtocols

       Upgrade:websocket

       Connection:Upgrade

       Sec-WebSocket-Accept:HSmrc0sMlYUkAGmm5OPpG2HaGWk=

       Sec-WebSocket-Protocol:chat

       这里开始就是HTTP最后负责的区域了,告诉客户,我已经成功切换协议啦~

       Upgrade:websocket

       Connection:Upgrade

       依然是固定的,告诉客户端即将升级的是Websocket协议,而不是mozillasocket,lurnarsocket或者shitsocket。

       然后,?Sec-WebSocket-Accept这个则是经过服务器确认,并且加密过后的Sec-WebSocket-Key。服务器:好啦好啦,知道啦,给你看我的IDCARD来证明行了吧。后面的,Sec-WebSocket-Protocol则是表示最终使用的协议。

       至此,HTTP已经完成它所有工作了,接下来就是完全按照Websocket协议进行了。具体的协议就不在这阐述了。

       ——————技术解析部分完毕——————

       你说了这么久,那到底Websocket有什么鬼用,pressionThreshold"value=""/

       /bean

       /property

       propertyname="opTimeout"value=""/

       propertyname="timeoutExceptionThreshold"value=""/

       propertyname="locatorType"value="CONSISTENT"/

       propertyname="hashAlg"

       valuetype="net.spy.memcached.DefaultHashAlgorithm"KETAMA_HASH/value

       /property

       propertyname="failureMode"value="Redistribute"/

       propertyname="useNagleAlgorithm"value="false"/

       /bean

Reactive Spring实战 -- 理解Reactor的设计与实现

       Reactor是Spring提供的非阻塞式响应式编程框架,实现了Reactive Streams规范。它提供了可组合的异步序列API,包括用于多个元素的Flux和用于零到一个元素的Mono。

       Reactor Netty项目还支持非阻塞式网络通信,非常适合微服务架构,为HTTP(包括Websockets),TCP和UDP提供了响应式编程基础。本文将通过实例展示和源码阅读,深入分析Reactor的核心设计与实现机制。

       Reactor源码基于版本3.3。

       响应式编程是一个专注于数据流和变化传递的异步编程范式,允许使用编程语言表示静态或动态数据流。

       Reactor中,发布者(Publisher)负责生产数据,订阅者(Subscriber)负责处理和消费数据。创建发布者和订阅者后,通过建立订阅关系,发布者开始生产数据并传递给订阅者。

       Flux和Mono是两种发布者类型,分别用于生产多个数据元素和单个数据元素。例如,Flux.range和fromArray等静态方法会返回Flux子类。

       Reactor中关键方法包括Publisher#subscribe和Flux#subscribe。订阅者在onSubscribe方法中接收订阅关系,然后通过Subscription#request方法向发布者请求数据。

       RangeSubscription#request、Subscriber#onNext和CoreSubscriber的内部逻辑展示了数据流转的过程。Flux子类的subscribe方法创建Subscription,将操作符逻辑转移到Subscriber端。

       操作符方法,如skip、distinct、sort和filter,是Reactor的核心,用于处理和组合数据流。例如,myHandler作为订阅者,可以处理生成的Flux子类序列。

       Reactor支持push和pull模式。pull模式通过Flux#generate和Sink缓存数据,而push模式则通过Flux#create,允许多线程同时推送数据。

       Reactor提供线程与调度器支持,例如parallel、single、boundedElastic和parallel。这些调度器允许在不同线程环境下执行操作。

       Reactor中的publishOn和subscribeOn操作符方法用于切换操作上下文,分别影响后续操作和整个链路的线程执行环境。

       流量控制是响应式编程中的重要概念,FluxSink.OverflowStrategy定义了在数据生产速度超过消费速度时的策略,如忽略、错误或缓存数据。

       Reactor通过实例和源码展示了响应式编程的概念和实现机制,以及如何在实际应用中使用。通过WebFlux和AsyncRestTemplate的比较,将揭示响应式编程带来的优势。

(WebFlux)、多数据源R2dbc事务失效分析

       在项目改造过程中,我们将SpringMVC替换为SpringWebflux,同时将Mybatis升级为R2dbc。项目进展顺利,直到新需求引入MongoDb,问题浮现。面对Mysql和MongoDb的多数据源挑战,事物操作出现异常。本文将深入分析问题原因与解决方案。

       在本地测试时,强烈推荐使用虚拟机和Docker安装MySql与MongoDb,以避免Mac直连Docker带来的麻烦。SpringBoot版本为2.6.,本文基于已集成R2DBC与MongoDb的环境。

       首先,我们创建了一个测试库r2dbc_test,包含user表。引入R2dbc并进行基本测试,实现事务操作,确保数据完整性。测试结果显示,R2dbc事务操作正常,当尝试删除并插入数据时,期望的异常和数据状态得到验证。

       接着引入MongoDb,并开启事务支持。根据官方文档,除非手动配置MongoTransactionManager,否则事务支持默认禁用。在项目中添加相应代码,为Webflux环境配置MongoDB事务。然而,引入MongoDb后,事务操作再次出现问题,未按预期回滚。

       为了解决此问题,我们深入分析了事务失效的原因。经过排查,发现事务管理器未能正确初始化,导致TransactionalOperator无法正常工作。通过查看源码,发现R2dbcTransactionManager的初始化依赖于是否存在ReactiveTransactionManager。由于MongoDb事务已先期初始化,导致R2dbcTransactionManager未能正确创建,从而影响了事物操作。

       为解决此问题,我们采取了以下措施:创建两个配置类,分别为MongoConfig和R2dbcConfig,用于自定义事务管理器的初始化。通过别名方式创建两个TransactionalOperator,确保R2dbcTransactionManager的正确初始化。经过验证,设置正确的名称后,事务操作恢复正常,数据回滚验证成功。

       本文提出了手动验证的方法,并指出了使用日志记录作为辅助工具的快捷途径。通过日志,可以清晰地追踪事务创建与回滚过程,验证操作的有效性。总结而言,在面对新工具和多数据源时,应充分实验、验证结果,面对问题时保持冷静,逐步解决问题。如有疑问,欢迎指正与交流。