
[Question][Module Name] Two different errors when using ClickHouse Source #1449

@shens16


Search before asking

  • I had searched in the issues and found no similar question.

  • I had googled my question but I didn't get any help.

  • I had read the documentation: ChunJun doc but it didn't help me.

Description

Among the JSON parameter options documented on the official site, ClickHouse Source has an optional parameter called customSql. I found that the job fails whether or not this option is set, and the two errors are different.
Environment
OS: Ubuntu 16.04.7 LTS
JDK version: JDK 1.8
ChunJun version: 1.12.7 (latest)
Flink version: 1.13.5 or 1.12.7 (both versions produce the same error, so it is unrelated to the Flink version)
ClickHouse version: 21.8.12.29
Hive version: 2.3.9

Scenario 1: the customSql option is NOT set in the ClickHouse Source section of the JSON file

The JSON file:

{
  "job": {
    "content": [ {
      "reader": {
        "name": "clickhousereader",
        "parameter": {
          "column": [
            "packet_name", "node_id", "gateway_id"
          ],
          "connection": [ {
            "jdbcUrl": [ "jdbc:clickhouse://47.100.53.xxx:8123/default" ],
            "schema": "bp001",
            "table": [ "HOS_4000" ]
          } ],
          "username": "XXXXX",
          "password": "XXXXX"
        }
      },
      "writer": {
        "name": "hivewriter",
        "parameter": {
          "jdbcUrl": "jdbc:hive2://192.168.111.111:10000/chunjun",
          "username": "",
          "password": "",
          "fileType": "text",
          "writeMode": "overwrite",
          "charsetName": "UTF-8",
          "tablesColumn": "{\"chunjun.HOS_4000\":[{\"type\":\"STRING\",\"key\":\"packet_name\"},{\"type\":\"STRING\",\"key\":\"node_id\"},{\"type\":\"STRING\",\"key\":\"gateway_id\"}]}",
          "defaultFS": "hdfs://192.168.111.111:9000"
        }
      }
    } ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
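For configs like the one above, a quick structural check can rule out simple omissions before submitting the job. The sketch below is illustrative only and not part of ChunJun; the field names are taken from the config shown above:

```python
import json

def check_reader(job_json: str) -> list:
    """Return a list of structural problems in the clickhousereader parameter.

    Minimal sanity check for a ChunJun sync-job JSON; field names are taken
    from the config above, and the check itself is a hypothetical helper.
    """
    reader = json.loads(job_json)["job"]["content"][0]["reader"]
    param = reader["parameter"]
    problems = []
    for conn in param.get("connection", []):
        if not conn.get("jdbcUrl"):
            problems.append("connection.jdbcUrl is empty")
        if not conn.get("table"):
            problems.append("connection.table is empty")
    if not param.get("column") and not param.get("customSql"):
        problems.append("neither column nor customSql is set")
    return problems
```

Running it against the config above returns an empty list, so the structure itself looks fine; the failures below happen at runtime.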

The error in this case (NullPointerException):
The job can be submitted to the Flink cluster and runs, but after a while it fails with a NullPointerException. Checking Hive at that point shows that the table HOS_4000 was indeed auto-created, but no data was synced from ClickHouse; the table is empty.
Error message:

2022-12-19 09:10:35
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
	at com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormat.executeQuery(JdbcInputFormat.java:680)
	at com.dtstack.chunjun.connector.clickhouse.source.ClickhouseInputFormat.openInternal(ClickhouseInputFormat.java:55)
	at com.dtstack.chunjun.source.format.BaseRichInputFormat.open(BaseRichInputFormat.java:162)
	at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:126)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)

Scenario 2: the customSql option IS set in the ClickHouse Source section of the JSON file

The JSON file:

{
  "job": {
    "content": [ {
      "reader": {
        "name": "clickhousereader",
        "parameter": {
          "customSql": "select packet_name,node_id,gateway_id from `bp001`.`HOS_4000` limit 100",
          "column": [
            "packet_name", "node_id", "gateway_id"
          ],
          "connection": [ {
            "jdbcUrl": [ "jdbc:clickhouse://47.100.53.XXX:8123/default" ],
            "schema": "bp001",
            "table": [ "HOS_4000" ]
          } ],
          "username": "XXXXX",
          "password": "XXXXX"
        }
      },
      "writer": {
        "name": "hivewriter",
        "parameter": {
          "jdbcUrl": "jdbc:hive2://192.168.111.111:10000/chunjun",
          "username": "",
          "password": "",
          "fileType": "text",
          "writeMode": "overwrite",
          "charsetName": "UTF-8",
          "tablesColumn": "{\"chunjun.HOS_4000\":[{\"type\":\"STRING\",\"key\":\"packet_name\"},{\"type\":\"STRING\",\"key\":\"node_id\"},{\"type\":\"STRING\",\"key\":\"gateway_id\"}]}",
          "defaultFS": "hdfs://192.168.111.111:9000"
        }
      }
    } ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}

The error in this case:
Once the customSql option is added, the problem gets worse: the job cannot even be submitted to the Flink standalone cluster; the error is thrown directly in the local console.
Error message:

Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: error to get meta from [bp001.null]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
        at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
        at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82)
        at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:117)
        at com.dtstack.chunjun.client.util.JobGraphUtil.buildJobGraph(JobGraphUtil.java:62)
        at com.dtstack.chunjun.client.standalone.StandaloneClusterClientHelper.submit(StandaloneClusterClientHelper.java:61)
        at com.dtstack.chunjun.client.Launcher.main(Launcher.java:119)
Caused by: com.dtstack.chunjun.throwable.ChunJunRuntimeException: error to get meta from [bp001.null]
        at com.dtstack.chunjun.connector.jdbc.util.JdbcUtil.getTableMetaData(JdbcUtil.java:192)
        at com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory.getTableMetaData(JdbcSourceFactory.java:192)
        at com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory.initColumnInfo(JdbcSourceFactory.java:176)
        at com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory.createSource(JdbcSourceFactory.java:111)
        at com.dtstack.chunjun.Main.exeSyncJob(Main.java:181)
        at com.dtstack.chunjun.Main.main(Main.java:117)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
        ... 7 more
Caused by: ru.yandex.clickhouse.except.ClickHouseException: ClickHouse exception, code: 62, host: 47.100.53.191, port: 8123; Code: 62, e.displayText() = DB::Exception: Syntax error: failed at position 90 ('custom') (line 1, col 90): custom) where 1=2
FORMAT TabSeparatedWithNamesAndTypes. Expected UNION (version 21.8.12.29 (official build))

        at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:60)
        at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:30)
        at ru.yandex.clickhouse.ClickHouseStatementImpl.checkForErrorAndThrow(ClickHouseStatementImpl.java:1083)
        at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:770)
        at ru.yandex.clickhouse.ClickHouseStatementImpl.getLastInputStream(ClickHouseStatementImpl.java:693)
        at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:341)
        at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:326)
        at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:320)
        at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:314)
        at com.dtstack.chunjun.connector.jdbc.util.JdbcUtil.getTableMetaData(JdbcUtil.java:174)
        ... 17 more
Caused by: java.lang.Throwable: Code: 62, e.displayText() = DB::Exception: Syntax error: failed at position 90 ('custom') (line 1, col 90): custom) where 1=2
FORMAT TabSeparatedWithNamesAndTypes. Expected UNION (version 21.8.12.29 (official build))

        at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:55)
        ... 26 more
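The syntax error above hints at the failure mode: to fetch column metadata (JdbcUtil.getTableMetaData in the trace), the connector appears to wrap the source SQL in a probe resembling `select * from (<sql>) where 1=2`, which returns no rows but exposes the result-set columns. Here the literal token `custom` (plus the driver-appended FORMAT clause) reached ClickHouse instead of the configured customSql, consistent with the `[bp001.null]` message, where the table name resolved to null once customSql was set. A hedged sketch of that wrapping, with the exact probe shape and function name assumed rather than confirmed:

```python
def build_meta_probe(custom_sql: str) -> str:
    """Wrap user SQL in a zero-row probe so only metadata comes back.

    Assumed reconstruction of what JdbcUtil.getTableMetaData sends; the
    `where 1=2` form is inferred from the error text, not from the source.
    """
    return f"select * from ({custom_sql}) where 1=2"

# With the configured customSql, the probe would be valid SQL:
probe = build_meta_probe(
    "select packet_name,node_id,gateway_id from `bp001`.`HOS_4000` limit 100"
)

# The ClickHouse error instead shows the literal token `custom` inside the
# parentheses, i.e. the probe seems to have been built from a placeholder
# rather than from the configured statement:
broken = build_meta_probe("custom")
# `broken` ends with "custom) where 1=2", matching the fragment in the
# syntax error above.
```

If this reading is right, the configured customSql never reaches the metadata probe, which would explain why the failure happens before the job is even submitted.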

Code of Conduct

Labels: question (Further information is requested)