11import yaml
2- from typing import Any , List , Dict , Union
2+ from typing import Any , List , Dict
33
44
55class ConfigValidator :
6- REQUIRED_KEYS : Dict [str , List [Any ]] = {
6+ REQUIRED_KEYS = {
7+ "root" : ["name" , "config" , "conditions" , "expectation" ],
78 "config" : ["pipeline.id" , "config.string" ],
8- "expectation" : ["status" , "symptom" , {"diagnosis" : ["cause" ]},
9- {"impacts" : ["description" , "impact_areas" ], "details" : ["run_state" ]}]
9+ "conditions" : ["full_start_required" ],
10+ "expectation" : ["status" , "symptom" , "indicators" ],
11+ "indicators" : ["pipelines" ],
12+ "pipelines" : ["status" , "symptom" , "indicators" ],
13+ "DYNAMIC" : ["status" , "symptom" , "diagnosis" , "impacts" , "details" ],
14+ "details" : ["status" ],
15+ "status" : ["state" ]
1016 }
1117
1218 def __init__ (self ):
1319 self .yaml_content = None
1420
15- def __validate_keys (self , yaml_sub_keys : List [Dict [str , Any ]], required_sub_keys : Dict [str , List [Any ]]) -> bool :
16- for required_sub_key in required_sub_keys :
17- if isinstance (required_sub_key , str ):
18- is_key_found = False
19- for yaml_sub_key in yaml_sub_keys :
20- if yaml_sub_key .get (required_sub_key ):
21- is_key_found = True
22- break
23- if not is_key_found :
24- print (f"Required { required_sub_key } key is not found in { yaml_sub_keys } " )
25- return False
26- return True
21+ def __has_valid_keys (self , data : any , key_path : str , repeated : bool ) -> bool :
22+ if isinstance (data , str ) or isinstance (data , bool ): # we reached values
23+ return True
2724
28- def __check_nested_key (self , data : Dict [str , Any ], nested_key : str ) -> bool :
29- keys = nested_key .split ('.' )
30- for key in keys :
31- if key not in data :
32- return False
25+ # we have two indicators section and for the next repeated ones, we go deeper
26+ first_key = next (iter (data ))
27+ data = data [first_key ] if repeated and key_path == "indicators" else data
28+
29+ if isinstance (data , dict ):
30+ # pipeline-id is a DYNAMIC
31+ required = self .REQUIRED_KEYS .get ("DYNAMIC" if repeated and key_path == "indicators" else key_path , [])
32+ repeated = not repeated if key_path == "indicators" else repeated
33+ for key in required :
34+ if key not in data :
35+ print (f"Missing key '{ key } ' in '{ key_path } '" )
36+ return False
37+ else :
38+ dic_keys_result = self .__has_valid_keys (data [key ], key , repeated )
39+ if dic_keys_result is False :
40+ return False
41+ elif isinstance (data , list ):
42+ for item in data :
43+ list_keys_result = self .__has_valid_keys (item , key_path , repeated )
44+ if list_keys_result is False :
45+ return False
3346 return True
3447
3548 def load (self , file_path : str ) -> None :
@@ -48,18 +61,9 @@ def is_valid(self) -> bool:
4861 print (f"YAML content is empty." )
4962 return False
5063
51- if not isinstance (self .yaml_content , Dict ):
64+ if not isinstance (self .yaml_content , dict ):
5265 print (f"YAML structure is not as expected, it should start with a Dict." )
5366 return False
5467
55- required_config_keys = list (self .REQUIRED_KEYS .keys ())
56- for yaml_key in self .yaml_content :
57- if yaml_key == "name" :
58- continue
59- if yaml_key not in required_config_keys :
60- return False
61- if not self .__validate_keys (self .yaml_content .get (yaml_key ), self .REQUIRED_KEYS .get (yaml_key )):
62- return False
63-
64- print (f"YAML config validation succeeded." )
65- return True
68+ result = self .__has_valid_keys (self .yaml_content , "root" , False )
69+ return True if result is True else False
0 commit comments