-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Closed
Labels
bug: Something isn't working
Description
当在LocalTest里运行 stream_redis.sql 时报错
stream_redis.sql配置就是master分支下自带配置:
-- Source table for the reproduction job, backed by the 'stream-x' connector
-- with 'number-of-rows' set to '10'.
CREATE TABLE source (
    id         INT,
    name       STRING,
    money      DECIMAL,
    dateone    TIMESTAMP,
    age        BIGINT,
    datethree  TIMESTAMP,
    datesix    TIMESTAMP(6),
    datenigth  TIMESTAMP(9),
    dtdate     DATE,
    dttime     TIME,
    afloat     FLOAT,
    adouble    DOUBLE,
    aboolean   BOOLEAN,
    abigint    BIGINT,
    atinyint   TINYINT,
    avarchar   VARCHAR,
    asmallint  SMALLINT
) WITH (
    'connector'      = 'stream-x',
    'number-of-rows' = '10'
);
-- Sink table for the reproduction job, written through the 'redis-x' connector.
-- NOTE(review): the stack trace in this report shows
-- RedisOutputFormatBuilder.checkFormat rejecting this sink with
-- "No type supplied / No mode supplied"; none of the options below sets a
-- type or mode -- confirm whether the connector is expected to default them.
CREATE TABLE sink (
    id         INT,
    name       STRING,
    money      DECIMAL,
    dateone    TIMESTAMP,
    age        BIGINT,
    datethree  TIMESTAMP,
    datesix    TIMESTAMP(6),
    datenigth  TIMESTAMP(9),
    dtdate     DATE,
    dttime     TIME,
    afloat     FLOAT,
    adouble    DOUBLE,
    aboolean   BOOLEAN,
    abigint    BIGINT,
    atinyint   TINYINT,
    avarchar   VARCHAR,
    asmallint  SMALLINT,
    -- Required for redis: rows are stored as a hash,
    -- key = tableName_primaryKey1_primaryKey2
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'   = 'redis-x',          -- required
    'url'         = 'localhost:6379',   -- required, format ip:port[,ip:port]
    'table-name'  = 'cx',               -- required
    'password'    = '123456',           -- password; optional, no default
    'redis-type'  = '1',                -- redis mode (1 standalone, 2 sentinel, 3 cluster), default: 1
    'master-name' = 'lala',             -- master node name (required in sentinel mode)
    'database'    = '0',                -- redis database, default: 0
    'timeout'     = '10000',            -- connection timeout, default: 10000 ms
    'max.total'   = '5',                -- maximum number of connections, default: 8
    'max.idle'    = '5',                -- maximum number of idle connections, default: 8
    'min.idle'    = '0',                -- minimum number of idle connections, default: 0
    -- 'keyExpiredTime' = '1000',       -- expiry of keys written by the redis sink, in seconds; default: 0 (never expires)
    'sink.parallelism' = '3'            -- sink parallelism
);
-- Write one row per id into the sink: every non-key column is reduced
-- with MAX over the rows of source sharing that id.
INSERT INTO sink
SELECT
    id,
    MAX(name)      AS name,
    MAX(money)     AS money,
    MAX(dateone)   AS dateone,
    MAX(age)       AS age,
    MAX(datethree) AS datethree,
    MAX(datesix)   AS datesix,
    MAX(datenigth) AS datenigth,
    MAX(dtdate)    AS dtdate,
    MAX(dttime)    AS dttime,
    MAX(afloat)    AS afloat,
    MAX(adouble)   AS adouble,
    MAX(aboolean)  AS aboolean,
    MAX(abigint)   AS abigint,
    MAX(atinyint)  AS atinyint,
    MAX(avarchar)  AS avarchar,
    MAX(asmallint) AS asmallint
FROM source
GROUP BY id;
Actual behavior (error output)
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:64)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.dtstack.flinkx.local.test.LocalTest.main(LocalTest.java:184)
... 5 more
Caused by: com.dtstack.flinkx.throwable.FlinkxRuntimeException: java.lang.IllegalArgumentException:
No type supplied
No mode supplied
at com.dtstack.flinkx.Main.exeSqlJob(Main.java:149)
at com.dtstack.flinkx.Main.main(Main.java:108)
... 10 more
Caused by: java.lang.IllegalArgumentException:
No type supplied
No mode supplied
at com.dtstack.flinkx.connector.redis.sink.RedisOutputFormatBuilder.checkFormat(RedisOutputFormatBuilder.java:81)
at com.dtstack.flinkx.sink.format.BaseRichOutputFormatBuilder.finish(BaseRichOutputFormatBuilder.java:56)
at com.dtstack.flinkx.connector.redis.sink.RedisDynamicTableSink.getSinkRuntimeProvider(RedisDynamicTableSink.java:68)
at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:78)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:109)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:98)
at com.dtstack.flinkx.Main.exeSqlJob(Main.java:141)
... 11 more
master最新代码
f93e1b8
Metadata
Metadata
Assignees
Labels
bug: Something isn't working