This section provides a complete MapReduce implementation of the secondary sort problem using the Hadoop framework.
The input will be a set of files, where each record (line) will have the following format:
2012, 01, 01, 35 2011, 12, 23, -4
The expected output will have the following format:
<year><-><month>: <temperature1><,><temperature2><,> … where temperature1 <= temperature2 <= …
2012-01: 5, 10, 35, 45, …
2001-11: 40, 46, 47, 48, …
2005-08: 38, 50, 52, 70, …
The map() function parses and tokenizes the input and then injects the value (temper ature) into the reducer key, as shown in Example 4.
Example 4. map() for secondary sorting
The reducer’s primary function is to concatenate the values (which are already sorted through the Secondary Sort design pattern) and emit them as output. The reduce() function is given in Example 5.
Example 5. reduce() for secondary sorting
Hadoop implementation classes
The classes shown in Table 1 are used to solve the problem.
Table 1. Classes used in MapReduce/Hadoop solution
How is the value injected into the key? The first comparator (the DateTemperature Pair.compareTo() method) controls the sort order of the keys, while thesecond comparator (the DateTemperatureGroupingComparator.compare() method) controls which keys are grouped together into a single call to thereduce() method. The combination of these two comparators allows one to set up jobs that act like one has defined an order for the values.
The SecondarySortDriver is the driver class, which registers the custom plug-in classes (DateTemperaturePartitioner andDateTemperatureGroupingComparator) with the MapReduce/Hadoop framework. This driver class is presented in Example 6.
Example 6. SecondarySortDriver class
Sample run of Hadoop implementation
Log of sample run
Inspecting the output
How to sort in ascending or descending order
You can easily control the sorting order of the values (ascending or descending) by using the DateTemperaturePair.compareTo() method as follows:
Spark solution in secondary sort
To solve a secondary sorting problem in Spark, we have at least two options:
Read and buffer all of the values for a given key in an Array or List data structure and then do an in-reducer sort on the values. This solution works if one has a small set of values (which will fit in memory) per reducer key.
Use the Spark framework for sorting the reducer values (this option does not require in-reducer sorting of values passed to the reducer). This approach involves “creating a composite key by adding a part of, or the entire value to, the natural key to achieve your sorting objectives.” This option always scales (because you are not limited by the memory of a commodity server).
Time series as input
To demonstrate secondary sorting, let’s use time series data:
Our expected output is as follows. Note that the values of reducers are grouped by name and sorted by time:
name t1 t2 t3 t4 t5 … x => [3, 9, 6]
y => [7, 5, 1]
z => [4, 8, 7, 0]
p => [9, 6, 7, 0, 3]
Option 1: Secondary sorting in memory
Since Spark has a very powerful and high-level API, I will present the entire solution in a single Java class. The Spark API is built upon the basic abstractionconcept of the RDD (resilient distributed data set). To fully utilize Spark’s API, we have to under‐ stand RDDs. An RDD<T> (i.e., an RDD of type T) object represents an immutable, partitioned collection of elements (of type T) that can be operated on in parallel. The RDD<T> class contains the basic MapReduceoperations available on all RDDs, such as map(), filter(), and persist(), while the JavaPairRDD<K,V> class contains MapReduce operations such as mapToPair(),flatMapToPair(), and groupByKey(). In addition, Spark’s PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as reduce(),groupByKey(), and join()., JavaRDD<T> is a list of objects of type T, and JavaPairRDD<K,V> is a list of objects of type Tuple2<K,V> (where each tuplerepresents a key-value pair).
The Spark-based algorithm is listed next. Although there are 10 steps, most of them are trivial and some are provided for debugging purposes only:
- We import the required Java/Spark classes. The main Java classes for MapReduce are given in the org.apache.spark.api.java package. This packageincludes the following classes and interfaces:
- JavaRDDLike (interface)
- We pass input data as arguments and validate.
- We connect to the Spark master by creating a JavaSparkContext object, which is used to create new RDDs.
- Using the context object (created in step 3), we create an RDD for the input file; the resulting RDD will be a JavaRDD<String>. Each element of this RDDwill be a record of time series data: <name><,><time><,><value>.
- Next we want to create key-value pairs from a JavaRDD<String>, where the key is the name and the value is a pair of (time, value). The resulting RDD willbe a JavaPairRDD<String, Tuple2<Integer, Integer>>.
- To validate step 5, we collect all values from the JavaPairRDD<> and print them.
- We group JavaPairRDD<> elements by the key (name). To accomplish this, we use
the groupByKey() method.
The result will be the RDD:
JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>>
Note that the resulting list (Iterable<Tuple2<Integer, Integer>>) is unsorted. In general, Spark’s reduceByKey() is preferred over groupByKey() forperformance reasons, but here we have no other option than groupByKey() (since reduceByKey() does not allow us to sort the values in place for a given key).
- To validate step 7, we collect all values from the JavaPairRDD<String, Itera ble<Tuple2<Integer, Integer>>> and print them.
- We sort the reducer’s values to get the final output. We accomplish this by writing a custom mapValues() method. We just sort the values (the key remainsthe same).
- To validate the final result, we collect all values from the sorted JavaPairRDD<> and print them.
A solution for option #1 is implemented by a single driver class: SecondarySorting (see Example 7). All steps, 1–10, are listed inside the class definition, which will be presented in the following sections. Typically, a Spark application consists of a driver program that runs the user’s main() function and executes various parallel operations on a cluster. Parallel operations will be achieved through the extensive use of RDDs.
Example 7. SecondarySort class overall structure
Step 1: import required classes
As shown in Example 8, the main Spark package for the Java API is org.apache.spark.api.java, which includes the JavaRDD, JavaPairRDD, andJavaS parkContext classes. JavaSparkContext is a factory class for creating new RDDs (such as JavaRDD and JavaPairRDD objects).
Example 8. Step 1: Import required classes
Step 2: Read input parameters
This step, demonstrated in Example 9, reads the HDFS input file (Spark may read data from HDFS and other persistent stores, such as a Linux filesystem), which might look like /dir1/dir2/myfile.txt.
Example 9. Step 2: Read input parameters
Step 3: Connect to the Spark master
To work with RDDs, first you need to create a JavaSparkContext object (as shown in Example 10), which is a factory for creating JavaRDD and JavaPairRDDobjects. It is also possible to create a JavaSparkContext object by injecting a SparkConf object into the JavaSparkContext’s class constructor. This approach is useful when you read your cluster configurations from an XML file. In a nutshell, the JavaSparkContext object has the following responsibilities:
- Initializes the application driver.
- Registers the application driver to the cluster manager. (If you are using the Spark cluster, then this will be the Spark master; if you are using YARN, then it will be YARN’s resource manager.)
- Obtains a list of executors for executing your application driver.
Example 10. Step 3: Connect to the Spark master
Step 4: Use the JavaSparkContext to create a JavaRDD
This step, illustrated in Example 11, reads an HDFS file and creates a JavaRDD<String> (which represents a set of records where each record is a String object). By definition, Spark’s RDDs are immutable (i.e., they cannot be altered or modified). Note that Spark’s RDDs are the basic abstraction for parallel execution.Note also that one may use textFile() to read HDFS or non-HDFS files.
Example 11. Step 4: Create JavaRDD
Step 5: Create key-value pairs from the JavaRDD
This step, shown in Example 12, implements a mapper. Each record (from the JavaRDD<String> and consisting of <name><,><time><,><value>) is converted to a key-value pair, where the key is a name and the value is a Tuple2(time, value).
Example 12. Step 5: Create key-value pairs from JavaRDD
Step 6: Validate step 5
To debug and validate the steps in Spark (as shown in Example 13), one may use JavaRDD.collect() and JavaPairRDD.collect(). Note that collect() is used fordebugging and educational purposes (but avoid using collect() for debugging purposes in production clusters; doing so will impact performance). Also, one mayuse JavaRDD.saveAsTextFile() for debugging as well as creating the desired outputs.
Example 13. Step 6: Validate step5
Step 7: Group JavaPairRDD elements by the key (name)
We implement the reducer operation using groupByKey(). As it is in Example 14, it is much easier to implement the reducer through Spark thanMapReduce/Hadoop. Note that in Spark, in general, reduceByKey() is more efficient than groupByKey(). Here, however, we cannot use reduceByKey().
Example 14. Step 7: Group JavaPairRDD elements
Step 8: Validate step 7
This step, shown in Example 15, validates the previous step by using the collect() function, which gets all values from the groups RDD.
Example 15. Step 8: Validate step 7
The following shows the output of this step. As one can see, the reducer values are not sorted:
Step 9: Sort the reducer’s value in memory
This step, shown in Example 16, uses another powerful Spark method, mapVal ues(), to just sort the values generated by reducers. The mapValues() methodenables us to convert (K, V1) into (K, V2), where V2 is a sorted V1. One important note about Spark’s RDD is that it is immutable and cannot be altered/updatedby any means. For example, in this step, to sort our values, we have to copy them into another list first. Immutability applies to the RDD itself and itselements.
Example 16. Step 9: sort the reducer’s values in memory
Step 10: Output final result
The collect() method collects all of the RDD’s elements into a java.util.List object. Then we iterate through the List to get all the final elements (see Example 17).
Example 17. Step 10: Output final result