Commit 6d0fed9

zhenlineo authored and hvanhovell committed
[SPARK-43744][CONNECT] Fix class loading problem caused by stub user classes not found on the server classpath
### What changes were proposed in this pull request?

This PR introduces a stub class loader for unpacking Scala UDFs on the driver and the executors. When it encounters a user class that is not found on the server session classpath, the stub class loader tries to stub the class. This solves the problem that, when serializing UDFs, the Java serializer may include unnecessary user code, e.g. user classes appearing in lambda definition signatures in the same class where the UDF is defined. If the user code is actually needed to execute the UDF, we return an error message suggesting that the user add the missing classes using the `addArtifact` method.

### Why are the changes needed?

To enhance the user experience of UDFs. This PR should be merged to master and 3.5.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests for both Scala 2.12 and 2.13. Four tests in SparkSessionE2ESuite still fail to run with Maven after the fix, because the client test jar is installed on the system classpath (added using `--jars` at server start), while the stub class loader can only stub classes missing from the session classpath (added using `session.addArtifact`). Moving the test jar to the session classpath causes failures in the tests for `flatMapGroupsWithState` (SPARK-44576). Finish moving the test jar to the session classpath once the `flatMapGroupsWithState` test failures are fixed.

Closes #42069 from zhenlineo/ref-spark-result.

Authored-by: Zhen Li <zhenlineo@users.noreply.github.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
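The stub class loader idea can be sketched outside Spark: a `ClassLoader` that, only after its parent fails to resolve a class, defines a minimal placeholder on the fly. Everything below (`StubClassLoader`, `stubBytes`, the `com.example.MissingUserClass` name) is a hypothetical illustration of the technique, not the code in this commit. The hand-built class file has no constructors or methods, so the stub can be loaded and referenced, but any attempt to actually use it fails, which mirrors the behavior the error message above describes.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

object StubDemo {
  // Emit a minimal, valid class file for `public class <name> extends Object`
  // with no fields, constructors, or methods.
  def stubBytes(internalName: String): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    out.writeInt(0xCAFEBABE)                            // magic
    out.writeShort(0)                                   // minor version
    out.writeShort(49)                                  // major version (Java 5)
    out.writeShort(5)                                   // constant pool count = 4 entries + 1
    out.writeByte(7); out.writeShort(2)                 // #1 Class -> #2
    out.writeByte(1); out.writeUTF(internalName)        // #2 Utf8 this-class name
    out.writeByte(7); out.writeShort(4)                 // #3 Class -> #4
    out.writeByte(1); out.writeUTF("java/lang/Object")  // #4 Utf8 superclass name
    out.writeShort(0x0021)                              // ACC_PUBLIC | ACC_SUPER
    out.writeShort(1)                                   // this_class  -> #1
    out.writeShort(3)                                   // super_class -> #3
    out.writeShort(0)                                   // interfaces
    out.writeShort(0)                                   // fields
    out.writeShort(0)                                   // methods
    out.writeShort(0)                                   // class attributes
    out.flush()
    bos.toByteArray
  }

  class StubClassLoader(parent: ClassLoader) extends ClassLoader(parent) {
    // findClass is only reached after the parent failed to resolve `name`,
    // i.e. the class is genuinely missing from the (session) classpath.
    override def findClass(name: String): Class[_] = {
      val bytes = stubBytes(name.replace('.', '/'))
      defineClass(name, bytes, 0, bytes.length)
    }
  }

  def loadStub(name: String): Class[_] =
    new StubClassLoader(getClass.getClassLoader).loadClass(name)
}
```

Using the stub then fails lazily: `loadStub("com.example.MissingUserClass").getDeclaredConstructor()` throws `NoSuchMethodException`, the same exception family the new test suite in this commit asserts on.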
1 parent: 3761b7d

19 files changed: 480 additions & 27 deletions

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -554,7 +554,7 @@ class SparkSession private[sql] (
     val command = proto.Command.newBuilder().setRegisterFunction(udf).build()
     val plan = proto.Plan.newBuilder().setCommand(command).build()

-    client.execute(plan)
+    client.execute(plan).asScala.foreach(_ => ())
   }

   @DeveloperApi
```
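The change drains the response iterator instead of discarding it, so the register command is actually driven to completion before the method returns. With a lazy iterator, no work happens until consumption; a minimal self-contained illustration (plain Scala, unrelated to the gRPC client):

```scala
object DrainDemo {
  // Simulate a lazy stream of responses: producing an element does "work"
  // (here, bumping a counter), and nothing runs until the iterator is consumed.
  def run(): (Int, Int) = {
    var executed = 0
    val responses = Iterator.tabulate(3) { i => executed += 1; i }
    val before = executed        // still 0: Iterator.tabulate is lazy
    responses.foreach(_ => ())   // draining drives the underlying work
    (before, executed)           // (0, 3)
  }
}
```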

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -92,7 +92,7 @@ sealed abstract class UserDefinedFunction {
 /**
  * Holder class for a scalar user-defined function and it's input/output encoder(s).
  */
-case class ScalarUserDefinedFunction private (
+case class ScalarUserDefinedFunction private[sql] (
     // SPARK-43198: Eagerly serialize to prevent the UDF from containing a reference to this class.
     serializedUdfPacket: Array[Byte],
     inputTypes: Seq[proto.DataType],
```
Lines changed: 56 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.connect.client

// To generate a jar from the source file:
// `scalac StubClassDummyUdf.scala -d udf.jar`
// To remove class A from the jar:
// `jar -xvf udf.jar` -> delete A.class and A$.class
// `jar -cvf udf_noA.jar org/`
class StubClassDummyUdf {
  val udf: Int => Int = (x: Int) => x + 1
  val dummy = (x: Int) => A(x)
}

case class A(x: Int) { def get: Int = x + 5 }

// The code to generate the udf file
object StubClassDummyUdf {
  import java.io.{BufferedOutputStream, File, FileOutputStream}
  import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.PrimitiveIntEncoder
  import org.apache.spark.sql.connect.common.UdfPacket
  import org.apache.spark.util.Utils

  def packDummyUdf(): String = {
    val byteArray =
      Utils.serialize[UdfPacket](
        new UdfPacket(
          new StubClassDummyUdf().udf,
          Seq(PrimitiveIntEncoder),
          PrimitiveIntEncoder))
    val file = new File("src/test/resources/udf")
    val target = new BufferedOutputStream(new FileOutputStream(file))
    try {
      target.write(byteArray)
      file.getAbsolutePath
    } finally {
      target.close()
    }
  }
}
```
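For context, `Utils.serialize` above boils down to plain Java object serialization. A self-contained sketch of the same roundtrip (the `JavaSerDemo` name and helpers are illustrative, not Spark's API):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object JavaSerDemo {
  // Serialize any Serializable object graph to a byte array.
  def serialize[T](obj: T): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try oos.writeObject(obj) finally oos.close()
    bos.toByteArray
  }

  // Read the object graph back; every class referenced in the stream must be
  // resolvable by the class loader in effect, which is exactly where the stub
  // class loader in this commit steps in on the server side.
  def deserialize[T](bytes: Array[Byte]): T = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }
}
```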
4 binary files added (1.48 KB, 5.21 KB, 1.59 KB, 5.54 KB; not shown).
Lines changed: 83 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.connect.client

import java.io.File
import java.nio.file.{Files, Paths}

import scala.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.client.util.RemoteSparkSession
import org.apache.spark.sql.connect.common.ProtoDataTypes
import org.apache.spark.sql.expressions.ScalarUserDefinedFunction

class UDFClassLoadingE2ESuite extends RemoteSparkSession {

  private val scalaVersion = Properties.versionNumberString
    .split("\\.")
    .take(2)
    .mkString(".")

  // See src/test/resources/StubClassDummyUdf for how the UDFs and jars are created.
  private val udfByteArray: Array[Byte] =
    Files.readAllBytes(Paths.get(s"src/test/resources/udf$scalaVersion"))
  private val udfJar =
    new File(s"src/test/resources/udf$scalaVersion.jar").toURI.toURL

  private def registerUdf(session: SparkSession): Unit = {
    val udf = ScalarUserDefinedFunction(
      serializedUdfPacket = udfByteArray,
      inputTypes = Seq(ProtoDataTypes.IntegerType),
      outputType = ProtoDataTypes.IntegerType,
      name = Some("dummyUdf"),
      nullable = true,
      deterministic = true)
    session.registerUdf(udf.toProto)
  }

  test("update class loader after stubbing: new session") {
    // Session1 should stub the missing class, but fail to call methods on it
    val session1 = spark.newSession()

    assert(
      intercept[Exception] {
        registerUdf(session1)
      }.getMessage.contains(
        "java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf"))

    // Session2 uses the real class
    val session2 = spark.newSession()
    session2.addArtifact(udfJar.toURI)
    registerUdf(session2)
  }

  test("update class loader after stubbing: same session") {
    // Session should stub the missing class, but fail to call methods on it
    val session = spark.newSession()

    assert(
      intercept[Exception] {
        registerUdf(session)
      }.getMessage.contains(
        "java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf"))

    // Session uses the real class
    session.addArtifact(udfJar.toURI)
    registerUdf(session)
  }
}
```

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -30,7 +30,7 @@ object IntegrationTestUtils {

   // System properties used for testing and debugging
   private val DEBUG_SC_JVM_CLIENT = "spark.debug.sc.jvm.client"
-  // Enable this flag to print all client debug log + server logs to the console
+  // Enable this flag to print all server logs to the console
   private[connect] val isDebug = System.getProperty(DEBUG_SC_JVM_CLIENT, "false").toBoolean

   private[sql] lazy val scalaVersion = {
```

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -96,7 +96,7 @@ object SparkConnectServerUtils {
     // To find InMemoryTableCatalog for V2 writer tests
     val catalystTestJar =
       tryFindJar("sql/catalyst", "spark-catalyst", "spark-catalyst", test = true)
-        .map(clientTestJar => Seq("--jars", clientTestJar.getCanonicalPath))
+        .map(clientTestJar => Seq(clientTestJar.getCanonicalPath))
         .getOrElse(Seq.empty)

     // For UDF maven E2E tests, the server needs the client code to find the UDFs defined in tests.
```
