Compare commits

..

10 commits

Author SHA1 Message Date
451de5e07e [#320] Added shards detach function
Signed-off-by: Dmitry Anurin <danurin@yadro.com>
2024-11-14 16:22:06 +03:00
f24bfc06fd [#319] Add cached fixture feature
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-11-13 17:46:03 +03:00
47bc11835b [#318] Add tombstone expiration test
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-11-13 10:11:03 +03:00
2a90ec74ff [#317] update morph rule chain 2024-11-12 16:01:12 +03:00
95b32a036a [#316] Extend parallel exception message output
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-11-12 12:28:10 +03:00
55d8ee5da0 [#315] Add http client
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-11-08 15:51:32 +03:00
ea40940514 [#313] update force_new_epoch 2024-11-05 12:37:56 +03:00
6f1baf3cf6 [#312] update morph remove_nodes 2024-11-01 15:50:17 +03:00
26139767f4 [#311] Add AWS CLI command to report from Boto3 request
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-10-31 12:14:51 +00:00
3d6a356e20 [#306] Fix handling of bucket names in AWS CLI
- Add quotes around container names if they contain spaces or `-`.

Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-10-31 12:14:36 +00:00
17 changed files with 1090 additions and 429 deletions

View file

@ -122,7 +122,9 @@ class FrostfsAdmMorph(CliCommand):
**{param: param_value for param, param_value in locals().items() if param not in ["self"]}, **{param: param_value for param, param_value in locals().items() if param not in ["self"]},
) )
def force_new_epoch(self, rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None) -> CommandResult: def force_new_epoch(
self, rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None, delta: Optional[int] = None
) -> CommandResult:
"""Create new FrostFS epoch event in the side chain. """Create new FrostFS epoch event in the side chain.
Args: Args:
@ -341,7 +343,6 @@ class FrostfsAdmMorph(CliCommand):
return self._execute( return self._execute(
f"morph remove-nodes {' '.join(node_netmap_keys)}", f"morph remove-nodes {' '.join(node_netmap_keys)}",
**{param: param_value for param, param_value in locals().items() if param not in ["self", "node_netmap_keys"]}, **{param: param_value for param, param_value in locals().items() if param not in ["self", "node_netmap_keys"]},
**{param: param_value for param, param_value in locals().items() if param not in ["self", "node_netmap_keys"]},
) )
def add_rule( def add_rule(
@ -352,6 +353,7 @@ class FrostfsAdmMorph(CliCommand):
rule: Optional[list[str]] = None, rule: Optional[list[str]] = None,
path: Optional[str] = None, path: Optional[str] = None,
chain_id_hex: Optional[bool] = None, chain_id_hex: Optional[bool] = None,
chain_name: Optional[str] = None,
wallet: Optional[str] = None, wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
timeout: Optional[str] = None, timeout: Optional[str] = None,
@ -382,6 +384,7 @@ class FrostfsAdmMorph(CliCommand):
target_name: str, target_name: str,
target_type: str, target_type: str,
chain_id_hex: Optional[bool] = None, chain_id_hex: Optional[bool] = None,
chain_name: Optional[str] = None,
wallet: Optional[str] = None, wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
timeout: Optional[str] = None, timeout: Optional[str] = None,
@ -409,6 +412,7 @@ class FrostfsAdmMorph(CliCommand):
target_type: str, target_type: str,
target_name: Optional[str] = None, target_name: Optional[str] = None,
rpc_endpoint: Optional[str] = None, rpc_endpoint: Optional[str] = None,
chain_name: Optional[str] = None,
wallet: Optional[str] = None, wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
timeout: Optional[str] = None, timeout: Optional[str] = None,
@ -435,6 +439,7 @@ class FrostfsAdmMorph(CliCommand):
target_name: str, target_name: str,
target_type: str, target_type: str,
all: Optional[bool] = None, all: Optional[bool] = None,
chain_name: Optional[str] = None,
chain_id_hex: Optional[bool] = None, chain_id_hex: Optional[bool] = None,
wallet: Optional[str] = None, wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,

View file

@ -241,3 +241,21 @@ class FrostfsCliShards(CliCommand):
"control shards evacuation status", "control shards evacuation status",
**{param: value for param, value in locals().items() if param not in ["self"]}, **{param: value for param, value in locals().items() if param not in ["self"]},
) )
def detach(self, endpoint: str, address: Optional[str] = None, id: Optional[str] = None, timeout: Optional[str] = None):
"""
Detach and close the shards
Args:
address: Address of wallet account
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
id: List of shard IDs in base58 encoding
timeout: Timeout for an operation (default 15s)
Returns:
Command's result.
"""
return self._execute(
"control shards detach",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -164,6 +164,9 @@ class DockerHost(Host):
return volume_path return volume_path
def send_signal_to_service(self, service_name: str, signal: str) -> None:
raise NotImplementedError("Not implemented for docker")
def delete_metabase(self, service_name: str) -> None: def delete_metabase(self, service_name: str) -> None:
raise NotImplementedError("Not implemented for docker") raise NotImplementedError("Not implemented for docker")

View file

@ -117,6 +117,17 @@ class Host(ABC):
service_name: Name of the service to stop. service_name: Name of the service to stop.
""" """
@abstractmethod
def send_signal_to_service(self, service_name: str, signal: str) -> None:
"""Send signal to service with specified name using kill -<signal>
The service must be hosted on this host.
Args:
service_name: Name of the service to stop.
signal: signal name. See kill -l to all names
"""
@abstractmethod @abstractmethod
def mask_service(self, service_name: str) -> None: def mask_service(self, service_name: str) -> None:
"""Prevent the service from start by any activity by masking it. """Prevent the service from start by any activity by masking it.

View file

View file

@ -0,0 +1,95 @@
import json
import logging
import logging.config
import httpx
from frostfs_testlib import reporter
timeout = httpx.Timeout(60, read=150)
LOGGING_CONFIG = {
"disable_existing_loggers": False,
"version": 1,
"handlers": {"default": {"class": "logging.StreamHandler", "formatter": "http", "stream": "ext://sys.stderr"}},
"formatters": {
"http": {
"format": "%(levelname)s [%(asctime)s] %(name)s - %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S",
}
},
"loggers": {
"httpx": {
"handlers": ["default"],
"level": "DEBUG",
},
"httpcore": {
"handlers": ["default"],
"level": "ERROR",
},
},
}
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("NeoLogger")
class HttpClient:
@reporter.step("Send {method} request to {url}")
def send(self, method: str, url: str, expected_status_code: int = None, **kwargs: dict) -> httpx.Response:
transport = httpx.HTTPTransport(verify=False, retries=5)
client = httpx.Client(timeout=timeout, transport=transport)
response = client.request(method, url, **kwargs)
self._attach_response(response)
logger.info(f"Response: {response.status_code} => {response.text}")
if expected_status_code:
assert response.status_code == expected_status_code, (
f"Got {response.status_code} response code" f" while {expected_status_code} expected"
)
return response
def _attach_response(self, response: httpx.Response):
request = response.request
try:
request_headers = json.dumps(dict(request.headers), indent=4)
except json.JSONDecodeError:
request_headers = str(request.headers)
try:
request_body = request.read()
try:
request_body = request_body.decode("utf-8")
except UnicodeDecodeError as e:
request_body = f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}"
except Exception as e:
request_body = f"Error reading request body: {str(e)}"
request_body = "" if request_body is None else request_body
try:
response_headers = json.dumps(dict(response.headers), indent=4)
except json.JSONDecodeError:
response_headers = str(response.headers)
report = (
f"Method: {request.method}\n\n"
f"URL: {request.url}\n\n"
f"Request Headers: {request_headers}\n\n"
f"Request Body: {request_body}\n\n"
f"Response Status Code: {response.status_code}\n\n"
f"Response Headers: {response_headers}\n\n"
f"Response Body: {response.text}\n\n"
)
curl_request = self._create_curl_request(request.url, request.method, request.headers, request_body)
reporter.attach(report, "Requests Info")
reporter.attach(curl_request, "CURL")
def _create_curl_request(self, url: str, method: str, headers: httpx.Headers, data: str) -> str:
headers = " ".join(f'-H "{name.title()}: {value}"' for name, value in headers.items())
data = f" -d '{data}'" if data else ""
# Option -k means no verify SSL
return f"curl {url} -X {method} {headers}{data} -k"

View file

@ -53,3 +53,4 @@ HOSTING_CONFIG_FILE = os.getenv(
) )
MORE_LOG = os.getenv("MORE_LOG", "1") MORE_LOG = os.getenv("MORE_LOG", "1")
EXPIRATION_EPOCH_ATTRIBUTE = "__SYSTEM__EXPIRATION_EPOCH"

View file

@ -16,11 +16,10 @@ OPTIONAL_NODE_UNDER_LOAD = os.getenv("OPTIONAL_NODE_UNDER_LOAD")
OPTIONAL_FAILOVER_ENABLED = str_to_bool(os.getenv("OPTIONAL_FAILOVER_ENABLED", "true")) OPTIONAL_FAILOVER_ENABLED = str_to_bool(os.getenv("OPTIONAL_FAILOVER_ENABLED", "true"))
# Set this to True to disable background load. I.E. node which supposed to be stopped will not be actually stopped. # Set this to True to disable background load. I.E. node which supposed to be stopped will not be actually stopped.
OPTIONAL_BACKGROUND_LOAD_ENABLED = str_to_bool( OPTIONAL_BACKGROUND_LOAD_ENABLED = str_to_bool(os.getenv("OPTIONAL_BACKGROUND_LOAD_ENABLED", "true"))
os.getenv("OPTIONAL_BACKGROUND_LOAD_ENABLED", "true")
)
# Set this to False for disable autouse fixture like node healthcheck during developing time. # Set this to False for disable autouse fixture like node healthcheck during developing time.
OPTIONAL_AUTOUSE_FIXTURES_ENABLED = str_to_bool( OPTIONAL_AUTOUSE_FIXTURES_ENABLED = str_to_bool(os.getenv("OPTIONAL_AUTOUSE_FIXTURES_ENABLED", "true"))
os.getenv("OPTIONAL_AUTOUSE_FIXTURES_ENABLED", "true")
) # Use cache for fixtures with @cachec_fixture decorator
OPTIONAL_CACHE_FIXTURES = str_to_bool(os.getenv("OPTIONAL_CACHE_FIXTURES", "false"))

View file

@ -70,6 +70,9 @@ class AwsCliClient(S3ClientWrapper):
if bucket is None: if bucket is None:
bucket = string_utils.unique_name("bucket-") bucket = string_utils.unique_name("bucket-")
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
if object_lock_enabled_for_bucket is None: if object_lock_enabled_for_bucket is None:
object_lock = "" object_lock = ""
elif object_lock_enabled_for_bucket: elif object_lock_enabled_for_bucket:
@ -103,16 +106,25 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete bucket S3") @reporter.step("Delete bucket S3")
def delete_bucket(self, bucket: str) -> None: def delete_bucket(self, bucket: str) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = f"aws {self.common_flags} s3api delete-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}" cmd = f"aws {self.common_flags} s3api delete-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
self.local_shell.exec(cmd, command_options) self.local_shell.exec(cmd, command_options)
@reporter.step("Head bucket S3") @reporter.step("Head bucket S3")
def head_bucket(self, bucket: str) -> None: def head_bucket(self, bucket: str) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = f"aws {self.common_flags} s3api head-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}" cmd = f"aws {self.common_flags} s3api head-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
self.local_shell.exec(cmd) self.local_shell.exec(cmd)
@reporter.step("Put bucket versioning status") @reporter.step("Put bucket versioning status")
def put_bucket_versioning(self, bucket: str, status: VersioningStatus) -> None: def put_bucket_versioning(self, bucket: str, status: VersioningStatus) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api put-bucket-versioning --bucket {bucket} " f"aws {self.common_flags} s3api put-bucket-versioning --bucket {bucket} "
f"--versioning-configuration Status={status.value} " f"--versioning-configuration Status={status.value} "
@ -122,6 +134,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get bucket versioning status") @reporter.step("Get bucket versioning status")
def get_bucket_versioning_status(self, bucket: str) -> Literal["Enabled", "Suspended"]: def get_bucket_versioning_status(self, bucket: str) -> Literal["Enabled", "Suspended"]:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api get-bucket-versioning --bucket {bucket} " f"aws {self.common_flags} s3api get-bucket-versioning --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -132,6 +147,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Put bucket tagging") @reporter.step("Put bucket tagging")
def put_bucket_tagging(self, bucket: str, tags: list) -> None: def put_bucket_tagging(self, bucket: str, tags: list) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
tags_json = {"TagSet": [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]} tags_json = {"TagSet": [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]}
cmd = ( cmd = (
f"aws {self.common_flags} s3api put-bucket-tagging --bucket {bucket} " f"aws {self.common_flags} s3api put-bucket-tagging --bucket {bucket} "
@ -141,6 +159,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get bucket tagging") @reporter.step("Get bucket tagging")
def get_bucket_tagging(self, bucket: str) -> list: def get_bucket_tagging(self, bucket: str) -> list:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api get-bucket-tagging --bucket {bucket} " f"aws {self.common_flags} s3api get-bucket-tagging --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -151,6 +172,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get bucket acl") @reporter.step("Get bucket acl")
def get_bucket_acl(self, bucket: str) -> list: def get_bucket_acl(self, bucket: str) -> list:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
) )
@ -160,6 +184,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get bucket location") @reporter.step("Get bucket location")
def get_bucket_location(self, bucket: str) -> dict: def get_bucket_location(self, bucket: str) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api get-bucket-location --bucket {bucket} " f"aws {self.common_flags} s3api get-bucket-location --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -170,6 +197,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("List objects S3") @reporter.step("List objects S3")
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]: def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
@ -181,6 +211,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("List objects S3 v2") @reporter.step("List objects S3 v2")
def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]: def list_objects_v2(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api list-objects-v2 --bucket {bucket} " f"aws {self.common_flags} s3api list-objects-v2 --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -195,6 +228,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("List objects versions S3") @reporter.step("List objects versions S3")
def list_objects_versions(self, bucket: str, full_output: bool = False) -> dict: def list_objects_versions(self, bucket: str, full_output: bool = False) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api list-object-versions --bucket {bucket} " f"aws {self.common_flags} s3api list-object-versions --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -205,6 +241,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("List objects delete markers S3") @reporter.step("List objects delete markers S3")
def list_delete_markers(self, bucket: str, full_output: bool = False) -> list: def list_delete_markers(self, bucket: str, full_output: bool = False) -> list:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api list-object-versions --bucket {bucket} " f"aws {self.common_flags} s3api list-object-versions --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -228,8 +267,13 @@ class AwsCliClient(S3ClientWrapper):
) -> str: ) -> str:
if bucket is None: if bucket is None:
bucket = source_bucket bucket = source_bucket
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
if key is None: if key is None:
key = string_utils.unique_name("copy-object-") key = string_utils.unique_name("copy-object-")
copy_source = f"{source_bucket}/{source_key}" copy_source = f"{source_bucket}/{source_key}"
cmd = ( cmd = (
@ -266,6 +310,9 @@ class AwsCliClient(S3ClientWrapper):
grant_full_control: Optional[str] = None, grant_full_control: Optional[str] = None,
grant_read: Optional[str] = None, grant_read: Optional[str] = None,
) -> str: ) -> str:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
if key is None: if key is None:
key = os.path.basename(filepath) key = os.path.basename(filepath)
@ -297,6 +344,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Head object S3") @reporter.step("Head object S3")
def head_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict: def head_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
version = f" --version-id {version_id}" if version_id else "" version = f" --version-id {version_id}" if version_id else ""
cmd = ( cmd = (
f"aws {self.common_flags} s3api head-object --bucket {bucket} --key {key} " f"aws {self.common_flags} s3api head-object --bucket {bucket} --key {key} "
@ -315,6 +365,9 @@ class AwsCliClient(S3ClientWrapper):
object_range: Optional[tuple[int, int]] = None, object_range: Optional[tuple[int, int]] = None,
full_output: bool = False, full_output: bool = False,
) -> dict | TestFile: ) -> dict | TestFile:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, string_utils.unique_name("dl-object-"))) test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, string_utils.unique_name("dl-object-")))
version = f" --version-id {version_id}" if version_id else "" version = f" --version-id {version_id}" if version_id else ""
cmd = ( cmd = (
@ -329,6 +382,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get object ACL") @reporter.step("Get object ACL")
def get_object_acl(self, bucket: str, key: str, version_id: Optional[str] = None) -> list: def get_object_acl(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
version = f" --version-id {version_id}" if version_id else "" version = f" --version-id {version_id}" if version_id else ""
cmd = ( cmd = (
f"aws {self.common_flags} s3api get-object-acl --bucket {bucket} --key {key} " f"aws {self.common_flags} s3api get-object-acl --bucket {bucket} --key {key} "
@ -347,6 +403,9 @@ class AwsCliClient(S3ClientWrapper):
grant_write: Optional[str] = None, grant_write: Optional[str] = None,
grant_read: Optional[str] = None, grant_read: Optional[str] = None,
) -> list: ) -> list:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api put-object-acl --bucket {bucket} --key {key} " f"aws {self.common_flags} s3api put-object-acl --bucket {bucket} --key {key} "
f" --endpoint {self.s3gate_endpoint} --profile {self.profile}" f" --endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -369,6 +428,9 @@ class AwsCliClient(S3ClientWrapper):
grant_write: Optional[str] = None, grant_write: Optional[str] = None,
grant_read: Optional[str] = None, grant_read: Optional[str] = None,
) -> None: ) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api put-bucket-acl --bucket {bucket} " f"aws {self.common_flags} s3api put-bucket-acl --bucket {bucket} "
f" --endpoint {self.s3gate_endpoint} --profile {self.profile}" f" --endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -383,6 +445,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete objects S3") @reporter.step("Delete objects S3")
def delete_objects(self, bucket: str, keys: list[str]) -> dict: def delete_objects(self, bucket: str, keys: list[str]) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
file_path = os.path.join(os.getcwd(), ASSETS_DIR, "delete.json") file_path = os.path.join(os.getcwd(), ASSETS_DIR, "delete.json")
delete_structure = json.dumps(_make_objs_dict(keys)) delete_structure = json.dumps(_make_objs_dict(keys))
with open(file_path, "w") as out_file: with open(file_path, "w") as out_file:
@ -399,6 +464,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete object S3") @reporter.step("Delete object S3")
def delete_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict: def delete_object(self, bucket: str, key: str, version_id: Optional[str] = None) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
version = f" --version-id {version_id}" if version_id else "" version = f" --version-id {version_id}" if version_id else ""
cmd = ( cmd = (
f"aws {self.common_flags} s3api delete-object --bucket {bucket} " f"aws {self.common_flags} s3api delete-object --bucket {bucket} "
@ -409,6 +477,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete object versions S3") @reporter.step("Delete object versions S3")
def delete_object_versions(self, bucket: str, object_versions: list) -> dict: def delete_object_versions(self, bucket: str, object_versions: list) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
# Build deletion list in S3 format # Build deletion list in S3 format
delete_list = { delete_list = {
"Objects": [ "Objects": [
@ -435,6 +506,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete object versions S3 without delete markers") @reporter.step("Delete object versions S3 without delete markers")
def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None: def delete_object_versions_without_dm(self, bucket: str, object_versions: list) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
# Delete objects without creating delete markers # Delete objects without creating delete markers
for object_version in object_versions: for object_version in object_versions:
self.delete_object(bucket=bucket, key=object_version["Key"], version_id=object_version["VersionId"]) self.delete_object(bucket=bucket, key=object_version["Key"], version_id=object_version["VersionId"])
@ -450,6 +524,8 @@ class AwsCliClient(S3ClientWrapper):
part_number: int = 0, part_number: int = 0,
full_output: bool = True, full_output: bool = True,
) -> dict: ) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
attrs = ",".join(attributes) attrs = ",".join(attributes)
version = f" --version-id {version_id}" if version_id else "" version = f" --version-id {version_id}" if version_id else ""
@ -473,6 +549,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get bucket policy") @reporter.step("Get bucket policy")
def get_bucket_policy(self, bucket: str) -> dict: def get_bucket_policy(self, bucket: str) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api get-bucket-policy --bucket {bucket} " f"aws {self.common_flags} s3api get-bucket-policy --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -483,6 +562,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete bucket policy") @reporter.step("Delete bucket policy")
def delete_bucket_policy(self, bucket: str) -> dict: def delete_bucket_policy(self, bucket: str) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api delete-bucket-policy --bucket {bucket} " f"aws {self.common_flags} s3api delete-bucket-policy --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -493,6 +575,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Put bucket policy") @reporter.step("Put bucket policy")
def put_bucket_policy(self, bucket: str, policy: dict) -> None: def put_bucket_policy(self, bucket: str, policy: dict) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
# Leaving it as is was in test repo. Double dumps to escape resulting string # Leaving it as is was in test repo. Double dumps to escape resulting string
# Example: # Example:
# policy = {"a": 1} # policy = {"a": 1}
@ -508,6 +593,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get bucket cors") @reporter.step("Get bucket cors")
def get_bucket_cors(self, bucket: str) -> dict: def get_bucket_cors(self, bucket: str) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api get-bucket-cors --bucket {bucket} " f"aws {self.common_flags} s3api get-bucket-cors --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -518,6 +606,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Put bucket cors") @reporter.step("Put bucket cors")
def put_bucket_cors(self, bucket: str, cors_configuration: dict) -> None: def put_bucket_cors(self, bucket: str, cors_configuration: dict) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api put-bucket-cors --bucket {bucket} " f"aws {self.common_flags} s3api put-bucket-cors --bucket {bucket} "
f"--cors-configuration '{json.dumps(cors_configuration)}' --endpoint {self.s3gate_endpoint} --profile {self.profile}" f"--cors-configuration '{json.dumps(cors_configuration)}' --endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -526,6 +617,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete bucket cors") @reporter.step("Delete bucket cors")
def delete_bucket_cors(self, bucket: str) -> None: def delete_bucket_cors(self, bucket: str) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api delete-bucket-cors --bucket {bucket} " f"aws {self.common_flags} s3api delete-bucket-cors --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -534,6 +628,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete bucket tagging") @reporter.step("Delete bucket tagging")
def delete_bucket_tagging(self, bucket: str) -> None: def delete_bucket_tagging(self, bucket: str) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api delete-bucket-tagging --bucket {bucket} " f"aws {self.common_flags} s3api delete-bucket-tagging --bucket {bucket} "
f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
@ -549,6 +646,9 @@ class AwsCliClient(S3ClientWrapper):
version_id: Optional[str] = None, version_id: Optional[str] = None,
bypass_governance_retention: Optional[bool] = None, bypass_governance_retention: Optional[bool] = None,
) -> None: ) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
version = f" --version-id {version_id}" if version_id else "" version = f" --version-id {version_id}" if version_id else ""
cmd = ( cmd = (
f"aws {self.common_flags} s3api put-object-retention --bucket {bucket} --key {key} " f"aws {self.common_flags} s3api put-object-retention --bucket {bucket} --key {key} "
@ -566,6 +666,9 @@ class AwsCliClient(S3ClientWrapper):
legal_hold_status: Literal["ON", "OFF"], legal_hold_status: Literal["ON", "OFF"],
version_id: Optional[str] = None, version_id: Optional[str] = None,
) -> None: ) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
version = f" --version-id {version_id}" if version_id else "" version = f" --version-id {version_id}" if version_id else ""
legal_hold = json.dumps({"Status": legal_hold_status}) legal_hold = json.dumps({"Status": legal_hold_status})
cmd = ( cmd = (
@ -576,6 +679,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Put object tagging") @reporter.step("Put object tagging")
def put_object_tagging(self, bucket: str, key: str, tags: list, version_id: Optional[str] = "") -> None: def put_object_tagging(self, bucket: str, key: str, tags: list, version_id: Optional[str] = "") -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
tags = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags] tags = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
tagging = {"TagSet": tags} tagging = {"TagSet": tags}
version = f" --version-id {version_id}" if version_id else "" version = f" --version-id {version_id}" if version_id else ""
@ -587,6 +693,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get object tagging") @reporter.step("Get object tagging")
def get_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> list: def get_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> list:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
version = f" --version-id {version_id}" if version_id else "" version = f" --version-id {version_id}" if version_id else ""
cmd = ( cmd = (
f"aws {self.common_flags} s3api get-object-tagging --bucket {bucket} --key {key} " f"aws {self.common_flags} s3api get-object-tagging --bucket {bucket} --key {key} "
@ -598,6 +707,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete object tagging") @reporter.step("Delete object tagging")
def delete_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> None: def delete_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
version = f" --version-id {version_id}" if version_id else "" version = f" --version-id {version_id}" if version_id else ""
cmd = ( cmd = (
f"aws {self.common_flags} s3api delete-object-tagging --bucket {bucket} " f"aws {self.common_flags} s3api delete-object-tagging --bucket {bucket} "
@ -613,6 +725,9 @@ class AwsCliClient(S3ClientWrapper):
acl: Optional[str] = None, acl: Optional[str] = None,
metadata: Optional[dict] = None, metadata: Optional[dict] = None,
) -> dict: ) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3 sync {dir_path} s3://{bucket} " f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}" f"aws {self.common_flags} s3 sync {dir_path} s3://{bucket} " f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
) )
@ -633,6 +748,9 @@ class AwsCliClient(S3ClientWrapper):
acl: Optional[str] = None, acl: Optional[str] = None,
metadata: Optional[dict] = None, metadata: Optional[dict] = None,
) -> dict: ) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3 cp {dir_path} s3://{bucket} " f"aws {self.common_flags} s3 cp {dir_path} s3://{bucket} "
f"--endpoint-url {self.s3gate_endpoint} --recursive --profile {self.profile}" f"--endpoint-url {self.s3gate_endpoint} --recursive --profile {self.profile}"
@ -648,6 +766,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Create multipart upload S3") @reporter.step("Create multipart upload S3")
def create_multipart_upload(self, bucket: str, key: str) -> str: def create_multipart_upload(self, bucket: str, key: str) -> str:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api create-multipart-upload --bucket {bucket} " f"aws {self.common_flags} s3api create-multipart-upload --bucket {bucket} "
f"--key {key} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}" f"--key {key} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
@ -661,6 +782,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("List multipart uploads S3") @reporter.step("List multipart uploads S3")
def list_multipart_uploads(self, bucket: str) -> Optional[list[dict]]: def list_multipart_uploads(self, bucket: str) -> Optional[list[dict]]:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api list-multipart-uploads --bucket {bucket} " f"aws {self.common_flags} s3api list-multipart-uploads --bucket {bucket} "
f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}" f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
@ -671,6 +795,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Abort multipart upload S3") @reporter.step("Abort multipart upload S3")
def abort_multipart_upload(self, bucket: str, key: str, upload_id: str) -> None: def abort_multipart_upload(self, bucket: str, key: str, upload_id: str) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api abort-multipart-upload --bucket {bucket} " f"aws {self.common_flags} s3api abort-multipart-upload --bucket {bucket} "
f"--key {key} --upload-id {upload_id} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}" f"--key {key} --upload-id {upload_id} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
@ -679,6 +806,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Upload part S3") @reporter.step("Upload part S3")
def upload_part(self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str) -> str: def upload_part(self, bucket: str, key: str, upload_id: str, part_num: int, filepath: str) -> str:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api upload-part --bucket {bucket} --key {key} " f"aws {self.common_flags} s3api upload-part --bucket {bucket} --key {key} "
f"--upload-id {upload_id} --part-number {part_num} --body {filepath} " f"--upload-id {upload_id} --part-number {part_num} --body {filepath} "
@ -691,6 +821,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Upload copy part S3") @reporter.step("Upload copy part S3")
def upload_part_copy(self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str) -> str: def upload_part_copy(self, bucket: str, key: str, upload_id: str, part_num: int, copy_source: str) -> str:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api upload-part-copy --bucket {bucket} --key {key} " f"aws {self.common_flags} s3api upload-part-copy --bucket {bucket} --key {key} "
f"--upload-id {upload_id} --part-number {part_num} --copy-source {copy_source} " f"--upload-id {upload_id} --part-number {part_num} --copy-source {copy_source} "
@ -704,6 +837,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("List parts S3") @reporter.step("List parts S3")
def list_parts(self, bucket: str, key: str, upload_id: str) -> list[dict]: def list_parts(self, bucket: str, key: str, upload_id: str) -> list[dict]:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api list-parts --bucket {bucket} --key {key} " f"aws {self.common_flags} s3api list-parts --bucket {bucket} --key {key} "
f"--upload-id {upload_id} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}" f"--upload-id {upload_id} --endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
@ -717,6 +853,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Complete multipart upload S3") @reporter.step("Complete multipart upload S3")
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None: def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
file_path = os.path.join(os.getcwd(), ASSETS_DIR, "parts.json") file_path = os.path.join(os.getcwd(), ASSETS_DIR, "parts.json")
parts_dict = {"Parts": [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts]} parts_dict = {"Parts": [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts]}
@ -737,6 +876,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Put object lock configuration") @reporter.step("Put object lock configuration")
def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict: def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api put-object-lock-configuration --bucket {bucket} " f"aws {self.common_flags} s3api put-object-lock-configuration --bucket {bucket} "
f"--object-lock-configuration '{json.dumps(configuration)}' --endpoint-url {self.s3gate_endpoint} --profile {self.profile}" f"--object-lock-configuration '{json.dumps(configuration)}' --endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
@ -746,6 +888,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get object lock configuration") @reporter.step("Get object lock configuration")
def get_object_lock_configuration(self, bucket: str): def get_object_lock_configuration(self, bucket: str):
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api get-object-lock-configuration --bucket {bucket} " f"aws {self.common_flags} s3api get-object-lock-configuration --bucket {bucket} "
f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}" f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
@ -756,6 +901,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Put bucket lifecycle configuration") @reporter.step("Put bucket lifecycle configuration")
def put_bucket_lifecycle_configuration(self, bucket: str, lifecycle_configuration: dict, dumped_configuration: str) -> dict: def put_bucket_lifecycle_configuration(self, bucket: str, lifecycle_configuration: dict, dumped_configuration: str) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api put-bucket-lifecycle-configuration --bucket {bucket} " f"aws {self.common_flags} s3api put-bucket-lifecycle-configuration --bucket {bucket} "
f"--endpoint-url {self.s3gate_endpoint} --lifecycle-configuration file://{dumped_configuration} --profile {self.profile}" f"--endpoint-url {self.s3gate_endpoint} --lifecycle-configuration file://{dumped_configuration} --profile {self.profile}"
@ -766,6 +914,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Get bucket lifecycle configuration") @reporter.step("Get bucket lifecycle configuration")
def get_bucket_lifecycle_configuration(self, bucket: str) -> dict: def get_bucket_lifecycle_configuration(self, bucket: str) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api get-bucket-lifecycle-configuration --bucket {bucket} " f"aws {self.common_flags} s3api get-bucket-lifecycle-configuration --bucket {bucket} "
f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}" f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
@ -776,6 +927,9 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Delete bucket lifecycle configuration") @reporter.step("Delete bucket lifecycle configuration")
def delete_bucket_lifecycle(self, bucket: str) -> dict: def delete_bucket_lifecycle(self, bucket: str) -> dict:
if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"'
cmd = ( cmd = (
f"aws {self.common_flags} s3api delete-bucket-lifecycle --bucket {bucket} " f"aws {self.common_flags} s3api delete-bucket-lifecycle --bucket {bucket} "
f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}" f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"

File diff suppressed because it is too large Load diff

View file

@ -11,10 +11,10 @@ from frostfs_testlib.storage import get_service_registry
from frostfs_testlib.storage.configuration.interfaces import ServiceConfigurationYml from frostfs_testlib.storage.configuration.interfaces import ServiceConfigurationYml
from frostfs_testlib.storage.constants import ConfigAttributes from frostfs_testlib.storage.constants import ConfigAttributes
from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode
from frostfs_testlib.storage.dataclasses.metrics import Metrics
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
from frostfs_testlib.storage.service_registry import ServiceRegistry from frostfs_testlib.storage.service_registry import ServiceRegistry
from frostfs_testlib.storage.dataclasses.metrics import Metrics
class ClusterNode: class ClusterNode:
@ -91,10 +91,10 @@ class ClusterNode:
config_str = yaml.dump(new_config) config_str = yaml.dump(new_config)
shell.exec(f"echo '{config_str}' | sudo tee {config_file_path}") shell.exec(f"echo '{config_str}' | sudo tee {config_file_path}")
def config(self, service_type: type[ServiceClass]) -> ServiceConfigurationYml: def config(self, service_type: ServiceClass) -> ServiceConfigurationYml:
return self.service(service_type).config return self.service(service_type).config
def service(self, service_type: type[ServiceClass]) -> ServiceClass: def service(self, service_type: ServiceClass) -> ServiceClass:
""" """
Get a service cluster node of specified type. Get a service cluster node of specified type.

View file

@ -172,6 +172,15 @@ class ClusterStateController:
if service_type == StorageNode: if service_type == StorageNode:
self.wait_after_storage_startup() self.wait_after_storage_startup()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Send sighup to all {service_type} services")
def sighup_services_of_type(self, service_type: type[ServiceClass]):
services = self.cluster.services(service_type)
parallel([service.send_signal_to_service for service in services], signal="SIGHUP")
if service_type == StorageNode:
self.wait_after_storage_startup()
@wait_for_success(600, 60) @wait_for_success(600, 60)
def wait_s3gate(self, s3gate: S3Gate): def wait_s3gate(self, s3gate: S3Gate):
with reporter.step(f"Wait for {s3gate} reconnection"): with reporter.step(f"Wait for {s3gate} reconnection"):
@ -206,21 +215,27 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Stop {service_type} service on {node}") @reporter.step("Stop {service_type} service on {node}")
def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass], mask: bool = True): def stop_service_of_type(self, node: ClusterNode, service_type: ServiceClass, mask: bool = True):
service = node.service(service_type) service = node.service(service_type)
service.stop_service(mask) service.stop_service(mask)
self.stopped_services.add(service) self.stopped_services.add(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Send sighup to {service_type} service on {node}")
def sighup_service_of_type(self, node: ClusterNode, service_type: ServiceClass):
service = node.service(service_type)
service.send_signal_to_service("SIGHUP")
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Start {service_type} service on {node}") @reporter.step("Start {service_type} service on {node}")
def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]): def start_service_of_type(self, node: ClusterNode, service_type: ServiceClass):
service = node.service(service_type) service = node.service(service_type)
service.start_service() service.start_service()
self.stopped_services.discard(service) self.stopped_services.discard(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Start all stopped {service_type} services") @reporter.step("Start all stopped {service_type} services")
def start_stopped_services_of_type(self, service_type: type[ServiceClass]): def start_stopped_services_of_type(self, service_type: ServiceClass):
stopped_svc = self._get_stopped_by_type(service_type) stopped_svc = self._get_stopped_by_type(service_type)
if not stopped_svc: if not stopped_svc:
return return

View file

@ -14,14 +14,19 @@ class ConfigStateManager(StateManager):
self.cluster = self.csc.cluster self.cluster = self.csc.cluster
@reporter.step("Change configuration for {service_type} on all nodes") @reporter.step("Change configuration for {service_type} on all nodes")
def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any]): def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any], sighup: bool = False):
services = self.cluster.services(service_type) services = self.cluster.services(service_type)
nodes = self.cluster.nodes(services) nodes = self.cluster.nodes(services)
self.services_with_changed_config.update([(node, service_type) for node in nodes]) self.services_with_changed_config.update([(node, service_type) for node in nodes])
if not sighup:
self.csc.stop_services_of_type(service_type) self.csc.stop_services_of_type(service_type)
parallel([node.config(service_type).set for node in nodes], values=values) parallel([node.config(service_type).set for node in nodes], values=values)
if not sighup:
self.csc.start_services_of_type(service_type) self.csc.start_services_of_type(service_type)
else:
self.csc.sighup_services_of_type(service_type)
@reporter.step("Change configuration for {service_type} on {node}") @reporter.step("Change configuration for {service_type} on {node}")
def set_on_node(self, node: ClusterNode, service_type: type[ServiceClass], values: dict[str, Any]): def set_on_node(self, node: ClusterNode, service_type: type[ServiceClass], values: dict[str, Any]):
@ -32,18 +37,26 @@ class ConfigStateManager(StateManager):
self.csc.start_service_of_type(node, service_type) self.csc.start_service_of_type(node, service_type)
@reporter.step("Revert all configuration changes") @reporter.step("Revert all configuration changes")
def revert_all(self): def revert_all(self, sighup: bool = False):
if not self.services_with_changed_config: if not self.services_with_changed_config:
return return
parallel(self._revert_svc, self.services_with_changed_config) parallel(self._revert_svc, self.services_with_changed_config, sighup)
self.services_with_changed_config.clear() self.services_with_changed_config.clear()
if not sighup:
self.csc.start_all_stopped_services() self.csc.start_all_stopped_services()
# TODO: parallel can't have multiple parallel_items :( # TODO: parallel can't have multiple parallel_items :(
@reporter.step("Revert all configuration {node_and_service}") @reporter.step("Revert all configuration {node_and_service}")
def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass]): def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass], sighup: bool = False):
node, service_type = node_and_service node, service_type = node_and_service
service = node.service(service_type)
if not sighup:
self.csc.stop_service_of_type(node, service_type) self.csc.stop_service_of_type(node, service_type)
node.config(service_type).revert() node.config(service_type).revert()
if sighup:
service.send_signal_to_service("SIGHUP")

