Compare commits

...

24 commits

Author SHA1 Message Date
dc5a9e7bb9 [#340] Move s3 and http directories to avoid conflict with requests
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-12-18 12:57:24 +03:00
335eed85b1 [#338] Added parameter word_count to method get_logs
Signed-off-by: Ilyas Niyazov <i.niyazov@yadro.com>
2024-12-17 14:25:10 +00:00
cc7bd4ffc9 [#339] Added ns args for func container create
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-12-17 13:55:15 +03:00
cd15be3b7c [#334] Automation of PATCH method in S3
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-12-13 10:53:02 +03:00
8ff1e72499 [#337] Add rule chain error
Signed-off-by: Ekaterina Chernitsyna <e.chernitsyna@yadro.com>
2024-12-13 10:45:14 +03:00
0ebb845329 [#335] Fixed iam boto3 client 2024-12-06 10:50:34 +03:00
ee7d9df4a9 [#333] Fix files param in http client part two
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-12-05 16:48:23 +03:00
61353cb38c [#332] Fix files param in http client
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-12-05 14:26:24 +03:00
b3d05c5c28 [#326] Automation of PATCH method in GRPC
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-12-05 10:28:06 +00:00
8ec7e21e84 [#331] Fix type hints for service methods
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-12-03 14:55:12 +03:00
0e040d2722 [#330] Improve CURL generation and fix Boto3 logging
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-12-02 15:54:38 +03:00
7d6768c83f [#325] Added get nns records method to frostfs-adm
Signed-off-by: Dmitry Anurin <danurin@yadro.com>
2024-11-29 10:21:41 +00:00
3dc7a5bdb0 [#328] Change logic activating split-brain
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-11-29 08:55:08 +00:00
24e1dfef28 [#324]Extension list_objects method 2024-11-26 07:37:56 +00:00
0c9660fffc [#323] Update APE related entities
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-11-20 17:14:33 +03:00
8eaa511e5c [#322] Added classmethod decorator in Http client
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-11-18 15:07:24 +00:00
a1953684b8 [#307] added methods for testing MFA 2024-11-18 07:08:42 +00:00
451de5e07e [#320] Added shards detach function
Signed-off-by: Dmitry Anurin <danurin@yadro.com>
2024-11-14 16:22:06 +03:00
f24bfc06fd [#319] Add cached fixture feature
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-11-13 17:46:03 +03:00
47bc11835b [#318] Add tombstone expiration test
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-11-13 10:11:03 +03:00
2a90ec74ff [#317] update morph rule chain 2024-11-12 16:01:12 +03:00
95b32a036a [#316] Extend parallel exception message output
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-11-12 12:28:10 +03:00
55d8ee5da0 [#315] Add http client
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-11-08 15:51:32 +03:00
ea40940514 [#313] update force_new_epoch 2024-11-05 12:37:56 +03:00
37 changed files with 958 additions and 81 deletions

View file

@ -62,7 +62,7 @@ authmate = "frostfs_testlib.credentials.authmate_s3_provider:AuthmateS3Credentia
wallet_factory = "frostfs_testlib.credentials.wallet_factory_provider:WalletFactoryProvider" wallet_factory = "frostfs_testlib.credentials.wallet_factory_provider:WalletFactoryProvider"
[project.entry-points."frostfs.testlib.bucket_cid_resolver"] [project.entry-points."frostfs.testlib.bucket_cid_resolver"]
frostfs = "frostfs_testlib.s3.curl_bucket_resolver:CurlBucketContainerResolver" frostfs = "frostfs_testlib.clients.s3.curl_bucket_resolver:CurlBucketContainerResolver"
[tool.isort] [tool.isort]
profile = "black" profile = "black"

View file

@ -122,7 +122,9 @@ class FrostfsAdmMorph(CliCommand):
**{param: param_value for param, param_value in locals().items() if param not in ["self"]}, **{param: param_value for param, param_value in locals().items() if param not in ["self"]},
) )
def force_new_epoch(self, rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None) -> CommandResult: def force_new_epoch(
self, rpc_endpoint: Optional[str] = None, alphabet_wallets: Optional[str] = None, delta: Optional[int] = None
) -> CommandResult:
"""Create new FrostFS epoch event in the side chain. """Create new FrostFS epoch event in the side chain.
Args: Args:
@ -351,6 +353,7 @@ class FrostfsAdmMorph(CliCommand):
rule: Optional[list[str]] = None, rule: Optional[list[str]] = None,
path: Optional[str] = None, path: Optional[str] = None,
chain_id_hex: Optional[bool] = None, chain_id_hex: Optional[bool] = None,
chain_name: Optional[str] = None,
wallet: Optional[str] = None, wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
timeout: Optional[str] = None, timeout: Optional[str] = None,
@ -381,6 +384,7 @@ class FrostfsAdmMorph(CliCommand):
target_name: str, target_name: str,
target_type: str, target_type: str,
chain_id_hex: Optional[bool] = None, chain_id_hex: Optional[bool] = None,
chain_name: Optional[str] = None,
wallet: Optional[str] = None, wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
timeout: Optional[str] = None, timeout: Optional[str] = None,
@ -408,6 +412,7 @@ class FrostfsAdmMorph(CliCommand):
target_type: str, target_type: str,
target_name: Optional[str] = None, target_name: Optional[str] = None,
rpc_endpoint: Optional[str] = None, rpc_endpoint: Optional[str] = None,
chain_name: Optional[str] = None,
wallet: Optional[str] = None, wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
timeout: Optional[str] = None, timeout: Optional[str] = None,
@ -434,6 +439,7 @@ class FrostfsAdmMorph(CliCommand):
target_name: str, target_name: str,
target_type: str, target_type: str,
all: Optional[bool] = None, all: Optional[bool] = None,
chain_name: Optional[str] = None,
chain_id_hex: Optional[bool] = None, chain_id_hex: Optional[bool] = None,
wallet: Optional[str] = None, wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
@ -457,3 +463,26 @@ class FrostfsAdmMorph(CliCommand):
"morph ape rm-rule-chain", "morph ape rm-rule-chain",
**{param: value for param, value in locals().items() if param not in ["self"]}, **{param: value for param, value in locals().items() if param not in ["self"]},
) )
def get_nns_records(
self,
name: str,
type: Optional[str] = None,
rpc_endpoint: Optional[str] = None,
alphabet_wallets: Optional[str] = None,
) -> CommandResult:
"""Returns domain record of the specified type
Args:
name: Domain name
type: Domain name service record type(A|CNAME|SOA|TXT)
rpc_endpoint: N3 RPC node endpoint
alphabet_wallets: path to alphabet wallets dir
Returns:
Command's result
"""
return self._execute(
"morph nns get-records",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -276,6 +276,53 @@ class FrostfsCliObject(CliCommand):
**{param: value for param, value in locals().items() if param not in ["self"]}, **{param: value for param, value in locals().items() if param not in ["self"]},
) )
def patch(
self,
rpc_endpoint: str,
cid: str,
oid: str,
range: list[str] = None,
payload: list[str] = None,
new_attrs: Optional[str] = None,
replace_attrs: bool = False,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
session: Optional[str] = None,
timeout: Optional[str] = None,
trace: bool = False,
ttl: Optional[int] = None,
wallet: Optional[str] = None,
xhdr: Optional[dict] = None,
) -> CommandResult:
"""
PATCH an object.
Args:
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>')
cid: Container ID
oid: Object ID
range: An array of ranges in which to replace data in the format [offset1:length1, offset2:length2]
payload: An array of file paths to be applied in each range
new_attrs: Attributes to be changed in the format Key1=Value1,Key2=Value2
replace_attrs: Replace all attributes completely with new ones specified in new_attrs
address: Address of wallet account
bearer: File with signed JSON or binary encoded bearer token
generate_key: Generate new private key
session: Filepath to a JSON- or binary-encoded token of the object RANGE session
timeout: Timeout for the operation
trace: Generate trace ID and print it
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Dict with request X-Headers
Returns:
(str): ID of patched Object
"""
return self._execute(
"object patch",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def range( def range(
self, self,
rpc_endpoint: str, rpc_endpoint: str,

View file

@ -241,3 +241,21 @@ class FrostfsCliShards(CliCommand):
"control shards evacuation status", "control shards evacuation status",
**{param: value for param, value in locals().items() if param not in ["self"]}, **{param: value for param, value in locals().items() if param not in ["self"]},
) )
def detach(self, endpoint: str, address: Optional[str] = None, id: Optional[str] = None, timeout: Optional[str] = None):
"""
Detach and close the shards
Args:
address: Address of wallet account
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
id: List of shard IDs in base58 encoding
timeout: Timeout for an operation (default 15s)
Returns:
Command's result.
"""
return self._execute(
"control shards detach",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -0,0 +1,5 @@
from frostfs_testlib.clients.http.http_client import HttpClient
from frostfs_testlib.clients.s3.aws_cli_client import AwsCliClient
from frostfs_testlib.clients.s3.boto3_client import Boto3ClientWrapper
from frostfs_testlib.clients.s3.interfaces import S3ClientWrapper
from frostfs_testlib.clients.s3.s3_http_client import S3HttpClient

View file

@ -0,0 +1,144 @@
import io
import json
import logging
import logging.config
from typing import Mapping, Sequence
import httpx
from frostfs_testlib import reporter
timeout = httpx.Timeout(60, read=150)
LOGGING_CONFIG = {
"disable_existing_loggers": False,
"version": 1,
"handlers": {"default": {"class": "logging.StreamHandler", "formatter": "http", "stream": "ext://sys.stderr"}},
"formatters": {
"http": {
"format": "%(levelname)s [%(asctime)s] %(name)s - %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S",
}
},
"loggers": {
"httpx": {
"handlers": ["default"],
"level": "DEBUG",
},
"httpcore": {
"handlers": ["default"],
"level": "ERROR",
},
},
}
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("NeoLogger")
class HttpClient:
@reporter.step("Send {method} request to {url}")
def send(self, method: str, url: str, expected_status_code: int = None, **kwargs: dict) -> httpx.Response:
transport = httpx.HTTPTransport(verify=False, retries=5)
client = httpx.Client(timeout=timeout, transport=transport)
response = client.request(method, url, **kwargs)
self._attach_response(response, **kwargs)
logger.info(f"Response: {response.status_code} => {response.text}")
if expected_status_code:
assert (
response.status_code == expected_status_code
), f"Got {response.status_code} response code while {expected_status_code} expected"
return response
@classmethod
def _parse_body(cls, readable: httpx.Request | httpx.Response) -> str | None:
try:
content = readable.read()
except Exception as e:
logger.warning(f"Unable to read file: {str(e)}")
return None
if not content:
return None
request_body = None
try:
request_body = json.loads(content)
except (json.JSONDecodeError, UnicodeDecodeError) as e:
logger.warning(f"Unable to convert body to json: {str(e)}")
if request_body is not None:
return json.dumps(request_body, default=str, indent=4)
try:
request_body = content.decode()
except UnicodeDecodeError as e:
logger.warning(f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}")
request_body = content if request_body is None else request_body
request_body = "<large text data>" if len(request_body) > 1000 else request_body
return request_body
@classmethod
def _parse_files(cls, files: Mapping | Sequence | None) -> dict:
filepaths = {}
if not files:
return filepaths
if isinstance(files, Sequence):
items = files
elif isinstance(files, Mapping):
items = files.items()
else:
raise TypeError(f"'files' must be either Sequence or Mapping, got: {type(files).__name__}")
for name, file in items:
if isinstance(file, io.IOBase):
filepaths[name] = file.name
elif isinstance(file, Sequence):
filepaths[name] = file[1].name
return filepaths
@classmethod
def _attach_response(cls, response: httpx.Response, **kwargs):
request = response.request
request_headers = json.dumps(dict(request.headers), default=str, indent=4)
request_body = cls._parse_body(request)
files = kwargs.get("files")
request_files = cls._parse_files(files)
response_headers = json.dumps(dict(response.headers), default=str, indent=4)
response_body = cls._parse_body(response)
report = (
f"Method: {request.method}\n\n"
+ f"URL: {request.url}\n\n"
+ f"Request Headers: {request_headers}\n\n"
+ (f"Request Body: {request_body}\n\n" if request_body else "")
+ (f"Request Files: {request_files}\n\n" if request_files else "")
+ f"Response Status Code: {response.status_code}\n\n"
+ f"Response Headers: {response_headers}\n\n"
+ (f"Response Body: {response_body}\n\n" if response_body else "")
)
curl_request = cls._create_curl_request(request.url, request.method, request.headers, request_body, request_files)
reporter.attach(report, "Requests Info")
reporter.attach(curl_request, "CURL")
@classmethod
def _create_curl_request(cls, url: str, method: str, headers: httpx.Headers, data: str, files: dict) -> str:
headers = " ".join(f'-H "{name.title()}: {value}"' for name, value in headers.items())
data = f" -d '{data}'" if data else ""
for name, path in files.items():
data += f' -F "{name}=@{path}"'
# Option -k means no verify SSL
return f"curl {url} -X {method} {headers}{data} -k"

View file

@ -0,0 +1 @@
from frostfs_testlib.clients.s3.interfaces import BucketContainerResolver, S3ClientWrapper, VersioningStatus

View file

@ -6,8 +6,8 @@ from time import sleep
from typing import Literal, Optional, Union from typing import Literal, Optional, Union
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.clients.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.shell import CommandOptions from frostfs_testlib.shell import CommandOptions
from frostfs_testlib.shell.local_shell import LocalShell from frostfs_testlib.shell.local_shell import LocalShell
from frostfs_testlib.utils import string_utils from frostfs_testlib.utils import string_utils
@ -171,7 +171,7 @@ class AwsCliClient(S3ClientWrapper):
return response.get("TagSet") return response.get("TagSet")
@reporter.step("Get bucket acl") @reporter.step("Get bucket acl")
def get_bucket_acl(self, bucket: str) -> list: def get_bucket_acl(self, bucket: str) -> dict:
if bucket.startswith("-") or " " in bucket: if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"' bucket = f'"{bucket}"'
@ -179,8 +179,7 @@ class AwsCliClient(S3ClientWrapper):
f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
) )
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) return self._to_json(output)
return response.get("Grants")
@reporter.step("Get bucket location") @reporter.step("Get bucket location")
def get_bucket_location(self, bucket: str) -> dict: def get_bucket_location(self, bucket: str) -> dict:
@ -196,11 +195,20 @@ class AwsCliClient(S3ClientWrapper):
return response.get("LocationConstraint") return response.get("LocationConstraint")
@reporter.step("List objects S3") @reporter.step("List objects S3")
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]: def list_objects(
self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None
) -> Union[dict, list[str]]:
if bucket.startswith("-") or " " in bucket: if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"' bucket = f'"{bucket}"'
cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} "
if page_size:
cmd = cmd.replace("--no-paginate", "")
cmd += f" --page-size {page_size} "
if prefix:
cmd += f" --prefix {prefix}"
if self.profile:
cmd += f" --profile {self.profile} "
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
@ -852,7 +860,7 @@ class AwsCliClient(S3ClientWrapper):
return response["Parts"] return response["Parts"]
@reporter.step("Complete multipart upload S3") @reporter.step("Complete multipart upload S3")
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None: def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict:
if bucket.startswith("-") or " " in bucket: if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"' bucket = f'"{bucket}"'
@ -1440,3 +1448,90 @@ class AwsCliClient(S3ClientWrapper):
response = self._to_json(output) response = self._to_json(output)
return response return response
# MFA METHODS
@reporter.step("Creates a new virtual MFA device")
def iam_create_virtual_mfa_device(self, virtual_mfa_device_name: str, outfile: str, bootstrap_method: str) -> tuple:
cmd = f"aws {self.common_flags} iam create-virtual-mfa-device --virtual-mfa-device-name {virtual_mfa_device_name}\
--outfile {outfile} --bootstrap-method {bootstrap_method} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
serial_number = response.get("VirtualMFADevice", {}).get("SerialNumber")
assert serial_number, f"Expected SerialNumber in response:\n{response}"
return serial_number, False
@reporter.step("Deactivates the specified MFA device and removes it from association with the user name")
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
cmd = f"aws {self.common_flags} iam deactivate-mfa-device --user-name {user_name} --serial-number {serial_number} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Deletes a virtual MFA device")
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
cmd = f"aws {self.common_flags} iam delete-virtual-mfa-device --serial-number {serial_number} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Enables the specified MFA device and associates it with the specified IAM user")
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
cmd = f"aws {self.common_flags} iam enable-mfa-device --user-name {user_name} --serial-number {serial_number} --authentication-code1 {authentication_code1}\
--authentication-code2 {authentication_code2} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Lists the MFA devices for an IAM user")
def iam_list_virtual_mfa_devices(self) -> dict:
cmd = f"aws {self.common_flags} iam list-virtual-mfa-devices --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("VirtualMFADevices"), f"Expected VirtualMFADevices in response:\n{response}"
return response
@reporter.step("Get session token for user")
def sts_get_session_token(
self, duration_seconds: Optional[str] = None, serial_number: Optional[str] = None, token_code: Optional[str] = None
) -> tuple:
cmd = f"aws {self.common_flags} sts get-session-token --endpoint {self.iam_endpoint}"
if duration_seconds:
cmd += f" --duration-seconds {duration_seconds}"
if serial_number:
cmd += f" --serial-number {serial_number}"
if token_code:
cmd += f" --token-code {token_code}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
access_key = response.get("Credentials", {}).get("AccessKeyId")
secret_access_key = response.get("Credentials", {}).get("SecretAccessKey")
session_token = response.get("Credentials", {}).get("SessionToken")
assert access_key, f"Expected AccessKeyId in response:\n{response}"
assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"
assert session_token, f"Expected SessionToken in response:\n{response}"
return access_key, secret_access_key, session_token

View file

@ -13,8 +13,8 @@ from botocore.exceptions import ClientError
from mypy_boto3_s3 import S3Client from mypy_boto3_s3 import S3Client
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.clients.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.utils import string_utils from frostfs_testlib.utils import string_utils
# TODO: Refactor this code to use shell instead of _cmd_run # TODO: Refactor this code to use shell instead of _cmd_run
@ -41,6 +41,8 @@ class Boto3ClientWrapper(S3ClientWrapper):
self.boto3_iam_client: S3Client = None self.boto3_iam_client: S3Client = None
self.iam_endpoint: str = "" self.iam_endpoint: str = ""
self.boto3_sts_client: S3Client = None
self.access_key_id: str = access_key_id self.access_key_id: str = access_key_id
self.secret_access_key: str = secret_access_key self.secret_access_key: str = secret_access_key
self.profile = profile self.profile = profile
@ -84,9 +86,18 @@ class Boto3ClientWrapper(S3ClientWrapper):
service_name="iam", service_name="iam",
aws_access_key_id=self.access_key_id, aws_access_key_id=self.access_key_id,
aws_secret_access_key=self.secret_access_key, aws_secret_access_key=self.secret_access_key,
region_name=self.region,
endpoint_url=self.iam_endpoint, endpoint_url=self.iam_endpoint,
verify=False, verify=False,
) )
# since the STS does not have an enpoint, IAM is used
self.boto3_sts_client = self.session.client(
service_name="sts",
aws_access_key_id=self.access_key_id,
aws_secret_access_key=self.secret_access_key,
endpoint_url=iam_endpoint,
verify=False,
)
def _to_s3_param(self, param: str) -> str: def _to_s3_param(self, param: str) -> str:
replacement_map = { replacement_map = {
@ -219,14 +230,13 @@ class Boto3ClientWrapper(S3ClientWrapper):
return response.get("TagSet") return response.get("TagSet")
@reporter.step("Get bucket acl") @reporter.step("Get bucket acl")
def get_bucket_acl(self, bucket: str) -> list: def get_bucket_acl(self, bucket: str) -> dict:
response = self._exec_request( return self._exec_request(
self.boto3_client.get_bucket_acl, self.boto3_client.get_bucket_acl,
params={"Bucket": bucket}, params={"Bucket": bucket},
endpoint=self.s3gate_endpoint, endpoint=self.s3gate_endpoint,
profile=self.profile, profile=self.profile,
) )
return response.get("Grants")
@reporter.step("Delete bucket tagging") @reporter.step("Delete bucket tagging")
def delete_bucket_tagging(self, bucket: str) -> None: def delete_bucket_tagging(self, bucket: str) -> None:
@ -388,10 +398,17 @@ class Boto3ClientWrapper(S3ClientWrapper):
return response if full_output else obj_list return response if full_output else obj_list
@reporter.step("List objects S3") @reporter.step("List objects S3")
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]: def list_objects(
self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None
) -> Union[dict, list[str]]:
params = {"Bucket": bucket}
if page_size:
params["MaxKeys"] = page_size
if prefix:
params["Prefix"] = prefix
response = self._exec_request( response = self._exec_request(
self.boto3_client.list_objects, self.boto3_client.list_objects,
params={"Bucket": bucket}, params,
endpoint=self.s3gate_endpoint, endpoint=self.s3gate_endpoint,
profile=self.profile, profile=self.profile,
) )
@ -687,7 +704,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
return response["Parts"] return response["Parts"]
@reporter.step("Complete multipart upload S3") @reporter.step("Complete multipart upload S3")
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None: def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict:
parts = [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts] parts = [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts]
params = self._convert_to_s3_params(locals(), exclude=["parts"]) params = self._convert_to_s3_params(locals(), exclude=["parts"])
params["MultipartUpload"] = {"Parts": parts} params["MultipartUpload"] = {"Parts": parts}
@ -1265,3 +1282,66 @@ class Boto3ClientWrapper(S3ClientWrapper):
endpoint=self.iam_endpoint, endpoint=self.iam_endpoint,
profile=self.profile, profile=self.profile,
) )
# MFA methods
@reporter.step("Creates a new virtual MFA device")
def iam_create_virtual_mfa_device(
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
) -> tuple:
response = self.boto3_iam_client.create_virtual_mfa_device(VirtualMFADeviceName=virtual_mfa_device_name)
serial_number = response.get("VirtualMFADevice", {}).get("SerialNumber")
base32StringSeed = response.get("VirtualMFADevice", {}).get("Base32StringSeed")
assert serial_number, f"Expected SerialNumber in response:\n{response}"
assert base32StringSeed, f"Expected Base32StringSeed in response:\n{response}"
return serial_number, base32StringSeed
@reporter.step("Deactivates the specified MFA device and removes it from association with the user name")
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
response = self.boto3_iam_client.deactivate_mfa_device(UserName=user_name, SerialNumber=serial_number)
return response
@reporter.step("Deletes a virtual MFA device")
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
response = self.boto3_iam_client.delete_virtual_mfa_device(SerialNumber=serial_number)
return response
@reporter.step("Enables the specified MFA device and associates it with the specified IAM user")
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
response = self.boto3_iam_client.enable_mfa_device(
UserName=user_name,
SerialNumber=serial_number,
AuthenticationCode1=authentication_code1,
AuthenticationCode2=authentication_code2,
)
return response
@reporter.step("Lists the MFA devices for an IAM user")
def iam_list_virtual_mfa_devices(self) -> dict:
response = self.boto3_iam_client.list_virtual_mfa_devices()
assert response.get("VirtualMFADevices"), f"Expected VirtualMFADevices in response:\n{response}"
return response
@reporter.step("Get session token for user")
def sts_get_session_token(
self, duration_seconds: Optional[str] = "", serial_number: Optional[str] = "", token_code: Optional[str] = ""
) -> tuple:
response = self.boto3_sts_client.get_session_token(
DurationSeconds=duration_seconds,
SerialNumber=serial_number,
TokenCode=token_code,
)
access_key = response.get("Credentials", {}).get("AccessKeyId")
secret_access_key = response.get("Credentials", {}).get("SecretAccessKey")
session_token = response.get("Credentials", {}).get("SessionToken")
assert access_key, f"Expected AccessKeyId in response:\n{response}"
assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"
assert session_token, f"Expected SessionToken in response:\n{response}"
return access_key, secret_access_key, session_token

View file

@ -1,7 +1,7 @@
import re import re
from frostfs_testlib.cli.generic_cli import GenericCli from frostfs_testlib.cli.generic_cli import GenericCli
from frostfs_testlib.s3.interfaces import BucketContainerResolver from frostfs_testlib.clients.s3 import BucketContainerResolver
from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.cluster import ClusterNode

View file

@ -128,7 +128,7 @@ class S3ClientWrapper(HumanReadableABC):
"""Deletes the tags from the bucket.""" """Deletes the tags from the bucket."""
@abstractmethod @abstractmethod
def get_bucket_acl(self, bucket: str) -> list: def get_bucket_acl(self, bucket: str) -> dict:
"""This implementation of the GET action uses the acl subresource to return the access control list (ACL) of a bucket.""" """This implementation of the GET action uses the acl subresource to return the access control list (ACL) of a bucket."""
@abstractmethod @abstractmethod
@ -195,7 +195,9 @@ class S3ClientWrapper(HumanReadableABC):
""" """
@abstractmethod @abstractmethod
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]: def list_objects(
self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None
) -> Union[dict, list[str]]:
"""Returns some or all (up to 1,000) of the objects in a bucket with each request. """Returns some or all (up to 1,000) of the objects in a bucket with each request.
You can use the request parameters as selection criteria to return a subset of the objects in a bucket. You can use the request parameters as selection criteria to return a subset of the objects in a bucket.
A 200 OK response can contain valid or invalid XML. Make sure to design your application A 200 OK response can contain valid or invalid XML. Make sure to design your application
@ -334,7 +336,7 @@ class S3ClientWrapper(HumanReadableABC):
"""Lists the parts that have been uploaded for a specific multipart upload.""" """Lists the parts that have been uploaded for a specific multipart upload."""
@abstractmethod @abstractmethod
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None: def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict:
"""Completes a multipart upload by assembling previously uploaded parts.""" """Completes a multipart upload by assembling previously uploaded parts."""
@abstractmethod @abstractmethod
@ -578,3 +580,32 @@ class S3ClientWrapper(HumanReadableABC):
@abstractmethod @abstractmethod
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict: def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
"""Removes the specified tags from the user""" """Removes the specified tags from the user"""
# MFA methods
@abstractmethod
def iam_create_virtual_mfa_device(
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
) -> tuple:
"""Creates a new virtual MFA device"""
@abstractmethod
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
"""Deactivates the specified MFA device and removes it from association with the user name"""
@abstractmethod
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
"""Deletes a virtual MFA device"""
@abstractmethod
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
"""Enables the specified MFA device and associates it with the specified IAM user"""
@abstractmethod
def iam_list_virtual_mfa_devices(self) -> dict:
"""Lists the MFA devices for an IAM user"""
@abstractmethod
def sts_get_session_token(
self, duration_seconds: Optional[str] = None, serial_number: Optional[str] = None, token_code: Optional[str] = None
) -> tuple:
"""Get session token for user"""

View file

@ -0,0 +1,127 @@
import hashlib
import logging
import xml.etree.ElementTree as ET
import httpx
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials
from frostfs_testlib import reporter
from frostfs_testlib.clients import HttpClient
from frostfs_testlib.utils.file_utils import TestFile
logger = logging.getLogger("NeoLogger")
DEFAULT_TIMEOUT = 60.0
class S3HttpClient:
def __init__(
self, s3gate_endpoint: str, access_key_id: str, secret_access_key: str, profile: str = "default", region: str = "us-east-1"
) -> None:
self.http_client = HttpClient()
self.s3gate_endpoint = s3gate_endpoint
self.credentials = Credentials(access_key_id, secret_access_key)
self.profile = profile
self.region = region
self.service = "s3"
self.signature = SigV4Auth(self.credentials, self.service, self.region)
def _to_s3_header(self, header: str) -> dict:
replacement_map = {
"Acl": "ACL",
"_": "-",
}
result = header
if not header.startswith("x_amz"):
result = header.title()
for find, replace in replacement_map.items():
result = result.replace(find, replace)
return result
def _convert_to_s3_headers(self, scope: dict, exclude: list[str] = None):
exclude = ["self", "cls"] if not exclude else exclude + ["self", "cls"]
return {self._to_s3_header(header): value for header, value in scope.items() if header not in exclude and value is not None}
def _create_aws_request(
self, method: str, url: str, headers: dict, content: str | bytes | TestFile = None, params: dict = None
) -> AWSRequest:
data = b""
if content is not None:
if isinstance(content, TestFile):
with open(content, "rb") as io_content:
data = io_content.read()
elif isinstance(content, str):
data = bytes(content, encoding="utf-8")
elif isinstance(content, bytes):
data = content
else:
raise TypeError(f"Content expected as a string, bytes or TestFile object, got: {content}")
headers["X-Amz-Content-SHA256"] = hashlib.sha256(data).hexdigest()
aws_request = AWSRequest(method, url, headers, data, params)
self.signature.add_auth(aws_request)
return aws_request
def _exec_request(
self,
method: str,
url: str,
headers: dict,
content: str | bytes | TestFile = None,
params: dict = None,
timeout: float = DEFAULT_TIMEOUT,
) -> dict:
aws_request = self._create_aws_request(method, url, headers, content, params)
response = self.http_client.send(
aws_request.method,
aws_request.url,
headers=dict(aws_request.headers),
data=aws_request.data,
params=aws_request.params,
timeout=timeout,
)
try:
response.raise_for_status()
except httpx.HTTPStatusError:
raise httpx.HTTPStatusError(response.text, request=response.request, response=response)
root = ET.fromstring(response.read())
data = {
"LastModified": root.find(".//LastModified").text,
"ETag": root.find(".//ETag").text,
}
if response.headers.get("x-amz-version-id"):
data["VersionId"] = response.headers.get("x-amz-version-id")
return data
@reporter.step("Patch object S3")
def patch_object(
self,
bucket: str,
key: str,
content: str | bytes | TestFile,
content_range: str,
version_id: str = None,
if_match: str = None,
if_unmodified_since: str = None,
x_amz_expected_bucket_owner: str = None,
timeout: float = DEFAULT_TIMEOUT,
) -> dict:
if content_range and not content_range.startswith("bytes"):
content_range = f"bytes {content_range}/*"
url = f"{self.s3gate_endpoint}/{bucket}/{key}"
headers = self._convert_to_s3_headers(locals(), exclude=["bucket", "key", "content", "version_id", "timeout"])
params = {"VersionId": version_id} if version_id is not None else None
return self._exec_request("PATCH", url, headers, content, params, timeout=timeout)

View file

@ -164,6 +164,9 @@ class DockerHost(Host):
return volume_path return volume_path
def send_signal_to_service(self, service_name: str, signal: str) -> None:
raise NotImplementedError("Not implemented for docker")
def delete_metabase(self, service_name: str) -> None: def delete_metabase(self, service_name: str) -> None:
raise NotImplementedError("Not implemented for docker") raise NotImplementedError("Not implemented for docker")
@ -247,6 +250,7 @@ class DockerHost(Host):
unit: Optional[str] = None, unit: Optional[str] = None,
exclude_filter: Optional[str] = None, exclude_filter: Optional[str] = None,
priority: Optional[str] = None, priority: Optional[str] = None,
word_count: bool = None,
) -> str: ) -> str:
client = self._get_docker_client() client = self._get_docker_client()
filtered_logs = "" filtered_logs = ""

View file

@ -117,6 +117,17 @@ class Host(ABC):
service_name: Name of the service to stop. service_name: Name of the service to stop.
""" """
@abstractmethod
def send_signal_to_service(self, service_name: str, signal: str) -> None:
"""Send signal to service with specified name using kill -<signal>
The service must be hosted on this host.
Args:
service_name: Name of the service to stop.
signal: signal name. See kill -l to all names
"""
@abstractmethod @abstractmethod
def mask_service(self, service_name: str) -> None: def mask_service(self, service_name: str) -> None:
"""Prevent the service from start by any activity by masking it. """Prevent the service from start by any activity by masking it.
@ -313,6 +324,7 @@ class Host(ABC):
unit: Optional[str] = None, unit: Optional[str] = None,
exclude_filter: Optional[str] = None, exclude_filter: Optional[str] = None,
priority: Optional[str] = None, priority: Optional[str] = None,
word_count: bool = None,
) -> str: ) -> str:
"""Get logs from host filtered by regex. """Get logs from host filtered by regex.
@ -323,6 +335,7 @@ class Host(ABC):
unit: required unit. unit: required unit.
priority: logs level, 0 - emergency, 7 - debug. All messages with that code and higher. priority: logs level, 0 - emergency, 7 - debug. All messages with that code and higher.
For example, if we specify the -p 2 option, journalctl will show all messages with levels 2, 1 and 0. For example, if we specify the -p 2 option, journalctl will show all messages with levels 2, 1 and 0.
word_count: output type, expected values: lines, bytes, json
Returns: Returns:
Found entries as str if any found. Found entries as str if any found.

View file

@ -53,3 +53,4 @@ HOSTING_CONFIG_FILE = os.getenv(
) )
MORE_LOG = os.getenv("MORE_LOG", "1") MORE_LOG = os.getenv("MORE_LOG", "1")
EXPIRATION_EPOCH_ATTRIBUTE = "__SYSTEM__EXPIRATION_EPOCH"

View file

@ -9,6 +9,7 @@ OBJECT_ALREADY_REMOVED = "code = 2052.*message = object already removed"
SESSION_NOT_FOUND = "code = 4096.*message = session token not found" SESSION_NOT_FOUND = "code = 4096.*message = session token not found"
OUT_OF_RANGE = "code = 2053.*message = out of range" OUT_OF_RANGE = "code = 2053.*message = out of range"
EXPIRED_SESSION_TOKEN = "code = 4097.*message = expired session token" EXPIRED_SESSION_TOKEN = "code = 4097.*message = expired session token"
ADD_CHAIN_ERROR = "code = 5120 message = apemanager access denied"
# TODO: Change to codes with message # TODO: Change to codes with message
# OBJECT_IS_LOCKED = "code = 2050.*message = object is locked" # OBJECT_IS_LOCKED = "code = 2050.*message = object is locked"
# LOCK_NON_REGULAR_OBJECT = "code = 2051.*message = ..." will be available once 2092 is fixed # LOCK_NON_REGULAR_OBJECT = "code = 2051.*message = ..." will be available once 2092 is fixed
@ -27,6 +28,10 @@ S3_BUCKET_DOES_NOT_ALLOW_ACL = "The bucket does not allow ACLs"
S3_MALFORMED_XML_REQUEST = "The XML you provided was not well-formed or did not validate against our published schema." S3_MALFORMED_XML_REQUEST = "The XML you provided was not well-formed or did not validate against our published schema."
RULE_ACCESS_DENIED_CONTAINER = "access to container operation {operation} is denied by access policy engine: Access denied" RULE_ACCESS_DENIED_CONTAINER = "access to container operation {operation} is denied by access policy engine: Access denied"
RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request: method {operation}: Access denied" # Errors from node missing reasons if request was forwarded. Commenting for now
# RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request: method {operation}: Access denied"
RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request"
NO_RULE_FOUND_CONTAINER = "access to container operation {operation} is denied by access policy engine: NoRuleFound" NO_RULE_FOUND_CONTAINER = "access to container operation {operation} is denied by access policy engine: NoRuleFound"
NO_RULE_FOUND_OBJECT = "access to object operation denied: ape denied request: method {operation}: NoRuleFound" # Errors from node missing reasons if request was forwarded. Commenting for now
# NO_RULE_FOUND_OBJECT = "access to object operation denied: ape denied request: method {operation}: NoRuleFound"
NO_RULE_FOUND_OBJECT = "access to object operation denied: ape denied request"

View file

@ -16,11 +16,10 @@ OPTIONAL_NODE_UNDER_LOAD = os.getenv("OPTIONAL_NODE_UNDER_LOAD")
OPTIONAL_FAILOVER_ENABLED = str_to_bool(os.getenv("OPTIONAL_FAILOVER_ENABLED", "true")) OPTIONAL_FAILOVER_ENABLED = str_to_bool(os.getenv("OPTIONAL_FAILOVER_ENABLED", "true"))
# Set this to True to disable background load. I.E. node which supposed to be stopped will not be actually stopped. # Set this to True to disable background load. I.E. node which supposed to be stopped will not be actually stopped.
OPTIONAL_BACKGROUND_LOAD_ENABLED = str_to_bool( OPTIONAL_BACKGROUND_LOAD_ENABLED = str_to_bool(os.getenv("OPTIONAL_BACKGROUND_LOAD_ENABLED", "true"))
os.getenv("OPTIONAL_BACKGROUND_LOAD_ENABLED", "true")
)
# Set this to False for disable autouse fixture like node healthcheck during developing time. # Set this to False for disable autouse fixture like node healthcheck during developing time.
OPTIONAL_AUTOUSE_FIXTURES_ENABLED = str_to_bool( OPTIONAL_AUTOUSE_FIXTURES_ENABLED = str_to_bool(os.getenv("OPTIONAL_AUTOUSE_FIXTURES_ENABLED", "true"))
os.getenv("OPTIONAL_AUTOUSE_FIXTURES_ENABLED", "true")
) # Use cache for fixtures with @cachec_fixture decorator
OPTIONAL_CACHE_FIXTURES = str_to_bool(os.getenv("OPTIONAL_CACHE_FIXTURES", "false"))

View file

@ -1,3 +0,0 @@
from frostfs_testlib.s3.aws_cli_client import AwsCliClient
from frostfs_testlib.s3.boto3_client import Boto3ClientWrapper
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus

View file

@ -7,9 +7,7 @@ from typing import Optional, Union
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.cli import FrostfsCli from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.plugins import load_plugin
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC
from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.shell import Shell from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.object import put_object, put_object_to_random_node from frostfs_testlib.steps.cli.object import put_object, put_object_to_random_node
from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode
@ -111,6 +109,8 @@ def create_container(
options: Optional[dict] = None, options: Optional[dict] = None,
await_mode: bool = True, await_mode: bool = True,
wait_for_creation: bool = True, wait_for_creation: bool = True,
nns_zone: Optional[str] = None,
nns_name: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str: ) -> str:
""" """
@ -143,6 +143,8 @@ def create_container(
result = cli.container.create( result = cli.container.create(
rpc_endpoint=endpoint, rpc_endpoint=endpoint,
policy=rule, policy=rule,
nns_name=nns_name,
nns_zone=nns_zone,
basic_acl=basic_acl, basic_acl=basic_acl,
attributes=attributes, attributes=attributes,
name=name, name=name,

View file

@ -12,8 +12,8 @@ import requests
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.cli import GenericCli from frostfs_testlib.cli import GenericCli
from frostfs_testlib.clients.s3.aws_cli_client import command_options
from frostfs_testlib.resources.common import ASSETS_DIR, SIMPLE_OBJECT_SIZE from frostfs_testlib.resources.common import ASSETS_DIR, SIMPLE_OBJECT_SIZE
from frostfs_testlib.s3.aws_cli_client import command_options
from frostfs_testlib.shell import Shell from frostfs_testlib.shell import Shell
from frostfs_testlib.shell.local_shell import LocalShell from frostfs_testlib.shell.local_shell import LocalShell
from frostfs_testlib.steps.cli.object import get_object from frostfs_testlib.steps.cli.object import get_object

View file

@ -4,16 +4,18 @@ from frostfs_testlib.storage.cluster import ClusterNode
class IpHelper: class IpHelper:
@staticmethod @staticmethod
def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[str]) -> None: def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[tuple]) -> None:
shell = node.host.get_shell() shell = node.host.get_shell()
for ip in block_ip: for ip, table in block_ip:
shell.exec(f"ip route add blackhole {ip}") if not table:
shell.exec(f"ip r a blackhole {ip}")
continue
shell.exec(f"ip r a blackhole {ip} table {table}")
@staticmethod @staticmethod
def restore_input_traffic_to_node(node: ClusterNode) -> None: def restore_input_traffic_to_node(node: ClusterNode) -> None:
shell = node.host.get_shell() shell = node.host.get_shell()
unlock_ip = shell.exec("ip route list | grep blackhole", CommandOptions(check=False)) unlock_ip = shell.exec("ip r l table all | grep blackhole", CommandOptions(check=False)).stdout
if unlock_ip.return_code != 0:
return for active_blackhole in unlock_ip.strip().split("\n"):
for ip in unlock_ip.stdout.strip().split("\n"): shell.exec(f"ip r d {active_blackhole}")
shell.exec(f"ip route del blackhole {ip.split(' ')[1]}")

View file

@ -6,8 +6,7 @@ from typing import Optional
from dateutil.parser import parse from dateutil.parser import parse
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus from frostfs_testlib.clients.s3 import BucketContainerResolver, S3ClientWrapper, VersioningStatus
from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.shell import Shell from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import search_nodes_with_container from frostfs_testlib.steps.cli.container import search_nodes_with_container
from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode
@ -185,3 +184,26 @@ def search_nodes_with_bucket(
break break
nodes_list = search_nodes_with_container(wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster) nodes_list = search_nodes_with_container(wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster)
return nodes_list return nodes_list
def get_bytes_relative_to_object(value: int | str, object_size: int = None, part_size: int = None) -> int:
if isinstance(value, int):
return value
if "part" not in value and "object" not in value:
return int(value)
if object_size is not None:
value = value.replace("object", str(object_size))
if part_size is not None:
value = value.replace("part", str(part_size))
return int(eval(value))
def get_range_relative_to_object(rng: str, object_size: int = None, part_size: int = None, int_values: bool = False) -> str | int:
start, end = rng.split(":")
start = get_bytes_relative_to_object(start, object_size, part_size)
end = get_bytes_relative_to_object(end, object_size, part_size)
return (start, end) if int_values else f"bytes {start}-{end}/*"

View file

@ -11,10 +11,10 @@ from frostfs_testlib.storage import get_service_registry
from frostfs_testlib.storage.configuration.interfaces import ServiceConfigurationYml from frostfs_testlib.storage.configuration.interfaces import ServiceConfigurationYml
from frostfs_testlib.storage.constants import ConfigAttributes from frostfs_testlib.storage.constants import ConfigAttributes
from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode
from frostfs_testlib.storage.dataclasses.metrics import Metrics
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
from frostfs_testlib.storage.service_registry import ServiceRegistry from frostfs_testlib.storage.service_registry import ServiceRegistry
from frostfs_testlib.storage.dataclasses.metrics import Metrics
class ClusterNode: class ClusterNode:

View file

@ -23,4 +23,6 @@ class PlacementRule:
DEFAULT_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X" DEFAULT_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
SINGLE_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 4 FROM * AS X" SINGLE_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 4 FROM * AS X"
REP_2_FOR_3_NODES_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 3 FROM * AS X" REP_2_FOR_3_NODES_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 3 FROM * AS X"
REP_1_FOR_2_NODES_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 2 FROM * AS X"
DEFAULT_EC_PLACEMENT_RULE = "EC 3.1" DEFAULT_EC_PLACEMENT_RULE = "EC 3.1"
EC_1_1_FOR_2_NODES_PLACEMENT_RULE = "EC 1.1 IN X CBF 1 SELECT 2 FROM * AS X"

View file

@ -1,4 +1,5 @@
import datetime import datetime
import itertools
import logging import logging
import time import time
from typing import TypeVar from typing import TypeVar
@ -39,7 +40,7 @@ class ClusterStateController:
def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None: def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None:
self.stopped_nodes: list[ClusterNode] = [] self.stopped_nodes: list[ClusterNode] = []
self.detached_disks: dict[str, DiskController] = {} self.detached_disks: dict[str, DiskController] = {}
self.dropped_traffic: list[ClusterNode] = [] self.dropped_traffic: set[ClusterNode] = set()
self.excluded_from_netmap: list[StorageNode] = [] self.excluded_from_netmap: list[StorageNode] = []
self.stopped_services: set[NodeBase] = set() self.stopped_services: set[NodeBase] = set()
self.cluster = cluster self.cluster = cluster
@ -172,6 +173,15 @@ class ClusterStateController:
if service_type == StorageNode: if service_type == StorageNode:
self.wait_after_storage_startup() self.wait_after_storage_startup()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Send sighup to all {service_type} services")
def sighup_services_of_type(self, service_type: type[ServiceClass]):
services = self.cluster.services(service_type)
parallel([service.send_signal_to_service for service in services], signal="SIGHUP")
if service_type == StorageNode:
self.wait_after_storage_startup()
@wait_for_success(600, 60) @wait_for_success(600, 60)
def wait_s3gate(self, s3gate: S3Gate): def wait_s3gate(self, s3gate: S3Gate):
with reporter.step(f"Wait for {s3gate} reconnection"): with reporter.step(f"Wait for {s3gate} reconnection"):
@ -206,21 +216,27 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Stop {service_type} service on {node}") @reporter.step("Stop {service_type} service on {node}")
def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass], mask: bool = True): def stop_service_of_type(self, node: ClusterNode, service_type: ServiceClass, mask: bool = True):
service = node.service(service_type) service = node.service(service_type)
service.stop_service(mask) service.stop_service(mask)
self.stopped_services.add(service) self.stopped_services.add(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Send sighup to {service_type} service on {node}")
def sighup_service_of_type(self, node: ClusterNode, service_type: ServiceClass):
service = node.service(service_type)
service.send_signal_to_service("SIGHUP")
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Start {service_type} service on {node}") @reporter.step("Start {service_type} service on {node}")
def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]): def start_service_of_type(self, node: ClusterNode, service_type: ServiceClass):
service = node.service(service_type) service = node.service(service_type)
service.start_service() service.start_service()
self.stopped_services.discard(service) self.stopped_services.discard(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Start all stopped {service_type} services") @reporter.step("Start all stopped {service_type} services")
def start_stopped_services_of_type(self, service_type: type[ServiceClass]): def start_stopped_services_of_type(self, service_type: ServiceClass):
stopped_svc = self._get_stopped_by_type(service_type) stopped_svc = self._get_stopped_by_type(service_type)
if not stopped_svc: if not stopped_svc:
return return
@ -310,22 +326,22 @@ class ClusterStateController:
@reporter.step("Drop traffic to {node}, nodes - {block_nodes}") @reporter.step("Drop traffic to {node}, nodes - {block_nodes}")
def drop_traffic(self, node: ClusterNode, wakeup_timeout: int, name_interface: str, block_nodes: list[ClusterNode] = None) -> None: def drop_traffic(self, node: ClusterNode, wakeup_timeout: int, name_interface: str, block_nodes: list[ClusterNode] = None) -> None:
list_ip = self._parse_interfaces(block_nodes, name_interface) interfaces_tables = self._parse_interfaces(block_nodes, name_interface)
IpHelper.drop_input_traffic_to_node(node, list_ip) IpHelper.drop_input_traffic_to_node(node, interfaces_tables)
time.sleep(wakeup_timeout) time.sleep(wakeup_timeout)
self.dropped_traffic.append(node) self.dropped_traffic.add(node)
@reporter.step("Start traffic to {node}") @reporter.step("Start traffic to {node}")
def restore_traffic(self, node: ClusterNode) -> None: def restore_traffic(self, node: ClusterNode) -> None:
IpHelper.restore_input_traffic_to_node(node=node) IpHelper.restore_input_traffic_to_node(node=node)
index = self.dropped_traffic.index(node) self.dropped_traffic.discard(node)
self.dropped_traffic.pop(index)
@reporter.step("Restore blocked nodes") @reporter.step("Restore blocked nodes")
def restore_all_traffic(self): def restore_all_traffic(self):
if not self.dropped_traffic: if not self.dropped_traffic:
return return
parallel(self._restore_traffic_to_node, self.dropped_traffic) parallel(self._restore_traffic_to_node, self.dropped_traffic)
self.dropped_traffic.clear()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Hard reboot host {node} via magic SysRq option") @reporter.step("Hard reboot host {node} via magic SysRq option")
@ -501,17 +517,31 @@ class ClusterStateController:
return disk_controller return disk_controller
@reporter.step("Restore traffic {node}")
def _restore_traffic_to_node(self, node): def _restore_traffic_to_node(self, node):
IpHelper.restore_input_traffic_to_node(node) IpHelper.restore_input_traffic_to_node(node)
def _parse_interfaces(self, nodes: list[ClusterNode], name_interface: str): def _parse_interfaces(self, nodes: list[ClusterNode], name_interface: str) -> list[tuple]:
interfaces = [] interfaces_and_tables = set()
for node in nodes: for node in nodes:
dict_interfaces = node.host.config.interfaces shell = node.host.get_shell()
for type, ip in dict_interfaces.items(): lines = shell.exec(f"ip r l table all | grep '{name_interface}'").stdout.splitlines()
if name_interface in type:
interfaces.append(ip) ips = []
return interfaces tables = []
for line in lines:
if "src" not in line or "table local" in line:
continue
parts = line.split()
ips.append(parts[-1])
if "table" in line:
tables.append(parts[parts.index("table") + 1])
tables.append(None)
[interfaces_and_tables.add((ip, table)) for ip, table in itertools.product(ips, tables)]
return interfaces_and_tables
@reporter.step("Ping node") @reporter.step("Ping node")
def _ping_host(self, node: ClusterNode): def _ping_host(self, node: ClusterNode):

View file

@ -14,14 +14,19 @@ class ConfigStateManager(StateManager):
self.cluster = self.csc.cluster self.cluster = self.csc.cluster
@reporter.step("Change configuration for {service_type} on all nodes") @reporter.step("Change configuration for {service_type} on all nodes")
def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any]): def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any], sighup: bool = False):
services = self.cluster.services(service_type) services = self.cluster.services(service_type)
nodes = self.cluster.nodes(services) nodes = self.cluster.nodes(services)
self.services_with_changed_config.update([(node, service_type) for node in nodes]) self.services_with_changed_config.update([(node, service_type) for node in nodes])
self.csc.stop_services_of_type(service_type) if not sighup:
self.csc.stop_services_of_type(service_type)
parallel([node.config(service_type).set for node in nodes], values=values) parallel([node.config(service_type).set for node in nodes], values=values)
self.csc.start_services_of_type(service_type) if not sighup:
self.csc.start_services_of_type(service_type)
else:
self.csc.sighup_services_of_type(service_type)
@reporter.step("Change configuration for {service_type} on {node}") @reporter.step("Change configuration for {service_type} on {node}")
def set_on_node(self, node: ClusterNode, service_type: type[ServiceClass], values: dict[str, Any]): def set_on_node(self, node: ClusterNode, service_type: type[ServiceClass], values: dict[str, Any]):
@ -32,18 +37,26 @@ class ConfigStateManager(StateManager):
self.csc.start_service_of_type(node, service_type) self.csc.start_service_of_type(node, service_type)
@reporter.step("Revert all configuration changes") @reporter.step("Revert all configuration changes")
def revert_all(self): def revert_all(self, sighup: bool = False):
if not self.services_with_changed_config: if not self.services_with_changed_config:
return return
parallel(self._revert_svc, self.services_with_changed_config) parallel(self._revert_svc, self.services_with_changed_config, sighup)
self.services_with_changed_config.clear() self.services_with_changed_config.clear()
self.csc.start_all_stopped_services() if not sighup:
self.csc.start_all_stopped_services()
# TODO: parallel can't have multiple parallel_items :( # TODO: parallel can't have multiple parallel_items :(
@reporter.step("Revert all configuration {node_and_service}") @reporter.step("Revert all configuration {node_and_service}")
def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass]): def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass], sighup: bool = False):
node, service_type = node_and_service node, service_type = node_and_service
self.csc.stop_service_of_type(node, service_type) service = node.service(service_type)
if not sighup:
self.csc.stop_service_of_type(node, service_type)
node.config(service_type).revert() node.config(service_type).revert()
if sighup:
service.send_signal_to_service("SIGHUP")

