欢迎来到皮皮网网站!

【smali源码】【溯源码查询批号】【多线程编程源码】heartbeat源码

时间:2025-01-04 08:20:38 来源:11的源码

1.RocketMQ第五讲
2.Spring Cloud Eureka源码分析之心跳续约及自我保护机制
3.心脏滴血漏洞:OpenSSL中的一个漏洞如何导致安全危机
4.YARN源码剖析:NM启动过程

heartbeat源码

RocketMQ第五讲

       broker是RocketMQ的核心,核心工作就是接收生成这的消息,进行存储。同时,收到消费者的请求后,从磁盘读取内容,smali源码把结果返回给消费者。

        消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为位,左边补零,剩余为起始偏移量,比如代表了第一个文件,起始偏移量为0,文件大小为1G=;当第一个文件写满了,第二个文件为,起始偏移量为,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

        CommitLog文件中保存了消息的全量内容。不同的Topic的消息,在CommitLog都是顺序存放的。就是来一个消息,不管Topic是什么,直接追加的CommitLog中。

        broker启动了一个专门的线程来构建索引,把CommitLog中的消息,构建了两种类型的索引。ConsumerQueue和Index。正常消费的时候,是溯源码查询批号根据Topic来消费,会用到ConsumerQueue索引。

        也可根据返回的offsetMsgId,解析出ip,端口和CommitLog中的物理消息偏移量,直接去CommitLog中取数据。

        引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。

        其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{ topic}/{ queueId}/{ fileName}。同样consumequeue文件采取定长设计,每一个条目共个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.M。

        IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是: { fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为M,一个IndexFile可以保存 W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的多线程编程源码索引文件其底层实现为hash索引。

        按照Message Key查询消息的时候,会用到这个索引文件。

        IndexFile索引文件为用户提供通过“按照Message Key查询消息”的消息索引查询服务,IndexFile文件的存储位置是: { fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于+W 4+W = 个字节大小。如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。

        其中的索引数据包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共 Byte。NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图, Byte 的Header用于保存一些总的统计信息,4 W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。 W 是真正的索引数据,即一个 Index File 可以保存 W个索引。

        “按照Message Key查询消息”的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。

        RocketMQ中有两个核心模块,remoting模块和store模块。remoting模块在NameServer,智慧养老源码平台Produce,Consumer和Broker都用到。store只在Broker中用到,包含了存储文件操作的API,对消息实体的操作是通过DefaultMessageStore进行操作。

        属性和方法很多,就不往这里放了。

        文件存储实现类,包括多个内部类

        · 对于文件夹下的一个文件

        上面介绍了broker的核心业务流程和架构,关键接口和类,启动流程。最后介绍一下broker的线程模型,只有知道了线程模型,才能大概知道前面介绍的那些事如何协同工作的,对broker才能有一个立体的认识。

        RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。关于Reactor线程模型,可以看看我之前写的这篇文档: Reactor线程模型

        上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、rsi底背离源码编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码code去processorTable这个本地缓存变量中找到对应的 processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。

        上面的图和这段画是从官方文档抄过来的,但是文字和图对应的不是很好,画的也不够详细,但是主要流程是这个样子。以后有时间了,我重新安装自己的理解,画一张更详细的图。

        AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,应该是守护线程

        FileWatchService:

        NettyEventExecutor:

        NettyNIOBoss_:一个

        NettyServerNIOSelector_:默认为三个

        NSScheduledThread:定时任务线程

        ServerHouseKeepingService:守护线程

        ThreadDeathWatch-2-1:守护线程,Netty用,已经废弃

        RemotingExecutorThread(1-8):工作线程池,没有共用NettyServerNIOSelector_,直接初始化8个线程

        AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,共九个:

        RocketmqBrokerAppender_inner

        RocketmqFilterAppender_inner

        RocketmqProtectionAppender_inner

        RocketmqRemotingAppender_inner

        RocketmqRebalanceLockAppender_inner

        RocketmqStoreAppender_inner

        RocketmqStoreErrorAppender_inner

        RocketmqWaterMarkAppender_inner

        RocketmqTransactionAppender_inner

        SendMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_MESSAGE

        PullMessageThread_:remotingServer.registerProcessor(RequestCode.PULL_MESSAGE

        ProcessReplyMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE

        QueryMessageThread_:remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE

        AdminBrokerThread_:remotingServer.registerDefaultProcessor

        ClientManageThread_:remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT

        HeartbeatThread_:remotingServer.registerProcessor(RequestCode.HEART_BEAT

        EndTransactionThread_:remotingServer.registerProcessor(RequestCode.END_TRANSACTION

        ConsumerManageThread_:remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,RequestCode.UPDATE_CONSUMER_OFFSET,RequestCode.QUERY_CONSUMER_OFFSET

        brokerOutApi_thread_:BrokerController.registerBrokerAll(true, false, true);

        ==================================================================

        BrokerControllerScheduledThread:=>

        BrokerController.this.getBrokerStats().record();

        BrokerController.this.consumerOffsetManager.persist();

        BrokerController.this.consumerFilterManager.persist();

        BrokerController.this.protectBroker();

        BrokerController.this.printWaterMark();

        log.info("dispatch behind commit log { } bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());

        BrokerController.this.brokerOuterAPI.fetchNameServerAddr();

        BrokerController.this.printMasterAndSlaveDiff();

        BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());

        BrokerFastFailureScheduledThread:=>

        FilterServerManagerScheduledThread:=>

        FilterServerManager.this.createFilterServer();

        ClientHousekeepingScheduledThread:=>

        ClientHousekeepingService.this.scanExceptionChannel();

        PullRequestHoldService

        FileWatchService

        AllocateMappedFileService

        AcceptSocketService

        BrokerStatsThread1

Spring Cloud Eureka源码分析之心跳续约及自我保护机制

       Eureka Server 判断服务不可用的机制是基于心跳续约的健康检查。客户端每秒发起一次心跳续约请求,服务端通过该机制检测服务提供者的状态。心跳续约的周期可以调整,通过配置参数来修改。客户端的续约流程主要在 DiscoveryClient.initScheduledTasks 方法中实现,其中 renewalIntervalInSecs=s,即默认周期为秒。续约线程 HeartbeatThread 调用 renew() 方法,将请求发送到 Eureka Server 的 "apps/" + appName + '/' + id 地址,以更新服务端的最后一次心跳时间。

       服务端在收到心跳请求时,调用 InstanceResource 类的 renewLease 方法进行续约处理。续约实现主要涉及两个步骤:从应用对应的实例列表中获取实例信息,然后调用 Lease.renew() 方法进行续约。续约过程更新了服务端记录的服务实例的最后一次心跳时间。

       Eureka 提供了一种自我保护机制,以避免因网络问题导致健康服务被误删除的情况。该机制在服务端收到的心跳请求低于特定比例(默认为%)时启动,以保护服务实例免于过期被剔除,保证集群的稳定和健壮性。开启自我保护机制的配置项为 eureka.server.enable-self-preservation,并默认开启。若服务客户端与注册中心之间出现网络故障,Eureka Server 会检测到低于%的正常心跳请求,进而自动进入自我保护状态。

       自我保护机制的阈值设置通过配置参数进行调整,具体计算公式为:(服务实例总数 * 0.)。例如,对于个服务实例,预期每分钟收到的续约请求数量为个。若实际收到的续约请求数量低于这个值,Eureka Server 将触发自我保护机制。此外,预期续约数量会随着服务注册和下线的变化而动态调整。当服务提供者主动下线时,需要更新客户端数量,反之则需增加。每隔分钟,自我保护阈值自动更新一次,以适应服务动态变化的场景。

       在 Eureka Server 启动时,通过 EurekaServerBootstrap 类的 contextInitialized 方法初始化 Eureka Server 的上下文,包括配置预期每分钟收到的续约客户端数量(expectedNumberOfClientsSendingRenews)。在 openForTraffic 方法中,初始化 expectedNumberOfClientsSendingRenews 和 numberOfRenewsPerMinThreshold 值,以确保自我保护机制正常运行。这些值会根据服务注册和下线情况动态调整,以维持系统的稳定性和准确性。

心脏滴血漏洞:OpenSSL中的一个漏洞如何导致安全危机

       Heartbleed(“心脏滴血”)是OpenSSL在年4月暴露的一个漏洞;它出现在数千个网络服务器上,包括那些运行像雅虎这样的主要网站的服务器。

       OpenSSL是实现传输层安全(Transport Layer Security, TLS)和安全套接字层(Secure Sockets Layer, SSL)协议的开放源代码库。该漏洞意味着恶意用户可以很容易地欺骗易受攻击的web服务器发送敏感信息,包括用户名和密码。

       TLS/SSL标准对现代网络加密至关重要,虽然漏洞是在OpenSSL的实现中,而不是标准本身,但OpenSSL被广泛使用。当漏洞被公开时,它影响了所有SSL服务器中的%并它引发了一场安全危机。

       Heartbleed的名称来自heartbeat,它是TLS/SSL协议的一个重要组件的名称。心跳是两台电脑相互通信时,即使用户此刻没有下载或上传任何东西,也能让对方知道它们仍然连接着。偶尔其中一台计算机会向另一台发送一条被加密的数据,称为心跳请求。第二台计算机将返回完全相同的加密数据,证明连接仍然存在。

       Heartbleed漏洞之所以得名,是因为攻击者可以使用心跳请求从目标服务器提取信息,也就是说,受害者通过心跳请求获取敏感数据。

       Heartbleed利用了一个重要的事实:心跳请求包含关于其自身长度的信息,但是OpenSSL库的易受攻击版本不会进行检查以确保信息的准确性,攻击者可以利用这一点欺骗目标服务器,使其允许攻击者访问其内存中应该保持私有的部分。

       “心脏滴血”是危险的,因为它让攻击者看到内存缓冲区的内容,其中可能包括敏感信息。诚然,如果您是攻击者,您无法提前知道刚刚从服务器获取的 KB内存中可能隐藏着什么,但是存在多种可能性。如果足够幸运可以得到SSL私钥,这将允许解密到服务器的安全通信,尽管几率很小,但不排除会被黑客获取。更常见的情况是,可以取回提交给服务器上运行的应用程序和服务的用户名和密码,这样你就可以登录到这些应用程序并获得用户帐户。

       Heartbleed 实际上是由两个不同的小组以非常不同的方式独立工作发现的:一次是在审查 OpenSSL 的开源代码库的过程中,一次是在对运行 OpenSSL 的服务器的一系列模拟攻击期间。这两个独立的发现发生在几周之内,但该漏洞已经潜伏了2年未被发现。

       心脏滴血漏洞的CVE编号是CVE--,CVSS3.1打分7.5,属于严重漏洞。

       “心脏滴血”(Heartbleed)漏洞在现实世界中已经被利用过,但目前尚不清楚在该漏洞被广泛公布之前是否有被利用过。早在年,安全公司就发现了一些未遂攻击在探测该漏洞。

       年4月,Codenomicon公开了这个漏洞,之后出现了一系列活动和一定程度的混乱,各公司争相更新自己的系统;例如,雅虎(Yahoo)和OKCupid的用户曾被简短地建议在OpenSSL安装补丁之前不要登录自己的账户,并在重新获得访问权限后更改密码。

       “心脏滴血”的代价超过了这些成功攻击所造成的损害;《安全杂志》估计,数千个组织需要撤销和更换他们的SSL证书的成本可能高达5亿美元。再加上检查和更新系统所需的工作时间,与这个漏洞相关的支出会大幅飙升。

       Heartbleed 是在 8 年多前被发现并修补的,然而许多服务器仍然存在 Heartbleed 漏洞。事实上,据SANS Internet Storm Center 的研究人员称,到 年 月,在线的服务器超过万。虽然从那以后这个数字可能有所下降,但几乎可以肯定仍有许多易受攻击的服务器等待被黑客攻击。

       超过六成的安全漏洞与代码有关,而静态代码分析技术可以减少-%的安全漏洞。目前,在OWASP TOP 安全漏洞中,-%的安全漏洞类型可以通过源代码静态分析技术检测出来。目前,随着恶意软件不断升级,网络攻击手段不断改进,仅通过传统防护手段如防火墙、杀毒软件等安全防御不足以全面抵抗网络攻击和恶意软件入侵。因此亟需加强软件自身安全,减少软件系统安全漏洞。通过在软件开发过程中不断检测修复代码缺陷,确保软件安全是提高网络安全性的重要手段。

YARN源码剖析:NM启动过程

       NodeManager初始化和启动过程主要涉及配置文件读取,资源信息配置,以及服务启动等步骤。重点在于初始化阶段,配置文件读取完成,包括关于节点资源信息的配置。

       启动NodeManager(NM)时,遵循与ResourceManager(RM)类似的逻辑,启动各个服务。关键在于nodeStatusUpdater模块。其中两个重要方法为registerWithRM()和startStatusUpdater()。这两个方法通过RPC远程调用ResourceManager中的两个接口:registerNodeManager()和nodeHeartbeat()。

       NM启动过程中添加的服务列表构成其核心功能描述。例如,NodeHealthCheckerService提供节点健康检查功能,包含两个子service:NodeHealthScriptRunner(使用配置的脚本进行健康检查)和LocalDirsHandlerService(检查磁盘健康状况)。此服务包含getHealthReport()方法,用于获取健康检查结果。

       NM中的关键类之一为NMContext,它作为组件间信息共享的接口。

       NM与RM之间的心跳通信是整个过程中不可或缺的部分,确保了资源管理系统的实时状态监控与资源分配协调。

       综上所述,NodeManager的启动过程涉及初始化配置、启动关键服务以及与ResourceManager的交互,实现资源管理和节点健康监控等功能。这一过程为YARN框架提供了稳定、高效的基础结构。

更多相关资讯请点击【时尚】频道>>>