1313from localstack import config
1414from localstack .aws .api .lambda_ import InvocationType , TooManyRequestsException
1515from localstack .services .lambda_ .analytics import (
16+ FunctionInitializationType ,
1617 FunctionOperation ,
1718 FunctionStatus ,
1819 function_counter ,
@@ -198,22 +199,22 @@ def stop(self):
198199 def handle_message (self , message : dict ) -> None :
199200 failure_cause = None
200201 qualifier = self .version_manager .function_version .id .qualifier
202+ function_config = self .version_manager .function_version .config
201203 event_invoke_config = self .version_manager .function .event_invoke_configs .get (qualifier )
202204 runtime = None
203205 status = None
206+ # TODO: handle initialization_type provisioned-concurrency, which requires enriching invocation_result
207+ initialization_type = (
208+ FunctionInitializationType .lambda_managed_instances
209+ if function_config .CapacityProviderConfig
210+ else FunctionInitializationType .on_demand
211+ )
204212 try :
205213 sqs_invocation = SQSInvocation .decode (message ["Body" ])
206214 invocation = sqs_invocation .invocation
207215 try :
208216 invocation_result = self .version_manager .invoke (invocation = invocation )
209- function_config = self .version_manager .function_version .config
210- function_counter .labels (
211- operation = FunctionOperation .invoke ,
212- runtime = function_config .runtime or "n/a" ,
213- status = FunctionStatus .success ,
214- invocation_type = InvocationType .Event ,
215- package_type = function_config .package_type ,
216- ).increment ()
217+ status = FunctionStatus .success
217218 except Exception as e :
218219 # Reserved concurrency == 0
219220 if self .version_manager .function .reserved_concurrent_executions == 0 :
@@ -223,6 +224,7 @@ def handle_message(self, message: dict) -> None:
223224 elif not has_enough_time_for_retry (sqs_invocation , event_invoke_config ):
224225 failure_cause = "EventAgeExceeded"
225226 status = FunctionStatus .event_age_exceeded_error
227+
226228 if failure_cause :
227229 invocation_result = InvocationResult (
228230 is_error = True , request_id = invocation .request_id , payload = None , logs = None
@@ -240,14 +242,14 @@ def handle_message(self, message: dict) -> None:
240242 sqs_client .delete_message (
241243 QueueUrl = self .event_queue_url , ReceiptHandle = message ["ReceiptHandle" ]
242244 )
243- # status MUST be set before returning
244- package_type = self .version_manager .function_version .config .package_type
245+ assert status , "status MUST be set before returning"
245246 function_counter .labels (
246247 operation = FunctionOperation .invoke ,
247248 runtime = runtime or "n/a" ,
248249 status = status ,
249250 invocation_type = InvocationType .Event ,
250- package_type = package_type ,
251+ package_type = function_config .package_type ,
252+ initialization_type = initialization_type ,
251253 ).increment ()
252254
253255 # Good summary blogpost: https://haithai91.medium.com/aws-lambdas-retry-behaviors-edff90e1cf1b
@@ -257,6 +259,8 @@ def handle_message(self, message: dict) -> None:
257259 if event_invoke_config and event_invoke_config .maximum_retry_attempts is not None :
258260 max_retry_attempts = event_invoke_config .maximum_retry_attempts
259261
262+ assert invocation_result , "Invocation result MUST exist if we are not returning before"
263+
260264 # An invocation error either leads to a terminal failure or to a scheduled retry
261265 if invocation_result .is_error : # invocation error
262266 failure_cause = None
0 commit comments