Question
Problem 8. Suppose we have the following workers’ information.
name     age   gender   occupation
Mike     19    Male     Computer Scientist
Paul     26    Male     Computer Scientist
Bob      25    Male     Computer Scientist
Olivia   30    Female   Accountant
Rob      32    Male     Computer Scientist
Susan    36    Female   Computer Scientist
David    35    Male     Accountant
Emma     44    Female   Accountant
Lisa     32    Female   Accountant
The data is stored in a JSON file “/home/rob/exam2/workers_spark.json”. The data file is uploaded into iCollege.
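For reference, Spark’s JSON reader expects one JSON object per line (the JSON Lines format), so the file presumably contains records like the following (a sketch reconstructed from the table above, not the exact file contents):

{"name": "Mike", "age": 19, "gender": "Male", "occupation": "Computer Scientist"}
{"name": "Paul", "age": 26, "gender": "Male", "occupation": "Computer Scientist"}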
We want to compute the number of workers above age 20 in each gender and each occupation. That is, we want to get the following table from the above one.
gender   occupation           count
Male     Computer Scientist   3
Female   Computer Scientist   1
Male     Accountant           1
Female   Accountant           3
Note that Mike is 19, which is not above 20. Therefore, he is filtered out and not counted.
From the result table, we can see that there are more male workers in Computer Science and more female workers in Accounting, which is consistent with what we see in the original workers’ information table.
Now please design a Python Spark program to implement this function. You are required to use the Spark DataFrame APIs. The data file is uploaded into iCollege. You may want to program with the data and debug to make sure that your answers are correct. The last line should show the results in the terminal.
Answer: (Only show the key lines of the source code. The preparation code for the Spark Context and Spark SQL Context is not needed.)
1:
2:
3:
4:
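A minimal PySpark sketch of what these four lines might contain, assuming a SparkSession named “spark” has already been created in the omitted preparation code:

workers = spark.read.json("/home/rob/exam2/workers_spark.json")  # load the JSON file into a DataFrame (assumes `spark` exists)
workers_filter = workers.filter(workers.age > 20)                # keep only workers above age 20
result = workers_filter.groupBy("gender", "occupation").count()  # count workers per (gender, occupation) group
result.show()                                                    # print the result table to the terminal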
We can also use Spark SQL to implement the same function. Spark SQL allows you to use SQL-like statements such as “SELECT * FROM …” to operate on the dataset. In this method, you need to call the “createOrReplaceTempView()” and “spark.sql(…)” functions to achieve the filter and group-by functionality. Please provide the source code. The last line should show the results in the terminal.
Answer: (Only show the key lines of the source code. The preparation code for the Spark Context and Spark SQL Context is not needed.)
1:
2:
3:
4:
5:
6:
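A minimal PySpark sketch of the Spark SQL variant, again assuming a SparkSession named “spark” is already available from the omitted preparation code:

workers = spark.read.json("/home/rob/exam2/workers_spark.json")  # load the JSON file into a DataFrame (assumes `spark` exists)
workers.createOrReplaceTempView("workers")                       # register the DataFrame as a temporary SQL view
query = ("SELECT gender, occupation, COUNT(*) AS count "
         "FROM workers WHERE age > 20 "
         "GROUP BY gender, occupation")                          # SQL that performs the filter and group-by
result = spark.sql(query)                                        # run the query against the temp view
result.show()                                                    # print the result table to the terminal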
If we want to use Pig to achieve the same goal, what is the source code for doing that?
The data is stored in a different JSON file “/home/rob/exam2/workers_pig.json”. The format in this file is slightly different from that in the previous “workers_spark.json” file because Spark and Pig have different parsers for JSON files. The JSON data file is also uploaded into iCollege (in the folder “Exam 2”). You may want to program with the data and make sure that your answers have no bugs. The last line of the code should write the results into the folder “PigOutput” on the disk.
Answer: (Please provide the entire source code for Pig)
1:
2:
3:
4:
5:
Explanation / Answer
Answers 1 and 2 in detail (full program, written in Scala):
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object wc {
  case class workers(name: String, age: Int, gender: String, occupation: String)
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C://hadoop")
    val conf = new SparkConf()
    val spark = SparkSession.builder().master("local").appName("wc").config(conf)
      .config("hive.metastore.warehouse.dir", "C:\\tmp\\hive")
      .config("spark.sql.warehouse.dir", "C:\\tmp\\hive")
      .enableHiveSupport().getOrCreate()
    import spark.implicits._

    // Read the JSON file with an explicit schema so that age is parsed as an integer
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("gender", StringType, true),
      StructField("occupation", StringType, true)
    ))
    val workers = spark.read.schema(schema).json("/home/rob/exam2/workers_spark.json")

    // Answer 1 (DataFrame API): keep only workers above age 20,
    // then count per (gender, occupation) group
    val workers_filter = workers.filter('age > 20)
    val result = workers_filter.groupBy('gender, 'occupation).count()
    result.show()

    // Answer 2 (Spark SQL): register a temporary view and run the equivalent SQL query
    workers.createOrReplaceTempView("workers")
    val sql_out = spark.sql(
      "SELECT gender, occupation, COUNT(*) AS count FROM workers WHERE age > 20 GROUP BY gender, occupation")
    sql_out.show()
  }
}
Answer 3 (Pig):

workers = LOAD '/home/rob/exam2/workers_pig.json'
    USING JsonLoader('name:chararray, age:int, gender:chararray, occupation:chararray');
filter_workers = FILTER workers BY age > 20;    -- keep only workers above age 20
group_workers = GROUP filter_workers BY (gender, occupation);
group_workers_count = FOREACH group_workers GENERATE
    FLATTEN(group) AS (gender, occupation), COUNT(filter_workers) AS count;
STORE group_workers_count INTO 'PigOutput';     -- write the results into the PigOutput folder