View file

@ -13,6 +13,7 @@ FROSTFS_CONTRACT_CACHE_TIMEOUT = 30
class ObjectOperations(HumanReadableEnum): class ObjectOperations(HumanReadableEnum):
PUT = "object.put" PUT = "object.put"
PATCH = "object.patch"
GET = "object.get" GET = "object.get"
HEAD = "object.head" HEAD = "object.head"
GET_RANGE = "object.range" GET_RANGE = "object.range"
@ -26,6 +27,18 @@ class ObjectOperations(HumanReadableEnum):
return [op for op in ObjectOperations if op != ObjectOperations.WILDCARD_ALL] return [op for op in ObjectOperations if op != ObjectOperations.WILDCARD_ALL]
class ContainerOperations(HumanReadableEnum):
PUT = "container.put"
GET = "container.get"
LIST = "container.list"
DELETE = "container.delete"
WILDCARD_ALL = "container.*"
@staticmethod
def get_all():
return [op for op in ObjectOperations if op != ObjectOperations.WILDCARD_ALL]
@dataclass @dataclass
class Operations: class Operations:
GET_CONTAINER = "GetContainer" GET_CONTAINER = "GetContainer"
@ -124,7 +137,7 @@ class Rule:
if not operations: if not operations:
self.operations = [] self.operations = []
elif isinstance(operations, ObjectOperations): elif isinstance(operations, (ObjectOperations, ContainerOperations)):
self.operations = [operations] self.operations = [operations]
else: else:
self.operations = operations self.operations = operations

