Hadoop MapReduce case studies: wordcount and mobile traffic usage statistics
Transkai 2021-07-21 04:12:41

The design idea of MapReduce

Concept:
MapReduce is an application framework for distributed parallel computing.
It provides a relatively simple API model: we only need to write our program according to the rules of that model,
and the framework takes care of the distributed parallel computing.
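As a rough illustration of that model, here is a minimal sketch in plain Java (no Hadoop involved) that walks through the three steps the framework performs: map, shuffle (group by key), and reduce. It uses word counting as the job. The class name MapReduceModelSketch and its method names are made up for this illustration and are not part of the Hadoop API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Hadoop-free illustration of the map -> shuffle -> reduce flow.
class MapReduceModelSketch {

    // "map" phase: turn one input line into (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.split(" ")) {
            if (!word.isEmpty()) {
                pairs.add(Map.entry(word, 1));
            }
        }
        return pairs;
    }

    // "shuffle" phase: collect all values that share a key, sorted by key.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // "reduce" phase: collapse the values of one key into a single result.
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Runs the three phases in order over all input lines.
    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.addAll(map(line));
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : shuffle(pairs).entrySet()) {
            result.put(e.getKey(), reduce(e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {hadoop=1, hello=2, mapreduce=1}
        System.out.println(run(List.of("hello hadoop", "hello mapreduce")));
    }
}
```

In a real job only the map and reduce steps are written by hand; the grouping in the middle is done by the framework across machines.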

Case 1: wordcount, the classic example

First, write the map method

package com.gec.demo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Purpose: implements the map stage of MapReduce.
 * KEYIN:    data type of the input key
 * VALUEIN:  data type of the input value
 * KEYOUT:   data type of the output key
 * VALUEOUT: data type of the output value
 *
 * Input:
 *     map(key, value) = (offset, line content)
 *
 * Output:
 *     map(key, value) = (word, 1)
 *
 * Data types:
 * Java type          Hadoop type
 * int    ----------> IntWritable
 * long   ----------> LongWritable
 * String ----------> Text
 * All of the Hadoop types implement serialization.
 */
public class WcMapTask extends Mapper<LongWritable, Text, Text, IntWritable> {
    /*
     * Called once for every key-value pair of the input split,
     * so map is triggered as many times as there are keys.
     * Parameter 1: key of the input record: the offset of the line
     * Parameter 2: value of the input record: the content of the line at that offset
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

Next is the reduce method

package com.gec.demo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * This class handles the reduce stage:
 *   it sums up the count of each word.
 * KEYIN:    data type of the input key
 * VALUEIN:  data type of the input value
 * KEYOUT:   data type of the output key
 * VALUEOUT: data type of the output value
 */
public class WcReduceTask extends Reducer<Text, IntWritable, Text, IntWritable> {
    /*
     * Parameter 1: the word
     * Parameter 2: the collection of counts emitted for that word
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}

Finally, the main (driver) class

package com.gec.demo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Specifies the map task class to run
 * Specifies the reducer task class to run
 * Specifies the input file to read
 * Specifies the output file path
 */
public class WcMrJob
{
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration=new Configuration();
        Job job=Job.getInstance(configuration);
        // Set the driver class
        job.setJarByClass(WcMrJob.class);
        // Set the map task to run
        job.setMapperClass(WcMapTask.class);
        // Set the reducer task to run
        job.setReducerClass(WcReduceTask.class);
        // Set the data type of the map task's output key
        job.setMapOutputKeyClass(Text.class);
        // Set the data type of the map task's output value
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Specify the location of the data to be processed
        FileInputFormat.setInputPaths(job, "hdfs://hadoop-001:9000/wordcount/input/big.txt");
        // Specify the location where the results will be saved after processing
        FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop-001:9000/wordcount/output/"));
        // Submit this job to the YARN cluster
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}

 

Double-click package to generate mapreducewordcount-1.0-SNAPSHOT.jar, then copy the jar to hadoop-001.

Run it with the following command: hadoop jar mapreducewordcount-1.0-SNAPSHOT.jar com.gec.demo.WcMrJob

This runs the word count program.

You can check the job in the YARN web UI in a browser.

The command to view the results is as follows:

hadoop fs -cat /wordcount/output/part-r-00000

 

 

Case 2: mobile traffic statistics

The content of the file to be analyzed is as follows:

1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com Video website 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn Information security 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com Site statistics 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com Search engine 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com Site statistics 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com Integrated portal 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com Integrated portal 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com Search engine 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com Search engine 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200

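In each record, the second field is the mobile phone number, the third-from-last field is the uplink traffic, and the second-from-last field is the downlink traffic. The task is to compute, for each phone number, the uplink traffic, the downlink traffic, and the total traffic, where total traffic = uplink traffic + downlink traffic.

The code is as follows: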

FlowBean:

package com.gec.demo.bean;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/*
 * A serializable class
 */
public class FlowBean implements Writable
{
    // Uplink traffic
    private long upFlow;
    // Downlink traffic
    private long downFlow;
    // Total traffic
    private long sumFlow;
    public FlowBean() {
    }
    public FlowBean(long upFlow, long downFlow, long sumFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }
    public void setFlowData(long upFlow, long downFlow)
    {
        this.upFlow=upFlow;
        this.downFlow=downFlow;
        sumFlow=this.upFlow+this.downFlow;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.getUpFlow());
        out.writeLong(this.getDownFlow());
        out.writeLong(this.getSumFlow());
    }
    // Deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        setUpFlow(in.readLong());
        setDownFlow(in.readLong());
        setSumFlow(in.readLong());
    }
    @Override
    public String toString() {
        return getUpFlow()+"\t"+getDownFlow()+"\t"+getSumFlow();
    }
}
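To make the serialization contract concrete, the following is a small sketch using only java.io, so it runs without Hadoop. FlowRecordSketch is a hypothetical stand-in for FlowBean, not the class above. It shows the one rule that matters: readFields must read the fields in exactly the order in which write wrote them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for FlowBean that depends only on java.io.
class FlowRecordSketch {
    long upFlow;
    long downFlow;
    long sumFlow;

    // Mirrors FlowBean.write: the fields are written in a fixed order.
    void write(DataOutputStream out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Mirrors FlowBean.readFields: the fields are read back in the same order.
    void readFields(DataInputStream in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Serializes a record to bytes and rebuilds a new object from those bytes.
    static FlowRecordSketch roundTrip(long up, long down) {
        try {
            FlowRecordSketch original = new FlowRecordSketch();
            original.upFlow = up;
            original.downFlow = down;
            original.sumFlow = up + down;

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));

            FlowRecordSketch copy = new FlowRecordSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FlowRecordSketch copy = roundTrip(2481, 24681);
        // prints 2481, 24681 and 27162 separated by tabs
        System.out.println(copy.upFlow + "\t" + copy.downFlow + "\t" + copy.sumFlow);
    }
}
```

The no-argument constructor of FlowBean exists for the same reason as the empty object created in roundTrip: the framework first creates an empty instance and then fills it through readFields.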

 

Mapper:

package com.gec.demo;
import com.gec.demo.bean.FlowBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
/*
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 */
public class PhoneFlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private FlowBean flowBean=new FlowBean();
    private Text keyText=new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get line content
        String line=value.toString();
        String []fields=StringUtils.split(line,'\t');
        // Get cell phone number
        String phoneNum=fields[1];
        // Get the upload traffic data
        long upflow=Long.parseLong(fields[fields.length-3]);
        // Get download traffic data
        long downflow=Long.parseLong(fields[fields.length-2]);
        flowBean.setFlowData(upflow,downflow);
        keyText.set(phoneNum);
        context.write(keyText,flowBean);
    }
}

Reducer:

package com.gec.demo;
import com.gec.demo.bean.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PhoneFlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    private FlowBean flowBean=new FlowBean();
    /**
     * key: phoneNum (the phone number)
     * values: all FlowBean records emitted for that phone number
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long sumDownFlow=0;
        long sumUpFlow=0;
        // Count the total traffic consumed by each mobile phone
        for (FlowBean value : values) {
            sumUpFlow+=value.getUpFlow();
            sumDownFlow+=value.getDownFlow();
        }
        flowBean.setFlowData(sumUpFlow,sumDownFlow);
        context.write(key,flowBean);
    }
}
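The mapper and reducer above can be traced by hand with a plain-Java sketch like the one below. It is only an approximation under stated assumptions: the records are assumed to be tab-separated, String.split stands in for Hadoop's StringUtils.split, and the class name PhoneFlowSketch is invented for this example. The two sample lines are the two records of phone number 13560439658 from the data file.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Hadoop-free walk-through of PhoneFlowMapper plus PhoneFlowReducer.
class PhoneFlowSketch {

    // Returns phone -> {upFlow, downFlow, sumFlow}; records are assumed tab-separated.
    static Map<String, long[]> totals(List<String> lines) {
        Map<String, long[]> byPhone = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split("\t");
            String phone = fields[1];                               // second field
            long up = Long.parseLong(fields[fields.length - 3]);   // third from last
            long down = Long.parseLong(fields[fields.length - 2]); // second from last
            long[] sums = byPhone.computeIfAbsent(phone, p -> new long[3]);
            sums[0] += up;
            sums[1] += down;
            sums[2] = sums[0] + sums[1];
        }
        return byPhone;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "1363157993055\t13560439658\tC4-17-FE-BA-DE-D9:CMCC\t120.196.100.99\t18\t15\t1116\t954\t200",
            "1363157992093\t13560439658\tC4-17-FE-BA-DE-D9:CMCC\t120.196.100.99\t15\t9\t918\t4938\t200");
        // prints 13560439658, 2034, 5892 and 7926 separated by tabs
        totals(lines).forEach((phone, s) ->
            System.out.println(phone + "\t" + s[0] + "\t" + s[1] + "\t" + s[2]));
    }
}
```

Indexing the traffic fields from the end of the line is what lets the same code handle records with and without the optional URL and category fields.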

Driver:

package com.gec.demo;
import com.gec.demo.bean.FlowBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class PhoneFlowApp {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration=new Configuration();
        Job job=Job.getInstance(configuration);
        job.setJarByClass(PhoneFlowApp.class);
        //job.setJar("");
        job.setMapperClass(PhoneFlowMapper.class);
        job.setReducerClass(PhoneFlowReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Specify the location of the data to be processed
        FileInputFormat.setInputPaths(job, "hdfs://hadoop-001:9000/flowcount/input/HTTP_20130313143750.dat");
        // Specify the location where the results will be saved after processing
        FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop-001:9000/flowcount/output/"));
        // Submit this job to the YARN cluster
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}

 

The generated result file has the following columns:

          phone number             uplink traffic       downlink traffic         total traffic

 

To sort by total traffic and write the output to six different files according to phone number, use the following code:

FlowBean: implement the WritableComparable interface
package com.gec.demo.Bean;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements WritableComparable<FlowBean>

FlowPartitioner: extends the Partitioner class and decides, from the leading digits of the phone number, which file a record is written to
package com.gec.demo.partitioner;
import com.gec.demo.Bean.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class FlowPartitioner extends Partitioner<FlowBean, Text>
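The bodies of FlowBean.compareTo and FlowPartitioner are not shown in the text, so the following plain-Java sketch is only a guess at the kind of logic they would contain. Both the ordering (largest total traffic first) and the prefix-to-partition table (135, 136, 137, 138, 139, and everything else) are assumptions made for illustration; the class and method names are hypothetical as well.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, Hadoop-free sketch of a sort order and a partition rule.
class SortAndPartitionSketch {

    // One (phone, total traffic) record.
    static final class Record {
        final String phone;
        final long sumFlow;

        Record(String phone, long sumFlow) {
            this.phone = phone;
            this.sumFlow = sumFlow;
        }
    }

    // Assumed ordering: larger total traffic sorts first.
    static final Comparator<Record> BY_SUM_DESC =
        (a, b) -> Long.compare(b.sumFlow, a.sumFlow);

    // Assumed prefix rule: five known prefixes plus a catch-all give six
    // partitions, which matches a job configured with six reduce tasks.
    static int partitionFor(String phone) {
        String prefix = phone.length() >= 3 ? phone.substring(0, 3) : phone;
        switch (prefix) {
            case "135": return 0;
            case "136": return 1;
            case "137": return 2;
            case "138": return 3;
            case "139": return 4;
            default:    return 5;
        }
    }

    // Returns a sorted copy and leaves the input list untouched.
    static List<Record> sorted(List<Record> records) {
        List<Record> copy = new ArrayList<>(records);
        copy.sort(BY_SUM_DESC);
        return copy;
    }

    public static void main(String[] args) {
        List<Record> records = List.of(
            new Record("13560439658", 7926),
            new Record("13726230503", 27162),
            new Record("84138413", 5548));
        for (Record r : sorted(records)) {
            System.out.println("partition " + partitionFor(r.phone) + "\t" + r.phone + "\t" + r.sumFlow);
        }
    }
}
```

In an actual job the sort happens inside each reduce task, so every one of the six output files is ordered by total traffic on its own rather than globally.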

Mapper:

package com.gec.demo;
import com.gec.demo.Bean.FlowBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
public class PhoneFlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    private  FlowBean flowBean=new FlowBean();
    private  Text text=new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line=value.toString();
        String[] fields= StringUtils.split(line,'\t');
        String phoneNum=fields[1];
       long upFlow=Long.parseLong(fields[fields.length-3]);
       long downFlow=Long.parseLong(fields[fields.length-2]);
       flowBean.setFlowData(upFlow,downFlow);
       flowBean.setSumFlow(upFlow+downFlow);
       text.set(phoneNum);
       context.write(flowBean,text);
    }
}

Reducer:

package com.gec.demo;
import com.gec.demo.Bean.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PhoneFlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Records whose keys compare as equal are grouped into one call,
        // so write out every phone number instead of only the first one
        for (Text phoneNum : values) {
            context.write(phoneNum, key);
        }
    }
}

Driver:

package com.gec.demo;
import com.gec.demo.Bean.FlowBean;
import com.gec.demo.partitioner.FlowPartitioner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class PhoneFlowApp  {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration=new Configuration();
        Job job=Job.getInstance(configuration);
        job.setJarByClass(PhoneFlowApp.class);
        job.setMapperClass(PhoneFlowMapper.class);
        job.setReducerClass(PhoneFlowReducer.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setPartitionerClass(FlowPartitioner.class);
        job.setNumReduceTasks(6);
        // Specify the location of the data to be processed
        FileInputFormat.setInputPaths(job, "D://Bigdata//4、mapreduce//day02//HTTP_20130313143750.dat");
        // Specify the location where the results will be saved after processing
        FileOutputFormat.setOutputPath(job, new Path("D://Bigdata//4、mapreduce//day02//output"));
        // Submit the job and wait for it to complete
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}

Case 3:

Finding common friends

Task requirements:

Given the following text, in which each line lists a person followed by that person's friends:

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J


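Find out which pairs of people have friends in common, and who those common friends are.

Start by inverting the relation into (friend, person) pairs:

b -a
c -a
d -a
a -b
c -b
b -e
b -j

Approach:

Write two MapReduce jobs.

The first MR job outputs, for each friend, the people who list that friend:
b -> a e j
c -> a b e f h

The second MR job outputs each pair of those people together with the friend they share:
a-e b
a-j b
e-j b
a-b c
a-e c

After grouping by pair, the final result looks like this, for example:

a-e b c d
a-m e f

The code is as follows: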

first mapper: FindFriendMapTaskByOne

 

package com.gec.demo;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.io.PrintStream;
public class FindFriendMapTaskByOne extends Mapper

first reducer:

package com.gec.demo;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FindFriendReducerTaskByOne extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Join everyone who lists this friend into one "-"-separated string
        StringBuffer strBuf=new StringBuffer();
        for (Text value : values) {
            strBuf.append(value).append("-");
        }
        context.write(key,new Text(strBuf.toString()));
    }
}

first job

package com.gec.demo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FindFriendJobByOne {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration=new Configuration();
        Job job=Job.getInstance(configuration);
        // Set up Driver class
        job.setJarByClass(FindFriendJobByOne.class);
        // Set up to run that map task
        job.setMapperClass(FindFriendMapTaskByOne .class);
        // Set up to run that reducer task
        job.setReducerClass(FindFriendReducerTaskByOne .class);
        // Set up map task Output key Data type of
        job.setMapOutputKeyClass(Text.class);
        // Set up map task Output value Data type of
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Specify the location of the data to be processed
        FileInputFormat.setInputPaths(job, "D://Bigdata//4、mapreduce//day05//homework//friendhomework.txt");
        // Specify the location where the results will be saved after processing
        FileOutputFormat.setOutputPath(job, new Path("D://Bigdata//4、mapreduce//day05//homework//output"));
        // Submit the job and wait for it to complete
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}


the second mapper:

package com.gec.demo;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
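import java.io.IOException;
import java.util.Arrays;
public class FindFriendMapTaskByTwo extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line: friend \t person-person-...-
        String line=value.toString();
        String []datas=line.split("\t");
        String []userlist=datas[1].split("-");
        // Sort so that a pair is always emitted as "a-e", never as "e-a"
        Arrays.sort(userlist);
        for (int i=0;i<userlist.length-1;i++){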
            for (int j=i+1;j<userlist.length;j++){
                String user1=userlist[i];
                String user2=userlist[j];
                String friendkey=user1+"-"+user2;
                context.write(new Text(friendkey),new Text(datas[0]));
            }
        }
    }
}

the second reducer:

package com.gec.demo;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FindFriendReducerTaskByTwo extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuffer stringBuffer=new StringBuffer();
        for (Text value : values) {
            stringBuffer.append(value).append(",");
        }
        context.write(key,new Text(stringBuffer.toString()));
    }
}

the second job:

package com.gec.demo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FindFriendJobByTwo {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration=new Configuration();
        Job job=Job.getInstance(configuration);
        // Set up Driver class
        job.setJarByClass(FindFriendJobByTwo.class);
        // Set up to run that map task
        job.setMapperClass(FindFriendMapTaskByTwo .class);
        // Set up to run that reducer task
        job.setReducerClass(FindFriendReducerTaskByTwo .class);
        // Set up map task Output key Data type of
        job.setMapOutputKeyClass(Text.class);
        // Set up map task Output value Data type of
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Specify the location of the data to be processed
        FileInputFormat.setInputPaths(job, "D://Bigdata//4、mapreduce//day05//homework//friendhomework3.txt");
        // Specify the location where the results will be saved after processing
        FileOutputFormat.setOutputPath(job, new Path("D://Bigdata//4、mapreduce//day05//homework//output"));
        // Submit the job and wait for it to complete
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}
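The two chained jobs can be simulated end to end in plain Java, which makes it easier to check the logic on a small input before running it on a cluster. The sketch below is a hypothetical, Hadoop-free approximation: the class CommonFriendsSketch and its methods are invented for this example, and sorted collections stand in for the shuffle. The first step inverts the friend lists, the second emits every pair of people for each friend and groups the friends by pair.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, Hadoop-free simulation of the two chained jobs.
class CommonFriendsSketch {

    // Job 1: invert "person:friend,friend,..." into friend -> sorted set of persons.
    static Map<String, TreeSet<String>> invert(List<String> lines) {
        Map<String, TreeSet<String>> personsByFriend = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(":");
            String person = parts[0];
            for (String friend : parts[1].split(",")) {
                personsByFriend.computeIfAbsent(friend, f -> new TreeSet<>()).add(person);
            }
        }
        return personsByFriend;
    }

    // Job 2: for every friend, emit each pair of persons, then group the friends by pair.
    static Map<String, List<String>> commonFriends(List<String> lines) {
        Map<String, List<String>> friendsByPair = new TreeMap<>();
        for (Map.Entry<String, TreeSet<String>> e : invert(lines).entrySet()) {
            List<String> persons = new ArrayList<>(e.getValue()); // already sorted
            for (int i = 0; i < persons.size() - 1; i++) {
                for (int j = i + 1; j < persons.size(); j++) {
                    String pair = persons.get(i) + "-" + persons.get(j);
                    friendsByPair.computeIfAbsent(pair, p -> new ArrayList<>()).add(e.getKey());
                }
            }
        }
        return friendsByPair;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("A:B,C,D", "B:A,C", "E:B,C,D");
        // prints A-B [C], then A-E [B, C, D], then B-E [C]
        commonFriends(lines).forEach((pair, friends) -> System.out.println(pair + "\t" + friends));
    }
}
```

Keeping the people of each friend in sorted order before pairing them is what guarantees that a pair always gets the same key, so that all of its common friends end up in one group.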


Case 4

A case study of merging multiple tables in MapReduce

1) Requirement:

Order data table t_order:

id      pid     amount
1001    01      1
1002    02      2
1003    03      3

Product information table t_product:

id      pname
01      Xiaomi
02      Huawei
03      Gree

Merge the data from the product information table into the order data table according to the product id.

The final data form:

id      pname   amount
1001    Xiaomi  1
1001    Xiaomi  1
1002    Huawei  2
1002    Huawei  2
1003    Gree    3
1003    Gree    3

3.4.1 Requirement 1: reduce-side table merge (data skew)

Use the join condition as the key of the map output, so that records from both tables that satisfy the join condition, each tagged with the file it came from, are sent to the same reduce task. The records are then joined in the reduce stage.
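Before the Hadoop classes, here is a minimal plain-Java sketch of the idea, under stated assumptions: the input lines are comma-separated (id,pid,amount for orders and pid,pname for products), the flag values "0" and "1" mark the source table, and a TreeMap stands in for the shuffle. ReduceSideJoinSketch and Tagged are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Hadoop-free sketch of a reduce-side join on the product id.
class ReduceSideJoinSketch {

    // A record tagged with its source table: flag "0" = order, "1" = product.
    static final class Tagged {
        final String flag;
        final String orderId;
        final int amount;
        final String pname;

        Tagged(String flag, String orderId, int amount, String pname) {
            this.flag = flag;
            this.orderId = orderId;
            this.amount = amount;
            this.pname = pname;
        }
    }

    static List<String> join(List<String> orderLines, List<String> productLines) {
        // "map" + "shuffle": key every record by pid so both tables meet in one group.
        Map<String, List<Tagged>> groups = new TreeMap<>();
        for (String line : orderLines) {            // id,pid,amount
            String[] f = line.split(",");
            groups.computeIfAbsent(f[1], k -> new ArrayList<>())
                  .add(new Tagged("0", f[0], Integer.parseInt(f[2]), ""));
        }
        for (String line : productLines) {          // pid,pname
            String[] f = line.split(",");
            groups.computeIfAbsent(f[0], k -> new ArrayList<>())
                  .add(new Tagged("1", "", 0, f[1]));
        }

        // "reduce": within one pid group, attach the product name to every order.
        List<String> joined = new ArrayList<>();
        for (List<Tagged> group : groups.values()) {
            String pname = "";
            List<Tagged> orders = new ArrayList<>();
            for (Tagged t : group) {
                if ("0".equals(t.flag)) {
                    orders.add(t);
                } else {
                    pname = t.pname;
                }
            }
            for (Tagged o : orders) {
                joined.add(o.orderId + "\t" + pname + "\t" + o.amount);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String> orders = List.of("1001,01,1", "1002,02,2", "1003,03,3");
        List<String> products = List.of("01,Xiaomi", "02,Huawei", "03,Gree");
        join(orders, products).forEach(System.out::println);
    }
}
```

All orders for one product land in the same group, which is exactly why a very popular product can overload a single reduce task.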

 

 

1) Create the combined product-and-order bean class

package com.gec.mapreduce.table;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.Writable;

 

public class TableBean implements Writable {

       private String order_id; // Order id

       private String p_id; // product id

       private int amount; // Product quantity

       private String pname; // The product name

       private String flag;// Label of table

 

       public TableBean() {

              super();

       }

 

       public TableBean(String order_id, String p_id, int amount, String pname, String flag) {

              super();

              this.order_id = order_id;

              this.p_id = p_id;

              this.amount = amount;

              this.pname = pname;

              this.flag = flag;

       }

 

       public String getFlag() {

              return flag;

       }

 

       public void setFlag(String flag) {

              this.flag = flag;

       }

 

       public String getOrder_id() {

              return order_id;

       }

 

       public void setOrder_id(String order_id) {

              this.order_id = order_id;

       }

 

       public String getP_id() {

              return p_id;

       }

 

       public void setP_id(String p_id) {

              this.p_id = p_id;

       }

 

       public int getAmount() {

              return amount;

       }

 

       public void setAmount(int amount) {

              this.amount = amount;

       }

 

       public String getPname() {

              return pname;

       }

 

       public void setPname(String pname) {

              this.pname = pname;

       }

 

       @Override

       public void write(DataOutput out) throws IOException {

              out.writeUTF(order_id);

              out.writeUTF(p_id);

              out.writeInt(amount);

              out.writeUTF(pname);

              out.writeUTF(flag);

       }

 

       @Override

       public void readFields(DataInput in) throws IOException {

              this.order_id = in.readUTF();

              this.p_id = in.readUTF();

              this.amount = in.readInt();

              this.pname = in.readUTF();

              this.flag = in.readUTF();

       }

 

       @Override

       public String toString() {

              return order_id + "\t" + p_id + "\t" + amount + "\t" ;

       }

}

2) Write the TableMapper program

package com.gec.mapreduce.table;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

 

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

       TableBean bean = new TableBean();

       Text k = new Text();

      

       @Override

       protected void map(LongWritable key, Text value, Context context)

                     throws IOException, InterruptedException {

             

              // 1 Get the name of the input file, which tells us which table the record comes from

              FileSplit split = (FileSplit) context.getInputSplit();

              String name = split.getPath().getName();

             

              // 2 Get input data

              String line = value.toString();

             

              // 3 Different files are processed separately

              if (name.startsWith("order")) {// Order form processing

                     // 3.1 Split the line

                     String[] fields = line.split(",");

                    

                     // 3.2 Populate the bean

                     bean.setOrder_id(fields[0]);

                     bean.setP_id(fields[1]);

                     bean.setAmount(Integer.parseInt(fields[2]));

                     bean.setPname("");

                     bean.setFlag("0");

                    

                     k.set(fields[1]);

              }else {// Product table processing

                     // 3.3 Split the line

                     String[] fields = line.split(",");

                    

                     // 3.4 Populate the bean

                     bean.setP_id(fields[0]);

                     bean.setPname(fields[1]);

                     bean.setFlag("1");

                     bean.setAmount(0);

                     bean.setOrder_id("");

                    

                     k.set(fields[0]);

              }

              // 4 Write

              context.write(k, bean);

       }

}

3) Write the TableReducer program

package com.gec.mapreduce.table;

import java.io.IOException;

import java.util.ArrayList;

import org.apache.commons.beanutils.BeanUtils;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

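public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

 

       @Override

       protected void reduce(Text key, Iterable<TableBean> values, Context context)

                     throws IOException, InterruptedException {

 

              // 1 Prepare a collection to hold the order records

              ArrayList<TableBean> orderBeans = new ArrayList<>();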

              // 2 Prepare a bean to hold the product record

              TableBean pdBean = new TableBean();

 

              for (TableBean bean : values) {

 

                     if ("0".equals(bean.getFlag())) {// The order sheet

                            // Copy each order record into the collection (Hadoop reuses the value object while iterating, so a copy is required)

                            TableBean orderBean = new TableBean();

                            try {

                                   BeanUtils.copyProperties(orderBean, bean);

                            } catch (Exception e) {

                                   e.printStackTrace();

                            }

 

                            orderBeans.add(orderBean);

                     } else {// Product list

                            try {

                                   // Copy the passed product table into memory

                                   BeanUtils.copyProperties(pdBean, bean);

                            } catch (Exception e) {

                                   e.printStackTrace();

                            }

                     }

              }

 

              // 3 The splicing of tables

              for(TableBean bean:orderBeans){

                     bean.setP_id(pdBean.getPname());

                    

                     // 4 Write the data out

                     context.write(bean, NullWritable.get());

              }

       }

}

4) Write the TableDriver program

package com.gec.mapreduce.table;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class TableDriver {

 

       public static void main(String[] args) throws Exception {

              // 1 Get the configuration and the job instance

              Configuration configuration = new Configuration();

              Job job = Job.getInstance(configuration);

 

              // 2 Specify the local path of this program's jar

              job.setJarByClass(TableDriver.class);

 

              // 3 Specify the Mapper/Reducer classes this job uses

              job.setMapperClass(TableMapper.class);

              job.setReducerClass(TableReducer.class);

 

              // 4 Specify the kv types of the mapper output

              job.setMapOutputKeyClass(Text.class);

              job.setMapOutputValueClass(TableBean.class);

 

              // 5 Specify the kv types of the final output

              job.setOutputKeyClass(TableBean.class);

              job.setOutputValueClass(NullWritable.class);

 

              // 6 Specify the job's input directory and output directory

              FileInputFormat.setInputPaths(job, new Path(args[0]));

              FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

              // 7 Submit the job's configuration, together with the jar containing its classes, to YARN to run

              boolean result = job.waitForCompletion(true);

              System.exit(result ? 0 : 1);

       }

}

3) Run the program and view the results

1001       Xiaomi        1    

1001       Xiaomi        1    

1002       Huawei        2    

1002       Huawei        2    

1003       Gree        3    

1003       Gree        3    

Shortcoming: with this approach the join is done entirely in the reduce stage. The reduce side carries too much of the processing load while the map nodes do very little computation, so resource utilization is low, and data skew easily arises in the reduce stage.

Solution: merge the data on the map side.

3.4.2 Requirement 2: map-side table join (DistributedCache)

1) Analysis

This approach applies when one of the tables being joined is small.

The small table can be distributed to every map node, so each map node can join it locally against the large-table data it reads and output the final result. This greatly increases the parallelism of the join and speeds up processing.
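
The idea above can be sketched in plain Java, without any Hadoop classes: load the small table into an in-memory map once, then stream the large table and look each record up. This is only a minimal illustration; the class name MapSideJoinSketch, its method names, and the sample rows are assumptions made up for this sketch and are not part of the original program.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of a map-side (hash) join; all names here are hypothetical.
class MapSideJoinSketch {

    // Build the lookup table from the small table: product id -> product name.
    static Map<String, String> loadSmallTable(List<String> productLines) {
        Map<String, String> pdMap = new HashMap<>();
        for (String line : productLines) {
            String[] fields = line.split("\t");
            pdMap.put(fields[0], fields[1]);
        }
        return pdMap;
    }

    // Join every record of the large table against the in-memory lookup table.
    static List<String> join(List<String> orderLines, Map<String, String> pdMap) {
        List<String> joined = new ArrayList<>();
        for (String line : orderLines) {
            String[] fields = line.split("\t");
            String pdName = pdMap.get(fields[1]); // second column holds the product id
            joined.add(line + "\t" + pdName);
        }
        return joined;
    }

    public static void main(String[] args) {
        // Sample rows invented for this sketch (order id, product id, amount).
        List<String> products = Arrays.asList("01\tXiaomi", "02\tHuawei");
        List<String> orders = Arrays.asList("1001\t01\t1", "1002\t02\t2");
        for (String row : join(orders, loadSmallTable(products))) {
            System.out.println(row);
        }
    }
}
```

In the MapReduce version, loading the small table corresponds to the mapper's setup method, and the per-record lookup corresponds to the map method, so no shuffle or reduce stage is needed.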

 

 

2) Hands-on example

(1) First, add the cache file in the driver

package test;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class DistributedCacheDriver {

 

       public static void main(String[] args) throws Exception {

              // 1 Get the job instance

              Configuration configuration = new Configuration();

              Job job = Job.getInstance(configuration);

 

              // 2 Set the jar to load, located via the driver class

              job.setJarByClass(DistributedCacheDriver.class);

 

              // 3 Set the mapper class

              job.setMapperClass(DistributedCacheMapper.class);

             

              // 4 Set the final output data type

              job.setOutputKeyClass(Text.class);

              job.setOutputValueClass(NullWritable.class);

 

              // 5 Set the input and output paths

              FileInputFormat.setInputPaths(job, new Path(args[0]));

              FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

              // 6 Add the small table file to the distributed cache

              job.addCacheFile(new URI("file:/e:/cache/pd.txt"));

             

              // 7 A map-side join needs no reduce stage, so set the number of reduce tasks to 0

              job.setNumReduceTasks(0);

 

              // 8 Submit

              boolean result = job.waitForCompletion(true);

              System.exit(result ? 0 : 1);

       }

}

(2) Read the cached file data in the mapper

package test;

import java.io.BufferedReader;

import java.io.FileInputStream;

import java.io.IOException;

import java.io.InputStreamReader;

import java.util.HashMap;

import java.util.Map;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

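public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

 

       // Product id -> product name, loaded from the cached small table

       Map<String, String> pdMap = new HashMap<>();

      

       @Override

       protected void setup(Context context)

                     throws IOException, InterruptedException {

              // 1 Open the cached file (use the full path to pd.txt if it is not in the working directory)

              BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt")));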

             

              String line;

              while(StringUtils.isNotEmpty(line = reader.readLine())){

                     // 2 Split the line into fields

                     String[] fields = line.split("\t");

                    

                     // 3 Cache the record in the map

                     pdMap.put(fields[0], fields[1]);

              }

             

              // 4 Close the stream

              reader.close();

       }

      

       Text k = new Text();

      

       @Override

       protected void map(LongWritable key, Text value, Context context)

                     throws IOException, InterruptedException {

              // 1 Get one line

              String line = value.toString();

             

              // 2 Split the line into fields

              String[] fields = line.split("\t");

             

              // 3 Get the product id (second field of the order record)

              String pId = fields[1];

             

              // 4 Look up the product name

              String pdName = pdMap.get(pId);

             

              // 5 Append the product name to the order record

              k.set(line + "\t"+ pdName);

             

              // 6 Write the joined record

              context.write(k, NullWritable.get());

       }

}

Case 5

Find the most expensive item in each order (GroupingComparator)

1) Requirement

The order data is as follows:

Order id         Product id    Transaction amount
Order_0000001    Pdt_01        222.8
Order_0000001    Pdt_05        25.8
Order_0000002    Pdt_03        522.8
Order_0000002    Pdt_04        122.4
Order_0000002    Pdt_05        722.4
Order_0000003    Pdt_01        222.8
Order_0000003    Pdt_02        33.8

Now we need to find the most expensive item in each order.

2) Input data

 

Expected output data:

    

3) Analysis

(1) Use "order id and transaction amount" together as the key. All order records read in the map stage are then partitioned by order id, sorted by order id and then by amount in descending order, and sent to reduce.

(2) On the reduce side, use a GroupingComparator to put the key-value pairs that share an order id into one group; the first record in each group is then the one with the highest amount.
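
The two steps above can be simulated in plain Java, outside Hadoop, to see why the first record of each group is the maximum. This is a minimal sketch under stated assumptions: the class TopPricePerOrderSketch, the nested OrderKey type, and the helper names are hypothetical, and an in-memory sort stands in for the shuffle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory stand-in for "sort by composite key, then group by order id"; names are hypothetical.
class TopPricePerOrderSketch {

    static final class OrderKey {
        final String orderId;
        final double price;

        OrderKey(String orderId, double price) {
            this.orderId = orderId;
            this.price = price;
        }
    }

    // Sort order of the composite key: order id ascending, then price descending.
    static int compareKeys(OrderKey a, OrderKey b) {
        int byId = a.orderId.compareTo(b.orderId);
        return byId != 0 ? byId : Double.compare(b.price, a.price);
    }

    // Grouping rule: keys belong to the same group when their order ids match.
    static boolean sameGroup(OrderKey a, OrderKey b) {
        return a.orderId.equals(b.orderId);
    }

    // Keep only the first key of each group, which is the highest price for that order.
    static List<OrderKey> topPerOrder(List<OrderKey> input) {
        List<OrderKey> sorted = new ArrayList<>(input);
        sorted.sort(TopPricePerOrderSketch::compareKeys);
        List<OrderKey> result = new ArrayList<>();
        OrderKey previous = null;
        for (OrderKey key : sorted) {
            if (previous == null || !sameGroup(previous, key)) {
                result.add(key);
            }
            previous = key;
        }
        return result;
    }

    public static void main(String[] args) {
        // The order data listed in the requirement.
        List<OrderKey> data = Arrays.asList(
                new OrderKey("Order_0000001", 222.8),
                new OrderKey("Order_0000001", 25.8),
                new OrderKey("Order_0000002", 522.8),
                new OrderKey("Order_0000002", 122.4),
                new OrderKey("Order_0000002", 722.4),
                new OrderKey("Order_0000003", 222.8),
                new OrderKey("Order_0000003", 33.8));
        for (OrderKey key : topPerOrder(data)) {
            System.out.println(key.orderId + "\t" + key.price);
        }
    }
}
```

For the order data in the requirement, this keeps 222.8 for Order_0000001, 722.4 for Order_0000002 and 222.8 for Order_0000003. In the real job, compareKeys plays the role of the key's compareTo method and sameGroup plays the role of the grouping comparator.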

 

4) Implementation

Define the order information class OrderBean

package com.gec.mapreduce.order;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

 

public class OrderBean implements WritableComparable<OrderBean> {

 

       private String orderId;

       private double price;

 

       public OrderBean() {

              super();

       }

 

       public OrderBean(String orderId, double price) {

              super();

              this.orderId = orderId;

              this.price = price;

       }

 

       public String getOrderId() {

              return orderId;

       }

 

       public void setOrderId(String orderId) {

              this.orderId = orderId;

       }

 

       public double getPrice() {

              return price;

       }

 

       public void setPrice(double price) {

              this.price = price;

       }

 

       @Override

       public void readFields(DataInput in) throws IOException {

              this.orderId = in.readUTF();

              this.price = in.readDouble();

       }

 

       @Override

       public void write(DataOutput out) throws IOException {

              out.writeUTF(orderId);

              out.writeDouble(price);

       }

 

       @Override

       public int compareTo(OrderBean o) {

              // 1 Sort by order id first (ascending)

              int result = this.orderId.compareTo(o.getOrderId());

 

              if (result == 0) {

                     // 2 Then sort by amount (descending); Double.compare returns 0 for equal amounts

                     result = Double.compare(o.getPrice(), this.price);

              }

 

              return result;

       }

       @Override

       public String toString() {

              return orderId + "\t" + price ;

       }

}
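
The write and readFields methods above must handle the fields in the same order, because the serialized form is just a sequence of bytes with no field names. The following plain-Java sketch shows that round trip with the same DataOutput/DataInput calls, using java.io streams in place of Hadoop's serialization machinery. The class FieldOrderRoundTrip and its method names are assumptions made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Round trip of (orderId, price) through DataOutput/DataInput; names are hypothetical.
class FieldOrderRoundTrip {

    // Mirrors OrderBean.write: order id first, then price.
    static byte[] write(String orderId, double price) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(orderId);
            out.writeDouble(price);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Mirrors OrderBean.readFields: the fields are read back in the same order.
    static String read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String orderId = in.readUTF();
            double price = in.readDouble();
            return orderId + "\t" + price;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(read(write("Order_0000001", 222.8)));
    }
}
```

Swapping the two read calls would make readUTF interpret the bytes of the double as a string length, which is why the order in readFields has to match the order in write.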

Write the OrderSortMapper

package com.gec.mapreduce.order;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class OrderSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

       OrderBean bean = new OrderBean();

      

       @Override

       protected void map(LongWritable key, Text value,

                     Context context)throws IOException, InterruptedException {

              // 1 Get one line of data

              String line = value.toString();

             

              // 2 Split the line into fields

              String[] fields = line.split("\t");

             

              // 3 Populate the bean with the order id and the amount

              bean.setOrderId(fields[0]);

              bean.setPrice(Double.parseDouble(fields[2]));

             

              // 4 Write the bean as the key

              context.write(bean, NullWritable.get());

       }

}

Write the OrderSortReducer

package com.gec.mapreduce.order;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Reducer;

 

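public class OrderSortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

       @Override

       protected void reduce(OrderBean bean, Iterable<NullWritable> values,

                     Context context) throws IOException, InterruptedException {

              // The key passed in is the first of its group, i.e. the highest amount for this order id, so write it directly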

              context.write(bean, NullWritable.get());

       }

}

Write the OrderSortDriver

package com.gec.mapreduce.order;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class OrderSortDriver {

 

       public static void main(String[] args) throws Exception {

              // 1 Get configuration information

              Configuration conf = new Configuration();

              Job job = Job.getInstance(conf);

 

              // 2 Set the jar to load, located via the driver class

              job.setJarByClass(OrderSortDriver.class);

 

              // 3 Set the mapper and reducer classes

              job.setMapperClass(OrderSortMapper.class);

              job.setReducerClass(OrderSortReducer.class);

 

              // 4 Set the key and value types of the map output

              job.setMapOutputKeyClass(OrderBean.class);

              job.setMapOutputValueClass(NullWritable.class);

 

              // 5 Set the key and value types of the final output

              job.setOutputKeyClass(OrderBean.class);

              job.setOutputValueClass(NullWritable.class);

 

              // 6 Set the input and output paths

              FileInputFormat.setInputPaths(job, new Path(args[0]));

              FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

              // 10 Set the reduce-side grouping comparator

              job.setGroupingComparatorClass(OrderSortGroupingComparator.class);

             

              // 7 Set the partitioner

              job.setPartitionerClass(OrderSortPartitioner.class);

             

              // 8 Set the number of reduce tasks

              job.setNumReduceTasks(3);

 

              // 9 Submit

              boolean result = job.waitForCompletion(true);

              System.exit(result ? 0 : 1);

       }

}

Write the OrderSortPartitioner

package com.gec.mapreduce.order;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Partitioner;

 

public class OrderSortPartitioner extends Partitioner<OrderBean, NullWritable> {

 

       @Override

       public int getPartition(OrderBean key, NullWritable value, int numReduceTasks) {

             

              return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;

       }

}
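
The expression in getPartition masks the hash code with Integer.MAX_VALUE before taking the remainder. A String hash code can be negative, and a negative remainder would not be a valid partition index; clearing the sign bit keeps the result in the range 0 to numReduceTasks - 1. The sketch below isolates that arithmetic in plain Java. The class PartitionIndexSketch and the method partitionFor are hypothetical names used only for illustration.

```java
// Isolates the partition arithmetic used in getPartition; names are hypothetical.
class PartitionIndexSketch {

    // Same formula as the partitioner: clear the sign bit, then take the remainder.
    static int partitionFor(String orderId, int numReduceTasks) {
        return (orderId.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        String[] orderIds = {"Order_0000001", "Order_0000002", "Order_0000003"};
        for (String orderId : orderIds) {
            // Every record with the same order id lands in the same partition.
            System.out.println(orderId + " -> partition " + partitionFor(orderId, 3));
        }
    }
}
```

Because the index depends only on the order id and not on the price, all records of one order reach the same reduce task, which is what the grouping step relies on.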

Write the OrderSortGroupingComparator

package com.gec.mapreduce.order;

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

 

public class OrderSortGroupingComparator extends WritableComparator {

 

       protected OrderSortGroupingComparator() {

              super(OrderBean.class, true);

       }

 

       @Override

       public int compare(WritableComparable a, WritableComparable b) {

             

              OrderBean abean = (OrderBean) a;

              OrderBean bbean = (OrderBean) b;

             

              // Treat beans with the same orderId as one group

              return abean.getOrderId().compareTo(bbean.getOrderId());

       }

}
