-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Description
Search before asking
-
I had searched in the issues and found no similar question.
-
I had googled my question but i didn't get any help.
-
I had read the documentation: ChunJun doc but it didn't help me.
Description
JSON脚本:
{
"job": {
"content": [
{
"reader": {
"parameter": {
"host": "172.2.0.22",
"port": 30001,
"serverId": 1,
"databaseList": [
"dps"
],
"tableList": [
"dps.qixin_source"
],
"username": "root",
"password": "wayz@1234",
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "gender",
"type": "int"
},
{
"name": "data",
"type": "string"
},
{
"name": "create_time",
"type": "datetime"
},
{
"name": "update_time",
"type": "datetime"
}
],
"writeMode": "update",
"updateKey": ["id"]
},
"table": {
"tableName": "qixin_source"
},
"name": "mysqlcdcreader"
},
"writer": {
"parameter": {
"username": "pdmap",
"password": "pdmap@123",
"connection": [
{
"jdbcUrl": "jdbc:postgresql://114.67.106.133:5433/test?useUnicode=true&characterEncoding=utf8&useSSL=false&autoReconnect=true&failOverReadOnly=false",
"table": [
"qixin_sink"
],
"schema": "public"
}
],
"column": [
{
"name": "id",
"type": "int8"
},
{
"name": "name",
"type": "string"
},
{
"name": "gender",
"type": "string"
},
{
"name": "data",
"type": "string"
},
{
"name": "create_time",
"type": "string"
},
{
"name": "update_time",
"type": "string"
}
],
"mode": "insert"
},
"table": {
"tableName": "qixin_sink"
},
"name": "postgresqlwriter"
},
"transformer": {
"transformSql": "select id, if(name IS NULL,'',name) as name, if(gender IS NULL,'',cast(gender as CHAR)) as gender, if(data IS NULL,'',data) as data, if(create_time IS NULL,'',cast(create_time as CHAR)) as create_time, if(update_time IS NULL,'',cast(update_time as CHAR)) as update_time from qixin_source"
}
}
],
"setting": {
"errorLimit": {
"record": 100
},
"speed": {
"bytes": 0,
"channel": 1,
"readerChannel": 1,
"writerChannel": 1
}
}
}
}
提交任务命令:sh bin/chunjun-local.sh -job json/qixin/mysql_pg_cdc.json
报错:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:811)
at com.dtstack.chunjun.environment.MyLocalStreamEnvironment.execute(MyLocalStreamEnvironment.java:174)
at com.dtstack.chunjun.Main.exeSyncJob(Main.java:224)
at com.dtstack.chunjun.Main.main(Main.java:117)
at com.dtstack.chunjun.client.local.LocalClusterClientHelper.submit(LocalClusterClientHelper.java:35)
at com.dtstack.chunjun.client.Launcher.main(Launcher.java:119)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassCastException: com.dtstack.chunjun.connector.jdbc.converter.JdbcRowConverter cannot be cast to com.dtstack.chunjun.connector.postgresql.converter.PostgresqlColumnConverter
at com.dtstack.chunjun.connector.postgresql.sink.PostgresOutputFormat.openInternal(PostgresOutputFormat.java:92)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.open(BaseRichOutputFormat.java:262)
at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.open(DtOutputFormatSinkFunction.java:95)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:433)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
at java.lang.Thread.run(Thread.java:748)
这问题已经困扰了我一个星期了。谢谢,大佬们!!!
Code of Conduct
- I agree to follow this project's Code of Conduct