View file

@ -65,6 +65,10 @@ class NodeBase(HumanReadableABC):
with reporter.step(f"Start {self.name} service on {self.host.config.address}"): with reporter.step(f"Start {self.name} service on {self.host.config.address}"):
self.host.start_service(self.name) self.host.start_service(self.name)
def send_signal_to_service(self, signal: str):
with reporter.step(f"Send -{signal} signal to {self.name} service on {self.host.config.address}"):
self.host.send_signal_to_service(self.name, signal)
@abstractmethod @abstractmethod
def service_healthcheck(self) -> bool: def service_healthcheck(self) -> bool:
"""Service healthcheck.""" """Service healthcheck."""
@ -185,9 +189,7 @@ class NodeBase(HumanReadableABC):
if attribute_name not in config.attributes: if attribute_name not in config.attributes:
if default_attribute_name is None: if default_attribute_name is None:
raise RuntimeError( raise RuntimeError(f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either")
f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either"
)
return config.attributes[default_attribute_name] return config.attributes[default_attribute_name]
@ -197,9 +199,7 @@ class NodeBase(HumanReadableABC):
return self.host.get_service_config(self.name) return self.host.get_service_config(self.name)
def get_service_uptime(self, service: str) -> datetime: def get_service_uptime(self, service: str) -> datetime:
result = self.host.get_shell().exec( result = self.host.get_shell().exec(f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2")
f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2"
)
start_time = parser.parse(result.stdout.strip()) start_time = parser.parse(result.stdout.strip())
current_time = datetime.now(tz=timezone.utc) current_time = datetime.now(tz=timezone.utc)
active_time = current_time - start_time active_time = current_time - start_time

View file

@ -5,9 +5,9 @@ from typing import List, Optional, Union
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.clients.s3 import BucketContainerResolver
from frostfs_testlib.plugins import load_plugin from frostfs_testlib.plugins import load_plugin
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.grpc_operations import interfaces from frostfs_testlib.storage.grpc_operations import interfaces
from frostfs_testlib.utils import json_utils from frostfs_testlib.utils import json_utils

View file

@ -206,6 +206,11 @@ class ObjectOperations(interfaces.ObjectInterface):
hash_type=hash_type, hash_type=hash_type,
timeout=timeout, timeout=timeout,
) )
if range:
# Cut off the range and return only hash
return result.stdout.split(":")[1].strip()
return result.stdout return result.stdout
@reporter.step("Head object") @reporter.step("Head object")
@ -407,6 +412,57 @@ class ObjectOperations(interfaces.ObjectInterface):
oid = id_str.split(":")[1] oid = id_str.split(":")[1]
return oid.strip() return oid.strip()
@reporter.step("Patch object")
def patch(
self,
cid: str,
oid: str,
endpoint: str,
ranges: list[str] = None,
payloads: list[str] = None,
new_attrs: Optional[str] = None,
replace_attrs: bool = False,
bearer: str = "",
xhdr: Optional[dict] = None,
session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
trace: bool = False,
) -> str:
"""
PATCH an object.
Args:
cid: ID of Container where we get the Object from
oid: Object ID
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
ranges: An array of ranges in which to replace data in the format [offset1:length1, offset2:length2]
payloads: An array of file paths to be applied in each range
new_attrs: Attributes to be changed in the format "key1=value1,key2=value2"
replace_attrs: Replace all attributes completely with new ones specified in new_attrs
bearer: Path to Bearer Token file, appends to `--bearer` key
xhdr: Request X-Headers in form of Key=Value
session: Path to a JSON-encoded container session token
timeout: Timeout for the operation
trace: Generate trace ID and print it
Returns:
(str): ID of patched Object
"""
result = self.cli.object.patch(
rpc_endpoint=endpoint,
cid=cid,
oid=oid,
range=ranges,
payload=payloads,
new_attrs=new_attrs,
replace_attrs=replace_attrs,
bearer=bearer,
xhdr=xhdr,
session=session,
timeout=timeout,
trace=trace,
)
return result.stdout.split(":")[1].strip()
@reporter.step("Put object to random node") @reporter.step("Put object to random node")
def put_to_random_node( def put_to_random_node(
self, self,
@ -622,3 +678,30 @@ class ObjectOperations(interfaces.ObjectInterface):
] ]
return object_nodes return object_nodes
@reporter.step("Search parts of object")
def parts(
self,
cid: str,
oid: str,
alive_node: ClusterNode,
bearer: str = "",
xhdr: Optional[dict] = None,
is_direct: bool = False,
verify_presence_all: bool = False,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> list[str]:
endpoint = alive_node.storage_node.get_rpc_endpoint()
response = self.cli.object.nodes(
rpc_endpoint=endpoint,
cid=cid,
oid=oid,
bearer=bearer,
ttl=1 if is_direct else None,
json=True,
xhdr=xhdr,
timeout=timeout,
verify_presence_all=verify_presence_all,
)
response_json = json.loads(response.stdout)
return [data_object["object_id"] for data_object in response_json["data_objects"]]

View file

@ -198,6 +198,24 @@ class ObjectInterface(ABC):
) -> str: ) -> str:
pass pass
@abstractmethod
def patch(
self,
cid: str,
oid: str,
endpoint: str,
ranges: Optional[list[str]] = None,
payloads: Optional[list[str]] = None,
new_attrs: Optional[str] = None,
replace_attrs: bool = False,
bearer: Optional[str] = None,
xhdr: Optional[dict] = None,
session: Optional[str] = None,
timeout: Optional[str] = None,
trace: bool = False,
) -> str:
pass
@abstractmethod @abstractmethod
def put_to_random_node( def put_to_random_node(
self, self,
@ -264,6 +282,20 @@ class ObjectInterface(ABC):
) -> List[ClusterNode]: ) -> List[ClusterNode]:
pass pass
@abstractmethod
def parts(
self,
cid: str,
oid: str,
alive_node: ClusterNode,
bearer: str = "",
xhdr: Optional[dict] = None,
is_direct: bool = False,
verify_presence_all: bool = False,
timeout: Optional[str] = None,
) -> List[str]:
pass
class ContainerInterface(ABC): class ContainerInterface(ABC):
@abstractmethod @abstractmethod

