Improve performance of Files.walk on the JVM #3383
Conversation
djspiewak
left a comment
I'm assuming the reason the overhead begins to converge in larger traversals is that there's a single big eval.
bldr += Path.fromNioPath(path)
size += 1
if (size >= limit) {
  val result = dispatcher.unsafeRunSync(channel.send(Chunk.from(bldr.result())))
I really wish there were a way to suspend the visitation and continue it later. That would allow us to avoid the unsafeRunSync here and use unsafeRunAndForget instead, likely bouncing out of the interruptible once every n enqueues and passing through a Stream#append in order to preserve backpressure.
Is walkFileTree meaningfully faster than just doing the traversal by hand?
Even eagerly collecting everything is only 5% faster than the channel-based solution (using the 4096 limit):
def walkEager(start: Path, maxDepth: Int, followLinks: Boolean): Stream[F, Path] = {
val doWalk = Sync[F].interruptibleMany {
val bldr = Vector.newBuilder[Path]
JFiles.walkFileTree(
start.toNioPath,
if (followLinks) Set(FileVisitOption.FOLLOW_LINKS).asJava else Set.empty.asJava,
maxDepth,
new SimpleFileVisitor[JPath] {
private def enqueue(path: JPath): FileVisitResult = {
bldr += Path.fromNioPath(path)
FileVisitResult.CONTINUE
}
override def visitFile(file: JPath, attrs: JBasicFileAttributes): FileVisitResult =
enqueue(file)
override def visitFileFailed(file: JPath, t: IOException): FileVisitResult =
FileVisitResult.CONTINUE
override def preVisitDirectory(dir: JPath, attrs: JBasicFileAttributes): FileVisitResult =
enqueue(dir)
override def postVisitDirectory(dir: JPath, t: IOException): FileVisitResult =
FileVisitResult.CONTINUE
}
)
Chunk.from(bldr.result())
}
Stream.eval(doWalk).flatMap(Stream.chunk)
}
Wow, that's wild honestly. Have to ponder that. It's nice that we can just be lazy about our thread blocking though, since it simplifies this stuff.
@djspiewak BTW, there's a bunch of performance hackery in the JDK's file walking that's not (directly) available to us if we implement our own walk. For example, Path can cache file attributes avoiding some filesystem calls.
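To make that concrete, here's a minimal sketch (mine, not from the PR) of the difference: `walkFileTree` hands each visitor callback a pre-read `BasicFileAttributes`, whereas a hand-rolled walk typically pays a separate filesystem call per attribute query:

```scala
import java.nio.file.{FileVisitResult, Files => JFiles, Path => JPath, Paths, SimpleFileVisitor}
import java.nio.file.attribute.{BasicFileAttributes => JBasicFileAttributes}

// With walkFileTree, attrs is read once by the walker and passed in,
// so attribute checks inside the visitor cost no extra filesystem calls:
JFiles.walkFileTree(
  Paths.get("."),
  new SimpleFileVisitor[JPath] {
    override def visitFile(file: JPath, attrs: JBasicFileAttributes): FileVisitResult = {
      val isDir = attrs.isDirectory // free: already read by the walker
      FileVisitResult.CONTINUE
    }
  }
)

// A hand-rolled walk has to stat each entry itself:
val p = Paths.get(".")
val isDir = JFiles.isDirectory(p) // separate filesystem call per query
```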
Just to close this out, I tried this prototype:
override def walk(
start: Path,
maxDepth: Int,
followLinks: Boolean,
chunkSize: Int
): Stream[F, Path] =
walkJustInTime(start, maxDepth, followLinks, chunkSize)
// if (chunkSize == Int.MaxValue) walkEager(start, maxDepth, followLinks)
// else walkLazy(start, maxDepth, followLinks, chunkSize)
private def walkJustInTime(
start: Path,
maxDepth: Int,
followLinks: Boolean,
chunkSize: Int
): Stream[F, Path] = {
def loop(acc: Vector[Path], toWalk: Vector[Path]): Stream[F, Path] = {
if (toWalk.isEmpty) {
Stream.chunk(Chunk.from(acc))
} else {
val path = toWalk.head
val (toEmit, newAcc) =
if (acc.size + 1 >= chunkSize)
(Chunk.from(acc :+ path), Vector.empty)
else (Chunk.empty, acc :+ path)
val list = Sync[F].interruptibleMany {
val npath = path.toNioPath
if (JFiles.isDirectory(npath)) {
val listing = JFiles.list(npath)
try listing.iterator.asScala.map(Path.fromNioPath).toVector
finally listing.close()
}
else Vector.empty
}
Stream.chunk(toEmit) ++ Stream.eval(list).flatMap(descendants => loop(newAcc, toWalk.drop(1) ++ descendants))
}
}
loop(Vector.empty, Vector(start))
}
Using MaxDepth = 7, I got these results:
fs2 took: 16070 ms
fs2 eager took: 13935 ms
nio took: 6356 ms
Whereas the implementation in this PR results in:
fs2 took: 8000 ms
fs2 eager took: 5975 ms
nio took: 6858 ms
Here's a better prototype that does file attribute reading at the time of directory listing.
private def walkJustInTime(
start: Path,
maxDepth: Int,
followLinks: Boolean,
chunkSize: Int
): Stream[F, Path] = {
def loop(acc: Vector[Path], toWalk: Vector[(Path, JBasicFileAttributes)]): Stream[F, Path] = {
if (toWalk.isEmpty) {
Stream.chunk(Chunk.from(acc))
} else {
val (path, attr) = toWalk.head
val (toEmit, newAcc) =
if (acc.size + 1 >= chunkSize)
(Chunk.from(acc :+ path), Vector.empty)
else (Chunk.empty, acc :+ path)
if (attr.isDirectory) {
val list = Sync[F].interruptibleMany {
val listing = JFiles.list(path.toNioPath)
try listing.iterator.asScala.map(p =>
(Path.fromNioPath(p), JFiles.readAttributes(p, classOf[JBasicFileAttributes]))).toVector
finally listing.close()
}
Stream.chunk(toEmit) ++ Stream.eval(list).flatMap(descendants => loop(newAcc, toWalk.drop(1) ++ descendants))
} else Stream.chunk(toEmit) ++ loop(newAcc, toWalk.drop(1))
}
}
Stream.eval(Sync[F].interruptibleMany {
start -> JFiles.readAttributes(start.toNioPath, classOf[JBasicFileAttributes])
}).flatMap { s => loop(Vector.empty, Vector(s)) }
}
Performs better but still doesn't beat the walkFileTree solution:
fs2 took: 10399 ms
fs2 eager took: 8843 ms
nio took: 7202 ms
Alright, maybe we should switch to a version based on this:
private def walkJustInTime(
start: Path,
maxDepth: Int,
followLinks: Boolean,
chunkSize: Int
): Stream[F, Path] = {
import scala.collection.immutable.Queue
def loop(toWalk0: Queue[(Path, JBasicFileAttributes)]): Stream[F, Path] = {
val partialWalk = Sync[F].interruptibleMany {
var acc = Vector.empty[Path]
var toWalk = toWalk0
while (acc.size < chunkSize && toWalk.nonEmpty) {
val (path, attr) = toWalk.head
toWalk = toWalk.drop(1)
acc = acc :+ path
if (attr.isDirectory) {
val listing = JFiles.list(path.toNioPath)
try {
val descendants = listing.iterator.asScala.map(p =>
(Path.fromNioPath(p), JFiles.readAttributes(p, classOf[JBasicFileAttributes]))).toVector
toWalk = toWalk ++ descendants
}
finally listing.close()
}
}
Stream.chunk(Chunk.from(acc)) ++ (if (toWalk.isEmpty) Stream.empty else loop(toWalk))
}
Stream.eval(partialWalk).flatten
}
Stream.eval(Sync[F].interruptibleMany {
start -> JFiles.readAttributes(start.toNioPath, classOf[JBasicFileAttributes])
}).flatMap(s => loop(Queue(s)))
}
fs2 took: 9312 ms
fs2 eager took: 8538 ms
nio took: 7769 ms
So basically what we're trying to figure out is whether it's worth eating 9% overhead to avoid blocking a thread which is already getting blocked by filesystem I/O? My guess is that it's not worth it but I shall ponder a bit.
Pushed a new version:
fs2 took: 8131 ms
fs2 eager took: 5950 ms
nio took: 7346 ms
I'd like to add some tests for symbolic link following & max depth limits (we don't have any now). Then this PR should be good.
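A sketch of what a max-depth test might look like (hypothetical structure and names; the real suite's fixtures may differ):

```scala
import cats.effect.IO
import cats.syntax.all._
import fs2.io.file.{Files, Path}

// Hypothetical test body: with maxDepth = 1, walking tmp containing
// tmp/a/b should visit tmp and tmp/a but never reach tmp/a/b.
def maxDepthIsRespected(tmp: Path): IO[Unit] =
  for {
    _ <- Files[IO].createDirectories(tmp / "a" / "b")
    seen <- Files[IO].walk(tmp, maxDepth = 1, followLinks = false).compile.toList
    _ <- IO
      .raiseError(new AssertionError("walk exceeded maxDepth"))
      .whenA(seen.exists(_.fileName.toString == "b"))
  } yield ()
```

A symlink test could follow the same shape: create a link into a sibling tree and assert its target is only visited when `followLinks = true`.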
override def walk(start: Path, maxDepth: Int, followLinks: Boolean): Stream[F, Path] =
  Stream.resource(Dispatcher.sequential[F]).flatMap { dispatcher =>
    Stream.eval(Channel.bounded[F, Chunk[Path]](10)).flatMap { channel =>
Btw @armanbilge one thing that occurs to me is that our fancy new unsafe queue thing isn't going to help very much if someone's using Channel.
Opened scala-native/scala-native#3744 for tracking upstream.
…wing cycles while following links
Okay, this is ready for final review. Here's how we netted out performance-wise:
.flatMap(ds => Stream.fromBlockingIterator[F](collectionIterator(ds), pathStreamChunkSize))
protected def walkEager(start: Path, options: WalkOptions): Stream[F, Path] = {
  val doWalk = Sync[F].interruptibleMany {
Does it really need the Many?
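For reference (my reading of the cats-effect semantics, not something this PR changes), the difference is how persistently cancellation interrupts the blocked thread:

```scala
import cats.effect.{IO, Sync}

// interruptible: on cancellation, Thread.interrupt is sent once; if the
// blocked code clears or swallows the flag, cancellation waits anyway.
val once: IO[Unit] = Sync[IO].interruptible(Thread.sleep(60000))

// interruptibleMany: the interrupt is re-sent repeatedly until the
// computation returns, which matters for loops spanning many blocking calls.
val many: IO[Unit] = Sync[IO].interruptibleMany(Thread.sleep(60000))
```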
var acc = Vector.empty[Path]
var toWalk = toWalk0

while (acc.size < options.chunkSize && toWalk.nonEmpty) {
May be worth checking Thread.interrupted()
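For example (a hypothetical, stdlib-only variant of the loop above): checking the flag between directory listings lets cancellation land at iteration boundaries rather than only inside a blocking NIO call:

```scala
import java.nio.file.{Files => JFiles, Path => JPath}
import scala.collection.immutable.Queue
import scala.jdk.CollectionConverters._

def drainChunk(toWalk0: Queue[JPath], chunkSize: Int): (Vector[JPath], Queue[JPath]) = {
  var acc = Vector.empty[JPath]
  var toWalk = toWalk0
  while (acc.size < chunkSize && toWalk.nonEmpty) {
    // interruptibleMany interrupts this thread on cancellation; observing the
    // flag here stops the walk promptly instead of finishing the whole chunk.
    if (Thread.interrupted()) throw new InterruptedException
    val path = toWalk.head
    toWalk = toWalk.drop(1)
    acc = acc :+ path
    if (JFiles.isDirectory(path)) {
      val listing = JFiles.list(path)
      try toWalk = toWalk ++ listing.iterator.asScala.toVector
      finally listing.close()
    }
  }
  (acc, toWalk)
}
```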
Fixes #3329.
For small walks, the overhead of the fs2/ce machinery dominates. For large walks, fs2's performance is within ~25% of the JVM's. For example, using @djspiewak's scenario with `MaxDepth = 7`, I get:
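For reproduction, a sketch of a timing harness in that spirit (names and setup are my assumption, not the PR's actual benchmark):

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.io.file.{Files, Path}

// Hypothetical harness: count paths via fs2's walk and via raw NIO,
// reporting wall-clock time for each over the same tree.
def timed[A](label: String)(thunk: => A): A = {
  val t0 = System.currentTimeMillis()
  val a = thunk
  println(s"$label took: ${System.currentTimeMillis() - t0} ms")
  a
}

def run(root: Path): Unit = {
  timed("fs2")(Files[IO].walk(root).compile.count.unsafeRunSync())
  timed("nio") {
    val s = java.nio.file.Files.walk(root.toNioPath)
    try s.count() finally s.close()
  }
}
```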