本站提倡有节制游戏,合理安排游戏时间,注意劳逸结合。

【沙石镇时光源码】【俄罗斯源码论坛】【券 回收 商城 源码】nio selector源码

2025-01-01 13:01:05 来源:探索 分类:探索

1.java.net.ConnectException: 拒绝连接
2.RocketMQ第五讲
3.Netty源码探究1:事件驱动原理
4.eclipse启动tomcat时遇到的问题
5.Tars-Java网络编程源码分析

nio selector源码

java.net.ConnectException: 拒绝连接

       启动flume时

       flume-ng agent \

       --name flume-avro-agent \

       --conf $FLUME_HOME/conf \

       --conf-file $FLUME_HOME/config/flume-avro-sink.conf \

       -Dflume.root.logger=INFO,console

       -- ::, (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:)] Rpc sink avro-sink: Building RpcClient with hostname: node, port:

       -- ::, (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:)] Attempting to create Avro Rpc client.

       -- ::, (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:)] Using default maxIOWorkers

       -- ::, (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:)] Unable to deliver event. Exception follows.

       org.apache.flume.EventDeliveryException: Failed to send events

       at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:)

       at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:)

       at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:)

       at java.lang.Thread.run(Thread.java:)

       Caused by: org.apache.flume.FlumeException: NettyAvroRpcClient { host: node, port: }: RPC connection error

       at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:)

       at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:)

       at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:)

       at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:)

       at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:)

       at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:)

       at org.apache.flume.sink.AbstractRpcSink.verifyConnection(AbstractRpcSink.java:)

       at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:)

       ... 3 more

       Caused by: java.io.IOException: Error connecting to node/...6:

       at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:)

       at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:)

       at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:)

       at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:)

       ... more

       Caused by: java.net.ConnectException: 拒绝连接: node/...6:

       at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)

       at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:)

       at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:)

       at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:)

       at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:)

       at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:)

       at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:)

       at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:)

       at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:)

       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)

       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)

       ... 1 more

