Compare commits

...

24 commits

Author SHA1 Message Date
0a3de927a2 [#343] Extend testsuites for PATCH method
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-12-25 15:39:17 +00:00
9e3380d519 [#336] Refine CODEOWNERS settings
Signed-off-by: Vitaliy Potyarkin <v.potyarkin@yadro.com>
2024-12-25 09:53:48 +00:00
6e951443ed [#342] Remove try-catch from delete block
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-12-24 08:17:18 +00:00
0479701258 [#341] Add test for multipart object in Test_http_object testsuite
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-12-18 18:03:00 +03:00
dc5a9e7bb9 [#340] Move s3 and http directories to avoid conflict with requests
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-12-18 12:57:24 +03:00
335eed85b1 [#338] Added parameter word_count to method get_logs
Signed-off-by: Ilyas Niyazov <i.niyazov@yadro.com>
2024-12-17 14:25:10 +00:00
cc7bd4ffc9 [#339] Added ns args for func container create
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-12-17 13:55:15 +03:00
cd15be3b7c [#334] Automation of PATCH method in S3
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-12-13 10:53:02 +03:00
8ff1e72499 [#337] Add rule chain error
Signed-off-by: Ekaterina Chernitsyna <e.chernitsyna@yadro.com>
2024-12-13 10:45:14 +03:00
0ebb845329 [#335] Fixed iam boto3 client 2024-12-06 10:50:34 +03:00
ee7d9df4a9 [#333] Fix files param in http client part two
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-12-05 16:48:23 +03:00
61353cb38c [#332] Fix files param in http client
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-12-05 14:26:24 +03:00
b3d05c5c28 [#326] Automation of PATCH method in GRPC
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-12-05 10:28:06 +00:00
8ec7e21e84 [#331] Fix type hints for service methods
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-12-03 14:55:12 +03:00
0e040d2722 [#330] Improve CURL generation and fix Boto3 logging
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-12-02 15:54:38 +03:00
7d6768c83f [#325] Added get nns records method to frostfs-adm
Signed-off-by: Dmitry Anurin <danurin@yadro.com>
2024-11-29 10:21:41 +00:00
3dc7a5bdb0 [#328] Change logic activating split-brain
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-11-29 08:55:08 +00:00
24e1dfef28 [#324]Extension list_objects method 2024-11-26 07:37:56 +00:00
0c9660fffc [#323] Update APE related entities
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-11-20 17:14:33 +03:00
8eaa511e5c [#322] Added classmethod decorator in Http client
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-11-18 15:07:24 +00:00
a1953684b8 [#307] added methods for testing MFA 2024-11-18 07:08:42 +00:00
451de5e07e [#320] Added shards detach function
Signed-off-by: Dmitry Anurin <danurin@yadro.com>
2024-11-14 16:22:06 +03:00
f24bfc06fd [#319] Add cached fixture feature
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-11-13 17:46:03 +03:00
47bc11835b [#318] Add tombstone expiration test
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-11-13 10:11:03 +03:00
39 changed files with 971 additions and 218 deletions

View file

@ -1 +1,3 @@
* @JuliaKovshova @abereziny @d.zayakin @anikeev-yadro @anurindm @ylukoyan @i.niyazov .* @TrueCloudLab/qa-committers
.forgejo/.* @potyarkin
Makefile @potyarkin

View file

@ -62,7 +62,7 @@ authmate = "frostfs_testlib.credentials.authmate_s3_provider:AuthmateS3Credentia
wallet_factory = "frostfs_testlib.credentials.wallet_factory_provider:WalletFactoryProvider" wallet_factory = "frostfs_testlib.credentials.wallet_factory_provider:WalletFactoryProvider"
[project.entry-points."frostfs.testlib.bucket_cid_resolver"] [project.entry-points."frostfs.testlib.bucket_cid_resolver"]
frostfs = "frostfs_testlib.s3.curl_bucket_resolver:CurlBucketContainerResolver" frostfs = "frostfs_testlib.clients.s3.curl_bucket_resolver:CurlBucketContainerResolver"
[tool.isort] [tool.isort]
profile = "black" profile = "black"

View file

@ -463,3 +463,26 @@ class FrostfsAdmMorph(CliCommand):
"morph ape rm-rule-chain", "morph ape rm-rule-chain",
**{param: value for param, value in locals().items() if param not in ["self"]}, **{param: value for param, value in locals().items() if param not in ["self"]},
) )
def get_nns_records(
self,
name: str,
type: Optional[str] = None,
rpc_endpoint: Optional[str] = None,
alphabet_wallets: Optional[str] = None,
) -> CommandResult:
"""Returns domain record of the specified type
Args:
name: Domain name
type: Domain name service record type(A|CNAME|SOA|TXT)
rpc_endpoint: N3 RPC node endpoint
alphabet_wallets: path to alphabet wallets dir
Returns:
Command's result
"""
return self._execute(
"morph nns get-records",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -276,6 +276,54 @@ class FrostfsCliObject(CliCommand):
**{param: value for param, value in locals().items() if param not in ["self"]}, **{param: value for param, value in locals().items() if param not in ["self"]},
) )
def patch(
self,
rpc_endpoint: str,
cid: str,
oid: str,
range: list[str] = None,
payload: list[str] = None,
new_attrs: Optional[str] = None,
replace_attrs: bool = False,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
session: Optional[str] = None,
timeout: Optional[str] = None,
trace: bool = False,
ttl: Optional[int] = None,
wallet: Optional[str] = None,
xhdr: Optional[dict] = None,
) -> CommandResult:
"""
PATCH an object.
Args:
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>')
cid: Container ID
oid: Object ID
range: An array of ranges in which to replace data in the format [offset1:length1, offset2:length2]
payload: An array of file paths to be applied in each range
new_attrs: Attributes to be changed in the format Key1=Value1,Key2=Value2
replace_attrs: Replace all attributes completely with new ones specified in new_attrs
address: Address of wallet account
bearer: File with signed JSON or binary encoded bearer token
generate_key: Generate new private key
session: Filepath to a JSON- or binary-encoded token of the object RANGE session
timeout: Timeout for the operation
trace: Generate trace ID and print it
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Dict with request X-Headers
Returns:
Command's result.
"""
return self._execute(
"object patch",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def range( def range(
self, self,
rpc_endpoint: str, rpc_endpoint: str,

View file

@ -241,3 +241,21 @@ class FrostfsCliShards(CliCommand):
"control shards evacuation status", "control shards evacuation status",
**{param: value for param, value in locals().items() if param not in ["self"]}, **{param: value for param, value in locals().items() if param not in ["self"]},
) )
def detach(self, endpoint: str, address: Optional[str] = None, id: Optional[str] = None, timeout: Optional[str] = None):
"""
Detach and close the shards
Args:
address: Address of wallet account
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
id: List of shard IDs in base58 encoding
timeout: Timeout for an operation (default 15s)
Returns:
Command's result.
"""
return self._execute(
"control shards detach",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -0,0 +1,5 @@
from frostfs_testlib.clients.http.http_client import HttpClient
from frostfs_testlib.clients.s3.aws_cli_client import AwsCliClient
from frostfs_testlib.clients.s3.boto3_client import Boto3ClientWrapper
from frostfs_testlib.clients.s3.interfaces import S3ClientWrapper
from frostfs_testlib.clients.s3.s3_http_client import S3HttpClient

View file

@ -0,0 +1,144 @@
import io
import json
import logging
import logging.config
from typing import Mapping, Sequence
import httpx
from frostfs_testlib import reporter
timeout = httpx.Timeout(60, read=150)
LOGGING_CONFIG = {
"disable_existing_loggers": False,
"version": 1,
"handlers": {"default": {"class": "logging.StreamHandler", "formatter": "http", "stream": "ext://sys.stderr"}},
"formatters": {
"http": {
"format": "%(levelname)s [%(asctime)s] %(name)s - %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S",
}
},
"loggers": {
"httpx": {
"handlers": ["default"],
"level": "DEBUG",
},
"httpcore": {
"handlers": ["default"],
"level": "ERROR",
},
},
}
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("NeoLogger")
class HttpClient:
@reporter.step("Send {method} request to {url}")
def send(self, method: str, url: str, expected_status_code: int = None, **kwargs: dict) -> httpx.Response:
transport = httpx.HTTPTransport(verify=False, retries=5)
client = httpx.Client(timeout=timeout, transport=transport)
response = client.request(method, url, **kwargs)
self._attach_response(response, **kwargs)
logger.info(f"Response: {response.status_code} => {response.text}")
if expected_status_code:
assert (
response.status_code == expected_status_code
), f"Got {response.status_code} response code while {expected_status_code} expected"
return response
@classmethod
def _parse_body(cls, readable: httpx.Request | httpx.Response) -> str | None:
try:
content = readable.read()
except Exception as e:
logger.warning(f"Unable to read file: {str(e)}")
return None
if not content:
return None
request_body = None
try:
request_body = json.loads(content)
except (json.JSONDecodeError, UnicodeDecodeError) as e:
logger.warning(f"Unable to convert body to json: {str(e)}")
if request_body is not None:
return json.dumps(request_body, default=str, indent=4)
try:
request_body = content.decode()
except UnicodeDecodeError as e:
logger.warning(f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}")
request_body = content if request_body is None else request_body
request_body = "<large text data>" if len(request_body) > 1000 else request_body
return request_body
@classmethod
def _parse_files(cls, files: Mapping | Sequence | None) -> dict:
filepaths = {}
if not files:
return filepaths
if isinstance(files, Sequence):
items = files
elif isinstance(files, Mapping):
items = files.items()
else:
raise TypeError(f"'files' must be either Sequence or Mapping, got: {type(files).__name__}")
for name, file in items:
if isinstance(file, io.IOBase):
filepaths[name] = file.name
elif isinstance(file, Sequence):
filepaths[name] = file[1].name
return filepaths
@classmethod
def _attach_response(cls, response: httpx.Response, **kwargs):
request = response.request
request_headers = json.dumps(dict(request.headers), default=str, indent=4)
request_body = cls._parse_body(request)
files = kwargs.get("files")
request_files = cls._parse_files(files)
response_headers = json.dumps(dict(response.headers), default=str, indent=4)
response_body = cls._parse_body(response)
report = (
f"Method: {request.method}\n\n"
+ f"URL: {request.url}\n\n"
+ f"Request Headers: {request_headers}\n\n"
+ (f"Request Body: {request_body}\n\n" if request_body else "")
+ (f"Request Files: {request_files}\n\n" if request_files else "")
+ f"Response Status Code: {response.status_code}\n\n"
+ f"Response Headers: {response_headers}\n\n"
+ (f"Response Body: {response_body}\n\n" if response_body else "")
)
curl_request = cls._create_curl_request(request.url, request.method, request.headers, request_body, request_files)
reporter.attach(report, "Requests Info")
reporter.attach(curl_request, "CURL")
@classmethod
def _create_curl_request(cls, url: str, method: str, headers: httpx.Headers, data: str, files: dict) -> str:
headers = " ".join(f'-H "{name.title()}: {value}"' for name, value in headers.items())
data = f" -d '{data}'" if data else ""
for name, path in files.items():
data += f' -F "{name}=@{path}"'
# Option -k means no verify SSL
return f"curl {url} -X {method} {headers}{data} -k"

View file

@ -0,0 +1 @@
from frostfs_testlib.clients.s3.interfaces import BucketContainerResolver, S3ClientWrapper, VersioningStatus

View file

@ -6,8 +6,8 @@ from time import sleep
from typing import Literal, Optional, Union from typing import Literal, Optional, Union
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.clients.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.shell import CommandOptions from frostfs_testlib.shell import CommandOptions
from frostfs_testlib.shell.local_shell import LocalShell from frostfs_testlib.shell.local_shell import LocalShell
from frostfs_testlib.utils import string_utils from frostfs_testlib.utils import string_utils
@ -171,7 +171,7 @@ class AwsCliClient(S3ClientWrapper):
return response.get("TagSet") return response.get("TagSet")
@reporter.step("Get bucket acl") @reporter.step("Get bucket acl")
def get_bucket_acl(self, bucket: str) -> list: def get_bucket_acl(self, bucket: str) -> dict:
if bucket.startswith("-") or " " in bucket: if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"' bucket = f'"{bucket}"'
@ -179,8 +179,7 @@ class AwsCliClient(S3ClientWrapper):
f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
) )
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) return self._to_json(output)
return response.get("Grants")
@reporter.step("Get bucket location") @reporter.step("Get bucket location")
def get_bucket_location(self, bucket: str) -> dict: def get_bucket_location(self, bucket: str) -> dict:
@ -196,11 +195,20 @@ class AwsCliClient(S3ClientWrapper):
return response.get("LocationConstraint") return response.get("LocationConstraint")
@reporter.step("List objects S3") @reporter.step("List objects S3")
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]: def list_objects(
self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None
) -> Union[dict, list[str]]:
if bucket.startswith("-") or " " in bucket: if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"' bucket = f'"{bucket}"'
cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} "
if page_size:
cmd = cmd.replace("--no-paginate", "")
cmd += f" --page-size {page_size} "
if prefix:
cmd += f" --prefix {prefix}"
if self.profile:
cmd += f" --profile {self.profile} "
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
@ -852,7 +860,7 @@ class AwsCliClient(S3ClientWrapper):
return response["Parts"] return response["Parts"]
@reporter.step("Complete multipart upload S3") @reporter.step("Complete multipart upload S3")
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None: def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict:
if bucket.startswith("-") or " " in bucket: if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"' bucket = f'"{bucket}"'
@ -1440,3 +1448,90 @@ class AwsCliClient(S3ClientWrapper):
response = self._to_json(output) response = self._to_json(output)
return response return response
# MFA METHODS
@reporter.step("Creates a new virtual MFA device")
def iam_create_virtual_mfa_device(self, virtual_mfa_device_name: str, outfile: str, bootstrap_method: str) -> tuple:
cmd = f"aws {self.common_flags} iam create-virtual-mfa-device --virtual-mfa-device-name {virtual_mfa_device_name}\
--outfile {outfile} --bootstrap-method {bootstrap_method} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
serial_number = response.get("VirtualMFADevice", {}).get("SerialNumber")
assert serial_number, f"Expected SerialNumber in response:\n{response}"
return serial_number, False
@reporter.step("Deactivates the specified MFA device and removes it from association with the user name")
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
cmd = f"aws {self.common_flags} iam deactivate-mfa-device --user-name {user_name} --serial-number {serial_number} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Deletes a virtual MFA device")
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
cmd = f"aws {self.common_flags} iam delete-virtual-mfa-device --serial-number {serial_number} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Enables the specified MFA device and associates it with the specified IAM user")
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
cmd = f"aws {self.common_flags} iam enable-mfa-device --user-name {user_name} --serial-number {serial_number} --authentication-code1 {authentication_code1}\
--authentication-code2 {authentication_code2} --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Lists the MFA devices for an IAM user")
def iam_list_virtual_mfa_devices(self) -> dict:
cmd = f"aws {self.common_flags} iam list-virtual-mfa-devices --endpoint {self.iam_endpoint}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("VirtualMFADevices"), f"Expected VirtualMFADevices in response:\n{response}"
return response
@reporter.step("Get session token for user")
def sts_get_session_token(
self, duration_seconds: Optional[str] = None, serial_number: Optional[str] = None, token_code: Optional[str] = None
) -> tuple:
cmd = f"aws {self.common_flags} sts get-session-token --endpoint {self.iam_endpoint}"
if duration_seconds:
cmd += f" --duration-seconds {duration_seconds}"
if serial_number:
cmd += f" --serial-number {serial_number}"
if token_code:
cmd += f" --token-code {token_code}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
access_key = response.get("Credentials", {}).get("AccessKeyId")
secret_access_key = response.get("Credentials", {}).get("SecretAccessKey")
session_token = response.get("Credentials", {}).get("SessionToken")
assert access_key, f"Expected AccessKeyId in response:\n{response}"
assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"
assert session_token, f"Expected SessionToken in response:\n{response}"
return access_key, secret_access_key, session_token

View file

@ -13,8 +13,8 @@ from botocore.exceptions import ClientError
from mypy_boto3_s3 import S3Client from mypy_boto3_s3 import S3Client
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.clients.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.utils import string_utils from frostfs_testlib.utils import string_utils
# TODO: Refactor this code to use shell instead of _cmd_run # TODO: Refactor this code to use shell instead of _cmd_run
@ -41,6 +41,8 @@ class Boto3ClientWrapper(S3ClientWrapper):
self.boto3_iam_client: S3Client = None self.boto3_iam_client: S3Client = None
self.iam_endpoint: str = "" self.iam_endpoint: str = ""
self.boto3_sts_client: S3Client = None
self.access_key_id: str = access_key_id self.access_key_id: str = access_key_id
self.secret_access_key: str = secret_access_key self.secret_access_key: str = secret_access_key
self.profile = profile self.profile = profile
@ -84,9 +86,18 @@ class Boto3ClientWrapper(S3ClientWrapper):
service_name="iam", service_name="iam",
aws_access_key_id=self.access_key_id, aws_access_key_id=self.access_key_id,
aws_secret_access_key=self.secret_access_key, aws_secret_access_key=self.secret_access_key,
region_name=self.region,
endpoint_url=self.iam_endpoint, endpoint_url=self.iam_endpoint,
verify=False, verify=False,
) )
# since the STS does not have an enpoint, IAM is used
self.boto3_sts_client = self.session.client(
service_name="sts",
aws_access_key_id=self.access_key_id,
aws_secret_access_key=self.secret_access_key,
endpoint_url=iam_endpoint,
verify=False,
)
def _to_s3_param(self, param: str) -> str: def _to_s3_param(self, param: str) -> str:
replacement_map = { replacement_map = {
@ -219,14 +230,13 @@ class Boto3ClientWrapper(S3ClientWrapper):
return response.get("TagSet") return response.get("TagSet")
@reporter.step("Get bucket acl") @reporter.step("Get bucket acl")
def get_bucket_acl(self, bucket: str) -> list: def get_bucket_acl(self, bucket: str) -> dict:
response = self._exec_request( return self._exec_request(
self.boto3_client.get_bucket_acl, self.boto3_client.get_bucket_acl,
params={"Bucket": bucket}, params={"Bucket": bucket},
endpoint=self.s3gate_endpoint, endpoint=self.s3gate_endpoint,
profile=self.profile, profile=self.profile,
) )
return response.get("Grants")
@reporter.step("Delete bucket tagging") @reporter.step("Delete bucket tagging")
def delete_bucket_tagging(self, bucket: str) -> None: def delete_bucket_tagging(self, bucket: str) -> None:
@ -388,10 +398,17 @@ class Boto3ClientWrapper(S3ClientWrapper):
return response if full_output else obj_list return response if full_output else obj_list
@reporter.step("List objects S3") @reporter.step("List objects S3")
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]: def list_objects(
self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None
) -> Union[dict, list[str]]:
params = {"Bucket": bucket}
if page_size:
params["MaxKeys"] = page_size
if prefix:
params["Prefix"] = prefix
response = self._exec_request( response = self._exec_request(
self.boto3_client.list_objects, self.boto3_client.list_objects,
params={"Bucket": bucket}, params,
endpoint=self.s3gate_endpoint, endpoint=self.s3gate_endpoint,
profile=self.profile, profile=self.profile,
) )
@ -687,7 +704,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
return response["Parts"] return response["Parts"]
@reporter.step("Complete multipart upload S3") @reporter.step("Complete multipart upload S3")
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None: def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict:
parts = [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts] parts = [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts]
params = self._convert_to_s3_params(locals(), exclude=["parts"]) params = self._convert_to_s3_params(locals(), exclude=["parts"])
params["MultipartUpload"] = {"Parts": parts} params["MultipartUpload"] = {"Parts": parts}
@ -1265,3 +1282,66 @@ class Boto3ClientWrapper(S3ClientWrapper):
endpoint=self.iam_endpoint, endpoint=self.iam_endpoint,
profile=self.profile, profile=self.profile,
) )
# MFA methods
@reporter.step("Creates a new virtual MFA device")
def iam_create_virtual_mfa_device(
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
) -> tuple:
response = self.boto3_iam_client.create_virtual_mfa_device(VirtualMFADeviceName=virtual_mfa_device_name)
serial_number = response.get("VirtualMFADevice", {}).get("SerialNumber")
base32StringSeed = response.get("VirtualMFADevice", {}).get("Base32StringSeed")
assert serial_number, f"Expected SerialNumber in response:\n{response}"
assert base32StringSeed, f"Expected Base32StringSeed in response:\n{response}"
return serial_number, base32StringSeed
@reporter.step("Deactivates the specified MFA device and removes it from association with the user name")
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
response = self.boto3_iam_client.deactivate_mfa_device(UserName=user_name, SerialNumber=serial_number)
return response
@reporter.step("Deletes a virtual MFA device")
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
response = self.boto3_iam_client.delete_virtual_mfa_device(SerialNumber=serial_number)
return response
@reporter.step("Enables the specified MFA device and associates it with the specified IAM user")
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
response = self.boto3_iam_client.enable_mfa_device(
UserName=user_name,
SerialNumber=serial_number,
AuthenticationCode1=authentication_code1,
AuthenticationCode2=authentication_code2,
)
return response
@reporter.step("Lists the MFA devices for an IAM user")
def iam_list_virtual_mfa_devices(self) -> dict:
response = self.boto3_iam_client.list_virtual_mfa_devices()
assert response.get("VirtualMFADevices"), f"Expected VirtualMFADevices in response:\n{response}"
return response
@reporter.step("Get session token for user")
def sts_get_session_token(
self, duration_seconds: Optional[str] = "", serial_number: Optional[str] = "", token_code: Optional[str] = ""
) -> tuple:
response = self.boto3_sts_client.get_session_token(
DurationSeconds=duration_seconds,
SerialNumber=serial_number,
TokenCode=token_code,
)
access_key = response.get("Credentials", {}).get("AccessKeyId")
secret_access_key = response.get("Credentials", {}).get("SecretAccessKey")
session_token = response.get("Credentials", {}).get("SessionToken")
assert access_key, f"Expected AccessKeyId in response:\n{response}"
assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"
assert session_token, f"Expected SessionToken in response:\n{response}"
return access_key, secret_access_key, session_token

View file

@ -1,7 +1,7 @@
import re import re
from frostfs_testlib.cli.generic_cli import GenericCli from frostfs_testlib.cli.generic_cli import GenericCli
from frostfs_testlib.s3.interfaces import BucketContainerResolver from frostfs_testlib.clients.s3 import BucketContainerResolver
from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.cluster import ClusterNode

View file

@ -128,7 +128,7 @@ class S3ClientWrapper(HumanReadableABC):
"""Deletes the tags from the bucket.""" """Deletes the tags from the bucket."""
@abstractmethod @abstractmethod
def get_bucket_acl(self, bucket: str) -> list: def get_bucket_acl(self, bucket: str) -> dict:
"""This implementation of the GET action uses the acl subresource to return the access control list (ACL) of a bucket.""" """This implementation of the GET action uses the acl subresource to return the access control list (ACL) of a bucket."""
@abstractmethod @abstractmethod
@ -195,7 +195,9 @@ class S3ClientWrapper(HumanReadableABC):
""" """
@abstractmethod @abstractmethod
def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]: def list_objects(
self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None
) -> Union[dict, list[str]]:
"""Returns some or all (up to 1,000) of the objects in a bucket with each request. """Returns some or all (up to 1,000) of the objects in a bucket with each request.
You can use the request parameters as selection criteria to return a subset of the objects in a bucket. You can use the request parameters as selection criteria to return a subset of the objects in a bucket.
A 200 OK response can contain valid or invalid XML. Make sure to design your application A 200 OK response can contain valid or invalid XML. Make sure to design your application
@ -334,7 +336,7 @@ class S3ClientWrapper(HumanReadableABC):
"""Lists the parts that have been uploaded for a specific multipart upload.""" """Lists the parts that have been uploaded for a specific multipart upload."""
@abstractmethod @abstractmethod
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None: def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict:
"""Completes a multipart upload by assembling previously uploaded parts.""" """Completes a multipart upload by assembling previously uploaded parts."""
@abstractmethod @abstractmethod
@ -578,3 +580,32 @@ class S3ClientWrapper(HumanReadableABC):
@abstractmethod @abstractmethod
def iam_untag_user(self, user_name: str, tag_keys: list) -> dict: def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
"""Removes the specified tags from the user""" """Removes the specified tags from the user"""
# MFA methods
@abstractmethod
def iam_create_virtual_mfa_device(
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
) -> tuple:
"""Creates a new virtual MFA device"""
@abstractmethod
def iam_deactivate_mfa_device(self, user_name: str, serial_number: str) -> dict:
"""Deactivates the specified MFA device and removes it from association with the user name"""
@abstractmethod
def iam_delete_virtual_mfa_device(self, serial_number: str) -> dict:
"""Deletes a virtual MFA device"""
@abstractmethod
def iam_enable_mfa_device(self, user_name: str, serial_number: str, authentication_code1: str, authentication_code2: str) -> dict:
"""Enables the specified MFA device and associates it with the specified IAM user"""
@abstractmethod
def iam_list_virtual_mfa_devices(self) -> dict:
"""Lists the MFA devices for an IAM user"""
@abstractmethod
def sts_get_session_token(
self, duration_seconds: Optional[str] = None, serial_number: Optional[str] = None, token_code: Optional[str] = None
) -> tuple:
"""Get session token for user"""

View file

@ -0,0 +1,149 @@
import hashlib
import logging
import xml.etree.ElementTree as ET
import httpx
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials
from frostfs_testlib import reporter
from frostfs_testlib.clients import HttpClient
from frostfs_testlib.utils.file_utils import TestFile
logger = logging.getLogger("NeoLogger")
DEFAULT_TIMEOUT = 60.0
class S3HttpClient:
def __init__(
self, s3gate_endpoint: str, access_key_id: str, secret_access_key: str, profile: str = "default", region: str = "us-east-1"
) -> None:
self.http_client = HttpClient()
self.credentials = Credentials(access_key_id, secret_access_key)
self.profile = profile
self.region = region
self.iam_endpoint: str = None
self.s3gate_endpoint: str = None
self.service: str = None
self.signature: SigV4Auth = None
self.set_endpoint(s3gate_endpoint)
def _to_s3_header(self, header: str) -> dict:
replacement_map = {
"Acl": "ACL",
"_": "-",
}
result = header
if not header.startswith("x_amz"):
result = header.title()
for find, replace in replacement_map.items():
result = result.replace(find, replace)
return result
def _convert_to_s3_headers(self, scope: dict, exclude: list[str] = None):
exclude = ["self", "cls"] if not exclude else exclude + ["self", "cls"]
return {self._to_s3_header(header): value for header, value in scope.items() if header not in exclude and value is not None}
def _create_aws_request(
self, method: str, url: str, headers: dict, content: str | bytes | TestFile = None, params: dict = None
) -> AWSRequest:
data = b""
if content is not None:
if isinstance(content, TestFile):
with open(content, "rb") as io_content:
data = io_content.read()
elif isinstance(content, str):
data = bytes(content, encoding="utf-8")
elif isinstance(content, bytes):
data = content
else:
raise TypeError(f"Content expected as a string, bytes or TestFile object, got: {content}")
headers["X-Amz-Content-SHA256"] = hashlib.sha256(data).hexdigest()
aws_request = AWSRequest(method, url, headers, data, params)
self.signature.add_auth(aws_request)
return aws_request
def _exec_request(
self,
method: str,
url: str,
headers: dict,
content: str | bytes | TestFile = None,
params: dict = None,
timeout: float = DEFAULT_TIMEOUT,
) -> dict:
aws_request = self._create_aws_request(method, url, headers, content, params)
response = self.http_client.send(
aws_request.method,
aws_request.url,
headers=dict(aws_request.headers),
data=aws_request.data,
params=aws_request.params,
timeout=timeout,
)
try:
response.raise_for_status()
except httpx.HTTPStatusError:
raise httpx.HTTPStatusError(response.text, request=response.request, response=response)
root = ET.fromstring(response.read())
data = {
"LastModified": root.find(".//LastModified").text,
"ETag": root.find(".//ETag").text,
}
if response.headers.get("x-amz-version-id"):
data["VersionId"] = response.headers.get("x-amz-version-id")
return data
@reporter.step("Set endpoint S3 to {s3gate_endpoint}")
def set_endpoint(self, s3gate_endpoint: str):
if self.s3gate_endpoint == s3gate_endpoint:
return
self.s3gate_endpoint = s3gate_endpoint
self.service = "s3"
self.signature = SigV4Auth(self.credentials, self.service, self.region)
@reporter.step("Set endpoint IAM to {iam_endpoint}")
def set_iam_endpoint(self, iam_endpoint: str):
if self.iam_endpoint == iam_endpoint:
return
self.iam_endpoint = iam_endpoint
self.service = "iam"
self.signature = SigV4Auth(self.credentials, self.service, self.region)
@reporter.step("Patch object S3")
def patch_object(
self,
bucket: str,
key: str,
content: str | bytes | TestFile,
content_range: str,
version_id: str = None,
if_match: str = None,
if_unmodified_since: str = None,
x_amz_expected_bucket_owner: str = None,
timeout: float = DEFAULT_TIMEOUT,
) -> dict:
if content_range and not content_range.startswith("bytes"):
content_range = f"bytes {content_range}/*"
url = f"{self.s3gate_endpoint}/{bucket}/{key}"
headers = self._convert_to_s3_headers(locals(), exclude=["bucket", "key", "content", "version_id", "timeout"])
params = {"VersionId": version_id} if version_id is not None else None
return self._exec_request("PATCH", url, headers, content, params, timeout=timeout)

View file

@ -164,6 +164,9 @@ class DockerHost(Host):
return volume_path return volume_path
def send_signal_to_service(self, service_name: str, signal: str) -> None:
raise NotImplementedError("Not implemented for docker")
def delete_metabase(self, service_name: str) -> None: def delete_metabase(self, service_name: str) -> None:
raise NotImplementedError("Not implemented for docker") raise NotImplementedError("Not implemented for docker")
@ -247,6 +250,7 @@ class DockerHost(Host):
unit: Optional[str] = None, unit: Optional[str] = None,
exclude_filter: Optional[str] = None, exclude_filter: Optional[str] = None,
priority: Optional[str] = None, priority: Optional[str] = None,
word_count: bool = None,
) -> str: ) -> str:
client = self._get_docker_client() client = self._get_docker_client()
filtered_logs = "" filtered_logs = ""

View file

@ -117,6 +117,17 @@ class Host(ABC):
service_name: Name of the service to stop. service_name: Name of the service to stop.
""" """
@abstractmethod
def send_signal_to_service(self, service_name: str, signal: str) -> None:
"""Send signal to service with specified name using kill -<signal>
The service must be hosted on this host.
Args:
service_name: Name of the service to stop.
signal: signal name. See kill -l to all names
"""
@abstractmethod @abstractmethod
def mask_service(self, service_name: str) -> None: def mask_service(self, service_name: str) -> None:
"""Prevent the service from start by any activity by masking it. """Prevent the service from start by any activity by masking it.
@ -313,6 +324,7 @@ class Host(ABC):
unit: Optional[str] = None, unit: Optional[str] = None,
exclude_filter: Optional[str] = None, exclude_filter: Optional[str] = None,
priority: Optional[str] = None, priority: Optional[str] = None,
word_count: bool = None,
) -> str: ) -> str:
"""Get logs from host filtered by regex. """Get logs from host filtered by regex.
@ -323,6 +335,7 @@ class Host(ABC):
unit: required unit. unit: required unit.
priority: logs level, 0 - emergency, 7 - debug. All messages with that code and higher. priority: logs level, 0 - emergency, 7 - debug. All messages with that code and higher.
For example, if we specify the -p 2 option, journalctl will show all messages with levels 2, 1 and 0. For example, if we specify the -p 2 option, journalctl will show all messages with levels 2, 1 and 0.
word_count: output type, expected values: lines, bytes, json
Returns: Returns:
Found entries as str if any found. Found entries as str if any found.

View file

@ -1,95 +0,0 @@
import json
import logging
import logging.config
import httpx
from frostfs_testlib import reporter
timeout = httpx.Timeout(60, read=150)
LOGGING_CONFIG = {
"disable_existing_loggers": False,
"version": 1,
"handlers": {"default": {"class": "logging.StreamHandler", "formatter": "http", "stream": "ext://sys.stderr"}},
"formatters": {
"http": {
"format": "%(levelname)s [%(asctime)s] %(name)s - %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S",
}
},
"loggers": {
"httpx": {
"handlers": ["default"],
"level": "DEBUG",
},
"httpcore": {
"handlers": ["default"],
"level": "ERROR",
},
},
}
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("NeoLogger")
class HttpClient:
@reporter.step("Send {method} request to {url}")
def send(self, method: str, url: str, expected_status_code: int = None, **kwargs: dict) -> httpx.Response:
transport = httpx.HTTPTransport(verify=False, retries=5)
client = httpx.Client(timeout=timeout, transport=transport)
response = client.request(method, url, **kwargs)
self._attach_response(response)
logger.info(f"Response: {response.status_code} => {response.text}")
if expected_status_code:
assert response.status_code == expected_status_code, (
f"Got {response.status_code} response code" f" while {expected_status_code} expected"
)
return response
def _attach_response(self, response: httpx.Response):
request = response.request
try:
request_headers = json.dumps(dict(request.headers), indent=4)
except json.JSONDecodeError:
request_headers = str(request.headers)
try:
request_body = request.read()
try:
request_body = request_body.decode("utf-8")
except UnicodeDecodeError as e:
request_body = f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}"
except Exception as e:
request_body = f"Error reading request body: {str(e)}"
request_body = "" if request_body is None else request_body
try:
response_headers = json.dumps(dict(response.headers), indent=4)
except json.JSONDecodeError:
response_headers = str(response.headers)
report = (
f"Method: {request.method}\n\n"
f"URL: {request.url}\n\n"
f"Request Headers: {request_headers}\n\n"
f"Request Body: {request_body}\n\n"
f"Response Status Code: {response.status_code}\n\n"
f"Response Headers: {response_headers}\n\n"
f"Response Body: {response.text}\n\n"
)
curl_request = self._create_curl_request(request.url, request.method, request.headers, request_body)
reporter.attach(report, "Requests Info")
reporter.attach(curl_request, "CURL")
def _create_curl_request(self, url: str, method: str, headers: httpx.Headers, data: str) -> str:
headers = " ".join(f'-H "{name.title()}: {value}"' for name, value in headers.items())
data = f" -d '{data}'" if data else ""
# Option -k means no verify SSL
return f"curl {url} -X {method} {headers}{data} -k"

View file

@ -53,3 +53,4 @@ HOSTING_CONFIG_FILE = os.getenv(
) )
MORE_LOG = os.getenv("MORE_LOG", "1") MORE_LOG = os.getenv("MORE_LOG", "1")
EXPIRATION_EPOCH_ATTRIBUTE = "__SYSTEM__EXPIRATION_EPOCH"

View file

@ -9,6 +9,7 @@ OBJECT_ALREADY_REMOVED = "code = 2052.*message = object already removed"
SESSION_NOT_FOUND = "code = 4096.*message = session token not found" SESSION_NOT_FOUND = "code = 4096.*message = session token not found"
OUT_OF_RANGE = "code = 2053.*message = out of range" OUT_OF_RANGE = "code = 2053.*message = out of range"
EXPIRED_SESSION_TOKEN = "code = 4097.*message = expired session token" EXPIRED_SESSION_TOKEN = "code = 4097.*message = expired session token"
ADD_CHAIN_ERROR = "code = 5120 message = apemanager access denied"
# TODO: Change to codes with message # TODO: Change to codes with message
# OBJECT_IS_LOCKED = "code = 2050.*message = object is locked" # OBJECT_IS_LOCKED = "code = 2050.*message = object is locked"
# LOCK_NON_REGULAR_OBJECT = "code = 2051.*message = ..." will be available once 2092 is fixed # LOCK_NON_REGULAR_OBJECT = "code = 2051.*message = ..." will be available once 2092 is fixed
@ -27,6 +28,10 @@ S3_BUCKET_DOES_NOT_ALLOW_ACL = "The bucket does not allow ACLs"
S3_MALFORMED_XML_REQUEST = "The XML you provided was not well-formed or did not validate against our published schema." S3_MALFORMED_XML_REQUEST = "The XML you provided was not well-formed or did not validate against our published schema."
RULE_ACCESS_DENIED_CONTAINER = "access to container operation {operation} is denied by access policy engine: Access denied" RULE_ACCESS_DENIED_CONTAINER = "access to container operation {operation} is denied by access policy engine: Access denied"
RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request: method {operation}: Access denied" # Errors from node missing reasons if request was forwarded. Commenting for now
# RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request: method {operation}: Access denied"
RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request"
NO_RULE_FOUND_CONTAINER = "access to container operation {operation} is denied by access policy engine: NoRuleFound" NO_RULE_FOUND_CONTAINER = "access to container operation {operation} is denied by access policy engine: NoRuleFound"
NO_RULE_FOUND_OBJECT = "access to object operation denied: ape denied request: method {operation}: NoRuleFound" # Errors from node missing reasons if request was forwarded. Commenting for now
# NO_RULE_FOUND_OBJECT = "access to object operation denied: ape denied request: method {operation}: NoRuleFound"
NO_RULE_FOUND_OBJECT = "access to object operation denied: ape denied request"

View file

@ -16,11 +16,10 @@ OPTIONAL_NODE_UNDER_LOAD = os.getenv("OPTIONAL_NODE_UNDER_LOAD")
OPTIONAL_FAILOVER_ENABLED = str_to_bool(os.getenv("OPTIONAL_FAILOVER_ENABLED", "true")) OPTIONAL_FAILOVER_ENABLED = str_to_bool(os.getenv("OPTIONAL_FAILOVER_ENABLED", "true"))
# Set this to True to disable background load. I.E. node which supposed to be stopped will not be actually stopped. # Set this to True to disable background load. I.E. node which supposed to be stopped will not be actually stopped.
OPTIONAL_BACKGROUND_LOAD_ENABLED = str_to_bool( OPTIONAL_BACKGROUND_LOAD_ENABLED = str_to_bool(os.getenv("OPTIONAL_BACKGROUND_LOAD_ENABLED", "true"))
os.getenv("OPTIONAL_BACKGROUND_LOAD_ENABLED", "true")
)
# Set this to False for disable autouse fixture like node healthcheck during developing time. # Set this to False for disable autouse fixture like node healthcheck during developing time.
OPTIONAL_AUTOUSE_FIXTURES_ENABLED = str_to_bool( OPTIONAL_AUTOUSE_FIXTURES_ENABLED = str_to_bool(os.getenv("OPTIONAL_AUTOUSE_FIXTURES_ENABLED", "true"))
os.getenv("OPTIONAL_AUTOUSE_FIXTURES_ENABLED", "true")
) # Use cache for fixtures with @cachec_fixture decorator
OPTIONAL_CACHE_FIXTURES = str_to_bool(os.getenv("OPTIONAL_CACHE_FIXTURES", "false"))

View file

@ -1,3 +0,0 @@
from frostfs_testlib.s3.aws_cli_client import AwsCliClient
from frostfs_testlib.s3.boto3_client import Boto3ClientWrapper
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus

View file

@ -7,9 +7,7 @@ from typing import Optional, Union
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.cli import FrostfsCli from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.plugins import load_plugin
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC
from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.shell import Shell from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.object import put_object, put_object_to_random_node from frostfs_testlib.steps.cli.object import put_object, put_object_to_random_node
from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode
@ -111,6 +109,8 @@ def create_container(
options: Optional[dict] = None, options: Optional[dict] = None,
await_mode: bool = True, await_mode: bool = True,
wait_for_creation: bool = True, wait_for_creation: bool = True,
nns_zone: Optional[str] = None,
nns_name: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str: ) -> str:
""" """
@ -143,6 +143,8 @@ def create_container(
result = cli.container.create( result = cli.container.create(
rpc_endpoint=endpoint, rpc_endpoint=endpoint,
policy=rule, policy=rule,
nns_name=nns_name,
nns_zone=nns_zone,
basic_acl=basic_acl, basic_acl=basic_acl,
attributes=attributes, attributes=attributes,
name=name, name=name,

View file

@ -12,8 +12,8 @@ import requests
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.cli import GenericCli from frostfs_testlib.cli import GenericCli
from frostfs_testlib.clients.s3.aws_cli_client import command_options
from frostfs_testlib.resources.common import ASSETS_DIR, SIMPLE_OBJECT_SIZE from frostfs_testlib.resources.common import ASSETS_DIR, SIMPLE_OBJECT_SIZE
from frostfs_testlib.s3.aws_cli_client import command_options
from frostfs_testlib.shell import Shell from frostfs_testlib.shell import Shell
from frostfs_testlib.shell.local_shell import LocalShell from frostfs_testlib.shell.local_shell import LocalShell
from frostfs_testlib.steps.cli.object import get_object from frostfs_testlib.steps.cli.object import get_object
@ -38,34 +38,34 @@ def get_via_http_gate(
""" """
This function gets given object from HTTP gate This function gets given object from HTTP gate
cid: container id to get object from cid: container id to get object from
oid: object ID oid: object id / object key
node: node to make request node: node to make request
request_path: (optional) http request, if ommited - use default [{endpoint}/get/{cid}/{oid}] request_path: (optional) http request, if ommited - use default [{endpoint}/get/{cid}/{oid}]
""" """
# if `request_path` parameter omitted, use default request = f"{node.http_gate.get_endpoint()}/get/{cid}/{oid}"
if request_path is None: if request_path:
request = f"{node.http_gate.get_endpoint()}/get/{cid}/{oid}"
else:
request = f"{node.http_gate.get_endpoint()}{request_path}" request = f"{node.http_gate.get_endpoint()}{request_path}"
resp = requests.get(request, stream=True, timeout=timeout, verify=False) response = requests.get(request, stream=True, timeout=timeout, verify=False)
if not resp.ok: if not response.ok:
raise Exception( raise Exception(
f"""Failed to get object via HTTP gate: f"""Failed to get object via HTTP gate:
request: {resp.request.path_url}, request: {response.request.path_url},
response: {resp.text}, response: {response.text},
headers: {resp.headers}, headers: {response.headers},
status code: {resp.status_code} {resp.reason}""" status code: {response.status_code} {response.reason}"""
) )
logger.info(f"Request: {request}") logger.info(f"Request: {request}")
_attach_allure_step(request, resp.status_code) _attach_allure_step(request, response.status_code)
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, f"{cid}_{oid}")) test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, f"{cid}_{oid}"))
with open(test_file, "wb") as file: with open(test_file, "wb") as file:
shutil.copyfileobj(resp.raw, file) for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
return test_file return test_file
@ -117,12 +117,12 @@ def get_via_http_gate_by_attribute(
endpoint: http gate endpoint endpoint: http gate endpoint
request_path: (optional) http request path, if ommited - use default [{endpoint}/get_by_attribute/{Key}/{Value}] request_path: (optional) http request path, if ommited - use default [{endpoint}/get_by_attribute/{Key}/{Value}]
""" """
attr_name = list(attribute.keys())[0] attr_name = list(attribute.keys())[0]
attr_value = quote_plus(str(attribute.get(attr_name))) attr_value = quote_plus(str(attribute.get(attr_name)))
# if `request_path` parameter ommited, use default
if request_path is None: request = f"{node.http_gate.get_endpoint()}/get_by_attribute/{cid}/{quote_plus(str(attr_name))}/{attr_value}"
request = f"{node.http_gate.get_endpoint()}/get_by_attribute/{cid}/{quote_plus(str(attr_name))}/{attr_value}" if request_path:
else:
request = f"{node.http_gate.get_endpoint()}{request_path}" request = f"{node.http_gate.get_endpoint()}{request_path}"
resp = requests.get(request, stream=True, timeout=timeout, verify=False) resp = requests.get(request, stream=True, timeout=timeout, verify=False)
@ -357,19 +357,9 @@ def try_to_get_object_via_passed_request_and_expect_error(
) -> None: ) -> None:
try: try:
if attrs is None: if attrs is None:
get_via_http_gate( get_via_http_gate(cid, oid, node, http_request_path)
cid=cid,
oid=oid,
node=node,
request_path=http_request_path,
)
else: else:
get_via_http_gate_by_attribute( get_via_http_gate_by_attribute(cid, attrs, node, http_request_path)
cid=cid,
attribute=attrs,
node=node,
request_path=http_request_path,
)
raise AssertionError(f"Expected error on getting object with cid: {cid}") raise AssertionError(f"Expected error on getting object with cid: {cid}")
except Exception as err: except Exception as err:
match = error_pattern.casefold() in str(err).casefold() match = error_pattern.casefold() in str(err).casefold()

View file

@ -4,16 +4,18 @@ from frostfs_testlib.storage.cluster import ClusterNode
class IpHelper: class IpHelper:
@staticmethod @staticmethod
def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[str]) -> None: def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[tuple]) -> None:
shell = node.host.get_shell() shell = node.host.get_shell()
for ip in block_ip: for ip, table in block_ip:
shell.exec(f"ip route add blackhole {ip}") if not table:
shell.exec(f"ip r a blackhole {ip}")
continue
shell.exec(f"ip r a blackhole {ip} table {table}")
@staticmethod @staticmethod
def restore_input_traffic_to_node(node: ClusterNode) -> None: def restore_input_traffic_to_node(node: ClusterNode) -> None:
shell = node.host.get_shell() shell = node.host.get_shell()
unlock_ip = shell.exec("ip route list | grep blackhole", CommandOptions(check=False)) unlock_ip = shell.exec("ip r l table all | grep blackhole", CommandOptions(check=False)).stdout
if unlock_ip.return_code != 0:
return for active_blackhole in unlock_ip.strip().split("\n"):
for ip in unlock_ip.stdout.strip().split("\n"): shell.exec(f"ip r d {active_blackhole}")
shell.exec(f"ip route del blackhole {ip.split(' ')[1]}")

View file

@ -6,8 +6,7 @@ from typing import Optional
from dateutil.parser import parse from dateutil.parser import parse
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus from frostfs_testlib.clients.s3 import BucketContainerResolver, S3ClientWrapper, VersioningStatus
from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.shell import Shell from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import search_nodes_with_container from frostfs_testlib.steps.cli.container import search_nodes_with_container
from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode
@ -185,3 +184,26 @@ def search_nodes_with_bucket(
break break
nodes_list = search_nodes_with_container(wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster) nodes_list = search_nodes_with_container(wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster)
return nodes_list return nodes_list
def get_bytes_relative_to_object(value: int | str, object_size: int = None, part_size: int = None) -> int:
if isinstance(value, int):
return value
if "part" not in value and "object" not in value:
return int(value)
if object_size is not None:
value = value.replace("object", str(object_size))
if part_size is not None:
value = value.replace("part", str(part_size))
return int(eval(value))
def get_range_relative_to_object(rng: str, object_size: int = None, part_size: int = None, int_values: bool = False) -> str | int:
start, end = rng.split(":")
start = get_bytes_relative_to_object(start, object_size, part_size)
end = get_bytes_relative_to_object(end, object_size, part_size)
return (start, end) if int_values else f"bytes {start}-{end}/*"

View file

@ -11,10 +11,10 @@ from frostfs_testlib.storage import get_service_registry
from frostfs_testlib.storage.configuration.interfaces import ServiceConfigurationYml from frostfs_testlib.storage.configuration.interfaces import ServiceConfigurationYml
from frostfs_testlib.storage.constants import ConfigAttributes from frostfs_testlib.storage.constants import ConfigAttributes
from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode
from frostfs_testlib.storage.dataclasses.metrics import Metrics
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
from frostfs_testlib.storage.service_registry import ServiceRegistry from frostfs_testlib.storage.service_registry import ServiceRegistry
from frostfs_testlib.storage.dataclasses.metrics import Metrics
class ClusterNode: class ClusterNode:

View file

@ -23,4 +23,6 @@ class PlacementRule:
DEFAULT_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X" DEFAULT_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
SINGLE_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 4 FROM * AS X" SINGLE_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 4 FROM * AS X"
REP_2_FOR_3_NODES_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 3 FROM * AS X" REP_2_FOR_3_NODES_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 3 FROM * AS X"
REP_1_FOR_2_NODES_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 2 FROM * AS X"
DEFAULT_EC_PLACEMENT_RULE = "EC 3.1" DEFAULT_EC_PLACEMENT_RULE = "EC 3.1"
EC_1_1_FOR_2_NODES_PLACEMENT_RULE = "EC 1.1 IN X CBF 1 SELECT 2 FROM * AS X"

View file

@ -1,4 +1,5 @@
import datetime import datetime
import itertools
import logging import logging
import time import time
from typing import TypeVar from typing import TypeVar
@ -39,7 +40,7 @@ class ClusterStateController:
def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None: def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None:
self.stopped_nodes: list[ClusterNode] = [] self.stopped_nodes: list[ClusterNode] = []
self.detached_disks: dict[str, DiskController] = {} self.detached_disks: dict[str, DiskController] = {}
self.dropped_traffic: list[ClusterNode] = [] self.dropped_traffic: set[ClusterNode] = set()
self.excluded_from_netmap: list[StorageNode] = [] self.excluded_from_netmap: list[StorageNode] = []
self.stopped_services: set[NodeBase] = set() self.stopped_services: set[NodeBase] = set()
self.cluster = cluster self.cluster = cluster
@ -172,6 +173,15 @@ class ClusterStateController:
if service_type == StorageNode: if service_type == StorageNode:
self.wait_after_storage_startup() self.wait_after_storage_startup()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Send sighup to all {service_type} services")
def sighup_services_of_type(self, service_type: type[ServiceClass]):
services = self.cluster.services(service_type)
parallel([service.send_signal_to_service for service in services], signal="SIGHUP")
if service_type == StorageNode:
self.wait_after_storage_startup()
@wait_for_success(600, 60) @wait_for_success(600, 60)
def wait_s3gate(self, s3gate: S3Gate): def wait_s3gate(self, s3gate: S3Gate):
with reporter.step(f"Wait for {s3gate} reconnection"): with reporter.step(f"Wait for {s3gate} reconnection"):
@ -206,21 +216,27 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Stop {service_type} service on {node}") @reporter.step("Stop {service_type} service on {node}")
def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass], mask: bool = True): def stop_service_of_type(self, node: ClusterNode, service_type: ServiceClass, mask: bool = True):
service = node.service(service_type) service = node.service(service_type)
service.stop_service(mask) service.stop_service(mask)
self.stopped_services.add(service) self.stopped_services.add(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Send sighup to {service_type} service on {node}")
def sighup_service_of_type(self, node: ClusterNode, service_type: ServiceClass):
service = node.service(service_type)
service.send_signal_to_service("SIGHUP")
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Start {service_type} service on {node}") @reporter.step("Start {service_type} service on {node}")
def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]): def start_service_of_type(self, node: ClusterNode, service_type: ServiceClass):
service = node.service(service_type) service = node.service(service_type)
service.start_service() service.start_service()
self.stopped_services.discard(service) self.stopped_services.discard(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Start all stopped {service_type} services") @reporter.step("Start all stopped {service_type} services")
def start_stopped_services_of_type(self, service_type: type[ServiceClass]): def start_stopped_services_of_type(self, service_type: ServiceClass):
stopped_svc = self._get_stopped_by_type(service_type) stopped_svc = self._get_stopped_by_type(service_type)
if not stopped_svc: if not stopped_svc:
return return
@ -310,22 +326,22 @@ class ClusterStateController:
@reporter.step("Drop traffic to {node}, nodes - {block_nodes}") @reporter.step("Drop traffic to {node}, nodes - {block_nodes}")
def drop_traffic(self, node: ClusterNode, wakeup_timeout: int, name_interface: str, block_nodes: list[ClusterNode] = None) -> None: def drop_traffic(self, node: ClusterNode, wakeup_timeout: int, name_interface: str, block_nodes: list[ClusterNode] = None) -> None:
list_ip = self._parse_interfaces(block_nodes, name_interface) interfaces_tables = self._parse_interfaces(block_nodes, name_interface)
IpHelper.drop_input_traffic_to_node(node, list_ip) IpHelper.drop_input_traffic_to_node(node, interfaces_tables)
time.sleep(wakeup_timeout) time.sleep(wakeup_timeout)
self.dropped_traffic.append(node) self.dropped_traffic.add(node)
@reporter.step("Start traffic to {node}") @reporter.step("Start traffic to {node}")
def restore_traffic(self, node: ClusterNode) -> None: def restore_traffic(self, node: ClusterNode) -> None:
IpHelper.restore_input_traffic_to_node(node=node) IpHelper.restore_input_traffic_to_node(node=node)
index = self.dropped_traffic.index(node) self.dropped_traffic.discard(node)
self.dropped_traffic.pop(index)
@reporter.step("Restore blocked nodes") @reporter.step("Restore blocked nodes")
def restore_all_traffic(self): def restore_all_traffic(self):
if not self.dropped_traffic: if not self.dropped_traffic:
return return
parallel(self._restore_traffic_to_node, self.dropped_traffic) parallel(self._restore_traffic_to_node, self.dropped_traffic)
self.dropped_traffic.clear()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Hard reboot host {node} via magic SysRq option") @reporter.step("Hard reboot host {node} via magic SysRq option")
@ -501,17 +517,31 @@ class ClusterStateController:
return disk_controller return disk_controller
@reporter.step("Restore traffic {node}")
def _restore_traffic_to_node(self, node): def _restore_traffic_to_node(self, node):
IpHelper.restore_input_traffic_to_node(node) IpHelper.restore_input_traffic_to_node(node)
def _parse_interfaces(self, nodes: list[ClusterNode], name_interface: str): def _parse_interfaces(self, nodes: list[ClusterNode], name_interface: str) -> list[tuple]:
interfaces = [] interfaces_and_tables = set()
for node in nodes: for node in nodes:
dict_interfaces = node.host.config.interfaces shell = node.host.get_shell()
for type, ip in dict_interfaces.items(): lines = shell.exec(f"ip r l table all | grep '{name_interface}'").stdout.splitlines()
if name_interface in type:
interfaces.append(ip) ips = []
return interfaces tables = []
for line in lines:
if "src" not in line or "table local" in line:
continue
parts = line.split()
ips.append(parts[-1])
if "table" in line:
tables.append(parts[parts.index("table") + 1])
tables.append(None)
[interfaces_and_tables.add((ip, table)) for ip, table in itertools.product(ips, tables)]
return interfaces_and_tables
@reporter.step("Ping node") @reporter.step("Ping node")
def _ping_host(self, node: ClusterNode): def _ping_host(self, node: ClusterNode):

View file

@ -14,14 +14,19 @@ class ConfigStateManager(StateManager):
self.cluster = self.csc.cluster self.cluster = self.csc.cluster
@reporter.step("Change configuration for {service_type} on all nodes") @reporter.step("Change configuration for {service_type} on all nodes")
def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any]): def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any], sighup: bool = False):
services = self.cluster.services(service_type) services = self.cluster.services(service_type)
nodes = self.cluster.nodes(services) nodes = self.cluster.nodes(services)
self.services_with_changed_config.update([(node, service_type) for node in nodes]) self.services_with_changed_config.update([(node, service_type) for node in nodes])
self.csc.stop_services_of_type(service_type) if not sighup:
self.csc.stop_services_of_type(service_type)
parallel([node.config(service_type).set for node in nodes], values=values) parallel([node.config(service_type).set for node in nodes], values=values)
self.csc.start_services_of_type(service_type) if not sighup:
self.csc.start_services_of_type(service_type)
else:
self.csc.sighup_services_of_type(service_type)
@reporter.step("Change configuration for {service_type} on {node}") @reporter.step("Change configuration for {service_type} on {node}")
def set_on_node(self, node: ClusterNode, service_type: type[ServiceClass], values: dict[str, Any]): def set_on_node(self, node: ClusterNode, service_type: type[ServiceClass], values: dict[str, Any]):
@ -32,18 +37,26 @@ class ConfigStateManager(StateManager):
self.csc.start_service_of_type(node, service_type) self.csc.start_service_of_type(node, service_type)
@reporter.step("Revert all configuration changes") @reporter.step("Revert all configuration changes")
def revert_all(self): def revert_all(self, sighup: bool = False):
if not self.services_with_changed_config: if not self.services_with_changed_config:
return return
parallel(self._revert_svc, self.services_with_changed_config) parallel(self._revert_svc, self.services_with_changed_config, sighup)
self.services_with_changed_config.clear() self.services_with_changed_config.clear()
self.csc.start_all_stopped_services() if not sighup:
self.csc.start_all_stopped_services()
# TODO: parallel can't have multiple parallel_items :( # TODO: parallel can't have multiple parallel_items :(
@reporter.step("Revert all configuration {node_and_service}") @reporter.step("Revert all configuration {node_and_service}")
def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass]): def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass], sighup: bool = False):
node, service_type = node_and_service node, service_type = node_and_service
self.csc.stop_service_of_type(node, service_type) service = node.service(service_type)
if not sighup:
self.csc.stop_service_of_type(node, service_type)
node.config(service_type).revert() node.config(service_type).revert()
if sighup:
service.send_signal_to_service("SIGHUP")