View file

@ -65,6 +65,10 @@ class NodeBase(HumanReadableABC):
with reporter.step(f"Start {self.name} service on {self.host.config.address}"): with reporter.step(f"Start {self.name} service on {self.host.config.address}"):
self.host.start_service(self.name) self.host.start_service(self.name)
def send_signal_to_service(self, signal: str):
with reporter.step(f"Send -{signal} signal to {self.name} service on {self.host.config.address}"):
self.host.send_signal_to_service(self.name, signal)
@abstractmethod @abstractmethod
def service_healthcheck(self) -> bool: def service_healthcheck(self) -> bool:
"""Service healthcheck.""" """Service healthcheck."""
@ -185,9 +189,7 @@ class NodeBase(HumanReadableABC):
if attribute_name not in config.attributes: if attribute_name not in config.attributes:
if default_attribute_name is None: if default_attribute_name is None:
raise RuntimeError( raise RuntimeError(f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either")
f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either"
)
return config.attributes[default_attribute_name] return config.attributes[default_attribute_name]
@ -197,9 +199,7 @@ class NodeBase(HumanReadableABC):
return self.host.get_service_config(self.name) return self.host.get_service_config(self.name)
def get_service_uptime(self, service: str) -> datetime: def get_service_uptime(self, service: str) -> datetime:
result = self.host.get_shell().exec( result = self.host.get_shell().exec(f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2")
f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2"
)
start_time = parser.parse(result.stdout.strip()) start_time = parser.parse(result.stdout.strip())
current_time = datetime.now(tz=timezone.utc) current_time = datetime.now(tz=timezone.utc)
active_time = current_time - start_time active_time = current_time - start_time

View file

@ -1,4 +1,5 @@
import itertools import itertools
import traceback
from concurrent.futures import Future, ThreadPoolExecutor from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import contextmanager from contextlib import contextmanager
from typing import Callable, Collection, Optional, Union from typing import Callable, Collection, Optional, Union
@ -55,7 +56,42 @@ def parallel(
# Check for exceptions # Check for exceptions
exceptions = [future.exception() for future in futures if future.exception()] exceptions = [future.exception() for future in futures if future.exception()]
if exceptions: if exceptions:
message = "\n".join([str(e) for e in exceptions]) # Prettify exception in parallel with all underlying stack traces
# For example, we had 3 RuntimeError exceptions during parallel. This format will give us something like
#
# RuntimeError: The following exceptions occured during parallel run:
# 1) Exception one text
# 2) Exception two text
# 3) Exception three text
# TRACES:
# ==== 1 ====
# Traceback (most recent call last):
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
# result = self.fn(*self.args, **self.kwargs)
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
# raise RuntimeError(f"Exception one text")
# RuntimeError: Exception one text
#
# ==== 2 ====
# Traceback (most recent call last):
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
# result = self.fn(*self.args, **self.kwargs)
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
# raise RuntimeError(f"Exception two text")
# RuntimeError: Exception two text
#
# ==== 3 ====
# Traceback (most recent call last):
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
# result = self.fn(*self.args, **self.kwargs)
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
# raise RuntimeError(f"Exception three text")
# RuntimeError: Exception three text
short_summary = "\n".join([f"{i}) {str(e)}" for i, e in enumerate(exceptions, 1)])
stack_traces = "\n".join(
[f"==== {i} ====\n{''.join(traceback.TracebackException.from_exception(e).format())}" for i, e in enumerate(exceptions, 1)]
)
message = f"{short_summary}\nTRACES:\n{stack_traces}"
raise RuntimeError(f"The following exceptions occured during parallel run:\n{message}") raise RuntimeError(f"The following exceptions occured during parallel run:\n{message}")
return futures return futures

View file

@ -1,13 +1,16 @@
import inspect import inspect
import logging import logging
import os
from functools import wraps from functools import wraps
from time import sleep, time from time import sleep, time
from typing import Any from typing import Any
import yaml
from _pytest.outcomes import Failed from _pytest.outcomes import Failed
from pytest import fail from pytest import fail
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.resources.common import ASSETS_DIR
from frostfs_testlib.utils.func_utils import format_by_args from frostfs_testlib.utils.func_utils import format_by_args
logger = logging.getLogger("NeoLogger") logger = logging.getLogger("NeoLogger")
@ -128,6 +131,42 @@ def run_optionally(enabled: bool, mock_value: Any = True):
return deco return deco
def cached_fixture(enabled: bool):
"""
Decorator to cache fixtures.
MUST be placed after @pytest.fixture and before @allure decorators.
Args:
enabled: if true, decorated func will be cached.
"""
def deco(func):
@wraps(func)
def func_impl(*a, **kw):
# TODO: *a and *kw should be parsed to some kind of hashsum and used in filename to prevent cache load from different parameters
cache_file = os.path.join(ASSETS_DIR, f"fixture_cache_{func.__name__}.yml")
if enabled and os.path.exists(cache_file):
with open(cache_file, "r") as cache_input:
return yaml.load(cache_input, Loader=yaml.Loader)
result = func(*a, **kw)
if enabled:
with open(cache_file, "w") as cache_output:
yaml.dump(result, cache_output)
return result
# TODO: cache yielding fixtures
@wraps(func)
def gen_impl(*a, **kw):
raise NotImplementedError("Not implemented for yielding fixtures")
return gen_impl if inspect.isgeneratorfunction(func) else func_impl
return deco
def wait_for_success( def wait_for_success(
max_wait_time: int = 60, max_wait_time: int = 60,
interval: int = 1, interval: int = 1,

View file

@ -9,13 +9,12 @@ import csv
import json import json
import logging import logging
import re import re
import subprocess
import sys import sys
from contextlib import suppress from contextlib import suppress
from datetime import datetime from datetime import datetime
from io import StringIO from io import StringIO
from textwrap import shorten from textwrap import shorten
from typing import Dict, List, Optional, TypedDict, Union from typing import Any, Optional, Union
import pexpect import pexpect
@ -75,22 +74,75 @@ def _attach_allure_log(cmd: str, output: str, return_code: int, start_time: date
reporter.attach(command_attachment, "Command execution") reporter.attach(command_attachment, "Command execution")
def log_command_execution(url: str, cmd: str, output: Union[str, dict], params: Optional[dict] = None) -> None: def log_command_execution(cmd: str, output: Union[str, dict], params: Optional[dict] = None, **kwargs) -> None:
logger.info(f"{cmd}: {output}") logger.info(f"{cmd}: {output}")
with suppress(Exception): if not params:
json_output = json.dumps(output, indent=4, sort_keys=True) params = {}
output = json_output
output_params = params
try: try:
json_params = json.dumps(params, indent=4, sort_keys=True) json_params = json.dumps(params, indent=4, sort_keys=True, default=str)
except TypeError as err: except TypeError as err:
logger.warning(f"Failed to serialize '{cmd}' request parameters:\n{params}\nException: {err}") logger.warning(f"Failed to serialize '{cmd}' request parameters:\n{params}\nException: {err}")
else: else:
params = json_params output_params = json_params
command_attachment = f"COMMAND: '{cmd}'\n" f"URL: {url}\n" f"PARAMS:\n{params}\n" f"OUTPUT:\n{output}\n" output = json.dumps(output, indent=4, sort_keys=True, default=str)
reporter.attach(command_attachment, "Command execution")
command_execution = f"COMMAND: '{cmd}'\n" f"URL: {kwargs['endpoint']}\n" f"PARAMS:\n{output_params}\n" f"OUTPUT:\n{output}\n"
aws_command = _convert_request_to_aws_cli_command(cmd, params, **kwargs)
reporter.attach(command_execution, "Command execution")
reporter.attach(aws_command, "AWS CLI Command")
def _convert_request_to_aws_cli_command(command: str, params: dict, **kwargs) -> str:
overriden_names = [_convert_json_name_to_aws_cli(name) for name in kwargs.keys()]
command = command.replace("_", "-")
options = []
for name, value in params.items():
name = _convert_json_name_to_aws_cli(name)
# To override parameters for AWS CLI
if name in overriden_names:
continue
if option := _create_option(name, value):
options.append(option)
for name, value in kwargs.items():
name = _convert_json_name_to_aws_cli(name)
if option := _create_option(name, value):
options.append(option)
options = " ".join(options)
api = "s3api" if "s3" in kwargs["endpoint"] else "iam"
return f"aws --no-verify-ssl --no-paginate {api} {command} {options}"
def _convert_json_name_to_aws_cli(name: str) -> str:
specific_names = {"CORSConfiguration": "cors-configuration"}
if aws_cli_name := specific_names.get(name):
return aws_cli_name
return re.sub(r"([a-z])([A-Z])", r"\1 \2", name).lower().replace(" ", "-").replace("_", "-")
def _create_option(name: str, value: Any) -> str | None:
if isinstance(value, bool) and value:
return f"--{name}"
if isinstance(value, dict):
value = json.dumps(value, indent=4, sort_keys=True, default=str)
return f"--{name} '{value}'"
if value:
return f"--{name} {value}"
return None
def parse_netmap_output(output: str) -> list[NodeNetmapInfo]: def parse_netmap_output(output: str) -> list[NodeNetmapInfo]: