Closed
Labels
bug: Something isn't working
Description
Search before asking
- I have searched the issues and found no similar issues.
What happened
create table source
(
id BIGINT NOT NULL,
action_type STRING,
action_option STRING, -- defaults to NULL in the database
target_type STRING,
source_type STRING,
source_id BIGINT,
target_id BIGINT,
created_at TIMESTAMP,
updated_at TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'postgresql-x',
'url' = 'jdbc:postgresql://localhost:5432/businesslogic_dev?currentSchema=public',
'username' = 'yqwoe',
'password' = 'yqwoe900316',
'table-name' = 'actions'
);
create table sink
(
id BIGINT NOT NULL,
action_type STRING,
action_option STRING, -- defaults to NULL in the database
target_type STRING,
source_type STRING,
source_id BIGINT,
target_id BIGINT,
created_at TIMESTAMP,
updated_at TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'postgresql-x',
'url' = 'jdbc:postgresql://localhost:5432/businesslogic_dev_bak?currentSchema=public',
'username' = 'yqwoe',
'password' = 'yqwoe900316',
'table-name' = 'actions',
'sink.all-replace' = 'true'
);
insert into sink
select id,
action_type,
action_option,
target_type,
source_type,
source_id,
target_id,
created_at,
updated_at
from source;
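In the database, the action_option column defaults to NULL, and the stored values in that column are also NULL.
Running the job in local mode fails with the error below.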
What you expected to happen
2022-08-16 15:50:40,315 - 4287 ERROR [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, action_type, action_option, target_type, source_type, source_id, target_id, created_at, updated_at]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, action_type, action_option, target_type, source_type, source_id, target_id, created_at, updated_at]) (1/1)#0] com.dtstack.chunjun.source.DtInputFormatSourceFunction:Exception happened, start to close format
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:85)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:135)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)
Caused by: java.lang.ClassCastException: com.dtstack.chunjun.element.column.NullColumn cannot be cast to org.apache.flink.table.data.StringData
at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:221)
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
... 10 more
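The trace suggests that the row handed to the serializer holds a `NullColumn` object in the `action_option` position rather than a real `null`, so the null check passes and the typed getter then fails on the cast. The following is a minimal, self-contained sketch of that failure mode, not the actual ChunJun or Flink code: `NullMarker`, `SimpleRow`, `stringGetter`, and `tryCopy` are hypothetical stand-ins invented for illustration.

```java
import java.util.function.Function;

public class NullSentinelDemo {
    /** Hypothetical stand-in for a connector-specific object that represents SQL NULL. */
    static final class NullMarker {}

    /** Hypothetical stand-in for a generic row that stores its fields as plain objects. */
    static final class SimpleRow {
        private final Object[] fields;

        SimpleRow(Object... fields) {
            this.fields = fields;
        }

        // Only a real Java null counts as NULL here.
        boolean isNullAt(int pos) {
            return fields[pos] == null;
        }

        // Typed getter: a plain cast, which throws if the field is not a String.
        String getString(int pos) {
            return (String) fields[pos];
        }
    }

    /** Null-aware getter: checks isNullAt first, then falls through to the typed getter. */
    static Function<SimpleRow, Object> stringGetter(int pos) {
        return row -> row.isNullAt(pos) ? null : row.getString(pos);
    }

    /** Reads one string field; returns "ok" or the name of the failure. */
    static String tryCopy(SimpleRow row, int pos) {
        try {
            stringGetter(pos).apply(row);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(tryCopy(new SimpleRow(1L, "create"), 1));         // ok
        System.out.println(tryCopy(new SimpleRow(1L, null), 1));             // ok
        System.out.println(tryCopy(new SimpleRow(1L, new NullMarker()), 1)); // ClassCastException
    }
}
```

Under this reading, the fix would belong wherever the source row is built: a NULL column value would need to reach the downstream operator as a real `null` (or the row type would need its own null-aware accessors) before the row is copied.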
How to reproduce
Anything else
No response
Version
1.12_release
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
