Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 68 additions & 40 deletions localstack/services/kinesis/kinesis_mock_server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import logging
import os
import threading
from typing import Dict, List, Optional, Tuple

from localstack import config
Expand Down Expand Up @@ -107,46 +109,72 @@ def _log_listener(self, line, **_kwargs):
LOG.info(line.rstrip())


def create_kinesis_mock_server(
account_id: str, port=None, persist_path: Optional[str] = None
) -> KinesisMockServer:
"""
Creates a new Kinesis Mock server instance. Installs Kinesis Mock on the host first if necessary.
Introspects on the host config to determine server configuration:
config.dirs.data -> if set, the server runs with persistence using the path to store data
config.LS_LOG -> configure kinesis mock log level (defaults to INFO)
config.KINESIS_LATENCY -> configure stream latency (in milliseconds)
config.KINESIS_INITIALIZE_STREAMS -> Initialize the given streams on startup
"""
port = port or get_free_tcp_port()
kinesismock_package.install()
kinesis_mock_bin_path = kinesismock_package.get_installer().get_executable_path()
persist_path = (
f"{config.dirs.data}/kinesis" if not persist_path and config.dirs.data else persist_path
)
if persist_path:
class KinesisServerManager:
default_startup_timeout = 60

def __init__(self):
self._lock = threading.RLock()
self._servers: dict[str, KinesisMockServer] = {}

def get_server_for_account(self, account_id: str) -> KinesisMockServer:
if account_id in self._servers:
return self._servers[account_id]

with self._lock:
if account_id in self._servers:
return self._servers[account_id]

LOG.info("Creating kinesis backend for account %s", account_id)
self._servers[account_id] = self._create_kinesis_mock_server(account_id)
self._servers[account_id].start()
if not self._servers[account_id].wait_is_up(timeout=self.default_startup_timeout):
raise TimeoutError("gave up waiting for kinesis backend to start up")
return self._servers[account_id]

def shutdown_all(self):
with self._lock:
while self._servers:
account_id, server = self._servers.popitem()
LOG.info("Shutting down kinesis backend for account %s", account_id)
server.shutdown()

def _create_kinesis_mock_server(self, account_id: str) -> KinesisMockServer:
"""
Creates a new Kinesis Mock server instance. Installs Kinesis Mock on the host first if necessary.
Introspects on the host config to determine server configuration:
config.dirs.data -> if set, the server runs with persistence using the path to store data
config.LS_LOG -> configure kinesis mock log level (defaults to INFO)
config.KINESIS_LATENCY -> configure stream latency (in milliseconds)
config.KINESIS_INITIALIZE_STREAMS -> Initialize the given streams on startup
"""
port = get_free_tcp_port()
kinesismock_package.install()
kinesis_mock_bin_path = kinesismock_package.get_installer().get_executable_path()

# kinesis-mock stores state in json files <account_id>.json, so we can dump everything into `kinesis/`
persist_path = os.path.join(config.dirs.data, "kinesis")
mkdir(persist_path)

if config.LS_LOG:
if config.LS_LOG == "warning":
log_level = "WARN"
if config.LS_LOG:
if config.LS_LOG == "warning":
log_level = "WARN"
else:
log_level = config.LS_LOG.upper()
else:
log_level = config.LS_LOG.upper()
else:
log_level = "INFO"

latency = config.KINESIS_LATENCY + "ms"
initialize_streams = (
config.KINESIS_INITIALIZE_STREAMS if config.KINESIS_INITIALIZE_STREAMS else None
)

server = KinesisMockServer(
port=port,
bin_path=kinesis_mock_bin_path,
log_level=log_level,
latency=latency,
initialize_streams=initialize_streams,
data_dir=persist_path,
account_id=account_id,
)
return server
log_level = "INFO"

latency = config.KINESIS_LATENCY + "ms"
initialize_streams = (
config.KINESIS_INITIALIZE_STREAMS if config.KINESIS_INITIALIZE_STREAMS else None
)

server = KinesisMockServer(
port=port,
bin_path=kinesis_mock_bin_path,
log_level=log_level,
latency=latency,
initialize_streams=initialize_streams,
data_dir=persist_path,
account_id=account_id,
)
return server
82 changes: 2 additions & 80 deletions localstack/services/kinesis/kinesis_starter.py
Original file line number Diff line number Diff line change
@@ -1,80 +1,2 @@
import logging
import threading
from typing import Dict, Optional

from localstack.aws.accounts import get_aws_account_id
from localstack.constants import DEFAULT_AWS_ACCOUNT_ID
from localstack.services.infra import log_startup_message
from localstack.services.kinesis import kinesis_mock_server
from localstack.utils.aws import aws_stack
from localstack.utils.serving import Server
from localstack.utils.sync import SynchronizedDefaultDict

LOG = logging.getLogger(__name__)

