-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Description
因为公司需求,要用到redis插件,然后我发现可以写数据到redis里面,但是读取数据一直失败。按照官方的look up里面的flinksql文档操作,也不行,不知道哪里有问题。 一直报错
Caused by: org.apache.flink.table.api.ValidationException: One or more required options are missing.
Missing required options are:
mode
type
即使加上mode
type 虽然不报错,但是运行也失败!
我的代码在这里,我的emqx到mysql已经实现了。这点没问题,我只需要把emqx的数据与redis的数据一起混合计算,存储到Mysql里面而已。
-- 源表
CREATE TABLE source1
( id int,
head_code varchar,
cww float,
PROCTIME AS PROCTIME()
) WITH (
'connector' = 'emqx-x'
,'broker' = 'tcp://ip:1883'
,'topic' = 'amq/topic'
,'isCleanSession' = 'true'
,'qos' = '0'
,'username' = 'root'
,'password' = '******'
,'format' = 'json'
);
-- 维表
CREATE TABLE side
(
id INT
, name STRING
,length decimal
,weight decimal
,inner_meter decimal
,out_meter decimal
,spec decimal,
primary key (id,name) NOT ENFORCED --
-- , primary key (id,name) NOT ENFORCED -- 这里的pk,并不会作为查询redis的主键,所以作为维表可不写。是通过select中的join条件作为主键
) WITH (
'connector' = 'redis-x' --必填
,'url' = 'ip:6379' --必填,格式ip:port[,ip:port]
,'table-name' = 'db0' --必填
,'password' = '密码' -- 密码 无默认,非必填项
,'redis-type' = '3' -- redis模式(1 单机,2 哨兵, 3 集群),默认:1
,'master-name' = 'lala' -- 主节点名称(哨兵模式下为必填项)
,'database' = '0' -- redis 的数据库地址,默认:0
,'timeout' = '10000' -- 连接超时时间,默认:10000毫秒
,'max.total' = '5' -- 最大连接数 ,默认:8
,'max.idle' = '5' -- 最大空闲连接数,默认:8
,'min.idle' = '0' -- 最小空闲连接数 ,默认:0
,'lookup.cache-type' = 'all' -- 维表缓存类型(NONE、LRU、ALL),默认LRU
,'lookup.cache-period' = '4600000' -- ALL维表每隔多久加载一次数据,默认3600000毫秒
,'lookup.cache.max-rows' = '20000' -- lru维表缓存数据的条数,默认10000条
,'lookup.cache.ttl' = '700000' -- lru维表缓存数据的时间,默认60000毫秒
,'lookup.fetch-size' = '2000' -- ALL维表每次从数据库加载的条数,默认1000条
,'lookup.async-timeout' = '30000' -- lru维表缓访问超时时间,默认10000毫秒,暂时没用到
,'lookup.parallelism' = '3' -- 维表并行度,默认null
);
-- 结果表
CREATE TABLE sink1
(
id int,
head_code varchar,
cww float,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://ip:3306/dlink',
'table-name' = 'product',
'username' = 'root',
'password' = '123456'
);
-- 多源头混合计算
INSERT INTO sink1
SELECT
u.id,
u.head_code,
s.spec as cww
from
source1 u
left join side FOR SYSTEM_TIME AS OF u.PROCTIME AS s
on u.head_code = s.name;
请问啥原因啊!!!!!有人知道嘛
?