Skip to content

Commit d26c048

Browse files
committed
Added abnormal and normal pipeline termination scenarios. The YAML config validation logic now covers the dynamic (pipeline-id) keyword.
1 parent 53fa5b8 commit d26c048

10 files changed

Lines changed: 199 additions & 97 deletions

File tree

.buildkite/scripts/health-report-tests/bootstrap.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,30 +70,32 @@ def build_logstash(self):
7070
print(f"Logstash has successfully built.")
7171

7272
def apply_config(self, config: dict) -> None:
    """Write the given pipelines configuration to the test config directory.

    Serializes `config` as YAML into
    `.buildkite/scripts/health-report-tests/config/pipelines.yml`, resolved
    relative to the current working directory (assumed to be the Logstash
    checkout root — TODO confirm against the CI invocation).
    """
    # os.path.join instead of raw string concatenation for portability.
    pipelines_yml = os.path.join(os.getcwd(), ".buildkite", "scripts",
                                 "health-report-tests", "config", "pipelines.yml")
    with open(pipelines_yml, "w") as pipelines_file:
        yaml.dump(config, pipelines_file)
7575

76-
def run_logstash(self) -> subprocess.Popen:
77-
process = subprocess.Popen(["bin/logstash"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
76+
def run_logstash(self, full_start_required: bool) -> subprocess.Popen:
    """Start Logstash and block until it reaches the requested startup stage.

    :param full_start_required: when True, wait for "Pipeline started" (fully
        running); when False, "Starting pipeline" is enough — useful for the
        slow/crashing pipeline scenarios that never fully start.
    :return: the running process, or None if Logstash failed to spin up.
    """
    # --config.reload.automatic keeps the instance active, which is helpful
    # when testing crash-pipeline cases.
    config_path = os.path.join(os.getcwd(), ".buildkite", "scripts",
                               "health-report-tests", "config")
    # NOTE(review): "-w 1" is a single argv token (option and value with an
    # embedded space) rather than ["-w", "1"] — confirm Logstash's option
    # parser accepts this form.
    process = subprocess.Popen(["bin/logstash", "--config.reload.automatic", "--path.settings", config_path,
                                "-w 1"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, shell=False)
    if process.poll() is not None:
        print("Logstash failed to run, check the config and logs, then rerun.")
        return None

    # Read stdout in real-time until a line tells us whether Logstash reached
    # the requested startup stage or shut down on the way.
    logs = []
    for stdout_line in iter(process.stdout.readline, ""):
        logs.append(stdout_line.strip())
        # We don't always wait for Logstash to fully start, since slow
        # pipeline start scenarios are also tested.
        if full_start_required is False and "Starting pipeline" in stdout_line:
            break
        if full_start_required is True and "Pipeline started" in stdout_line:
            break
        if "Logstash shut down" in stdout_line or "Logstash stopped" in stdout_line:
            print("Logstash couldn't spin up.")
            print(logs)
            return None

    print(f"Logstash is running with PID: {process.pid}.")
    return process
96-
97-
def stop_logstash(self, process: subprocess.Popen) -> None:
98-
process.terminate()
99-
print(f"Stopping Logstash...")
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Intentionally left blank
Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,48 @@
11
import yaml
2-
from typing import Any, List, Dict, Union
2+
from typing import Any, List, Dict
33

44

55
class ConfigValidator:
    """Validates scenario YAML files for the health-report integration tests."""

    # Required keys per YAML section. "DYNAMIC" stands in for the nested
    # pipeline-id key, whose actual name differs per scenario.
    REQUIRED_KEYS = {
        "root": ["name", "config", "conditions", "expectation"],
        "config": ["pipeline.id", "config.string"],
        "conditions": ["full_start_required"],
        "expectation": ["status", "symptom", "indicators"],
        "indicators": ["pipelines"],
        "pipelines": ["status", "symptom", "indicators"],
        "DYNAMIC": ["status", "symptom", "diagnosis", "impacts", "details"],
        "details": ["status"],
        "status": ["state"],
    }

    def __init__(self):
        # Parsed YAML document; starts empty until a file is loaded.
        self.yaml_content = None
1420

15-
def __validate_keys(self, yaml_sub_keys: List[Dict[str, Any]], required_sub_keys: Dict[str, List[Any]]) -> bool:
16-
for required_sub_key in required_sub_keys:
17-
if isinstance(required_sub_key, str):
18-
is_key_found = False
19-
for yaml_sub_key in yaml_sub_keys:
20-
if yaml_sub_key.get(required_sub_key):
21-
is_key_found = True
22-
break
23-
if not is_key_found:
24-
print(f"Required {required_sub_key} key is not found in {yaml_sub_keys}")
25-
return False
26-
return True
21+
def __has_valid_keys(self, data: Any, key_path: str, repeated: bool) -> bool:
    """Recursively check that `data` contains every key REQUIRED_KEYS demands.

    :param data: current YAML node (dict or list) or a leaf value (str/bool).
    :param key_path: the key under which `data` was found; used to index
        REQUIRED_KEYS for this level.
    :param repeated: True once we are inside the second, nested `indicators`
        section, whose single child key is the dynamic pipeline-id.
    :return: True when all required keys are present, False otherwise.
    """
    # Leaf values (strings, booleans) have no sub-keys left to validate.
    # (annotation fixed: `any` is the builtin function, not a type — use Any)
    if isinstance(data, (str, bool)):
        return True

    # There are two `indicators` sections; for the repeated (nested) one we
    # step down into its single child — the dynamic pipeline-id entry.
    first_key = next(iter(data))
    data = data[first_key] if repeated and key_path == "indicators" else data

    if isinstance(data, dict):
        # The pipeline-id key is dynamic, so its requirements live under "DYNAMIC".
        required = self.REQUIRED_KEYS.get("DYNAMIC" if repeated and key_path == "indicators" else key_path, [])
        repeated = not repeated if key_path == "indicators" else repeated
        for key in required:
            if key not in data:
                print(f"Missing key '{key}' in '{key_path}'")
                return False
            else:
                dic_keys_result = self.__has_valid_keys(data[key], key, repeated)
                if dic_keys_result is False:
                    return False
    elif isinstance(data, list):
        for item in data:
            list_keys_result = self.__has_valid_keys(item, key_path, repeated)
            if list_keys_result is False:
                return False
    return True
3447

3548
def load(self, file_path: str) -> None:
@@ -48,18 +61,9 @@ def is_valid(self) -> bool:
4861
print(f"YAML content is empty.")
4962
return False
5063

51-
if not isinstance(self.yaml_content, Dict):
64+
if not isinstance(self.yaml_content, dict):
5265
print(f"YAML structure is not as expected, it should start with a Dict.")
5366
return False
5467

55-
required_config_keys = list(self.REQUIRED_KEYS.keys())
56-
for yaml_key in self.yaml_content:
57-
if yaml_key == "name":
58-
continue
59-
if yaml_key not in required_config_keys:
60-
return False
61-
if not self.__validate_keys(self.yaml_content.get(yaml_key), self.REQUIRED_KEYS.get(yaml_key)):
62-
return False
63-
64-
print(f"YAML config validation succeeded.")
65-
return True
68+
result = self.__has_valid_keys(self.yaml_content, "root", False)
69+
return True if result is True else False

.buildkite/scripts/health-report-tests/logstash_stats.py

Lines changed: 0 additions & 16 deletions
This file was deleted.

.buildkite/scripts/health-report-tests/main.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
"""
44
import glob
55
import os
6+
import time
7+
import traceback
8+
import yaml
69
from bootstrap import Bootstrap
710
from scenario_executor import ScenarioExecutor
811
from config_validator import ConfigValidator
9-
import yaml
1012

1113

1214
class BootstrapContextManager:
@@ -29,9 +31,9 @@ def __enter__(self):
2931
print(f"logstash-integration-failure_injector successfully installed.")
3032
return self.bootstrap
3133

32-
def __exit__(self, exc_type, exc_value, traceback):
34+
def __exit__(self, exc_type, exc_value, exc_traceback):
    """Context-manager exit hook: print the full traceback if an exception occurred."""
    if exc_type is not None:
        # format_exception returns a list of lines; join them so the output
        # reads as a normal traceback instead of a list repr.
        print("".join(traceback.format_exception(exc_type, exc_value, exc_traceback)))
3537

3638

3739
def main():
@@ -46,24 +48,39 @@ def main():
4648
for scenario_file in scenario_files:
4749
print(f"Validating {scenario_file} scenario file.")
4850
config_validator.load(scenario_file)
49-
if not config_validator.is_valid():
51+
if config_validator.is_valid() is False:
5052
print(f"{scenario_file} scenario file is not valid.")
5153
return
54+
else:
55+
print(f"Validation succeeded.")
5256

57+
has_failed_scenario = False
5358
for scenario_file in scenario_files:
5459
with open(scenario_file, 'r') as file:
5560
# scenario_content: Dict[str, Any] = None
5661
scenario_content = yaml.safe_load(file)
62+
print(f"Testing `{scenario_content.get('name')}` scenario.")
5763
scenario_name = scenario_content['name']
64+
65+
is_full_start_required = next(sub.get('full_start_required') for sub in
66+
scenario_content.get('conditions') if 'full_start_required' in sub)
5867
config = scenario_content['config']
5968
if config is not None:
6069
bootstrap.apply_config(config)
61-
expectation = scenario_content['expectation']
62-
process = bootstrap.run_logstash()
70+
expectations = scenario_content.get("expectation")
71+
process = bootstrap.run_logstash(is_full_start_required)
6372
if process is not None:
64-
scenario_executor.on(scenario_name, expectation)
73+
try:
74+
scenario_executor.on(scenario_name, expectations)
75+
except Exception as e:
76+
print(e)
77+
has_failed_scenario = True
6578
process.terminate()
66-
break
79+
time.sleep(5) # leave some window to terminate the process
80+
81+
if has_failed_scenario:
82+
# intentionally fail due to visibility
83+
raise Exception("Some of scenarios failed, check the log for details.")
6784

6885

6986
if __name__ == "__main__":
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
11
requests==2.32.3
2-
deepdiff==8.0.1
32
pyyaml==6.0.2
Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""
22
A class to execute the given scenario for Logstash Health Report integration test
33
"""
4-
from deepdiff import DeepDiff
4+
import time
55
from logstash_health_report import LogstashHealthReport
66

77

@@ -11,33 +11,55 @@ class ScenarioExecutor:
1111
def __init__(self):
1212
pass
1313

14-
def __is_expected(self, scenario_content: list) -> None:
15-
logstash_health = self.logstash_health_report_api.get()
16-
print(f"Logstash health report: {logstash_health}")
14+
def __has_intersection(self, expects, results):
    """Return True when every expected entry is present somewhere in `results`.

    Each element of `expects` is a dict of key/value pairs; it is satisfied
    when at least one dict in `results` contains all of those pairs.
    """
    for expect in expects:
        # Fixed: the original returned False if ANY result failed to match an
        # expect (requiring every expect to match every result); an expect
        # only needs to be found in at least one result.
        if not any(all(key in result and result[key] == value for key, value in expect.items())
                   for result in results):
            return False
    return True
1721

18-
differences = []
19-
for index, item in enumerate(scenario_content):
20-
if "expectation" in item:
21-
key = f"Item {index + 1}"
22-
stat_value = logstash_health.get(key, {}).get("expectation")
22+
def __get_difference(self, differences: list, expectations: dict, reports: dict) -> list:
    """Recursively collect mismatches between scenario expectations and the
    health report, appending each one to (and returning) `differences`.

    Return annotation fixed: the method returns the `differences` list, not a dict.
    """
    for key in expectations.keys():
        if key == "help_url":  # help_url URL value may change
            continue

        # Hoist the repeated .get(key) lookups into locals.
        expected = expectations.get(key)
        reported = reports.get(key)

        # A type mismatch means the report's structure diverged — stop here.
        if type(expected) != type(reported):
            differences.append(f"Scenario expectation and Health API report structure differs for {key}.")
            return differences

        if isinstance(expected, str):
            if expected != reported:
                differences.append({key: {"expected": expected, "got": reported}})
            continue
        elif isinstance(expected, dict):
            self.__get_difference(differences, expected, reported)
        elif isinstance(expected, list):
            # List entries only need to be contained in the report, not equal to it.
            if not self.__has_intersection(expected, reported):
                differences.append({key: {"expected": expected, "got": reported}})
    return differences
41+
42+
def __is_expected(self, expectations: dict) -> bool:
    """Fetch the current health report and compare it against `expectations`.

    Return annotation fixed: the method returns a bool, not None.

    :return: True when no differences were found, False otherwise.
    """
    reports = self.logstash_health_report_api.get()
    differences = self.__get_difference([], expectations, reports)
    if differences:
        print("Differences found in 'expectation' section between YAML content and stats:")
        for diff in differences:
            print(f"Difference: {diff}")
        return False
    else:
        return True
3952

40-
def on(self, scenario_name: str, scenario_content: list) -> None:
41-
print(f"Testing the scenario: {scenario_content}")
42-
if self.__is_expected(scenario_content) is False:
53+
def on(self, scenario_name: str, expectations: dict) -> None:
    """Check the scenario's expectations against the health report, retrying
    up to 5 times (1 second apart) before declaring the scenario failed.

    :raises Exception: when the expectations still don't match after all attempts.
    """
    attempts = 5
    while self.__is_expected(expectations) is False:
        attempts = attempts - 1
        if attempts == 0:
            break
        time.sleep(1)

    if attempts == 0:
        raise Exception(f"{scenario_name} failed.")
    else:
        # typo fixed: "expectaion" -> "expectation"
        print(f"Scenario `{scenario_name}` expectation meets the health report stats.")
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
name: "Abnormally terminated pipeline"
2+
config:
3+
- pipeline.id: abnormally-terminated-pp
4+
config.string: |
5+
input { heartbeat { interval => 1 } }
6+
filter { failure_injector { crash_at => filter } }
7+
output { stdout {} }
8+
pipeline.workers: 1
9+
pipeline.batch.size: 1
10+
conditions:
11+
- full_start_required: true
12+
expectation:
13+
status: "red"
14+
symptom: "1 indicator is unhealthy (`pipelines`)"
15+
indicators:
16+
pipelines:
17+
status: "red"
18+
symptom: "1 indicator is unhealthy (`abnormally-terminated-pp`)"
19+
indicators:
20+
abnormally-terminated-pp:
21+
status: "red"
22+
symptom: "The pipeline is unhealthy; 1 area is impacted and 1 diagnosis is available"
23+
diagnosis:
24+
- cause: "pipeline is not running, likely because it has encountered an error"
25+
- action: "view logs to determine the cause of abnormal pipeline shutdown"
26+
impacts:
27+
- description: "the pipeline is not currently processing"
28+
- impact_areas: ["pipeline_execution"]
29+
details:
30+
status:
31+
state: "TERMINATED"
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
name: "Successfully terminated pipeline"
2+
config:
3+
- pipeline.id: normally-terminated-pp
4+
config.string: |
5+
input { generator { count => 1 } }
6+
output { stdout {} }
7+
pipeline.workers: 1
8+
pipeline.batch.size: 1
9+
conditions:
10+
- full_start_required: true
11+
expectation:
12+
status: "yellow"
13+
symptom: "1 indicator is concerning (`pipelines`)"
14+
indicators:
15+
pipelines:
16+
status: "yellow"
17+
symptom: "1 indicator is concerning (`normally-terminated-pp`)"
18+
indicators:
19+
normally-terminated-pp:
20+
status: "yellow"
21+
symptom: "The pipeline is concerning; 1 area is impacted and 1 diagnosis is available"
22+
diagnosis:
23+
- cause: "pipeline has finished running because its inputs have been closed and events have been processed"
24+
- action: "if you expect this pipeline to run indefinitely, you will need to configure its inputs to continue receiving or fetching events"
25+
impacts:
26+
- impact_areas: ["pipeline_execution"]
27+
details:
28+
status:
29+
state: "FINISHED"

0 commit comments

Comments
 (0)