Data aggregation is one of the main ways we explore the diversity of our data, and SQL (Structured Query Language) is the most familiar tool for doing so. Below is how to map common SQL syntax to Pig and Spark; a combined Spark sketch follows the list.
SQL Structure:
- What to retrieve: which columns to display from the data structure
- SQL: SELECT student, age FROM mathClass
- Pig: namedMathClass = FOREACH mathClass GENERATE (chararray)$0 AS student, (int)$2 AS age;
- Spark: namedMathClass = mathClass.map(row => (row(0), row(2).toInt))
- Whether a row should be kept in the result or not: the condition
- SQL: WHERE age > 10
- Pig: greater_10 = FILTER namedMathClass BY age > 10;
- Spark: greater_10 = namedMathClass.filter(row => row._2 > 10)
- How to aggregate: group similar rows together into one bag, then apply the aggregate function to each bag
- SQL: SELECT age, COUNT(student) FROM mathClass GROUP BY age
- Pig:
- groupAge = GROUP mathClass BY age;
- Iterate_Age = FOREACH groupAge GENERATE group AS age, COUNT(mathClass) AS total;
- Spark: Iterate_Age = mathClass.groupBy("age").agg(count("student").alias("total"))
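To make the three steps concrete, here is a minimal end-to-end Spark (Scala) sketch, not code from the original post: the input path, the student;grade;age column layout, and the variable names are all assumptions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count

val spark = SparkSession.builder.appName("sqlToSpark").getOrCreate()
val sc = spark.sparkContext

// assumed layout: student;grade;age, one record per line (hypothetical path)
val mathClass = sc.textFile("hdfs://MyServer:9000/path/to/mathClass").
  map(line => line.split(";"))

// 1. what to retrieve: student (column 0) and age (column 2)
val namedMathClass = mathClass.map(row => (row(0), row(2).toInt))

// 2. condition: keep only rows with age > 10
val greater_10 = namedMathClass.filter(row => row._2 > 10)

// 3. aggregation: count students per age, using the DataFrame API
import spark.implicits._
val mathClassDF = greater_10.toDF("student", "age")
val Iterate_Age = mathClassDF.groupBy("age").agg(count("student").alias("total"))
Iterate_Age.show()

The same pipeline can stay entirely in the DataFrame API if the file is loaded with spark.read instead of sc.textFile.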
A complete Spark example:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._

// read the raw log and keep the first four semicolon-separated fields;
// the date field is trimmed to its first 10 characters (yyyy-MM-dd)
val file = sc.textFile("hdfs://MyServer:9000/path/to/data")
val splitData = file.map(line => line.split(";")).
  map(row => Row(row(0).substring(0, 10), row(1), row(2), row(3)))

// column names
val schemaString = "day user Err_Code Err_MSG"

// map each column to a data type (all strings here)
val fields = schemaString.split(" ").
  map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

val dataDF = spark.createDataFrame(splitData, schema)

// per day and error code: total events, distinct users,
// and how many rows carry the SUCCESS message
val aggregated = dataDF.groupBy("day", "Err_Code").
  agg(count("user").alias("user"), countDistinct("user").alias("unique"),
      count(when($"Err_MSG" === "SUCCESS", true)).alias("Condition"))

val result = aggregated.rdd
result.saveAsTextFile("hdfs://MyServer:9000/path/to/saveAt")
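As a side note, the aggregated DataFrame can also be written out directly instead of going through an RDD. A sketch, where the CSV format and the header option are my choices rather than the post's:

aggregated.write.
  option("header", "true").
  csv("hdfs://MyServer:9000/path/to/saveAt")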