package sia.xml

import java.io.{FileInputStream, ObjectInputStream, FileOutputStream, ObjectOutputStream}
import java.nio.file.{Paths, Files}

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.StructType
import sia.utils.SparkConfigUtils._

import scala.collection.mutable

/**
 * Sample Spark application: reads `dc:Document` XML files through the
 * `com.databricks.spark.xml` data source, keeps the documents that match two
 * column predicates and prints the contents of their `meta:Identity` column.
 */
object App {

  /** Location of the serialized schema cache, relative to the home directory. */
  private val schemaFile = "dev/hadoopdev/scala-spark-inaction/nflx.file"

  /** Data source name handed to `DataFrameReader.format`. */
  private val XmlFormat = "com.databricks.spark.xml"

  /** Value of the `rowTag` option handed to the XML data source. */
  private val RowTag = "dc:Document"

  /**
   * Home directory used to build the sample and cache paths.
   * Uses the `HOME` environment variable and falls back to the JVM's
   * `user.home` property when `HOME` is not set, so paths never start with "null".
   */
  private def homeDir: String = sys.env.getOrElse("HOME", sys.props("user.home"))

  /**
   * Entry point: silences the `org` and `akka` loggers, builds the contexts via
   * `sia.utils.SparkConfigUtils` and runs the job against the sample documents
   * and sample schema file located under the home directory.
   *
   * @param args command line arguments, forwarded to `sparkContext`
   */
  def main(args: Array[String]): Unit = {
    //----- For testing turn off logging
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    //-----

    val sc = sparkContext(args)
    // IMPORTANT: HiveContext is required for window functions
    val sqlContext = sparkHiveSQLContext(sc)

    val projectDir = s"file://${homeDir}/dev/hadoopdev/scala-spark-inaction"

    run(sqlContext,
      Array(s"${projectDir}/src/test/resources/documents/*.xml"),
      s"${projectDir}/src/test/resources/DocSchema.xml")
  }

  /**
   * Load xml schema as dataframe structure. Optionally cache it as a serialized
   * object in a local file (`schemaFile` under the home directory).
   * This method is accessible within the same `sia.xml` package.
   *
   * When `cacheSchema` is true and the cache file already exists, the schema is
   * deserialized from that file; otherwise it is inferred by loading `schemaPath`
   * and, when `cacheSchema` is true, written to the cache file afterwards.
   *
   * @param sqlContext  Spark SQL Context, that actually is HiveContext
   * @param schemaPath  a path, either HDFS or local file, with xml containing all tags
   *                    (only then schema could be fully defined)
   * @param cacheSchema if true the schema is read from / written to the local cache file
   * @return the schema, or `None` when `schemaPath` is null or empty
   */
  protected[xml] def loadSchema(sqlContext: SQLContext, schemaPath: String, cacheSchema: Boolean = false): Option[StructType] = {
    println("SCHEMAPATH: " + schemaPath)

    Option(schemaPath).filter(_.nonEmpty).flatMap { path =>
      val cacheFile = s"${homeDir}/${schemaFile}"
      // The existence check must target the cache file that is about to be opened,
      // not the XML source path.
      if (cacheSchema && Files.exists(Paths.get(cacheFile))) {
        Option(readCachedSchema(cacheFile))
      } else {
        val inferred = sqlContext.read
          .format(XmlFormat)
          .option("rowTag", RowTag)
          .load(path)
          .schema
        if (cacheSchema) writeCachedSchema(cacheFile, inferred)
        Option(inferred)
      }
    }
  }

  /** Deserializes a schema from `cacheFile`; the file is closed even when reading fails. */
  private def readCachedSchema(cacheFile: String): StructType = {
    val fileIn = new FileInputStream(cacheFile)
    try {
      new ObjectInputStream(fileIn).readObject().asInstanceOf[StructType]
    } finally {
      fileIn.close()
    }
  }

  /** Serializes `schema` into `cacheFile`; the file is closed even when writing fails. */
  private def writeCachedSchema(cacheFile: String, schema: StructType): Unit = {
    val fileOut = new FileOutputStream(cacheFile)
    try {
      val objectOut = new ObjectOutputStream(fileOut)
      objectOut.writeObject(schema)
      // Push buffered object data to the file before the underlying stream is closed.
      objectOut.flush()
    } finally {
      fileOut.close()
    }
  }

  /** Entries of the list stored in column 0 of `row`; empty when that column is null. */
  private def identityEntries(row: Row): Array[AnyRef] =
    if (row.isNullAt(0)) Array.empty[AnyRef] else row.getList[AnyRef](0).toArray()

  /**
   * Reads the XML documents, keeps those where
   * `dc:Content.dc:Application.dc:Statement.dc:Version2` is true and
   * `dc:Content.dc:Application.dc:Version1Data` is null, then prints the
   * `dc:Header.meta:Identity` entries of each remaining document followed by
   * the value paired with `docId` in each of them.
   *
   * @param sqlContext  Spark SQL Context used to read the documents
   * @param inputPaths  one or more paths of XML documents to load; must not be empty
   * @param schemaPath  path of the XML file the schema is inferred from; when null or
   *                    empty no explicit schema is applied to the reader
   * @param cacheSchema forwarded to `loadSchema`
   * @throws IllegalArgumentException when `inputPaths` is null or empty
   */
  def run(sqlContext: SQLContext, inputPaths: Array[String], schemaPath: String = null, cacheSchema: Boolean = false): Unit = {
    require(inputPaths != null && inputPaths.nonEmpty, "inputPaths must contain at least one path")

    val baseReader = sqlContext.read
      .format(XmlFormat)
      .option("rowTag", RowTag)

    val reader = loadSchema(sqlContext, schemaPath, cacheSchema) match {
      case Some(schema) =>
        schema.printTreeString()
        baseReader.schema(schema)
      case None =>
        println("---- ERROR: SCHEMA NOT DEFINED")
        baseReader
    }

    val df = if (inputPaths.length == 1) reader.load(inputPaths(0)) else reader.load(inputPaths: _*)

    val identities = df
      .filter(df.col("dc:Content.dc:Application.dc:Statement.dc:Version2") === true)
      .filter(df.col("dc:Content.dc:Application.dc:Version1Data").isNull)
      .select(df.col("dc:Header.meta:Identity"))

    identities.show(10)

    // Collect once and reuse the result: every collect() triggers a separate Spark job.
    val rows = identities.collect()

    rows.zipWithIndex.foreach { case (row, i) =>
      println(s"DOCUMENT [$i] " + identityEntries(row).mkString(" - "))
    }

    // Each document carries entries such as [[docType,DOC1], [docId,DOC_ID_1], [docTarget,JO_1]];
    // print the value of the first docId entry and skip documents without one.
    for {
      row <- rows
      docId <- identityEntries(row).collectFirst { case Row("docId", id) => id }
    } println(s"REC: $docId")
  }
}

results matching ""

    No results matching ""