Closed
Labels: question (Further information is requested)
Description
Search before asking
- I have searched the issues and found no similar question.
- I have googled my question but didn't get any help.
- I have read the documentation (ChunJun doc), but it didn't help me.
Description
The JSON parameter reference on the official site lists an optional ClickHouse source parameter called `customSql`. I found that the job fails whether or not this option is set, and the errors differ in each case.
Environment:
OS: Ubuntu 16.04.7 LTS
JDK: 1.8
ChunJun: 1.12.7 (latest)
Flink: 1.13.5 or 1.12.7 (both versions report the same error, so the problem is unrelated to the Flink version)
ClickHouse: 21.8.12.29
Hive: 2.3.9
Scenario 1: the customSql option is NOT set on the ClickHouse source
The JSON file:
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "clickhousereader",
          "parameter": {
            "column": [
              "packet_name", "node_id", "gateway_id"
            ],
            "connection": [
              {
                "jdbcUrl": ["jdbc:clickhouse://47.100.53.xxx:8123/default"],
                "schema": "bp001",
                "table": ["HOS_4000"]
              }
            ],
            "username": "XXXXX",
            "password": "XXXXX"
          }
        },
        "writer": {
          "name": "hivewriter",
          "parameter": {
            "jdbcUrl": "jdbc:hive2://192.168.111.111:10000/chunjun",
            "username": "",
            "password": "",
            "fileType": "text",
            "writeMode": "overwrite",
            "charsetName": "UTF-8",
            "tablesColumn": "{\"chunjun.HOS_4000\":[{\"type\":\"STRING\",\"key\":\"packet_name\"},{\"type\":\"STRING\",\"key\":\"node_id\"},{\"type\":\"STRING\",\"key\":\"gateway_id\"}]}",
            "defaultFS": "hdfs://192.168.111.111:9000"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
Error in this case (NullPointerException):
The job submits to the Flink cluster and runs for a while, then fails with a NullPointerException. Checking Hive at that point, the HOS_4000 Hive table was indeed created automatically, but no data was synced from ClickHouse; the table is empty.
Error message:
2022-12-19 09:10:35
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
at com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormat.executeQuery(JdbcInputFormat.java:680)
at com.dtstack.chunjun.connector.clickhouse.source.ClickhouseInputFormat.openInternal(ClickhouseInputFormat.java:55)
at com.dtstack.chunjun.source.format.BaseRichInputFormat.open(BaseRichInputFormat.java:162)
at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:126)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)
Scenario 2: the customSql option IS set on the ClickHouse source
The JSON file:
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "clickhousereader",
          "parameter": {
            "customSql": "select packet_name,node_id,gateway_id from `bp001`.`HOS_4000` limit 100",
            "column": [
              "packet_name", "node_id", "gateway_id"
            ],
            "connection": [
              {
                "jdbcUrl": ["jdbc:clickhouse://47.100.53.XXX:8123/default"],
                "schema": "bp001",
                "table": ["HOS_4000"]
              }
            ],
            "username": "XXXXX",
            "password": "XXXXX"
          }
        },
        "writer": {
          "name": "hivewriter",
          "parameter": {
            "jdbcUrl": "jdbc:hive2://192.168.111.111:10000/chunjun",
            "username": "",
            "password": "",
            "fileType": "text",
            "writeMode": "overwrite",
            "charsetName": "UTF-8",
            "tablesColumn": "{\"chunjun.HOS_4000\":[{\"type\":\"STRING\",\"key\":\"packet_name\"},{\"type\":\"STRING\",\"key\":\"node_id\"},{\"type\":\"STRING\",\"key\":\"gateway_id\"}]}",
            "defaultFS": "hdfs://192.168.111.111:9000"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
Error in this case:
Once the customSql option is added, the problem gets worse: the job cannot even be submitted to the Flink standalone cluster; it fails directly in the local console.
Error message:
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: error to get meta from [bp001.null]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:117)
at com.dtstack.chunjun.client.util.JobGraphUtil.buildJobGraph(JobGraphUtil.java:62)
at com.dtstack.chunjun.client.standalone.StandaloneClusterClientHelper.submit(StandaloneClusterClientHelper.java:61)
at com.dtstack.chunjun.client.Launcher.main(Launcher.java:119)
Caused by: com.dtstack.chunjun.throwable.ChunJunRuntimeException: error to get meta from [bp001.null]
at com.dtstack.chunjun.connector.jdbc.util.JdbcUtil.getTableMetaData(JdbcUtil.java:192)
at com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory.getTableMetaData(JdbcSourceFactory.java:192)
at com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory.initColumnInfo(JdbcSourceFactory.java:176)
at com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory.createSource(JdbcSourceFactory.java:111)
at com.dtstack.chunjun.Main.exeSyncJob(Main.java:181)
at com.dtstack.chunjun.Main.main(Main.java:117)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
... 7 more
Caused by: ru.yandex.clickhouse.except.ClickHouseException: ClickHouse exception, code: 62, host: 47.100.53.191, port: 8123; Code: 62, e.displayText() = DB::Exception: Syntax error: failed at position 90 ('custom') (line 1, col 90): custom) where 1=2
FORMAT TabSeparatedWithNamesAndTypes. Expected UNION (version 21.8.12.29 (official build))
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:60)
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:30)
at ru.yandex.clickhouse.ClickHouseStatementImpl.checkForErrorAndThrow(ClickHouseStatementImpl.java:1083)
at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:770)
at ru.yandex.clickhouse.ClickHouseStatementImpl.getLastInputStream(ClickHouseStatementImpl.java:693)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:341)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:326)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:320)
at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:314)
at com.dtstack.chunjun.connector.jdbc.util.JdbcUtil.getTableMetaData(JdbcUtil.java:174)
... 17 more
Caused by: java.lang.Throwable: Code: 62, e.displayText() = DB::Exception: Syntax error: failed at position 90 ('custom') (line 1, col 90): custom) where 1=2
FORMAT TabSeparatedWithNamesAndTypes. Expected UNION (version 21.8.12.29 (official build))
at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:55)
... 26 more
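Reconstructing from the error text: the parse fails on the token `custom` at 1-based position 90, and the remainder ClickHouse prints is `custom) where 1=2`. That is consistent with a metadata-probe query of the form below (the template is my reconstruction from the error message, not ChunJun's actual source in JdbcUtil.getTableMetaData):

```java
public class MetaProbeSketch {
    // Hypothetical metadata-probe template reconstructed from the ClickHouse
    // syntax error; the `where 1=2` filter would fetch column metadata
    // without returning any rows.
    static String metaQuery(String customSql) {
        return "select * from ((" + customSql + ") custom) where 1=2";
    }

    public static void main(String[] args) {
        String sql = metaQuery(
            "select packet_name,node_id,gateway_id from `bp001`.`HOS_4000` limit 100");
        // With this template, the `custom` alias lands exactly at 1-based
        // position 90, where ClickHouse 21.8 reports the syntax error.
        System.out.println(sql.indexOf("custom") + 1); // prints 90
    }
}
```

If that template is right, ClickHouse 21.8 appears to reject the alias placed inside the extra parentheses (after `(subquery)` inside an outer parenthesis its parser expects UNION), which would explain the "Expected UNION" message; that interpretation is my speculation.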
Code of Conduct
- I agree to follow this project's Code of Conduct