A comprehensive summary of Spark data skew solutions


Abstract

This article explains in detail, with examples, several scenarios in which Spark data skew occurs and the corresponding solutions, including avoiding skew at the data source, adjusting parallelism, using a custom Partitioner, replacing Reduce Side Join with Map Side Join, and adding random prefixes to skewed Keys.

Why handle data skew (Data Skew)

What is data skew

For big data systems such as Spark and Hadoop, a large volume of data is not the problem; data skew is.

What is data skew? Data skew means that, in a dataset being processed in parallel, some part (such as a single Spark or Kafka Partition) holds significantly more data than the others, so that processing this part becomes the bottleneck for the whole dataset.

For a distributed system, ideally the overall processing time decreases linearly as the system scales (i.e., as the number of nodes grows). If a job takes 120 minutes on a single machine, then with three machines the ideal time is 120 / 3 = 40 minutes, as shown in the figure below.

However, this is only the ideal case. When a single-machine job is converted into a distributed one, extra overhead is introduced, so the total amount of work grows and the sum of the per-machine execution times exceeds the single-machine time. Setting that overhead aside and assuming the total amount of work stays the same, each machine's execution time can only drop to 1/N of the single-machine time if the work is distributed evenly across the N machines. Unfortunately, in many cases the distribution is uneven, sometimes so uneven that most of the work lands on a few machines while the rest receive only a small share. For example, one machine may handle 80% of the work while the other two handle 10% each, as shown in the figure below.

In the diagram above, the number of machines has tripled, yet the execution time only drops to 80% of the single-machine time (120 × 0.8 = 96 minutes), far from the ideal.

The harm of data skew

As the figure above shows, when data is skewed, a few Tasks take far longer than the rest, so the total time becomes too long and the parallelism of the distributed system is not fully exploited. In addition, when data is skewed, a Task that processes too much data may fail with insufficient memory and bring down the whole application.

How data skew is caused

In Spark, different Partitions of the same Stage are processed in parallel, while Stages with dependencies between them are processed serially. Suppose a Spark Job is divided into two Stages, Stage 0 and Stage 1, with Stage 1 depending on Stage 0; then Stage 1 will not start until Stage 0 is fully finished. If Stage 0 contains N Tasks that run in parallel, and N-1 of them finish within 10 seconds while the remaining one takes 1 minute, the total time of the Stage is at least 1 minute. In other words, the time a Stage takes is determined mainly by its slowest Task.

Since all Tasks in the same Stage perform the same computation, and setting aside differences in the computing power of the nodes, the difference in running time between Tasks is determined mainly by the amount of data each Task processes.

The data a Stage reads falls mainly into the following two categories:

  • Data read directly from an external data source, such as HDFS or Kafka

  • Shuffle data read from the previous Stage

How to alleviate or eliminate data skew

Avoid data skew at the data source ———— reading Kafka

Take Spark Streaming reading Kafka through DirectStream as an example. Since each Kafka Partition corresponds to one Spark Task (Partition), whether the data of the relevant Topic is balanced across its Kafka Partitions directly determines whether Spark will encounter data skew when processing that data.

As described in the article 《Kafka Design Analysis (Part 1) - Kafka Background and Architecture Introduction》, how the messages of a Topic are distributed across its Partitions is determined mainly by the Partitioner implementation used on the Producer side. With a random Partitioner, each message is sent to a random Partition, so statistically the Partitions are balanced and the source Stage (the Stage that reads Kafka directly) has no data skew.

In many business scenarios, however, messages with the same characteristics must be consumed in order, which requires placing them in the same Partition. A typical example is putting the PV (page view) data of the same user into one Partition. If this causes data skew, it has to be handled with the other approaches described below.
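
To make the trade-off concrete, here is a minimal producer-side sketch (the topic name "user_pv", the broker address, and the record payload are illustrative assumptions, not from the original setup): records sent without a key are spread across Partitions by the producer's default partitioner, while records keyed by user id always land in the same Partition, which preserves per-user ordering but can skew the Partitions of very active users.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PvProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        String userId = "user_42";
        String pvJson = "{\"userId\":\"user_42\",\"url\":\"/index\"}";

        // No key: the default partitioner spreads records across Partitions, so the
        // source Stage stays balanced, but per-user ordering is not guaranteed.
        producer.send(new ProducerRecord<String, String>("user_pv", null, pvJson));

        // Keyed by user id: all PV records of this user go to one Partition, which keeps
        // them in order but can skew the Partition of a very active user.
        producer.send(new ProducerRecord<String, String>("user_pv", userId, pvJson));

        producer.close();
    }
}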

Avoid data skew at the data source ———— reading files

Principle

When Spark reads files through the textFile(path, minPartitions) method, it uses Hadoop's TextInputFormat.

For non-splittable files, each file corresponds to one Split and therefore one Partition. Whether the files are of similar size then largely determines whether there is data skew on the data source side. Moreover, for non-splittable compressed files, even if the compressed sizes are the same, the actual amount of data they contain can differ greatly, because the more repetitive the source data, the higher the compression ratio. Conversely, files whose compressed sizes are close may decompress to very different amounts of data, so the amount of data to be processed can also vary widely.

In this case, the data-producing side can either store the data in a splittable format instead of a non-splittable one, or ensure that each file contains roughly the same amount of data, so that data skew is avoided.
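
For example, here is a minimal sketch of the data-producing side (the input and output paths are assumptions) that writes text output with the splittable bzip2 codec instead of the non-splittable gzip codec, so that a downstream job can cut even a large output file into several Splits:

import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class WriteSplittableSketch {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("WriteSplittableSketch");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        JavaRDD<String> lines = javaSparkContext.textFile("hdfs://hadoop1:8020/tmp/raw_input/");
        // bzip2-compressed text is splittable, so one large file no longer forces one large
        // Partition; gzip output, by contrast, would have to be read as a single Split.
        lines.saveAsTextFile("hdfs://hadoop1:8020/tmp/splittable_output/", BZip2Codec.class);

        javaSparkContext.stop();
    }
}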

For splittable files, the size of each Split is determined by the algorithm below, where goalSize equals the total size of all files divided by minPartitions, and blockSize is the block size of the file itself for HDFS files, or the value of fs.local.block.size for Linux local files in local mode.

protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
}

By default a Split is not too large, generally about one block (128MB by default in Hadoop 2), so the data skew problem is usually not pronounced. If there is serious data skew, it can be mitigated by adjusting the parameters above.

Case study

Now generate some text files with a script and run a simple word count with the code below. To avoid a Shuffle, it only counts the total number of words rather than counting per word group.

SparkConf sparkConf = new SparkConf()
    .setAppName("ReadFileSkewDemo");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
long count = javaSparkContext.textFile(inputFile, minPartitions)
    .flatMap((String line) -> Arrays.asList(line.split(" ")).iterator())
    .count();
System.out.printf("total words : %s", count);
javaSparkContext.stop();

There are 11 CSV files in total: 10 of them are 271.9MB each, and the remaining one is 8.5GB.

After compressing the 8.5GB file with gzip, its size is only 25.3MB.

Running the word count over the uncompressed folder, the Split size is max(minSize, min(goalSize, blockSize)) = max(1 B, min((271.9 * 10 + 8.5 * 1024) MB / 1, 128 MB)) = 128MB, and no significant data skew occurs.

Running the same word count on the folder containing the compressed file, the Splits of the uncompressed files are still 128MB, but the gzip-compressed file, being non-splittable and only 25.3MB, is handled as a single Split/Partition. Although the file itself is small, it was produced from an 8.5GB file and contains roughly 32 times as much data as each uncompressed file, so the Task processing this Split/Partition takes 4.4 minutes, far longer than the roughly 10 seconds of the other Tasks.

Since the gzip-compressed file above is only 25.3MB, smaller than the 128MB Split size, this alone does not prove that gzip files are non-splittable. Now set minPartitions from the default 1 to 229, so that the target Split size becomes max(minSize, min(goalSize, blockSize)) = max(1 B, min((271.9 * 10 + 25.3) MB / 229, 128 MB)) ≈ 12 MB. If gzip files were splittable, no Split/Partition would be much larger than 12MB; conversely, if a 25.3MB Partition still exists, it shows that gzip files are indeed non-splittable, and that when non-splittable files have to be generated, each file should contain roughly the same amount of data, as described above.

As shown in the figure below, the Split/Partition corresponding to the gzip file is 25.3MB while all the other Splits are around 12MB, and the Task handling it takes 4.7 minutes, far longer than the roughly 4 seconds of the other Tasks.

Summary

Applicable scenario: the data source contains non-splittable files, and the amount of data per file varies greatly.

Solution: use a splittable format instead of a non-splittable one where possible, or ensure that each file contains roughly the same amount of data.

Advantage: can fundamentally eliminate data skew on the data source side, with a very noticeable effect.

Disadvantage: data sources usually belong to external systems, so the cooperation of those systems is required.

Adjust the parallelism to spread out the different Keys in the same Task

Principle

When Spark performs a Shuffle, it partitions the data with HashPartitioner by default (note: this is not Hash Shuffle). If the parallelism is set improperly, many different Keys may be assigned to the same Task, making that Task process far more data than the others and causing data skew.

By adjusting the Shuffle parallelism, the different Keys that were originally assigned to the same Task can be spread over different Tasks, reducing the amount of data the original Task has to process and alleviating the straggler effect caused by data skew.
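
A minimal sketch of the places where the parallelism can be set is shown below (the tiny in-memory dataset exists only to keep the sketch self-contained and is not part of the case study that follows): the number of partitions can be passed to the shuffle operator itself, set globally for RDD jobs via spark.default.parallelism, or, for Spark SQL, via spark.sql.shuffle.partitions.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class AdjustParallelismSketch {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
            .setAppName("AdjustParallelismSketch")
            .set("spark.default.parallelism", "48"); // default parallelism for RDD shuffles
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        JavaPairRDD<Integer, String> pairRDD = jsc.parallelizePairs(Arrays.asList(
            new Tuple2<>(1, "a"), new Tuple2<>(2, "b"), new Tuple2<>(1, "c")));

        // Per-operator override: groupByKey(48) shuffles into 48 partitions
        // regardless of spark.default.parallelism.
        long groups = pairRDD.groupByKey(48).count();
        System.out.println("groups: " + groups);
        // For Spark SQL, the equivalent knob is spark.sql.shuffle.partitions.

        jsc.stop();
    }
}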

Case study

There is a test table named student_external with 1.05 billion rows, each row carrying a unique id. Take the 150 million rows whose id lies between 900 million and 1.05 billion and transform them so that for every row with id between 900 million and 940 million the new id leaves a remainder of 8 when divided by 12 (which means that with a Shuffle parallelism of 12, HashPartitioner assigns this entire subset to Task 8), while every other row gets its id divided by 100 and truncated to an integer. As a result, rows with an original id above 940 million are spread evenly over all Tasks during the Shuffle, while rows with an id below 940 million all land in the same Task. The process is as follows.

INSERT OVERWRITE TABLE test
SELECT CASE WHEN id < 940000000 THEN (9500000 + (CAST (RAND() * 8 AS INTEGER)) * 12 )
ELSE CAST(id/100 AS INTEGER)
END,
name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;

With the above processing, a test dataset likely to cause data skew downstream is ready. Next, read the test data with Spark and group it by id via groupByKey(12), i.e., with a Shuffle parallelism of 12. The code is as follows.

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class SparkDataSkew {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder()
            .appName("SparkDataSkewTunning")
            .config("hive.metastore.uris", "thrift://hadoop1:9083")
            .enableHiveSupport()
            .getOrCreate();

        Dataset<Row> dataframe = sparkSession.sql("select * from test");
        dataframe.toJavaRDD()
            .mapToPair((Row row) -> new Tuple2<Integer, String>(row.getInt(0), row.getString(1)))
            .groupByKey(12) // Shuffle parallelism of 12
            .mapToPair((Tuple2<Integer, Iterable<String>> tuple) -> {
                int id = tuple._1();
                AtomicInteger atomicInteger = new AtomicInteger(0);
                tuple._2().forEach((String name) -> atomicInteger.incrementAndGet());
                return new Tuple2<Integer, Integer>(id, atomicInteger.get());
            })
            .count();

        sparkSession.stop();
        sparkSession.close();
    }
}

The cluster used in this experiment has 4 nodes; each node offers Yarn 16 CPU cores and 16GB of memory. Submitting the application as follows starts 4 Executors, each with 12 available cores (not an optimal configuration for production, used only for this experiment) and 12GB of available memory.

spark-submit --queue ambari --num-executors 4 --executor-cores 12 --executor-memory 12g --class com.jasongj.spark.driver.SparkDataSkew --master yarn --deploy-mode client SparkExample-with-dependencies-1.0.jar

The Task status of the GroupBy Stage is shown in the figure below. Task 8 processes 45 million records, nine times the roughly 5 million records processed by each of the other 11 Tasks, and it takes 38 seconds, far longer than the 16-second average of the other 11 Tasks. The whole Stage also takes 38 seconds, a time determined almost entirely by the slowest Task, Task 8.

In this situation, the Shuffle parallelism can be adjusted so that the different Keys originally assigned to the same Task (Task 8 in this example) are spread over different Tasks, reducing the amount of data Task 8 has to process and easing the data skew.

Using groupByKey(48), the Shuffle parallelism is raised to 48 and the job is resubmitted to Spark. The status of all Tasks of the new Job's GroupBy Stage is shown in the figure below.

As the figure shows, the Task with the most records, Task 20, processes about 11.25 million records, roughly 75% fewer than the 45 million processed by Task 8 when the parallelism was 12, and its time drops from Task 8's 38 seconds to 24 seconds.

In this case, adjusting the parallelism does not necessarily mean increasing it; sometimes lowering it helps as well. Using groupByKey(11), the Shuffle parallelism is reduced to 11 and the job is resubmitted to Spark. The status of all Tasks of the new Job's GroupBy Stage is shown in the figure below.

As the figure shows, the Task that processes the most records, Task 6, handles about 10.45 million records in 23 seconds, while the Task that processes the fewest, Task 1, handles about 5.45 million records in 12 seconds.

Summary

Applicable scenario: many different Keys are assigned to the same Task, so that Task has far too much data to process.

Solution: adjust the parallelism. Usually this means increasing it, but as this example shows, lowering it can sometimes work as well.

Advantage: simple to implement. The parallelism can be set directly on the Shuffle operator or via spark.default.parallelism; for Spark SQL it can also be set with SET spark.sql.shuffle.partitions=[num_tasks]. Since it solves the problem at minimal cost, when data skew appears it is usually worth trying a few values this way first and moving on to other methods only if the problem persists.

Disadvantage: the applicable scenarios are limited. It can only spread out the different Keys assigned to the same Task; it does not help when a single Key is heavily skewed. It only alleviates data skew rather than eliminating it, and in practice its effect is moderate.

Use a custom Partitioner

Principle

Use a custom Partitioner (the default is HashPartitioner) to assign the different Keys that were originally sent to the same Task to different Tasks.

Case study

Using the same dataset as above, keep the concurrency at 12, but pass a custom Partitioner (implemented as follows) to the groupByKey operator.

.groupByKey(new Partitioner() {
    @Override
    public int numPartitions() {
        return 12;
    }

    @Override
    public int getPartition(Object key) {
        int id = Integer.parseInt(key.toString());
        // Spread the skewed ids 9500000, 9500012, ..., 9500084 over partitions 0-7;
        // all other ids keep the original id % 12 mapping.
        if (id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {
            return (id - 9500000) / 12;
        } else {
            return id % 12;
        }
    }
})

As shown in the figure below, with the custom Partitioner the slowest Task, Task 6, processes about 10 million records in 15 seconds, and the datasets handled by the individual Tasks are of comparable size.

Summary

Applicable scenario: many different Keys are assigned to the same Task, so that Task has far too much data to process.

Solution: replace the default HashPartitioner with a custom Partitioner implementation that distributes the different Keys as evenly as possible across Tasks.

Advantage: does not affect the original parallelism design, whereas changing the parallelism would by default also change the parallelism of subsequent Stages and might affect them.

Disadvantage: the applicable scenarios are limited; it can only spread out different Keys and is unsuitable when the dataset of a single Key is very large. The effect is similar to adjusting the parallelism: data skew is alleviated but not eliminated. Moreover, the Partitioner has to be tailored to the characteristics of the data, which makes it inflexible.

Convert Reduce Side Join into Map Side Join

Principle

Use Spark's Broadcast mechanism to turn a Reduce Side Join into a Map Side Join. This avoids the Shuffle and thereby completely eliminates the data skew the Shuffle would otherwise cause.
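
The case study below demonstrates the SQL route. At the RDD level, the same idea can be sketched roughly as follows (the table paths and the comma-separated file layout are assumptions carried over from the other examples in this article, not a prescribed API): collect the small table into a map on the Driver, broadcast it, and perform the lookup inside a map-side operation so that no Shuffle is needed.

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple2;

public class MapSideJoinSketch {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("MapSideJoinSketch");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // Big table: id -> name, read the same way as in the other examples of this article
        JavaPairRDD<String, String> bigRDD = jsc
            .textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
            .mapToPair((String row) -> {
                String[] str = row.split(",");
                return new Tuple2<String, String>(str[0], str[1]);
            });

        // Small table: pulled to the Driver and broadcast to every Executor
        Map<String, String> smallMap = new HashMap<String, String>(jsc
            .textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
            .mapToPair((String row) -> {
                String[] str = row.split(",");
                return new Tuple2<String, String>(str[0], str[1]);
            })
            .collectAsMap());
        Broadcast<Map<String, String>> smallTable = jsc.broadcast(smallMap);

        // Join on the map side: look up each big-table Key in the broadcast map and keep
        // only the matches (inner join semantics); no Shuffle is involved.
        JavaPairRDD<String, String> joined = bigRDD
            .filter((Tuple2<String, String> tuple) -> smallTable.value().containsKey(tuple._1()))
            .mapToPair((Tuple2<String, String> tuple) ->
                new Tuple2<String, String>(tuple._1(), smallTable.value().get(tuple._1())));

        System.out.println("joined records: " + joined.count());
        jsc.stop();
    }
}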

Case study

Create a large table named test, containing skewed Keys and 150 million records in total, with the following SQL.

INSERT OVERWRITE TABLE test
SELECT CAST(CASE WHEN id < 980000000 THEN (95000000 + (CAST (RAND() * 4 AS INT) + 1) * 48 )
ELSE CAST(id/10 AS INT) END AS STRING),
name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;

Create a small table named test_new, with uniformly distributed data and 500,000 records in total, with the following SQL.

INSERT OVERWRITE TABLE test_new
SELECT CAST(CAST(id/10 AS INT) AS STRING),
name
FROM student_delta_external
WHERE id BETWEEN 950000000 AND 950500000;

Submit the following SQL directly through the Spark Thrift Server to Join table test with table test_new and store the Join result in table test_join.

INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;

The DAG of this SQL is shown in the figure below. The execution is divided into three Stages: the first two read data from Hive and both involve a Shuffle, and the final Stage performs the Join and writes the result into table test_join.

As the figure below shows, the data processed by the Tasks of the Join Stage is heavily skewed: the Task handling the most data takes 7.1 minutes, far longer than the roughly 2 seconds of the other, unskewed Tasks.

Next, try to implement a Map Side Join through Broadcast. Note that simply caching the small table with CACHE TABLE test_new is not enough to achieve this. To demonstrate, run the Join with the following SQL.

CACHE TABLE test_new;
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;

The DAG below shows that the job is still divided into three Stages and the Shuffle is still there; the only difference is that the small table is no longer read by scanning the Hive table directly but by scanning the cached table in memory.

And the data skew is still present: as shown in the figure below, the slowest Task takes 7.1 minutes, far longer than the roughly 2 seconds of the other Tasks.

The correct way to implement a Map Side Join through Broadcast is to raise the Broadcast threshold high enough with SET spark.sql.autoBroadcastJoinThreshold=104857600.

Then run the Join again with the following SQL.

SET spark.sql.autoBroadcastJoinThreshold=104857600;
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;

The DAG below shows that this plan contains only a single Stage.

And as the figure below shows, the Tasks take comparable time and there is no obvious data skew. The total time is 1.5 minutes, far below the 7.3 minutes of the Reduce Side Join.

Summary

Applicable scenario: one side of the Join is small enough to be loaded into the Driver and broadcast to every Executor.

Solution: in Java/Scala code, pull the small dataset to the Driver and broadcast it to every Executor; or, when using SQL, raise the Broadcast threshold high enough before the query so that the broadcast takes effect. Either way, the Reduce Side Join is replaced by a Map Side Join.

Advantage: avoids the Shuffle and thus removes the very condition under which data skew occurs, which can greatly improve performance.

Disadvantage: requires one side of the Join to be small enough, and it applies mainly to Join scenarios rather than aggregations, so its applicability is limited.

Add random prefixes/suffixes to skewed Keys

Principle

Add random prefixes/suffixes to the Keys that carry huge amounts of data, so that data which used to share one Key now has different Keys and the skewed dataset is spread over different Tasks, completely solving the data skew problem. On the other side of the Join, the data corresponding to the skewed Keys is combined with the set of random prefixes via a Cartesian product, which guarantees that whatever prefix a skewed Key receives, it can still be Joined normally.

Case study

With the following SQL, the 8 million rows whose id lies between 900 million and 908 million are mapped to id 9500048 or 9500096, while every other row has its id divided by 100 and truncated to an integer. In the resulting dataset, ids 9500048 and 9500096 each have 4 million rows, and every other id has about 100 rows. The data is stored in a table named test.

For the other, small table test_new, 500,000 rows are taken and their ids (which are incremental and unique) are divided by 100 and truncated, so that every id corresponds to exactly 100 rows.

INSERT OVERWRITE TABLE test
SELECT CAST(CASE WHEN id < 908000000 THEN (9500000 + (CAST (RAND() * 2 AS INT) + 1) * 48 )
ELSE CAST(id/100 AS INT) END AS STRING),
name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;

INSERT OVERWRITE TABLE test_new
SELECT CAST(CAST(id/100 AS INT) AS STRING),
name
FROM student_delta_external
WHERE id BETWEEN 950000000 AND 950500000;

With the code below, read the data in the folder behind the test table and convert it into a JavaPairRDD stored in leftRDD; likewise read the data of the test_new table into rightRDD. Then Join leftRDD with rightRDD using the RDD join operator, specifying a parallelism of 48.

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class SparkDataSkew {
    public static void main(String[] args) {
        int parallelism = 48; // Join parallelism, as specified in the text above
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("DemoSparkDataFrameWithSkewedBigTableDirect");
        sparkConf.set("spark.default.parallelism", String.valueOf(parallelism));
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
            .mapToPair((String row) -> {
                String[] str = row.split(",");
                return new Tuple2<String, String>(str[0], str[1]);
            });

        JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
            .mapToPair((String row) -> {
                String[] str = row.split(",");
                return new Tuple2<String, String>(str[0], str[1]);
            });

        leftRDD.join(rightRDD, parallelism)
            .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2()))
            .foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
                AtomicInteger atomicInteger = new AtomicInteger();
                iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
            });

        javaSparkContext.stop();
        javaSparkContext.close();
    }
}

As the figure below shows, the whole Join takes 1 minute 54 seconds, of which the Join Stage takes 1.7 minutes.

Analyzing all the Tasks of the Join Stage shows that while the other Tasks each process about 1.9271 million records, Task 32 processes 9.9272 million records and therefore takes 1.7 minutes, far longer than the roughly 10 seconds of the other Tasks. This matches the way the dataset was prepared: ids 9500048 and 9500096 were given very large amounts of data, while the data of every other id is small and uniform.

Now, with the following steps, the skewed Keys are spread out and handled separately:

  • Filter out of leftRDD the data corresponding to the skewed Keys (namely 9500048 and 9500096), add a random prefix between 1 and 24 to each record, separated from the original Key by a comma (to make the prefix easy to strip later), forming a separate leftSkewRDD

  • Extract from rightRDD the data corresponding to the skewed Keys and, through a flatMap operation, turn each record into 24 records (one for each prefix from 1 to 24), forming a separate rightSkewRDD

  • Join leftSkewRDD with rightSkewRDD with a parallelism of 48, stripping the random prefix during the Join, to obtain the Join result of the skewed dataset, skewedJoinRDD

  • Extract from leftRDD the data that does not contain the skewed Keys into a separate leftUnSkewRDD

  • Join leftUnSkewRDD with the original rightRDD, also with a parallelism of 48, to obtain the Join result unskewedJoinRDD

  • Merge skewedJoinRDD and unskewedJoinRDD with the union operator to obtain the complete Join result set

The implementation code is as follows.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple2;

public class SparkDataSkew {
    public static void main(String[] args) {
        int parallelism = 48;
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("SolveDataSkewWithRandomPrefix");
        sparkConf.set("spark.default.parallelism", parallelism + "");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
            .mapToPair((String row) -> {
                String[] str = row.split(",");
                return new Tuple2<String, String>(str[0], str[1]);
            });

        JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
            .mapToPair((String row) -> {
                String[] str = row.split(",");
                return new Tuple2<String, String>(str[0], str[1]);
            });

        String[] skewedKeyArray = new String[]{"9500048", "9500096"};
        Set<String> skewedKeySet = new HashSet<String>();
        List<String> addList = new ArrayList<String>();
        for (int i = 1; i <= 24; i++) {
            addList.add(i + "");
        }
        for (String key : skewedKeyArray) {
            skewedKeySet.add(key);
        }

        Broadcast<Set<String>> skewedKeys = javaSparkContext.broadcast(skewedKeySet);
        Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList);

        // Skewed part of the left RDD: add a random prefix 1-24 to each skewed Key
        JavaPairRDD<String, String> leftSkewRDD = leftRDD
            .filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1()))
            .mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>((new Random().nextInt(24) + 1) + "," + tuple._1(), tuple._2()));

        // Skewed part of the right RDD: expand each record into 24 records, one per prefix
        JavaPairRDD<String, String> rightSkewRDD = rightRDD.filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1()))
            .flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream()
                .map((String i) -> new Tuple2<String, String>(i + "," + tuple._1(), tuple._2()))
                .collect(Collectors.toList())
                .iterator()
            );

        // Join the skewed parts and strip the random prefix from the Key
        JavaPairRDD<String, String> skewedJoinRDD = leftSkewRDD
            .join(rightSkewRDD, parallelism)
            .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2()));

        // Join the non-skewed part with the original right RDD
        JavaPairRDD<String, String> leftUnSkewRDD = leftRDD.filter((Tuple2<String, String> tuple) -> !skewedKeys.value().contains(tuple._1()));
        JavaPairRDD<String, String> unskewedJoinRDD = leftUnSkewRDD.join(rightRDD, parallelism).mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2()));

        // Union the two results to obtain the complete Join output
        skewedJoinRDD.union(unskewedJoinRDD).foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
            AtomicInteger atomicInteger = new AtomicInteger();
            iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
        });

        javaSparkContext.stop();
        javaSparkContext.close();
    }
}

As the figure below shows, the whole Join now takes 58 seconds, of which the Join Stage takes 33 seconds.

Analyzing all the Tasks of the Join Stage shows that:

  • Because the Join is split into a Join of the skewed data and a Join of the non-skewed data, each with a parallelism of 48, the total parallelism is 96

  • Because the job was submitted with 4 Executors of 12 cores each, 48 cores are available in total, so the first 48 Tasks start at the same time (their launch times are identical) while the remaining 48 Tasks start at different times (each starts once an earlier Task finishes)

  • Because the skewed Keys were given random prefixes, data that used to share one Key now has different Keys and is spread over different Tasks, so no Task processes a dataset noticeably larger than the others

In fact, since the processing of the skewed Keys and that of the non-skewed Keys are completely independent, they can run in parallel. This experiment is limited by the total of 48 available cores, so at most 48 Tasks can run at once and the scheme only cuts the total time roughly in half (doubling efficiency). With more resources, more Tasks could run concurrently and the advantage of this scheme would be more pronounced; in real projects it often improves efficiency by several times, up to ten times.

Summary

Applicable scenario: both tables are fairly large, so a Map Side Join cannot be used; one RDD has a few Keys with far too much data, while the Keys of the other RDD are evenly distributed.

Solution: extract the data of the skewed Keys from the skewed RDD separately and add random prefixes to it; combine every corresponding record of the other RDD with each random prefix to form a new RDD (equivalent to expanding that data N times, where N is the number of random prefixes); then Join the two and strip the prefixes. Join the remaining, non-skewed data as usual. Finally, union the two Join result sets to obtain the complete Join result.

Advantage: compared with a Map Side Join, it is better suited to Joins of large datasets. With sufficient resources, the skewed and non-skewed parts can be processed in parallel, improving efficiency noticeably. Only the skewed part of the data is expanded, so the extra resource consumption is limited.

Disadvantage: if there are very many skewed Keys, the other side has to be expanded enormously and this scheme no longer applies. Separating the skewed Keys from the non-skewed Keys also requires scanning the dataset twice, which adds overhead.

Add one of N random prefixes to the large table and expand the small table N times

Principle

If there are many skewed Keys, splitting them all out one by one as in the previous method makes little sense. In that case it is more appropriate to add random prefixes directly to the entire dataset that has the skew, and to take the Cartesian product of the other dataset (the one without serious skew) with the set of random prefixes, i.e., to expand it N times.

Case study

Sample code is given below; readers can refer to the method above of adding random prefixes to a few Keys and test it themselves.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple2;

public class SparkDataSkew {
    public static void main(String[] args) {
        int parallelism = 48;
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("ResolveDataSkewWithNAndRandom");
        sparkConf.set("spark.default.parallelism", parallelism + "");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
            .mapToPair((String row) -> {
                String[] str = row.split(",");
                return new Tuple2<String, String>(str[0], str[1]);
            });

        JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
            .mapToPair((String row) -> {
                String[] str = row.split(",");
                return new Tuple2<String, String>(str[0], str[1]);
            });

        List<String> addList = new ArrayList<String>();
        for (int i = 1; i <= 48; i++) {
            addList.add(i + "");
        }

        Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList);

        // Prefix every Key of the large table with a random number 1-48, matching addList
        JavaPairRDD<String, String> leftRandomRDD = leftRDD.mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>((new Random().nextInt(48) + 1) + "," + tuple._1(), tuple._2()));

        // Expand every record of the small table 48 times, once per prefix
        JavaPairRDD<String, String> rightNewRDD = rightRDD
            .flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream()
                .map((String i) -> new Tuple2<String, String>(i + "," + tuple._1(), tuple._2()))
                .collect(Collectors.toList())
                .iterator()
            );

        // Join and strip the random prefix from the Key
        JavaPairRDD<String, String> joinRDD = leftRandomRDD
            .join(rightNewRDD, parallelism)
            .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2()));

        joinRDD.foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
            AtomicInteger atomicInteger = new AtomicInteger();
            iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
        });

        javaSparkContext.stop();
        javaSparkContext.close();
    }
}

Summary

Applicable scenario: one dataset has many skewed Keys, while the other is distributed fairly evenly.

Advantage: applicable to most scenarios, with good results.

Disadvantage: one dataset has to be expanded N times as a whole, which increases resource consumption.

Summary

There is no single remedy for data skew that works once and for all. More often, the methods described above are combined according to the characteristics of the data (dataset size, how many Keys are skewed, and so on).
