write down,forget

flume搭建调试

<Category: 日志分析> 查看评论

flume搭建调试

Installing CDH3
https://ccp.cloudera.com/display/CDHDOC/CDH3+Installation
流水账,备忘。

Installing CDH3 Components
https://ccp.cloudera.com/display/CDHDOC/CDH3+Installation#CDH3Installation-InstallingCDH3Components

yum install

install/Use
———————————
flume
Sqoop sqoop
Hue hue
Pig hadoop-pig
Hive hadoop-hive
HBase hadoop-hbase
ZooKeeper hadoop-zookeeper
Oozie server oozie
Oozie client oozie-client
Whirr whirr
Snappy hadoop-0.20-native
Mahout mahout

flume分为:

flume 核心
flume.node 作为节点的服务自启动脚本
flume.master 作为maaster的服务自启动脚本

yum install flume*

flume文档
http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html

flume总的来说,是面向流的设计,“source“和”sink”分别代表产生和消费,push、pull都支持,可以扩展支持各种数据源,及数据的处理,非常灵活。

先停掉服务,以前台模式运行,方便查看各种输出,直观的了解一把

启动flume

启动之后,你可以在输入任何字符,然后会有来自flume的回显,因为我们参数指定了console,这个其实是配置flume的source为console的输入,默认sink也是console

source为文件的情况
flume dump ‘text(“/etc/services”)’

tail文件末尾信息的方法
flume dump ‘tail(“testfile”)’

testfile可以不存在,没有问题,我们在另外的console里面创建这个文件,并添加些内容

在flume这边,就可以实时的看到反馈

多个文件,也是可以的

默认情况下,tail会处理文件的每一行,并分别生成event,默认分隔符是“\n”,并且不会排除分隔符本身,如果你需要自定义分隔符(采用正则表达式),也是可以的,支持
”prev”:分隔符属于前一个event
“next”:分隔符属于下一个event
“exclude”:分隔符丢弃

开启一个UDP服务,并监听5140端口

flume web console
http://10.129.8.125:35871/flumemaster.jsp

Cloudera Manager Free Edition
https://ccp.cloudera.com/display/express37/Cloudera+Manager+Free+Edition+Documentation

安装之前,先禁用Selinux

./cloudera-manager-installer.bin
安装失败,查看日志,发现安装包下载不下来,只能手动下载安装了。

手动安装JDK

http://archive.cloudera.com/cloudera-manager/redhat/5/x86_64/cloudera-manager/3/RPMS/cloudera-manager-daemons-3.7.2.143-1.noarch.rpm

—————————-华丽的不行了的分割线—————————————–

2台机器:125 126
125上配置:

启动flume各节点

HDFS服务器设置(新配)

hdfs://10.129.8.126/

修改hadoop配置,使用外部ip

设置访问权限:

126节点,启动flume

打开flume master
http://10.129.8.125:35871/flumemaster.jsp

??
Flume’s Tiered Event Sources

collectorSource[(port)]
Collector source. Listens for data from agentSinks forwarding to port port. If port is not specified, the node default collector TCP port, 35853.
!!

hadoop dfs -ls hdfs://10.129.8.126/flume/

125上报错:

vi /etc/hadoop/conf/hdfs-site.xml
设置replica为0.也不行

vi /etc/hadoop/conf/masters
替换localhost为ip:10.129.8.126

还是不行,在125上手动执行upload操作

报一样的错误,

在126上执行如上操作,报同样错误,MD

看来是datanode挂了,但是服务显示启动,重启试试。

ok了。

如果报safemode了

执行
hadoop dfsadmin -safemode leave

ok,再来一遍

125上面;
flume node_nowatch -n collector

126上面:
flume node_nowatch

ok搞定

新加flume node
126上面:

flume node_nowatch -n agentAB

flume-master页面上面添加配置
agentAB : text(“/var/log/dmesg”) | agentSink(“10.129.8.125”,35853);

OK,没有问题,下面试试默认配置

