  1. Spark
  2. SPARK-44576

Session Artifact update breaks XXWithState methods in KVGDS

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.5.0
    • Fix Version/s: 3.5.0
    • Component/s: Connect
    • Labels: None

    Description

      When the client test jar was moved from the system class loader to the session class loader (https://github.com/apache/spark/compare/master...zhenlineo:spark:streaming-artifacts?expand=1), all XXWithState test suites failed with class loader errors, e.g.:
      ```
      23/07/25 16:13:14 WARN TaskSetManager: Lost task 1.0 in stage 2.0 (TID 16) (10.8.132.125 executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 170 in stage 2.0 failed 1 times, most recent failure: Lost task 170.0 in stage 2.0 (TID 14) (10.8.132.125 executor driver): java.lang.ClassCastException: class org.apache.spark.sql.streaming.ClickState cannot be cast to class org.apache.spark.sql.streaming.ClickState (org.apache.spark.sql.streaming.ClickState is in unnamed module of loader org.apache.spark.util.MutableURLClassLoader @2c604965; org.apache.spark.sql.streaming.ClickState is in unnamed module of loader java.net.URLClassLoader @57751f4)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
      at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:441)
      at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1514)
      at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:486)
      at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
      at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491)
      at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
      at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
      at org.apache.spark.scheduler.Task.run(Task.scala:141)
      at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:592)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1480)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:595)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.base/java.lang.Thread.run(Thread.java:829)

      Driver stacktrace
      23/07/25 16:13:14 ERROR Utils: Aborting task
      java.lang.IllegalStateException: Error committing version 1 into HDFSStateStore[id=(op=0,part=5),dir=file:/private/var/folders/b0/f9jmmrrx5js7xsswxyf58nwr0000gp/T/temporary-02cca002-e189-4e32-afd8-964d6f8d5056/state/0/5]
      at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:148)
      at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.$anonfun$processDataWithPartition$4(FlatMapGroupsWithStateExec.scala:183)
      at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
      at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:611)
      at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:179)
      at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:179)
      at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.timeTakenMs(FlatMapGroupsWithStateExec.scala:374)
      at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExecBase.$anonfun$processDataWithPartition$3(FlatMapGroupsWithStateExec.scala:183)
      at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
      at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
      at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:441)
      at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1514)
      at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:486)
      at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
      at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491)
      at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
      at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
      at org.apache.spark.scheduler.Task.run(Task.scala:141)
      at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:592)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1480)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:595)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.base/java.lang.Thread.run(Thread.java:829)
      ```
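The `ClassCastException` in the trace above names the same class on both sides of the cast, differing only in the loader. This is the usual symptom of one class name being defined by two class loaders, since the JVM identifies a class by its name together with its defining loader. The following is a minimal, standalone sketch of that effect, not Spark code: `LoaderIdentityDemo`, `State`, and `IsolatingLoader` are hypothetical names, and the sketch assumes the compiled `.class` files are readable as resources on the classpath.

```java
import java.io.IOException;
import java.io.InputStream;

class LoaderIdentityDemo {
    /** Hypothetical stand-in for a user-defined state class such as ClickState. */
    public static class State {
        public State() {}
    }

    /** Defines the target class itself instead of delegating to the parent loader. */
    static final class IsolatingLoader extends ClassLoader {
        private final String target;

        IsolatingLoader(ClassLoader parent, String target) {
            super(parent);
            this.target = target;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(target)) {
                return super.loadClass(name, resolve); // normal parent-first delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> loaded = findLoadedClass(name);
                if (loaded != null) {
                    return loaded;
                }
                // Read the same bytecode the parent would use, but define it in this loader.
                String path = name.replace('.', '/') + ".class";
                try (InputStream in = getParent().getResourceAsStream(path)) {
                    if (in == null) {
                        throw new ClassNotFoundException(name);
                    }
                    byte[] bytes = in.readAllBytes();
                    return defineClass(name, bytes, 0, bytes.length);
                } catch (IOException e) {
                    throw new ClassNotFoundException(name, e);
                }
            }
        }
    }

    /** Loads a second copy of State through a separate loader. */
    static Class<?> loadSecondCopy() {
        String name = State.class.getName();
        try {
            return new IsolatingLoader(LoaderIdentityDemo.class.getClassLoader(), name).loadClass(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** True when both classes share a name but are distinct Class objects. */
    static boolean sameNameDifferentClass() {
        Class<?> copy = loadSecondCopy();
        return copy.getName().equals(State.class.getName()) && copy != State.class;
    }

    /** True when casting an instance of the second copy to the first copy fails. */
    static boolean castFails() {
        try {
            Object instance = loadSecondCopy().getDeclaredConstructor().newInstance();
            State.class.cast(instance);
            return false;
        } catch (ClassCastException e) {
            return true;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("same name, different class: " + sameNameDifferentClass());
        System.out.println("cast fails: " + castFails());
    }
}
```

In the reported failure the two loaders are `MutableURLClassLoader` and `java.net.URLClassLoader`. The sketch only illustrates why a class must be resolved through a single loader on both sides of a cast; it does not show how Spark Connect arranges this.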

      Fix the failure and change the client tests to install the client jar on the session class loader.

      These tests failed only on Maven builds, because only Maven builds need the test jar to be installed separately.

          People

            hvanhovell Herman van Hövell
            zhenli Zhen Li