View file

@ -13,6 +13,7 @@ FROSTFS_CONTRACT_CACHE_TIMEOUT = 30
class ObjectOperations(HumanReadableEnum): class ObjectOperations(HumanReadableEnum):
PUT = "object.put" PUT = "object.put"
PATCH = "object.patch"
GET = "object.get" GET = "object.get"
HEAD = "object.head" HEAD = "object.head"
GET_RANGE = "object.range" GET_RANGE = "object.range"
@ -26,6 +27,18 @@ class ObjectOperations(HumanReadableEnum):
return [op for op in ObjectOperations if op != ObjectOperations.WILDCARD_ALL] return [op for op in ObjectOperations if op != ObjectOperations.WILDCARD_ALL]
class ContainerOperations(HumanReadableEnum):
PUT = "container.put"
GET = "container.get"
LIST = "container.list"
DELETE = "container.delete"
WILDCARD_ALL = "container.*"
@staticmethod
def get_all():
return [op for op in ObjectOperations if op != ObjectOperations.WILDCARD_ALL]
@dataclass @dataclass
class Operations: class Operations:
GET_CONTAINER = "GetContainer" GET_CONTAINER = "GetContainer"
@ -39,6 +52,7 @@ class Operations:
SEARCH_OBJECT = "SearchObject" SEARCH_OBJECT = "SearchObject"
HEAD_OBJECT = "HeadObject" HEAD_OBJECT = "HeadObject"
PUT_OBJECT = "PutObject" PUT_OBJECT = "PutObject"
PATCH_OBJECT = "PatchObject"
class Verb(HumanReadableEnum): class Verb(HumanReadableEnum):
@ -124,7 +138,7 @@ class Rule:
if not operations: if not operations:
self.operations = [] self.operations = []
elif isinstance(operations, ObjectOperations): elif isinstance(operations, (ObjectOperations, ContainerOperations)):
self.operations = [operations] self.operations = [operations]
else: else:
self.operations = operations self.operations = operations

