
[Bug] [hdfs] Field delimiter not taking effect (e.g. delimiter \t)  #1586

@libailin

Description


Search before asking

  • I had searched in the issues and found no similar issues.

What happened

When the source data on HDFS is tab-delimited, configuring 'field-delimiter' = '\t' on the hdfs source fails to split the records correctly, so valid rows are misreported as dirty data.
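The failure mode is consistent with the configured delimiter string never being unescaped: the two characters backslash + t are used verbatim, so a tab-separated line is never split and the whole line is handed to Long.parseLong. A minimal sketch (plain Java, not ChunJun code) that reproduces the symptom:

```java
public class DelimiterBug {
    public static void main(String[] args) {
        String line = "8390\t20\ttest33";      // tab-separated record from HDFS

        // If the configured delimiter "\t" is kept as the literal two
        // characters backslash + t, the split finds nothing to match and
        // the whole line comes back as a single field:
        String[] broken = line.split("\\\\t"); // regex matching literal "\t"
        System.out.println(broken.length);     // 1

        // Splitting on a real tab character yields the expected fields:
        String[] ok = line.split("\t");
        System.out.println(ok.length);         // 3

        // The single unsplit field is then fed to Long.parseLong for the
        // bigint column, which throws the exception seen in the log below.
        try {
            Long.parseLong(broken[0]);
        } catch (NumberFormatException e) {
            System.out.println("NumberFormatException: " + e.getMessage());
        }
    }
}
```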


2023-03-29 09:55:54,781 ERROR com.dtstack.chunjun.converter.AbstractRowConverter           [] - value [8390	20	test33] convent failed 
2023-03-29 09:55:54,873 INFO  com.dtstack.chunjun.dirty.utils.TablePrintUtil               [] - 
+------------------+----------------------------------------------------------------+
| serialVersionUID | 1                                                              |
| jobId            | dd796d1b83eb55c5596f95d4aa124a45                               |
| jobName          | Flink RI Job                                                   |
| operatorName     | Source: source[1]                                              |
| dirtyContent     | {"arity":3,"nullBitsSizeInBytes":8,"offset":0,"sizeInBytes":0} |
| fieldName        | null                                                           |
| createTime       | 2023-03-29 09:55:54.783                                        |
| errorMessage     | com.dtstack.chunjun.throwable.ReadRecordException (see below)  |
+------------------+----------------------------------------------------------------+

com.dtstack.chunjun.throwable.ReadRecordException: 
java.lang.NumberFormatException: For input string: "8390	20	test33"
	at com.dtstack.chunjun.connector.hdfs.source.HdfsTextInputFormat.nextRecordInternal(HdfsTextInputFormat.java:148)
	at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:198)
	at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:66)
	at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:125)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
Caused by: java.lang.NumberFormatException: For input string: "8390	20	test33"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Long.parseLong(Long.java:589)
	at java.lang.Long.parseLong(Long.java:631)
	at com.dtstack.chunjun.converter.AbstractRowConverter.lambda$wrapIntoNullableInternalConverter$66e1293c$1(AbstractRowConverter.java:97)
	at com.dtstack.chunjun.connector.hdfs.converter.HdfsTextRowConverter.toInternal(HdfsTextRowConverter.java:72)
	at com.dtstack.chunjun.connector.hdfs.converter.HdfsTextRowConverter.toInternal(HdfsTextRowConverter.java:47)
	at com.dtstack.chunjun.connector.hdfs.source.HdfsTextInputFormat.nextRecordInternal(HdfsTextInputFormat.java:146)
	... 6 more

2023-03-29 09:55:54,869 ERROR com.dtstack.chunjun.source.DtInputFormatSourceFunction       [] - Exception happened, start to close format
com.dtstack.chunjun.throwable.NoRestartException: The dirty consumer shutdown, due to the consumed count exceed the max-consumed [0]
	at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.addConsumed(DirtyDataCollector.java:99) ~[chunjun-core.jar:?]
	at com.dtstack.chunjun.dirty.consumer.DirtyDataCollector.offer(DirtyDataCollector.java:73) ~[chunjun-core.jar:?]
	at com.dtstack.chunjun.dirty.manager.DirtyManager.collect(DirtyManager.java:134) ~[chunjun-core.jar:?]
	at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:200) ~[chunjun-core.jar:?]
	at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:66) ~[chunjun-core.jar:?]
	at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:125) [chunjun-core.jar:?]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) [flink-dist-1.16.1.jar:1.16.1]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) [flink-dist-1.16.1.jar:1.16.1]
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) [flink-dist-1.16.1.jar:1.16.1]
2023-03-29 09:56:14,879 INFO  com.dtstack.chunjun.source.format.BaseRichInputFormat        [] - subtask input close finished
2023-03-29 09:56:14,886 INFO  com.dtstack.chunjun.sink.format.BaseRichOutputFormat         [] - taskNumber[0] close()

What you expected to happen

The configured field delimiter \t should split the records correctly.
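A plausible direction for a fix (a sketch only; the helper name and placement are hypothetical, not ChunJun's actual API) is to unescape the configured delimiter once, before it is used for splitting:

```java
public class DelimiterUnescape {
    // Hypothetical helper: map common escape sequences arriving as raw
    // option strings (e.g. the two characters '\' + 't') to the control
    // characters they denote. Anything else passes through unchanged.
    public static String unescape(String delimiter) {
        switch (delimiter) {
            case "\\t": return "\t";
            case "\\n": return "\n";
            case "\\r": return "\r";
            default:    return delimiter;
        }
    }

    public static void main(String[] args) {
        String configured = "\\t";   // raw value of 'field-delimiter' from the DDL
        String delimiter = unescape(configured);
        // Pattern.quote keeps the split literal even for regex metacharacters.
        String[] fields = "8390\t20\ttest33"
                .split(java.util.regex.Pattern.quote(delimiter));
        System.out.println(fields.length);   // 3
    }
}
```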

How to reproduce

Source file data on HDFS:

8389	10	test32
8388	1	test31

SQL mode


CREATE TABLE source
(
    id          bigint
    ,user_id    bigint
    ,`name`     string
) WITH (
      'connector' = 'hdfs-x'
      ,'path' = 'hdfs://namenode.dfs.net:9000/home/hdp-test-test/hive/warehouse/hdp_test_test.db/flinkx_mysql_hive_polling/p_day=20220329'
      ,'default-fs' = 'hdfs://namenode.dfs.net:9000'
      ,'field-delimiter' = '\t'
      ,'encoding' = 'utf-8'
      ,'file-type' = 'text'
      ,'properties.hadoop.user.name' = 'hdp-test-test'
      ,'properties.fs.defaultFS' = 'hdfs://namenode.dfs.net:9000'
      ,'properties.dfs.client.failover.proxy.provider.ns' = 'org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
      ,'properties.fs.hdfs.impl.disable.cache' = 'true'
      ,'properties.fs.hdfs.impl' = 'org.apache.hadoop.hdfs.DistributedFileSystem'
      );

CREATE TABLE sink
(
    id        bigint
    ,user_id  bigint
    ,`name`     string
) WITH (
    'connector' = 'stream-x'
    ,'print' = 'true'
);

insert into sink select * from source;
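Until the option is unescaped by the connector, a possible workaround (untested here, and assuming the option value is passed through verbatim) is to put a real tab character into the option value; Flink SQL's Unicode string literal syntax can spell it explicitly:

```sql
-- Hypothetical workaround: pass an actual TAB (U+0009) rather than the
-- two-character escape '\t'. Only the changed option is shown; keep the
-- remaining WITH options from the reproduction above.
CREATE TABLE source
(
    id          bigint
    ,user_id    bigint
    ,`name`     string
) WITH (
      'connector' = 'hdfs-x'
      ,'field-delimiter' = U&'\0009'   -- U+0009 = TAB
      -- ... other options unchanged
      );
```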


Anything else

No response

Version

master

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