_SERVERS: Dict[str, Server] = {} # server singleton keyed by account IDs
_LOCKS = SynchronizedDefaultDict(threading.RLock)


def start_kinesis(
port=None,
update_listener=None,
asynchronous=None,
persist_path: Optional[str] = None,
account_id=None,
) -> Server:
"""
Creates a singleton of a Kinesis server and starts it on a new thread. Uses Kinesis Mock

:param persist_path: path to persist data to
:param port: port to run server on. Selects an arbitrary available port if None.
:param update_listener: an update listener instance for server proxy
:param asynchronous: currently unused but required by localstack.services.plugins.Service.start().
TODO: either make use of this param or refactor Service.start() to not pass it.
:param account_id: account ID to use for this instance of Kinesis-Mock
:returns: A running Kinesis server instance
"""
global _SERVERS

account_id = account_id or get_aws_account_id()

with _LOCKS[account_id]:
if account_id not in _SERVERS:
# To support multi-accounts we use separate instance of Kinesis-Mock per account
# See https://github.com/etspaceman/kinesis-mock/issues/377
if not _SERVERS.get(account_id):
_SERVERS[account_id] = kinesis_mock_server.create_kinesis_mock_server(
account_id=account_id, persist_path=persist_path
)

_SERVERS[account_id].start()
log_startup_message("Kinesis")

check_kinesis(account_id=account_id)

return _SERVERS[account_id]


def check_kinesis(
expect_shutdown=False, print_error=False, account_id: str = DEFAULT_AWS_ACCOUNT_ID
):
out = None
if not expect_shutdown:
assert _SERVERS.get(account_id)

try:
_SERVERS[account_id].wait_is_up()
out = aws_stack.connect_to_service(
service_name="kinesis", endpoint_url=_SERVERS[account_id].url
).list_streams()
except Exception:
if print_error:
LOG.exception("Kinesis health check failed")
if expect_shutdown:
assert out is None
else:
assert out is not None and isinstance(out.get("StreamNames"), list)


def get_server(account_id: str) -> Server:
return _SERVERS[account_id]
# FIXME: remove once ext is refactored to no longer use this
_SERVERS = {}
29 changes: 23 additions & 6 deletions localstack/services/kinesis/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import time
from random import random

import localstack.services.kinesis.kinesis_starter as starter
from localstack import config
from localstack.aws.accounts import get_aws_account_id
from localstack.aws.api import RequestContext
Expand All @@ -27,6 +26,7 @@
SubscribeToShardOutput,
)
from localstack.constants import LOCALHOST
from localstack.services.kinesis.kinesis_mock_server import KinesisServerManager
from localstack.services.kinesis.models import KinesisStore, kinesis_stores
from localstack.services.plugins import ServiceLifecycleHook
from localstack.state import AssetDirectory, StateVisitor
Expand All @@ -35,6 +35,7 @@

LOG = logging.getLogger(__name__)
MAX_SUBSCRIPTION_SECONDS = 300
SERVER_STARTUP_TIMEOUT = 120


def find_stream_for_consumer(consumer_arn):
Expand All @@ -48,19 +49,35 @@ def find_stream_for_consumer(consumer_arn):


class KinesisProvider(KinesisApi, ServiceLifecycleHook):

server_manager: KinesisServerManager

def __init__(self):
self.server_manager = KinesisServerManager()

def accept_state_visitor(self, visitor: StateVisitor):
visitor.visit(kinesis_stores)
visitor.visit(AssetDirectory(os.path.join(config.dirs.data, "kinesis")))

@staticmethod
def get_store(account_id: str, region_name: str) -> KinesisStore:
return kinesis_stores[account_id][region_name]
def on_before_state_load(self):
# no need to restart servers, since that happens lazily in `server_manager.get_server_for_account`.
self.server_manager.shutdown_all()

def on_before_state_reset(self):
self.server_manager.shutdown_all()

def on_before_stop(self):
self.server_manager.shutdown_all()

def get_forward_url(self):
"""Return the URL of the backend Kinesis server to forward requests to"""
account_id = get_aws_account_id()
starter.start_kinesis(account_id=account_id)
return f"http://{LOCALHOST}:{starter.get_server(account_id).port}"
server = self.server_manager.get_server_for_account(account_id)
return f"http://{LOCALHOST}:{server.port}"

@staticmethod
def get_store(account_id: str, region_name: str) -> KinesisStore:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had a quick look at the kinesis stores. Are we using them somewhere?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good question, i didn't actually look into it. but may be useful to keep it

return kinesis_stores[account_id][region_name]

def subscribe_to_shard(
self,
Expand Down
3 changes: 3 additions & 0 deletions localstack/state/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ def __init__(self, path: str):

self.path = path

def __str__(self):
return self.path


class Encoder:
def encodes(self, obj: Any) -> bytes:
Expand Down