11from common import *
2+ import threading
23
34OOM_QUERY_ERROR = "Not enough memory available to execute the query"
45
@@ -18,6 +19,9 @@ def allShards_change_maxmemory_low(env):
1819 res = env .getConnection (shardId ).execute_command ('config' , 'set' , 'maxmemory' , 1 )
1920 env .assertEqual (res , 'OK' )
2021
22+ # Helper to call a function and push its return value into a list
23+ def _call_and_store (fn , args , out_list ):
24+ out_list .append (fn (* args ))
2125
2226def _common_test_scenario (env ):
2327 # Create an index
@@ -30,7 +34,7 @@ def _common_test_scenario(env):
3034def _common_cluster_test_scenario (env ):
3135 conn = getConnectionByEnv (env )
3236 # Create an index
33- env .expect ('FT.CREATE' , 'idx' , 'SCHEMA' , 'name' , 'TEXT' ).ok ()
37+ env .expect ('FT.CREATE' , 'idx' , 'SCHEMA' , 'name' , 'TEXT' , 'SORTABLE' ).ok ()
3438
3539 n_docs_per_shard = 100
3640 n_docs = n_docs_per_shard * env .shardsCount
@@ -42,6 +46,17 @@ def _common_cluster_test_scenario(env):
4246
4347 return n_docs
4448
49+ def runDebugQueryCommand_expect_oom (env , query_cmd , debug_params ):
50+ # Use the helper function to build the argument list
51+ args = parseDebugQueryCommandArgs (query_cmd , debug_params )
52+ return env .expect (debug_cmd (), * args ).error ().contains (OOM_QUERY_ERROR )
53+
54+ def runDebugQueryCommandPauseBeforeRPAfterN_expect_oom (env , query_cmd , rp_type , pause_after_n , extra_args = None ):
55+ debug_params = ['PAUSE_BEFORE_RP_N' , rp_type , pause_after_n ]
56+ if extra_args :
57+ debug_params .extend (extra_args )
58+ return runDebugQueryCommand_expect_oom (env , query_cmd , debug_params )
59+
4560# Test ignore policy
4661@skip (cluster = True )
4762def test_query_oom_ignore (env ):
@@ -101,6 +116,68 @@ def test_query_oom_cluster_shards_error():
101116 # Change back coord maxmemory to 0
102117 set_unlimited_maxmemory_for_oom (env )
103118
119+ # Verify query fails
120+ env .expect ('FT.SEARCH' , 'idx' , '*' , 'SORTBY' , 'name' , 'ASC' ).error ().contains (OOM_QUERY_ERROR )
121+ # Verify aggregation query fails with sorting so it has to wait for all shards
122+ env .expect ('FT.AGGREGATE' , 'idx' , '*' , 'LOAD' , 1 , '@name' , 'SORTBY' , 2 , '@name' , 'ASC' ).error ().contains (OOM_QUERY_ERROR )
123+
124+ # Test OOM error returned from shards (only for fail), enforcing first reply from non-error shard
125+ # @skip(cluster=False)
126+ def test_query_oom_cluster_shards_error_first_reply ():
127+ env = Env (shardsCount = 3 , moduleArgs = 'WORKERS 1' )
128+
129+ # Set OOM policy to fail on all shards
130+ allShards_change_oom_policy (env , 'return' )
131+
132+ _ = _common_cluster_test_scenario (env )
133+
134+ # Change maxmemory on all shards to 1
135+ allShards_change_maxmemory_low (env )
136+ # Change back coord maxmemory to 0
137+ set_unlimited_maxmemory_for_oom (env )
138+
139+ ##### Use debug command to enforce first reply from non-error shard #####
140+
141+ # We need to call the queries in MT so the paused query won't block the test
142+ query_result = []
143+ # Build threads
144+ query_args = ['FT.AGGREGATE' , 'idx' , '*' , 'LIMIT' , 0 , 120 ]
145+ t_query = threading .Thread (
146+ target = _call_and_store ,
147+ args = (runDebugQueryCommandPauseBeforeRPAfterN_expect_oom ,
148+ (env , query_args , 'Index' , 0 , ['INTERNAL_ONLY' ]),
149+ query_result ),
150+ daemon = True
151+ )
152+ # Start the query and the pause-check in parallel
153+ t_query .start ()
154+ # Wait for the coordinator to be paused
155+ while False in allShards_getIsRPPaused (env ):
156+ time .sleep (0.1 )
157+
158+ # Resume the coordinator
159+ setPauseRPResume (env )
160+ t_query .join ()
161+ allShards_setPauseRPResume (env , 2 )
162+ # Now, the result from the coordinator is the first to arrive, which is not an error
163+ # Resume the rest of the shards
164+ print (query_result )
165+
166+ # Test OOM error returned from shards (only for fail) with Sorting
167+ @skip (cluster = False )
168+ def test_query_oom_cluster_shards_sorting_error ():
169+ env = Env (shardsCount = 3 )
170+
171+ # Set OOM policy to fail on all shards
172+ allShards_change_oom_policy (env , 'fail' )
173+
174+ _ = _common_cluster_test_scenario (env )
175+
176+ # Change maxmemory on all shards to 1
177+ allShards_change_maxmemory_low (env )
178+ # Change back coord maxmemory to 0
179+ set_unlimited_maxmemory_for_oom (env )
180+
104181 # Verify query fails
105182 env .expect ('FT.SEARCH' , 'idx' , '*' ).error ().contains (OOM_QUERY_ERROR )
106183 # Verify aggregation query fails
0 commit comments