write down,forget
  • adidaseqt
  • eqtturbored
  • eqtsupport9317
  • eqtsupport
  • 9317adidas
  • adidaseqtboost9317
  • eqtsupport93
  • 9317eqt
  • eqt support 9317 adv
  • support 9317 adv
  • eqtadv
  • eqt9317
  • eqtadv9317
  • support93
  • originalseqt
  • adidas eqt
  • eqt support 9317
  • eqt support
  • eqt adv
  • eqt 9317
  • flume搭建调试

    <Category: 日志分析> 查看评论

    flume搭建调试

    Installing CDH3
    https://ccp.cloudera.com/display/CDHDOC/CDH3+Installation
    流水账,备忘。

    Installing CDH3 Components
    https://ccp.cloudera.com/display/CDHDOC/CDH3+Installation#CDH3Installation-InstallingCDH3Components

    yum install

    install/Use
    ———————————
    flume
    Sqoop sqoop
    Hue hue
    Pig hadoop-pig
    Hive hadoop-hive
    HBase hadoop-hbase
    ZooKeeper hadoop-zookeeper
    Oozie server oozie
    Oozie client oozie-client
    Whirr whirr
    Snappy hadoop-0.20-native
    Mahout mahout

    flume分为:

    flume 核心
    flume.node 作为节点的服务自启动脚本
    flume.master 作为maaster的服务自启动脚本

    yum install flume*

    flume文档
    http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html

    flume总的来说,是面向流的设计,“source“和”sink”分别代表产生和消费,push、pull都支持,可以扩展支持各种数据源,及数据的处理,非常灵活。

    先停掉服务,以前台模式运行,方便查看各种输出,直观的了解一把

    启动flume

    启动之后,你可以在输入任何字符,然后会有来自flume的回显,因为我们参数指定了console,这个其实是配置flume的source为console的输入,默认sink也是console

    source为文件的情况
    flume dump ‘text(“/etc/services”)’

    tail文件末尾信息的方法
    flume dump ‘tail(“testfile”)’

    testfile可以不存在,没有问题,我们在另外的console里面创建这个文件,并添加些内容

    在flume这边,就可以实时的看到反馈

    多个文件,也是可以的

    默认情况下,tail会处理文件的每一行,并分别生成event,默认分隔符是“\n”,并且不会排除分隔符本身,如果你需要自定义分隔符(采用正则表达式),也是可以的,支持
    ”prev”:分隔符属于前一个event
    “next”:分隔符属于下一个event
    “exclude”:分隔符丢弃

    开启一个UDP服务,并监听5140端口

    flume web console
    http://10.129.8.125:35871/flumemaster.jsp

    Cloudera Manager Free Edition
    https://ccp.cloudera.com/display/express37/Cloudera+Manager+Free+Edition+Documentation

    安装之前,先禁用Selinux

    ./cloudera-manager-installer.bin
    安装失败,查看日志,发现安装包下载不下来,只能手动下载安装了。

    手动安装JDK

    http://archive.cloudera.com/cloudera-manager/redhat/5/x86_64/cloudera-manager/3/RPMS/cloudera-manager-daemons-3.7.2.143-1.noarch.rpm

    —————————-华丽的不行了的分割线—————————————–

    2台机器:125 126
    125上配置:

    启动flume各节点

    HDFS服务器设置(新配)

    hdfs://10.129.8.126/

    修改hadoop配置,使用外部ip

    设置访问权限:

    126节点,启动flume

    打开flume master
    http://10.129.8.125:35871/flumemaster.jsp

    ??
    Flume’s Tiered Event Sources

    collectorSource[(port)]
    Collector source. Listens for data from agentSinks forwarding to port port. If port is not specified, the node default collector TCP port, 35853.
    !!

    hadoop dfs -ls hdfs://10.129.8.126/flume/

    125上报错:

    vi /etc/hadoop/conf/hdfs-site.xml
    设置replica为0.也不行

    vi /etc/hadoop/conf/masters
    替换localhost为ip:10.129.8.126

    还是不行,在125上手动执行upload操作

    报一样的错误,

    在126上执行如上操作,报同样错误,MD

    看来是datanode挂了,但是服务显示启动,重启试试。

    ok了。

    如果报safemode了

    执行
    hadoop dfsadmin -safemode leave

    ok,再来一遍

    125上面;
    flume node_nowatch -n collector

    126上面:
    flume node_nowatch

    ok搞定

    新加flume node
    126上面:

    flume node_nowatch -n agentAB

    flume-master页面上面添加配置
    agentAB : text(“/var/log/dmesg”) | agentSink(“10.129.8.125”,35853);

    OK,没有问题,下面试试默认配置

    flume node_nowatch -n agentABC
    agentABC : text(“/tmp/medcl”) | agentSink(“10.129.8.125”);

    这个时候,
    node status里面
    agentABC agentABC flume-hadoop-node-1 OPENING Fri Feb 03 02:31:11 CST 2012 3 Fri Feb 03 02:32:49 CST 2012

    console端报错:

    创建文件
    echo “hello world” > /tmp/medcl

    继续失败着,不能自动恢复,只能重启node

    text sink只能执行一次,后续文件有变化,并不处理

    tail就可以实现监听

    collector每30秒写一次hadoop,hadoop文件每次新建一个

    如果是替换文件内容,不是追加,第一条记录会造成丢失,此处应该特别注意(bug?)

    再追加一条数据

    果然,数据丢了一条了。

    ok,前面提到了flume使用3种工作模式来保证数据的可靠性与可用性:
    1.End2End,2端确认,失败会自动重试(重试次数多少,重试失败之后怎样处理,还要继续研究)
    agentE2ESink[(“machine”[,port])]

    2.DiskFailover,失败写本地磁盘,周期性检查,collector可用的时候,自动重做任务。
    agentDFOSink[(“machine”[,port])]

    3.高效模式,collector失败就丢弃日志,够狠够绝
    agentBESink[(“machine”[,port])]

    前面使用到的agentSink,是第一种End2End的别名,效果和End2End一样。

    多收集器的配置

    多个collector能够提高吞吐量,因为日志收集都是平行,前面提到过,为保证可靠性,如果collector挂了,agent需要写本地磁盘,然后周期性的去重新连接collector,另外,日志收集停止了,后面的日志处理与分析也歇菜了,这个可不行的。
    多个collector就可以解决这个问题,汗!

    另外多个collector中,如果其中一个挂了,agent应该是能够自动切换的,怎么配呢?

    使用failover chains,

    如上配置,chain指定了2个,第一个collector失败了之后,自动切换使用第二个。

    自动FailoverChain,主要是通过使用特殊的source和sink名字(多master下不适用)

    source使用:
    autoCollectorSource

    sink使用:
    autoE2EChain, autoDFOChain, or autoBEChain

    配置为:
    agentA : src | autoE2EChain ;
    agentB : src | autoE2EChain ;
    agentC : src | autoE2EChain ;
    agentD : src | autoE2EChain ;
    agentE : src | autoE2EChain ;
    agentF : src | autoE2EChain ;
    collectorA : autoCollectorSource | collectorSink(“hdfs://…”, “src”);
    collectorB : autoCollectorSource | collectorSink(“hdfs://…”, “src”);
    collectorC : autoCollectorSource | collectorSink(“hdfs://…”, “src”);

    Logical Configurations
    一个physical node包含若干个logical node,logical node又分为:logical sources 和logical sinks ,使用flow来隔离nodes和分组

    logical node允许一个JVM实例包含多个logical nodes,实现在一个JVM上跑多个Source和Sink的线程。

    每个logical node的名称必须唯一,包括physical node 名称或者 host名称都不能相同

    logical定义分两步,

    1.定义node类型
    agent1 : _source_ | autoBEChain ;
    collector1 : autoCollectorSource | collectorSink(“hdfs://….”) ;

    2.mapping logical node和 physical node
    map host1 agent1
    map host2 collector1

    3.解除一个logical节点
    decommission agent1

    试试

    126上

    flume master页面
    config:

    注:主机名-ip
    cloudera-node-1:10.129.8.126
    flume-hadoop-node-1:10.129.8.125

    raw command:

    (注意空格,decommission两端不能有空格)

    或者unmap和map操作来移动logicalnode


    抓包得到请求为:
    curl -XPOST http://10.129.8.125:35871/mastersubmit.jsp -d’cmd=unmap&args=10.129.8.125+agent1′

    注:logical sources和logical sinks在多master下不适用

    通过logical source和logical sink可以在不知道具体物理节点的时候就进行流程的配置,flume有一种翻译的机制,会自动将logical节点名称替换成实际的主机名和端口
    事实上,autoSinks和auto-Chain也是这样来实现的。

    Flow 隔离,(注,多master下也不适用,悲催啊)

    假设你需要收集一个物理机的多种数据,并存放到不同的地方,一种方式是对所有的数据打上tag,通过同一个管道来传数据,然后通过后处理来分离数据

    另一种是在整个传输过程中通过将两两种数据隔离,避免后处理的产生

    Flume两种都支持,并且延时很低,通过引入flow的概念,将节点进行分组,配置方式如下:
    flume master页面:
    raw commands

    命令:config
    参数:[logincal node] [flow name] fooSrc autoBEChain

    实际例子:

    !!!!

    ————
    1.问题:
    fail( “logical node not mapped to physical node yet” )

    1.使用主机名来做map,node status显示的是什么名称,map的时候就用什么名称
    2.先map好logical node,然后再更新config配置

    正常工作的配置,

    !!!!

    多master配置
    多master之间自动同步,一个master挂了,其下node会自动转移到其他master上去。

    flume master有两种工作模式:standalone和distributed
    如何配置呢?

    一个Host则是standalone模式,多个host即distributed模式【分布式模式下,每个master的配置文件必须一样】
    另外,每个master必须要配置不同的serverid,如下:

    【数字和前面配置的服务器列表的下标保持一致即可】
    分布式环境下,至少需要3台服务器来保证允许一台失败,如果要允许同时两台挂掉,则至少需要5台服务器
    ,如果master节点存活率不能超过总数的一半,整个flume master 集群就会block住,无法读写配置信息

    flume master存放配置信息的地方叫做:configuration store,支持插拔,本身支持两种实现:
    基于内存的:MBCS和基于ZooKeeper的:ZBCS
    默认ZBCS,flume内置zookeeper,支持配置到现有的zookeeper集群去

    【value值可选:zookeeper或者memory】

    ZBCS配置

    flume.master.zk.logdir:存储配置文件信息,更新日志,失败信息等
    flume.master.zk.server.quorum.port:默认3182,zookeeper server本地监听
    flume.master.zk.server.election.port:默认3183,zookeeper server用来寻找其它节点
    flume.master.zk.client.port:默认3181,用来与zookeeper server通讯

    FlumeMaster的gossip协议支持:

    分布式模型下,flume node的配置也需要调整,从连一个改成连接多个master

    flume node通过定期与master的端口做心跳检测,一旦master 连接失败,自动随机切换到剩下的可以连上的master上去。【master节点通过配置flume.master.heartbeat.port来配置心跳端口】

    如果要使用外部的zookeeper,配置如下
    conf/flume-site.xml.

    Flume与数据源集成
    Flume强大就在于灵活,支持各种数据源,结构化的,非结构化的,半结构化等等
    三种方式:
    pushing、polling、embedding(嵌入flume组件到你的应用程序中)

    Push Sources:
    syslogTcp,syslogUdp:syslog,syslog-ng日志协议
    scribe:scribe日志系统的协议

    Polling:
    tail,mulitail:监视文件内容的追加信息
    exec:适合从现有系统抽取数据
    poller:收集来着flume node本身的信息

    Flume Event的数据模型
    6个主要的字段;
    Unix timestamp
    Nanosecond timestamp 【纳秒级别的时间戳】
    Priority
    Source host
    Body
    Metadata table with an arbitrary number of attribute value pairs.

    所有的event都有这几个字段,不过body长度可能为0,metadata表可能为空。

    priority :TRACE, DEBUG, INFO, WARN, ERROR, or FATAL,这几种
    body:raw格式,默认最大32KB,多余的截掉,通过参数flume.event.max.size.bytes来进行配置

    使用event的字段来自定义输出位置
    collectorSink(“hdfs://namenode/flume/webdata/%H00/”, “%{host}-“)
    %H 为时间timestamp字段里的小时,host为field里面的主机名

    快速参考:
    [horizontal] %{host}
    host
    %{nanos}
    nanos
    %{priority}
    priority string
    %{body}
    body
    %%
    a % character.
    %t
    Unix time in millis

    时间比较特殊,直接使用,不需要{}
    collectorSink(“hdfs://namenode/flume/webdata/%Y-%m-%d/%H00/”, “web-“)

    快速参考:
    %a

    locale’s short weekday name (Mon, Tue, …)

    %A

    locale’s full weekday name (Monday, Tuesday, …)

    %b

    locale’s short month name (Jan, Feb,…)

    %B

    locale’s long month name (January, February,…)

    %c

    locale’s date and time (Thu Mar 3 23:05:25 2005)

    %d

    day of month (01)

    %D

    date; same as %m/%d/%y

    %H

    hour (00..23)

    %I

    hour (01..12)

    %j

    day of year (001..366)

    %k

    hour ( 0..23)

    %l

    hour ( 1..12)

    %m

    month (01..12)

    %M

    minute (00..59)

    %P

    locale’s equivalent of am or pm

    %s

    seconds since 1970-01-01 00:00:00 UTC

    %S

    second (00..60)

    %y

    last two digits of year (00..99)

    %Y

    year (2010)

    %z

    +hhmm numeric timezone (for example, -0400)

    输出文件格式

    两种方式:
    一直是在 flume-site.xml里面设置默认值,另外是由特定的sink来决定

    1.flume-site.xml
    flume.collector.output.format

    格式快速参考

    avro

    Avro Native file format. Default currently is uncompressed.

    avrodata

    Binary encoded data written in the avro binary format.

    avrojson

    JSON encoded data generated by avro.

    default

    a debugging format.

    json

    JSON encoded data.

    log4j

    a log4j pattern similar to that used by CDH output pattern.

    raw

    Event body only. This is most similar to copying a file but does not preserve any uniqifying metadata like host/timestamp/nanos.

    syslog

    a syslog like text output format.

    seqfile

    the binary hadoop Sequence file format with WritableEventKeys keys, and WritableEvent as values.

    2.分别配置

    压缩seqfile
    formatDfs(“hdfs://nn/dir/file”, seqfile(“bzip2”))

    HDFS大量小文件与高延迟的处理
    Flume两种策略来处理
    1.合并小文件到大的文件
    2.使用CombinedFileInputFormat

    seqfile和avrodata支持内部的压缩,具体再研究

    DataFlow定义语言

    Fan out,往所有sinks写:
    [ console, collectorSink ]

    Fail over,当前失败,转移到下一个,尝试候选sink:

    配置样例:

    Roll sink,每隔一段时间,关闭当前实例,创建新的实例,每次会创建新的独立的文件:
    roll(millis) sink
    配置样例:

    Sink Decorators,sink装饰器
    Fan out和Failover影响messages去哪里,但不修改数据,如果要过滤数据什么的,使用sink decorator

    sink decorator可以做很多事情,如可以给数据流添加属性,可以通过写ahead 日志来确保可靠性,或者通过批量、压缩来提供网络吞吐,抽样甚至轻量级的分析

    flumenode: source | intervalSampler(10) sink;
    flumenode: source | batch(100) sink;
    flumenode: source | batch(100) gzip sink;
    collector(15000) { escapedCustomDfs(“xxx”,”yyy-%{rolltag}”) }
    collector(15000) { [ escapedCustomDfs(“xxx”,”yyy-%{rolltag}”), hbase(“aaa”, “bbb-%{rolltag}”), elasticSearch(“eeee”,”ffff”) ] } 【同时往3个sink里面写数据,可能有些是持久化的,有些是瞬时的,都成功之后,才会确认成功】

    node1 : tail(“foo”) | ackedWriteAhead batch(100) gzip lazyOpen stubbornAppend logicalSink(“bar”);【write ahead,批量100,gzip压缩】

    Metadata支持正则来进行抽取
    支持类似select语法来筛选

    thriftSink and thriftSource

    扩展与插件
    http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html#_semantics_of_flume_extensions

    附录真是好啊
    http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html#_flume_source_catalog

    测试syslog信息

    .NET Agent 25个线程,结果压趴下了[另外后续测试发现经常无原因socket断开,服务端socket直接挂掉,flume显示error]。
    2012-02-10 21:29:44,154 ERROR com.cloudera.flume.core.connector.DirectDriver: Exiting driver logicalNode collector2-20 in error state SyslogTcpSourceThreads | Collector because null

    syslogTcp不稳定,果断换thriftRpc作为Source,经测果然很稳定

    此异常可能是因为服务端和客户端使用了不相同的transport,如framed和buffered不匹配

    vi flume-site.xml,添加压缩和默认roll时间

    测试文件模板

    更新

    结果:

    本文来自: flume搭建调试

    
    eqt support adidas eqt support 93 primeknit og colorway ba7506 adidas eqt running 93 updated with primeknit construction adidas eqt boost 93 17 white turbo red adidas eqt support 9317 white turbo red adidas eqt support 93 17 adidas eqt support 9317 adidas eqt support 9317 turbo red releases tomorrow adidas originals adidas eqt tactile green pack adidas eqt tactile green pack adidas eqt light green pack womens adidas eqt light green pack coming soon adidas eqt milled leather pack release date adidas originals eqt milled leather pack adidas eqt support ultra boost turbo red white adidas adv support burnt orange grey where to buy the adidas eqt support 9317 turbo red adidas eqt boost 91 16 turbo red adidas eqt support 93 turbo red adidas eqt support 9317 white turbo red available now