Skip to content

Commit 4f8cd0b

Browse files
committed
Prepare testing
1 parent 218d174 commit 4f8cd0b

2 files changed

Lines changed: 80 additions & 3 deletions

File tree

tests/pytests/common.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -879,9 +879,9 @@ def allShards_getIsRPPaused(env):
879879
results.append(result)
880880
return results
881881

882-
def allShards_setPauseRPResume(env):
882+
def allShards_setPauseRPResume(env, start_shard=1):
883883
results = []
884-
for shardId in range(1, env.shardsCount + 1):
884+
for shardId in range(start_shard, env.shardsCount + 1):
885885
result = env.getConnection(shardId).execute_command(debug_cmd(), 'QUERY_CONTROLLER', 'SET_PAUSE_RP_RESUME')
886886
results.append(result)
887887
return results

tests/pytests/test_query_oom.py

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from common import *
2+
import threading
23

34
OOM_QUERY_ERROR = "Not enough memory available to execute the query"
45

@@ -18,6 +19,9 @@ def allShards_change_maxmemory_low(env):
1819
res = env.getConnection(shardId).execute_command('config', 'set', 'maxmemory', 1)
1920
env.assertEqual(res, 'OK')
2021

22+
# Helper to call a function and push its return value into a list
23+
def _call_and_store(fn, args, out_list):
24+
out_list.append(fn(*args))
2125

2226
def _common_test_scenario(env):
2327
# Create an index
@@ -30,7 +34,7 @@ def _common_test_scenario(env):
3034
def _common_cluster_test_scenario(env):
3135
conn = getConnectionByEnv(env)
3236
# Create an index
33-
env.expect('FT.CREATE', 'idx', 'SCHEMA', 'name', 'TEXT').ok()
37+
env.expect('FT.CREATE', 'idx', 'SCHEMA', 'name', 'TEXT', 'SORTABLE').ok()
3438

3539
n_docs_per_shard = 100
3640
n_docs = n_docs_per_shard * env.shardsCount
@@ -42,6 +46,17 @@ def _common_cluster_test_scenario(env):
4246

4347
return n_docs
4448

49+
def runDebugQueryCommand_expect_oom(env, query_cmd, debug_params):
50+
# Use the helper function to build the argument list
51+
args = parseDebugQueryCommandArgs(query_cmd, debug_params)
52+
return env.expect(debug_cmd(), *args).error().contains(OOM_QUERY_ERROR)
53+
54+
def runDebugQueryCommandPauseBeforeRPAfterN_expect_oom(env, query_cmd, rp_type, pause_after_n, extra_args=None):
55+
debug_params = ['PAUSE_BEFORE_RP_N', rp_type, pause_after_n]
56+
if extra_args:
57+
debug_params.extend(extra_args)
58+
return runDebugQueryCommand_expect_oom(env, query_cmd, debug_params)
59+
4560
# Test ignore policy
4661
@skip(cluster=True)
4762
def test_query_oom_ignore(env):
@@ -101,6 +116,68 @@ def test_query_oom_cluster_shards_error():
101116
# Change back coord maxmemory to 0
102117
set_unlimited_maxmemory_for_oom(env)
103118

119+
# Verify query fails
120+
env.expect('FT.SEARCH', 'idx', '*', 'SORTBY', 'name', 'ASC').error().contains(OOM_QUERY_ERROR)
121+
# Verify aggregation query fails with sorting so it has to wait for all shards
122+
env.expect('FT.AGGREGATE', 'idx', '*', 'LOAD', 1, '@name', 'SORTBY', 2, '@name', 'ASC').error().contains(OOM_QUERY_ERROR)
123+
124+
# Test OOM error returned from shards (only for fail), enforcing first reply from non-error shard
125+
# @skip(cluster=False)
126+
def test_query_oom_cluster_shards_error_first_reply():
127+
env = Env(shardsCount=3, moduleArgs='WORKERS 1')
128+
129+
# Set OOM policy to fail on all shards
130+
allShards_change_oom_policy(env, 'return')
131+
132+
_ = _common_cluster_test_scenario(env)
133+
134+
# Change maxmemory on all shards to 1
135+
allShards_change_maxmemory_low(env)
136+
# Change back coord maxmemory to 0
137+
set_unlimited_maxmemory_for_oom(env)
138+
139+
##### Use debug command to enforce first reply from non-error shard #####
140+
141+
# Run the query in a separate thread so the paused query does not block the test
142+
query_result = []
143+
# Build threads
144+
query_args = ['FT.AGGREGATE', 'idx', '*', 'LIMIT', 0, 120]
145+
t_query = threading.Thread(
146+
target=_call_and_store,
147+
args=(runDebugQueryCommandPauseBeforeRPAfterN_expect_oom,
148+
(env, query_args, 'Index', 0, ['INTERNAL_ONLY']),
149+
query_result),
150+
daemon=True
151+
)
152+
# Start the query in the background; the main thread performs the pause-check
153+
t_query.start()
154+
# Wait until the result processors on all shards report paused
155+
while False in allShards_getIsRPPaused(env):
156+
time.sleep(0.1)
157+
158+
# Resume the coordinator
159+
setPauseRPResume(env)
160+
t_query.join()
161+
allShards_setPauseRPResume(env, 2)
162+
# Now, the result from the coordinator is the first to arrive, which is not an error
163+
# The rest of the shards (2..N) were resumed above
164+
print(query_result)
165+
166+
# Test OOM error returned from shards (only for fail) with Sorting
167+
@skip(cluster=False)
168+
def test_query_oom_cluster_shards_sorting_error():
169+
env = Env(shardsCount=3)
170+
171+
# Set OOM policy to fail on all shards
172+
allShards_change_oom_policy(env, 'fail')
173+
174+
_ = _common_cluster_test_scenario(env)
175+
176+
# Change maxmemory on all shards to 1
177+
allShards_change_maxmemory_low(env)
178+
# Change back coord maxmemory to 0
179+
set_unlimited_maxmemory_for_oom(env)
180+
104181
# Verify query fails
105182
env.expect('FT.SEARCH', 'idx', '*').error().contains(OOM_QUERY_ERROR)
106183
# Verify aggregation query fails

0 commit comments

Comments
 (0)