We have found an issue where records are dropped silently.
By default, the input XML file is partitioned into several "splits" whose size is controlled by the 'fs.local.block.size' config setting (32MB by default). We've found that when a split boundary falls in the middle of the starting row tag element, the record is lost and never read into the DataFrame.
The easy workaround is to set 'fs.local.block.size' larger than the file size, which is guaranteed to work, but has a severe performance penalty for large files.
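For reference, the workaround looks like this in session setup (a minimal sketch; the 256MB value and the file path are assumptions, pick any value larger than your biggest input file):

```scala
import org.apache.spark.sql.SparkSession

// Workaround sketch: force a single split by making the local block size
// larger than the input file. This avoids the dropped records, but gives
// up all read parallelism, hence the performance penalty for large files.
val spark = SparkSession.builder()
  .master("local[2]")
  .config("spark.hadoop.fs.local.block.size", (256 * 1024 * 1024).toString) // assumed: > file size
  .getOrCreate()

val df = spark.read
  .option("rowTag", "person")
  .format("xml")
  .load("people.xml") // hypothetical path
```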
To check this scenario, I've set up a quick and dirty test as follows (inside XmlSuite):
  import java.io.{BufferedWriter, File, FileWriter}

  test("fails on file partitioning") {
    val xml =
      s"""<people>
         | <person>
         | <age born="1990-02-24">25</age>
         | <name>Hyukjin</name>
         | </person>
         | <person>
         | <age born="1985-01-01">30</age>
         | <name>Lars</name>
         | </person>
         | <person>
         | <age born="1980-01-01">30</age>
         | <name>Lion</name>
         | </person>
         | </people>
       """.stripMargin
    val xmlFile = File.createTempFile("ages", ".xml")
    val writer = new BufferedWriter(new FileWriter(xmlFile))
    writer.write(xml)
    writer.close()
    println(s"Reading from $xmlFile")
    spark.stop() // stop the existing session set up in `beforeAll`
    spark = SparkSession.builder()
      .master("local[2]")
      .appName("XmlSuite")
      .config("spark.ui.enabled", false)
      .config("spark.hadoop.fs.local.block.size", "0x68")
      .getOrCreate()
    val results = spark.read
      .option("rowTag", "person")
      .option("mode", "FAILFAST")
      .format("xml")
      .load(xmlFile.getAbsolutePath)
    results.show(false)
  }
Setting the block size to "0x68" (104 bytes) makes the split land in the middle of the second <person> element. Executing this code actually results in zero records being read: a different symptom, but the same underlying issue.
The problem seems to be inside XmlInputFormat, where there are calls to filePosition.getPos. The goal is to know the "index" of the character currently being read, so that the reader can stop once it has gone beyond the split boundary.
However, this call does not return the index of the character currently being read, but the position in the file of the last byte read from the underlying stream. The underlying stream reader uses buffering (8192 bytes, it seems), so filePosition.getPos does not correspond to the index of the byte being consumed; it can be ahead by up to 8192 bytes. In the example above, the very first call to filePosition.getPos already returns the position of the last byte in the file, making readUntilStartElement exit early without reading any data.
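The read-ahead effect is easy to reproduce outside Spark with plain java.io streams. The CountingStream class below is just a test helper (not part of spark-xml) that exposes how many bytes have actually been pulled from the underlying source, which is what the file-position call effectively reports:

```scala
import java.io.{BufferedInputStream, ByteArrayInputStream}

// Helper: a byte stream that reports how many bytes have been pulled
// from it, mimicking what the underlying file stream's position shows.
class CountingStream(data: Array[Byte]) extends ByteArrayInputStream(data) {
  def bytesConsumed: Int = pos // `pos` is a protected field of ByteArrayInputStream
}

object BufferDemo extends App {
  val counting = new CountingStream(Array.fill[Byte](20000)('x'))
  val buffered = new BufferedInputStream(counting) // default 8192-byte buffer

  buffered.read() // consume a single byte through the buffer...
  // ...but the buffer has read ahead a full 8192 bytes from the source.
  println(s"underlying position after 1 buffered read: ${counting.bytesConsumed}") // prints 8192
}
```

So a position query against the underlying stream overshoots the logical read position by up to one buffer's worth, which is exactly the discrepancy described above.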
In my local testing I added a mutable variable to keep track of the position, incrementing it by one after every reader.read() invocation, and replaced the calls to filePosition.getPos with references to this variable. This produces the desired result, both in the simple test above and in the case that brought this issue to light (a 118MB file that had two records dropped).
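In sketch form, the change looks like the following (the field and method names are illustrative, not the actual XmlInputFormat members):

```scala
// Illustrative sketch of the fix, not the actual spark-xml code.
// `start` and `end` stand for the split boundaries the record reader holds.
private var pos: Long = start

// Wrap every read so `pos` tracks exactly how many characters have been
// consumed, independent of any read-ahead buffering underneath.
private def readChar(reader: java.io.Reader): Int = {
  val c = reader.read()
  if (c != -1) pos += 1
  c
}

// ...and the boundary checks compare `pos` instead of filePosition.getPos:
// if (pos > end) { /* stop scanning for further start elements */ }
```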