gRPCå ¥åè®°
æ¦è¦
ç±äºgRPC主è¦æ¯è°·æå¼åçï¼ç±äºä¸äºå·²ç¥çåå ï¼gRPCè·demoè¿æ¯ä¸é£ä¹é¡ºå©çãåç¬åè¿ä¸ç¯ï¼ä¸»è¦æ¯gRPCå®è£ è¿ç¨ä¸çå太å¤äºï¼è®°å½ä¸æ¥è®©å¤§å®¶å°èµ°å¼¯è·¯ã
主è¦çåï¼
æ¬æ讲解gRPC demoçåæ¶ï¼ä¼ä»ç»å¦ä½è§£å³è¿äºåãæ¬æ对åºçGithubå°åï¼blogs.com/fhy/p/.html
(æ¬æå®)
RocketMQ 5.0: POP 消费模式 原理详解 & 源码解析
RocketMQ 5.0 引入 Pop 消费模式,源码优化用于解决 Push 消费模式存在的源码优化痛点。Pop 消费模式将客户端的源码优化重平衡逻辑迁移至 Broker 端,使得消息消费过程更加高效,源码优化避免消息堆积和横向扩展能力受限的源码优化图像识别 ocr 源码问题。引入轻量化客户端后,源码优化通过 gRPC 封装 Pop 消费接口,源码优化实现了多语言支持,源码优化无需在客户端实现重平衡逻辑。源码优化
Pop 消费模式的源码优化原理在于客户端仅需发送 Pop 请求,由 Broker 端根据请求分配消息队列并返回消息。源码优化这样可以实现多客户端同时消费同一队列,源码优化避免单一客户端挂起导致消息堆积,源码优化同时也消除了频繁重平衡导致的源码优化消息积压问题。
Pop 消费流程涉及消息拉取、不可见时间管理、消费失败处理和消息重试等关键环节。消息拉取时,系统会为一批消息生成 CheckPoint,并在 Broker 内存中保存,以便与 ACK 消息匹配。消息不可见时间机制确保在规定时间内未被 ACK 的消息将被重试。消费失败时,客户端通过修改消息不可见时间来调整重试策略。当消费用时超过预设时间,Broker 也会将消息放入重试队列。通过定时消息,名片系统源码PHPBroker 可以提前消费重试队列中的消息,与 ACK 消息匹配,实现高效消息处理。
在 Broker 端,重平衡逻辑也进行了优化。Pop 模式的重平衡允许多个消费者同时消费同一队列,通过 popShareQueueNum 参数配置额外的负载获取队列次数。Pop 消息处理涉及从队列中 POP 消息、生成 CheckPoint 用于匹配 ACK 消息、以及存储 CheckPoint 与 Ack 消息匹配。Broker 端还通过 PopBufferMergeService 线程实现内存与磁盘中的 CheckPoint 和 Ack 消息匹配,以及消息重试处理。
源码解析部分涉及 Broker 端的重平衡逻辑、Pop 消息处理、Ack 消息处理、CheckPoint 与 Ack 消息匹配逻辑等关键组件的实现细节,这些细节展示了 RocketMQ 5.0 如何通过优化消费模式和流程设计,提升消息消费的效率和稳定性。
C++快速集成gRPC的几种方式介绍(内含预编译库下载)
集成gRPC到C++的途径多种多样,但每种方法都需要额外的步骤和资源投入。本文将对不同方式的集成进行介绍,并提供预编译库下载链接,帮助开发者简化步骤,更快上手。
官方提供的gRPC安装方式包括源码编译等,适合深入理解gRPC内部结构的开发者。对于希望快速集成gRPC的delphi xe 10.2.3 源码初学者,推荐直接使用预编译库。
推荐使用vcpkg进行预编译库的下载。vcpkg能够简化库的下载与配置过程,极大降低集成难度。若条件允许,自行下载与编译库以加深对gRPC的理解。
对于Windows用户,推荐下载名为grpc-vcpkg-repo-windows-x.7z的预编译库。在使用cmake构建时,需设置DCMAKE_TOOLCHAIN_FILE环境变量。针对使用Clion的开发者,vcpkg.cmake工具能自动完成大部分配置,快速搭建开发环境。
对于基于包管理系统的Linux发行版用户,推荐使用系统包管理器安装gRPC,同时可以选择下载预编译库以简化安装过程。
对于Linux ARM版本用户,推荐下载名为grpc-vcpkg-repo-linux-arm.7z的预编译库。请自行确保编译环境满足库的运行需求。若需要其他特定内核版本或系统环境支持的编译服务,可联系作者微信lnl,费用为一杯咖啡。
C++_GRPC使用讲解-编译,开发环境搭建
特别强调,grpc对gcc/g++版本有要求,需6.3及以上,低于此版本需升级。贷款app源码下载首先,确保安装必要的依赖工具。1. 安装依赖工具
如cmake低于3.或gcc/g++低于7.0,请按文档进行更新。cmake推荐安装最新版本(最低3.)。
卸载旧版CMake后,解压下载的cmake包,bin目录包含cmake家族工具。
创建软链接,通常选择/opt或/usr路径。
2. gcc/g++升级
务必升级到6.3以上,版本7.0以上无需重复。安装7.0版本,确认版本显示为7.5。3. 编译grpc
推荐使用cmake编译,对网络有依赖。如果无法访问外部资源,可使用我提供的1..2版本压缩包编译,否则从源码开始下载。下载源码,选择v1..2或其他相应版本。
编译过程中会自动处理protobuf依赖,无需单独安装。
编译完成后,测试helloworld服务和客户端。
4. 辅助工具-scp命令
scp命令用于服务器间文件传输,提供下载和上传文件/目录的短视频合集源码功能,但非课程重点。下载:scp username@ip:/path/to/file local/path
上传:scp local/path username@ip:/path/to/destination
下载目录:scp -r username@ip:/path/to/directory local/path
上传目录:scp -r local/path username@ip:/path/to/destination
获取grpc-v1..2源码包,可通过群组获取。gRPC 流量控制详解
gRPC 流量控制详解
流量控制, 一般来说指的是在网络传输中, 发送者主动限制自身发送数据的速率或发送的数据量, 以适应接收者处理数据的速度. 当接收者的处理速度较慢时, 来不及处理的数据会被存放在内存中, 而当内存中的数据缓存区被填满之后, 新收到的数据就会被扔掉, 导致发送者不得不重新发送, 就会造成网络带宽的浪费.
流量控制是一个网络组件的基本功能, 我们熟知的 TCP 协议就规定了流量控制算法. gRPC 建立在 TCP 之上, 也依赖于 HTTP/2 WindowUpdate Frame 实现了自己在应用层的流量控制.
在 gRPC 中, 流量控制体现在三个维度:
采样流量控制: gRCP 接收者检测一段时间内收到的数据量, 从而推测出 on-wire 的数据量, 并指导发送者调整流量控制窗口.
Connection level 流量控制: 发送者在初始化时被分配一个 quota (额度), quota 随数据发送减少, 并在收到接收者的反馈之后增加. 发送者在耗尽 quota 之后不能再发送数据.
Stream level 流量控制: 和 connection level 的流量控制类似, 只不过 connection level 管理的是一个发送者和一个接收者之间的全部流量, 而 stream level 管理的是 connection 中诸多 stream 中的一个.
在本篇剩余的部分中, 我们将结合代码一起来看看这三种流量控制的实现原理和实现细节.
本篇中的源代码均来自 /grpc/grpc-go, 并且为了方便展示, 在不影响表述的前提下截断了部分代码.
流量控制是双向的, 为了减少冗余的叙述, 在本篇中我们只讲述 gRPC 是如何控制 server 所发送的流量的.
gRPC 中的流量控制仅针对 HTTP/2 data frame.
采样流量控制原理采样流量控制, 准确来说应该叫做 BDP 估算和动态流量控制窗口, 是一种通过在接收端收集数据, 以决定发送端流量控制窗口大小的流量控制方法. 以下内容翻译自 gRPC 的一篇官方博客, 介绍了采样流量控制的意义和原理.
BDP 估算和动态流量控制这个 feature 缩小了 gRPC 和 HTTP/1.1 在高延迟网络环境下的性能差距.
Bandwidth Delay Product (BDP), 即带宽延迟积, 是网络连接的带宽和数据往返延迟的乘积. BDP 能够有效地告诉我们, 如果充分利用了网络连接, 那么在某一刻在网络连接上可以存在多少字节的数据.
计算 BDP 并进行相应调整的算法最开始是由 @ejona 提出的, 后来由 gRPC-C Core 和 gRPC-Java 实现. BDP 的想法简单而实用: 每次接收者得到一个 data frame, 它就会发出一个 BDP ping frame (一个只有 BDP 估算器使用的 ping frame). 之后, 接收者会统计指导收到 ACK 之前收到的字节数. 这个大约在 1.5RTT (往返时间) 中收到的所有字节的总和是有效 BDP1.5 的近似值. 如果该值接近当前流量窗口的大小 (例如超过 2/3), 接收者就需要增加窗口的大小. 窗口的大小被设定为 BDP (所有采样期间接受到的字节总和) 的两倍.
BDP 采样目前在 gRPC-go 的 server 端是默认开启的.
结合代码, 一起来看看具体的实现方式.
代码分析我们以 client 发送 BDP ping 给 server, 并决定 server 端的流量控制窗口为例.
在 gRPC-go 中定义了一个bdpEstimator , 是用来计算 BDP 的核心:
type?bdpEstimator?struct?{ //?sentAt?is?the?time?when?the?ping?was?sent.sentAt?time.Timemu?sync.Mutex//?bdp?is?the?current?bdp?estimate.bdp?uint//?sample?is?the?number?of?bytes?received?in?one?measurement?cycle.sample?uint//?bwMax?is?the?maximum?bandwidth?noted?so?far?(bytes/sec).bwMax?float//?bool?to?keep?track?of?the?beginning?of?a?new?measurement?cycle.isSent?bool//?Callback?to?update?the?window?sizes.updateFlowControl?func(n?uint)//?sampleCount?is?the?number?of?samples?taken?so?far.sampleCount?uint//?round?trip?time?(seconds)rtt?float}bdpEstimator 有两个主要的方法 add 和 calculate :
//?add?的返回值指示?是否发送?BDP?ping?frame?给?serverfunc?(b?*bdpEstimator)?add(n?uint)?bool?{ b.mu.Lock()defer?b.mu.Unlock()//?如果?bdp?已经达到上限,?就不再发送?BDP?ping?进行采样if?b.bdp?==?bdpLimit?{ return?false}//?如果在当前时间点没有?BDP?ping?frame?发送出去,?就应该发送,?来进行采样if?!b.isSent?{ b.isSent?=?trueb.sample?=?nb.sentAt?=?time.Time{ }b.sampleCount++return?true}//?已经有?BDP?ping?frame?发送出去了,?但是还没有收到?ACKb.sample?+=?nreturn?false}add 函数有两个作用:
决定 client 在接收到数据时是否开始采样.
记录采样开始的时间和初始数据量.
func?(t?*ing?flow?control?windows//?for?the?transport?and?the?stream?based?on?the?current?bdp//?estimation.func?(t?*ingWindowUpdateHandler?负责处理来自?client?的?window?update?framefunc?(l?*loopyWriter)?incomingWindowUpdateHandler(w?*incomingWindowUpdate)?error?{ if?w.streamID?==?0?{ //?增加?quotal.sendQuota?+=?w.incrementreturn?nil}......}sendQuota 在接收到来自 client 的 window update 后增加.
//?processData?负责发送?data?frame?给?clientfunc?(l?*loopyWriter)?processData()?(bool,?error)?{ ......//?根据发送的数据量减少?sendQuotal.sendQuota?-=?uint(size)......}并且 server 在发送数据时会减少 sendQuota .
Client 端//?add?的返回值指示?是否发送?BDP?ping?frame?给?serverfunc?(b?*bdpEstimator)?add(n?uint)?bool?{ b.mu.Lock()defer?b.mu.Unlock()//?如果?bdp?已经达到上限,?就不再发送?BDP?ping?进行采样if?b.bdp?==?bdpLimit?{ return?false}//?如果在当前时间点没有?BDP?ping?frame?发送出去,?就应该发送,?来进行采样if?!b.isSent?{ b.isSent?=?trueb.sample?=?nb.sentAt?=?time.Time{ }b.sampleCount++return?true}//?已经有?BDP?ping?frame?发送出去了,?但是还没有收到?ACKb.sample?+=?nreturn?false}0trInFlow 是 client 端控制是否发送 window update 的核心. 值得注意的是 client 端是否发送 window update 只取决于已经接收到的数据量, 而管这些数据是否被某些 stream 读取. 这一点是 gRPC 在流量控制中的优化, 即因为多个 stream 共享同一个 connection, 不应该因为某个 stream 读取数据较慢而影响到 connection level 的流量控制, 影响到其他 stream.
//?add?的返回值指示?是否发送?BDP?ping?frame?给?serverfunc?(b?*bdpEstimator)?add(n?uint)?bool?{ b.mu.Lock()defer?b.mu.Unlock()//?如果?bdp?已经达到上限,?就不再发送?BDP?ping?进行采样if?b.bdp?==?bdpLimit?{ return?false}//?如果在当前时间点没有?BDP?ping?frame?发送出去,?就应该发送,?来进行采样if?!b.isSent?{ b.isSent?=?trueb.sample?=?nb.sentAt?=?time.Time{ }b.sampleCount++return?true}//?已经有?BDP?ping?frame?发送出去了,?但是还没有收到?ACKb.sample?+=?nreturn?false}1这里 limit * 1/4 的限制其实是可以浮动的, 因为 limit 的数值会随着 server 端发来的 window update 而改变.
Stream level 流量控制原理Stream level 的流量控制和 connection level 的流量控制原理基本上一致的, 主要的区别有两点:
Stream level 的流量控制中的 quota 只针对单个 stream. 每个 stream 即受限于 stream level 流量控制, 又受限于 connection level 流量控制.
Client 端决定反馈给 server window update frame 的时机更复杂一点.
Stream level 的流量控制不光要记录已经收到的数据量, 还需要记录被 stream 消费掉的数据量, 以达到更加精准的流量控制. 实际上, client 会记录:
pendingData: stream 收到但还未被应用消费 (未被读取) 的数据量.
pendingUpdate: stream 收到且已经被应用消费 (已被读取) 的数据量.
limit: stream 能接受的数据上限, 被初始为 字节, 受到采样流量控制的影响.
delta: delta 是在 limit 基础上额外增加的数据量, 当应用试着去读取超过 limit 大小的数据是, 会临时在 limit 上增加 delta, 来允许应用读取数据.
Client 端的逻辑是这样的:
每当 client 接收到来自 server 的 data frame 的时候, pendingData += 接收到的数据量 .
每当 application 在从 stream 中读取数据之前 (即 pendingData 将被消费的时候),
在Windows搭建gRPC C++开发环境
在Windows下搭建gRPC C++开发环境,并开发、配置简单的服务端及.net客户端的步骤如下:
1、下载gRPC源码:
通过git命令行在预设目录下载gRPC 1..0版本。
2、生成工程文件:
使用CMake生成工程文件,需调整选项包括添加ABSL_PROPAGATE_CXX_STD为true,调整zlib依赖版本至2.8.,设置CMAKE_INSTALL_PREFIX以指定安装目录。
3、编译、安装gRPC:
使用Visual Studio 编译安装,设置为Release x生成ALL_BUILD和INSTALL项目,确保bin目录路径添加到环境变量Path中。
4、创建测试工程:
创建解决方案GRPCTest,包含c++空项目CPPServer与.Net 控制台项目DotNetClient,将protos文件夹及helloworld.proto文件导入。
5、编译proto文件:
使用命令行生成c++及c#文件,确保执行路径正确。
6、生成CPPServer项目:
将greeter_server.cc文件拷贝至CPPServer目录,并添加相关文件及目录,配置包含及附加库。
7、生成DotNetClient:
通过Nuget安装所需包,并将Helloworld相关文件添加到DotNetClient项目中,编辑Program.cs并编译。
8、测试:
运行CPPServer.exe与DotNetClient.exe进行测试,验证服务端与客户端通信是否正常。
Alluxio 客户端源码分析
Alluxio是一个用于云分析和人工智能的开源数据编排技术,作为分布式文件系统,采用与HDFS相似的主从架构。系统中包含一个或多个Master节点存储集群元数据信息,以及Worker节点管理缓存的数据块。本文将深入分析Alluxio客户端的实现。
创建客户端逻辑在类alluxio.client.file.FileSystem中,简单示例代码如下。
客户端初始化包括调用FileSystem.Context.create创建客户端对象的上下文,在此过程中需要初始化客户端以创建与Master和Worker连接的连接池。若启用了配置alluxio.user.metrics.collection.enabled,将启动后台守护线程定时与Master节点进行心跳传输监控指标信息。同时,客户端初始化时还会创建负责重新初始化的后台线程,定期从Master拉取配置文件的哈希值,若Master节点配置发生变化,则重新初始化客户端,期间阻塞所有请求直到重新初始化完成。
创建具有缓存功能的客户端在客户端初始化后,调用FileSystem.Factory.create进行客户端创建。客户端实现分为BaseFileSystem、MetadataCachingBaseFileSystem和LocalCacheFileSystem三种,其中MetadataCachingBaseFileSystem和LocalCacheFileSystem对BaseFileSystem进行封装,提供元数据和数据缓存功能。BaseFileSystem的调用主要分为三大类:纯元数据操作、读取文件操作和写入文件操作。针对元数据操作,直接调用对应GRPC接口(例如listStatus)。接下来,将介绍客户端如何与Master节点进行通信以及读取和写入的流程。
客户端需要先通过MasterInquireClient接口获取主节点地址,当前有三种实现:PollingMasterInquireClient、SingleMasterInquireClient和ZkMasterInquireClient。其中,PollingMasterInquireClient是针对嵌入式日志模式下选择主节点的实现类,SingleMasterInquireClient用于选择单节点Master节点,ZkMasterInquireClient用于Zookeeper模式下的主节点选择。因为Alluxio中只有主节点启动GRPC服务,其他节点连接客户端会断开,PollingMasterInquireClient会依次轮询所有主节点,直到找到可以连接的节点。之后,客户端记录该主节点,如果无法连接主节点,则重新调用PollingMasterInquireClient过程以连接新的主节点。
数据读取流程始于BaseFileSystem.openFile函数,首先通过getStatus向Master节点获取文件元数据,然后检查文件是否为目录或未写入完成等条件,若出现异常则抛出异常。寻找合适的Worker节点根据getStatus获取的文件信息中包含所有块的信息,通过偏移量计算当前所需读取的块编号,并寻找最接近客户端并持有该块的Worker节点,从该节点读取数据。判断最接近客户端的Worker逻辑位于BlockLocationUtils.nearest,考虑使用domain socket进行短路读取时的Worker节点地址一致性。根据配置项alluxio.worker.data.server.domain.socket.address,判断每个Worker使用的domain socket路径是否一致。如果没有使用域名socket信息寻找到最近的Worker节点,则根据配置项alluxio.user.ufs.block.read.location.policy选择一个Worker节点进行读取。若客户端和数据块在同一节点上,则通过短路读取直接从本地文件系统读取数据,否则通过与Worker节点建立GRPC通信读取文件。
如果无法通过短路读取数据,客户端会回退到使用GRPC连接与选中的Worker节点通信。首先判断是否可以通过domain socket连接Worker节点,优先选择使用domain socket方式。创建基于GRPC的块输入流代码位于BlockInStream.createGrpcBlockInStream。通过GRPC进行连接时,每次读取一个chunk大小并缓存chunk,减少RPC调用次数提高性能,chunk大小由配置alluxio.user.network.reader.chunk.size.bytes决定。
读取数据块完成后或出现异常终止,Worker节点会自动释放针对该块的写入锁。读取异常处理策略是记录失败的Worker节点,尝试从其他Worker节点读取,直到达到重试次数上限或没有可用的Worker节点。
若无法通过本地Worker节点读取数据,则客户端尝试发起异步缓存请求。若启用了配置alluxio.user.file.passive.cache.enabled且存在本地Worker节点,则向本地Worker节点发起异步缓存请求,否则向负责读取该块数据的Worker节点发起请求。
数据写入流程首先向Master节点发送CreateFile请求,Master验证请求合法性并返回新文件的基本信息。根据不同的写入类型,进行不同操作。如果是THROUGH或CACHE_THROUGH等需要直接写入底层文件系统的写入类型,则选择一个Worker节点处理写入到UFS的数据。对于MUST_CACHE、CACHE_THROUGH、ASYNC_THROUGH等需要缓存数据到Worker节点上的写入类型,则打开另一个流负责将每个写入的块缓存到不同的Worker上。写入worker缓存块流程类似于读取流程,若写入的Worker与客户端在同一个主机上,则使用短路写直接将块数据写入Worker本地,无需通过网络发送到Worker上。数据完成写入后,客户端向Master节点发送completeFile请求,表示文件已写入完成。
写入失败时,取消当前流以及所有使用过的输出流,删除所有缓存的块和底层存储中的数据,与读取流程不同,写入失败后不进行重试。
零拷贝实现用于优化写入和读取流程中WriteRequest和ReadResponse消息体积大的问题,通过配置alluxio.user.streaming.zerocopy.enabled开启零拷贝特性。Alluxio通过实现了GRPC的MethodDescriptor.Marshaller和Drainable接口来实现GRPC零拷贝特性。MethodDescriptor.Marshaller负责对消息序列化和反序列化的抽象,用于自定义消息序列化和反序列化行为。Drainable扩展java.io.InputStream,提供将所有内容转移到OutputStream的方法,避免数据拷贝,优化内容直接写入OutputStream的过程。
总结,阅读客户端代码有助于了解Alluxio体系结构,明白读取和写入数据时的数据流向。深入理解Alluxio客户端实现对于后续阅读其他Alluxio代码非常有帮助。
2024-11-30 14:07
2024-11-30 13:30
2024-11-30 13:04
2024-11-30 12:28
2024-11-30 12:00