[Bug] postgresql-x => postgresql-x in local mode: exception on NULL field #1155

@yqwoe

Search before asking

  • I have searched the issues and found no similar issues.

What happened

create table source
(
    id            BIGINT NOT NULL,
    action_type   STRING,
    action_option STRING, -- DEFAULT NULL in the database
    target_type   STRING,
    source_type   STRING,
    source_id     BIGINT,
    target_id     BIGINT,
    created_at    TIMESTAMP,
    updated_at    TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
      'connector' = 'postgresql-x',
      'url' = 'jdbc:postgresql://localhost:5432/businesslogic_dev?currentSchema=public',
      'username' = 'yqwoe',
      'password' = 'yqwoe900316',
      'table-name' = 'actions'
      );

create table sink
(
    id            BIGINT NOT NULL,
    action_type   STRING,
    action_option STRING, -- DEFAULT NULL in the database
    target_type   STRING,
    source_type   STRING,
    source_id     BIGINT,
    target_id     BIGINT,
    created_at    TIMESTAMP,
    updated_at    TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
      'connector' = 'postgresql-x',
      'url' = 'jdbc:postgresql://localhost:5432/businesslogic_dev_bak?currentSchema=public',
      'username' = 'yqwoe',
      'password' = 'yqwoe900316',
      'table-name' = 'actions',
      'sink.all-replace' = 'true'
      );

insert into sink
select id,
       action_type,
       action_option,
       target_type,
       source_type,
       source_id,
       target_id,
       created_at,
       updated_at
from source;

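In the database, the action_option column defaults to NULL, and the stored values are also NULL.

Running the job in local mode fails with the error below.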

What you expected to happen

2022-08-16 15:50:40,315 - 4287 ERROR [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, action_type, action_option, target_type, source_type, source_id, target_id, created_at, updated_at]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, action_type, action_option, target_type, source_type, source_id, target_id, created_at, updated_at]) (1/1)#0] com.dtstack.chunjun.source.DtInputFormatSourceFunction:Exception happened, start to close format
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:85)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
	at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:135)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)
Caused by: java.lang.ClassCastException: com.dtstack.chunjun.element.column.NullColumn cannot be cast to org.apache.flink.table.data.StringData
	at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
	at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:221)
	at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
	... 10 more
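
For anyone reading the trace: it looks as if the NULL value reaches the row as a NullColumn object rather than as a Java null, so the null check on the row does not fire and the string getter then fails on the cast. Below is a minimal sketch of that mechanism, not the actual ChunJun or Flink code. Every class and method name in it (NullSentinelSketch, Row, StringValue, NullSentinel, getStringOrNull, describe) is a hypothetical stand-in, and the explanation itself is an assumption based only on the stack trace above.

```java
// Stand-in types only: these are NOT the real Flink or ChunJun classes.
public class NullSentinelSketch {

    /** Plays the role of org.apache.flink.table.data.StringData. */
    static final class StringValue {
        final String value;
        StringValue(String value) { this.value = value; }
    }

    /** Plays the role of com.dtstack.chunjun.element.column.NullColumn. */
    static final class NullSentinel { }

    /** Plays the role of GenericRowData: an Object[] with casting getters. */
    static final class Row {
        private final Object[] fields;
        Row(Object... fields) { this.fields = fields; }

        // Only a real Java null is treated as SQL NULL.
        boolean isNullAt(int pos) { return fields[pos] == null; }

        // Plain cast; throws ClassCastException if the slot holds another type.
        StringValue getString(int pos) { return (StringValue) fields[pos]; }
    }

    /** Null-aware getter, similar in spirit to what a row copy would use. */
    static StringValue getStringOrNull(Row row, int pos) {
        if (row.isNullAt(pos)) {
            return null;
        }
        return row.getString(pos);
    }

    /** Returns "ok", "null", or "ClassCastException" for the given field. */
    static String describe(Row row, int pos) {
        try {
            StringValue v = getStringOrNull(row, pos);
            return v == null ? "null" : "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // A normal string value is read without problems.
        System.out.println(describe(new Row(new StringValue("click")), 0));
        // A Java null passes the null check and is returned as null.
        System.out.println(describe(new Row((Object) null), 0));
        // A sentinel object is not null, so the cast fails as in the trace.
        System.out.println(describe(new Row(new NullSentinel()), 0));
    }
}
```

If this reading is right, the fix would be to put a plain null into the row for NULL columns in SQL mode instead of a NullColumn instance, but that is a guess until someone checks the converter code.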

How to reproduce

image

Anything else

No response

Version

1.12_release

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
