Burning the midnight oil! Understanding Hadoop serialization

ZSYL 2021-09-15 08:53:53

1. Serialization Overview

1) What is serialization

Serialization converts objects in memory into a sequence of bytes (or another data transfer protocol) so they can be stored on disk (persisted) and transmitted over the network.

Deserialization converts a received sequence of bytes (or another data transfer protocol), or data persisted on disk, back into objects in memory.

2) Why serialize

Generally speaking, "live" objects exist only in memory: turn off the power and they are gone. A "live" object can also only be used by the local process and cannot be sent to another computer over the network.

Serialization, however, makes it possible to store "live" objects and to send them to a remote computer.

3) Why not use Java serialization

Java serialization (Serializable) is a heavyweight framework: once an object is serialized, a lot of extra information comes along with it (various checksums, headers, the inheritance hierarchy, and so on), which makes it inefficient to transmit over the network.

Hadoop therefore provides its own serialization mechanism (Writable).

4) Characteristics of Hadoop serialization:

(1) Compact: uses storage space efficiently (see the size-comparison sketch after this list).

(2) Fast: little extra overhead when reading and writing data.

(3) Interoperable: supports interaction across multiple languages.
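
As a quick illustration of the "compact" point, here is a minimal sketch of my own (not from the original article) that serializes the same long value once with Java serialization and once through Hadoop's LongWritable, then compares the byte counts; the class and variable names are made up for the demo:

import org.apache.hadoop.io.LongWritable;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class SerializationSizeDemo {
    public static void main(String[] args) throws IOException {
        long value = 1116L;

        // Java serialization: writes a stream header, class descriptor, etc.
        ByteArrayOutputStream javaBytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(javaBytes)) {
            oos.writeObject(value); // autoboxed to java.lang.Long
        }

        // Hadoop Writable: writes only the 8 bytes of the long itself
        ByteArrayOutputStream writableBytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(writableBytes)) {
            new LongWritable(value).write(dos);
        }

        System.out.println("Java serialization: " + javaBytes.size() + " bytes");
        System.out.println("Hadoop Writable:    " + writableBytes.size() + " bytes");
    }
}

On a typical JVM the Java-serialized form comes out to roughly 80 bytes because of the stream header and class descriptor, while the Writable form is exactly the 8 bytes of the long.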

2. Custom bean objects

A custom bean object implements the serialization interface (Writable).

The basic serialization types commonly used in enterprise development cannot meet every requirement. For example, to pass a custom bean object around inside the Hadoop framework, that object needs to implement the Writable interface.

Implementing serialization for a bean object takes the following 7 steps:

(1) The class must implement the Writable interface.

(2) Deserialization uses reflection to call the no-argument constructor, so the class must provide one:

public FlowBean() {
    super();
}

(3) Override the serialization method

@Override
public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow);
    out.writeLong(downFlow);
    out.writeLong(sumFlow);
}

(4) Override the deserialization method

@Override
public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    downFlow = in.readLong();
    sumFlow = in.readLong();
}

(5) Note that fields must be deserialized in exactly the same order as they were serialized.

(6) To make the results readable in the output file, override toString(); separating fields with "\t" makes the output easy to process later.

(7) If the custom bean needs to be transmitted as a key, it must also implement the Comparable interface, because the Shuffle phase of the MapReduce framework requires keys to be sortable.

See the sorting example below for details (a fuller WritableComparable sketch follows it):

@Override
public int compareTo(FlowBean o) {
    // Sort in descending order (largest total traffic first); returns 0 for equal values
    return Long.compare(o.getSumFlow(), this.sumFlow);
}
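
Hadoop's standard interface for sortable keys is WritableComparable, which simply combines Writable and Comparable. Below is a minimal, self-contained sketch (my own, not from the original article) of how the FlowBean defined later could be adapted when it is used as a key:

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Sketch: a key bean implements WritableComparable so the Shuffle phase
// can both serialize it and sort it.
public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() { }  // no-arg constructor, required for deserialization via reflection

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public int compareTo(FlowBean o) {
        // Descending order by total traffic
        return Long.compare(o.sumFlow, this.sumFlow);
    }
}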

3. Serialization case walkthrough

1) Requirement

Count the total uplink traffic, total downlink traffic, and total traffic consumed by each phone number.

(1) Input data


1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200
2 13846544121 192.196.100.2 264 0 200
3 13956435636 192.196.100.3 132 1512 200
4 13966251146 192.168.100.1 240 0 404
5 18271575951 192.168.100.2 www.atguigu.com 1527 2106 200
6 84188413 192.168.100.3 www.atguigu.com 4116 1432 200
7 13590439668 192.168.100.4 1116 954 200
8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
9 13729199489 192.168.100.6 240 0 200
10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200
11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200
12 15959002129 192.168.100.9 www.atguigu.com 1938 180 500
13 13560439638 192.168.100.10 918 4938 200
14 13470253144 192.168.100.11 180 180 200
15 13682846555 192.168.100.12 www.qq.com 1938 2910 200
16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200
17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404
18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200
19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200
20 13768778790 192.168.100.17 120 120 200
21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200
22 13568436656 192.168.100.19 1116 954 200

(2) Input data format:

7 13560436666 120.196.100.99 1116 954 200
id | phone number | network IP | uplink traffic | downlink traffic | network status code

(3) Expected output data format:

13560436666 1116 954 2070
phone number | uplink traffic | downlink traffic | total traffic

2) Requirement analysis

[Figure: requirement analysis]

3) Writing the MapReduce program

(1) Write the FlowBean object for traffic statistics

package com.zs.mapreduce.writable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 1. Define the class and implement the Writable interface
 * 2. Override the serialization and deserialization methods
 * 3. Override the no-argument constructor
 * 4. Override toString()
 * Serialization order: upFlow -- downFlow -- sumFlow (deserialize in the same order)
 */
public class FlowBean implements Writable {

    private long upFlow;   // uplink traffic
    private long downFlow; // downlink traffic
    private long sumFlow;  // total traffic

    // No-argument constructor (required for deserialization via reflection)
    public FlowBean() {
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // Overloaded setter: compute the total from uplink and downlink traffic
    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization (same field order as write)
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
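
To verify that write() and readFields() agree on field order (step (5) above), a quick round-trip check can help. This is a hedged sketch of my own, not part of the original case; it writes a FlowBean to an in-memory byte stream and reads it back:

package com.zs.mapreduce.writable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean();
        original.setUpFlow(1116);
        original.setDownFlow(954);
        original.setSumFlow();

        // Serialize into an in-memory byte stream
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            original.write(out);
        }

        // Deserialize from the same bytes
        FlowBean copy = new FlowBean();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            copy.readFields(in);
        }

        System.out.println("original: " + original); // 1116  954  2070
        System.out.println("copy:     " + copy);     // same line if the field order matches
    }
}

If the two printed lines match, the serialization and deserialization order is consistent.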

(2) Write the Mapper class

package com.zs.mapreduce.writable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private Text outK = new Text();
    private FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Get one line
        // 1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
        String line = value.toString();

        // 2. Split the line
        String[] split = line.split("\t");

        // 3. Extract the fields we want
        String phone = split[1];
        String up = split[split.length - 3];
        String down = split[split.length - 2];

        // 4. Encapsulate the output key and value
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();

        // 5. Write out
        context.write(outK, outV);
    }
}

(3) Write the Reducer class

package com.zs.mapreduce.writable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean outV = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // 1. Traverse the values and accumulate the traffic
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }

        // 2. Encapsulate the output value
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();

        // 3. Write out
        context.write(key, outV);
    }
}

(4) Write the Driver class

package com.zs.mapreduce.writable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the jar
        job.setJarByClass(FlowDriver.class);

        // 3. Associate the Mapper and Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. Set the Mapper output key and value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. Set the final output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\software\\hadoop\\input\\inputflow"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\software\\hadoop\\output\\output2"));

        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
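
The driver above hard-codes local Windows paths, which is convenient for testing in the IDE. If you later want to submit the same job to a cluster, a common variation (a sketch of my own, not from the original article) is to take the paths from the command-line arguments instead:

// Hypothetical variation: read the input/output directories from program arguments
// instead of hard-coded local paths (args[0] = input dir, args[1] = output dir).
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));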

Keep going! Thank you! Keep striving!

Please include a link to the original article when reprinting. Thanks!