@@ -120,7 +120,7 @@ def test_link_error_callback_error_callback_retries_eager(self):
120120 )
121121 assert result .get (timeout = TIMEOUT , propagate = False ) == exception
122122
123- @flaky
123+ @pytest . mark . xfail ( raises = TimeoutError , reason = "Task is timeout instead of returning exception" )
124124 def test_link_error_callback_retries (self ):
125125 exception = ExpectedException ("Task expected to fail" , "test" )
126126 result = fail .apply_async (
@@ -140,7 +140,7 @@ def test_link_error_using_signature_eager(self):
140140 assert (fail .apply ().get (timeout = TIMEOUT , propagate = False ), True ) == (
141141 exception , True )
142142
143- @flaky
143+ @pytest . mark . xfail ( raises = TimeoutError , reason = "Task is timeout instead of returning exception" )
144144 def test_link_error_using_signature (self ):
145145 fail = signature ('t.integration.tasks.fail' , args = ("test" ,))
146146 retrun_exception = signature ('t.integration.tasks.return_exception' )
@@ -175,7 +175,7 @@ def test_complex_chain(self, manager):
175175 res = c ()
176176 assert res .get (timeout = TIMEOUT ) == [64 , 65 , 66 , 67 ]
177177
178- @flaky
178+ @pytest . mark . xfail ( raises = TimeoutError , reason = "Task is timeout" )
179179 def test_group_results_in_chain (self , manager ):
180180 # This adds in an explicit test for the special case added in commit
181181 # 1e3fcaa969de6ad32b52a3ed8e74281e5e5360e6
@@ -473,7 +473,7 @@ def test_chain_of_a_chord_and_three_tasks_and_a_group(self, manager):
473473 res = c ()
474474 assert res .get (timeout = TIMEOUT ) == [8 , 8 ]
475475
476- @flaky
476+ @pytest . mark . xfail ( raises = TimeoutError , reason = "Task is timeout" )
477477 def test_nested_chain_group_lone (self , manager ):
478478 """
479479 Test that a lone group in a chain completes.
@@ -1229,7 +1229,7 @@ def apply_chord_incr_with_sleep(self, *args, **kwargs):
12291229 result = c ()
12301230 assert result .get (timeout = TIMEOUT ) == 4
12311231
1232- @flaky
1232+ @pytest . mark . xfail ( reason = "async_results aren't performed in async way" )
12331233 def test_redis_subscribed_channels_leak (self , manager ):
12341234 if not manager .app .conf .result_backend .startswith ('redis' ):
12351235 raise pytest .skip ('Requires redis result backend.' )
@@ -1562,11 +1562,12 @@ def test_chord_on_error(self, manager):
15621562 ) == 1
15631563
15641564 @flaky
1565- def test_generator (self , manager ):
1565+ @pytest .mark .parametrize ('size' , [3 , 4 , 5 , 6 , 7 , 8 , 9 ])
1566+ def test_generator (self , manager , size ):
15661567 def assert_generator (file_name ):
1567- for i in range (3 ):
1568+ for i in range (size ):
15681569 sleep (1 )
1569- if i == 2 :
1570+ if i == size - 1 :
15701571 with open (file_name ) as file_handle :
15711572 # ensures chord header generators tasks are processed incrementally #3021
15721573 assert file_handle .readline () == '0\n ' , "Chord header was unrolled too early"
@@ -1575,7 +1576,7 @@ def assert_generator(file_name):
15751576 with tempfile .NamedTemporaryFile (mode = 'w' , delete = False ) as tmp_file :
15761577 file_name = tmp_file .name
15771578 c = chord (assert_generator (file_name ), tsum .s ())
1578- assert c ().get (timeout = TIMEOUT ) == 3
1579+ assert c ().get (timeout = TIMEOUT ) == size * ( size - 1 ) // 2
15791580
15801581 @flaky
15811582 def test_parallel_chords (self , manager ):
0 commit comments