Massive Log Collection with Flume (HA)

1. Introduction

Flume is a highly available, highly reliable, distributed system for massive log collection, aggregation, and transport, provided by Cloudera. Flume supports customizing all kinds of data senders in the logging system in order to collect data; at the same time, Flume provides the ability to perform simple processing on the data and write it to various (customizable) data receivers.

2. Log collection

Flume → monitor a given IP and port → watch for data → receive the data → buffer it in memory → store it on the local hard disk.

3. Data processing

Flume provides the ability to perform simple processing on the data and write it to various (customizable) data receivers. Flume can collect data from sources such as Console, RPC (Thrift-RPC), Text (files), Tail (UNIX tail), Syslog (the syslog logging system, supporting both TCP and UDP), exec (command execution), and more.

4. How Flume works:

Flume OG:

Logically, Flume has a three-tier architecture: Agent, Collector, Storage. It uses multiple masters to keep the data consistent, and ZooKeeper to keep the data highly available and consistent.

Characteristics:

·  Three roles: agent nodes (agent), collection nodes (collector), and master nodes (master).

·  Agents collect log data from the various data sources and forward it to collectors; the collectors aggregate the data and store it into HDFS. The master manages the activities of the agents and collectors.

·  Both agent and collector consist of a source and a sink, meaning that on the node the data flows from the source to the sink.

Flume NG:

NG has only one node type: the agent. A Flume NG agent consists of a source, a channel, and a sink, related as described below.

· Source: collects the log data, breaks it into transactions and events, and puts them into the Channel.

Source types:

· Avro Source: supports the Avro protocol (actually Avro RPC). It exposes an Avro interface; Avro messages sent to the configured address and port are received by the source. For example, a Log4j appender can send messages to the agent through an Avro source.

· Thrift Source: supports the Thrift protocol; exposes a Thrift interface, similar to Avro.

· Exec Source: at startup runs a configured UNIX command (for example, cat file). The command keeps writing to standard output (stdout), and the output is packaged into events for processing.

· JMS Source: reads data from a JMS system (queues or topics), e.g. ActiveMQ.

· Spooling Directory Source: watches a directory; when new files appear in it, their contents are packaged into events for processing.

· Netcat Source: listens on a port and turns every line of text flowing through that port into an event.

· Sequence Generator Source: a sequence generator data source that produces sequence data.

· Syslog Sources: read syslog data and produce events; both UDP and TCP are supported.

· HTTP Source: a data source based on HTTP POST or GET; JSON and BLOB representations are supported.

· Legacy Sources: compatible with sources from the old Flume OG (0.9.x versions).

· Custom Source: a source implemented by the user against the interfaces Flume provides, to meet specific needs.

· Channel: mainly acts as a queue, providing simple buffering for the data delivered by the source.

Channel types:

· Memory Channel: event data is stored in memory.

· JDBC Channel: event data is stored in a persistent store; currently the built-in Flume channel only supports Derby.

· File Channel: event data is stored in files on disk.

· Spillable Memory Channel: event data is stored in memory and on disk; when the in-memory queue is full, events are persisted to disk files (currently experimental, not recommended for production).

· Pseudo Transaction Channel: for testing purposes only.

· Custom Channel: a user-defined channel implementation.

· Sink: takes the data out of the Channel and stores it in a file system or a database, or forwards it to a remote server.

Sink types:

· HDFS Sink: writes the data to HDFS.

· Logger Sink: writes the data to the log.

· Avro Sink: converts the data into Avro events and sends them to the configured RPC port.

· Thrift Sink: converts the data into Thrift events and sends them to the configured RPC port.

· IRC Sink: replays the data on IRC.

· File Roll Sink: stores the data in the local file system.

· Null Sink: discards all data.

· HBase Sink: writes the data to an HBase database.

· Morphline Solr Sink: sends the data to a Solr search server (or cluster).

· ElasticSearch Sink: sends the data to an Elasticsearch search server (or cluster).

· Kite Dataset Sink: writes the data to a Kite Dataset; experimental.

· Custom Sink: a user-defined sink implementation.


A single sink and channel can only handle one type of log data, but an agent can have multiple sinks and channels, and a source can accept multiple types of log data (see the fan-out configurations later in this article).



Flume installation and usage:

Installation

Run configuration (conf/avro.conf):

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Run:

In the /home/bigdata/flume1.6 directory, run:

 flume-ng agent -n a1 -c . -f ./conf/avro.conf -Dflume.root.logger=INFO,console

Once Flume is running, we can try collecting logs from different data sources (source types) and eventually store them all in HDFS.

source:  avro

flume-ng avro-client -c /home/bigdata/flume1.6/ -H ry-hadoop1 -p 4141 -F ./avro.txt
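
Note that ./avro.txt must exist before running the client; a quick way to create a small test file (the contents are just for illustration):

echo "hello flume" > ./avro.txt
echo "hello flume again" >> ./avro.txt

Re-running the avro-client command above should then show the file contents as events in the agent's logger output.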

source:  Exec

b1.sources=r1
b1.channels=c1
b1.sinks=k1

b1.sources.r1.type=exec
b1.sources.r1.command=tail -F /home/data/avro.txt

b1.channels.c1.type=memory
b1.channels.c1.capacity=1000
b1.channels.c1.transactionCapacity=100

b1.sinks.k1.type=logger

b1.sources.r1.channels=c1
b1.sinks.k1.channel=c1
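
Assuming this configuration is saved as conf/exec.conf (the file name is our own choice), the agent is started the same way as before; lines appended to the tailed file should then show up in the logger output:

flume-ng agent -n b1 -c . -f ./conf/exec.conf -Dflume.root.logger=INFO,console
echo "a new log line" >> /home/data/avro.txt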

source: spooldir (only the top-level directory is collected; subdirectories are not)

First create a local directory, /home/data/log, on the Linux machine.

agent.sources=r1
agent.channels=c1
agent.sinks=k1

agent.sources.r1.type=spooldir
agent.sources.r1.spoolDir=/home/data/log
agent.sources.r1.fileHeader=true

agent.channels.c1.type=memory
agent.channels.c1.capacity=1000
agent.channels.c1.transactionCapacity=100

agent.sinks.k1.type=logger

agent.sources.r1.channels=c1
agent.sinks.k1.channel=c1

Start:

flume-ng agent -n agent -c /home/bigdata/flume1.6/ -f /home/bigdata/flume1.6/conf/spoolDir.conf -Dflume.root.logger=INFO,console
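
To test, copy any text file into the spool directory; once Flume has ingested it, the file is renamed with a .COMPLETED suffix (the source file below is just an example):

cp /home/data/avro.txt /home/data/log/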

source: TCP

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
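
Save the configuration under a name of your choosing (conf/syslogtcp.conf below is our own assumption), start the agent as before, and send a test line to the monitored port with netcat:

flume-ng agent -n a1 -c . -f ./conf/syslogtcp.conf -Dflume.root.logger=INFO,console
echo "hello syslog" | nc localhost 5140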

source: HTTP (JSONHandler)

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 8888

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
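
The HTTP source's default JSONHandler expects a JSON array of events, each with a headers map and a body. A sketch of a test request, assuming the agent runs on the local machine:

curl -X POST -H "Content-Type: application/json" -d '[{"headers":{"type":"test"},"body":"hello http source"}]' http://localhost:8888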


We only cover these five source types here.

Next, the storage side (sinks).

hdfsSink.conf

Configuration:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ry-hadoop1:8020/flume
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=10240
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=60

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Run:

flume-ng agent -n a1 -c . -f ./conf/hdfsSink.conf -Dflume.root.logger=INFO,console

Write a shell script that sends TCP data in a loop, so that it gets collected into HDFS:

#!/bin/bash
# Send 500,000 numbered messages to the syslogtcp source on port 5140
int=1
while (( $int<=500000 ))
do
echo "this is message"$int | nc ry-hadoop1 5140
echo "this is message"$int
let "int++"
done
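
While the script runs, the files produced by the HDFS sink can be checked from another terminal; the /flume path matches the hdfs.path configured above, and the file names should start with the Syslog prefix:

hdfs dfs -ls /flume
hdfs dfs -cat /flume/Syslog*.log | head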

The round and roll settings above let you control when and how often the collected logs are written (time-based bucketing and file rolling).

This raises a question: when Hadoop is down for maintenance and cannot store the data, where do our log files go?

Locally. Let's look at how to store them locally.

Sink type: file_roll (write plain text to the local file system)

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/data/log/
a1.sinks.k1.sink.serializer=TEXT

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
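
To test, send a few lines to port 5140 and look at the output directory; the file_roll sink writes plain-text files there and, by default, rolls to a new file every 30 seconds:

echo "local storage test" | nc localhost 5140
ls -l /home/data/log/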

Channel type: file (the channel itself is backed by files on disk)

a1.sources = s1
a1.channels = c1
a1.sinks = k1

# For each one of the sources, the type is defined
a1.sources.s1.type = syslogtcp
a1.sources.s1.host = localhost
a1.sources.s1.port = 5180

# Each sink's type must be defined
a1.sinks.k1.type = logger

# Each channel's type is defined.
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/data/log/checkpoint
a1.channels.c1.dataDir = /home/data/log/data

# Bind the source and sinks to channels
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
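
Assuming this is saved as conf/fileChannel.conf (again, the file name is our own choice), start it the same way and send a test line to port 5180:

flume-ng agent -n a1 -c . -f ./conf/fileChannel.conf -Dflume.root.logger=INFO,console
echo "file channel test" | nc localhost 5180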

Flume HA:

Flume supports fan-out, that is, flowing events from one source to multiple channels. There are two fan-out modes: replicating and multiplexing. In the replicating case, events are sent to all configured channels. In the multiplexing case, events are sent only to a subset of the available channels. A fan-out flow must specify the source and the fan-out rules for the channels. In plain terms, when collecting logs you can use one agent to keep several copies of the logs: start Flume on several machines of the cluster and connect them, so that one machine receives the data while the others keep a backup at the same time, somewhat like ZooKeeper's replication.

1) Replicating Channel Selector (multiple channels)

Start the Avro receiver agent below on the three machines, then start the agent on the master that connects to them, with its source configured as selector.type = replicating.

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

On the master, start the agent that fans out to the receivers:

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Describe the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = slave1
a1.sinks.k2.port = 5555

# Use channels which buffer events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

When you write a piece of data into the log, the other three machines all respond.
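
For example, one line sent to the fan-out agent on the master should appear in the logger output of every Avro receiver:

echo "replicating test" | nc localhost 5140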

2) Multiplexing Channel Selector (MulChnSel_a1.conf)

Incoming events are routed to channels by matching a header value against the configured mapping.

a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2

# For each one of the sources, the type is defined
a1.sources.s1.type = org.apache.flume.source.http.HTTPSource
a1.sources.s1.port = 8887
a1.sources.s1.channels = c1 c2
a1.sources.s1.selector.type = multiplexing
a1.sources.s1.selector.header = company
a1.sources.s1.selector.mapping.ali = c1
a1.sources.s1.selector.mapping.baidu = c2
a1.sources.s1.selector.default = c2

# Each sink's type must be defined
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 5555
a1.sinks.k1.channel = c1

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = slave1
a1.sinks.k2.port = 5555
a1.sinks.k2.channel = c2

# Each channel's type is defined.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
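
To test the multiplexing selector, post events whose company header matches one of the mappings above: company=ali is routed to c1 (and on to k1), company=baidu to c2, and anything else to the default channel c2. A sketch of the test requests, assuming the agent runs locally:

curl -X POST -H "Content-Type: application/json" -d '[{"headers":{"company":"ali"},"body":"event for ali"}]' http://localhost:8887
curl -X POST -H "Content-Type: application/json" -d '[{"headers":{"company":"baidu"},"body":"event for baidu"}]' http://localhost:8887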


3) Flume Sink Processors (failover)

With failover, events are always sent to one of the sinks; when that sink becomes unavailable, they are automatically sent to the next sink.

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# This is the key to configuring failover: there needs to be a sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# The processor type is failover
a1.sinkgroups.g1.processor.type = failover
# Priority: the larger the number, the higher the priority; every sink must have a different priority
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Set to 10 seconds; adjust faster or slower to suit your situation
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Describe the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = m1
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = m2
a1.sinks.k2.port = 5555

# Use channels which buffer events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

On hadoop1, create the Flume_Sink_Processors_avro.conf configuration file:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start:

flume-ng agent -c . -f /home/bigdata/flume/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

Test:

Then, on either hadoop1 or hadoop2 (any of our machines), generate a test log entry:

# echo "idoall.org test1 failover" | nc localhost 5140

4) Load balancing Sink Processor

The difference between load_balance and failover is that load_balance has two selection strategies: round_robin (polling) and random. In both cases, if the selected sink is unavailable, the processor automatically tries to send to the next available sink.

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# This is the key to configuring load balancing: there needs to be a sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

# Describe the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = m1
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = m2
a1.sinks.k2.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

Start:

#flume-ng agent -c . -f /home/bigdata/flume/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console

Test:

If data is sent too quickly, the generated logs may all end up on a single machine.

 echo "idoall.org test1" | nc localhost 5140

In summary, Flume collects massive logs offline for storage: different data sources, different storage targets (local and HDFS), load-balanced storage, and settings such as storage time and stored data size.
