3232
3333DEFAULT_DATE = timezone .datetime (2017 , 1 , 1 )
3434
35+ QUEUE_NAME = 'test-queue'
36+ QUEUE_URL = f'https://{ QUEUE_NAME } '
37+
3538
3639class TestSQSSensor (unittest .TestCase ):
3740 def setUp (self ):
3841 args = {'owner' : 'airflow' , 'start_date' : DEFAULT_DATE }
3942
4043 self .dag = DAG ('test_dag_id' , default_args = args )
4144 self .sensor = SQSSensor (
42- task_id = 'test_task' , dag = self .dag , sqs_queue = 'test' , aws_conn_id = 'aws_default'
45+ task_id = 'test_task' , dag = self .dag , sqs_queue = QUEUE_URL , aws_conn_id = 'aws_default'
4346 )
4447
4548 self .mock_context = mock .MagicMock ()
4649 self .sqs_hook = SQSHook ()
4750
4851 @mock_sqs
4952 def test_poke_success (self ):
50- self .sqs_hook .create_queue ('test' )
51- self .sqs_hook .send_message (queue_url = 'test' , message_body = 'hello' )
53+ self .sqs_hook .create_queue (QUEUE_NAME )
54+ self .sqs_hook .send_message (queue_url = QUEUE_URL , message_body = 'hello' )
5255
5356 result = self .sensor .poke (self .mock_context )
5457 assert result
@@ -60,7 +63,7 @@ def test_poke_success(self):
6063 @mock_sqs
6164 def test_poke_no_message_failed (self ):
6265
63- self .sqs_hook .create_queue ('test' )
66+ self .sqs_hook .create_queue (QUEUE_NAME )
6467 result = self .sensor .poke (self .mock_context )
6568 assert not result
6669
@@ -112,40 +115,40 @@ def test_poke_receive_raise_exception(self, mock_conn):
112115 @mock .patch .object (SQSHook , 'get_conn' )
113116 def test_poke_visibility_timeout (self , mock_conn ):
114117 # Check without visibility_timeout parameter
115- self .sqs_hook .create_queue ('test' )
116- self .sqs_hook .send_message (queue_url = 'test' , message_body = 'hello' )
118+ self .sqs_hook .create_queue (QUEUE_NAME )
119+ self .sqs_hook .send_message (queue_url = QUEUE_URL , message_body = 'hello' )
117120
118121 self .sensor .poke (self .mock_context )
119122
120123 calls_receive_message = [
121- mock .call ().receive_message (QueueUrl = 'test' , MaxNumberOfMessages = 5 , WaitTimeSeconds = 1 )
124+ mock .call ().receive_message (QueueUrl = QUEUE_URL , MaxNumberOfMessages = 5 , WaitTimeSeconds = 1 )
122125 ]
123126 mock_conn .assert_has_calls (calls_receive_message )
124127 # Check with visibility_timeout parameter
125128 self .sensor = SQSSensor (
126129 task_id = 'test_task2' ,
127130 dag = self .dag ,
128- sqs_queue = 'test' ,
131+ sqs_queue = QUEUE_URL ,
129132 aws_conn_id = 'aws_default' ,
130133 visibility_timeout = 42 ,
131134 )
132135 self .sensor .poke (self .mock_context )
133136
134137 calls_receive_message = [
135138 mock .call ().receive_message (
136- QueueUrl = 'test' , MaxNumberOfMessages = 5 , WaitTimeSeconds = 1 , VisibilityTimeout = 42
139+ QueueUrl = QUEUE_URL , MaxNumberOfMessages = 5 , WaitTimeSeconds = 1 , VisibilityTimeout = 42
137140 )
138141 ]
139142 mock_conn .assert_has_calls (calls_receive_message )
140143
141144 @mock_sqs
142145 def test_poke_message_invalid_filtering (self ):
143- self .sqs_hook .create_queue ('test' )
144- self .sqs_hook .send_message (queue_url = 'test' , message_body = 'hello' )
146+ self .sqs_hook .create_queue (QUEUE_NAME )
147+ self .sqs_hook .send_message (queue_url = QUEUE_URL , message_body = 'hello' )
145148 sensor = SQSSensor (
146149 task_id = 'test_task2' ,
147150 dag = self .dag ,
148- sqs_queue = 'test' ,
151+ sqs_queue = QUEUE_URL ,
149152 aws_conn_id = 'aws_default' ,
150153 message_filtering = 'invalid_option' ,
151154 )
@@ -155,7 +158,7 @@ def test_poke_message_invalid_filtering(self):
155158
156159 @mock .patch .object (SQSHook , "get_conn" )
157160 def test_poke_message_filtering_literal_values (self , mock_conn ):
158- self .sqs_hook .create_queue ('test' )
161+ self .sqs_hook .create_queue (QUEUE_NAME )
159162 matching = [{"id" : 11 , "body" : "a matching message" }]
160163 non_matching = [{"id" : 12 , "body" : "a non-matching message" }]
161164 all = matching + non_matching
@@ -188,13 +191,13 @@ def mock_delete_message_batch(**kwargs):
188191 # Test that only filtered messages are deleted
189192 delete_entries = [{'Id' : x ['id' ], 'ReceiptHandle' : 100 + x ['id' ]} for x in matching ]
190193 calls_delete_message_batch = [
191- mock .call ().delete_message_batch (QueueUrl = 'test' , Entries = delete_entries )
194+ mock .call ().delete_message_batch (QueueUrl = QUEUE_URL , Entries = delete_entries )
192195 ]
193196 mock_conn .assert_has_calls (calls_delete_message_batch )
194197
195198 @mock .patch .object (SQSHook , "get_conn" )
196199 def test_poke_message_filtering_jsonpath (self , mock_conn ):
197- self .sqs_hook .create_queue ('test' )
200+ self .sqs_hook .create_queue (QUEUE_NAME )
198201 matching = [
199202 {"id" : 11 , "key" : {"matches" : [1 , 2 ]}},
200203 {"id" : 12 , "key" : {"matches" : [3 , 4 , 5 ]}},
@@ -234,13 +237,13 @@ def mock_delete_message_batch(**kwargs):
234237 # Test that only filtered messages are deleted
235238 delete_entries = [{'Id' : x ['id' ], 'ReceiptHandle' : 100 + x ['id' ]} for x in matching ]
236239 calls_delete_message_batch = [
237- mock .call ().delete_message_batch (QueueUrl = 'test' , Entries = delete_entries )
240+ mock .call ().delete_message_batch (QueueUrl = QUEUE_URL , Entries = delete_entries )
238241 ]
239242 mock_conn .assert_has_calls (calls_delete_message_batch )
240243
241244 @mock .patch .object (SQSHook , "get_conn" )
242245 def test_poke_message_filtering_jsonpath_values (self , mock_conn ):
243- self .sqs_hook .create_queue ('test' )
246+ self .sqs_hook .create_queue (QUEUE_NAME )
244247 matching = [
245248 {"id" : 11 , "key" : {"matches" : [1 , 2 ]}},
246249 {"id" : 12 , "key" : {"matches" : [1 , 4 , 5 ]}},
@@ -282,6 +285,6 @@ def mock_delete_message_batch(**kwargs):
282285 # Test that only filtered messages are deleted
283286 delete_entries = [{'Id' : x ['id' ], 'ReceiptHandle' : 100 + x ['id' ]} for x in matching ]
284287 calls_delete_message_batch = [
285- mock .call ().delete_message_batch (QueueUrl = 'test' , Entries = delete_entries )
288+ mock .call ().delete_message_batch (QueueUrl = 'https:// test-queue ' , Entries = delete_entries )
286289 ]
287290 mock_conn .assert_has_calls (calls_delete_message_batch )
0 commit comments