1.在Windows搭建gRPC C++开发环境
2.C++快速集成gRPC的源码几种方式介绍(内含预编译库下载)
3.C++_GRPC使用讲解-编译,开发环境搭建
4.gRPCå
¥åè®°
5.PolarisMesh源码系列--Polaris-Go注册发现流程
6.gRPCè´è½½åè¡¡ï¼èªå®ä¹è´è½½åè¡¡çç¥--etcdå®ç°ï¼
在Windows搭建gRPC C++开发环境
在Windows下搭建gRPC C++开发环境,源码并开发、源码配置简单的源码服务端及.net客户端的步骤如下:
1、下载gRPC源码:
通过git命令行在预设目录下载gRPC 1..0版本。源码
2、源码im酷聊源码生成工程文件:
使用CMake生成工程文件,源码需调整选项包括添加ABSL_PROPAGATE_CXX_STD为true,源码调整zlib依赖版本至2.8.,源码设置CMAKE_INSTALL_PREFIX以指定安装目录。源码
3、源码编译、源码安装gRPC:
使用Visual Studio 编译安装,源码设置为Release x生成ALL_BUILD和INSTALL项目,源码确保bin目录路径添加到环境变量Path中。源码
4、创建测试工程:
创建解决方案GRPCTest,adx同花顺源码包含c++空项目CPPServer与.Net 控制台项目DotNetClient,将protos文件夹及helloworld.proto文件导入。
5、编译proto文件:
使用命令行生成c++及c#文件,确保执行路径正确。
6、生成CPPServer项目:
将greeter_server.cc文件拷贝至CPPServer目录,并添加相关文件及目录,配置包含及附加库。
7、生成DotNetClient:
通过Nuget安装所需包,并将Helloworld相关文件添加到DotNetClient项目中,编辑Program.cs并编译。
8、测试:
运行CPPServer.exe与DotNetClient.exe进行测试,验证服务端与客户端通信是视频源码编辑否正常。
C++快速集成gRPC的几种方式介绍(内含预编译库下载)
集成gRPC到C++的途径多种多样,但每种方法都需要额外的步骤和资源投入。本文将对不同方式的集成进行介绍,并提供预编译库下载链接,帮助开发者简化步骤,更快上手。
官方提供的gRPC安装方式包括源码编译等,适合深入理解gRPC内部结构的开发者。对于希望快速集成gRPC的初学者,推荐直接使用预编译库。
推荐使用vcpkg进行预编译库的下载。vcpkg能够简化库的下载与配置过程,极大降低集成难度。若条件允许,自行下载与编译库以加深对gRPC的理解。
对于Windows用户,底部介入源码推荐下载名为grpc-vcpkg-repo-windows-x.7z的预编译库。在使用cmake构建时,需设置DCMAKE_TOOLCHAIN_FILE环境变量。针对使用Clion的开发者,vcpkg.cmake工具能自动完成大部分配置,快速搭建开发环境。
对于基于包管理系统的Linux发行版用户,推荐使用系统包管理器安装gRPC,同时可以选择下载预编译库以简化安装过程。
对于Linux ARM版本用户,推荐下载名为grpc-vcpkg-repo-linux-arm.7z的预编译库。请自行确保编译环境满足库的运行需求。若需要其他特定内核版本或系统环境支持的编译服务,可联系作者微信lnl,费用为一杯咖啡。
C++_GRPC使用讲解-编译,mrp源码下载开发环境搭建
特别强调,grpc对gcc/g++版本有要求,需6.3及以上,低于此版本需升级。首先,确保安装必要的依赖工具。1. 安装依赖工具
如cmake低于3.或gcc/g++低于7.0,请按文档进行更新。cmake推荐安装最新版本(最低3.)。
卸载旧版CMake后,解压下载的cmake包,bin目录包含cmake家族工具。
创建软链接,通常选择/opt或/usr路径。
2. gcc/g++升级
务必升级到6.3以上,版本7.0以上无需重复。安装7.0版本,确认版本显示为7.5。3. 编译grpc
推荐使用cmake编译,对网络有依赖。如果无法访问外部资源,可使用我提供的1..2版本压缩包编译,否则从源码开始下载。下载源码,选择v1..2或其他相应版本。
编译过程中会自动处理protobuf依赖,无需单独安装。
编译完成后,测试helloworld服务和客户端。
4. 辅助工具-scp命令
scp命令用于服务器间文件传输,提供下载和上传文件/目录的功能,但非课程重点。下载:scp username@ip:/path/to/file local/path
上传:scp local/path username@ip:/path/to/destination
下载目录:scp -r username@ip:/path/to/directory local/path
上传目录:scp -r local/path username@ip:/path/to/destination
获取grpc-v1..2源码包,可通过群组获取。gRPCå ¥åè®°
æ¦è¦
ç±äºgRPC主è¦æ¯è°·æå¼åçï¼ç±äºä¸äºå·²ç¥çåå ï¼gRPCè·demoè¿æ¯ä¸é£ä¹é¡ºå©çãåç¬åè¿ä¸ç¯ï¼ä¸»è¦æ¯gRPCå®è£ è¿ç¨ä¸çå太å¤äºï¼è®°å½ä¸æ¥è®©å¤§å®¶å°èµ°å¼¯è·¯ã
主è¦çåï¼
æ¬æ讲解gRPC demoçåæ¶ï¼ä¼ä»ç»å¦ä½è§£å³è¿äºåãæ¬æ对åºçGithubå°åï¼blogs.com/fhy/p/.html
(æ¬æå®)
PolarisMesh源码系列--Polaris-Go注册发现流程
北极星是腾讯开源的一款服务治理平台,其目标在于解决分布式和微服务架构中的服务管理、流量管理、配置管理、故障容错和可观测性问题。与Spring Cloud、Apache Dubbo和Istio等其他流行技术相比,北极星提供了独特的优势与服务注册发现的实现。
从功能实现角度看,Spring Cloud、Apache Dubbo、Istio和北极星都实现了服务治理的关键功能,但它们的实现思路有所不同。Spring Cloud在Spring Boot框架基础上扩展,继承了其灵活性,能够方便地集成服务注册发现、服务治理和可观测组件。而北极星则直接从下一代架构基金会制定的服务治理标准出发,构建服务治理的模型,并基于此模型构建控制面和数据面,提供了统一的服务治理框架。
ServiceMesh采用Sidecar模式解耦业务逻辑和服务治理逻辑,将服务治理能力下沉到基础设施,增强整体架构的灵活性。然而,这种模式在性能上有所损耗,并且对中小团队的灵活性和扩展性提出了挑战。Istio虽然提供了基于虚拟机/物理机的部署方式,但对Kubernetes的依赖较高,非Kubernetes环境的团队可能难以部署。
北极星Mesh则通过融合和兼容多种技术,提供了一种自顶向下的正向思考过程。它先基于服务治理标准构建模型,然后围绕该模型构建控制面和数据面,支持与ServiceMesh的集成,为未来发展留有空间。此外,北极星Mesh通过插件机制为框架扩展预留了灵活性。
本文重点分析了Polaris-Go SDK在服务注册和发现过程中的技术实现和源码阅读。服务注册流程相对简单,线性操作,通过gRPC服务接口实现。服务发现流程则更为复杂,涉及本地缓存与远程服务器信息的懒加载同步,以及处理实例信息、服务信息、路由信息和限流信息等复杂内容。在服务发现过程中,gRPC接口被用于关键点的处理。
综上所述,北极星服务治理平台通过实现服务治理标准,提供了全面的服务发现和治理方案。其客户端与服务器端的数据同步与交互设计了良好的服务治理模型和通信机制,确保了可靠性和稳定性。同时,通过插件机制,Polaris-Go SDK框架提供了灵活的扩展能力。这一分析仅是基于现有信息,如有错误或遗漏,欢迎指正。
gRPCè´è½½åè¡¡ï¼èªå®ä¹è´è½½åè¡¡çç¥--etcdå®ç°ï¼
èæ¯
å¨å·¥ä½å¦ä¹ ä¸ä½¿ç¨gRPCçå°æ¹æ¯è¾å¤ï¼é常æ们é½ä½¿ç¨çæ¯èªå¸¦çè´è½½åè¡¡ç®æ³ï¼ä½æ¯å¨æäºåºæ¯ä¸æ们éè¦å¯¹æå¡ççæ¬è¿è¡æ§å¶æ¯å¦[appV2åªè½å»é¾æ¥userV3],å¨è¿æ ·çæ åµä¸å°±åªè½éèªå®ä¹è´è½½åè¡¡çç¥
ç®æ å®ç°åºäºçæ¬ï¼versionï¼çgrpcè´è½½åè¡¡å¨ï¼äºè§£è¿ç¨åå¯èªå·±å®ç°æ´å¤çè´è½½åè¡¡åè½
注åä¸å¿
EtcdLeaseæ¯ä¸ç§æ£æµå®¢æ·ç«¯åæ´»ç¶åµçæºå¶ã群éæäºå ·æçåæ¶é´çç§çº¦ãå¦æetcd群éå¨ç»å®çTTLæ¶é´å æªæ¶å°keepAliveï¼åç§çº¦å°æã为äºå°ç§çº¦ç»å®å°é®å¼åå¨ä¸ï¼æ¯ä¸ªkeyæå¤å¯ä»¥éå ä¸ä¸ªç§çº¦
æå¡æ³¨å(注åæå¡)
å®æ¶ææ¬å°æå¡ï¼APPï¼å°å,çæ¬çä¿¡æ¯æ³¨åå°æå¡å¨
æå¡åç°(客æ·ç«¯åèµ·æå¡è§£æ请æ±ï¼APPï¼)
æ¥è¯¢æ³¨åä¸å¿ï¼APPï¼ä¸æé£äºæå¡
并åææçæå¡å»ºç«HTTP2é¿é¾æ¥
éè¿Etcdwatchçå¬æå¡ï¼APPï¼ï¼éè¿ååæ´æ°é¾æ¥
è´è½½åè¡¡(客æ·ç«¯å起请æ±ï¼APPï¼)
è´è½½åè¡¡éæ©åéçæå¡ï¼APPHTTP2é¿é¾æ¥ï¼
åèµ·è°ç¨
æå¡æ³¨å(注åæå¡)æºç register.go
funcNewRegister(opt...RegisterOptions)(*Register,error){ s:=&Register{ opts:newOptions(opt...),}varctx,cancel=context.WithTimeout(context.Background(),time.Duration(s.opts.RegisterTtl)*time.Second)defercancel()data,err:=json.Marshal(s.opts)iferr!=nil{ returnnil,err}etcdCli,err:=clientv3.New(s.opts.EtcdConf)iferr!=nil{ returnnil,err}s.etcdCli=etcdCli//ç³è¯·ç§çº¦resp,err:=etcdCli.Grant(ctx,s.opts.RegisterTtl)iferr!=nil{ returns,err}s.name=fmt.Sprintf("%s/%s",s.opts.Node.Path,s.opts.Node.Id)//注åèç¹_,err=etcdCli.Put(ctx,s.name,string(data),clientv3.WithLease(resp.ID))iferr!=nil{ returns,err}//ç»çº¦ç§çº¦s.keepAliveChan,err=etcdCli.KeepAlive(context.Background(),resp.ID)iferr!=nil{ returns,err}returns,nil}å¨etcdéé¢æ们å¯ä»¥çå°å¦ä¸ä¿¡æ¯APPv1çæ¬æå¡å¨èç¹çkey/hwholiday/srv/app/app-beb3cb-eb-eb-d-2cfdc7c
{ "node":{ "name":"app","path":"/hwholiday/srv/app","id":"app-beb3cb-eb-eb-d-2cfdc7c","version":"v1","address":"...:"}}APPv2çæ¬æå¡å¨èç¹çkey/hwholiday/srv/app/app-beb3cb-eb-eb-d-2cfdc7c
{ "node":{ "name":"app","path":"/hwholiday/srv/app","id":"app--eb-eb-c0-2cfdc7c","version":"v2","address":"...:"},}æå¡åç°(客æ·ç«¯åèµ·æå¡è§£æ请æ±ï¼APPï¼)æºç discovery.goå®ç°grpcå çresolver.Builderæ¥å£ï¼Builderå建ä¸ä¸ªè§£æå¨ï¼ç¨äºçè§å称解ææ´æ°ï¼
funcNewDiscovery(opt...ClientOptions)resolver.Builder{ s:=&Discovery{ opts:newOptions(opt...),}etcdCli,err:=clientv3.New(s.opts.EtcdConf)iferr!=nil{ panic(err)}s.etcdCli=etcdClireturns}//Buildå½è°ç¨`grpc.Dial()`æ¶æ§è¡func(d*Discovery)Build(targetresolver.Target,ccresolver.ClientConn,optsresolver.BuildOptions)(resolver.Resolver,error){ d.cc=ccres,err:=d.etcdCli.Get(context.Background(),d.opts.SrvName,clientv3.WithPrefix())iferr!=nil{ returnnil,err}for_,v:=rangeres.Kvs{ iferr=d.AddNode(v.Key,v.Value);err!=nil{ log.Println(err)continue}}gofunc(dd*Discovery){ dd.watcher()}(d)returnd,err}//æ ¹æ®å®æ¹ç建议æ们æä»æ³¨åä¸å¿æ¿å°çæå¡ä¿¡æ¯å¨åå°Attributesä¸//Attributescontainsarbitrarydataabouttheresolverintendedfor//consumptionbytheloadbalancingpolicy.//å±æ§å å«æå ³ä¾è´è½½å¹³è¡¡çç¥ä½¿ç¨ç解æå¨çä»»ææ°æ®ã//Attributes*attributes.Attributesfunc(d*Discovery)AddNode(key,val[]byte)error{ vardata=new(register.Options)err:=json.Unmarshal(val,data)iferr!=nil{ returnerr}addr:=resolver.Address{ Addr:data.Node.Address}addr=SetNodeInfo(addr,data)d.Node.Store(string(key),addr)returnd.cc.UpdateState(resolver.State{ Addresses:d.GetAddress()})}è´è½½åè¡¡(客æ·ç«¯å起请æ±ï¼APPï¼)æºç version_balancer.go
gRPCæä¾äºPickerBuilderåPickeræ¥å£è®©æ们å®ç°èªå·±çè´è½½åè¡¡çç¥
//PickerBuilderå建balancer.PickerãtypePickerBuilderinterface{ //Buildè¿åä¸ä¸ªéæ©å¨ï¼gRPCå°ä½¿ç¨å®æ¥éæ©ä¸ä¸ªSubConnãBuild(infoPickerBuildInfo)balancer.Picker}//gRPC使ç¨Pickeræ¥éæ©ä¸ä¸ªSubConnæ¥åéRPCã//æ¯æ¬¡å¹³è¡¡å¨çå é¨ç¶æåçååæ¶ï¼å®é½ä¼ä»å®çå¿«ç §ä¸çæä¸ä¸ªæ°çéæ©å¨ã//gRPC使ç¨çéæ©å¨å¯ä»¥éè¿ClientConn.UpdateState()æ´æ°ãtypePickerinterface{ //éæ©åéçåé¾æ¥åé请æ±Pick(infoPickInfo)(PickResult,error)}ä»ä¸é¢å¾ç¥æ们å¯ä»¥å¹²äºçå°æ¹å¨Buildæ¹æ³æè Pickæ¹æ³ï¼è°ç¨gRPCæ¹æ³æ¶å æ§è¡Buildåæ§è¡Pickï¼
Build(infoPickerBuildInfo)balancer.Pickerinfoéé¢ææå¡çé¾æ¥ï¼åé¾æ¥å¯¹åºçååéè¿AddNodeæ¹æ³åå ¥çæå¡ä¿¡æ¯è¿éæ们å¯ä»¥åºäºgrpc-clientå±é¢æ¥åè´è½½ï¼æ¯å¦ï¼å æéæºè´è½½ï¼
Pick(infoPickInfo)(PickResult,error)infoéé¢æè°ç¨çæ¹æ³ååcontext.Contextéè¿context.Contextæ们å¯ä»¥è·å¾è¿ä¸ªæ¥è·åå起请æ±çæ¶åå¡«å ¥çåæ°ï¼è¿æ ·æ们å¯ä»¥å¾çµæ´»çé对æ¯ä¸ªæ¹æ³è¿è¡ä¸åçè´è½½è¿éæ们å¯ä»¥åºäºgrpc-client-apiå±é¢æ¥åè´è½½
func(*rrPickerBuilder)Build(infobase.PickerBuildInfo)balancer.Picker{ iflen(info.ReadySCs)==0{ returnbase.NewErrPicker(balancer.ErrNoSubConnAvailable)}varscs=make(map[balancer.SubConn]*register.Options,len(info.ReadySCs))forconn,addr:=rangeinfo.ReadySCs{ nodeInfo:=GetNodeInfo(addr.Address)ifnodeInfo!=nil{ scs[conn]=nodeInfo}}iflen(scs)==0{ returnbase.NewErrPicker(balancer.ErrNoSubConnAvailable)}return&rrPicker{ node:scs,}}func(p*rrPicker)Pick(infobalancer.PickInfo)(balancer.PickResult,error){ p.mu.Lock()deferp.mu.Unlock()version:=info.Ctx.Value("version")varsubConns[]balancer.SubConnforconn,node:=rangep.node{ ifversion!=""{ ifnode.Node.Version==version.(string){ subConns=append(subConns,conn)}}}iflen(subConns)==0{ returnbalancer.PickResult{ },errors.New("nomatchfoundconn")}index:=rand.Intn(len(subConns))sc:=subConns[index]returnbalancer.PickResult{ SubConn:sc},nil}客æ·ç使ç¨æ们å®ä¹çversionè´è½½åè¡¡çç¥r:=discovery.NewDiscovery(discovery.SetName("hwholiday.srv.app"),discovery.SetEtcdConf(clientv3.Config{ Endpoints:[]string{ "...:"},DialTimeout:time.Second*5,}))resolver.Register(r)//è¿æ¥æå¡å¨conn,err:=grpc.Dial("hwholiday.srv.app",//没æ使ç¨è¿ä¸ªåæ°grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "LoadBalancingPolicy":"%s"}`,"version")),grpc.WithInsecure(),)iferr!=nil{ log.Fatalf("net.Connecterr:%v",err)}deferconn.Close()//è°ç¨æå¡apiClient:=api.NewApiClient(conn)ctx:=context.WithValue(context.Background(),"version","v1")_,err=apiClient.ApiTest(ctx,&api.Request{ Input:"v1v1v1v1v1"})iferr!=nil{ fmt.Println(err)}è¿è¡æææµè¯æºç
è¿è¡APPæå¡v1,è°ç¨grpc-client使ç¨v1
APPæå°
å¯å¨æå===>0.0.0.0:
input:"v1v1v1v1v1"
grpc-clientæå°
===RUNTestClient
v1v1v1v1v1v1v1v1v1v1
è¿è¡APPæå¡v1,è°ç¨grpc-client使ç¨v2
APPæå°
å¯å¨æå===>0.0.0.0:
grpc-clientæå°
===RUNTestClient
rpcerror:code=Unavailabledesc=nomatchfoundconn
æ»ç»è¯¦æ ä»ç»å°å
æºç å°å:/hwholiday/learning_tools/tree/master/etcd
éè¿å¦ä¹ æ们å¯ä»¥å®ç°åºäºversionçè´è½½çç¥ï¼è¿éåªæ¯æä¾ä¸ç§æè·¯æä¹å»å®ç°å¯è½æçè¿ä¸ªä¾åä¸å¤ªéåè¿ä¸ªï¼ä½æ¯æä¾äºä¸ç§æè·¯ï¼æ¬¢è¿ä¸èµ·è®¨è®ºã
gRPC 流量控制详解
gRPC 流量控制详解
流量控制, 一般来说指的是在网络传输中, 发送者主动限制自身发送数据的速率或发送的数据量, 以适应接收者处理数据的速度. 当接收者的处理速度较慢时, 来不及处理的数据会被存放在内存中, 而当内存中的数据缓存区被填满之后, 新收到的数据就会被扔掉, 导致发送者不得不重新发送, 就会造成网络带宽的浪费.
流量控制是一个网络组件的基本功能, 我们熟知的 TCP 协议就规定了流量控制算法. gRPC 建立在 TCP 之上, 也依赖于 HTTP/2 WindowUpdate Frame 实现了自己在应用层的流量控制.
在 gRPC 中, 流量控制体现在三个维度:
采样流量控制: gRCP 接收者检测一段时间内收到的数据量, 从而推测出 on-wire 的数据量, 并指导发送者调整流量控制窗口.
Connection level 流量控制: 发送者在初始化时被分配一个 quota (额度), quota 随数据发送减少, 并在收到接收者的反馈之后增加. 发送者在耗尽 quota 之后不能再发送数据.
Stream level 流量控制: 和 connection level 的流量控制类似, 只不过 connection level 管理的是一个发送者和一个接收者之间的全部流量, 而 stream level 管理的是 connection 中诸多 stream 中的一个.
在本篇剩余的部分中, 我们将结合代码一起来看看这三种流量控制的实现原理和实现细节.
本篇中的源代码均来自 /grpc/grpc-go, 并且为了方便展示, 在不影响表述的前提下截断了部分代码.
流量控制是双向的, 为了减少冗余的叙述, 在本篇中我们只讲述 gRPC 是如何控制 server 所发送的流量的.
gRPC 中的流量控制仅针对 HTTP/2 data frame.
采样流量控制原理采样流量控制, 准确来说应该叫做 BDP 估算和动态流量控制窗口, 是一种通过在接收端收集数据, 以决定发送端流量控制窗口大小的流量控制方法. 以下内容翻译自 gRPC 的一篇官方博客, 介绍了采样流量控制的意义和原理.
BDP 估算和动态流量控制这个 feature 缩小了 gRPC 和 HTTP/1.1 在高延迟网络环境下的性能差距.
Bandwidth Delay Product (BDP), 即带宽延迟积, 是网络连接的带宽和数据往返延迟的乘积. BDP 能够有效地告诉我们, 如果充分利用了网络连接, 那么在某一刻在网络连接上可以存在多少字节的数据.
计算 BDP 并进行相应调整的算法最开始是由 @ejona 提出的, 后来由 gRPC-C Core 和 gRPC-Java 实现. BDP 的想法简单而实用: 每次接收者得到一个 data frame, 它就会发出一个 BDP ping frame (一个只有 BDP 估算器使用的 ping frame). 之后, 接收者会统计指导收到 ACK 之前收到的字节数. 这个大约在 1.5RTT (往返时间) 中收到的所有字节的总和是有效 BDP1.5 的近似值. 如果该值接近当前流量窗口的大小 (例如超过 2/3), 接收者就需要增加窗口的大小. 窗口的大小被设定为 BDP (所有采样期间接受到的字节总和) 的两倍.
BDP 采样目前在 gRPC-go 的 server 端是默认开启的.
结合代码, 一起来看看具体的实现方式.
代码分析我们以 client 发送 BDP ping 给 server, 并决定 server 端的流量控制窗口为例.
在 gRPC-go 中定义了一个bdpEstimator , 是用来计算 BDP 的核心:
type?bdpEstimator?struct?{ //?sentAt?is?the?time?when?the?ping?was?sent.sentAt?time.Timemu?sync.Mutex//?bdp?is?the?current?bdp?estimate.bdp?uint//?sample?is?the?number?of?bytes?received?in?one?measurement?cycle.sample?uint//?bwMax?is?the?maximum?bandwidth?noted?so?far?(bytes/sec).bwMax?float//?bool?to?keep?track?of?the?beginning?of?a?new?measurement?cycle.isSent?bool//?Callback?to?update?the?window?sizes.updateFlowControl?func(n?uint)//?sampleCount?is?the?number?of?samples?taken?so?far.sampleCount?uint//?round?trip?time?(seconds)rtt?float}bdpEstimator 有两个主要的方法 add 和 calculate :
//?add?的返回值指示?是否发送?BDP?ping?frame?给?serverfunc?(b?*bdpEstimator)?add(n?uint)?bool?{ b.mu.Lock()defer?b.mu.Unlock()//?如果?bdp?已经达到上限,?就不再发送?BDP?ping?进行采样if?b.bdp?==?bdpLimit?{ return?false}//?如果在当前时间点没有?BDP?ping?frame?发送出去,?就应该发送,?来进行采样if?!b.isSent?{ b.isSent?=?trueb.sample?=?nb.sentAt?=?time.Time{ }b.sampleCount++return?true}//?已经有?BDP?ping?frame?发送出去了,?但是还没有收到?ACKb.sample?+=?nreturn?false}add 函数有两个作用:
决定 client 在接收到数据时是否开始采样.
记录采样开始的时间和初始数据量.
func?(t?*ing?flow?control?windows//?for?the?transport?and?the?stream?based?on?the?current?bdp//?estimation.func?(t?*ingWindowUpdateHandler?负责处理来自?client?的?window?update?framefunc?(l?*loopyWriter)?incomingWindowUpdateHandler(w?*incomingWindowUpdate)?error?{ if?w.streamID?==?0?{ //?增加?quotal.sendQuota?+=?w.incrementreturn?nil}......}sendQuota 在接收到来自 client 的 window update 后增加.
//?processData?负责发送?data?frame?给?clientfunc?(l?*loopyWriter)?processData()?(bool,?error)?{ ......//?根据发送的数据量减少?sendQuotal.sendQuota?-=?uint(size)......}并且 server 在发送数据时会减少 sendQuota .
Client 端//?add?的返回值指示?是否发送?BDP?ping?frame?给?serverfunc?(b?*bdpEstimator)?add(n?uint)?bool?{ b.mu.Lock()defer?b.mu.Unlock()//?如果?bdp?已经达到上限,?就不再发送?BDP?ping?进行采样if?b.bdp?==?bdpLimit?{ return?false}//?如果在当前时间点没有?BDP?ping?frame?发送出去,?就应该发送,?来进行采样if?!b.isSent?{ b.isSent?=?trueb.sample?=?nb.sentAt?=?time.Time{ }b.sampleCount++return?true}//?已经有?BDP?ping?frame?发送出去了,?但是还没有收到?ACKb.sample?+=?nreturn?false}0trInFlow 是 client 端控制是否发送 window update 的核心. 值得注意的是 client 端是否发送 window update 只取决于已经接收到的数据量, 而管这些数据是否被某些 stream 读取. 这一点是 gRPC 在流量控制中的优化, 即因为多个 stream 共享同一个 connection, 不应该因为某个 stream 读取数据较慢而影响到 connection level 的流量控制, 影响到其他 stream.
//?add?的返回值指示?是否发送?BDP?ping?frame?给?serverfunc?(b?*bdpEstimator)?add(n?uint)?bool?{ b.mu.Lock()defer?b.mu.Unlock()//?如果?bdp?已经达到上限,?就不再发送?BDP?ping?进行采样if?b.bdp?==?bdpLimit?{ return?false}//?如果在当前时间点没有?BDP?ping?frame?发送出去,?就应该发送,?来进行采样if?!b.isSent?{ b.isSent?=?trueb.sample?=?nb.sentAt?=?time.Time{ }b.sampleCount++return?true}//?已经有?BDP?ping?frame?发送出去了,?但是还没有收到?ACKb.sample?+=?nreturn?false}1这里 limit * 1/4 的限制其实是可以浮动的, 因为 limit 的数值会随着 server 端发来的 window update 而改变.
Stream level 流量控制原理Stream level 的流量控制和 connection level 的流量控制原理基本上一致的, 主要的区别有两点:
Stream level 的流量控制中的 quota 只针对单个 stream. 每个 stream 即受限于 stream level 流量控制, 又受限于 connection level 流量控制.
Client 端决定反馈给 server window update frame 的时机更复杂一点.
Stream level 的流量控制不光要记录已经收到的数据量, 还需要记录被 stream 消费掉的数据量, 以达到更加精准的流量控制. 实际上, client 会记录:
pendingData: stream 收到但还未被应用消费 (未被读取) 的数据量.
pendingUpdate: stream 收到且已经被应用消费 (已被读取) 的数据量.
limit: stream 能接受的数据上限, 被初始为 字节, 受到采样流量控制的影响.
delta: delta 是在 limit 基础上额外增加的数据量, 当应用试着去读取超过 limit 大小的数据是, 会临时在 limit 上增加 delta, 来允许应用读取数据.
Client 端的逻辑是这样的:
每当 client 接收到来自 server 的 data frame 的时候, pendingData += 接收到的数据量 .
每当 application 在从 stream 中读取数据之前 (即 pendingData 将被消费的时候),