Skip to content

JSON script mysqlcdc 到 pg数据库 报错:类型无法转换 #1463

@Iamgreat0

Description

@Iamgreat0

Search before asking

  • I had searched in the issues and found no similar question.

  • I had googled my question but i didn't get any help.

  • I had read the documentation: ChunJun doc but it didn't help me.

Description

JSON脚本:
{
"job": {
"content": [
{
"reader": {
"parameter": {
"host": "172.2.0.22",
"port": 30001,
"serverId": 1,
"databaseList": [
"dps"
],
"tableList": [
"dps.qixin_source"
],
"username": "root",
"password": "wayz@1234",
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "gender",
"type": "int"
},
{
"name": "data",
"type": "string"
},
{
"name": "create_time",
"type": "datetime"
},
{
"name": "update_time",
"type": "datetime"
}
],
"writeMode": "update",
"updateKey": ["id"]
},
"table": {
"tableName": "qixin_source"
},
"name": "mysqlcdcreader"
},
"writer": {
"parameter": {
"username": "pdmap",
"password": "pdmap@123",
"connection": [
{
"jdbcUrl": "jdbc:postgresql://114.67.106.133:5433/test?useUnicode=true&characterEncoding=utf8&useSSL=false&autoReconnect=true&failOverReadOnly=false",
"table": [
"qixin_sink"
],
"schema": "public"

						}
					],
					"column": [
                                                  {
                                                            "name": "id",
                                                            "type": "int8"
                                                    },
                                                    {
                                                            "name": "name",
                                                            "type": "string"
                                                    },
                                                    {
                                                            "name": "gender",
                                                            "type": "string"
                                                    },
                                                    {
                                                            "name": "data",
                                                            "type": "string"
                                                    },
                                                    {
                                                            "name": "create_time",
                                                            "type": "string"
                                                    },
                                                    {
                                                            "name": "update_time",
                                                            "type": "string"
                                                    }
                                                   ],
					"mode": "insert"
				},
				"table": {
					"tableName": "qixin_sink"
				},
				"name": "postgresqlwriter"
			},
			"transformer": {
				"transformSql": "select id, if(name IS NULL,'',name) as name, if(gender IS NULL,'',cast(gender as CHAR)) as gender, if(data IS NULL,'',data) as data, if(create_time IS NULL,'',cast(create_time as CHAR)) as create_time, if(update_time IS NULL,'',cast(update_time as CHAR)) as update_time from qixin_source"
			}
		}
	],
	"setting": {
		"errorLimit": {
			"record": 100
		},
		"speed": {
			"bytes": 0,
			"channel": 1,
			"readerChannel": 1,
			"writerChannel": 1
		}
	}
}

}

提交任务命令:sh bin/chunjun-local.sh -job json/qixin/mysql_pg_cdc.json

报错:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:811)
at com.dtstack.chunjun.environment.MyLocalStreamEnvironment.execute(MyLocalStreamEnvironment.java:174)
at com.dtstack.chunjun.Main.exeSyncJob(Main.java:224)
at com.dtstack.chunjun.Main.main(Main.java:117)
at com.dtstack.chunjun.client.local.LocalClusterClientHelper.submit(LocalClusterClientHelper.java:35)
at com.dtstack.chunjun.client.Launcher.main(Launcher.java:119)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassCastException: com.dtstack.chunjun.connector.jdbc.converter.JdbcRowConverter cannot be cast to com.dtstack.chunjun.connector.postgresql.converter.PostgresqlColumnConverter
at com.dtstack.chunjun.connector.postgresql.sink.PostgresOutputFormat.openInternal(PostgresOutputFormat.java:92)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.open(BaseRichOutputFormat.java:262)
at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.open(DtOutputFormatSinkFunction.java:95)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:433)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
at java.lang.Thread.run(Thread.java:748)

这问题已经困扰了我一个星期了。谢谢,大佬们!!!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    questionFurther information is requested

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions