【安装源码目录】【形态图形源码】【越界源码分析】flumesource源码修改
1.Flume的码修Source,Sink,码修Channel的码修作用?你们Source是什么类型?
2.flume èªå®ä¹ hbase sink
3.Flume常用Source、Channel、码修sink组件类型选型
4.Flume面试题
5.flumeä¸çagerntå
å«äºåªä¸ä¸ªç»ä»¶
6.flume工作进程由什么组件构成
Flume的码修Source,Sink,码修安装源码目录Channel的码修作用?你们Source是什么类型?
理解Flume的架构和性能优化至关重要。Flume设计原理确保数据不丢失,码修但可能引起重复,码修这取决于Sink响应情况。码修
优化Source,码修通过增加个数或配置多个FileGroups,码修可提升数据读取能力。码修batchSize参数调整有助于提高数据传输效率。码修
Channel选择影响性能与容错性。码修memory类型性能最佳,但易丢失数据;file类型容错性更强,配置多个不同盘目录可优化性能。注意,Channel容量和事务容量需与Source和Sink的batchSize参数相协调。
Sink优化关注增加个数以提升消费能力,形态图形源码避免过度配置导致资源浪费。适当调整batchSize参数可优化数据处理速度。
flume èªå®ä¹ hbase sink
ä¸å¡éæ± flumeéè¦ä»kafkaè·åæ°æ®å¹¶åå ¥hbaseå¼å§åçæ³æ³ï¼æç §flumeçæµç¨ï¼ä¸ä¸ªsource ï¼ä¸ä¸ªchannelï¼ ä¸ä¸ªsinkï¼å 为æéè¦ä¸ä¸ªåæï¼å¦æ使ç¨å®æ¹çhbase sinké£ä¹éè¦ä¸ä¸ªsinkãèä¸éè¦èªå®ä¹ä¸ä¸ªsourceçæ¦æªå¨ï¼æ ¹æ®kafkaè·åçæ°æ®å¹é ä¸ä¸åçchannelï¼ä¸ä¸ªchannel对åºä¸ä¸ªåæï¼ç¶åé ç½®å°sinkï¼å°±å¯ä»¥ä½¿ç¨å®æ¹hbaseçsinkæå ¥æ°æ®äºã
å®ç°ï¼
1. èªå®ä¹ä¸ä¸ªæ¦æªå¨
èªå®ä¹æ¦æªå¨
å°èªå®ä¹æ¦æªå¨ææjarå ï¼æ¾å°flumeçlibç®å½ï¼æä¾èµçå ä¹éè¦å°jarå ä¸å¹¶æ¾å ¥ï¼ä¸ç¶ä¼æ¥æ¾ä¸å°å å¼å¸¸
conf/flume-diysource.conf é 置信æ¯
ä¸ååå¤å°±ç»ªflumeå¯å¨å½ä»¤
æ§å¶å°æå°ä¿¡æ¯æ²¡æ¥ä»ä¹é误
æ¥çhbaseï¼hbaseçåææ¯å对äºï¼ä½æ¯ä»ææ´ä¸ä¸ªkafka读åçæ°æ®å½åä¸ä¸ªvalueåå ¥ä¸åï¼èä¸ååæ¯é»è®¤çï¼å¹¶ä¸æ¯ææ³è¦çã
äºæ¯..............diyå¼å§äº
å½æå¨çflumeçæ¶åçå°å ³äºkafka channelæ¯ è¿æ ·åç
æè·¯ï¼æ ¹æ® 3 æ们å¯ä»¥ä¸éè¦ååä¸ä¸ªsourceï¼ç´æ¥channel å° sink ä¸æ¸å°åºï¼åªéè¦å¨sinkä¸è¿è¡habseçç¸å ³æä½
ç´æ¥èªå®ä¹ sink
ä¾èµä¿¡æ¯
èªå®ä¹ç±» MyHbaseSink
èªå®ä¹å®æ¯ï¼å¼å§é ç½®æ件 ï¼è¿ä¸ªé 置就æ¯è¾ç®å conf/flume-diysource.conf æ件
ä¸ååå¤å¥½äºï¼å°èªå®ä¹çsinkææjarå ï¼æ¾å°flumeï¼ç´æ¥è¿è¡
åå°è¿è¡
å®ç¾è¿è¡ï¼habseå¨åå¦å¦çåå ¥ï¼
æ»ç»ï¼ ä¸å¡éæ±æ¯å°kafkaçæ°æ®åå ¥å°hbaseï¼å¼å§æ¯æ³ç¨å®æ¹çsinkï¼ç»ææ¯æ太天çäºï¼å®æ¹çhbase sinkçrowKey并ä¸æ»¡è¶³ä¸å¡éæ±ï¼èä¸kafkaçæ°æ®å段æ¯ä¸ç¡®å®çï¼æäºå天ï¼ç½å¿æ´»ãåç°èªå·±å®ä¹çæ¯è¾ç¬¦åä¸å¡éæ±ã
ä½æ¯ï¼èªå®ä¹çsinkä¹æ¯æ¯è¾åçï¼å¼å§èªå®sinkï¼ææå¤çeventçé»è¾å ¨é½æ¾å¨process() æ¹æ³éï¼ç»æå¾æ½é£ï¼æ ¹æ¬å°±æ²¡ææ§è¡å°æçé»è¾éï¼ç¶åæå¨å®ç½process()æ¹æ³çä¸çå°è¿ä¹ä¸å¥è¯: Send the Event to the external repository.
大æ¦æææ¯è®©ææeventåéå°å¤é¨åºï¼äºæ¯ææeventå¤çç¬ç«åºprocess()ãç»æamazingï¼æååå ¥hbaseäºï¼æç¶è¿æ°ä¸éï¼
Flume常用Source、Channel、sink组件类型选型
在Flume中,常用组件的选型与应用对数据收集与传输至关重要。以下将分别介绍Source、Channel、sink组件的常用类型与使用方法。 Source组件 1. netcat 类型:用于监听指定端口,收集端口数据,适用于实时监控与数据接收。 例如:检测端口是否被占用,使用命令 `netstat -nlp | grep 端口号` 打印到控制台。 2. Exec 类型:可以将命令的输出作为数据源,适用于监控命令行输出结果。 3. spooldir 类型:监控目录下的文件,实时读取目录文件到HDFS,适用于实时追踪文件变化。 4. taildir 类型:监控文件内容,越界源码分析适合于监听实时追加的文件。与Spooldir相比,Taildir支持断点续传。 5. Kafka:支持从Kafka主题中读取数据,适用于大规模数据流处理。支持多种版本,最新测试支持到2.0.1。 6. Avro:结合Avro sink使用,用于数据序列化与传输,适用于复杂数据结构的处理。 Channel组件 1. Memory:基于内存存储事件,传输速度快,适用于数据量较小或允许数据丢失的场景。 例如:在监控文件变动的场景中,Memory Channel用于实时传输数据。 2. File:事件保存在本地文件中,数据恢复性高,但传输速度相对较慢。 3. JDBC:事件保存在关系型数据库中,适用于需要持久化存储的驱动公式源码数据。 sink组件 1. HDFS:将事件写入Hadoop分布式文件系统,支持文本和序列文件,适用于大数据存储与处理。 例如:实时监控文件变动,数据被直接写入HDFS。 2. Avro:用于多Agent级联场景,如两个Agent串联或多个Agent多路复用数据传输。 3. Hive:将事件直接传输到Hive表或分区,适用于实时查询与数据处理。 4. Logger:用于测试或日志输出,提供事件记录。 5. FailoverSinkProcessor:实现故障转移功能,确保数据传输的可靠性。 在实际应用中,选择组件时需考虑数据量、实时性、持久化与可靠性等因素。例如,使用Memory Channel与Avro sink在数据量较小且允许数据丢失的场景下,实现高效的成长指标源码数据收集与传输。Flume面试题
Flume架构原理确保数据不会丢失,内部有完善的事务机制。数据从Source到Channel,以及从Channel到Sink均是事务性的,因此在正常运行时不会出现数据丢失情况。唯一可能丢失数据的是当使用memoryChannel时,若agent宕机导致数据丢失,或是Channel存储满导致未写入数据丢失。
Flume和Kafka在数据采集层各有优势。Flume是一个管道流方式的工具,提供了丰富的默认实现和扩展API,主要用于往HDFS或HBase发送数据。Kafka则是一个分布式的消息队列,具有通用性,支持多个生产者和消费者共享多个主题。Kafka在多个系统间共享数据时更优,而Flume专为Hadoop设计,内置多种source和sink组件,支持实时数据处理和拦截器。Flume在数据流处理上表现良好,Kafka则需要配合流处理系统使用。如果数据最终用于Hadoop,则Flume更为合适,但Kafka也支持与Flume结合使用。
Flume与Kafka的结合使用可以实现数据的高可用性。Kafka提供容错机制,确保零数据丢失,但不支持副本事件。Flume的宕机数据丢失问题可以通过集群或主备模式解决。Flume采集日志通过流式直接传输到存储层,而Kafka则缓存数据在集群中,后端采集存储。若Flume采集中断,可以采用文件系统记录日志,而Kafka则使用offset记录。
Flume组件包括source、channel和sink。source负责采集数据,将数据流传输到channel;channel作为桥梁,类似于队列,连接source和sink;sink从channel收集数据,并将数据写入目标源,如HDFS、HBase等。使用Flume的主要原因在于其高效的数据采集能力,支持多种数据源,如web服务器日志等。
Flume组成架构包括source、channel和sink,以及内部事务机制。source消耗外部数据源的事件,channel作为数据缓冲区连接source和sink,sink则持续轮询channel中的事件并批量写入存储或索引系统。Flume自带内存和文件channel,其中内存channel不适用于关注数据丢失的场景。若需要关心数据完整性,应使用文件channel。其他channel如JDBC通道等也存在。sink组件目标包括HDFS、logger、avro、thrift等,实现数据的最终存储或发送。
Flume的事务机制与数据库类似,确保数据流的完整性和一致性。事务机制在source到channel及channel到sink的事件传递过程中分别启动,确保数据的正确处理和存储。spooling directory source会为文件的每一行创建事件,确保事务中所有事件的完整传递。事务处理流程包括数据的创建、提交或回滚,以确保数据的一致性和完整性。所有的事件都会保持在channel中,以便在发生异常时进行重试或回滚操作。
flumeä¸çagerntå å«äºåªä¸ä¸ªç»ä»¶
Agentä¸å å«äºä¸ä¸ªéè¦çç»ä»¶ï¼Sourceï¼Channelï¼Sinkã
Sourceæ¯ä»å ¶ä»ç产æ°æ®çåºç¨ä¸æ¥åæ°æ®çç»ä»¶ãSourceå¯ä»¥çå¬ä¸ä¸ªæè å¤ä¸ªç½ç»ç«¯å£ï¼ç¨äºæ¥åæ°æ®æè ä»æ¬å°æ件系ç»ä¸è¯»åæ°æ®ï¼æ¯ä¸ªSourceå¿ é¡»è³å°è¿æ¥ä¸ä¸ªChannelãå½ç¶ä¸ä¸ªSourceä¹å¯ä»¥è¿æ¥å¤ä¸ªChannnelï¼è¿åå³äºç³»ç»è®¾è®¡çéè¦ãChannel主è¦æ¯ç¨æ¥ç¼å²Agent以åæ¥åï¼ä½å°æªååºå°å¦å¤ä¸ä¸ªAgentæè åå¨ç³»ç»çæ°æ®ãChannelçè¡ä¸ºæ¯è¾åéåï¼Sourceåå ¥å°ä»ä»¬ï¼Sinkä»ä»ä»¬ä¸è¯»åæ°æ®ãå¤ä¸ªSourceå¯ä»¥å®å ¨çåå ¥å°åä¸Channelä¸ï¼å¹¶ä¸å¤ä¸ªSinkå¯ä»¥ä»åä¸ä¸ªChannelä¸è¯»åæ°æ®ãå¯æ¯ä¸ä¸ªSinkåªè½ä»ä¸ä¸ªChannel读åæ°æ®ï¼å¦æå¤ä¸ªSinkä»ç¸åçChannelä¸è¯»åæ°æ®ï¼ç³»ç»å¯ä»¥ä¿è¯åªæä¸ä¸ªSinkä¼ä»Channel读åä¸ä¸ªç¹å®çäºä»¶ãSinkä¼è¿ç»è½®è®åèªçChannelæ¥è¯»ååå é¤äºä»¶ãSinkå°äºä»¶æ¨éå°ä¸ä¸é¶æ®µï¼RPCSinkçæ åµä¸ï¼ï¼æè å°è¾¾æç»ç®çå°ãä¸æ¦å¨ä¸ä¸é¶æ®µæè å ¶ç®çå°ä¸æ°æ®æ¯å®å ¨çï¼Sinkéè¿äºå¡æ交éç¥Channelï¼å¯ä»¥ä»Channelä¸å é¤è¿ä¸äºä»¶ã
Agentç»ä»¶å®è´¨ä¸æ¯Multi-Clientç»ä»¶ãå ä¸ºå ¶ä¸Serverç»ä»¶éç¨ç¸åçææ¯æ¶æãä¸ä¸ªAgentç»ä»¶å¯¹è±¡å¯åæ¶å»ºç«åé«æå¤ç大è§æ¨¡Socketè¿æ¥ãæ以ï¼Agentç»ä»¶æ¬è´¨ä¸æ¯Clientç»ä»¶ãä¸ä¸ªAgent对象è½åæ¶ç®¡çå¤ä¸ªå®¢æ·ç«¯è¿æ¥ã
flume工作进程由什么组件构成
Flume工作进程主要由以下组件构成: 1. 源(Source)组件 Flume的源组件负责接收各种类型的数据,是数据进入Flume的第一个组件。常见的源包括Avro、Kafka、Twitter等,它们可以从不同的应用场景或系统中捕获数据。这些源能够将数据发送至Flume进行后续的传输和处理。 2. 通道(Channel)组件 通道是Flume中用于缓存数据的一个组件,它充当数据的临时存储库。当源组件接收到数据时,这些数据首先存储在通道中。通道的设计是为了实现数据的缓冲功能,确保数据的传输不会因为速度差异或其他因素而中断。Flume支持多种类型的通道,如内存通道、文件通道等。 3. 目的地(Destination)组件 目的地是Flume中数据的最终去处。当数据从通道中被取出时,目的地组件负责将这些数据发送到指定的目标,如Hadoop、HDFS、数据库或其他系统。目的地组件确保数据能够准确地送达其目的地并进行相应的处理。 详细解释: Flume作为一个分布式的数据收集、聚合和传输系统,其核心工作原理是通过上述三个组件实现的。源组件负责从外部系统捕获数据,这些数据被收集后存储在通道中,形成一个临时的数据存储队列。随后,目的地组件从通道中取出数据并将其发送到最终的目标系统。在这个过程中,通道起到了缓冲的作用,确保数据的传输不会因为各种原因而中断。这三个组件协同工作,使得Flume能够有效地在分布式系统中进行数据的传输和处理。通过配置不同的源、通道和目的地,Flume可以灵活地适应不同的应用场景和需求。flume çsource ãchannelåsink å¤ç§ç»å
flume æä¸å¤§ç»ä»¶source ãchannelåsinkï¼å个ç»ä»¶ä¹é´é½å¯ä»¥ç¸äºç»å使ç¨ï¼åç»ä»¶é´è¦å度ä½ã使ç¨çµæ´»ï¼æ¹ä¾¿ã
1.å¤sink
channel çå 容åªè¾åºä¸æ¬¡ï¼åä¸ä¸ªevent å¦æsink1 è¾åºï¼sink2 ä¸è¾åºï¼å¦æsink1 è¾åºï¼sink1 ä¸è¾åºã æç» sink1+sink2=channel ä¸çæ°æ®ã
é ç½®æ件å¦ä¸ï¼
a1.sources=r1a1.sinks= k1 k2a1.channels= c1# Describe/configure the sourcea1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log# channela1.channels.c1.type= memorya1.channels.c1.capacity=a1.channels.c1.transactionCapacity=#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:a1.sinks.k1.kafka.flumeBatchSize=a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy#sink2a1.sinks.k2.type= file_rolla1.sinks.k2.channel= c1#a1.sinks.k2.sink.rollInterval=0a1.sinks.k2.sink.directory= /opt/apps/tmp
2.å¤ channel å¤sink ï¼æ¯ä¸ªsink è¾åºå 容ä¸è´
ï¼memory channel ç¨äºkafkaæä½ï¼å®æ¶æ§é«ï¼file channel ç¨äº sink file æ°æ®å®å ¨æ§é«ï¼
ï¼å¤channel å sink çæ åµæ²¡æ举ä¾ï¼ä¸ªäººæè§ç¨å¤ä¸å¹¿æ³ãï¼
é ç½®æ件å¦ä¸ï¼
a1.sources=r1a1.sinks= k1 k2a1.channels= c1 c2# Describe/configure the sourcea1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1 c2a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log#å¤ä¸ªchannel çæ°æ®ç¸åa1.sources.r1.selector.type=replicating# channel1a1.channels.c1.type= memorya1.channels.c1.capacity=a1.channels.c1.transactionCapacity=#channel2a1.channels.c2.type= filea1.channels.c2.checkpointDir= /opt/apps/flume-1.7.0/checkpointa1.channels.c2.dataDirs= /opt/apps/flume-1.7.0/data#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:a1.sinks.k1.kafka.flumeBatchSize=a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy#sink2a1.sinks.k2.type= file_rolla1.sinks.k2.channel= c2#a1.sinks.k2.sink.rollInterval=0a1.sinks.k2.sink.directory= /opt/apps/tmp
3. å¤source å channel å sink
å¤ä¸ªsource å¯ä»¥è¯»åå¤ç§ä¿¡æ¯æ¾å¨ä¸ä¸ªchannel ç¶åè¾åºå°åä¸ä¸ªå°æ¹
é ç½®æ件å¦ä¸ï¼
a1.sources=r1r2a1.sinks= k1a1.channels= c1# source1a1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log# source2a1.sources.r2.type= execa1.sources.r2.shell= /bin/bash -ca1.sources.r2.channels= c1a1.sources.r2.command= tail -F /opt/apps/logs/tail2.log# channel1 in memorya1.channels.c1.type= memorya1.channels.c1.capacity=a1.channels.c1.transactionCapacity=#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:a1.sinks.k1.kafka.flumeBatchSize=a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy
flume åä¹é«ç§¯æ¨ä¸æ ·å¯ä»¥èªå·±éå¿æ欲å°ä¸åçç»ä»¶è¿è¡æé 使ç¨ï¼è¦å度ä½ã
Source
rpcè¿ç¨è¿ç¨è°ç¨åè®®ï¼å®¢æ·æºä¸æå¡æºçè°ç¨æ¨¡å¼éè¦å¯¹æ°æ®è¿è¡åºååã
1ï¼å®¢æ·æºå°åæ°åºåå并以äºè¿å¶å½¢å¼éè¿ç½ç»ä¼ è¾å°æå¡å¨ã
2ï¼æå¡å¨æ¥æ¶å°åè¿è¡ååºåååè°ç¨æ¹æ³è·åè¿åå¼ã
3ï¼æå¡å¨å°è¿åå¼åºååååéè¿ç½ç»ä¼ è¾ç»å®¢æ·æºã
4ï¼å®¢æ·æºæ¥æ¶å°ç»æååè¿è¡ååºååè·åç»æã
Avro sourceï¼
Avroå°±æ¯ä¸ç§åºååå½¢å¼ï¼avrosourceçå¬ä¸ä¸ªç«¯å£åªæ¥æ¶avroåºåååçæ°æ®ï¼å ¶ä»ç±»åçä¸æ¥æ¶ã
typeï¼avrosourceçç±»åï¼å¿ é¡»æ¯avroã
bindï¼è¦çå¬ç(æ¬æºç)主æºåæè ipãæ¤çå¬ä¸æ¯è¿æ»¤åéæ¹ãä¸å°çµèä¸æ¯è¯´åªæä¸ä¸ªIPãæå¤ç½å¡ççµèï¼å¯¹åºå¤ä¸ªIPã
portï¼ç»å®çæ¬å°ç端å£ã
Thrif sourceï¼
åavroä¸æ ·æ¯ä¸ç§æ°æ®åºååå½¢å¼ï¼Thrifsourceåªééthriftæ°æ®åºåååçæ°æ®
Exec sourceï¼
éélinuxå½ä»¤çè¿åç»æä¼ è¾ç»channel
type:sourceçç±»åï¼å¿ é¡»æ¯execã
commandï¼è¦æ§è¡å½ä»¤ã
tail âf è¥æ件被å é¤å³ä½¿éæ°å建ååæ件ä¹ä¸ä¼çå¬
tail -F åªè¦æ件ååå°±å¯ä»¥ç»§ç»çå¬
以ä¸å¯ä»¥ç¨å¨æ¥å¿æ件åå²æ¶ççå¬
JMS Sourceï¼
Javaæ¶æ¯æå¡æ°æ®æºï¼Javaæ¶æ¯æå¡æ¯ä¸ä¸ªä¸å ·ä½å¹³å°æ å ³çAPIï¼è¿æ¯æ¯æjmsè§èçæ°æ®æºééï¼
Spooling Directory Sourceï¼éè¿æ件夹éçæ°å¢çæ件ä½ä¸ºæ°æ®æºçééï¼
Kafka Sourceï¼ä»kafkaæå¡ä¸ééæ°æ®ã
NetCat Sourceï¼ç»å®ç端å£ï¼tcpãudpï¼ï¼å°æµç»ç«¯å£çæ¯ä¸ä¸ªææ¬è¡æ°æ®ä½ä¸ºEventè¾å ¥
typeï¼sourceçç±»åï¼å¿ é¡»æ¯netcatã
bindï¼è¦çå¬ç(æ¬æºç)主æºåæè ipãæ¤çå¬ä¸æ¯è¿æ»¤åéæ¹ãä¸å°çµèä¸æ¯è¯´åªæä¸ä¸ªIPãæå¤ç½å¡ççµèï¼å¯¹åºå¤ä¸ªIPã
portï¼ç»å®çæ¬å°ç端å£ã
HTTP Sourceï¼çå¬HTTP POSTå GET产ççæ°æ®çéé
Chanel
æ¯ä¸ä¸ªæ°æ®åå¨æ± ï¼ä¸é´ééï¼ä»sourceä¸æ¥æ¶æ°æ®ååsinkç®çå°ä¼ è¾ï¼å¦æsinkåå ¥å¤±è´¥ä¼èªå¨éåå æ¤ä¸ä¼é ææ°æ®ä¸¢å¤±ã
Memoryï¼ç¨å ååå¨ï¼ä½æå¡å¨å®æºä¼ä¸¢å¤±æ°æ®ã
Typechannelçç±»åï¼å¿ 须为memory
capacityï¼channelä¸çæ大eventæ°ç®
transactionCapacityï¼channelä¸å 许äºå¡çæ大eventæ°ç®
Fileï¼ä½¿ç¨æ件åå¨æ°æ®ä¸ä¼ä¸¢å¤±æ°æ®ä½ä¼èè´¹ioã
Typechannelçç±»åï¼å¿ 须为 file
checkpointDir ï¼æ£æ¥ç¹çæ°æ®åå¨ç®å½
dataDirs ï¼æ°æ®çåå¨ç®å½
transactionCapacityï¼channelä¸å 许äºå¡çæ大eventæ°ç®
SpillableMemory Channelï¼å åæ件综å使ç¨ï¼å åå ¥å åè¾¾å°éå¼åflushå°æ件ä¸ã
Typechannelçç±»åï¼å¿ 须为SPILLABLEMEMORY
memoryCapacityï¼å åç容éeventæ°
overflowCapacityï¼æ°æ®åå°æ件çeventéå¼æ°
checkpointDirï¼æ£æ¥ç¹çæ°æ®åå¨ç®å½
dataDirsï¼æ°æ®çåå¨ç®å½
Jdbcï¼ä½¿ç¨jdbcæ°æ®æºæ¥åå¨æ°æ®ã
Kafkaï¼ä½¿ç¨kafkaæå¡æ¥åå¨æ°æ®ã
Sink
åç§ç±»åçç®çå°ï¼æ¥æ¶channelåå ¥çæ°æ®å¹¶ä»¥æå®çå½¢å¼è¡¨ç°åºæ¥ãSinkæå¾å¤ç§ç±»åã
typeï¼sinkçç±»å å¿ é¡»æ¯hdfsã
hdfs.pathï¼hdfsçä¸ä¼ è·¯å¾ã
hdfs.filePrefixï¼hdfsæ件çåç¼ãé»è®¤æ¯:FlumeData
hdfs.rollInterval:é´éå¤ä¹ 产çæ°æ件ï¼é»è®¤æ¯:ï¼ç§ï¼ 0表示ä¸ä»¥æ¶é´é´é为åã
hdfs.rollSizeï¼æ件å°è¾¾å¤å¤§å产çä¸ä¸ªæ°æ件ï¼é»è®¤æ¯:ï¼bytesï¼0表示ä¸ä»¥æ件大å°ä¸ºåã
hdfs.rollCountï¼eventè¾¾å°å¤å¤§å产çä¸ä¸ªæ°æ件ï¼é»è®¤æ¯:ï¼ä¸ªï¼0表示ä¸ä»¥eventæ°ç®ä¸ºåã
hdfs.batchSizeï¼æ¯æ¬¡å¾hdfséæ交å¤å°ä¸ªeventï¼é»è®¤ä¸º
hdfs.fileTypeï¼hdfsæ件çæ ¼å¼ä¸»è¦å æ¬ï¼SequenceFile,DataStream ,CompressedStreamï¼å¦æ使ç¨äºCompressedStreamå°±è¦è®¾ç½®å缩æ¹å¼ã
hdfs.codeCï¼å缩æ¹å¼ï¼gzip,bzip2, lzo, lzop, snappy
注ï¼%{ host}å¯ä»¥ä½¿ç¨headerçkeyã以å%Y%m%dæ¥è¡¨ç¤ºæ¶é´ï¼ä½å ³äºæ¶é´ç表示éè¦å¨headeréætimestampè¿ä¸ªkeyã
Logger Sinkå°æ°æ®ä½ä¸ºæ¥å¿å¤çï¼æ ¹æ®flumeä¸ç设置çæ¥å¿æ¹å¼æ¥æ¾ç¤ºï¼
è¦å¨æ§å¶å°æ¾ç¤ºå¨è¿è¡agentçæ¶åå å ¥ï¼-Dflume.root.logger=INFO,consoleã
typeï¼sinkçç±»åï¼å¿ é¡»æ¯loggerã
maxBytesToLogï¼æå°bodyçæé¿çåèæ° é»è®¤ä¸º
Avro Sinkï¼æ°æ®è¢«è½¬æ¢æAvro Eventï¼ç¶ååéå°æå®çæå¡ç«¯å£ä¸ã
typeï¼sinkçç±»åï¼å¿ é¡»æ¯ avroã
hostnameï¼æå®åéæ°æ®ç主æºåæè ip
portï¼æå®åéæ°æ®ç端å£
å®ä¾
1ï¼çå¬ä¸ä¸ªæ件çå¢å ååï¼ééæ°æ®å¹¶å¨æ§å¶å°æå°ã
å¨è¿ä¸ªä¾åä¸æ使ç¨exec sourceï¼memory chanelï¼logger sinkãå¯ä»¥çæçagentç»æå¾
以ä¸æ¯æå建çexec_source.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F/usr/local/success.log
a1.channels.c1.type=memory
a1.channels.c1.capacity=
a1.channels.c1.transactioncapacity=
a1.sinks.k1.type=logger
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
æ§è¡å½ä»¤ï¼
bin/flume-ngagent --conf conf/ --conf-file conf/exec_source.conf --name a1-Dflume.root.logger=INFO,console &
ç¶åæ´æ¹/usr/local/success.logæ件ä¸çå 容åå¯ä»¥çå°flumeééå°äºæ件çåå并å¨æ§å¶å°ä¸æå°åºæ¥ãæ件åå§å 容helloåhow are youï¼å©ä¸çi am fineåok为æ°å¢å å 容ã
2ï¼çæ§ä¸ä¸ªæ件åå并å°å ¶åéå°å¦ä¸ä¸ªæå¡å¨ä¸ç¶åæå°
è¿ä¸ªä¾åå¯ä»¥å»ºç«å¨ä¸ä¸ä¸ªä¾åä¹ä¸ï¼ä½æ¯éè¦å¯¹flumeçç»æåä¸äºä¿®æ¹ï¼æ使ç¨avroåºååæ°æ®ååéå°æå®çæå¡å¨ä¸ã详æ çç»æå¾ã
å®é ä¸flumeå¯ä»¥è¿è¡å¤ä¸ªèç¹å ³èï¼æ¬ä¾ä¸æåªä½¿ç¨ååéæ°æ®
,ä¸é½å¿ é¡»å¯å¨agent
æå¡å¨é ç½®
以ä¸æ¯æå建çexec_source_avro_sink.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F/usr/local/success.log
a1.channels.c1.type=memory
a1.channels.c1.capacity=
a1.channels.c1.transactioncapacity=
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=...
a1.sinks.k1.port=
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
æ§è¡å½ä»¤å¯å¨agent
bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&
æå¡å¨é ç½®
æ§è¡å½ä»¤æ·è´flumeå°
scp -r apache-flume-1.7.0-bin/root@...:/usr/local/
ä¿®æ¹exec_source_avro_sink.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=
a1.channels.c1.type=memory
a1.channels.c1.capacity=
a1.channels.c1.transactioncapacity=
a1.sinks.k1.type=logger
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
æ§è¡å½ä»¤å¯å¨agent
bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&
ç»æå¯ä»¥å¨æ§å¶å°ä¸çå°ä¸ä¿®æ¹success.logçååä¿¡æ¯
3ï¼avro-clientå®ä¾
æ§è¡bin/flume-ngä¼æ示æå½ä»¤å¦ä¸
help display this help text
agent run aFlume agent
avro-client run anavro Flume client
version show Flume version info
avro-clinetæ¯avro客æ·ç«¯ï¼å¯ä»¥ææ¬å°æ件以avroåºååæ¹å¼åºååååéå°æå®çæå¡å¨ç«¯å£ãæ¬ä¾å°±æ¯å°çä¸ä¸ªæ件ä¸æ¬¡æ§çåéå°ä¸å¹¶æå°ã
Agentç»æå¾å¦ä¸
å¯å¨çæ¯ä¸ä¸ªavro-clientï¼å®ä¼å»ºç«è¿æ¥ï¼åéæ°æ®ï¼æå¼è¿æ¥ï¼å®åªæ¯ä¸ä¸ªå®¢æ·ç«¯ã
å¯å¨ä¸ä¸ªavro客æ·ç«¯
bin/flume-ngavro-client --conf conf/ --host ... --port --filename/usr/local/success.log --headerFile /usr/local/kv.log
--headerFileæ¯ç¨æ¥åºåæ¯åªä¸ªæå¡å¨åéçæ°æ®ï¼kv.logä¸çå 容ä¼è¢«åéå°ï¼å¯ä»¥ä½ä¸ºæ è¯æ¥ä½¿ç¨ã
çavro_client.confå¦ä¸
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=
a1.channels.c1.type=memory
a1.channels.c1.capacity=
a1.channels.c1.transactioncapacity=
a1.sinks.k1.type=logger
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
å¯å¨agent
bin/flume-ngagent --conf conf/ --conf-file conf/avro_client.conf --name a1-Dflume.root.logger=INFO,console &
æ§å¶å°æ¾ç¤ºå¦ä¸
å¯ä»¥çå°headersçå 容headers:{ hostname=...}
注æï¼
1ï¼Flumeæå¡æ²¡æstopå½ä»¤éè¦éè¿killæ¥ææè¿è¡ï¼å¯ä»¥ä½¿ç¨jps -mæ¥ç¡®è®¤æ¯é£ä¸ªagentçnumber
[root@shb conf]# jps -m
Jps -m
Application --conf-fileconf/exec_source.conf --name a1
2ï¼ä¿®æ¹flumeçé ç½®æ件åå¦avro_client.confï¼flumeä¼èªå¨éå¯
3ï¼logger sinké»è®¤åªæ¾ç¤ºä¸ªåè
4ï¼flumeæ¯ä»¥event为åä½è¿è¡æ°æ®ä¼ è¾çï¼å ¶ä¸headersæ¯ä¸ä¸ªmap容å¨map
Event: { headers:{ hostname=...}body: 1a }
5ï¼flumeæ¯æå¤èç¹å ³èä½æ¯sinkåsourceçç±»åè¦ä¸è´ï¼æ¯å¦avro-clientåéæ°æ®é£ä¹æ¥æ¶æ¹çsourceä¹å¿ é¡»æ¯avroå¦åä¼è¦åã