[#143] Change network utils

Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
This commit is contained in:
Dmitriy Zayakin 2023-12-12 09:38:38 +03:00 committed by Dmitriy Zayakin
parent 54d26b226c
commit f1264bd473
4 changed files with 48 additions and 120 deletions

View file

@ -5,6 +5,7 @@ from typing import Optional
from frostfs_testlib.hosting.config import CLIConfig, HostConfig, ServiceConfig
from frostfs_testlib.shell.interfaces import Shell
from frostfs_testlib.testing.readable import HumanReadableEnum
from frostfs_testlib.testing.test_control import retry
class HostStatus(HumanReadableEnum):
@ -25,9 +26,7 @@ class Host(ABC):
def __init__(self, config: HostConfig) -> None:
self._config = config
self._service_config_by_name = {
service_config.name: service_config for service_config in config.services
}
self._service_config_by_name = {service_config.name: service_config for service_config in config.services}
self._cli_config_by_name = {cli_config.name: cli_config for cli_config in config.clis}
@property
@ -323,9 +322,7 @@ class Host(ABC):
"""
@abstractmethod
def wait_for_service_to_be_in_state(
self, systemd_service_name: str, expected_state: str, timeout: int
) -> None:
def wait_for_service_to_be_in_state(self, systemd_service_name: str, expected_state: str, timeout: int) -> None:
"""
Waites for service to be in specified state.
@ -335,3 +332,23 @@ class Host(ABC):
timeout: Seconds to wait
"""
def down_interface(self, interface: str) -> None:
shell = self.get_shell()
shell.exec(f"ip link set {interface} down")
def up_interface(self, interface: str) -> None:
shell = self.get_shell()
shell.exec(f"ip link set {interface} up")
def check_state(self, interface: str) -> str:
shell = self.get_shell()
return shell.exec(f"ip link show {interface} | sed -z 's/.*state \(.*\) mode .*/\\1/'").stdout.strip()
@retry(max_attempts=5, sleep_interval=5, expected_result="UP")
def check_state_up(self, interface: str) -> str:
return self.check_state(interface=interface)
@retry(max_attempts=5, sleep_interval=5, expected_result="DOWN")
def check_state_down(self, interface: str) -> str:
return self.check_state(interface=interface)