Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def connect(self, target_driver_func: Callable, driver_dialect: DriverDialect, h

def _get_verified_writer_connection(self, props: Properties, is_initial_connection: bool, connect_func: Callable) -> Connection | None:
retry_delay_ms: int = WrapperProperties.OPEN_CONNECTION_RETRY_INTERVAL_MS.get_int(props)
end_time_nano = perf_counter_ns() + (WrapperProperties.OPEN_CONNECTION_RETRY_INTERVAL_MS.get_int(props) * 1000000)
end_time_nano = perf_counter_ns() + (WrapperProperties.OPEN_CONNECTION_RETRY_TIMEOUT_MS.get_int(props) * 1000000)

writer_candidate_conn: Optional[Connection]
writer_candidate: Optional[HostInfo]
Expand Down Expand Up @@ -116,7 +116,7 @@ def _get_verified_writer_connection(self, props: Properties, is_initial_connecti

def _get_verified_reader_connection(self, props: Properties, is_initial_connection: bool, connect_func: Callable) -> Optional[Connection]:
retry_delay_ms: int = WrapperProperties.OPEN_CONNECTION_RETRY_INTERVAL_MS.get_int(props)
end_time_nano = perf_counter_ns() + (WrapperProperties.OPEN_CONNECTION_RETRY_INTERVAL_MS.get_int(props) * 1000000)
end_time_nano = perf_counter_ns() + (WrapperProperties.OPEN_CONNECTION_RETRY_TIMEOUT_MS.get_int(props) * 1000000)

reader_candidate_conn: Optional[Connection]
reader_candidate: Optional[HostInfo]
Expand Down
5 changes: 3 additions & 2 deletions aws_advanced_python_wrapper/limitless_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ def _synchronously_get_limitless_routers(self, context: LimitlessContext) -> Non
connection = context.get_connection()
if connection is None or self._plugin_service.driver_dialect.is_closed(connection):
context.set_connection(context.get_connect_func()())
connection = context.get_connection()

new_limitless_routers: List[HostInfo] = self._query_helper.query_for_limitless_routers(
connection, context.get_host_info().port)
Expand All @@ -521,8 +522,8 @@ def _synchronously_get_limitless_routers(self, context: LimitlessContext) -> Non
finally:
lock.release()

def _is_login_exception(self, error: Optional[Exception] = None):
self._plugin_service.is_login_exception(error)
def _is_login_exception(self, error: Optional[Exception] = None) -> bool:
return self._plugin_service.is_login_exception(error)

def start_monitoring(self, host_info: HostInfo,
props: Properties) -> None:
Expand Down
2 changes: 1 addition & 1 deletion aws_advanced_python_wrapper/plugin_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ def current_host_info(self) -> HostInfo:
self._current_host_info = self.hosts[0]

if self._current_host_info is None:
raise AwsWrapperError("PluginServiceImpl.CouldNotDetermineCurrentHost")
raise AwsWrapperError(Messages.get("PluginServiceImpl.CouldNotDetermineCurrentHost"))

logger.debug("PluginServiceImpl.SetCurrentHostInfo", self._current_host_info)
return self._current_host_info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ BlueGreenStatusMonitor.UnexpectedDialect=[BlueGreenStatusMonitor] Attempted to c
BlueGreenStatusMonitor.UnhandledException=[BlueGreenStatusMonitor] [{}] Unhandled exception: {}.
BlueGreenStatusMonitor.UsesVersion=[BlueGreenStatusMonitor] [{}] Blue/Green deployment uses version '{}' which the driver doesn't support. Version '{}' will be used instead.

BlueGreenStatusProvider.AllGreenHostsChangedName=[BlueGreenStatusProvider] All green hosts have changed their names.
BlueGreenStatusProvider.BlueDnsCompleted=[BlueGreenStatusProvider] [bgdId: '{}'] Blue DNS update completed.
BlueGreenStatusProvider.GreenDnsRemoved=[BlueGreenStatusProvider] [bgdId: '{}'] Green DNS removed.
BlueGreenStatusProvider.GreenHostChangedName=[BlueGreenStatusProvider] Green host '{}' has changed its name to '{}'.
Expand Down Expand Up @@ -466,6 +467,7 @@ StaleDnsPlugin.RequireDynamicProvider=[StaleDnsPlugin] A dynamic host list provi
SubstituteConnectRouting.InProgressCantOpenConnection=[SubstituteConnectRouting] Blue/Green Deployment switchover is in progress. Can't establish connection to '{}'.
SubstituteConnectRouting.RequireIamHost=[SubstituteConnectRouting] Connecting with IP address when IAM authentication is enabled requires an 'iamHost' parameter.

SuspendConnectRouting.WaitConnectUntilCorrespondingHostFound=[SuspendConnectRouting] Blue/Green Deployment switchover is in progress. The 'connect' call for host '{}' will be delayed until its corresponding host is found.
SuspendConnectRouting.InProgressSuspendConnect=[SuspendConnectRouting] Blue/Green Deployment switchover is in progress. The 'connect' call will be delayed until switchover is completed.
SuspendConnectRouting.InProgressTryConnectLater=[SuspendConnectRouting] Blue/Green Deployment switchover is still in progress after {} seconds. Try to connect again later.
SuspendConnectRouting.SwitchoverCompleteContinueWithConnect=[SuspendConnectRouting] Blue/Green Deployment switchover is completed. Continue with connect call. The call was suspended for {} ms.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def set_transfer_session_state_on_switch_func(func: Callable):

@staticmethod
def reset_transfer_session_state_on_switch_func():
SessionStateTransferHandlers.reset_transfer_session_state_on_switch_callable = None
SessionStateTransferHandlers.transfer_session_state_on_switch_callable = None


class SessionStateServiceImpl(SessionStateService):
Expand Down
42 changes: 42 additions & 0 deletions tests/unit/test_aurora_initial_connection_strategy_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from unittest.mock import MagicMock

from aws_advanced_python_wrapper.aurora_initial_connection_strategy_plugin import \
AuroraInitialConnectionStrategyPlugin
from aws_advanced_python_wrapper.utils.properties import (Properties,
WrapperProperties)


def test_retry_deadline_uses_timeout_property():
"""Regression (parity review): the retry deadline previously reused
OPEN_CONNECTION_RETRY_INTERVAL_MS; it must come from
OPEN_CONNECTION_RETRY_TIMEOUT_MS."""
plugin = AuroraInitialConnectionStrategyPlugin.__new__(
AuroraInitialConnectionStrategyPlugin)
plugin._plugin_service = MagicMock()
plugin._plugin_service.get_host_info_by_strategy = MagicMock(return_value=None)
plugin._host_list_provider_service = MagicMock()

props = Properties({})
# Zero total budget: the retry loop must not run even once despite the
# 10-minute interval (with the old bug the deadline WAS the interval).
WrapperProperties.OPEN_CONNECTION_RETRY_TIMEOUT_MS.set(props, "0")
WrapperProperties.OPEN_CONNECTION_RETRY_INTERVAL_MS.set(props, "600000")

connect_func = MagicMock()
result = plugin._get_verified_writer_connection(props, True, connect_func)
assert result is None
connect_func.assert_not_called()
27 changes: 27 additions & 0 deletions tests/unit/test_blue_green_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from aws_advanced_python_wrapper.utils.messages import Messages


def test_blue_green_log_message_keys_resolve():
"""Regression (parity review): both Blue/Green keys referenced by logger
calls exist in the bundle. The formatted
WaitConnectUntilCorrespondingHostFound call previously raised
NotInResourceBundleError whenever DEBUG logging was enabled during a
switchover (Logger's formatted path has no missing-key guard)."""
formatted = Messages.get_formatted(
"SuspendConnectRouting.WaitConnectUntilCorrespondingHostFound", "host-1")
assert "host-1" in formatted
assert Messages.get("BlueGreenStatusProvider.AllGreenHostsChangedName")
50 changes: 50 additions & 0 deletions tests/unit/test_limitless_router_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ def mock_plugin_service(mocker, mock_driver_dialect, mock_conn, host_info, defau
service_mock.host_list_provider = mocker.MagicMock()
service_mock.host_list_provider.get_cluster_id.return_value = CLUSTER_ID
service_mock.props = Properties({})
# The retry scenarios below use generic (non-login) errors; without this
# the MagicMock's truthy return would make _is_login_exception treat
# every error as a login failure and raise instead of retrying.
service_mock.is_login_exception.return_value = False

type(service_mock).driver_dialect = mocker.PropertyMock(return_value=mock_driver_dialect)
return service_mock
Expand Down Expand Up @@ -545,3 +549,49 @@ def test_establish_connection_retry_and_max_retries_exceeded_then_raise_exceptio
assert str(e_info.value) == Messages.get("LimitlessRouterService.MaxRetriesExceeded")
assert mock_plugin_service.connect.call_count == WrapperProperties.MAX_RETRIES_MS.get(props)
assert mock_plugin_service.get_host_info_by_strategy.call_count == WrapperProperties.MAX_RETRIES_MS.get(props)


def test_is_login_exception_returns_verdict(mocker):
"""Regression (parity review): _is_login_exception previously discarded
the classification result, so login failures were never short-circuited
out of the retry loop."""
service = LimitlessRouterService.__new__(LimitlessRouterService)
plugin_service_mock = mocker.MagicMock()
service._plugin_service = plugin_service_mock

plugin_service_mock.is_login_exception.return_value = True
assert service._is_login_exception(Exception("auth")) is True
plugin_service_mock.is_login_exception.return_value = False
assert service._is_login_exception(Exception("net")) is False


def test_synchronously_get_limitless_routers_queries_fresh_connection(mocker):
"""Regression (parity review): after reconnecting a dead/None monitoring
connection, the router query must run on the NEW connection -- previously
it used the stale pre-reconnect local."""
service = LimitlessRouterService.__new__(LimitlessRouterService)
plugin_service_mock = mocker.MagicMock()
plugin_service_mock.driver_dialect.is_closed.return_value = True
service._plugin_service = plugin_service_mock
service._storage_service = mocker.MagicMock()
query_helper_mock = mocker.MagicMock()
query_helper_mock.query_for_limitless_routers.return_value = [
mocker.MagicMock(name="router_host")]
service._query_helper = query_helper_mock

stale_conn = mocker.MagicMock(name="stale_conn")
fresh_conn = mocker.MagicMock(name="fresh_conn")
context = mocker.MagicMock()
connections = {"current": stale_conn}
context.get_connection.side_effect = lambda: connections["current"]
context.set_connection.side_effect = (
lambda conn: connections.update(current=conn))
context.get_connect_func.return_value = mocker.MagicMock(
return_value=fresh_conn)
context.get_host_info.return_value = mocker.MagicMock(port=5432)

mocker.patch.object(service, "_get_limitless_routers", return_value=None)
service._synchronously_get_limitless_routers(context)

queried_conn = query_helper_mock.query_for_limitless_routers.call_args[0][0]
assert queried_conn is fresh_conn
19 changes: 18 additions & 1 deletion tests/unit/test_plugin_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.s

from concurrent.futures import TimeoutError
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest # type: ignore

Expand Down Expand Up @@ -412,3 +412,20 @@ def decorator(func):
assert host_info is not None
assert "instance-1.xyz.us-east-2.rds.amazonaws.com" == host_info.host
assert "instance-1" == host_info.host_id


def test_could_not_determine_current_host_raises_resolved_message():
"""Regression (parity review): the terminal current_host_info error
previously raised the raw message KEY instead of the resolved text."""
service = PluginServiceImpl.__new__(PluginServiceImpl)
service._current_host_info = None
service._initial_connection_host_info = None
reader = MagicMock()
reader.role = "reader-not-writer-sentinel"
with patch.object(PluginServiceImpl, "all_hosts", (reader,)), \
patch.object(PluginServiceImpl, "hosts", ()):
with pytest.raises(AwsWrapperError) as exc_info:
_ = service.current_host_info
message = str(exc_info.value)
assert "The current host could not be determined." in message
assert "PluginServiceImpl.CouldNotDetermineCurrentHost" not in message
16 changes: 14 additions & 2 deletions tests/unit/test_session_state_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
from aws_advanced_python_wrapper import AwsWrapperConnection
from aws_advanced_python_wrapper.driver_dialect import DriverDialect
from aws_advanced_python_wrapper.plugin_service import PluginService
from aws_advanced_python_wrapper.states.session_state_service import \
SessionStateServiceImpl
from aws_advanced_python_wrapper.states.session_state_service import (
SessionStateServiceImpl, SessionStateTransferHandlers)
from aws_advanced_python_wrapper.utils.properties import Properties


Expand Down Expand Up @@ -121,3 +121,15 @@ def test_transfer_to_new_connection_autocommit(mock_connection, mock_new_connect
session_state_service.complete()

mock_plugin_service.driver_dialect.set_autocommit.assert_called_with(mock_new_connection, value)


def test_transfer_handler_reset_actually_clears(mocker):
"""Regression (parity review): reset_transfer_session_state_on_switch_func
previously assigned a nonexistent attribute, silently keeping the old
handler registered."""
handler = mocker.MagicMock()
SessionStateTransferHandlers.set_transfer_session_state_on_switch_func(handler)
assert SessionStateTransferHandlers.get_transfer_session_state_on_switch_func() is handler

SessionStateTransferHandlers.reset_transfer_session_state_on_switch_func()
assert SessionStateTransferHandlers.get_transfer_session_state_on_switch_func() is None