bug: Unsupported integration URI when connecting a Websocket API with Kinesis #7737

@Oblynx

Is there an existing issue for this?

  • I have searched the existing issues

Current Behavior

I create a WebSocket API, a Kinesis data stream, and an API integration that calls PutRecord on Kinesis.
When I send data to the WebSocket API, LocalStack logs this error:

2023-02-23T00:44:16.132  INFO --- [cthread72555] l.s.a.integrations         : Error invoking integration request for route "V1V2CasingDict({'ApiId': '0e2ee594', 'ApiKeyRequired': False, 'AuthorizationType': 'NONE', 'RouteKey': '$default', 'Target': 'integrations/7ff0497d', 'CreatedDate': 1677110864, 'RouteId': '0df4d37b'})": Unsupported integration URI "arn:aws:apigateway:us-east-1:kinesis:action/PutRecord" for Websocket API ID "0e2ee594"

After restarting the LocalStack container, I see a different error:

2023-02-23T11:34:34.267  INFO --- [functhread29] websockets.server          : connection open
2023-02-23T11:34:34.269  INFO --- [uncthread303] localstack.utils.threads   : Thread run method <function AdaptiveThreadPool.submit.<locals>._run at 0x7fba144f0310>(None) failed: expected string or bytes-like object Traceback (most recent call last):
  File "/opt/code/localstack/localstack/utils/threads.py", line 58, in run
    result = self.func(self.params, **kwargs)
  File "/opt/code/localstack/localstack/utils/asyncio.py", line 30, in _run
    return fn(*args, **kwargs)
  File "/opt/code/localstack/.venv/lib/python3.10/site-packages/localstack_ext/services/apigateway/integrations.py.enc", line 86, in A
localstack_ext.services.apigateway.authorizers.DeniedAuthorization: expected string or bytes-like object
2023-02-23T11:34:34.271  INFO --- [functhread29] websockets.server          : connection closed

Expected Behavior

When I send data to the WebSocket API, I expect it to put records on the Kinesis stream without errors.

How are you starting LocalStack?

With a docker-compose file

Steps To Reproduce

How are you starting LocalStack (e.g., bin/localstack command, arguments, or docker-compose.yml)

docker-compose.yaml:

version: "3.8"

services:
  devcontainer:
    # my client container from where I run all CLI tools
    # ...

  localstack:
    image: localstack/localstack-pro:1.4
    environment:
      - DEBUG=${DEBUG-}
      - LOCALSTACK_API_KEY=${LOCALSTACK_API_KEY-}
      - DEFAULT_REGION=eu-central-1
      - DATA_DIR=/tmp/localstack/data
      - PERSISTENCE=${PERSISTENCE-}
      - LAMBDA_EXECUTOR=local
      - DOCKER_HOST=unix:///var/run/docker.sock
      - KINESIS_LATENCY=0 # Disable latency for Kinesis API
    volumes:
      - localstack-data:/var/lib/localstack
      - /var/run/docker.sock:/var/run/docker.sock
    stop_grace_period: 5s # Works around some connection issues on shutdown

volumes:
  localstack-data: null

Client commands (e.g., AWS SDK code snippet, or sequence of "awslocal" commands)

Terraform.

The Terraform definition looks long, but it only defines a Kinesis stream and one WebSocket API with its integration.
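
The provider configuration is not shown in this report. For anyone trying to reproduce, here is a minimal sketch of a provider block that points Terraform at the LocalStack container. Everything in it is an assumption: the hostname `localstack` is taken from the compose service name, 4566 is LocalStack's default edge port, and the region is chosen to match DEFAULT_REGION and the integration URI.

```hcl
# Assumption: this provider block is not part of the original report. It
# sketches one common way to target LocalStack from the Terraform AWS provider.
provider "aws" {
  region     = "eu-central-1" # assumed; matches DEFAULT_REGION in the compose file
  access_key = "test"         # dummy credentials
  secret_key = "test"

  # Skip checks that would otherwise try to reach real AWS endpoints.
  skip_credentials_validation = true
  skip_metadata_api_check     = true
  skip_requesting_account_id  = true

  endpoints {
    # "localstack" is the compose service name, reachable from the devcontainer.
    apigatewayv2 = "http://localstack:4566"
    iam          = "http://localstack:4566"
    kinesis      = "http://localstack:4566"
    sts          = "http://localstack:4566"
  }
}
```

Note that the first log line reports an integration URI in us-east-1 while the definition here uses eu-central-1, so the region used in the failing run may have differed from this sketch.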

resource "aws_kinesis_stream" "incoming" {
  name        = "incoming-data-stream"
  shard_count = 1
}

# API Gateway configurations - WebSocket API with stream forwarding to Kinesis

# A WebSocket API
resource "aws_apigatewayv2_api" "incoming" {
  name                       = "incoming-websocket-api"
  protocol_type              = "WEBSOCKET"
  route_selection_expression = "\\$default"
}

# Integration with Kinesis Stream
resource "aws_apigatewayv2_integration" "incoming_connection" {
  api_id             = aws_apigatewayv2_api.incoming.id
  integration_type   = "AWS"
  integration_uri    = "arn:aws:apigateway:eu-central-1:kinesis:action/PutRecord"
  integration_method = "POST"
  credentials_arn    = aws_iam_role.kinesis_role.arn
  request_templates = {
    "default" = <<TEMPLATE
      #set($data = "{""deviceID"": $input.json('$.deviceID'), ""recordingID"": $input.json('$.recordingID'), ""stop"": $input.json('$.stop'), ""deviceTimestamp"": $input.json('$.deviceTimestamp'), ""payload"": $input.json('$.payload'), ""connectionID"": ""$context.connectionId""}")
      {
          "Data": "$util.base64Encode($data)",
          "PartitionKey": $input.json('$.deviceID'),
          "StreamName": "${aws_kinesis_stream.incoming.name}"
      }
    TEMPLATE
  }
  template_selection_expression = "default"
}

# Response with HTTP Status Code 200
resource "aws_apigatewayv2_integration_response" "incoming_response" {
  api_id                   = aws_apigatewayv2_api.incoming.id
  integration_id           = aws_apigatewayv2_integration.incoming_connection.id
  integration_response_key = "/200/"
}

# Route for all other messages
resource "aws_apigatewayv2_route" "incoming_route" {
  api_id             = aws_apigatewayv2_api.incoming.id
  route_key          = "$default"
  target             = "integrations/${aws_apigatewayv2_integration.incoming_connection.id}"
  authorization_type = "NONE"
  api_key_required   = false
}

# Response to default requests
resource "aws_apigatewayv2_route_response" "incoming_response" {
  api_id             = aws_apigatewayv2_api.incoming.id
  route_id           = aws_apigatewayv2_route.incoming_route.id
  route_response_key = "$default"
}

# API Gateway Stage
resource "aws_apigatewayv2_stage" "incoming_stage" {
  name          = "v1"
  api_id        = aws_apigatewayv2_api.incoming.id
  deployment_id = aws_apigatewayv2_deployment.incoming_deployment.id
}

# Any changes with connection or the routes will trigger a redeployment of the API
resource "aws_apigatewayv2_deployment" "incoming_deployment" {
  api_id      = aws_apigatewayv2_api.incoming.id
  description = "Incoming api-resource deployment"

  triggers = {
    redeployment = sha1(join(",", tolist([
      jsonencode(aws_apigatewayv2_integration.incoming_connection),
    ])))
  }

  depends_on = [
    aws_apigatewayv2_api.incoming,
    aws_apigatewayv2_route.connect_route,
    aws_apigatewayv2_route.disconnect_route,
    aws_apigatewayv2_route.incoming_route,
    aws_apigatewayv2_integration.incoming_connection,
  ]
}

# Disconnect does not need to integrate anywhere, so it uses a MOCK integration
resource "aws_apigatewayv2_integration" "disconnect_integration" {
  api_id           = aws_apigatewayv2_api.incoming.id
  integration_type = "MOCK"
}

# Route for opening new WebSocket connections
resource "aws_apigatewayv2_route" "connect_route" {
  api_id             = aws_apigatewayv2_api.incoming.id
  route_key          = "$connect"
  authorization_type = "NONE"
  api_key_required   = false
}

# Response to connect request
resource "aws_apigatewayv2_route_response" "connect_response" {
  api_id             = aws_apigatewayv2_api.incoming.id
  route_id           = aws_apigatewayv2_route.connect_route.id
  route_response_key = "$default"
}

# Route for disconnects
resource "aws_apigatewayv2_route" "disconnect_route" {
  api_id             = aws_apigatewayv2_api.incoming.id
  route_key          = "$disconnect"
  target             = "integrations/${aws_apigatewayv2_integration.disconnect_integration.id}"
  authorization_type = "NONE"
  api_key_required   = false
}

# Response to disconnect requests
resource "aws_apigatewayv2_route_response" "disconnect_response" {
  api_id             = aws_apigatewayv2_api.incoming.id
  route_id           = aws_apigatewayv2_route.disconnect_route.id
  route_response_key = "$default"
}
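
The integration above references `aws_iam_role.kinesis_role`, which is not included in the definition as posted. A minimal sketch of what such a role might look like follows, assuming API Gateway assumes the role and only needs `kinesis:PutRecord` on the stream. The role and policy names are hypothetical.

```hcl
# Assumption: the role behind credentials_arn is not shown in the report;
# this is an illustrative reconstruction, not the reporter's definition.
resource "aws_iam_role" "kinesis_role" {
  name = "incoming-kinesis-role" # hypothetical name

  # Allow API Gateway to assume this role when invoking the integration.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Action    = "sts:AssumeRole"
      Principal = { Service = "apigateway.amazonaws.com" }
    }]
  })
}

# Grant the role permission to put records on the incoming stream.
resource "aws_iam_role_policy" "kinesis_put_record" {
  name = "allow-put-record" # hypothetical name
  role = aws_iam_role.kinesis_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect   = "Allow"
      Action   = ["kinesis:PutRecord"]
      Resource = aws_kinesis_stream.incoming.arn
    }]
  })
}
```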

Environment

- OS: Ubuntu 22.04
- LocalStack: 1.4

Anything else?

No response