View file

@ -65,6 +65,10 @@ class NodeBase(HumanReadableABC):
with reporter.step(f"Start {self.name} service on {self.host.config.address}"): with reporter.step(f"Start {self.name} service on {self.host.config.address}"):
self.host.start_service(self.name) self.host.start_service(self.name)
def send_signal_to_service(self, signal: str):
with reporter.step(f"Send -{signal} signal to {self.name} service on {self.host.config.address}"):
self.host.send_signal_to_service(self.name, signal)
@abstractmethod @abstractmethod
def service_healthcheck(self) -> bool: def service_healthcheck(self) -> bool:
"""Service healthcheck.""" """Service healthcheck."""
@ -185,9 +189,7 @@ class NodeBase(HumanReadableABC):
if attribute_name not in config.attributes: if attribute_name not in config.attributes:
if default_attribute_name is None: if default_attribute_name is None:
raise RuntimeError( raise RuntimeError(f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either")
f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either"
)
return config.attributes[default_attribute_name] return config.attributes[default_attribute_name]
@ -197,9 +199,7 @@ class NodeBase(HumanReadableABC):
return self.host.get_service_config(self.name) return self.host.get_service_config(self.name)
def get_service_uptime(self, service: str) -> datetime: def get_service_uptime(self, service: str) -> datetime:
result = self.host.get_shell().exec( result = self.host.get_shell().exec(f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2")
f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2"
)
start_time = parser.parse(result.stdout.strip()) start_time = parser.parse(result.stdout.strip())
current_time = datetime.now(tz=timezone.utc) current_time = datetime.now(tz=timezone.utc)
active_time = current_time - start_time active_time = current_time - start_time

View file

@ -5,9 +5,9 @@ from typing import List, Optional, Union
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.clients.s3 import BucketContainerResolver
from frostfs_testlib.plugins import load_plugin from frostfs_testlib.plugins import load_plugin
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.grpc_operations import interfaces from frostfs_testlib.storage.grpc_operations import interfaces
from frostfs_testlib.utils import json_utils from frostfs_testlib.utils import json_utils
@ -181,20 +181,17 @@ class ContainerOperations(interfaces.ContainerInterface):
force: bool = False, force: bool = False,
trace: bool = False, trace: bool = False,
): ):
try: return self.cli.container.delete(
return self.cli.container.delete( rpc_endpoint=endpoint,
rpc_endpoint=endpoint, cid=cid,
cid=cid, address=address,
address=address, await_mode=await_mode,
await_mode=await_mode, session=session,
session=session, ttl=ttl,
ttl=ttl, xhdr=xhdr,
xhdr=xhdr, force=force,
force=force, trace=trace,
trace=trace, ).stdout
).stdout
except RuntimeError as e:
print(f"Error request:\n{e}")
@reporter.step("Get container") @reporter.step("Get container")
def get( def get(

View file

@ -206,6 +206,11 @@ class ObjectOperations(interfaces.ObjectInterface):
hash_type=hash_type, hash_type=hash_type,
timeout=timeout, timeout=timeout,
) )
if range:
# Cut off the range and return only hash
return result.stdout.split(":")[1].strip()
return result.stdout return result.stdout
@reporter.step("Head object") @reporter.step("Head object")
@ -407,6 +412,57 @@ class ObjectOperations(interfaces.ObjectInterface):
oid = id_str.split(":")[1] oid = id_str.split(":")[1]
return oid.strip() return oid.strip()
@reporter.step("Patch object")
def patch(
self,
cid: str,
oid: str,
endpoint: str,
ranges: list[str] = None,
payloads: list[str] = None,
new_attrs: Optional[str] = None,
replace_attrs: bool = False,
bearer: str = "",
xhdr: Optional[dict] = None,
session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
trace: bool = False,
) -> str:
"""
PATCH an object.
Args:
cid: ID of Container where we get the Object from
oid: Object ID
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
ranges: An array of ranges in which to replace data in the format [offset1:length1, offset2:length2]
payloads: An array of file paths to be applied in each range
new_attrs: Attributes to be changed in the format "key1=value1,key2=value2"
replace_attrs: Replace all attributes completely with new ones specified in new_attrs
bearer: Path to Bearer Token file, appends to `--bearer` key
xhdr: Request X-Headers in form of Key=Value
session: Path to a JSON-encoded container session token
timeout: Timeout for the operation
trace: Generate trace ID and print it
Returns:
(str): ID of patched Object
"""
result = self.cli.object.patch(
rpc_endpoint=endpoint,
cid=cid,
oid=oid,
range=ranges,
payload=payloads,
new_attrs=new_attrs,
replace_attrs=replace_attrs,
bearer=bearer,
xhdr=xhdr,
session=session,
timeout=timeout,
trace=trace,
)
return result.stdout.split(":")[1].strip()
@reporter.step("Put object to random node") @reporter.step("Put object to random node")
def put_to_random_node( def put_to_random_node(
self, self,
@ -622,3 +678,30 @@ class ObjectOperations(interfaces.ObjectInterface):
] ]
return object_nodes return object_nodes
@reporter.step("Search parts of object")
def parts(
self,
cid: str,
oid: str,
alive_node: ClusterNode,
bearer: str = "",
xhdr: Optional[dict] = None,
is_direct: bool = False,
verify_presence_all: bool = False,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> list[str]:
endpoint = alive_node.storage_node.get_rpc_endpoint()
response = self.cli.object.nodes(
rpc_endpoint=endpoint,
cid=cid,
oid=oid,
bearer=bearer,
ttl=1 if is_direct else None,
json=True,
xhdr=xhdr,
timeout=timeout,
verify_presence_all=verify_presence_all,
)
response_json = json.loads(response.stdout)
return [data_object["object_id"] for data_object in response_json["data_objects"]]

View file

@ -198,6 +198,24 @@ class ObjectInterface(ABC):
) -> str: ) -> str:
pass pass
@abstractmethod
def patch(
self,
cid: str,
oid: str,
endpoint: str,
ranges: Optional[list[str]] = None,
payloads: Optional[list[str]] = None,
new_attrs: Optional[str] = None,
replace_attrs: bool = False,
bearer: Optional[str] = None,
xhdr: Optional[dict] = None,
session: Optional[str] = None,
timeout: Optional[str] = None,
trace: bool = False,
) -> str:
pass
@abstractmethod @abstractmethod
def put_to_random_node( def put_to_random_node(
self, self,
@ -264,6 +282,20 @@ class ObjectInterface(ABC):
) -> List[ClusterNode]: ) -> List[ClusterNode]:
pass pass
@abstractmethod
def parts(
self,
cid: str,
oid: str,
alive_node: ClusterNode,
bearer: str = "",
xhdr: Optional[dict] = None,
is_direct: bool = False,
verify_presence_all: bool = False,
timeout: Optional[str] = None,
) -> List[str]:
pass
class ContainerInterface(ABC): class ContainerInterface(ABC):
@abstractmethod @abstractmethod

