package sia.xml

import java.io.{FileInputStream, ObjectInputStream, FileOutputStream, ObjectOutputStream}
import java.nio.file.{Paths, Files}

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.StructType

import sia.utils.SparkConfigUtils._
object App {

  // Location of the cached (Java-serialized) schema, relative to $HOME.
  private val schemaFile = "dev/hadoopdev/scala-spark-inaction/nflx.file"
  def main(args: Array[String]): Unit = {
    // Silence Spark's and Akka's verbose default logging.
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val sc = sparkContext(args)
    val sqlContext = sparkHiveSQLContext(sc)

    val homeDir = System.getenv("HOME")
    run(sqlContext,
      Array(s"file://${homeDir}/dev/hadoopdev/scala-spark-inaction/src/test/resources/documents/*.xml"),
      s"file://${homeDir}/dev/hadoopdev/scala-spark-inaction/src/test/resources/DocSchema.xml")
  }
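  // sparkContext(args) and sparkHiveSQLContext(sc) come from
  // sia.utils.SparkConfigUtils (not shown in this file). A hypothetical
  // minimal equivalent, purely for illustration:
  //
  //   val sc = new org.apache.spark.SparkContext(
  //     new org.apache.spark.SparkConf().setAppName("sia-xml"))
  //   val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)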
  protected[xml] def loadSchema(sqlContext: SQLContext, schemaPath: String, cacheSchema: Boolean = false): Option[StructType] = {
    if (Option(schemaPath).getOrElse("").isEmpty) {
      None
    } else {
      println("SCHEMAPATH: " + schemaPath)
      val homeDir = System.getenv("HOME")
      val cachePath = s"$homeDir/$schemaFile"
      // Existence must be checked on the cache file, not on schemaPath:
      // schemaPath is a file:// URL that java.nio cannot resolve directly.
      if (cacheSchema && Files.exists(Paths.get(cachePath))) {
        // Fast path: deserialize the previously cached schema.
        val ois = new ObjectInputStream(new FileInputStream(cachePath))
        try Some(ois.readObject.asInstanceOf[StructType])
        finally ois.close()
      } else {
        // Infer the schema from the sample document and optionally cache it.
        val dfSchema = sqlContext.read
          .format("com.databricks.spark.xml")
          .option("rowTag", "dc:Document")
          .load(schemaPath)
        val schema = dfSchema.schema
        if (cacheSchema) {
          val oos = new ObjectOutputStream(new FileOutputStream(cachePath))
          try oos.writeObject(schema)
          finally oos.close()
        }
        Some(schema)
      }
    }
  }
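  // A minimal alternative sketch (not wired into run): caching the schema as
  // JSON via StructType.json / DataType.fromJson instead of Java serialization,
  // which tends to survive Spark upgrades better. `jsonPath` is a hypothetical
  // location, not one the rest of this object uses.
  protected[xml] def loadSchemaJson(jsonPath: String): Option[StructType] = {
    val p = Paths.get(jsonPath)
    if (Files.exists(p)) {
      val json = new String(Files.readAllBytes(p), "UTF-8")
      Some(org.apache.spark.sql.types.DataType.fromJson(json).asInstanceOf[StructType])
    } else None
  }

  protected[xml] def saveSchemaJson(schema: StructType, jsonPath: String): Unit = {
    Files.write(Paths.get(jsonPath), schema.json.getBytes("UTF-8"))
  }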
  def run(sqlContext: SQLContext, inputPaths: Array[String], schemaPath: String = null, cacheSchema: Boolean = false): Unit = {
    val schema = loadSchema(sqlContext, schemaPath, cacheSchema)

    val dfr = sqlContext.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "dc:Document")

    schema match {
      case Some(s) =>
        s.printTreeString()
        // DataFrameReader is mutable: schema(...) registers the user-supplied
        // schema on the reader, so spark-xml skips its inference pass.
        dfr.schema(s)
      case None =>
        println("---- WARNING: schema not defined; spark-xml will infer one from the input")
    }

    // load(paths: _*) handles the single-path case too, so no special-casing.
    val df = dfr.load(inputPaths: _*)
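    // The column paths below imply roughly this document shape (reconstructed
    // from the paths themselves; element contents are assumptions):
    //
    //   <dc:Document>
    //     <dc:Header><meta:Identity>...</meta:Identity></dc:Header>
    //     <dc:Content>
    //       <dc:Application>
    //         <dc:Statement><dc:Version2>true</dc:Version2></dc:Statement>
    //         <dc:Version1Data>...</dc:Version1Data>
    //       </dc:Application>
    //     </dc:Content>
    //   </dc:Document>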
    // Keep only documents flagged as version 2 that carry no version-1
    // payload, then project the identity structs from the header.
    val df2 = df
      .filter(df.col("dc:Content.dc:Application.dc:Statement.dc:Version2") === true)
      .filter(df.col("dc:Content.dc:Application.dc:Version1Data").isNull)
      .select(df.col("dc:Header.meta:Identity"))

    df2.show(10)

    // Collect once and reuse the local rows for both printouts.
    val rows = df2.collect()

    rows.zipWithIndex.foreach { case (r, i) =>
      println(s"DOCUMENT [$i] " + r.getList(0).toArray.mkString(" - "))
    }

    // Each identity is a (type, id) struct; collectFirst avoids the MatchError
    // that a non-exhaustive `find { case ... }` would throw on other shapes.
    rows
      .map(_.getList(0).toArray)
      .flatMap(_.collectFirst { case Row("docId", id) => id })
      .foreach(id => println("REC: " + id))
  }
}
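// A usage sketch (package and class names come from this file; the jar name,
// Scala version suffix, and spark-xml version are assumptions):
//
//   spark-submit \
//     --class sia.xml.App \
//     --packages com.databricks:spark-xml_2.10:0.3.3 \
//     target/scala-2.10/scala-spark-inaction.jar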