View file

@ -1,4 +1,5 @@
import itertools import itertools
import traceback
from concurrent.futures import Future, ThreadPoolExecutor from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import contextmanager from contextlib import contextmanager
from typing import Callable, Collection, Optional, Union from typing import Callable, Collection, Optional, Union
@ -55,7 +56,42 @@ def parallel(
# Check for exceptions # Check for exceptions
exceptions = [future.exception() for future in futures if future.exception()] exceptions = [future.exception() for future in futures if future.exception()]
if exceptions: if exceptions:
message = "\n".join([str(e) for e in exceptions]) # Prettify exception in parallel with all underlying stack traces
# For example, we had 3 RuntimeError exceptions during parallel. This format will give us something like
#
# RuntimeError: The following exceptions occured during parallel run:
# 1) Exception one text
# 2) Exception two text
# 3) Exception three text
# TRACES:
# ==== 1 ====
# Traceback (most recent call last):
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
# result = self.fn(*self.args, **self.kwargs)
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
# raise RuntimeError(f"Exception one text")
# RuntimeError: Exception one text
#
# ==== 2 ====
# Traceback (most recent call last):
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
# result = self.fn(*self.args, **self.kwargs)
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
# raise RuntimeError(f"Exception two text")
# RuntimeError: Exception two text
#
# ==== 3 ====
# Traceback (most recent call last):
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
# result = self.fn(*self.args, **self.kwargs)
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
# raise RuntimeError(f"Exception three text")
# RuntimeError: Exception three text
short_summary = "\n".join([f"{i}) {str(e)}" for i, e in enumerate(exceptions, 1)])
stack_traces = "\n".join(
[f"==== {i} ====\n{''.join(traceback.TracebackException.from_exception(e).format())}" for i, e in enumerate(exceptions, 1)]
)
message = f"{short_summary}\nTRACES:\n{stack_traces}"
raise RuntimeError(f"The following exceptions occured during parallel run:\n{message}") raise RuntimeError(f"The following exceptions occured during parallel run:\n{message}")
return futures return futures

View file

@ -1,13 +1,16 @@
import inspect import inspect
import logging import logging
import os
from functools import wraps from functools import wraps
from time import sleep, time from time import sleep, time
from typing import Any from typing import Any
import yaml
from _pytest.outcomes import Failed from _pytest.outcomes import Failed
from pytest import fail from pytest import fail
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.resources.common import ASSETS_DIR
from frostfs_testlib.utils.func_utils import format_by_args from frostfs_testlib.utils.func_utils import format_by_args
logger = logging.getLogger("NeoLogger") logger = logging.getLogger("NeoLogger")
@ -128,6 +131,42 @@ def run_optionally(enabled: bool, mock_value: Any = True):
return deco return deco
def cached_fixture(enabled: bool):
"""
Decorator to cache fixtures.
MUST be placed after @pytest.fixture and before @allure decorators.
Args:
enabled: if true, decorated func will be cached.
"""
def deco(func):
@wraps(func)
def func_impl(*a, **kw):
# TODO: *a and *kw should be parsed to some kind of hashsum and used in filename to prevent cache load from different parameters
cache_file = os.path.join(ASSETS_DIR, f"fixture_cache_{func.__name__}.yml")
if enabled and os.path.exists(cache_file):
with open(cache_file, "r") as cache_input:
return yaml.load(cache_input, Loader=yaml.Loader)
result = func(*a, **kw)
if enabled:
with open(cache_file, "w") as cache_output:
yaml.dump(result, cache_output)
return result
# TODO: cache yielding fixtures
@wraps(func)
def gen_impl(*a, **kw):
raise NotImplementedError("Not implemented for yielding fixtures")
return gen_impl if inspect.isgeneratorfunction(func) else func_impl
return deco
def wait_for_success( def wait_for_success(
max_wait_time: int = 60, max_wait_time: int = 60,
interval: int = 1, interval: int = 1,

View file

@ -80,6 +80,9 @@ def log_command_execution(cmd: str, output: Union[str, dict], params: Optional[d
if not params: if not params:
params = {} params = {}
if params.get("Body") and len(params.get("Body")) > 1000:
params["Body"] = "<large text data>"
output_params = params output_params = params
try: try:

View file

@ -45,7 +45,7 @@ def ensure_directory_opener(path, flags):
# TODO: Do not add {size} to title yet, since it produces dynamic info in top level steps # TODO: Do not add {size} to title yet, since it produces dynamic info in top level steps
# Use object_size dt in future as argument # Use object_size dt in future as argument
@reporter.step("Generate file") @reporter.step("Generate file")
def generate_file(size: int) -> TestFile: def generate_file(size: int, file_name: Optional[str] = None) -> TestFile:
"""Generates a binary file with the specified size in bytes. """Generates a binary file with the specified size in bytes.
Args: Args:
@ -54,7 +54,11 @@ def generate_file(size: int) -> TestFile:
Returns: Returns:
The path to the generated file. The path to the generated file.
""" """
test_file = TestFile(os.path.join(ASSETS_DIR, string_utils.unique_name("object-")))
if file_name is None:
file_name = string_utils.unique_name("object-")
test_file = TestFile(os.path.join(ASSETS_DIR, file_name))
with open(test_file, "wb", opener=ensure_directory_opener) as file: with open(test_file, "wb", opener=ensure_directory_opener) as file:
file.write(os.urandom(size)) file.write(os.urandom(size))
logger.info(f"File with size {size} bytes has been generated: {test_file}") logger.info(f"File with size {size} bytes has been generated: {test_file}")

View file

@ -2,7 +2,7 @@ from typing import Any
import pytest import pytest
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper from frostfs_testlib.clients import AwsCliClient, Boto3ClientWrapper
from frostfs_testlib.storage.dataclasses.acl import EACLRole from frostfs_testlib.storage.dataclasses.acl import EACLRole
from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize from frostfs_testlib.storage.dataclasses.object_size import ObjectSize