Aggregation using collect_set on a Spark DataFrame

In this blog post you will learn how to use collect_set on a Spark DataFrame, and also how to map raw data to a domain object.

Introduction

collect_set(field) : an aggregate function that returns the set of distinct values of the given field for each group, with duplicates removed.

To understand collect_set with a practical example, let us first create a DataFrame from an RDD with 3 columns.

Let us understand the data set before we create an RDD. It has 3 columns: “Id”, “Department” and “Name”.
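The post does not show the raw file itself, but from the code and output it is assumed to be pipe-delimited, one record per line. A few illustrative rows (reconstructed from the output below, not the actual file):

```
1|PO|Hu
2|PO|Keith
3|PO|Kirk
4|Development|Barry
5|Development|Dane
6|Testing|Salvador
7|Testing|Tanek
```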

Code to learn collect_set

Follow the steps below to learn how collect_set works.

Step 01 : Read the data and create an RDD.

val employeeDataAsRDD = spark.sparkContext.textFile("employee_data.txt")

Step 02 : Define a case class whose fields match the columns of the data set.

case class Employee(id: Int, department: String, name: String)

Step 03 : Map the data to the domain object

val employeeRDD = employeeDataAsRDD.map(line => line.split("\\|"))
  .map(arr => Employee(arr(0).toInt, arr(1), arr(2)))

Step 04 : Convert RDD to DataFrame

import spark.implicits._  // required for the toDF implicit conversion

val employeeDF = employeeRDD.toDF
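To verify that the schema was inferred from the case class, you can print it; the output shown in the comments is what Spark typically reports for this case class (Int fields are non-nullable):

```scala
employeeDF.printSchema()
// root
//  |-- id: integer (nullable = false)
//  |-- department: string (nullable = true)
//  |-- name: string (nullable = true)
```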

Step 05 : Group by the “department” field and apply collect_set to the “name” field.

import org.apache.spark.sql.functions.collect_set

val aggByDepartment = employeeDF.groupBy($"department").agg(collect_set($"name").as("Names"))

aggByDepartment.show()

Output

+-----------+--------------------+
| department|               Names|
+-----------+--------------------+
|         PO|   [Hu, Keith, Kirk]|
|Development|       [Barry, Dane]|
|    Testing|   [Salvador, Tanek]|
|    Manager|[Jackson, Leroy, ...|
+-----------+--------------------+
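The key property of collect_set is that duplicates are dropped; its sibling collect_list keeps them. A minimal sketch of the difference, using a hypothetical duplicate row for illustration (assumes a SparkSession `spark` and `import spark.implicits._` are in scope):

```scala
import org.apache.spark.sql.functions.{collect_list, collect_set}

// Hypothetical data: "Hu" appears twice in the PO department.
val withDup = Seq((1, "PO", "Hu"), (2, "PO", "Hu"), (3, "PO", "Keith"))
  .toDF("id", "department", "name")

withDup.groupBy($"department")
  .agg(collect_set($"name").as("set"), collect_list($"name").as("list"))
  .show(false)
// set  keeps only the distinct names, e.g. [Hu, Keith]
// list keeps every occurrence,     e.g. [Hu, Hu, Keith]
```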

Complete Code

import org.apache.spark.sql.functions.collect_set
import spark.implicits._

// Step 01: Read the data and create an RDD
val employeeDataAsRDD = spark.sparkContext.textFile("employee_data.txt")

// Step 02: Domain object matching the data set
case class Employee(id: Int, department: String, name: String)

// Step 03: Map each pipe-delimited line to the domain object
val employeeRDD = employeeDataAsRDD.map(line => line.split("\\|"))
  .map(arr => Employee(arr(0).toInt, arr(1), arr(2)))

// Step 04: Convert RDD to DataFrame
val employeeDF = employeeRDD.toDF

// Step 05: Group by department and collect the distinct names
val aggByDepartment = employeeDF.groupBy($"department").agg(collect_set($"name").as("Names"))

aggByDepartment.show()

How to remove the square brackets?

Wrapping collect_set in concat_ws joins the array elements into a single comma-separated string. Apply the alias to the final expression so the output column gets a clean name:

import org.apache.spark.sql.functions.{collect_set, concat_ws}

val aggByDepartment = employeeDF.groupBy($"department")
  .agg(concat_ws(",", collect_set($"name")).as("Names"))

aggByDepartment.show()

Output

+-----------+--------------------+
| department|               Names|
+-----------+--------------------+
|         PO|       Hu,Keith,Kirk|
|Development|          Barry,Dane|
|    Testing|      Salvador,Tanek|
|    Manager|Jackson,Leroy,Ale...|
+-----------+--------------------+
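One caveat: collect_set gives no guarantee about element order, so the concatenated string may differ between runs. If a stable order matters, sort_array can be applied before concatenation (a sketch, not from the original post):

```scala
import org.apache.spark.sql.functions.{collect_set, concat_ws, sort_array}

// Sort the collected set alphabetically before joining, for deterministic output
val sortedNames = employeeDF.groupBy($"department")
  .agg(concat_ws(",", sort_array(collect_set($"name"))).as("Names"))

sortedNames.show()
```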



Naveen P.N

12+ years of experience in IT, with vast experience in executing complex projects using Java, Microservices, Big Data and Cloud Platforms. I founded NPN Training Pvt Ltd, an India-based startup, to provide high-quality training for IT professionals. I have trained more than 3000 IT professionals and helped them succeed in their careers across different technologies. I am very passionate about technology and training. I have spent 12 years at Siemens, Yahoo, Amazon and Cisco, developing and managing technology.