flume node_nowatch -n agentABC
agentABC : text(“/tmp/medcl”) | agentSink(“10.129.8.125”);

这个时候,
node status里面
agentABC agentABC flume-hadoop-node-1 OPENING Fri Feb 03 02:31:11 CST 2012 3 Fri Feb 03 02:32:49 CST 2012

console端报错:

创建文件
echo “hello world” > /tmp/medcl

继续失败着,不能自动恢复,只能重启node

text sink只能执行一次,后续文件有变化,并不处理

tail就可以实现监听

collector每30秒写一次hadoop,hadoop文件每次新建一个

如果是替换文件内容,不是追加,第一条记录会造成丢失,此处应该特别注意(bug?)

再追加一条数据

果然,数据丢了一条了。

ok,前面提到了flume使用3种工作模式来保证数据的可靠性与可用性:
1.End2End,2端确认,失败会自动重试(重试次数多少,重试失败之后怎样处理,还要继续研究)
agentE2ESink[(“machine”[,port])]

2.DiskFailover,失败写本地磁盘,周期性检查,collector可用的时候,自动重做任务。
agentDFOSink[(“machine”[,port])]

3.高效模式,collector失败就丢弃日志,够狠够绝
agentBESink[(“machine”[,port])]

前面使用到的agentSink,是第一种End2End的别名,效果和End2End一样。

多收集器的配置

多个collector能够提高吞吐量,因为日志收集都是平行,前面提到过,为保证可靠性,如果collector挂了,agent需要写本地磁盘,然后周期性的去重新连接collector,另外,日志收集停止了,后面的日志处理与分析也歇菜了,这个可不行的。
多个collector就可以解决这个问题,汗!

另外多个collector中,如果其中一个挂了,agent应该是能够自动切换的,怎么配呢?

使用failover chains,

如上配置,chain指定了2个,第一个collector失败了之后,自动切换使用第二个。

自动FailoverChain,主要是通过使用特殊的source和sink名字(多master下不适用)

source使用:
autoCollectorSource

sink使用:
autoE2EChain, autoDFOChain, or autoBEChain

