[#318] Add tombstone expiration test #318
7 changed files with 63 additions and 20 deletions
|
@ -164,6 +164,9 @@ class DockerHost(Host):
|
|||
|
||||
return volume_path
|
||||
|
||||
def send_signal_to_service(self, service_name: str, signal: str) -> None:
|
||||
raise NotImplementedError("Not implemented for docker")
|
||||
|
||||
def delete_metabase(self, service_name: str) -> None:
|
||||
raise NotImplementedError("Not implemented for docker")
|
||||
|
||||
|
|
|
@ -117,6 +117,17 @@ class Host(ABC):
|
|||
service_name: Name of the service to stop.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def send_signal_to_service(self, service_name: str, signal: str) -> None:
|
||||
"""Send signal to service with specified name using kill -<signal>
|
||||
|
||||
The service must be hosted on this host.
|
||||
|
||||
Args:
|
||||
service_name: Name of the service to stop.
|
||||
signal: signal name. See kill -l to all names
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def mask_service(self, service_name: str) -> None:
|
||||
"""Prevent the service from start by any activity by masking it.
|
||||
|
|
|
@ -53,3 +53,4 @@ HOSTING_CONFIG_FILE = os.getenv(
|
|||
)
|
||||
|
||||
MORE_LOG = os.getenv("MORE_LOG", "1")
|
||||
EXPIRATION_EPOCH_ATTRIBUTE = "__SYSTEM__EXPIRATION_EPOCH"
|
||||
|
|
|
@ -11,10 +11,10 @@ from frostfs_testlib.storage import get_service_registry
|
|||
from frostfs_testlib.storage.configuration.interfaces import ServiceConfigurationYml
|
||||
from frostfs_testlib.storage.constants import ConfigAttributes
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode
|
||||
from frostfs_testlib.storage.dataclasses.metrics import Metrics
|
||||
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
|
||||
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
|
||||
from frostfs_testlib.storage.service_registry import ServiceRegistry
|
||||
from frostfs_testlib.storage.dataclasses.metrics import Metrics
|
||||
|
||||
|
||||
class ClusterNode:
|
||||
|
@ -91,10 +91,10 @@ class ClusterNode:
|
|||
config_str = yaml.dump(new_config)
|
||||
shell.exec(f"echo '{config_str}' | sudo tee {config_file_path}")
|
||||
|
||||
def config(self, service_type: type[ServiceClass]) -> ServiceConfigurationYml:
|
||||
def config(self, service_type: ServiceClass) -> ServiceConfigurationYml:
|
||||
return self.service(service_type).config
|
||||
|
||||
def service(self, service_type: type[ServiceClass]) -> ServiceClass:
|
||||
def service(self, service_type: ServiceClass) -> ServiceClass:
|
||||
"""
|
||||
Get a service cluster node of specified type.
|
||||
|
||||
|
|
|
@ -172,6 +172,15 @@ class ClusterStateController:
|
|||
if service_type == StorageNode:
|
||||
self.wait_after_storage_startup()
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step("Send sighup to all {service_type} services")
|
||||
def sighup_services_of_type(self, service_type: type[ServiceClass]):
|
||||
services = self.cluster.services(service_type)
|
||||
parallel([service.send_signal_to_service for service in services], signal="SIGHUP")
|
||||
|
||||
if service_type == StorageNode:
|
||||
self.wait_after_storage_startup()
|
||||
|
||||
@wait_for_success(600, 60)
|
||||
def wait_s3gate(self, s3gate: S3Gate):
|
||||
with reporter.step(f"Wait for {s3gate} reconnection"):
|
||||
|
@ -206,21 +215,27 @@ class ClusterStateController:
|
|||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step("Stop {service_type} service on {node}")
|
||||
def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass], mask: bool = True):
|
||||
def stop_service_of_type(self, node: ClusterNode, service_type: ServiceClass, mask: bool = True):
|
||||
service = node.service(service_type)
|
||||
service.stop_service(mask)
|
||||
self.stopped_services.add(service)
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step("Send sighup to {service_type} service on {node}")
|
||||
def sighup_service_of_type(self, node: ClusterNode, service_type: ServiceClass):
|
||||
service = node.service(service_type)
|
||||
service.send_signal_to_service("SIGHUP")
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step("Start {service_type} service on {node}")
|
||||
def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
|
||||
def start_service_of_type(self, node: ClusterNode, service_type: ServiceClass):
|
||||
service = node.service(service_type)
|
||||
service.start_service()
|
||||
self.stopped_services.discard(service)
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step("Start all stopped {service_type} services")
|
||||
def start_stopped_services_of_type(self, service_type: type[ServiceClass]):
|
||||
def start_stopped_services_of_type(self, service_type: ServiceClass):
|
||||
stopped_svc = self._get_stopped_by_type(service_type)
|
||||
if not stopped_svc:
|
||||
return
|
||||
|
|
|
@ -14,14 +14,19 @@ class ConfigStateManager(StateManager):
|
|||
self.cluster = self.csc.cluster
|
||||
|
||||
@reporter.step("Change configuration for {service_type} on all nodes")
|
||||
def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any]):
|
||||
def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any], sighup: bool = False):
|
||||
services = self.cluster.services(service_type)
|
||||
nodes = self.cluster.nodes(services)
|
||||
self.services_with_changed_config.update([(node, service_type) for node in nodes])
|
||||
|
||||
self.csc.stop_services_of_type(service_type)
|
||||
if not sighup:
|
||||
self.csc.stop_services_of_type(service_type)
|
||||
|
||||
parallel([node.config(service_type).set for node in nodes], values=values)
|
||||
self.csc.start_services_of_type(service_type)
|
||||
if not sighup:
|
||||
self.csc.start_services_of_type(service_type)
|
||||
else:
|
||||
self.csc.sighup_services_of_type(service_type)
|
||||
|
||||
@reporter.step("Change configuration for {service_type} on {node}")
|
||||
def set_on_node(self, node: ClusterNode, service_type: type[ServiceClass], values: dict[str, Any]):
|
||||
|
@ -32,18 +37,26 @@ class ConfigStateManager(StateManager):
|
|||
self.csc.start_service_of_type(node, service_type)
|
||||
|
||||
@reporter.step("Revert all configuration changes")
|
||||
def revert_all(self):
|
||||
def revert_all(self, sighup: bool = False):
|
||||
if not self.services_with_changed_config:
|
||||
return
|
||||
|
||||
parallel(self._revert_svc, self.services_with_changed_config)
|
||||
parallel(self._revert_svc, self.services_with_changed_config, sighup)
|
||||
self.services_with_changed_config.clear()
|
||||
|
||||
self.csc.start_all_stopped_services()
|
||||
if not sighup:
|
||||
self.csc.start_all_stopped_services()
|
||||
|
||||
# TODO: parallel can't have multiple parallel_items :(
|
||||
@reporter.step("Revert all configuration {node_and_service}")
|
||||
def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass]):
|
||||
def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass], sighup: bool = False):
|
||||
node, service_type = node_and_service
|
||||
self.csc.stop_service_of_type(node, service_type)
|
||||
service = node.service(service_type)
|
||||
|
||||
if not sighup:
|
||||
self.csc.stop_service_of_type(node, service_type)
|
||||
|
||||
node.config(service_type).revert()
|
||||
|
||||
if sighup:
|
||||
service.send_signal_to_service("SIGHUP")
|
||||
|
|
|
@ -65,6 +65,10 @@ class NodeBase(HumanReadableABC):
|
|||
with reporter.step(f"Start {self.name} service on {self.host.config.address}"):
|
||||
self.host.start_service(self.name)
|
||||
|
||||
def send_signal_to_service(self, signal: str):
|
||||
with reporter.step(f"Send -{signal} signal to {self.name} service on {self.host.config.address}"):
|
||||
self.host.send_signal_to_service(self.name, signal)
|
||||
|
||||
@abstractmethod
|
||||
def service_healthcheck(self) -> bool:
|
||||
"""Service healthcheck."""
|
||||
|
@ -185,9 +189,7 @@ class NodeBase(HumanReadableABC):
|
|||
|
||||
if attribute_name not in config.attributes:
|
||||
if default_attribute_name is None:
|
||||
raise RuntimeError(
|
||||
f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either"
|
||||
)
|
||||
raise RuntimeError(f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either")
|
||||
|
||||
return config.attributes[default_attribute_name]
|
||||
|
||||
|
@ -197,9 +199,7 @@ class NodeBase(HumanReadableABC):
|
|||
return self.host.get_service_config(self.name)
|
||||
|
||||
def get_service_uptime(self, service: str) -> datetime:
|
||||
result = self.host.get_shell().exec(
|
||||
f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2"
|
||||
)
|
||||
result = self.host.get_shell().exec(f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2")
|
||||
start_time = parser.parse(result.stdout.strip())
|
||||
current_time = datetime.now(tz=timezone.utc)
|
||||
active_time = current_time - start_time
|
||||
|
|
Loading…
Reference in a new issue