Spark DataFrame with XML source

Spark DataFrames are very handy for processing structured data sources such as JSON or XML files. A DataFrame automatically recognizes the data structure.

XML data structure

A DataFrame recognizes the XML data structure from the XML records provided as its source. If the XML schema is richer, i.e. it contains tags not visible in the provided XML records, be aware of exceptions: referencing the missing tags in filter or select statements throws an exception.

The solution to this problem is either to provide the full schema, or to make sure at least one XML record contains all tags.
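
For example, a full schema can be declared programmatically with StructType (the tag names below are purely illustrative) and passed to the DataFrame reader with .schema(...), as shown later in this article.

// Illustrative sketch: declaring a full schema up front, so tags that never
// appear in the sampled records are still known to the DataFrame
import org.apache.spark.sql.types._

val fullSchema = StructType(Seq(
  StructField("dc:Header", StringType, nullable = true),
  StructField("dc:Content", StructType(Seq(
    StructField("dc:Person", StringType, nullable = true),
    StructField("dc:OptionalTag", StringType, nullable = true) // hypothetical tag missing from the sample records
  )), nullable = true)
))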

Further examples assume the following XML document structure.
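
A simplified, hypothetical illustration of such a document, reconstructed from the tag names used in the examples below (namespace URIs and element contents are made up; the real structure may contain more tags):

<dc:Document xmlns:dc="http://example.com/dc" xmlns:meta="http://example.com/meta">
  <dc:Header>
    <meta:Identity><meta:Value type="docType">DOC1</meta:Value></meta:Identity>
    <meta:Identity><meta:Value type="docId">DOC_ID_1</meta:Value></meta:Identity>
    <meta:Identity><meta:Value type="docTarget">JO_1</meta:Value></meta:Identity>
  </dc:Header>
  <dc:Content>
    <dc:Person>...</dc:Person>
    <dc:Application>
      <dc:Statement>
        <dc:Version2>true</dc:Version2>
      </dc:Statement>
      <!-- dc:Version1Data appears only in some documents -->
    </dc:Application>
  </dc:Content>
</dc:Document>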

Load XML as DataFrame

Make sure the spark-xml library is on the list of dependencies.
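
With sbt this could look as follows (the version shown is only an example; choose the release matching your Spark version):

// build.sbt - example dependency entry for spark-xml
libraryDependencies += "com.databricks" %% "spark-xml" % "0.3.3"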

// File in local filesystem with xml records
val homeDir = System.getenv("HOME")
val inputFile = s"file://$homeDir/test/resources/DocSchema.xml"

// Import SQL functions
import org.apache.spark.sql.functions._

// Load XML DataFrame
val dfSchema = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "dc:Document")
  .load(inputFile)

// Print loaded schema
dfSchema.printSchema()

// Access schema
val schema = dfSchema.schema

rowTag - defines the XML tag that marks a single record.

Let's assume that all tags in the XML records carry the prefix of an xmlns definition. Then, instead of the usual column access like 'Content.Person or $"Content.Person", we have to use the df.col("dc:Content.dc:Person") approach.
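
A short sketch, using the dfSchema DataFrame loaded above:

// Prefixed columns are referenced through df.col instead of the Symbol / $ syntax
dfSchema
  .select(dfSchema.col("dc:Content.dc:Person"))
  .show(5)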

With the XML schema loaded, we can use it for further analysis of XML records.

Let's load XML files from a local disk folder (we could load them from HDFS as well).

val inputPath = s"file://${homeDir}/test/resources/documents/*.xml"

val df = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "dc:Document")
  .schema(schema)
  .load(inputPath)

df.show(10)

Bear in mind that the already loaded schema is the basis for interpreting the XML records. It is then possible to filter records by missing tags without exceptions.

As an example, let's filter XML records with dc:Content.dc:Application.dc:Statement.dc:Version2 set to true and not containing the dc:Content.dc:Application.dc:Version1Data tag.

val df2 = df
  .filter(df.col("dc:Content.dc:Application.dc:Statement.dc:Version2").===(true))
  .filter(df.col("dc:Content.dc:Application.dc:Version1Data").isNull)
  .select(df.col("dc:Header.meta:Identity.meta:Value"))
df2.show(10)

Let's format the selected list of identity values. They are of type WrappedArray, Scala's Seq wrapper around an array.

df2
  .take(10)
  .zipWithIndex
  .foreach {case(r, i) => println(s"DOCUMENT [$i] " + r.getList(0).toArray.mkString(" - "))}

One more thing: our XML documents contain a dc:Header section with a collection of identities of type meta:Identity. If we would like to extract the identity with @type=docId, we have to do a few more conversions.

df2
  .collect()
  .map(r => r.getList(0).toArray)  // => [[docType,DOC1], [docId,DOC_ID_1], [docTarget,JO_1]]
  .map(r => r.find {case Row(t, id) => t == "docId"}) // => Some([docId,DOC_ID_1])
  .filter(_.isDefined)                                // => filter records containing docId
  .map(_.get)                                         // => get Row value
  .map {case Row(t, id) => id}                        // => get document id
  .foreach(r => println("REC: " + r.toString))

We collect the records in the Spark driver. Every record is a collection of WrappedArrays, from which we get the array of identities. We then extract identities of type docId. Because map returns Option values, we filter the records that contain some data. From the Row we take the document id value, and that is our final result.
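
A sketch of an equivalent, more compact version of the same chain, using getSeq and a partial function (the column layout is assumed to be the same as above):

// Row is org.apache.spark.sql.Row, as in the pattern matches above
val docIds = df2
  .collect()                                          // collect rows to the driver
  .flatMap(_.getSeq[Row](0))                          // flatten the arrays of identities
  .collect { case Row(t, id) if t == "docId" => id }  // keep only docId identities
docIds.foreach(id => println("REC: " + id))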
