Conversation
    ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
        .booleanType()
  -     .noDefaultValue()
  +     .defaultValue(false)
This change is required:
py4j.protocol.Py4JJavaError: An error occurred while calling o42.executeSql.
: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configurable
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1022)
at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:555)
at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1022)
at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:555)
at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at org.apache.iceberg.hadoop.Util.usesHadoopFileIO(Util.java:122)
at org.apache.iceberg.hadoop.Util.mayHaveBlockLocations(Util.java:92)
at org.apache.iceberg.flink.source.SourceUtil.isLocalityEnabled(SourceUtil.java:43)
at org.apache.iceberg.flink.source.FlinkSource$Builder.buildFormat(FlinkSource.java:260)
at org.apache.iceberg.flink.source.FlinkSource$Builder.build(FlinkSource.java:272)
at org.apache.iceberg.flink.source.IcebergTableSource.createDataStream(IcebergTableSource.java:128)
at org.apache.iceberg.flink.source.IcebergTableSource.access$200(IcebergTableSource.java:55)
at org.apache.iceberg.flink.source.IcebergTableSource$1.produceDataStream(IcebergTableSource.java:209)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan.translateToPlanInternal(CommonExecTableSourceScan.java:163)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.java:99)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.java:205)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.java:127)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:177)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:180)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1296)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1138)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configurable
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
... 67 more
This reverts commit c088745.
@stevenzwu @pvary do you have time to get some eyes on this one?
    private FileIO fileIO(Table table) {
      if (table.io() instanceof HadoopConfigurable) {
        if (HadoopDependency.isHadoopCommonOnClasspath(SerializableTable.class.getClassLoader())
What happens when the hadoop is not on the classpath, but the table is HadoopConfigurable?
No configuration will be passed through.
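For reference, a classpath guard like the `HadoopDependency.isHadoopCommonOnClasspath(...)` call in the snippet above can be sketched as a reflective probe that never initializes the probed class. This is an illustrative standalone version; the class name and structure are assumptions, not the PR's actual helper:

```java
// Illustrative sketch of a Hadoop-on-classpath probe, similar in spirit
// to the isHadoopCommonOnClasspath call shown in the diff above.
class HadoopDependencySketch {

  // Generic reflective probe: true only if the named class is loadable.
  // initialize=false avoids running static initializers, so the probe
  // itself cannot fail on missing transitive dependencies.
  static boolean isOnClasspath(String className, ClassLoader classLoader) {
    try {
      Class.forName(className, false, classLoader);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  static boolean isHadoopCommonOnClasspath(ClassLoader classLoader) {
    // Probe a well-known hadoop-common class.
    return isOnClasspath("org.apache.hadoop.conf.Configuration", classLoader);
  }
}
```

When the probe returns false, the caller skips the `HadoopConfigurable` branch entirely, which is why no configuration is passed through in that case.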
          "There should be a hive-site.xml file under the directory %s",
          hiveConfDir);
      newConf.addResource(new Path(hiveConfDir, "hive-site.xml"));

    public static Object clusterHadoopConf() {
Would this be better placed in HadoopUtil?
  -   org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
  +   org.apache.hadoop.conf.Configuration hadoopConf =
  +       (org.apache.hadoop.conf.Configuration) FlinkCatalogFactory.clusterHadoopConf();
So FlinkSQL will still need hadoop on the classpath?
agree with Peter that this seems problematic, as the return value can be null
    }

  -   static TableLoader fromHadoopTable(String location, Configuration hadoopConf) {
  +   static TableLoader fromHadoopTable(String location, Object hadoopConf) {
I hate this part of the change.
We have a public API where the type is not defined.
Do we have any better solution for this?
What do you think of the ParquetConfiguration that parquet-java did: apache/parquet-java#1141?
It seems quite a bit of effort to define an IcebergHadoopConfiguration class like parquet-java.
Another possible option is to add a new overloaded method with a Flink Configuration arg. Flink provides a util method for the conversion:

    public class HadoopUtils {
      @SuppressWarnings("deprecation")
      public static Configuration getHadoopConfiguration(
          org.apache.flink.configuration.Configuration flinkConfiguration)
core/src/main/java/org/apache/iceberg/hadoop/SerializableConfiguration.java
    public SerializableConfiguration(Object hadoopConf) {
      this.hadoopConf = (Configuration) hadoopConf;
    }
What's the point here? When there's no Hadoop on the classpath, then it will blow up no matter what, right?
Additionally, explicit casts are just brittle. This question applies to all other such places where Object is passed.
In Flink this is solved by protecting Hadoop-specific class usages with isHadoopCommonOnClasspath, and that's it; it works like a charm.
I am also interested in the answer to Gabor's question.
Also wondering if we can get overload ambiguity from the two constructors?
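On the overload question: Java resolves a call to the most specific applicable overload at compile time, so a SerializableConfiguration(Object) constructor next to a SerializableConfiguration(Configuration) constructor is not ambiguous. A Configuration argument binds to the more specific constructor, and the Object overload is chosen only when the argument's static type is Object. A minimal standalone demonstration of that general rule, using String in place of Configuration:

```java
// Overload resolution demo: the most specific applicable overload wins,
// and dispatch is on the argument's static (compile-time) type.
class OverloadDemo {
  static String pick(Object o) {
    return "Object overload";
  }

  static String pick(String s) {
    return "String overload";
  }
}
```

Here `OverloadDemo.pick("x")` picks the String overload, while `OverloadDemo.pick((Object) "x")` picks the Object overload, even though the runtime type is a String in both calls.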
    ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
        .booleanType()
  -     .noDefaultValue()
  +     .defaultValue(false)
This is a backwards-incompatible change. I'm not sure how widely it's used, but we need to do some research and be more vocal about this change, if we decide to go ahead with it.
Agree with @pvary that this changes the default behavior, which calls Util.mayHaveBlockLocations(table.io(), table.location()) from the Hadoop module to figure out whether locality is enabled for the hdfs scheme.
Would it work if we add the isHadoopCommonOnClasspath check at the beginning of the Util#mayHaveBlockLocations method in the Hadoop module, and return false if hadoop-common is not on the classpath?

    public static boolean mayHaveBlockLocations(FileIO io, String location) {
      if (usesHadoopFileIO(io, location)) {
        InputFile inputFile = io.newInputFile(location);
        if (inputFile instanceof HadoopInputFile) {
          String scheme = ((HadoopInputFile) inputFile).getFileSystem().getScheme();
          return LOCALITY_WHITELIST_FS.contains(scheme);
        } else {
          return false;
        }
      }
      return false;
    }
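The suggested early return can be sketched end to end with stub types. Everything here is illustrative: `InputFile`/`FileIO` are minimal stand-ins for Iceberg's interfaces, and `schemeOf` is a simplified substitute for the real HadoopInputFile file-system scheme lookup, not the actual implementation:

```java
import java.util.Set;

// Stub types standing in for Iceberg's interfaces; illustration only.
interface InputFile {}

interface FileIO {
  InputFile newInputFile(String location);
}

class LocalityGuardSketch {
  private static final Set<String> LOCALITY_WHITELIST_FS = Set.of("hdfs");

  // Reflective probe; initialize=false so the probe itself cannot fail
  // on missing transitive Hadoop classes.
  static boolean isHadoopCommonOnClasspath(ClassLoader cl) {
    try {
      Class.forName("org.apache.hadoop.conf.Configuration", false, cl);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  // Simplified stand-in for HadoopInputFile's scheme lookup: take the
  // URI scheme from the location string, defaulting to "file".
  static String schemeOf(String location) {
    int idx = location.indexOf("://");
    return idx < 0 ? "file" : location.substring(0, idx);
  }

  static boolean mayHaveBlockLocations(FileIO io, String location) {
    // Proposed early return: without hadoop-common on the classpath,
    // locality info can never be exposed, and touching Hadoop types
    // would throw the NoClassDefFoundError shown earlier in this thread.
    if (!isHadoopCommonOnClasspath(LocalityGuardSketch.class.getClassLoader())) {
      return false;
    }
    InputFile inputFile = io.newInputFile(location);
    return inputFile != null && LOCALITY_WHITELIST_FS.contains(schemeOf(location));
  }
}
```

With this shape, an `hdfs://` location reports possible block locations only when hadoop-common is actually loadable, and every other case short-circuits to false without ever touching a Hadoop class.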
      return hadoopConf;
    }

    public Configuration getClone() {
Maybe name this method just config().
getClone can look like a clone of this SerializableConfiguration class.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@Fokko I wonder why this is closed. I face this issue when submitting jobs to AWS managed Flink.

Same, for some reason Flink requires Hadoop and yet does not come with it? We're using the Kubernetes Operator and would like to avoid customizing the image to fix this bug.

What was the last status for this PR, as in, how much work is left before it can be reviewed?
Allow Flink to run without Hadoop

This PR aims to remove Hadoop's Configuration class from the main code path, so we can also run Flink without having the Hadoop JARs on the Java classpath.

Testing

Testing is still pending. This PR focuses on read operations. For write operations, upstream changes need to be made to Parquet-MR, with the main focus on the ParquetWriter class: https://github.com/apache/parquet-mr/blob/4bf606905924896403d25cd6287399cfe7050ce9/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java#L25

Resolves #7332
Resolves #3117