Spark DataFrame with XML source
Spark DataFrames are very handy for processing structured data sources like JSON or XML files. A DataFrame automatically recognizes the structure of the data.
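As a quick illustration of that automatic schema inference, here is a minimal sketch for JSON (the file path is hypothetical):
// The schema is inferred directly from the JSON records
val jsonDf = sqlContext.read.json("file:///tmp/sample.json")
jsonDf.printSchema()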
XML data structure
A DataFrame infers the XML structure from the XML records provided as its source. If the XML schema is richer, i.e. it contains tags that are not present in the provided records, be aware of exceptions: referencing a missing tag in a filter or select statement throws an exception.
The solution to this problem is to either provide the full schema explicitly (a sketch follows below), or to provide at least one XML record that contains all tags.
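If you choose to provide the schema explicitly, it can be built with Spark's StructType. Below is a minimal, purely illustrative sketch; the field names are hypothetical (borrowed from the Content.Person example used later) and must match your real document structure:
import org.apache.spark.sql.types._
// Hypothetical schema: every field is nullable, so missing tags do not break filters
val explicitSchema = StructType(Seq(
  StructField("dc:Content", StructType(Seq(
    StructField("dc:Person", StringType, nullable = true)
  )), nullable = true)
))
// Pass it to the reader with .schema(explicitSchema) instead of relying on inference
Marking the fields as nullable is what later allows filtering on tags that are absent from some records.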
Further examples assume the following XML document structure.
Load XML as DataFrame
Make sure the spark-xml library is on the list of dependencies.
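For example, a minimal sbt line might look like this (the version below is only an example; pick one that matches your Spark and Scala versions):
libraryDependencies += "com.databricks" %% "spark-xml" % "0.4.1"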
rowTag - defines the XML tag that is the record indicator.
Let's assume that all tags in the XML records carry the prefix of an xmlns definition. Then, instead of the usual column access like 'Content.Person or $"Content.Person", we have to use the df.col("dc:Content.dc:Person") approach.
First, let's derive the schema from DocSchema.xml:
// File in local filesystem with xml records
val homeDir = System.getenv("HOME")
val inputFile = s"file://$homeDir/test/resources/DocSchema.xml"
// Import SQL functions
import org.apache.spark.sql.functions._
// Load XML DataFrame
val dfSchema = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "dc:Document")
  .load(inputFile)
// Print loaded schema
dfSchema.printSchema()
// Access the inferred schema for later reuse
val schema = dfSchema.schema
Having the XML schema loaded, we can use it for further analysis of the XML records.
Let's load the XML files from a local disk folder (we can load them from HDFS as well).
val inputPath = s"file://${homeDir}/test/resources/documents/*.xml"
// Load all matching documents, applying the previously derived schema
val df = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "dc:Document")
  .schema(schema)
  .load(inputPath)
df.show(10)
Bear in mind that the previously loaded schema is the base for interpreting the XML records. With it, it is possible to filter records by missing tags without exceptions. As an example, let's filter the XML records that have dc:Content.dc:Application.dc:Statement.dc:Version2 set to true and do not contain the dc:Content.dc:Application.dc:Version1Data tag.
val df2 = df
  .filter(df.col("dc:Content.dc:Application.dc:Statement.dc:Version2").===(true)) // Version2 flag is true
  .filter(df.col("dc:Content.dc:Application.dc:Version1Data").isNull)             // Version1Data tag is absent
  .select(df.col("dc:Header.meta:Identity.meta:Value"))                           // keep only the identity values
df2.show(10)
Let's format the selected list of identity values. They are of type WrappedArray, a Scala Seq that wraps an array.
// Take the first 10 rows to the driver and print each identity list
df2
  .take(10)
  .zipWithIndex
  .foreach { case (r, i) => println(s"DOCUMENT [$i] " + r.getList(0).toArray.mkString(" - ")) }
One more thing. Our XML documents contain a dc:Header section with a collection of identities of type meta:Identity. If we would like to extract the identity with @type=docId, we have to do a few more conversions.
import org.apache.spark.sql.Row // needed for pattern matching on Row below
df2
  .collect()
  .map(r => r.getList(0).toArray)                       // => [[docType,DOC1], [docId,DOC_ID_1], [docTarget,JO_1]]
  .map(r => r.find { case Row(t, id) => t == "docId" }) // => Some([docId,DOC_ID_1])
  .filter(_.isDefined)                                  // => keep only records containing docId
  .map(_.get)                                           // => unwrap the Option to get the Row
  .map { case Row(t, id) => id }                        // => extract the document id
  .foreach(r => println("REC: " + r.toString))
We collect the records in the Spark driver. Every record is a collection of WrappedArrays. We get the array of identities and look for the identity of type docId. Because find returns an Option, we filter out the records that do not contain such an identity. From each remaining Row we take the document id value. That is our final result.
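The same extraction can be written a bit more compactly with Scala's collect on the inner array, which combines the pattern match and the filtering into one step (a sketch equivalent to the chain above):
df2
  .collect()
  .flatMap(r => r.getList(0).toArray.collect { case Row("docId", id) => id }) // keep only docId identities
  .foreach(id => println("REC: " + id))
For large document sets, consider narrowing the data with DataFrame filters before collecting, so that less data is shipped to the driver.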