@@ -199,19 +199,22 @@ def stop(self):
199199 def handle_message (self , message : dict ) -> None :
200200 failure_cause = None
201201 qualifier = self .version_manager .function_version .id .qualifier
202+ function_config = self .version_manager .function_version .config
202203 event_invoke_config = self .version_manager .function .event_invoke_configs .get (qualifier )
203204 runtime = None
204205 status = None
205- initialization_type = FunctionInitializationType .on_demand
206+ # TODO: handle initialization_type provisioned-concurrency, which requires enriching invocation_result
207+ initialization_type = (
208+ FunctionInitializationType .lambda_managed_instances
209+ if function_config .CapacityProviderConfig
210+ else FunctionInitializationType .on_demand
211+ )
206212 try :
207213 sqs_invocation = SQSInvocation .decode (message ["Body" ])
208214 invocation = sqs_invocation .invocation
209215 try :
210216 invocation_result = self .version_manager .invoke (invocation = invocation )
211- function_config = self .version_manager .function_version .config
212- # TODO: handle initialization_type provisioned-concurrency, requires enriching invocation_result
213- if function_config .CapacityProviderConfig :
214- initialization_type = FunctionInitializationType .lambda_managed_instances
217+ status = FunctionStatus .success
215218 except Exception as e :
216219 # Reserved concurrency == 0
217220 if self .version_manager .function .reserved_concurrent_executions == 0 :
@@ -221,6 +224,7 @@ def handle_message(self, message: dict) -> None:
221224 elif not has_enough_time_for_retry (sqs_invocation , event_invoke_config ):
222225 failure_cause = "EventAgeExceeded"
223226 status = FunctionStatus .event_age_exceeded_error
227+
224228 if failure_cause :
225229 invocation_result = InvocationResult (
226230 is_error = True , request_id = invocation .request_id , payload = None , logs = None
@@ -238,8 +242,7 @@ def handle_message(self, message: dict) -> None:
238242 sqs_client .delete_message (
239243 QueueUrl = self .event_queue_url , ReceiptHandle = message ["ReceiptHandle" ]
240244 )
241- # status MUST be set before returning
242- function_config = self .version_manager .function_version .config
245+ assert status , "status MUST be set before returning"
243246 function_counter .labels (
244247 operation = FunctionOperation .invoke ,
245248 runtime = runtime or "n/a" ,
@@ -256,6 +259,8 @@ def handle_message(self, message: dict) -> None:
256259 if event_invoke_config and event_invoke_config .maximum_retry_attempts is not None :
257260 max_retry_attempts = event_invoke_config .maximum_retry_attempts
258261
262+ assert invocation_result , "Invocation result MUST exist if we are not returning before"
263+
259264 # An invocation error either leads to a terminal failure or to a scheduled retry
260265 if invocation_result .is_error : # invocation error
261266 failure_cause = None
0 commit comments