RocketMQ第五讲

       broker是RocketMQ的核心,核心工作就是接收生成这的消息,进行存储。同时,收到消费者的请求后,从磁盘读取内容,沙石镇时光源码把结果返回给消费者。

        消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为位,左边补零,剩余为起始偏移量,比如代表了第一个文件,起始偏移量为0,文件大小为1G=;当第一个文件写满了,第二个文件为,起始偏移量为,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

        CommitLog文件中保存了消息的全量内容。不同的Topic的消息,在CommitLog都是顺序存放的。就是来一个消息,不管Topic是什么,直接追加的CommitLog中。

        broker启动了一个专门的线程来构建索引,把CommitLog中的俄罗斯源码论坛消息,构建了两种类型的索引。ConsumerQueue和Index。正常消费的时候,是根据Topic来消费,会用到ConsumerQueue索引。

        也可根据返回的offsetMsgId,解析出ip,端口和CommitLog中的物理消息偏移量,直接去CommitLog中取数据。

        引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。

        其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{ topic}/{ queueId}/{ fileName}。同样consumequeue文件采取定长设计,每一个条目共个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由W个条目组成,可以像数组一样随机访问每一个条目,券 回收 商城 源码每个ConsumeQueue文件大小约5.M。

        IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是: { fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为M,一个IndexFile可以保存 W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

        按照Message Key查询消息的时候,会用到这个索引文件。

        IndexFile索引文件为用户提供通过“按照Message Key查询消息”的消息索引查询服务,IndexFile文件的存储位置是: { fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于+W 4+W = 个字节大小。如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。

        其中的索引数据包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共 Byte。NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图, Byte 的社交论坛源码phpHeader用于保存一些总的统计信息,4 W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。 W 是真正的索引数据,即一个 Index File 可以保存 W个索引。

        “按照Message Key查询消息”的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。

        RocketMQ中有两个核心模块,remoting模块和store模块。remoting模块在NameServer,Produce,Consumer和Broker都用到。store只在Broker中用到,包含了存储文件操作的API,对消息实体的操作是通过DefaultMessageStore进行操作。

        属性和方法很多,就不往这里放了。

        文件存储实现类,包括多个内部类

        · 对于文件夹下的一个文件

        上面介绍了broker的核心业务流程和架构,关键接口和类,启动流程。最后介绍一下broker的线程模型,只有知道了线程模型,才能大概知道前面介绍的那些事如何协同工作的,对broker才能有一个立体的认识。

        RocketMQ的全局代理软件源码RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。关于Reactor线程模型,可以看看我之前写的这篇文档: Reactor线程模型

        上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码code去processorTable这个本地缓存变量中找到对应的 processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。

        上面的图和这段画是从官方文档抄过来的,但是文字和图对应的不是很好,画的也不够详细,但是主要流程是这个样子。以后有时间了,我重新安装自己的理解,画一张更详细的图。

        AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,应该是守护线程

        FileWatchService:

        NettyEventExecutor:

        NettyNIOBoss_:一个

        NettyServerNIOSelector_:默认为三个

        NSScheduledThread:定时任务线程

        ServerHouseKeepingService:守护线程

        ThreadDeathWatch-2-1:守护线程,Netty用,已经废弃

        RemotingExecutorThread(1-8):工作线程池,没有共用NettyServerNIOSelector_,直接初始化8个线程

        AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,共九个:

        RocketmqBrokerAppender_inner

        RocketmqFilterAppender_inner

        RocketmqProtectionAppender_inner

        RocketmqRemotingAppender_inner

        RocketmqRebalanceLockAppender_inner

        RocketmqStoreAppender_inner

        RocketmqStoreErrorAppender_inner

        RocketmqWaterMarkAppender_inner

        RocketmqTransactionAppender_inner

        SendMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_MESSAGE

        PullMessageThread_:remotingServer.registerProcessor(RequestCode.PULL_MESSAGE

        ProcessReplyMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE

        QueryMessageThread_:remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE

        AdminBrokerThread_:remotingServer.registerDefaultProcessor

        ClientManageThread_:remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT

        HeartbeatThread_:remotingServer.registerProcessor(RequestCode.HEART_BEAT

        EndTransactionThread_:remotingServer.registerProcessor(RequestCode.END_TRANSACTION

        ConsumerManageThread_:remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,RequestCode.UPDATE_CONSUMER_OFFSET,RequestCode.QUERY_CONSUMER_OFFSET

        brokerOutApi_thread_:BrokerController.registerBrokerAll(true, false, true);

        ==================================================================

        BrokerControllerScheduledThread:=>

        BrokerController.this.getBrokerStats().record();

        BrokerController.this.consumerOffsetManager.persist();

        BrokerController.this.consumerFilterManager.persist();

        BrokerController.this.protectBroker();

        BrokerController.this.printWaterMark();

        log.info("dispatch behind commit log { } bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());

        BrokerController.this.brokerOuterAPI.fetchNameServerAddr();

        BrokerController.this.printMasterAndSlaveDiff();

        BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());

        BrokerFastFailureScheduledThread:=>

        FilterServerManagerScheduledThread:=>

        FilterServerManager.this.createFilterServer();

        ClientHousekeepingScheduledThread:=>

        ClientHousekeepingService.this.scanExceptionChannel();

        PullRequestHoldService

        FileWatchService

        AllocateMappedFileService

        AcceptSocketService

        BrokerStatsThread1

Netty源码探究1:事件驱动原理

       Netty源码探究1:事件驱动原理

       Netty借鉴了Reactor设计模式,这是一种事件处理模式,用于管理并发服务请求。在模式中,服务处理器对请求进行I/O多路复用,并同步分发给相应的请求处理器。Netty的核心内容是Reactor,因此深入分析其在Netty中的应用至关重要。Netty吸收了前人优秀经验,构建出这款优秀的技术框架。

       在Reactor设计模式中,Demultiplexer和Dispatcher是关键概念。Netty中的Demultiplexer是如何实现的?答案在于其Server端的架构设计。Netty通过Bootstrap(ServerBootstrap也适用)来构建Server,其中bind方法是启动Reactor运行的关键。在bind方法中,Netty创建并注册Channel到EventLoopGroup,从而实现Demultiplexer的功能。

       Netty中的Channel与JDK中的Channel有何不同?Netty通过NioServerSocketChannel构建Server,其内部封装了Java NIO的Channel,但Netty的Channel与JDK中的Channel在注册到Selector时有所不同。Netty中的Channel注册到NioEventLoop中的Selector上,只关注OP_ACCEPT事件。当客户端连接时,事件被触发,Server响应客户端连接。这涉及NioServerSocketChannel的构造过程和Selector的创建。

       Dispatcher在Java NIO中负责事件分发,Netty中如何实现这一功能?在NioEventLoop中,Selector.select()方法配合run()函数,共同实现事件监听循环。run函数中包含事件状态机和事件分派逻辑。当有事件到来时,状态机触发processSelectedKeys()方法,根据事件类型调用相应处理器进行处理。

       Netty中的事件驱动原理最终如何与自定义handler关联?在NioEventLoop的processSelectedKey()方法中,事件处理逻辑与Channel.Unsafe接口相关联。Channel.Unsafe接口用于封装Socket的最终操作,Netty通过此接口与业务层Handler建立关联。通过调用handler的read方法,Netty将事件与业务处理逻辑关联起来。

       总之,Netty通过Reactor设计模式实现了事件驱动原理,借助Demultiplexer和Dispatcher的机制,实现了对并发请求的高效处理。理解Netty的源码结构和事件驱动原理,对于深入掌握Netty技术框架至关重要。

eclipse启动tomcat时遇到的问题

       eclipse启动tomcat时遇问题及解决办法:

       端口被占用---杀掉占用该端口的进程或修改tomcat的端口;

       启动时间不够用--修改默认的s;

       缺少引用--先remove了该lib,然后再重新添加。

       一. 端口被占用: tomcat默认端口是,如果有其他进程占用了该端口就会报如上错误;

       解决方法有两个:1.杀掉占用该端口的进程:cmd命令中输入:netstat -ano|findstr ,接着输入:tasklist|findstr 杀死进程;

       2. 修改tomcat的端口:进入E:\apache-tomcat-5.5.\conf ,找到server.xml 并用编辑器打开,找到端口port=“”,修改为其他数字即可。

       二.启动时间不够用:双击tomcat,进入属性页,修改timeout默认值,改为或者更大。

       三.缺少引用:用eclipse导入的项目或者从SVN检出的项目,有时候会出这样的错误,就是各种找不到,而且是大面积的缺少。

       解决方法:在项目上右击-->build path --> configure build path 然后发现:引用的library没有绑定。

       解决方法:

       先remove了该lib,然后再重新添加:项目上右键-->build path --> add libraries ;

       然后根据移除的lib类型,进行添加相应的lib;

       如上是移除了server lib 然后就添加 server runtime;

       next>:

       即可解决。

Tars-Java网络编程源码分析

       Tars框架基本介绍

       Tars是腾讯开源的高性能RPC框架,支持多种语言,包括C++、Java、PHP、Nodejs、Go等。它提供了一整套解决方案,帮助开发者快速构建稳定可靠的分布式应用,并实现服务治理。

       Tars部署服务节点超过一千个,经过线上每日一百多亿消息推送量的考验。文章将从Java NIO网络编程原理和Tars使用NIO进行网络编程的细节两方面进行深入探讨。

       Java NIO原理介绍

       Java NIO提供了新的IO处理方式,它是面向缓冲区而不是字节流,且是非阻塞的,支持IO多路复用。

       Channel类型包括SocketChannel和ServerSocketChannel。ServerSocketChannel接受新连接,accept()方法会返回新连接的SocketChannel。Buffer类型用于数据读写,分配、读写、操作等。

       Selector用于监听多个通道的事件,单个线程可以监听多个数据通道。

       Tars NIO网络编程

       Tars采用多reactor多线程模型,核心类之间的关系明确。Java NIO服务端开发流程包括创建ServerSocketChannel、Selector、注册事件、循环处理IO事件等。

       Tars客户端发起请求流程包括创建通信器、工厂方法创建代理、初始化ServantClient、获取SelectorManager等。

       Tars服务端启动步骤包括初始化selectorManager、开启监听的ServerSocketChannel、选择reactor线程处理事件等。

       Reactor线程启动流程涉及多路复用器轮询检查事件、处理注册队列、获取已选键集中就绪的channel、更新Session、分发IO事件处理、处理注销队列等。

       IO事件分发处理涉及TCP和UDPAccepter处理不同事件,以及session中网络读写的详细处理过程。

       总结

       文章详细介绍了Java NIO编程原理和Tars-Java 1.7.2版本网络编程模块源码实现。最新的Tars-Java master分支已将网络编程改用Netty,学习NIO原理对掌握网络编程至关重要。

       了解更多关于Tars框架的介绍,请访问tarscloud.org。本文源码分析地址在github.com/TarsCloud/Ta...

相关推荐
一周热点