
Question

Problem 8. Suppose we have the following workers’ information.

name    age  gender  occupation
Mike    19   Male    Computer Scientist
Paul    26   Male    Computer Scientist
Bob     25   Male    Computer Scientist
Olivia  30   Female  Accountant
Rob     32   Male    Computer Scientist
Susan   36   Female  Computer Scientist
David   35   Male    Accountant
Emma    44   Female  Accountant
Lisa    32   Female  Accountant

The data is stored in a json file “/home/rob/exam2/workers_spark.json”. The data file is uploaded into iCollege.
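The layout of the file itself is not shown here. As a rough sketch, assuming it uses the one-JSON-object-per-line layout that spark.read.json reads by default, a small sample could be written and checked as below. The names SAMPLE_ROWS, write_json_lines, read_json_lines and the temporary file name are hypothetical and only serve the illustration.

```python
import json
import os
import tempfile

# First two rows of the workers table; the remaining rows follow the same pattern.
SAMPLE_ROWS = [
    {"name": "Mike", "age": 19, "gender": "Male", "occupation": "Computer Scientist"},
    {"name": "Paul", "age": 26, "gender": "Male", "occupation": "Computer Scientist"},
]


def write_json_lines(rows, path):
    """Write one JSON object per line (assumed layout, not confirmed by the exam)."""
    with open(path, "w", encoding="utf-8") as handle:
        for row in rows:
            handle.write(json.dumps(row) + "\n")


def read_json_lines(path):
    """Read the file back, parsing each non-empty line as one record."""
    with open(path, encoding="utf-8") as handle:
        return [json.loads(line) for line in handle if line.strip()]


# Round trip through a temporary file (hypothetical file name).
demo_path = os.path.join(tempfile.mkdtemp(), "workers_sample.json")
write_json_lines(SAMPLE_ROWS, demo_path)
round_trip = read_json_lines(demo_path)
```

If the real file instead holds a single JSON array spanning several lines, Spark's multiLine read option would be needed.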

We want to compute the number of workers above age 20 in each gender and each occupation. That is, we want to get the following table from the above one.

gender  occupation          count
Male    Computer Scientist  3
Female  Computer Scientist  1
Male    Accountant          1
Female  Accountant          3

Note that Mike is 19, which is not above 20. Therefore, he is filtered out and not counted.

From the above result table, we can see that there are more male workers in Computer Science and more female workers in Accounting. This is what we learnt from the original workers’ information table.
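Before writing any Spark code, the expected counts can be checked by hand. The sketch below uses plain Python rather than Spark, with rows copied from the workers table above. The names WORKERS and count_by_gender_and_occupation are hypothetical.

```python
from collections import Counter

# Rows copied from the workers table: (name, age, gender, occupation).
WORKERS = [
    ("Mike", 19, "Male", "Computer Scientist"),
    ("Paul", 26, "Male", "Computer Scientist"),
    ("Bob", 25, "Male", "Computer Scientist"),
    ("Olivia", 30, "Female", "Accountant"),
    ("Rob", 32, "Male", "Computer Scientist"),
    ("Susan", 36, "Female", "Computer Scientist"),
    ("David", 35, "Male", "Accountant"),
    ("Emma", 44, "Female", "Accountant"),
    ("Lisa", 32, "Female", "Accountant"),
]


def count_by_gender_and_occupation(rows, min_age=20):
    """Count workers strictly older than min_age per (gender, occupation) pair."""
    return Counter(
        (gender, occupation)
        for _name, age, gender, occupation in rows
        if age > min_age
    )


counts = count_by_gender_and_occupation(WORKERS)
for (gender, occupation), n in sorted(counts.items()):
    print(gender, occupation, n)
```

The filter step drops only Mike, so the four group counts add up to 8 of the 9 workers.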

Now please design a Python Spark program that implements this function. You are required to use the Spark DataFrame API. The data file is uploaded into iCollege. You may want to run your program on the data and debug it to make sure that your answers are correct. The last line should show the results in the terminal.

Answer: (Only show the key lines of the source code. You do not need to include the preparation code for the Spark Context and Spark SQL Context.)

1:

2:

3:

4:

We can also use Spark SQL to implement the same function. Spark SQL lets you operate on the dataset with SQL-like statements such as “SELECT * FROM …”. In this method, you need to call the “createOrReplaceTempView()” and “spark.sql(…)” functions to implement the filter and group-by steps. Please provide the source code. The last line should show the results in the terminal.

Answer: (Only show the key lines of the source code. You do not need to include the preparation code for the Spark Context and Spark SQL Context.)

1:

2:

3:

4:

5:

6:

If we want to use Pig to achieve the same goal, what is the source code for doing that?

The data is stored in a different JSON file, “/home/rob/exam2/workers_pig.json”. The format of this file is slightly different from that of the previous “workers_spark.json” file because Spark and Pig have different parsers for JSON files. The JSON data file is also uploaded into iCollege (in the folder “Exam 2”). You may want to run your program on the data to make sure that your answers have no bugs. The last line of the code should write the results into the folder “PigOutput” on the disk.

Answer: (Please provide the entire source code for Pig)

1:

2:

3:

4:

5:

Explanation / Answer

Answers 1 and 2 as one complete program (written in Scala)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object wc {

  def main(args: Array[String]): Unit = {

    // Only needed when running on Windows with a local Hadoop install.
    System.setProperty("hadoop.home.dir", "C://hadoop")
    // Backslashes must be escaped, otherwise "\t" is read as a tab character.
    val spark = SparkSession.builder().master("local").appName("wc")
      .config("spark.sql.warehouse.dir", "C:\\tmp\\hive")
      .getOrCreate()
    import spark.implicits._

    // Pass the schema to the reader so that age is parsed as an integer
    // and the columns are matched by name rather than by position.
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("gender", StringType, true),
      StructField("occupation", StringType, true)
    ))
    val workers = spark.read.schema(schema).json("/home/rob/exam2/workers_spark.json")

    // Answer 1: DataFrame API
    val workers_filter = workers.filter($"age" > 20)
    val counts = workers_filter.groupBy($"gender", $"occupation").count()
    counts.show()

    // Answer 2: Spark SQL on a temporary view
    workers.createOrReplaceTempView("workers")
    val sql_out = spark.sql(
      "SELECT gender, occupation, COUNT(*) AS count FROM workers " +
      "WHERE age > 20 GROUP BY gender, occupation")
    sql_out.show()
  }
}
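Since the question asks for Python, the same two approaches can be sketched in PySpark. This is a minimal sketch, not a tested exam answer: it assumes `spark` is an already created SparkSession, and the names QUERY, count_with_dataframe_api and count_with_sql are hypothetical helpers introduced here.

```python
# Spark SQL statement for the second approach; "workers" is the temp view name.
QUERY = (
    "SELECT gender, occupation, COUNT(*) AS count "
    "FROM workers WHERE age > 20 "
    "GROUP BY gender, occupation"
)


def count_with_dataframe_api(spark, path):
    """DataFrame API: read the JSON file, keep age > 20, count per group."""
    workers = spark.read.json(path)
    adults = workers.filter("age > 20")
    return adults.groupBy("gender", "occupation").count()


def count_with_sql(spark, path):
    """Spark SQL: register a temporary view, then run the query against it."""
    workers = spark.read.json(path)
    workers.createOrReplaceTempView("workers")
    return spark.sql(QUERY)
```

With a live session, calling count_with_dataframe_api(spark, "/home/rob/exam2/workers_spark.json").show() would print the result in the terminal, and count_with_sql can be used the same way.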

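-- Answer 3: Pig script (reads the Pig-formatted file, not workers_spark.json)
workers = LOAD '/home/rob/exam2/workers_pig.json'
    USING JsonLoader('name:chararray, age:int, gender:chararray, occupation:chararray');

filter_workers = FILTER workers BY age > 20;

group_workers = GROUP filter_workers BY (gender, occupation);

-- COUNT is case sensitive and is applied to the bag of grouped rows.
group_workers_count = FOREACH group_workers GENERATE FLATTEN(group) AS (gender, occupation), COUNT(filter_workers) AS worker_count;

STORE group_workers_count INTO 'PigOutput';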