-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Description
场景:KuduSource 同步到 KuduSink
json
{
"job": {
"content": [
{
"reader": {
"name": "kudureader",
"parameter": {
"column": [
{
"name": "REPORT_TIME",
"type": "UNIXTIME_MICROS"
},
{
"name": "VEHICLE_TAG",
"type": "STRING"
},
{
"name": "REV",
"type": "INT32"
},
{
"name": "LONGITUDE",
"type": "DOUBLE"
},
{
"name": "LATITUDE",
"type": "DOUBLE"
},
{
"name": "SPEED",
"type": "DOUBLE"
},
{
"name": "HEADING",
"type": "DOUBLE"
},
{
"name": "TRAIN_ASSIGNMENT",
"type": "STRING"
},
{
"name": "PREDICTABLE",
"type": "INT32"
}
],
"masters": "172.29.207.143:7051,172.29.207.143:7151,172.29.207.143:7251",
"table": "sfmta_kudu",
"readMode": "read_latest",
"workerCount": 2,
"operationTimeout": 30000,
"adminOperationTimeout": 30000,
"queryTimeout": 30000,
"batchSizeBytes": 1048576
}
},
"writer": {
"parameter": {
"column": [
{
"name": "REPORT_TIME",
"type": "UNIXTIME_MICROS"
},
{
"name": "VEHICLE_TAG",
"type": "STRING"
},
{
"name": "REV",
"type": "INT32"
},
{
"name": "LONGITUDE",
"type": "DOUBLE"
},
{
"name": "LATITUDE",
"type": "DOUBLE"
},
{
"name": "SPEED",
"type": "DOUBLE"
},
{
"name": "HEADING",
"type": "DOUBLE"
},
{
"name": "TRAIN_ASSIGNMENT",
"type": "STRING"
},
{
"name": "PREDICTABLE",
"type": "INT32"
}
],
"masters": "localhost:7051",
"table": "sfmta_kudu_one",
"flushMode": "manual_flush",
"writeMode": "append",
"batchSizeBytes": 1048576
},
"name": "kuduwriter"
}
}
],
"setting": {
"restore": {
"maxRowNumForCheckpoint": 0,
"isRestore": false,
"restoreColumnName": "",
"restoreColumnIndex": 0
},
"errorLimit": {
"record": 100
},
"speed": {
"bytes": 0,
"channel": 1
},
"log": {
"isLogger": false,
"level": "debug",
"path": "",
"pattern": ""
}
}
}
}
命令
bin/start-chunjun -mode local -jobType sync -job ./chunjun-examples/json/kudu/kudu_stream_test.json -chunjunDistDir chunjun-dist
结果
java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
at com.dtstack.chunjun.connector.kudu.sink.KuduOutputFormat.writeSingleRecordInternal(KuduOutputFormat.java:77)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:466)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:272)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:92)
at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.invoke(DtOutputFormatSinkFunction.java:117)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
at com.dtstack.chunjun.connector.kudu.converter.KuduColumnConverter.lambda$createExternalConverter$3ab0349a$5(KuduColumnConverter.java:175)
at com.dtstack.chunjun.connector.kudu.converter.KuduColumnConverter.lambda$wrapIntoNullableExternalConverter$4be2f56a$1(KuduColumnConverter.java:86)
at com.dtstack.chunjun.connector.kudu.converter.KuduColumnConverter.toExternal(KuduColumnConverter.java:106)
at com.dtstack.chunjun.connector.kudu.converter.KuduColumnConverter.toExternal(KuduColumnConverter.java:52)
at com.dtstack.chunjun.connector.kudu.sink.KuduOutputFormat.writeSingleRecordInternal(KuduOutputFormat.java:72)
... 16 more
', fieldName='null', createTime=2022-06-16 13:55:57.308]
分析
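Kudu 的 UNIXTIME_MICROS 类型原先设计为映射成 Flink Table 的 BIGINT 类型,但是在写入 KuduSink 时,RowData 中该字段的实际类型是 Timestamp,因此 KuduColumnConverter 调用 GenericRowData.getLong 时出现类型转换异常(java.sql.Timestamp cannot be cast to java.lang.Long)。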