配置为:
agentA : src | autoE2EChain ;
agentB : src | autoE2EChain ;
agentC : src | autoE2EChain ;
agentD : src | autoE2EChain ;
agentE : src | autoE2EChain ;
agentF : src | autoE2EChain ;
collectorA : autoCollectorSource | collectorSink(“hdfs://…”, “src”);
collectorB : autoCollectorSource | collectorSink(“hdfs://…”, “src”);
collectorC : autoCollectorSource | collectorSink(“hdfs://…”, “src”);

Logical Configurations
一个physical node包含若干个logical node,logical node又分为:logical sources 和logical sinks ,使用flow来隔离nodes和分组

logical node允许一个JVM实例包含多个logical nodes,实现在一个JVM上跑多个Source和Sink的线程。

每个logical node的名称必须唯一,包括physical node 名称或者 host名称都不能相同

logical定义分两步,

1.定义node类型
agent1 : _source_ | autoBEChain ;
collector1 : autoCollectorSource | collectorSink(“hdfs://….”) ;

2.mapping logical node和 physical node
map host1 agent1
map host2 collector1

3.解除一个logical节点
decommission agent1

试试

126上

flume master页面
config:

注:主机名-ip
cloudera-node-1:10.129.8.126
flume-hadoop-node-1:10.129.8.125

raw command:

(注意空格,decommission两端不能有空格)

或者unmap和map操作来移动logicalnode


抓包得到请求为:
curl -XPOST http://10.129.8.125:35871/mastersubmit.jsp -d’cmd=unmap&args=10.129.8.125+agent1′

注:logical sources和logical sinks在多master下不适用

通过logical source和logical sink可以在不知道具体物理节点的时候就进行流程的配置,flume有一种翻译的机制,会自动将logical节点名称替换成实际的主机名和端口
事实上,autoSinks和auto-Chain也是这样来实现的。

Flow 隔离,(注,多master下也不适用,悲催啊)

假设你需要收集一个物理机的多种数据,并存放到不同的地方,一种方式是对所有的数据打上tag,通过同一个管道来传数据,然后通过后处理来分离数据

另一种是在整个传输过程中通过将两两种数据隔离,避免后处理的产生

Flume两种都支持,并且延时很低,通过引入flow的概念,将节点进行分组,配置方式如下:
flume master页面:
raw commands

命令:config
参数:[logincal node] [flow name] fooSrc autoBEChain

实际例子:

!!!!

————
1.问题:
fail( “logical node not mapped to physical node yet” )

1.使用主机名来做map,node status显示的是什么名称,map的时候就用什么名称
2.先map好logical node,然后再更新config配置

正常工作的配置,

!!!!

多master配置
多master之间自动同步,一个master挂了,其下node会自动转移到其他master上去。

flume master有两种工作模式:standalone和distributed
如何配置呢?

一个Host则是standalone模式,多个host即distributed模式【分布式模式下,每个master的配置文件必须一样】
另外,每个master必须要配置不同的serverid,如下:

【数字和前面配置的服务器列表的下标保持一致即可】
分布式环境下,至少需要3台服务器来保证允许一台失败,如果要允许同时两台挂掉,则至少需要5台服务器
,如果master节点存活率不能超过总数的一半,整个flume master 集群就会block住,无法读写配置信息

flume master存放配置信息的地方叫做:configuration store,支持插拔,本身支持两种实现:
基于内存的:MBCS和基于ZooKeeper的:ZBCS
默认ZBCS,flume内置zookeeper,支持配置到现有的zookeeper集群去

【value值可选:zookeeper或者memory】

ZBCS配置

flume.master.zk.logdir:存储配置文件信息,更新日志,失败信息等
flume.master.zk.server.quorum.port:默认3182,zookeeper server本地监听
flume.master.zk.server.election.port:默认3183,zookeeper server用来寻找其它节点
flume.master.zk.client.port:默认3181,用来与zookeeper server通讯

FlumeMaster的gossip协议支持:

分布式模型下,flume node的配置也需要调整,从连一个改成连接多个master

flume node通过定期与master的端口做心跳检测,一旦master 连接失败,自动随机切换到剩下的可以连上的master上去。【master节点通过配置flume.master.heartbeat.port来配置心跳端口】

如果要使用外部的zookeeper,配置如下
conf/flume-site.xml.

Flume与数据源集成
Flume强大就在于灵活,支持各种数据源,结构化的,非结构化的,半结构化等等
三种方式:
pushing、polling、embedding(嵌入flume组件到你的应用程序中)

Push Sources:
syslogTcp,syslogUdp:syslog,syslog-ng日志协议
scribe:scribe日志系统的协议

Polling:
tail,mulitail:监视文件内容的追加信息
exec:适合从现有系统抽取数据
poller:收集来着flume node本身的信息

Flume Event的数据模型
6个主要的字段;
Unix timestamp
Nanosecond timestamp 【纳秒级别的时间戳】
Priority
Source host
Body
Metadata table with an arbitrary number of attribute value pairs.

所有的event都有这几个字段,不过body长度可能为0,metadata表可能为空。

priority :TRACE, DEBUG, INFO, WARN, ERROR, or FATAL,这几种
body:raw格式,默认最大32KB,多余的截掉,通过参数flume.event.max.size.bytes来进行配置

使用event的字段来自定义输出位置
collectorSink(“hdfs://namenode/flume/webdata/%H00/”, “%{host}-“)
%H 为时间timestamp字段里的小时,host为field里面的主机名

快速参考:
[horizontal] %{host}
host
%{nanos}
nanos
%{priority}
priority string
%{body}
body
%%
a % character.
%t
Unix time in millis

时间比较特殊,直接使用,不需要{}
collectorSink(“hdfs://namenode/flume/webdata/%Y-%m-%d/%H00/”, “web-“)

快速参考:
%a

locale’s short weekday name (Mon, Tue, …)

%A

locale’s full weekday name (Monday, Tuesday, …)

%b

locale’s short month name (Jan, Feb,…)

%B

locale’s long month name (January, February,…)

%c

locale’s date and time (Thu Mar 3 23:05:25 2005)

%d

day of month (01)

%D

date; same as %m/%d/%y

%H

hour (00..23)

%I

hour (01..12)

%j

day of year (001..366)

%k

hour ( 0..23)

%l

hour ( 1..12)

%m

month (01..12)

%M

minute (00..59)

%P

locale’s equivalent of am or pm

%s

seconds since 1970-01-01 00:00:00 UTC

%S

second (00..60)

%y

last two digits of year (00..99)

%Y

year (2010)

%z

+hhmm numeric timezone (for example, -0400)

输出文件格式

两种方式:
一直是在 flume-site.xml里面设置默认值,另外是由特定的sink来决定

1.flume-site.xml
flume.collector.output.format

格式快速参考

avro

Avro Native file format. Default currently is uncompressed.

avrodata

Binary encoded data written in the avro binary format.

avrojson

JSON encoded data generated by avro.

default

a debugging format.

json

JSON encoded data.

log4j

a log4j pattern similar to that used by CDH output pattern.

raw

Event body only. This is most similar to copying a file but does not preserve any uniqifying metadata like host/timestamp/nanos.

syslog

a syslog like text output format.

seqfile

the binary hadoop Sequence file format with WritableEventKeys keys, and WritableEvent as values.

2.分别配置

压缩seqfile
formatDfs(“hdfs://nn/dir/file”, seqfile(“bzip2”))

HDFS大量小文件与高延迟的处理
Flume两种策略来处理
1.合并小文件到大的文件
2.使用CombinedFileInputFormat

seqfile和avrodata支持内部的压缩,具体再研究

DataFlow定义语言

Fan out,往所有sinks写:
[ console, collectorSink ]

Fail over,当前失败,转移到下一个,尝试候选sink:

配置样例:

Roll sink,每隔一段时间,关闭当前实例,创建新的实例,每次会创建新的独立的文件:
roll(millis) sink
配置样例:

Sink Decorators,sink装饰器
Fan out和Failover影响messages去哪里,但不修改数据,如果要过滤数据什么的,使用sink decorator

sink decorator可以做很多事情,如可以给数据流添加属性,可以通过写ahead 日志来确保可靠性,或者通过批量、压缩来提供网络吞吐,抽样甚至轻量级的分析

flumenode: source | intervalSampler(10) sink;
flumenode: source | batch(100) sink;
flumenode: source | batch(100) gzip sink;
collector(15000) { escapedCustomDfs(“xxx”,”yyy-%{rolltag}”) }
collector(15000) { [ escapedCustomDfs(“xxx”,”yyy-%{rolltag}”), hbase(“aaa”, “bbb-%{rolltag}”), elasticSearch(“eeee”,”ffff”) ] } 【同时往3个sink里面写数据,可能有些是持久化的,有些是瞬时的,都成功之后,才会确认成功】

node1 : tail(“foo”) | ackedWriteAhead batch(100) gzip lazyOpen stubbornAppend logicalSink(“bar”);【write ahead,批量100,gzip压缩】

Metadata支持正则来进行抽取
支持类似select语法来筛选

thriftSink and thriftSource

扩展与插件
http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html#_semantics_of_flume_extensions

附录真是好啊
http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html#_flume_source_catalog

测试syslog信息

.NET Agent 25个线程,结果压趴下了[另外后续测试发现经常无原因socket断开,服务端socket直接挂掉,flume显示error]。
2012-02-10 21:29:44,154 ERROR com.cloudera.flume.core.connector.DirectDriver: Exiting driver logicalNode collector2-20 in error state SyslogTcpSourceThreads | Collector because null

syslogTcp不稳定,果断换thriftRpc作为Source,经测果然很稳定

此异常可能是因为服务端和客户端使用了不相同的transport,如framed和buffered不匹配

vi flume-site.xml,添加压缩和默认roll时间

测试文件模板

更新

结果:

本文来自: flume搭建调试