forked from TrueCloudLab/frostfs-testlib
Compare commits
5 commits
d8ce77cf55
...
a48994e88c
Author | SHA1 | Date | |
---|---|---|---|
a48994e88c | |||
47bc11835b | |||
2a90ec74ff | |||
95b32a036a | |||
55d8ee5da0 |
15 changed files with 418 additions and 28 deletions
|
@ -353,6 +353,7 @@ class FrostfsAdmMorph(CliCommand):
|
|||
rule: Optional[list[str]] = None,
|
||||
path: Optional[str] = None,
|
||||
chain_id_hex: Optional[bool] = None,
|
||||
chain_name: Optional[str] = None,
|
||||
wallet: Optional[str] = None,
|
||||
address: Optional[str] = None,
|
||||
timeout: Optional[str] = None,
|
||||
|
@ -383,6 +384,7 @@ class FrostfsAdmMorph(CliCommand):
|
|||
target_name: str,
|
||||
target_type: str,
|
||||
chain_id_hex: Optional[bool] = None,
|
||||
chain_name: Optional[str] = None,
|
||||
wallet: Optional[str] = None,
|
||||
address: Optional[str] = None,
|
||||
timeout: Optional[str] = None,
|
||||
|
@ -410,6 +412,7 @@ class FrostfsAdmMorph(CliCommand):
|
|||
target_type: str,
|
||||
target_name: Optional[str] = None,
|
||||
rpc_endpoint: Optional[str] = None,
|
||||
chain_name: Optional[str] = None,
|
||||
wallet: Optional[str] = None,
|
||||
address: Optional[str] = None,
|
||||
timeout: Optional[str] = None,
|
||||
|
@ -436,6 +439,7 @@ class FrostfsAdmMorph(CliCommand):
|
|||
target_name: str,
|
||||
target_type: str,
|
||||
all: Optional[bool] = None,
|
||||
chain_name: Optional[str] = None,
|
||||
chain_id_hex: Optional[bool] = None,
|
||||
wallet: Optional[str] = None,
|
||||
address: Optional[str] = None,
|
||||
|
|
|
@ -164,6 +164,9 @@ class DockerHost(Host):
|
|||
|
||||
return volume_path
|
||||
|
||||
def send_signal_to_service(self, service_name: str, signal: str) -> None:
|
||||
raise NotImplementedError("Not implemented for docker")
|
||||
|
||||
def delete_metabase(self, service_name: str) -> None:
|
||||
raise NotImplementedError("Not implemented for docker")
|
||||
|
||||
|
|
|
@ -117,6 +117,17 @@ class Host(ABC):
|
|||
service_name: Name of the service to stop.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def send_signal_to_service(self, service_name: str, signal: str) -> None:
|
||||
"""Send signal to service with specified name using kill -<signal>
|
||||
|
||||
The service must be hosted on this host.
|
||||
|
||||
Args:
|
||||
service_name: Name of the service to stop.
|
||||
signal: signal name. See kill -l to all names
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def mask_service(self, service_name: str) -> None:
|
||||
"""Prevent the service from start by any activity by masking it.
|
||||
|
|
0
src/frostfs_testlib/http/__init__.py
Normal file
0
src/frostfs_testlib/http/__init__.py
Normal file
95
src/frostfs_testlib/http/http_client.py
Normal file
95
src/frostfs_testlib/http/http_client.py
Normal file
|
@ -0,0 +1,95 @@
|
|||
import json
|
||||
import logging
|
||||
import logging.config
|
||||
|
||||
import httpx
|
||||
|
||||
from frostfs_testlib import reporter
|
||||
|
||||
timeout = httpx.Timeout(60, read=150)
|
||||
LOGGING_CONFIG = {
|
||||
"disable_existing_loggers": False,
|
||||
"version": 1,
|
||||
"handlers": {"default": {"class": "logging.StreamHandler", "formatter": "http", "stream": "ext://sys.stderr"}},
|
||||
"formatters": {
|
||||
"http": {
|
||||
"format": "%(levelname)s [%(asctime)s] %(name)s - %(message)s",
|
||||
"datefmt": "%Y-%m-%d %H:%M:%S",
|
||||
}
|
||||
},
|
||||
"loggers": {
|
||||
"httpx": {
|
||||
"handlers": ["default"],
|
||||
"level": "DEBUG",
|
||||
},
|
||||
"httpcore": {
|
||||
"handlers": ["default"],
|
||||
"level": "ERROR",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
logging.config.dictConfig(LOGGING_CONFIG)
|
||||
logger = logging.getLogger("NeoLogger")
|
||||
|
||||
|
||||
class HttpClient:
|
||||
@reporter.step("Send {method} request to {url}")
|
||||
def send(self, method: str, url: str, expected_status_code: int = None, **kwargs: dict) -> httpx.Response:
|
||||
transport = httpx.HTTPTransport(verify=False, retries=5)
|
||||
client = httpx.Client(timeout=timeout, transport=transport)
|
||||
response = client.request(method, url, **kwargs)
|
||||
|
||||
self._attach_response(response)
|
||||
logger.info(f"Response: {response.status_code} => {response.text}")
|
||||
|
||||
if expected_status_code:
|
||||
assert response.status_code == expected_status_code, (
|
||||
f"Got {response.status_code} response code" f" while {expected_status_code} expected"
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
def _attach_response(self, response: httpx.Response):
|
||||
request = response.request
|
||||
|
||||
try:
|
||||
request_headers = json.dumps(dict(request.headers), indent=4)
|
||||
except json.JSONDecodeError:
|
||||
request_headers = str(request.headers)
|
||||
|
||||
try:
|
||||
request_body = request.read()
|
||||
try:
|
||||
request_body = request_body.decode("utf-8")
|
||||
except UnicodeDecodeError as e:
|
||||
request_body = f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}"
|
||||
except Exception as e:
|
||||
request_body = f"Error reading request body: {str(e)}"
|
||||
|
||||
request_body = "" if request_body is None else request_body
|
||||
|
||||
try:
|
||||
response_headers = json.dumps(dict(response.headers), indent=4)
|
||||
except json.JSONDecodeError:
|
||||
response_headers = str(response.headers)
|
||||
|
||||
report = (
|
||||
f"Method: {request.method}\n\n"
|
||||
f"URL: {request.url}\n\n"
|
||||
f"Request Headers: {request_headers}\n\n"
|
||||
f"Request Body: {request_body}\n\n"
|
||||
f"Response Status Code: {response.status_code}\n\n"
|
||||
f"Response Headers: {response_headers}\n\n"
|
||||
f"Response Body: {response.text}\n\n"
|
||||
)
|
||||
curl_request = self._create_curl_request(request.url, request.method, request.headers, request_body)
|
||||
|
||||
reporter.attach(report, "Requests Info")
|
||||
reporter.attach(curl_request, "CURL")
|
||||
|
||||
def _create_curl_request(self, url: str, method: str, headers: httpx.Headers, data: str) -> str:
|
||||
headers = " ".join(f'-H "{name.title()}: {value}"' for name, value in headers.items())
|
||||
data = f" -d '{data}'" if data else ""
|
||||
# Option -k means no verify SSL
|
||||
return f"curl {url} -X {method} {headers}{data} -k"
|
|
@ -53,3 +53,4 @@ HOSTING_CONFIG_FILE = os.getenv(
|
|||
)
|
||||
|
||||
MORE_LOG = os.getenv("MORE_LOG", "1")
|
||||
EXPIRATION_EPOCH_ATTRIBUTE = "__SYSTEM__EXPIRATION_EPOCH"
|
||||
|
|
|
@ -196,11 +196,20 @@ class AwsCliClient(S3ClientWrapper):
|
|||
return response.get("LocationConstraint")
|
||||
|
||||
@reporter.step("List objects S3")
|
||||
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
|
||||
def list_objects(
|
||||
self, bucket: str, page_size: Optional[int] = None, prefix: Optional[str] = None, full_output: bool = False
|
||||
) -> Union[dict, list[str]]:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} "
|
||||
if page_size:
|
||||
cmd = cmd.replace("--no-paginate", "")
|
||||
cmd += f" --page-size {page_size} "
|
||||
if prefix:
|
||||
cmd += f" --prefix {prefix}"
|
||||
if self.profile:
|
||||
cmd += f" --profile {self.profile} "
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
|
||||
|
@ -1440,3 +1449,90 @@ class AwsCliClient(S3ClientWrapper):
|
|||
response = self._to_json(output)
|
||||
|
||||
return response
|
||||
|
||||
# MFA METHODS
|
||||
@reporter.step("Creates a new virtual MFA device")
|
||||
def iam_create_virtual_mfa_device(self, virtual_mfa_device_name: str, outfile: str, bootstrap_method: str) -> tuple:
|
||||
cmd = f"aws {self.common_flags} iam create-virtual-mfa-device --virtual-mfa-device-name {virtual_mfa_device_name}\
|
||||
--outfile {outfile} --bootstrap-method {bootstrap_method} --endpoint {self.iam_endpoint}"
|
||||
|
||||
if self.profile:
|
||||
cmd += f" --profile {self.profile}"
|
||||
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
serial_number = response.get("VirtualMFADevice", {}).get("SerialNumber")
|
||||
assert serial_number, f"Expected SerialNumber in response:\n{response}"
|
||||
|
||||
return serial_number, False
|
||||
|
||||
@reporter.step("Deactivates the specified MFA device and removes it from association with the user name")
|
||||
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
|
||||
cmd = f"aws {self.common_flags} iam deactivate-mfa-device --user-name {user_name} --serial-number {serial_number} --endpoint {self.iam_endpoint}"
|
||||
if self.profile:
|
||||
cmd += f" --profile {self.profile}"
|
||||
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Deletes a virtual MFA device")
|
||||
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
|
||||
cmd = f"aws {self.common_flags} iam delete-virtual-mfa-device --serial-number {serial_number} --endpoint {self.iam_endpoint}"
|
||||
if self.profile:
|
||||
cmd += f" --profile {self.profile}"
|
||||
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Enables the specified MFA device and associates it with the specified IAM user")
|
||||
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
|
||||
cmd = f"aws {self.common_flags} iam enable-mfa-device --user-name {user_name} --serial-number {serial_number} --authentication-code1 {authentication_code1}\
|
||||
--authentication-code2 {authentication_code2} --endpoint {self.iam_endpoint}"
|
||||
if self.profile:
|
||||
cmd += f" --profile {self.profile}"
|
||||
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Lists the MFA devices for an IAM user")
|
||||
def iam_list_virtual_mfa_devices(self) -> dict:
|
||||
cmd = f"aws {self.common_flags} iam list-virtual-mfa-devices --endpoint {self.iam_endpoint}"
|
||||
if self.profile:
|
||||
cmd += f" --profile {self.profile}"
|
||||
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
assert response.get("VirtualMFADevices"), f"Expected VirtualMFADevices in response:\n{response}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Get session token for user")
|
||||
def sts_get_session_token(
|
||||
self, duration_seconds: Optional[str] = None, serial_number: Optional[str] = None, token_code: Optional[str] = None
|
||||
) -> tuple:
|
||||
cmd = f"aws {self.common_flags} sts get-session-token --endpoint {self.iam_endpoint}"
|
||||
if duration_seconds:
|
||||
cmd += f" --duration-seconds {duration_seconds}"
|
||||
if serial_number:
|
||||
cmd += f" --serial-number {serial_number}"
|
||||
if token_code:
|
||||
cmd += f" --token-code {token_code}"
|
||||
if self.profile:
|
||||
cmd += f" --profile {self.profile}"
|
||||
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
access_key = response.get("Credentials", {}).get("AccessKeyId")
|
||||
secret_access_key = response.get("Credentials", {}).get("SecretAccessKey")
|
||||
session_token = response.get("Credentials", {}).get("SessionToken")
|
||||
assert access_key, f"Expected AccessKeyId in response:\n{response}"
|
||||
assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"
|
||||
assert session_token, f"Expected SessionToken in response:\n{response}"
|
||||
|
||||
return access_key, secret_access_key, session_token
|
||||
|
|
|
@ -41,6 +41,8 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
self.boto3_iam_client: S3Client = None
|
||||
self.iam_endpoint: str = ""
|
||||
|
||||
self.boto3_sts_client: S3Client = None
|
||||
|
||||
self.access_key_id: str = access_key_id
|
||||
self.secret_access_key: str = secret_access_key
|
||||
self.profile = profile
|
||||
|
@ -87,6 +89,14 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
endpoint_url=self.iam_endpoint,
|
||||
verify=False,
|
||||
)
|
||||
# since the STS does not have an enpoint, IAM is used
|
||||
self.boto3_sts_client = self.session.client(
|
||||
service_name="sts",
|
||||
aws_access_key_id=self.access_key_id,
|
||||
aws_secret_access_key=self.secret_access_key,
|
||||
endpoint_url=iam_endpoint,
|
||||
verify=False,
|
||||
)
|
||||
|
||||
def _to_s3_param(self, param: str) -> str:
|
||||
replacement_map = {
|
||||
|
@ -388,10 +398,17 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
return response if full_output else obj_list
|
||||
|
||||
@reporter.step("List objects S3")
|
||||
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
|
||||
def list_objects(
|
||||
self, bucket: str, page_size: Optional[int] = None, prefix: Optional[str] = None, full_output: bool = False
|
||||
) -> Union[dict, list[str]]:
|
||||
params = {"Bucket": bucket}
|
||||
if page_size:
|
||||
params["MaxKeys"] = page_size
|
||||
if prefix:
|
||||
params["Prefix"] = prefix
|
||||
response = self._exec_request(
|
||||
self.boto3_client.list_objects,
|
||||
params={"Bucket": bucket},
|
||||
params,
|
||||
endpoint=self.s3gate_endpoint,
|
||||
profile=self.profile,
|
||||
)
|
||||
|
@ -1265,3 +1282,67 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
|||
endpoint=self.iam_endpoint,
|
||||
profile=self.profile,
|
||||
)
|
||||
|
||||
# MFA methods
|
||||
|
||||
@reporter.step("Creates a new virtual MFA device")
|
||||
def iam_create_virtual_mfa_device(
|
||||
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
|
||||
) -> tuple:
|
||||
response = self.boto3_iam_client.create_virtual_mfa_device(VirtualMFADeviceName=virtual_mfa_device_name)
|
||||
|
||||
serial_number = response.get("VirtualMFADevice", {}).get("SerialNumber")
|
||||
base32StringSeed = response.get("VirtualMFADevice", {}).get("Base32StringSeed")
|
||||
assert serial_number, f"Expected SerialNumber in response:\n{response}"
|
||||
assert base32StringSeed, f"Expected Base32StringSeed in response:\n{response}"
|
||||
|
||||
return serial_number, base32StringSeed
|
||||
|
||||
@reporter.step("Deactivates the specified MFA device and removes it from association with the user name")
|
||||
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
|
||||
response = self.boto3_iam_client.deactivate_mfa_device(UserName=user_name, SerialNumber=serial_number)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Deletes a virtual MFA device")
|
||||
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
|
||||
response = self.boto3_iam_client.delete_virtual_mfa_device(SerialNumber=serial_number)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Enables the specified MFA device and associates it with the specified IAM user")
|
||||
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
|
||||
response = self.boto3_iam_client.enable_mfa_device(
|
||||
UserName=user_name,
|
||||
SerialNumber=serial_number,
|
||||
AuthenticationCode1=authentication_code1,
|
||||
AuthenticationCode2=authentication_code2,
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Lists the MFA devices for an IAM user")
|
||||
def iam_list_virtual_mfa_devices(self) -> dict:
|
||||
response = self.boto3_iam_client.list_virtual_mfa_devices()
|
||||
assert response.get("VirtualMFADevices"), f"Expected VirtualMFADevices in response:\n{response}"
|
||||
|
||||
return response
|
||||
|
||||
@reporter.step("Get session token for user")
|
||||
def sts_get_session_token(
|
||||
self, duration_seconds: Optional[str] = "", serial_number: Optional[str] = "", token_code: Optional[str] = ""
|
||||
) -> tuple:
|
||||
response = self.boto3_sts_client.get_session_token(
|
||||
DurationSeconds=duration_seconds,
|
||||
SerialNumber=serial_number,
|
||||
TokenCode=token_code,
|
||||
)
|
||||
|
||||
access_key = response.get("Credentials", {}).get("AccessKeyId")
|
||||
secret_access_key = response.get("Credentials", {}).get("SecretAccessKey")
|
||||
session_token = response.get("Credentials", {}).get("SessionToken")
|
||||
assert access_key, f"Expected AccessKeyId in response:\n{response}"
|
||||
assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"
|
||||
assert session_token, f"Expected SessionToken in response:\n{response}"
|
||||
|
||||
return access_key, secret_access_key, session_token
|
||||
|
|
|
@ -195,7 +195,9 @@ class S3ClientWrapper(HumanReadableABC):
|
|||
"""
|
||||
|
||||
@abstractmethod
|
||||
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
|
||||
def list_objects(
|
||||
self, bucket: str, page_size: Optional[int] = None, prefix: Optional[str] = None, full_output: bool = False
|
||||
) -> Union[dict, list[str]]:
|
||||
"""Returns some or all (up to 1,000) of the objects in a bucket with each request.
|
||||
You can use the request parameters as selection criteria to return a subset of the objects in a bucket.
|
||||
A 200 OK response can contain valid or invalid XML. Make sure to design your application
|
||||
|
@ -578,3 +580,32 @@ class S3ClientWrapper(HumanReadableABC):
|
|||
@abstractmethod
|
||||
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
|
||||
"""Removes the specified tags from the user"""
|
||||
|
||||
# MFA methods
|
||||
@abstractmethod
|
||||
def iam_create_virtual_mfa_device(
|
||||
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
|
||||
) -> tuple:
|
||||
"""Creates a new virtual MFA device"""
|
||||
|
||||
@abstractmethod
|
||||
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
|
||||
"""Deactivates the specified MFA device and removes it from association with the user name"""
|
||||
|
||||
@abstractmethod
|
||||
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
|
||||
"""Deletes a virtual MFA device"""
|
||||
|
||||
@abstractmethod
|
||||
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
|
||||
"""Enables the specified MFA device and associates it with the specified IAM user"""
|
||||
|
||||
@abstractmethod
|
||||
def iam_list_virtual_mfa_devices(self) -> dict:
|
||||
"""Lists the MFA devices for an IAM user"""
|
||||
|
||||
@abstractmethod
|
||||
def sts_get_session_token(
|
||||
self, duration_seconds: Optional[str] = None, serial_number: Optional[str] = None, token_code: Optional[str] = None
|
||||
) -> tuple:
|
||||
"""Get session token for user"""
|
||||
|
|
|
@ -11,10 +11,10 @@ from frostfs_testlib.storage import get_service_registry
|
|||
from frostfs_testlib.storage.configuration.interfaces import ServiceConfigurationYml
|
||||
from frostfs_testlib.storage.constants import ConfigAttributes
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode
|
||||
from frostfs_testlib.storage.dataclasses.metrics import Metrics
|
||||
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
|
||||
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
|
||||
from frostfs_testlib.storage.service_registry import ServiceRegistry
|
||||
from frostfs_testlib.storage.dataclasses.metrics import Metrics
|
||||
|
||||
|
||||
class ClusterNode:
|
||||
|
@ -91,10 +91,10 @@ class ClusterNode:
|
|||
config_str = yaml.dump(new_config)
|
||||
shell.exec(f"echo '{config_str}' | sudo tee {config_file_path}")
|
||||
|
||||
def config(self, service_type: type[ServiceClass]) -> ServiceConfigurationYml:
|
||||
def config(self, service_type: ServiceClass) -> ServiceConfigurationYml:
|
||||
return self.service(service_type).config
|
||||
|
||||
def service(self, service_type: type[ServiceClass]) -> ServiceClass:
|
||||
def service(self, service_type: ServiceClass) -> ServiceClass:
|
||||
"""
|
||||
Get a service cluster node of specified type.
|
||||
|
||||
|
|
|
@ -172,6 +172,15 @@ class ClusterStateController:
|
|||
if service_type == StorageNode:
|
||||
self.wait_after_storage_startup()
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step("Send sighup to all {service_type} services")
|
||||
def sighup_services_of_type(self, service_type: type[ServiceClass]):
|
||||
services = self.cluster.services(service_type)
|
||||
parallel([service.send_signal_to_service for service in services], signal="SIGHUP")
|
||||
|
||||
if service_type == StorageNode:
|
||||
self.wait_after_storage_startup()
|
||||
|
||||
@wait_for_success(600, 60)
|
||||
def wait_s3gate(self, s3gate: S3Gate):
|
||||
with reporter.step(f"Wait for {s3gate} reconnection"):
|
||||
|
@ -206,21 +215,27 @@ class ClusterStateController:
|
|||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step("Stop {service_type} service on {node}")
|
||||
def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass], mask: bool = True):
|
||||
def stop_service_of_type(self, node: ClusterNode, service_type: ServiceClass, mask: bool = True):
|
||||
service = node.service(service_type)
|
||||
service.stop_service(mask)
|
||||
self.stopped_services.add(service)
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step("Send sighup to {service_type} service on {node}")
|
||||
def sighup_service_of_type(self, node: ClusterNode, service_type: ServiceClass):
|
||||
service = node.service(service_type)
|
||||
service.send_signal_to_service("SIGHUP")
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step("Start {service_type} service on {node}")
|
||||
def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
|
||||
def start_service_of_type(self, node: ClusterNode, service_type: ServiceClass):
|
||||
service = node.service(service_type)
|
||||
service.start_service()
|
||||
self.stopped_services.discard(service)
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step("Start all stopped {service_type} services")
|
||||
def start_stopped_services_of_type(self, service_type: type[ServiceClass]):
|
||||
def start_stopped_services_of_type(self, service_type: ServiceClass):
|
||||
stopped_svc = self._get_stopped_by_type(service_type)
|
||||
if not stopped_svc:
|
||||
return
|
||||
|
|
|
@ -14,14 +14,19 @@ class ConfigStateManager(StateManager):
|
|||
self.cluster = self.csc.cluster
|
||||
|
||||
@reporter.step("Change configuration for {service_type} on all nodes")
|
||||
def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any]):
|
||||
def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any], sighup: bool = False):
|
||||
services = self.cluster.services(service_type)
|
||||
nodes = self.cluster.nodes(services)
|
||||
self.services_with_changed_config.update([(node, service_type) for node in nodes])
|
||||
|
||||
self.csc.stop_services_of_type(service_type)
|
||||
if not sighup:
|
||||
self.csc.stop_services_of_type(service_type)
|
||||
|
||||
parallel([node.config(service_type).set for node in nodes], values=values)
|
||||
self.csc.start_services_of_type(service_type)
|
||||
if not sighup:
|
||||
self.csc.start_services_of_type(service_type)
|
||||
else:
|
||||
self.csc.sighup_services_of_type(service_type)
|
||||
|
||||
@reporter.step("Change configuration for {service_type} on {node}")
|
||||
def set_on_node(self, node: ClusterNode, service_type: type[ServiceClass], values: dict[str, Any]):
|
||||
|
@ -32,18 +37,26 @@ class ConfigStateManager(StateManager):
|
|||
self.csc.start_service_of_type(node, service_type)
|
||||
|
||||
@reporter.step("Revert all configuration changes")
|
||||
def revert_all(self):
|
||||
def revert_all(self, sighup: bool = False):
|
||||
if not self.services_with_changed_config:
|
||||
return
|
||||
|
||||
parallel(self._revert_svc, self.services_with_changed_config)
|
||||
parallel(self._revert_svc, self.services_with_changed_config, sighup)
|
||||
self.services_with_changed_config.clear()
|
||||
|
||||
self.csc.start_all_stopped_services()
|
||||
if not sighup:
|
||||
self.csc.start_all_stopped_services()
|
||||
|
||||
# TODO: parallel can't have multiple parallel_items :(
|
||||
@reporter.step("Revert all configuration {node_and_service}")
|
||||
def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass]):
|
||||
def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass], sighup: bool = False):
|
||||
node, service_type = node_and_service
|
||||
self.csc.stop_service_of_type(node, service_type)
|
||||
service = node.service(service_type)
|
||||
|
||||
if not sighup:
|
||||
self.csc.stop_service_of_type(node, service_type)
|
||||
|
||||
node.config(service_type).revert()
|
||||
|
||||
if sighup:
|
||||
service.send_signal_to_service("SIGHUP")
|
||||
|
|
|
@ -65,6 +65,10 @@ class NodeBase(HumanReadableABC):
|
|||
with reporter.step(f"Start {self.name} service on {self.host.config.address}"):
|
||||
self.host.start_service(self.name)
|
||||
|
||||
def send_signal_to_service(self, signal: str):
|
||||
with reporter.step(f"Send -{signal} signal to {self.name} service on {self.host.config.address}"):
|
||||
self.host.send_signal_to_service(self.name, signal)
|
||||
|
||||
@abstractmethod
|
||||
def service_healthcheck(self) -> bool:
|
||||
"""Service healthcheck."""
|
||||
|
@ -185,9 +189,7 @@ class NodeBase(HumanReadableABC):
|
|||
|
||||
if attribute_name not in config.attributes:
|
||||
if default_attribute_name is None:
|
||||
raise RuntimeError(
|
||||
f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either"
|
||||
)
|
||||
raise RuntimeError(f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either")
|
||||
|
||||
return config.attributes[default_attribute_name]
|
||||
|
||||
|
@ -197,9 +199,7 @@ class NodeBase(HumanReadableABC):
|
|||
return self.host.get_service_config(self.name)
|
||||
|
||||
def get_service_uptime(self, service: str) -> datetime:
|
||||
result = self.host.get_shell().exec(
|
||||
f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2"
|
||||
)
|
||||
result = self.host.get_shell().exec(f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2")
|
||||
start_time = parser.parse(result.stdout.strip())
|
||||
current_time = datetime.now(tz=timezone.utc)
|
||||
active_time = current_time - start_time
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import itertools
|
||||
import traceback
|
||||
from concurrent.futures import Future, ThreadPoolExecutor
|
||||
from contextlib import contextmanager
|
||||
from typing import Callable, Collection, Optional, Union
|
||||
|
@ -55,7 +56,42 @@ def parallel(
|
|||
# Check for exceptions
|
||||
exceptions = [future.exception() for future in futures if future.exception()]
|
||||
if exceptions:
|
||||
message = "\n".join([str(e) for e in exceptions])
|
||||
# Prettify exception in parallel with all underlying stack traces
|
||||
# For example, we had 3 RuntimeError exceptions during parallel. This format will give us something like
|
||||
#
|
||||
# RuntimeError: The following exceptions occured during parallel run:
|
||||
# 1) Exception one text
|
||||
# 2) Exception two text
|
||||
# 3) Exception three text
|
||||
# TRACES:
|
||||
# ==== 1 ====
|
||||
# Traceback (most recent call last):
|
||||
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
|
||||
# result = self.fn(*self.args, **self.kwargs)
|
||||
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
|
||||
# raise RuntimeError(f"Exception one text")
|
||||
# RuntimeError: Exception one text
|
||||
#
|
||||
# ==== 2 ====
|
||||
# Traceback (most recent call last):
|
||||
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
|
||||
# result = self.fn(*self.args, **self.kwargs)
|
||||
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
|
||||
# raise RuntimeError(f"Exception two text")
|
||||
# RuntimeError: Exception two text
|
||||
#
|
||||
# ==== 3 ====
|
||||
# Traceback (most recent call last):
|
||||
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
|
||||
# result = self.fn(*self.args, **self.kwargs)
|
||||
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
|
||||
# raise RuntimeError(f"Exception three text")
|
||||
# RuntimeError: Exception three text
|
||||
short_summary = "\n".join([f"{i}) {str(e)}" for i, e in enumerate(exceptions, 1)])
|
||||
stack_traces = "\n".join(
|
||||
[f"==== {i} ====\n{''.join(traceback.TracebackException.from_exception(e).format())}" for i, e in enumerate(exceptions, 1)]
|
||||
)
|
||||
message = f"{short_summary}\nTRACES:\n{stack_traces}"
|
||||
raise RuntimeError(f"The following exceptions occured during parallel run:\n{message}")
|
||||
return futures
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ def ensure_directory_opener(path, flags):
|
|||
# TODO: Do not add {size} to title yet, since it produces dynamic info in top level steps
|
||||
# Use object_size dt in future as argument
|
||||
@reporter.step("Generate file")
|
||||
def generate_file(size: int) -> TestFile:
|
||||
def generate_file(size: int, file_name: Optional[str] = None) -> TestFile:
|
||||
"""Generates a binary file with the specified size in bytes.
|
||||
|
||||
Args:
|
||||
|
@ -54,7 +54,11 @@ def generate_file(size: int) -> TestFile:
|
|||
Returns:
|
||||
The path to the generated file.
|
||||
"""
|
||||
test_file = TestFile(os.path.join(ASSETS_DIR, string_utils.unique_name("object-")))
|
||||
|
||||
if file_name is None:
|
||||
file_name = string_utils.unique_name("object-")
|
||||
|
||||
test_file = TestFile(os.path.join(ASSETS_DIR, file_name))
|
||||
with open(test_file, "wb", opener=ensure_directory_opener) as file:
|
||||
file.write(os.urandom(size))
|
||||
logger.info(f"File with size {size} bytes has been generated: {test_file}")
|
||||
|
|
Loading…
Reference in a new issue