View file

@ -1,13 +1,16 @@
import inspect import inspect
import logging import logging
import os
from functools import wraps from functools import wraps
from time import sleep, time from time import sleep, time
from typing import Any from typing import Any
import yaml
from _pytest.outcomes import Failed from _pytest.outcomes import Failed
from pytest import fail from pytest import fail
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.resources.common import ASSETS_DIR
from frostfs_testlib.utils.func_utils import format_by_args from frostfs_testlib.utils.func_utils import format_by_args
logger = logging.getLogger("NeoLogger") logger = logging.getLogger("NeoLogger")
@ -128,6 +131,42 @@ def run_optionally(enabled: bool, mock_value: Any = True):
return deco return deco
def cached_fixture(enabled: bool):
"""
Decorator to cache fixtures.
MUST be placed after @pytest.fixture and before @allure decorators.
Args:
enabled: if true, decorated func will be cached.
"""
def deco(func):
@wraps(func)
def func_impl(*a, **kw):
# TODO: *a and *kw should be parsed to some kind of hashsum and used in filename to prevent cache load from different parameters
cache_file = os.path.join(ASSETS_DIR, f"fixture_cache_{func.__name__}.yml")
if enabled and os.path.exists(cache_file):
with open(cache_file, "r") as cache_input:
return yaml.load(cache_input, Loader=yaml.Loader)
result = func(*a, **kw)
if enabled:
with open(cache_file, "w") as cache_output:
yaml.dump(result, cache_output)
return result
# TODO: cache yielding fixtures
@wraps(func)
def gen_impl(*a, **kw):
raise NotImplementedError("Not implemented for yielding fixtures")
return gen_impl if inspect.isgeneratorfunction(func) else func_impl
return deco
def wait_for_success( def wait_for_success(
max_wait_time: int = 60, max_wait_time: int = 60,
interval: int = 1, interval: int = 1,

View file

@ -80,6 +80,9 @@ def log_command_execution(cmd: str, output: Union[str, dict], params: Optional[d
if not params: if not params:
params = {} params = {}
if params.get("Body") and len(params.get("Body")) > 1000:
params["Body"] = "<large text data>"
output_params = params output_params = params
try: try:

View file

@ -45,7 +45,7 @@ def ensure_directory_opener(path, flags):
# TODO: Do not add {size} to title yet, since it produces dynamic info in top level steps # TODO: Do not add {size} to title yet, since it produces dynamic info in top level steps
# Use object_size dt in future as argument # Use object_size dt in future as argument
@reporter.step("Generate file") @reporter.step("Generate file")
def generate_file(size: int) -> TestFile: def generate_file(size: int, file_name: Optional[str] = None) -> TestFile:
"""Generates a binary file with the specified size in bytes. """Generates a binary file with the specified size in bytes.
Args: Args:
@ -54,7 +54,11 @@ def generate_file(size: int) -> TestFile:
Returns: Returns:
The path to the generated file. The path to the generated file.
""" """
test_file = TestFile(os.path.join(ASSETS_DIR, string_utils.unique_name("object-")))
if file_name is None:
file_name = string_utils.unique_name("object-")
test_file = TestFile(os.path.join(ASSETS_DIR, file_name))
with open(test_file, "wb", opener=ensure_directory_opener) as file: with open(test_file, "wb", opener=ensure_directory_opener) as file:
file.write(os.urandom(size)) file.write(os.urandom(size))
logger.info(f"File with size {size} bytes has been generated: {test_file}") logger.info(f"File with size {size} bytes has been generated: {test_file}")

View file

@ -2,7 +2,7 @@ from typing import Any
import pytest import pytest
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper from frostfs_testlib.clients import AwsCliClient, Boto3ClientWrapper
from frostfs_testlib.storage.dataclasses.acl import EACLRole from frostfs_testlib.storage.dataclasses.acl import EACLRole
from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize from frostfs_testlib.storage.dataclasses.object_size import ObjectSize