blog 2

This section provides a complete MapReduce implementation of the secondary sort problem using the Hadoop framework.

 

Input

The input will be a set of files, where each record (line) will have the following format:

Format:

<year><,><month><,><day><,><temperature>

Example:

2012, 01, 01, 35

2011, 12, 23, -4

 

Expected output

The expected output will have the following format:

Format:

<year><-><month>: <temperature1><,><temperature2><,> … where temperature1 <= temperature2 <= …

Example:

2012-01: 5, 10, 35, 45, …

2001-11: 40, 46, 47, 48, …

2005-08: 38, 50, 52, 70, …

 

map() function

The map() function parses and tokenizes the input and then injects the value (temperature) into the reducer key, as shown in Example 4.

Example 4. map() for secondary sorting

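The original listing is not reproduced here; a minimal sketch of such a mapper is given below. The setters on DateTemperaturePair (setYearMonth(), setDay(), setTemperature()) are illustrative assumptions, not the book's exact API.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch of a secondary-sort mapper: the temperature is copied into the
// composite key (DateTemperaturePair) as well as emitted as the value.
public class SecondarySortMapper
        extends Mapper<LongWritable, Text, DateTemperaturePair, Text> {

    private final Text theTemperature = new Text();
    private final DateTemperaturePair pair = new DateTemperaturePair();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // input record: <year>,<month>,<day>,<temperature>
        String[] tokens = value.toString().trim().split(",");
        String yearMonth = tokens[0].trim() + "-" + tokens[1].trim();
        String day = tokens[2].trim();
        int temperature = Integer.parseInt(tokens[3].trim());

        // inject the value (temperature) into the reducer key
        pair.setYearMonth(yearMonth);        // assumed setter
        pair.setDay(day);                    // assumed setter
        pair.setTemperature(temperature);    // assumed setter
        theTemperature.set(tokens[3].trim());

        context.write(pair, theTemperature);
    }
}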

reduce() function

The reducer’s primary function is to concatenate the values (which are already sorted through the Secondary Sort design pattern) and emit them as output. The reduce() function is given in Example 5.

Example 5. reduce() for secondary sorting

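Again, the original listing is not shown; a minimal sketch of the reducer follows (getYearMonth() on the composite key is an assumed accessor). Because the framework has already sorted the values via the composite key, the reducer only concatenates them.

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of a secondary-sort reducer: values arrive already sorted by
// temperature, so we simply concatenate them and emit "year-month" as the key.
public class SecondarySortReducer
        extends Reducer<DateTemperaturePair, Text, Text, Text> {

    @Override
    protected void reduce(DateTemperaturePair key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sortedTemperatures = new StringBuilder();
        for (Text value : values) {
            sortedTemperatures.append(value.toString()).append(",");
        }
        // getYearMonth() is an assumed accessor on the composite key
        context.write(new Text(key.getYearMonth()), new Text(sortedTemperatures.toString()));
    }
}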

Hadoop implementation classes

The classes shown in Table 1 are used to solve the problem.

Table 1. Classes used in MapReduce/Hadoop solution

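The table itself is not reproduced; based on the class names referenced throughout this post, the solution relies on roughly the following classes (the mapper and reducer names are illustrative):

SecondarySortDriver: the driver; registers the custom plug-in classes with the framework

SecondarySortMapper (name assumed): implements map(); injects the temperature into the reducer key

SecondarySortReducer (name assumed): implements reduce(); concatenates the already-sorted values

DateTemperaturePair: the composite key (year-month plus temperature); its compareTo() defines the key sort order

DateTemperaturePartitioner: partitions map output by the natural key (year-month)

DateTemperatureGroupingComparator: decides which keys are grouped into a single reduce() call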

How is the value injected into the key? The first comparator (the DateTemperaturePair.compareTo() method) controls the sort order of the keys, while the second comparator (the DateTemperatureGroupingComparator.compare() method) controls which keys are grouped together into a single call to the reduce() method. The combination of these two comparators allows one to set up jobs that act like one has defined an order for the values.
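As an illustration of the second comparator, a grouping comparator along these lines (a sketch; getYearMonth() is an assumed accessor) compares only the natural key, so that all temperatures of a given year-month reach one reduce() call:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sketch: group composite keys by year-month only, ignoring the temperature.
public class DateTemperatureGroupingComparator extends WritableComparator {

    public DateTemperatureGroupingComparator() {
        super(DateTemperaturePair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        DateTemperaturePair pair1 = (DateTemperaturePair) a;
        DateTemperaturePair pair2 = (DateTemperaturePair) b;
        // consider only the natural key (year-month); getYearMonth() is assumed
        return pair1.getYearMonth().compareTo(pair2.getYearMonth());
    }
}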

The SecondarySortDriver is the driver class, which registers the custom plug-in classes (DateTemperaturePartitioner and DateTemperatureGroupingComparator) with the MapReduce/Hadoop framework. This driver class is presented in Example 6.

Example 6. SecondarySortDriver class

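The listing is not reproduced here; a sketch of such a driver, using the standard Tool/ToolRunner pattern (an assumption about the original's structure), would look roughly like this:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch of the driver: its main job is to register the custom partitioner
// and grouping comparator with the MapReduce framework.
public class SecondarySortDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "SecondarySortDriver");
        job.setJarByClass(SecondarySortDriver.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapOutputKeyClass(DateTemperaturePair.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);

        // the two plug-in classes that make the secondary sort work
        job.setPartitionerClass(DateTemperaturePartitioner.class);
        job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new SecondarySortDriver(), args);
        System.exit(status);
    }
}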

Sample run of Hadoop implementation

 

The sample run (the input data, the HDFS input listing, the driver script, the log of the run, and the output inspection) was shown as a series of screenshots and is not reproduced here.

How to sort in ascending or descending order

You can easily control the sorting order of the values (ascending or descending) by using the DateTemperaturePair.compareTo() method as follows:

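A sketch of the idea (accessors such as getYearMonth() and getTemperature() are assumed): compare on year-month first and temperature second, and flip the sign of the result to flip the order of the values.

// Inside DateTemperaturePair (sketch): compare year-month first, then temperature.
@Override
public int compareTo(DateTemperaturePair other) {
    int compareValue = this.getYearMonth().compareTo(other.getYearMonth());
    if (compareValue == 0) {
        // getTemperature() is assumed to return a Comparable (e.g., an IntWritable)
        compareValue = this.getTemperature().compareTo(other.getTemperature());
    }
    // return compareValue;      // ascending order of temperatures
    return -1 * compareValue;    // descending order of temperatures
}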

Spark solution in secondary sort

To solve a secondary sorting problem in Spark, we have at least two options:

Option #1

Read and buffer all of the values for a given key in an Array or List data structure and then do an in-reducer sort on the values. This solution works if one has a small set of values (which will fit in memory) per reducer key.

Option #2

Use the Spark framework for sorting the reducer values (this option does not require in-reducer sorting of values passed to the reducer). This approach involves “creating a composite key by adding a part of, or the entire value to, the natural key to achieve your sorting objectives.” This option always scales (because you are not limited by the memory of a commodity server).

 

Time series as input

To demonstrate secondary sorting, let’s use time series data:

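The input listing is not reproduced here; the individual records can be read off the unsorted reducer output shown in Step 8 below, so the input file contains records such as the following, one <name>,<time>,<value> triple per line (the original ordering of the lines is not known):

x,2,9
y,2,5
x,1,3
y,1,7
y,3,1
x,3,6
z,1,4
z,2,8
z,3,7
z,4,0
p,2,6
p,4,7
p,6,0
p,7,3
p,1,9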

Expected output

Our expected output is as follows. Note that the values of reducers are grouped by name and sorted by time:

name  t1 t2 t3 t4 t5 ...

x => [3, 9, 6]

y => [7, 5, 1]

z => [4, 8, 7, 0]

p => [9, 6, 7, 0, 3]

 

Option 1: Secondary sorting in memory

Since Spark has a very powerful and high-level API, I will present the entire solution in a single Java class. The Spark API is built upon the basic abstraction concept of the RDD (resilient distributed dataset). To fully utilize Spark's API, we have to understand RDDs. An RDD<T> (i.e., an RDD of type T) object represents an immutable, partitioned collection of elements (of type T) that can be operated on in parallel. The RDD<T> class contains the basic MapReduce operations available on all RDDs, such as map(), filter(), and persist(), while the JavaPairRDD<K,V> class contains MapReduce operations such as mapToPair(), flatMapToPair(), and groupByKey(). In addition, Spark's PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as reduce(), groupByKey(), and join(). In a nutshell, JavaRDD<T> is a list of objects of type T, and JavaPairRDD<K,V> is a list of objects of type Tuple2<K,V> (where each tuple represents a key-value pair).

The Spark-based algorithm is listed next. Although there are 10 steps, most of them are trivial and some are provided for debugging purposes only:

  1. We import the required Java/Spark classes. The main Java classes for MapReduce are given in the org.apache.spark.api.java package. This package includes the following classes and interfaces:
  • JavaRDDLike (interface)
  • JavaDoubleRDD
  • JavaPairRDD
  • JavaRDD
  • JavaSparkContext
  • StorageLevels
  2. We pass input data as arguments and validate.
  3. We connect to the Spark master by creating a JavaSparkContext object, which is used to create new RDDs.
  4. Using the context object (created in step 3), we create an RDD for the input file; the resulting RDD will be a JavaRDD<String>. Each element of this RDD will be a record of time series data: <name><,><time><,><value>.
  5. Next we want to create key-value pairs from a JavaRDD<String>, where the key is the name and the value is a pair of (time, value). The resulting RDD will be a JavaPairRDD<String, Tuple2<Integer, Integer>>.
  6. To validate step 5, we collect all values from the JavaPairRDD<> and print them.
  7. We group JavaPairRDD<> elements by the key (name). To accomplish this, we use the groupByKey() method. The result will be the RDD JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>>. Note that the resulting list (Iterable<Tuple2<Integer, Integer>>) is unsorted. In general, Spark's reduceByKey() is preferred over groupByKey() for performance reasons, but here we have no option other than groupByKey() (since reduceByKey() does not allow us to sort the values in place for a given key).
  8. To validate step 7, we collect all values from the JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> and print them.
  9. We sort the reducer's values to get the final output. We accomplish this by passing a custom sorting function to mapValues(); we just sort the values (the key remains the same).
  10. To validate the final result, we collect all values from the sorted JavaPairRDD<> and print them.

A solution for option #1 is implemented by a single driver class: SecondarySorting (see Example 7). All steps, 1–10, are listed inside the class definition, which will be presented in the following sections. Typically, a Spark application consists of a driver program that runs the user’s main() function and executes various parallel operations on a cluster. Parallel operations will be achieved through the extensive use of RDDs.

 

Example 7. SecondarySort class overall structure

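The class listing is not reproduced here; structurally it amounts to a single class with a main() method that performs the ten steps in order, roughly as sketched below (the bodies of the steps are filled in by the following sections):

// Sketch of the overall structure of the option #1 solution.
public class SecondarySorting {
    public static void main(String[] args) throws Exception {
        // Step 2: read and validate the input parameters
        // Step 3: connect to the Spark master (create a JavaSparkContext)
        // Step 4: create a JavaRDD<String> from the input file
        // Step 5: map each record to (name, (time, value)) pairs
        // Step 6: validate step 5 with collect()
        // Step 7: group the pairs by name with groupByKey()
        // Step 8: validate step 7 with collect()
        // Step 9: sort each group's values in memory with mapValues()
        // Step 10: collect and print the final result
        System.exit(0);
    }
}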

Step 1: import required classes

As shown in Example 8, the main Spark package for the Java API is org.apache.spark.api.java, which includes the JavaRDD, JavaPairRDD, and JavaSparkContext classes. JavaSparkContext is a factory class for creating new RDDs (such as JavaRDD and JavaPairRDD objects).

Example 8. Step 1: Import required classes

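A sketch of the imports this solution needs (the exact list in the original is not reproduced):

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;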

Step 2: Read input parameters

This step, demonstrated in Example 9, reads the HDFS input file (Spark may read data from HDFS and other persistent stores, such as a Linux filesystem), which might look like /dir1/dir2/myfile.txt.

 

Example 9. Step 2: Read input parameters

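A minimal sketch of this step (the usage message is illustrative):

// Step 2 (sketch): read and validate the input path argument.
if (args.length < 1) {
    System.err.println("Usage: SecondarySorting <hdfs-input-file>");
    System.exit(1);
}
String inputPath = args[0];
System.out.println("args[0]: <file>=" + inputPath);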

Step 3: Connect to the Spark master

To work with RDDs, first you need to create a JavaSparkContext object (as shown in Example 10), which is a factory for creating JavaRDD and JavaPairRDDobjects. It is also possible to create a JavaSparkContext object by injecting a SparkConf object into the JavaSparkContext’s class constructor. This approach is useful when you read your cluster configurations from an XML file. In a nutshell, the JavaSparkContext object has the following responsibilities:

  • Initializes the application driver.
  • Registers the application driver to the cluster manager. (If you are using the Spark cluster, then this will be the Spark master; if you are using YARN, then it will be YARN’s resource manager.)
  • Obtains a list of executors for executing your application driver.

Example 10. Step 3: Connect to the Spark master

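A sketch of this step using a SparkConf object (the application name is illustrative; the master URL is typically supplied by spark-submit or via setMaster()):

// Step 3 (sketch): create the JavaSparkContext, the factory for all RDDs.
SparkConf sparkConf = new SparkConf().setAppName("SecondarySorting");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);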

Step 4: Use the JavaSparkContext to create a JavaRDD

This step, illustrated in Example 11, reads an HDFS file and creates a JavaRDD<String> (which represents a set of records where each record is a String object). By definition, Spark's RDDs are immutable (i.e., they cannot be altered or modified). Note that Spark's RDDs are the basic abstraction for parallel execution. Note also that one may use textFile() to read HDFS or non-HDFS files.

Example 11. Step 4: Create JavaRDD

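A sketch of this step:

// Step 4 (sketch): one RDD element per input record of time series data.
JavaRDD<String> lines = ctx.textFile(inputPath, 1);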

Step 5: Create key-value pairs from the JavaRDD

This step, shown in Example 12, implements a mapper. Each record (from the JavaRDD<String> and consisting of <name><,><time><,><value>) is converted to a key-value pair, where the key is a name and the value is a Tuple2(time, value).

 

Example 12. Step 5: Create key-value pairs from JavaRDD

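A sketch of this step using mapToPair() (anonymous-class style, matching the pre-lambda Java API):

// Step 5 (sketch): map "<name>,<time>,<value>" to (name, (time, value)).
JavaPairRDD<String, Tuple2<Integer, Integer>> pairs =
    lines.mapToPair(new PairFunction<String, String, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<String, Tuple2<Integer, Integer>> call(String record) {
            String[] tokens = record.split(",");   // e.g., "x,2,9"
            Integer time = Integer.valueOf(tokens[1].trim());
            Integer value = Integer.valueOf(tokens[2].trim());
            Tuple2<Integer, Integer> timeValue = new Tuple2<Integer, Integer>(time, value);
            return new Tuple2<String, Tuple2<Integer, Integer>>(tokens[0].trim(), timeValue);
        }
    });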

Step 6: Validate step 5

To debug and validate the steps in Spark (as shown in Example 13), one may use JavaRDD.collect() and JavaPairRDD.collect(). Note that collect() is used here for debugging and educational purposes only; avoid using collect() in production clusters, since doing so will impact performance. One may also use JavaRDD.saveAsTextFile() for debugging as well as for creating the desired outputs.

Example 13. Step 6: Validate step 5

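A sketch of this debugging step:

// Step 6 (sketch): collect() brings the whole RDD to the driver; debugging only.
List<Tuple2<String, Tuple2<Integer, Integer>>> debugPairs = pairs.collect();
for (Tuple2<String, Tuple2<Integer, Integer>> t : debugPairs) {
    Tuple2<Integer, Integer> timeValue = t._2();
    System.out.println(t._1() + "," + timeValue._1() + "," + timeValue._2());
}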

Step 7: Group JavaPairRDD elements by the key (name)

We implement the reducer operation using groupByKey(). As shown in Example 14, it is much easier to implement the reducer through Spark than through MapReduce/Hadoop. Note that in Spark, in general, reduceByKey() is more efficient than groupByKey(). Here, however, we cannot use reduceByKey().

Example 14. Step 7: Group JavaPairRDD elements

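A sketch of this step:

// Step 7 (sketch): group the (time, value) pairs by name; values stay unsorted.
JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> groups = pairs.groupByKey();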

Step 8: Validate step 7

This step, shown in Example 15, validates the previous step by using the collect() function, which gets all values from the groups RDD.

Example 15. Step 8: Validate step 7

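A sketch of this validation step, which produces the listing shown below:

// Step 8 (sketch): print each group's name followed by its (time, value) pairs.
List<Tuple2<String, Iterable<Tuple2<Integer, Integer>>>> grouped = groups.collect();
for (Tuple2<String, Iterable<Tuple2<Integer, Integer>>> entry : grouped) {
    System.out.println(entry._1());
    for (Tuple2<Integer, Integer> timeValue : entry._2()) {
        System.out.println(timeValue._1() + "," + timeValue._2());
    }
    System.out.println("=====");
}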

The following shows the output of this step. As one can see, the reducer values are not sorted:

y

2,5

1,7

3,1

=====

x

2,9

1,3

3,6

=====

z

1,4

2,8

3,7

4,0

=====

p

2,6

4,7

6,0

7,3

1,9

=====

Step 9: Sort the reducer's values in memory

This step, shown in Example 16, uses another powerful Spark method, mapValues(), to just sort the values generated by reducers. The mapValues() method enables us to convert (K, V1) into (K, V2), where V2 is a sorted V1. One important note about Spark's RDDs is that they are immutable and cannot be altered or updated by any means; for example, in this step, to sort our values, we have to copy them into another list first. Immutability applies to the RDD itself and its elements.

Example 16. Step 9: Sort the reducer's values in memory

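A sketch of this step: each group's values are copied into a new list (the RDD's own data is never modified), sorted by time, and reduced to just the value component.

// Step 9 (sketch): sort each group's values in memory by their time component.
JavaPairRDD<String, Iterable<Integer>> sorted =
    groups.mapValues(new Function<Iterable<Tuple2<Integer, Integer>>, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> call(Iterable<Tuple2<Integer, Integer>> iter) {
            // copy the (time, value) pairs; the RDD itself is immutable
            List<Tuple2<Integer, Integer>> copy = new ArrayList<Tuple2<Integer, Integer>>();
            for (Tuple2<Integer, Integer> timeValue : iter) {
                copy.add(timeValue);
            }
            // sort the copy by time (the first element of each tuple)
            Collections.sort(copy, new Comparator<Tuple2<Integer, Integer>>() {
                @Override
                public int compare(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                    return a._1().compareTo(b._1());
                }
            });
            // keep only the values, now ordered by time
            List<Integer> values = new ArrayList<Integer>();
            for (Tuple2<Integer, Integer> timeValue : copy) {
                values.add(timeValue._2());
            }
            return values;
        }
    });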

Step 10: Output final result

The collect() method collects all of the RDD’s elements into a java.util.List object. Then we iterate through the List to get all the final elements (see Example 17).

Example 17. Step 10: Output final result

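A sketch of the final step:

// Step 10 (sketch): bring the sorted result to the driver and print it.
List<Tuple2<String, Iterable<Integer>>> result = sorted.collect();
for (Tuple2<String, Iterable<Integer>> entry : result) {
    System.out.println(entry._1() + " => " + entry._2());
}
ctx.stop();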

 

 
