-
-
Notifications
You must be signed in to change notification settings - Fork 4.6k
refactor kinesis state lifecycle hook #7825
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,80 +1,2 @@ | ||
| import logging | ||
| import threading | ||
| from typing import Dict, Optional | ||
|
|
||
| from localstack.aws.accounts import get_aws_account_id | ||
| from localstack.constants import DEFAULT_AWS_ACCOUNT_ID | ||
| from localstack.services.infra import log_startup_message | ||
| from localstack.services.kinesis import kinesis_mock_server | ||
| from localstack.utils.aws import aws_stack | ||
| from localstack.utils.serving import Server | ||
| from localstack.utils.sync import SynchronizedDefaultDict | ||
|
|
||
| LOG = logging.getLogger(__name__) | ||
|
|
||
| _SERVERS: Dict[str, Server] = {} # server singleton keyed by account IDs | ||
| _LOCKS = SynchronizedDefaultDict(threading.RLock) | ||
|
|
||
|
|
||
| def start_kinesis( | ||
| port=None, | ||
| update_listener=None, | ||
| asynchronous=None, | ||
| persist_path: Optional[str] = None, | ||
| account_id=None, | ||
| ) -> Server: | ||
| """ | ||
| Creates a singleton of a Kinesis server and starts it on a new thread. Uses Kinesis Mock | ||
|
|
||
| :param persist_path: path to persist data to | ||
| :param port: port to run server on. Selects an arbitrary available port if None. | ||
| :param update_listener: an update listener instance for server proxy | ||
| :param asynchronous: currently unused but required by localstack.services.plugins.Service.start(). | ||
| TODO: either make use of this param or refactor Service.start() to not pass it. | ||
| :param account_id: account ID to use for this instance of Kinesis-Mock | ||
| :returns: A running Kinesis server instance | ||
| """ | ||
| global _SERVERS | ||
|
|
||
| account_id = account_id or get_aws_account_id() | ||
|
|
||
| with _LOCKS[account_id]: | ||
| if account_id not in _SERVERS: | ||
| # To support multi-accounts we use separate instance of Kinesis-Mock per account | ||
| # See https://github.com/etspaceman/kinesis-mock/issues/377 | ||
| if not _SERVERS.get(account_id): | ||
| _SERVERS[account_id] = kinesis_mock_server.create_kinesis_mock_server( | ||
| account_id=account_id, persist_path=persist_path | ||
| ) | ||
|
|
||
| _SERVERS[account_id].start() | ||
| log_startup_message("Kinesis") | ||
|
|
||
| check_kinesis(account_id=account_id) | ||
|
|
||
| return _SERVERS[account_id] | ||
|
|
||
|
|
||
| def check_kinesis( | ||
| expect_shutdown=False, print_error=False, account_id: str = DEFAULT_AWS_ACCOUNT_ID | ||
| ): | ||
| out = None | ||
| if not expect_shutdown: | ||
| assert _SERVERS.get(account_id) | ||
|
|
||
| try: | ||
| _SERVERS[account_id].wait_is_up() | ||
| out = aws_stack.connect_to_service( | ||
| service_name="kinesis", endpoint_url=_SERVERS[account_id].url | ||
| ).list_streams() | ||
| except Exception: | ||
| if print_error: | ||
| LOG.exception("Kinesis health check failed") | ||
| if expect_shutdown: | ||
| assert out is None | ||
| else: | ||
| assert out is not None and isinstance(out.get("StreamNames"), list) | ||
|
|
||
|
|
||
| def get_server(account_id: str) -> Server: | ||
| return _SERVERS[account_id] | ||
| # FIXME: remove once ext is refactored to no longer use this | ||
| _SERVERS = {} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,7 +3,6 @@ | |
| import time | ||
| from random import random | ||
|
|
||
| import localstack.services.kinesis.kinesis_starter as starter | ||
| from localstack import config | ||
| from localstack.aws.accounts import get_aws_account_id | ||
| from localstack.aws.api import RequestContext | ||
|
|
@@ -27,6 +26,7 @@ | |
| SubscribeToShardOutput, | ||
| ) | ||
| from localstack.constants import LOCALHOST | ||
| from localstack.services.kinesis.kinesis_mock_server import KinesisServerManager | ||
| from localstack.services.kinesis.models import KinesisStore, kinesis_stores | ||
| from localstack.services.plugins import ServiceLifecycleHook | ||
| from localstack.state import AssetDirectory, StateVisitor | ||
|
|
@@ -35,6 +35,7 @@ | |
|
|
||
| LOG = logging.getLogger(__name__) | ||
| MAX_SUBSCRIPTION_SECONDS = 300 | ||
| SERVER_STARTUP_TIMEOUT = 120 | ||
|
|
||
|
|
||
| def find_stream_for_consumer(consumer_arn): | ||
|
|
@@ -48,19 +49,35 @@ def find_stream_for_consumer(consumer_arn): | |
|
|
||
|
|
||
| class KinesisProvider(KinesisApi, ServiceLifecycleHook): | ||
|
|
||
| server_manager: KinesisServerManager | ||
|
|
||
| def __init__(self): | ||
| self.server_manager = KinesisServerManager() | ||
|
|
||
| def accept_state_visitor(self, visitor: StateVisitor): | ||
| visitor.visit(kinesis_stores) | ||
| visitor.visit(AssetDirectory(os.path.join(config.dirs.data, "kinesis"))) | ||
|
|
||
| @staticmethod | ||
| def get_store(account_id: str, region_name: str) -> KinesisStore: | ||
| return kinesis_stores[account_id][region_name] | ||
| def on_before_state_load(self): | ||
| # no need to restart servers, since that happens lazily in `server_manager.get_server_for_account`. | ||
| self.server_manager.shutdown_all() | ||
|
|
||
| def on_before_state_reset(self): | ||
| self.server_manager.shutdown_all() | ||
|
|
||
| def on_before_stop(self): | ||
| self.server_manager.shutdown_all() | ||
|
|
||
| def get_forward_url(self): | ||
| """Return the URL of the backend Kinesis server to forward requests to""" | ||
| account_id = get_aws_account_id() | ||
| starter.start_kinesis(account_id=account_id) | ||
| return f"http://{LOCALHOST}:{starter.get_server(account_id).port}" | ||
| server = self.server_manager.get_server_for_account(account_id) | ||
| return f"http://{LOCALHOST}:{server.port}" | ||
|
|
||
| @staticmethod | ||
| def get_store(account_id: str, region_name: str) -> KinesisStore: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Had a quick look at the kinesis stores. Are we using them somewhere?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good question, i didn't actually look into it. but may be useful to keep it |
||
| return kinesis_stores[account_id][region_name] | ||
|
|
||
| def subscribe_to_shard( | ||
| self, | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.