forked from TrueCloudLab/frostfs-testlib
Compare commits
10 commits
e6faddedeb
...
451de5e07e
Author | SHA1 | Date | |
---|---|---|---|
451de5e07e | |||
f24bfc06fd | |||
47bc11835b | |||
2a90ec74ff | |||
95b32a036a | |||
55d8ee5da0 | |||
ea40940514 | |||
6f1baf3cf6 | |||
26139767f4 | |||
3d6a356e20 |
17 changed files with 1090 additions and 429 deletions
|
@ -122,7 +122,9 @@ class FrostfsAdmMorph(CliCommand):
|
|||
**{param: param_value for param, param_value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
|
||||
def force_new_epoch(self, rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None) -> CommandResult:
|
||||
def force_new_epoch(
|
||||
self, rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None, delta: Optional[int] = None
|
||||
) -> CommandResult:
|
||||
"""Create new FrostFS epoch event in the side chain.
|
||||
|
||||
Args:
|
||||
|
@ -341,7 +343,6 @@ class FrostfsAdmMorph(CliCommand):
|
|||
return self._execute(
|
||||
f"morph remove-nodes {' '.join(node_netmap_keys)}",
|
||||
**{param: param_value for param, param_value in locals().items() if param not in ["self", "node_netmap_keys"]},
|
||||
**{param: param_value for param, param_value in locals().items() if param not in ["self", "node_netmap_keys"]},
|
||||
)
|
||||
|
||||
def add_rule(
|
||||
|
@ -352,6 +353,7 @@ class FrostfsAdmMorph(CliCommand):
|
|||
rule: Optional[list[str]] = None,
|
||||
path: Optional[str] = None,
|
||||
chain_id_hex: Optional[bool] = None,
|
||||
chain_name: Optional[str] = None,
|
||||
wallet: Optional[str] = None,
|
||||
address: Optional[str] = None,
|
||||
timeout: Optional[str] = None,
|
||||
|
@ -382,6 +384,7 @@ class FrostfsAdmMorph(CliCommand):
|
|||
target_name: str,
|
||||
target_type: str,
|
||||
chain_id_hex: Optional[bool] = None,
|
||||
chain_name: Optional[str] = None,
|
||||
wallet: Optional[str] = None,
|
||||
address: Optional[str] = None,
|
||||
timeout: Optional[str] = None,
|
||||
|
@ -409,6 +412,7 @@ class FrostfsAdmMorph(CliCommand):
|
|||
target_type: str,
|
||||
target_name: Optional[str] = None,
|
||||
rpc_endpoint: Optional[str] = None,
|
||||
chain_name: Optional[str] = None,
|
||||
wallet: Optional[str] = None,
|
||||
address: Optional[str] = None,
|
||||
timeout: Optional[str] = None,
|
||||
|
@ -435,6 +439,7 @@ class FrostfsAdmMorph(CliCommand):
|
|||
target_name: str,
|
||||
target_type: str,
|
||||
all: Optional[bool] = None,
|
||||
chain_name: Optional[str] = None,
|
||||
chain_id_hex: Optional[bool] = None,
|
||||
wallet: Optional[str] = None,
|
||||
address: Optional[str] = None,
|
||||
|
|
|
@ -241,3 +241,21 @@ class FrostfsCliShards(CliCommand):
|
|||
"control shards evacuation status",
|
||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
|
||||
def detach(self, endpoint: str, address: Optional[str] = None, id: Optional[str] = None, timeout: Optional[str] = None):
|
||||
"""
|
||||
Detach and close the shards
|
||||
|
||||
Args:
|
||||
address: Address of wallet account
|
||||
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
|
||||
id: List of shard IDs in base58 encoding
|
||||
timeout: Timeout for an operation (default 15s)
|
||||
|
||||
Returns:
|
||||
Command's result.
|
||||
"""
|
||||
return self._execute(
|
||||
"control shards detach",
|
||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
|
|
|
@ -164,6 +164,9 @@ class DockerHost(Host):
|
|||
|
||||
return volume_path
|
||||
|
||||
def send_signal_to_service(self, service_name: str, signal: str) -> None:
|
||||
raise NotImplementedError("Not implemented for docker")
|
||||
|
||||
def delete_metabase(self, service_name: str) -> None:
|
||||
raise NotImplementedError("Not implemented for docker")
|
||||
|
||||
|
|
|
@ -117,6 +117,17 @@ class Host(ABC):
|
|||
service_name: Name of the service to stop.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def send_signal_to_service(self, service_name: str, signal: str) -> None:
|
||||
"""Send signal to service with specified name using kill -<signal>
|
||||
|
||||
The service must be hosted on this host.
|
||||
|
||||
Args:
|
||||
service_name: Name of the service to stop.
|
||||
signal: signal name. See kill -l to all names
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def mask_service(self, service_name: str) -> None:
|
||||
"""Prevent the service from start by any activity by masking it.
|
||||
|
|
0
src/frostfs_testlib/http/__init__.py
Normal file
0
src/frostfs_testlib/http/__init__.py
Normal file
95
src/frostfs_testlib/http/http_client.py
Normal file
95
src/frostfs_testlib/http/http_client.py
Normal file
|
@ -0,0 +1,95 @@
|
|||
import json
|
||||
import logging
|
||||
import logging.config
|
||||
|
||||
import httpx
|
||||
|
||||
from frostfs_testlib import reporter
|
||||
|
||||
timeout = httpx.Timeout(60, read=150)
|
||||
LOGGING_CONFIG = {
|
||||
"disable_existing_loggers": False,
|
||||
"version": 1,
|
||||
"handlers": {"default": {"class": "logging.StreamHandler", "formatter": "http", "stream": "ext://sys.stderr"}},
|
||||
"formatters": {
|
||||
"http": {
|
||||
"format": "%(levelname)s [%(asctime)s] %(name)s - %(message)s",
|
||||
"datefmt": "%Y-%m-%d %H:%M:%S",
|
||||
}
|
||||
},
|
||||
"loggers": {
|
||||
"httpx": {
|
||||
"handlers": ["default"],
|
||||
"level": "DEBUG",
|
||||
},
|
||||
"httpcore": {
|
||||
"handlers": ["default"],
|
||||
"level": "ERROR",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
logging.config.dictConfig(LOGGING_CONFIG)
|
||||
logger = logging.getLogger("NeoLogger")
|
||||
|
||||
|
||||
class HttpClient:
|
||||
@reporter.step("Send {method} request to {url}")
|
||||
def send(self, method: str, url: str, expected_status_code: int = None, **kwargs: dict) -> httpx.Response:
|
||||
transport = httpx.HTTPTransport(verify=False, retries=5)
|
||||
client = httpx.Client(timeout=timeout, transport=transport)
|
||||
response = client.request(method, url, **kwargs)
|
||||
|
||||
self._attach_response(response)
|
||||
logger.info(f"Response: {response.status_code} => {response.text}")
|
||||
|
||||
if expected_status_code:
|
||||
assert response.status_code == expected_status_code, (
|
||||
f"Got {response.status_code} response code" f" while {expected_status_code} expected"
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
def _attach_response(self, response: httpx.Response):
|
||||
request = response.request
|
||||
|
||||
try:
|
||||
request_headers = json.dumps(dict(request.headers), indent=4)
|
||||
except json.JSONDecodeError:
|
||||
request_headers = str(request.headers)
|
||||
|
||||
try:
|
||||
request_body = request.read()
|
||||
try:
|
||||
request_body = request_body.decode("utf-8")
|
||||
except UnicodeDecodeError as e:
|
||||
request_body = f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}"
|
||||
except Exception as e:
|
||||
request_body = f"Error reading request body: {str(e)}"
|
||||
|
||||
request_body = "" if request_body is None else request_body
|
||||
|
||||
try:
|
||||
response_headers = json.dumps(dict(response.headers), indent=4)
|
||||
except json.JSONDecodeError:
|
||||
response_headers = str(response.headers)
|
||||
|
||||
report = (
|
||||
f"Method: {request.method}\n\n"
|
||||
f"URL: {request.url}\n\n"
|
||||
f"Request Headers: {request_headers}\n\n"
|
||||
f"Request Body: {request_body}\n\n"
|
||||
f"Response Status Code: {response.status_code}\n\n"
|
||||
f"Response Headers: {response_headers}\n\n"
|
||||
f"Response Body: {response.text}\n\n"
|
||||
)
|
||||
curl_request = self._create_curl_request(request.url, request.method, request.headers, request_body)
|
||||
|
||||
reporter.attach(report, "Requests Info")
|
||||
reporter.attach(curl_request, "CURL")
|
||||
|
||||
def _create_curl_request(self, url: str, method: str, headers: httpx.Headers, data: str) -> str:
|
||||
headers = " ".join(f'-H "{name.title()}: {value}"' for name, value in headers.items())
|
||||
data = f" -d '{data}'" if data else ""
|
||||
# Option -k means no verify SSL
|
||||
return f"curl {url} -X {method} {headers}{data} -k"
|
|
@ -53,3 +53,4 @@ HOSTING_CONFIG_FILE = os.getenv(
|
|||
)
|
||||
|
||||
MORE_LOG = os.getenv("MORE_LOG", "1")
|
||||
EXPIRATION_EPOCH_ATTRIBUTE = "__SYSTEM__EXPIRATION_EPOCH"
|
||||
|
|
|
@ -16,11 +16,10 @@ OPTIONAL_NODE_UNDER_LOAD = os.getenv("OPTIONAL_NODE_UNDER_LOAD")
|
|||
OPTIONAL_FAILOVER_ENABLED = str_to_bool(os.getenv("OPTIONAL_FAILOVER_ENABLED", "true"))
|
||||
|
||||
# Set this to True to disable background load. I.E. node which supposed to be stopped will not be actually stopped.
|
||||
OPTIONAL_BACKGROUND_LOAD_ENABLED = str_to_bool(
|
||||
os.getenv("OPTIONAL_BACKGROUND_LOAD_ENABLED", "true")
|
||||
)
|
||||
OPTIONAL_BACKGROUND_LOAD_ENABLED = str_to_bool(os.getenv("OPTIONAL_BACKGROUND_LOAD_ENABLED", "true"))
|
||||
|
||||
# Set this to False for disable autouse fixture like node healthcheck during developing time.
|
||||
OPTIONAL_AUTOUSE_FIXTURES_ENABLED = str_to_bool(
|
||||
os.getenv("OPTIONAL_AUTOUSE_FIXTURES_ENABLED", "true")
|
||||
)
|
||||
OPTIONAL_AUTOUSE_FIXTURES_ENABLED = str_to_bool(os.getenv("OPTIONAL_AUTOUSE_FIXTURES_ENABLED", "true"))
|
||||
|
||||
# Use cache for fixtures with @cachec_fixture decorator
|
||||
OPTIONAL_CACHE_FIXTURES = str_to_bool(os.getenv("OPTIONAL_CACHE_FIXTURES", "false"))
|
||||
|
|
|
@ -70,6 +70,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
if bucket is None:
|
||||
bucket = string_utils.unique_name("bucket-")
|
||||
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
if object_lock_enabled_for_bucket is None:
|
||||
object_lock = ""
|
||||
elif object_lock_enabled_for_bucket:
|
||||
|
@ -103,16 +106,25 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Delete bucket S3")
|
||||
def delete_bucket(self, bucket: str) -> None:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = f"aws {self.common_flags} s3api delete-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
self.local_shell.exec(cmd, command_options)
|
||||
|
||||
@reporter.step("Head bucket S3")
|
||||
def head_bucket(self, bucket: str) -> None:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = f"aws {self.common_flags} s3api head-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
self.local_shell.exec(cmd)
|
||||
|
||||
@reporter.step("Put bucket versioning status")
|
||||
def put_bucket_versioning(self, bucket: str, status: VersioningStatus) -> None:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api put-bucket-versioning --bucket {bucket} "
|
||||
f"--versioning-configuration Status={status.value} "
|
||||
|
@ -122,6 +134,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Get bucket versioning status")
|
||||
def get_bucket_versioning_status(self, bucket: str) -> Literal["Enabled", "Suspended"]:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api get-bucket-versioning --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -132,6 +147,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Put bucket tagging")
|
||||
def put_bucket_tagging(self, bucket: str, tags: list) -> None:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
tags_json = {"TagSet": [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]}
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api put-bucket-tagging --bucket {bucket} "
|
||||
|
@ -141,6 +159,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Get bucket tagging")
|
||||
def get_bucket_tagging(self, bucket: str) -> list:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api get-bucket-tagging --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -151,6 +172,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Get bucket acl")
|
||||
def get_bucket_acl(self, bucket: str) -> list:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
)
|
||||
|
@ -160,6 +184,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Get bucket location")
|
||||
def get_bucket_location(self, bucket: str) -> dict:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api get-bucket-location --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -170,6 +197,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("List objects S3")
|
||||
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
output = self.local_shell.exec(cmd).stdout
|
||||
response = self._to_json(output)
|
||||
|
@ -181,6 +211,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("List objects S3 v2")
|
||||
def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api list-objects-v2 --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -195,6 +228,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("List objects versions S3")
|
||||
def list_objects_versions(self, bucket: str, full_output: bool = False) -> dict:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api list-object-versions --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -205,6 +241,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("List objects delete markers S3")
|
||||
def list_delete_markers(self, bucket: str, full_output: bool = False) -> list:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api list-object-versions --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -228,8 +267,13 @@ class AwsCliClient(S3ClientWrapper):
|
|||
) -> str:
|
||||
if bucket is None:
|
||||
bucket = source_bucket
|
||||
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
if key is None:
|
||||
key = string_utils.unique_name("copy-object-")
|
||||
|
||||
copy_source = f"{source_bucket}/{source_key}"
|
||||
|
||||
cmd = (
|
||||
|
@ -266,6 +310,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
grant_full_control: Optional[str] = None,
|
||||
grant_read: Optional[str] = None,
|
||||
) -> str:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
if key is None:
|
||||
key = os.path.basename(filepath)
|
||||
|
||||
|
@ -297,6 +344,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Head object S3")
|
||||
def head_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
version = f" --version-id {version_id}" if version_id else ""
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api head-object --bucket {bucket} --key {key} "
|
||||
|
@ -315,6 +365,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
object_range: Optional[tuple[int, int]] = None,
|
||||
full_output: bool = False,
|
||||
) -> dict | TestFile:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, string_utils.unique_name("dl-object-")))
|
||||
version = f" --version-id {version_id}" if version_id else ""
|
||||
cmd = (
|
||||
|
@ -329,6 +382,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Get object ACL")
|
||||
def get_object_acl(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
version = f" --version-id {version_id}" if version_id else ""
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api get-object-acl --bucket {bucket} --key {key} "
|
||||
|
@ -347,6 +403,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
grant_write: Optional[str] = None,
|
||||
grant_read: Optional[str] = None,
|
||||
) -> list:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api put-object-acl --bucket {bucket} --key {key} "
|
||||
f" --endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -369,6 +428,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
grant_write: Optional[str] = None,
|
||||
grant_read: Optional[str] = None,
|
||||
) -> None:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api put-bucket-acl --bucket {bucket} "
|
||||
f" --endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -383,6 +445,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Delete objects S3")
|
||||
def delete_objects(self, bucket: str, keys: list[str]) -> dict:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
file_path = os.path.join(os.getcwd(), ASSETS_DIR, "delete.json")
|
||||
delete_structure = json.dumps(_make_objs_dict(keys))
|
||||
with open(file_path, "w") as out_file:
|
||||
|
@ -399,6 +464,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Delete object S3")
|
||||
def delete_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
version = f" --version-id {version_id}" if version_id else ""
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api delete-object --bucket {bucket} "
|
||||
|
@ -409,6 +477,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Delete object versions S3")
|
||||
def delete_object_versions(self, bucket: str, object_versions: list) -> dict:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
# Build deletion list in S3 format
|
||||
delete_list = {
|
||||
"Objects": [
|
||||
|
@ -435,6 +506,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Delete object versions S3 without delete markers")
|
||||
def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
# Delete objects without creating delete markers
|
||||
for object_version in object_versions:
|
||||
self.delete_object(bucket=bucket, key=object_version["Key"], version_id=object_version["VersionId"])
|
||||
|
@ -450,6 +524,8 @@ class AwsCliClient(S3ClientWrapper):
|
|||
part_number: int = 0,
|
||||
full_output: bool = True,
|
||||
) -> dict:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
attrs = ",".join(attributes)
|
||||
version = f" --version-id {version_id}" if version_id else ""
|
||||
|
@ -473,6 +549,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Get bucket policy")
|
||||
def get_bucket_policy(self, bucket: str) -> dict:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api get-bucket-policy --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -483,6 +562,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Delete bucket policy")
|
||||
def delete_bucket_policy(self, bucket: str) -> dict:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api delete-bucket-policy --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -493,6 +575,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Put bucket policy")
|
||||
def put_bucket_policy(self, bucket: str, policy: dict) -> None:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
# Leaving it as is was in test repo. Double dumps to escape resulting string
|
||||
# Example:
|
||||
# policy = {"a": 1}
|
||||
|
@ -508,6 +593,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Get bucket cors")
|
||||
def get_bucket_cors(self, bucket: str) -> dict:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api get-bucket-cors --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -518,6 +606,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Put bucket cors")
|
||||
def put_bucket_cors(self, bucket: str, cors_configuration: dict) -> None:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api put-bucket-cors --bucket {bucket} "
|
||||
f"--cors-configuration '{json.dumps(cors_configuration)}' --endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -526,6 +617,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Delete bucket cors")
|
||||
def delete_bucket_cors(self, bucket: str) -> None:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api delete-bucket-cors --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -534,6 +628,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Delete bucket tagging")
|
||||
def delete_bucket_tagging(self, bucket: str) -> None:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api delete-bucket-tagging --bucket {bucket} "
|
||||
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -549,6 +646,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
version_id: Optional[str] = None,
|
||||
bypass_governance_retention: Optional[bool] = None,
|
||||
) -> None:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
version = f" --version-id {version_id}" if version_id else ""
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api put-object-retention --bucket {bucket} --key {key} "
|
||||
|
@ -566,6 +666,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
legal_hold_status: Literal["ON", "OFF"],
|
||||
version_id: Optional[str] = None,
|
||||
) -> None:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
version = f" --version-id {version_id}" if version_id else ""
|
||||
legal_hold = json.dumps({"Status": legal_hold_status})
|
||||
cmd = (
|
||||
|
@ -576,6 +679,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Put object tagging")
|
||||
def put_object_tagging(self, bucket: str, key: str, tags: list, version_id: Optional[str] = "") -> None:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
tags = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
|
||||
tagging = {"TagSet": tags}
|
||||
version = f" --version-id {version_id}" if version_id else ""
|
||||
|
@ -587,6 +693,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Get object tagging")
|
||||
def get_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
version = f" --version-id {version_id}" if version_id else ""
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api get-object-tagging --bucket {bucket} --key {key} "
|
||||
|
@ -598,6 +707,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Delete object tagging")
|
||||
def delete_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> None:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
version = f" --version-id {version_id}" if version_id else ""
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api delete-object-tagging --bucket {bucket} "
|
||||
|
@ -613,6 +725,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
acl: Optional[str] = None,
|
||||
metadata: Optional[dict] = None,
|
||||
) -> dict:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3 sync {dir_path} s3://{bucket} " f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
|
||||
)
|
||||
|
@ -633,6 +748,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
acl: Optional[str] = None,
|
||||
metadata: Optional[dict] = None,
|
||||
) -> dict:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3 cp {dir_path} s3://{bucket} "
|
||||
f"--endpoint-url {self.s3gate_endpoint} --recursive --profile {self.profile}"
|
||||
|
@ -648,6 +766,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Create multipart upload S3")
|
||||
def create_multipart_upload(self, bucket: str, key: str) -> str:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api create-multipart-upload --bucket {bucket} "
|
||||
f"--key {key} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -661,6 +782,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("List multipart uploads S3")
|
||||
def list_multipart_uploads(self, bucket: str) -> Optional[list[dict]]:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api list-multipart-uploads --bucket {bucket} "
|
||||
f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -671,6 +795,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Abort multipart upload S3")
|
||||
def abort_multipart_upload(self, bucket: str, key: str, upload_id: str) -> None:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api abort-multipart-upload --bucket {bucket} "
|
||||
f"--key {key} --upload-id {upload_id} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -679,6 +806,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Upload part S3")
|
||||
def upload_part(self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str) -> str:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api upload-part --bucket {bucket} --key {key} "
|
||||
f"--upload-id {upload_id} --part-number {part_num} --body {filepath} "
|
||||
|
@ -691,6 +821,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Upload copy part S3")
|
||||
def upload_part_copy(self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str) -> str:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api upload-part-copy --bucket {bucket} --key {key} "
|
||||
f"--upload-id {upload_id} --part-number {part_num} --copy-source {copy_source} "
|
||||
|
@ -704,6 +837,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("List parts S3")
|
||||
def list_parts(self, bucket: str, key: str, upload_id: str) -> list[dict]:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api list-parts --bucket {bucket} --key {key} "
|
||||
f"--upload-id {upload_id} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -717,6 +853,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Complete multipart upload S3")
|
||||
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
file_path = os.path.join(os.getcwd(), ASSETS_DIR, "parts.json")
|
||||
parts_dict = {"Parts": [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts]}
|
||||
|
||||
|
@ -737,6 +876,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Put object lock configuration")
|
||||
def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api put-object-lock-configuration --bucket {bucket} "
|
||||
f"--object-lock-configuration '{json.dumps(configuration)}' --endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -746,6 +888,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Get object lock configuration")
|
||||
def get_object_lock_configuration(self, bucket: str):
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api get-object-lock-configuration --bucket {bucket} "
|
||||
f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -756,6 +901,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Put bucket lifecycle configuration")
|
||||
def put_bucket_lifecycle_configuration(self, bucket: str, lifecycle_configuration: dict, dumped_configuration: str) -> dict:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api put-bucket-lifecycle-configuration --bucket {bucket} "
|
||||
f"--endpoint-url {self.s3gate_endpoint} --lifecycle-configuration file://{dumped_configuration} --profile {self.profile}"
|
||||
|
@ -766,6 +914,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Get bucket lifecycle configuration")
|
||||
def get_bucket_lifecycle_configuration(self, bucket: str) -> dict:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api get-bucket-lifecycle-configuration --bucket {bucket} "
|
||||
f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
@ -776,6 +927,9 @@ class AwsCliClient(S3ClientWrapper):
|
|||
|
||||
@reporter.step("Delete bucket lifecycle configuration")
|
||||
def delete_bucket_lifecycle(self, bucket: str) -> dict:
|
||||
if bucket.startswith("-") or " " in bucket:
|
||||
bucket = f'"{bucket}"'
|
||||
|
||||
cmd = (
|
||||
f"aws {self.common_flags} s3api delete-bucket-lifecycle --bucket {bucket} "
|
||||
f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -11,10 +11,10 @@ from frostfs_testlib.storage import get_service_registry
|
|||
from frostfs_testlib.storage.configuration.interfaces import ServiceConfigurationYml
|
||||
from frostfs_testlib.storage.constants import ConfigAttributes
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode
|
||||
from frostfs_testlib.storage.dataclasses.metrics import Metrics
|
||||
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
|
||||
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
|
||||
from frostfs_testlib.storage.service_registry import ServiceRegistry
|
||||
from frostfs_testlib.storage.dataclasses.metrics import Metrics
|
||||
|
||||
|
||||
class ClusterNode:
|
||||
|
@ -91,10 +91,10 @@ class ClusterNode:
|
|||
config_str = yaml.dump(new_config)
|
||||
shell.exec(f"echo '{config_str}' | sudo tee {config_file_path}")
|
||||
|
||||
def config(self, service_type: type[ServiceClass]) -> ServiceConfigurationYml:
|
||||
def config(self, service_type: ServiceClass) -> ServiceConfigurationYml:
|
||||
return self.service(service_type).config
|
||||
|
||||
def service(self, service_type: type[ServiceClass]) -> ServiceClass:
|
||||
def service(self, service_type: ServiceClass) -> ServiceClass:
|
||||
"""
|
||||
Get a service cluster node of specified type.
|
||||
|
||||
|
|
|
@ -172,6 +172,15 @@ class ClusterStateController:
|
|||
if service_type == StorageNode:
|
||||
self.wait_after_storage_startup()
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step("Send sighup to all {service_type} services")
|
||||
def sighup_services_of_type(self, service_type: type[ServiceClass]):
|
||||
services = self.cluster.services(service_type)
|
||||
parallel([service.send_signal_to_service for service in services], signal="SIGHUP")
|
||||
|
||||
if service_type == StorageNode:
|
||||
self.wait_after_storage_startup()
|
||||
|
||||
@wait_for_success(600, 60)
|
||||
def wait_s3gate(self, s3gate: S3Gate):
|
||||
with reporter.step(f"Wait for {s3gate} reconnection"):
|
||||
|
@ -206,21 +215,27 @@ class ClusterStateController:
|
|||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step("Stop {service_type} service on {node}")
|
||||
def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass], mask: bool = True):
|
||||
def stop_service_of_type(self, node: ClusterNode, service_type: ServiceClass, mask: bool = True):
|
||||
service = node.service(service_type)
|
||||
service.stop_service(mask)
|
||||
self.stopped_services.add(service)
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step("Send sighup to {service_type} service on {node}")
|
||||
def sighup_service_of_type(self, node: ClusterNode, service_type: ServiceClass):
|
||||
service = node.service(service_type)
|
||||
service.send_signal_to_service("SIGHUP")
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step("Start {service_type} service on {node}")
|
||||
def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
|
||||
def start_service_of_type(self, node: ClusterNode, service_type: ServiceClass):
|
||||
service = node.service(service_type)
|
||||
service.start_service()
|
||||
self.stopped_services.discard(service)
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step("Start all stopped {service_type} services")
|
||||
def start_stopped_services_of_type(self, service_type: type[ServiceClass]):
|
||||
def start_stopped_services_of_type(self, service_type: ServiceClass):
|
||||
stopped_svc = self._get_stopped_by_type(service_type)
|
||||
if not stopped_svc:
|
||||
return
|
||||
|
|
|
@ -14,14 +14,19 @@ class ConfigStateManager(StateManager):
|
|||
self.cluster = self.csc.cluster
|
||||
|
||||
@reporter.step("Change configuration for {service_type} on all nodes")
|
||||
def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any]):
|
||||
def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any], sighup: bool = False):
|
||||
services = self.cluster.services(service_type)
|
||||
nodes = self.cluster.nodes(services)
|
||||
self.services_with_changed_config.update([(node, service_type) for node in nodes])
|
||||
|
||||
self.csc.stop_services_of_type(service_type)
|
||||
if not sighup:
|
||||
self.csc.stop_services_of_type(service_type)
|
||||
|
||||
parallel([node.config(service_type).set for node in nodes], values=values)
|
||||
self.csc.start_services_of_type(service_type)
|
||||
if not sighup:
|
||||
self.csc.start_services_of_type(service_type)
|
||||
else:
|
||||
self.csc.sighup_services_of_type(service_type)
|
||||
|
||||
@reporter.step("Change configuration for {service_type} on {node}")
|
||||
def set_on_node(self, node: ClusterNode, service_type: type[ServiceClass], values: dict[str, Any]):
|
||||
|
@ -32,18 +37,26 @@ class ConfigStateManager(StateManager):
|
|||
self.csc.start_service_of_type(node, service_type)
|
||||
|
||||
@reporter.step("Revert all configuration changes")
|
||||
def revert_all(self):
|
||||
def revert_all(self, sighup: bool = False):
|
||||
if not self.services_with_changed_config:
|
||||
return
|
||||
|
||||
parallel(self._revert_svc, self.services_with_changed_config)
|
||||
parallel(self._revert_svc, self.services_with_changed_config, sighup)
|
||||
self.services_with_changed_config.clear()
|
||||
|
||||
self.csc.start_all_stopped_services()
|
||||
if not sighup:
|
||||
self.csc.start_all_stopped_services()
|
||||
|
||||
# TODO: parallel can't have multiple parallel_items :(
|
||||
@reporter.step("Revert all configuration {node_and_service}")
|
||||
def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass]):
|
||||
def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass], sighup: bool = False):
|
||||
node, service_type = node_and_service
|
||||
self.csc.stop_service_of_type(node, service_type)
|
||||
service = node.service(service_type)
|
||||
|
||||
if not sighup:
|
||||
self.csc.stop_service_of_type(node, service_type)
|
||||
|
||||
node.config(service_type).revert()
|
||||
|
||||
if sighup:
|
||||
service.send_signal_to_service("SIGHUP")
|
||||
|
|
|
@ -65,6 +65,10 @@ class NodeBase(HumanReadableABC):
|
|||
with reporter.step(f"Start {self.name} service on {self.host.config.address}"):
|
||||
self.host.start_service(self.name)
|
||||
|
||||
def send_signal_to_service(self, signal: str):
|
||||
with reporter.step(f"Send -{signal} signal to {self.name} service on {self.host.config.address}"):
|
||||
self.host.send_signal_to_service(self.name, signal)
|
||||
|
||||
@abstractmethod
|
||||
def service_healthcheck(self) -> bool:
|
||||
"""Service healthcheck."""
|
||||
|
@ -185,9 +189,7 @@ class NodeBase(HumanReadableABC):
|
|||
|
||||
if attribute_name not in config.attributes:
|
||||
if default_attribute_name is None:
|
||||
raise RuntimeError(
|
||||
f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either"
|
||||
)
|
||||
raise RuntimeError(f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either")
|
||||
|
||||
return config.attributes[default_attribute_name]
|
||||
|
||||
|
@ -197,9 +199,7 @@ class NodeBase(HumanReadableABC):
|
|||
return self.host.get_service_config(self.name)
|
||||
|
||||
def get_service_uptime(self, service: str) -> datetime:
|
||||
result = self.host.get_shell().exec(
|
||||
f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2"
|
||||
)
|
||||
result = self.host.get_shell().exec(f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2")
|
||||
start_time = parser.parse(result.stdout.strip())
|
||||
current_time = datetime.now(tz=timezone.utc)
|
||||
active_time = current_time - start_time
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import itertools
|
||||
import traceback
|
||||
from concurrent.futures import Future, ThreadPoolExecutor
|
||||
from contextlib import contextmanager
|
||||
from typing import Callable, Collection, Optional, Union
|
||||
|
@ -55,7 +56,42 @@ def parallel(
|
|||
# Check for exceptions
|
||||
exceptions = [future.exception() for future in futures if future.exception()]
|
||||
if exceptions:
|
||||
message = "\n".join([str(e) for e in exceptions])
|
||||
# Prettify exception in parallel with all underlying stack traces
|
||||
# For example, we had 3 RuntimeError exceptions during parallel. This format will give us something like
|
||||
#
|
||||
# RuntimeError: The following exceptions occured during parallel run:
|
||||
# 1) Exception one text
|
||||
# 2) Exception two text
|
||||
# 3) Exception three text
|
||||
# TRACES:
|
||||
# ==== 1 ====
|
||||
# Traceback (most recent call last):
|
||||
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
|
||||
# result = self.fn(*self.args, **self.kwargs)
|
||||
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
|
||||
# raise RuntimeError(f"Exception one text")
|
||||
# RuntimeError: Exception one text
|
||||
#
|
||||
# ==== 2 ====
|
||||
# Traceback (most recent call last):
|
||||
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
|
||||
# result = self.fn(*self.args, **self.kwargs)
|
||||
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
|
||||
# raise RuntimeError(f"Exception two text")
|
||||
# RuntimeError: Exception two text
|
||||
#
|
||||
# ==== 3 ====
|
||||
# Traceback (most recent call last):
|
||||
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
|
||||
# result = self.fn(*self.args, **self.kwargs)
|
||||
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
|
||||
# raise RuntimeError(f"Exception three text")
|
||||
# RuntimeError: Exception three text
|
||||
short_summary = "\n".join([f"{i}) {str(e)}" for i, e in enumerate(exceptions, 1)])
|
||||
stack_traces = "\n".join(
|
||||
[f"==== {i} ====\n{''.join(traceback.TracebackException.from_exception(e).format())}" for i, e in enumerate(exceptions, 1)]
|
||||
)
|
||||
message = f"{short_summary}\nTRACES:\n{stack_traces}"
|
||||
raise RuntimeError(f"The following exceptions occured during parallel run:\n{message}")
|
||||
return futures
|
||||
|
||||
|
|
|
@ -1,13 +1,16 @@
|
|||
import inspect
|
||||
import logging
|
||||
import os
|
||||
from functools import wraps
|
||||
from time import sleep, time
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
from _pytest.outcomes import Failed
|
||||
from pytest import fail
|
||||
|
||||
from frostfs_testlib import reporter
|
||||
from frostfs_testlib.resources.common import ASSETS_DIR
|
||||
from frostfs_testlib.utils.func_utils import format_by_args
|
||||
|
||||
logger = logging.getLogger("NeoLogger")
|
||||
|
@ -128,6 +131,42 @@ def run_optionally(enabled: bool, mock_value: Any = True):
|
|||
return deco
|
||||
|
||||
|
||||
def cached_fixture(enabled: bool):
|
||||
"""
|
||||
Decorator to cache fixtures.
|
||||
MUST be placed after @pytest.fixture and before @allure decorators.
|
||||
|
||||
Args:
|
||||
enabled: if true, decorated func will be cached.
|
||||
"""
|
||||
|
||||
def deco(func):
|
||||
@wraps(func)
|
||||
def func_impl(*a, **kw):
|
||||
# TODO: *a and *kw should be parsed to some kind of hashsum and used in filename to prevent cache load from different parameters
|
||||
cache_file = os.path.join(ASSETS_DIR, f"fixture_cache_{func.__name__}.yml")
|
||||
|
||||
if enabled and os.path.exists(cache_file):
|
||||
with open(cache_file, "r") as cache_input:
|
||||
return yaml.load(cache_input, Loader=yaml.Loader)
|
||||
|
||||
result = func(*a, **kw)
|
||||
|
||||
if enabled:
|
||||
with open(cache_file, "w") as cache_output:
|
||||
yaml.dump(result, cache_output)
|
||||
return result
|
||||
|
||||
# TODO: cache yielding fixtures
|
||||
@wraps(func)
|
||||
def gen_impl(*a, **kw):
|
||||
raise NotImplementedError("Not implemented for yielding fixtures")
|
||||
|
||||
return gen_impl if inspect.isgeneratorfunction(func) else func_impl
|
||||
|
||||
return deco
|
||||
|
||||
|
||||
def wait_for_success(
|
||||
max_wait_time: int = 60,
|
||||
interval: int = 1,
|
||||
|
|
|
@ -9,13 +9,12 @@ import csv
|
|||
import json
|
||||
import logging
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from contextlib import suppress
|
||||
from datetime import datetime
|
||||
from io import StringIO
|
||||
from textwrap import shorten
|
||||
from typing import Dict, List, Optional, TypedDict, Union
|
||||
from typing import Any, Optional, Union
|
||||
|
||||
import pexpect
|
||||
|
||||
|
@ -75,22 +74,75 @@ def _attach_allure_log(cmd: str, output: str, return_code: int, start_time: date
|
|||
reporter.attach(command_attachment, "Command execution")
|
||||
|
||||
|
||||
def log_command_execution(url: str, cmd: str, output: Union[str, dict], params: Optional[dict] = None) -> None:
|
||||
def log_command_execution(cmd: str, output: Union[str, dict], params: Optional[dict] = None, **kwargs) -> None:
|
||||
logger.info(f"{cmd}: {output}")
|
||||
|
||||
with suppress(Exception):
|
||||
json_output = json.dumps(output, indent=4, sort_keys=True)
|
||||
output = json_output
|
||||
if not params:
|
||||
params = {}
|
||||
|
||||
output_params = params
|
||||
|
||||
try:
|
||||
json_params = json.dumps(params, indent=4, sort_keys=True)
|
||||
json_params = json.dumps(params, indent=4, sort_keys=True, default=str)
|
||||
except TypeError as err:
|
||||
logger.warning(f"Failed to serialize '{cmd}' request parameters:\n{params}\nException: {err}")
|
||||
else:
|
||||
params = json_params
|
||||
output_params = json_params
|
||||
|
||||
command_attachment = f"COMMAND: '{cmd}'\n" f"URL: {url}\n" f"PARAMS:\n{params}\n" f"OUTPUT:\n{output}\n"
|
||||
reporter.attach(command_attachment, "Command execution")
|
||||
output = json.dumps(output, indent=4, sort_keys=True, default=str)
|
||||
|
||||
command_execution = f"COMMAND: '{cmd}'\n" f"URL: {kwargs['endpoint']}\n" f"PARAMS:\n{output_params}\n" f"OUTPUT:\n{output}\n"
|
||||
aws_command = _convert_request_to_aws_cli_command(cmd, params, **kwargs)
|
||||
|
||||
reporter.attach(command_execution, "Command execution")
|
||||
reporter.attach(aws_command, "AWS CLI Command")
|
||||
|
||||
|
||||
def _convert_request_to_aws_cli_command(command: str, params: dict, **kwargs) -> str:
|
||||
overriden_names = [_convert_json_name_to_aws_cli(name) for name in kwargs.keys()]
|
||||
command = command.replace("_", "-")
|
||||
options = []
|
||||
|
||||
for name, value in params.items():
|
||||
name = _convert_json_name_to_aws_cli(name)
|
||||
|
||||
# To override parameters for AWS CLI
|
||||
if name in overriden_names:
|
||||
continue
|
||||
|
||||
if option := _create_option(name, value):
|
||||
options.append(option)
|
||||
|
||||
for name, value in kwargs.items():
|
||||
name = _convert_json_name_to_aws_cli(name)
|
||||
if option := _create_option(name, value):
|
||||
options.append(option)
|
||||
|
||||
options = " ".join(options)
|
||||
api = "s3api" if "s3" in kwargs["endpoint"] else "iam"
|
||||
return f"aws --no-verify-ssl --no-paginate {api} {command} {options}"
|
||||
|
||||
|
||||
def _convert_json_name_to_aws_cli(name: str) -> str:
|
||||
specific_names = {"CORSConfiguration": "cors-configuration"}
|
||||
|
||||
if aws_cli_name := specific_names.get(name):
|
||||
return aws_cli_name
|
||||
return re.sub(r"([a-z])([A-Z])", r"\1 \2", name).lower().replace(" ", "-").replace("_", "-")
|
||||
|
||||
|
||||
def _create_option(name: str, value: Any) -> str | None:
|
||||
if isinstance(value, bool) and value:
|
||||
return f"--{name}"
|
||||
|
||||
if isinstance(value, dict):
|
||||
value = json.dumps(value, indent=4, sort_keys=True, default=str)
|
||||
return f"--{name} '{value}'"
|
||||
|
||||
if value:
|
||||
return f"--{name} {value}"
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def parse_netmap_output(output: str) -> list[NodeNetmapInfo]:
|
||||
|
|
Loading…
Reference in a new issue