Description
1. Bug description
In ChunJun 1.12, does the flink-connect-hdfs source SQL not support syncing partition fields? Whenever the partition field dt is included in the source schema, the job fails with an ArrayIndexOutOfBoundsException.
2. Error log
2022-05-20 14:40:03.518 [Source: TableSourceScan(table=[[default_catalog, default_database, invite_delivery_page_stay_duration]], fields=[device_id, session_id, f, phone_no, user_code, track_name, dt]) -> Sink: Sink(table=[default_catalog.default_database.loganTopic_APP_3_zww], fields=[device_id, session_id, f, phone_no, user_code, track_name, dt]) (1/1)#0] WARN org.apache.flink.runtime.taskmanager.Task - Source: TableSourceScan(table=[[default_catalog, default_database, invite_delivery_page_stay_duration]], fields=[device_id, session_id, f, phone_no, user_code, track_name, dt]) -> Sink: Sink(table=[default_catalog.default_database.loganTopic_APP_3_zww], fields=[device_id, session_id, f, phone_no, user_code, track_name, dt]) (1/1)#0 (1210a693d6f0eefb2b1f6071f54fd13f) switched from RUNNING to FAILED.
java.lang.ArrayIndexOutOfBoundsException: -1
at parquet.example.data.simple.SimpleGroup.getFieldRepetitionCount(SimpleGroup.java:113)
at com.dtstack.flinkx.connector.hdfs.source.HdfsParquetInputFormat.nextRecordInternal(HdfsParquetInputFormat.java:256)
at com.dtstack.flinkx.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:192)
at com.dtstack.flinkx.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:67)
at com.dtstack.flinkx.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:133)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)
3. flink-connect-hdfs source SQL configuration
CREATE TABLE invite_delivery_page_stay_duration
(
    device_id  string,
    session_id string,
    f          int,
    phone_no   string,
    user_code  string,
    track_name string,
    dt         string
) PARTITIONED BY (dt) WITH (
'connector' = 'hdfs-x'
,'path' = 'hdfs://nameservice1/user/hive/warehouse/dws.db/invite_delivery_page_stay_duration/dt=2022-05-06'
,'properties.hadoop.user.name' = 'hdfs'
,'properties.dfs.ha.namenodes.nameservice1' = 'namenode156,namenode133'
,'properties.fs.defaultFS' = 'hdfs://nameservice1'
,'properties.dfs.namenode.rpc-address.nameservice1.namenode156' = 'QASHV144935.hostname.com:8020'
,'properties.dfs.client.failover.proxy.provider.nameservice1' = 'org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
,'properties.dfs.namenode.rpc-address.nameservice1.namenode133' = 'QASHV144936.hostname.com:8020'
,'properties.dfs.nameservices' = 'nameservice1'
,'properties.fs.hdfs.impl.disable.cache' = 'true'
,'properties.fs.hdfs.impl' = 'org.apache.hadoop.hdfs.DistributedFileSystem'
,'default-fs' = 'hdfs://nameservice1'
,'file-type' = 'parquet'
);
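For context: in a Hive-style layout the partition value lives in the directory name (here `dt=2022-05-06`), not inside the parquet data files, so the parquet reader finds no `dt` field in the file schema; the resulting -1 field index is what surfaces as the `ArrayIndexOutOfBoundsException` in `SimpleGroup.getFieldRepetitionCount`. A possible workaround (an assumption based on that reading, not a confirmed fix) is to drop `dt` from the source schema and, if it is needed downstream, inject the partition value as a literal in the query:

```sql
-- Sketch of a possible workaround, assuming the parquet files under
-- dt=2022-05-06 do not physically contain a dt column.
CREATE TABLE invite_delivery_page_stay_duration
(
    device_id  string,
    session_id string,
    f          int,
    phone_no   string,
    user_code  string,
    track_name string
) WITH (
  'connector' = 'hdfs-x'
  ,'path' = 'hdfs://nameservice1/user/hive/warehouse/dws.db/invite_delivery_page_stay_duration/dt=2022-05-06'
  ,'file-type' = 'parquet'
  -- ...remaining hadoop properties unchanged...
);

-- If the sink still needs dt, supply the partition value as a constant:
-- INSERT INTO loganTopic_APP_3_zww
-- SELECT device_id, session_id, f, phone_no, user_code, track_name,
--        '2022-05-06' AS dt
-- FROM invite_delivery_page_stay_duration;
```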