Flume一个数据源对应多个channel,多个sink

一、概述

1、现在有三台机器,分别是:Hadoop1,Hadoop2,Hadoop3,以Hadoop1为日志汇总

2、Hadoop1汇总的同时往多个目标进行输出



3、Flume一个数据源对应多个channel,多个sink,是在consolidation-accepter.conf文件里配置的

二、部署Flume来采集日志和汇总日志

1、在Hadoop1上运行

flume-ng agent --conf ./ -f consolidation-accepter.conf -n agent1 -Dflume.root.logger=INFO,console

其脚本(consolidation-accepter.conf)内容如下

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1 ch2
agent1.sources = source1
agent1.sinks = hdfssink1 sink2
agent1.source.source1.selector.type = replicating
agent1.source.source1.selector.optional = ch1

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 1000000
agent1.channels.ch1.keep-alive = 10

agent1.channels.ch2.type = memory
agent1.channels.ch2.capacity = 1000000
agent1.channels.ch2.transactionCapacity = 100000
agent1.channels.ch2.keep-alive = 10

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.source1.channels = ch1 ch2
agent1.sources.source1.type = avro
agent1.sources.source1.bind = con
agent1.sources.source1.port = 44444
agent1.sources.source1.threads = 5

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.hdfssink1.channel = ch1
agent1.sinks.hdfssink1.type = hdfs
agent1.sinks.hdfssink1.hdfs.path = hdfs://mycluster/flume/%Y-%m-%d/%H%M
agent1.sinks.hdfssink1.hdfs.filePrefix = S1PA124-consolidation-accesslog-%H-%M-%S
agent1.sinks.hdfssink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfssink1.hdfs.writeFormat = Text
agent1.sinks.hdfssink1.hdfs.fileType = DataStream
agent1.sinks.hdfssink1.hdfs.rollInterval = 1800
agent1.sinks.hdfssink1.hdfs.rollSize = 5073741824
agent1.sinks.hdfssink1.hdfs.batchSize = 10000
agent1.sinks.hdfssink1.hdfs.rollCount = 0
agent1.sinks.hdfssink1.hdfs.round = true
agent1.sinks.hdfssink1.hdfs.roundValue = 60
agent1.sinks.hdfssink1.hdfs.roundUnit = minute


agent1.sinks.sink2.type = logger
agent1.sinks.sink2.sink.batchSize=10000
agent1.sinks.sink2.sink.batchTimeout=600000
agent1.sinks.sink2.sink.rollInterval = 1000
agent1.sinks.sink2.sink.directory=/root/data/flume-logs/
agent1.sinks.sink2.sink.fileName=accesslog
agent1.sinks.sink2.channel = ch2
2、分别在Hadoop2和Hadoop3运行如下命令

flume-ng agent --conf ./  --conf-file collect-send.conf --name agent1

Flume数据发送器配置文件collect-send.conf内容如下

agent2.sources = source2
agent2.sinks = sink1
agent2.channels = ch2
agent2.sources.source2.type = exec
agent2.sources.source2.command = tail -F /root/data/flume.log
agent2.sources.source2.channels = ch2

#channels configuration
agent2.channels.ch2.type = memory
agent2.channels.ch2.capacity = 10000
agent2.channels.ch2.transactionCapacity = 10000
agent2.channels.ch2.keep-alive = 3

#sinks configuration
agent2.sinks.sink1.type = avro
agent2.sinks.sink1.hostname=consolidationIpAddress
agent2.sinks.sink1.port = 44444
agent2.sinks.sink1.channel = ch2

三、总结

1、启动Flume汇总进程
  flume-ng agent --conf ./ -f consolidation-accepter.conf -n agent1 -Dflume.root.logger=INFO,console
2、启动Flume采集进程
  flume-ng agent --conf ./  --conf-file collect-send.conf --name agent1
3、配置参数说明(以下两个条件是or的关系,也就是当一个条件满足就触发)
(1)每半小时把channel里的数据冲刷到sink中去,并且另起新的文件来存储
    agent1.sinks.hdfssink1.hdfs.rollInterval = 1800
(2)当文件大小为5073741824字节时,另起新的文件来存储
    agent1.sinks.hdfssink1.hdfs.rollSize = 5073741824
 




郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。