Compare commits

..

1 commit

Author SHA1 Message Date
2089578f94 [#307] added methods for testing MFA
Some checks failed
DCO action / DCO (pull_request) Has been cancelled
2024-11-11 16:24:27 +03:00
40 changed files with 132 additions and 834 deletions

View file

@ -1,3 +1 @@
.* @TrueCloudLab/qa-committers * @JuliaKovshova @abereziny @d.zayakin @anikeev-yadro @anurindm @ylukoyan @i.niyazov
.forgejo/.* @potyarkin
Makefile @potyarkin

View file

@ -62,7 +62,7 @@ authmate = "frostfs_testlib.credentials.authmate_s3_provider:AuthmateS3Credentia
wallet_factory = "frostfs_testlib.credentials.wallet_factory_provider:WalletFactoryProvider" wallet_factory = "frostfs_testlib.credentials.wallet_factory_provider:WalletFactoryProvider"
[project.entry-points."frostfs.testlib.bucket_cid_resolver"] [project.entry-points."frostfs.testlib.bucket_cid_resolver"]
frostfs = "frostfs_testlib.clients.s3.curl_bucket_resolver:CurlBucketContainerResolver" frostfs = "frostfs_testlib.s3.curl_bucket_resolver:CurlBucketContainerResolver"
[tool.isort] [tool.isort]
profile = "black" profile = "black"

View file

@ -353,7 +353,6 @@ class FrostfsAdmMorph(CliCommand):
rule: Optional[list[str]] = None, rule: Optional[list[str]] = None,
path: Optional[str] = None, path: Optional[str] = None,
chain_id_hex: Optional[bool] = None, chain_id_hex: Optional[bool] = None,
chain_name: Optional[str] = None,
wallet: Optional[str] = None, wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
timeout: Optional[str] = None, timeout: Optional[str] = None,
@ -384,7 +383,6 @@ class FrostfsAdmMorph(CliCommand):
target_name: str, target_name: str,
target_type: str, target_type: str,
chain_id_hex: Optional[bool] = None, chain_id_hex: Optional[bool] = None,
chain_name: Optional[str] = None,
wallet: Optional[str] = None, wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
timeout: Optional[str] = None, timeout: Optional[str] = None,
@ -412,7 +410,6 @@ class FrostfsAdmMorph(CliCommand):
target_type: str, target_type: str,
target_name: Optional[str] = None, target_name: Optional[str] = None,
rpc_endpoint: Optional[str] = None, rpc_endpoint: Optional[str] = None,
chain_name: Optional[str] = None,
wallet: Optional[str] = None, wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
timeout: Optional[str] = None, timeout: Optional[str] = None,
@ -439,7 +436,6 @@ class FrostfsAdmMorph(CliCommand):
target_name: str, target_name: str,
target_type: str, target_type: str,
all: Optional[bool] = None, all: Optional[bool] = None,
chain_name: Optional[str] = None,
chain_id_hex: Optional[bool] = None, chain_id_hex: Optional[bool] = None,
wallet: Optional[str] = None, wallet: Optional[str] = None,
address: Optional[str] = None, address: Optional[str] = None,
@ -463,26 +459,3 @@ class FrostfsAdmMorph(CliCommand):
"morph ape rm-rule-chain", "morph ape rm-rule-chain",
**{param: value for param, value in locals().items() if param not in ["self"]}, **{param: value for param, value in locals().items() if param not in ["self"]},
) )
def get_nns_records(
self,
name: str,
type: Optional[str] = None,
rpc_endpoint: Optional[str] = None,
alphabet_wallets: Optional[str] = None,
) -> CommandResult:
"""Returns domain record of the specified type
Args:
name: Domain name
type: Domain name service record type(A|CNAME|SOA|TXT)
rpc_endpoint: N3 RPC node endpoint
alphabet_wallets: path to alphabet wallets dir
Returns:
Command's result
"""
return self._execute(
"morph nns get-records",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -276,54 +276,6 @@ class FrostfsCliObject(CliCommand):
**{param: value for param, value in locals().items() if param not in ["self"]}, **{param: value for param, value in locals().items() if param not in ["self"]},
) )
def patch(
self,
rpc_endpoint: str,
cid: str,
oid: str,
range: list[str] = None,
payload: list[str] = None,
new_attrs: Optional[str] = None,
replace_attrs: bool = False,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional[bool] = None,
session: Optional[str] = None,
timeout: Optional[str] = None,
trace: bool = False,
ttl: Optional[int] = None,
wallet: Optional[str] = None,
xhdr: Optional[dict] = None,
) -> CommandResult:
"""
PATCH an object.
Args:
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>')
cid: Container ID
oid: Object ID
range: An array of ranges in which to replace data in the format [offset1:length1, offset2:length2]
payload: An array of file paths to be applied in each range
new_attrs: Attributes to be changed in the format Key1=Value1,Key2=Value2
replace_attrs: Replace all attributes completely with new ones specified in new_attrs
address: Address of wallet account
bearer: File with signed JSON or binary encoded bearer token
generate_key: Generate new private key
session: Filepath to a JSON- or binary-encoded token of the object RANGE session
timeout: Timeout for the operation
trace: Generate trace ID and print it
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Dict with request X-Headers
Returns:
Command's result.
"""
return self._execute(
"object patch",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def range( def range(
self, self,
rpc_endpoint: str, rpc_endpoint: str,

View file

@ -241,21 +241,3 @@ class FrostfsCliShards(CliCommand):
"control shards evacuation status", "control shards evacuation status",
**{param: value for param, value in locals().items() if param not in ["self"]}, **{param: value for param, value in locals().items() if param not in ["self"]},
) )
def detach(self, endpoint: str, address: Optional[str] = None, id: Optional[str] = None, timeout: Optional[str] = None):
"""
Detach and close the shards
Args:
address: Address of wallet account
endpoint: Remote node control address (as 'multiaddr' or '<host>:<port>')
id: List of shard IDs in base58 encoding
timeout: Timeout for an operation (default 15s)
Returns:
Command's result.
"""
return self._execute(
"control shards detach",
**{param: value for param, value in locals().items() if param not in ["self"]},
)

View file

@ -1,7 +1,7 @@
import re import re
from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces, NodeNetInfo, NodeNetmapInfo, NodeStatus from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetInfo, NodeNetmapInfo, NodeStatus
class NetmapParser: class NetmapParser:
@ -85,7 +85,7 @@ class NetmapParser:
@staticmethod @staticmethod
def snapshot_one_node(output: str, cluster_node: ClusterNode) -> NodeNetmapInfo | None: def snapshot_one_node(output: str, cluster_node: ClusterNode) -> NodeNetmapInfo | None:
snapshot_nodes = NetmapParser.snapshot_all_nodes(output=output) snapshot_nodes = NetmapParser.snapshot_all_nodes(output=output)
snapshot_node = [node for node in snapshot_nodes if node.node == cluster_node.get_interface(Interfaces.MGMT)] snapshot_node = [node for node in snapshot_nodes if node.node == cluster_node.host_ip]
if not snapshot_node: if not snapshot_node:
return None return None
return snapshot_node[0] return snapshot_node[0]

View file

@ -1,5 +0,0 @@
from frostfs_testlib.clients.http.http_client import HttpClient
from frostfs_testlib.clients.s3.aws_cli_client import AwsCliClient
from frostfs_testlib.clients.s3.boto3_client import Boto3ClientWrapper
from frostfs_testlib.clients.s3.interfaces import S3ClientWrapper
from frostfs_testlib.clients.s3.s3_http_client import S3HttpClient

View file

@ -1,144 +0,0 @@
import io
import json
import logging
import logging.config
from typing import Mapping, Sequence
import httpx
from frostfs_testlib import reporter
timeout = httpx.Timeout(60, read=150)
LOGGING_CONFIG = {
"disable_existing_loggers": False,
"version": 1,
"handlers": {"default": {"class": "logging.StreamHandler", "formatter": "http", "stream": "ext://sys.stderr"}},
"formatters": {
"http": {
"format": "%(levelname)s [%(asctime)s] %(name)s - %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S",
}
},
"loggers": {
"httpx": {
"handlers": ["default"],
"level": "DEBUG",
},
"httpcore": {
"handlers": ["default"],
"level": "ERROR",
},
},
}
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("NeoLogger")
class HttpClient:
@reporter.step("Send {method} request to {url}")
def send(self, method: str, url: str, expected_status_code: int = None, **kwargs: dict) -> httpx.Response:
transport = httpx.HTTPTransport(verify=False, retries=5)
client = httpx.Client(timeout=timeout, transport=transport)
response = client.request(method, url, **kwargs)
self._attach_response(response, **kwargs)
logger.info(f"Response: {response.status_code} => {response.text}")
if expected_status_code:
assert (
response.status_code == expected_status_code
), f"Got {response.status_code} response code while {expected_status_code} expected"
return response
@classmethod
def _parse_body(cls, readable: httpx.Request | httpx.Response) -> str | None:
try:
content = readable.read()
except Exception as e:
logger.warning(f"Unable to read file: {str(e)}")
return None
if not content:
return None
request_body = None
try:
request_body = json.loads(content)
except (json.JSONDecodeError, UnicodeDecodeError) as e:
logger.warning(f"Unable to convert body to json: {str(e)}")
if request_body is not None:
return json.dumps(request_body, default=str, indent=4)
try:
request_body = content.decode()
except UnicodeDecodeError as e:
logger.warning(f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}")
request_body = content if request_body is None else request_body
request_body = "<large text data>" if len(request_body) > 1000 else request_body
return request_body
@classmethod
def _parse_files(cls, files: Mapping | Sequence | None) -> dict:
filepaths = {}
if not files:
return filepaths
if isinstance(files, Sequence):
items = files
elif isinstance(files, Mapping):
items = files.items()
else:
raise TypeError(f"'files' must be either Sequence or Mapping, got: {type(files).__name__}")
for name, file in items:
if isinstance(file, io.IOBase):
filepaths[name] = file.name
elif isinstance(file, Sequence):
filepaths[name] = file[1].name
return filepaths
@classmethod
def _attach_response(cls, response: httpx.Response, **kwargs):
request = response.request
request_headers = json.dumps(dict(request.headers), default=str, indent=4)
request_body = cls._parse_body(request)
files = kwargs.get("files")
request_files = cls._parse_files(files)
response_headers = json.dumps(dict(response.headers), default=str, indent=4)
response_body = cls._parse_body(response)
report = (
f"Method: {request.method}\n\n"
+ f"URL: {request.url}\n\n"
+ f"Request Headers: {request_headers}\n\n"
+ (f"Request Body: {request_body}\n\n" if request_body else "")
+ (f"Request Files: {request_files}\n\n" if request_files else "")
+ f"Response Status Code: {response.status_code}\n\n"
+ f"Response Headers: {response_headers}\n\n"
+ (f"Response Body: {response_body}\n\n" if response_body else "")
)
curl_request = cls._create_curl_request(request.url, request.method, request.headers, request_body, request_files)
reporter.attach(report, "Requests Info")
reporter.attach(curl_request, "CURL")
@classmethod
def _create_curl_request(cls, url: str, method: str, headers: httpx.Headers, data: str, files: dict) -> str:
headers = " ".join(f'-H "{name.title()}: {value}"' for name, value in headers.items())
data = f" -d '{data}'" if data else ""
for name, path in files.items():
data += f' -F "{name}=@{path}"'
# Option -k means no verify SSL
return f"curl {url} -X {method} {headers}{data} -k"

View file

@ -1 +0,0 @@
from frostfs_testlib.clients.s3.interfaces import BucketContainerResolver, S3ClientWrapper, VersioningStatus

View file

@ -1,149 +0,0 @@
import hashlib
import logging
import xml.etree.ElementTree as ET
import httpx
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials
from frostfs_testlib import reporter
from frostfs_testlib.clients import HttpClient
from frostfs_testlib.utils.file_utils import TestFile
logger = logging.getLogger("NeoLogger")
DEFAULT_TIMEOUT = 60.0
class S3HttpClient:
def __init__(
self, s3gate_endpoint: str, access_key_id: str, secret_access_key: str, profile: str = "default", region: str = "us-east-1"
) -> None:
self.http_client = HttpClient()
self.credentials = Credentials(access_key_id, secret_access_key)
self.profile = profile
self.region = region
self.iam_endpoint: str = None
self.s3gate_endpoint: str = None
self.service: str = None
self.signature: SigV4Auth = None
self.set_endpoint(s3gate_endpoint)
def _to_s3_header(self, header: str) -> dict:
replacement_map = {
"Acl": "ACL",
"_": "-",
}
result = header
if not header.startswith("x_amz"):
result = header.title()
for find, replace in replacement_map.items():
result = result.replace(find, replace)
return result
def _convert_to_s3_headers(self, scope: dict, exclude: list[str] = None):
exclude = ["self", "cls"] if not exclude else exclude + ["self", "cls"]
return {self._to_s3_header(header): value for header, value in scope.items() if header not in exclude and value is not None}
def _create_aws_request(
self, method: str, url: str, headers: dict, content: str | bytes | TestFile = None, params: dict = None
) -> AWSRequest:
data = b""
if content is not None:
if isinstance(content, TestFile):
with open(content, "rb") as io_content:
data = io_content.read()
elif isinstance(content, str):
data = bytes(content, encoding="utf-8")
elif isinstance(content, bytes):
data = content
else:
raise TypeError(f"Content expected as a string, bytes or TestFile object, got: {content}")
headers["X-Amz-Content-SHA256"] = hashlib.sha256(data).hexdigest()
aws_request = AWSRequest(method, url, headers, data, params)
self.signature.add_auth(aws_request)
return aws_request
def _exec_request(
self,
method: str,
url: str,
headers: dict,
content: str | bytes | TestFile = None,
params: dict = None,
timeout: float = DEFAULT_TIMEOUT,
) -> dict:
aws_request = self._create_aws_request(method, url, headers, content, params)
response = self.http_client.send(
aws_request.method,
aws_request.url,
headers=dict(aws_request.headers),
data=aws_request.data,
params=aws_request.params,
timeout=timeout,
)
try:
response.raise_for_status()
except httpx.HTTPStatusError:
raise httpx.HTTPStatusError(response.text, request=response.request, response=response)
root = ET.fromstring(response.read())
data = {
"LastModified": root.find(".//LastModified").text,
"ETag": root.find(".//ETag").text,
}
if response.headers.get("x-amz-version-id"):
data["VersionId"] = response.headers.get("x-amz-version-id")
return data
@reporter.step("Set endpoint S3 to {s3gate_endpoint}")
def set_endpoint(self, s3gate_endpoint: str):
if self.s3gate_endpoint == s3gate_endpoint:
return
self.s3gate_endpoint = s3gate_endpoint
self.service = "s3"
self.signature = SigV4Auth(self.credentials, self.service, self.region)
@reporter.step("Set endpoint IAM to {iam_endpoint}")
def set_iam_endpoint(self, iam_endpoint: str):
if self.iam_endpoint == iam_endpoint:
return
self.iam_endpoint = iam_endpoint
self.service = "iam"
self.signature = SigV4Auth(self.credentials, self.service, self.region)
@reporter.step("Patch object S3")
def patch_object(
self,
bucket: str,
key: str,
content: str | bytes | TestFile,
content_range: str,
version_id: str = None,
if_match: str = None,
if_unmodified_since: str = None,
x_amz_expected_bucket_owner: str = None,
timeout: float = DEFAULT_TIMEOUT,
) -> dict:
if content_range and not content_range.startswith("bytes"):
content_range = f"bytes {content_range}/*"
url = f"{self.s3gate_endpoint}/{bucket}/{key}"
headers = self._convert_to_s3_headers(locals(), exclude=["bucket", "key", "content", "version_id", "timeout"])
params = {"VersionId": version_id} if version_id is not None else None
return self._exec_request("PATCH", url, headers, content, params, timeout=timeout)

View file

@ -164,9 +164,6 @@ class DockerHost(Host):
return volume_path return volume_path
def send_signal_to_service(self, service_name: str, signal: str) -> None:
raise NotImplementedError("Not implemented for docker")
def delete_metabase(self, service_name: str) -> None: def delete_metabase(self, service_name: str) -> None:
raise NotImplementedError("Not implemented for docker") raise NotImplementedError("Not implemented for docker")
@ -250,7 +247,6 @@ class DockerHost(Host):
unit: Optional[str] = None, unit: Optional[str] = None,
exclude_filter: Optional[str] = None, exclude_filter: Optional[str] = None,
priority: Optional[str] = None, priority: Optional[str] = None,
word_count: bool = None,
) -> str: ) -> str:
client = self._get_docker_client() client = self._get_docker_client()
filtered_logs = "" filtered_logs = ""

View file

@ -117,17 +117,6 @@ class Host(ABC):
service_name: Name of the service to stop. service_name: Name of the service to stop.
""" """
@abstractmethod
def send_signal_to_service(self, service_name: str, signal: str) -> None:
"""Send signal to service with specified name using kill -<signal>
The service must be hosted on this host.
Args:
service_name: Name of the service to stop.
signal: signal name. See kill -l to all names
"""
@abstractmethod @abstractmethod
def mask_service(self, service_name: str) -> None: def mask_service(self, service_name: str) -> None:
"""Prevent the service from start by any activity by masking it. """Prevent the service from start by any activity by masking it.
@ -324,7 +313,6 @@ class Host(ABC):
unit: Optional[str] = None, unit: Optional[str] = None,
exclude_filter: Optional[str] = None, exclude_filter: Optional[str] = None,
priority: Optional[str] = None, priority: Optional[str] = None,
word_count: bool = None,
) -> str: ) -> str:
"""Get logs from host filtered by regex. """Get logs from host filtered by regex.
@ -335,7 +323,6 @@ class Host(ABC):
unit: required unit. unit: required unit.
priority: logs level, 0 - emergency, 7 - debug. All messages with that code and higher. priority: logs level, 0 - emergency, 7 - debug. All messages with that code and higher.
For example, if we specify the -p 2 option, journalctl will show all messages with levels 2, 1 and 0. For example, if we specify the -p 2 option, journalctl will show all messages with levels 2, 1 and 0.
word_count: output type, expected values: lines, bytes, json
Returns: Returns:
Found entries as str if any found. Found entries as str if any found.

View file

@ -53,4 +53,3 @@ HOSTING_CONFIG_FILE = os.getenv(
) )
MORE_LOG = os.getenv("MORE_LOG", "1") MORE_LOG = os.getenv("MORE_LOG", "1")
EXPIRATION_EPOCH_ATTRIBUTE = "__SYSTEM__EXPIRATION_EPOCH"

View file

@ -9,7 +9,6 @@ OBJECT_ALREADY_REMOVED = "code = 2052.*message = object already removed"
SESSION_NOT_FOUND = "code = 4096.*message = session token not found" SESSION_NOT_FOUND = "code = 4096.*message = session token not found"
OUT_OF_RANGE = "code = 2053.*message = out of range" OUT_OF_RANGE = "code = 2053.*message = out of range"
EXPIRED_SESSION_TOKEN = "code = 4097.*message = expired session token" EXPIRED_SESSION_TOKEN = "code = 4097.*message = expired session token"
ADD_CHAIN_ERROR = "code = 5120 message = apemanager access denied"
# TODO: Change to codes with message # TODO: Change to codes with message
# OBJECT_IS_LOCKED = "code = 2050.*message = object is locked" # OBJECT_IS_LOCKED = "code = 2050.*message = object is locked"
# LOCK_NON_REGULAR_OBJECT = "code = 2051.*message = ..." will be available once 2092 is fixed # LOCK_NON_REGULAR_OBJECT = "code = 2051.*message = ..." will be available once 2092 is fixed
@ -28,10 +27,6 @@ S3_BUCKET_DOES_NOT_ALLOW_ACL = "The bucket does not allow ACLs"
S3_MALFORMED_XML_REQUEST = "The XML you provided was not well-formed or did not validate against our published schema." S3_MALFORMED_XML_REQUEST = "The XML you provided was not well-formed or did not validate against our published schema."
RULE_ACCESS_DENIED_CONTAINER = "access to container operation {operation} is denied by access policy engine: Access denied" RULE_ACCESS_DENIED_CONTAINER = "access to container operation {operation} is denied by access policy engine: Access denied"
# Errors from node missing reasons if request was forwarded. Commenting for now RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request: method {operation}: Access denied"
# RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request: method {operation}: Access denied"
RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request"
NO_RULE_FOUND_CONTAINER = "access to container operation {operation} is denied by access policy engine: NoRuleFound" NO_RULE_FOUND_CONTAINER = "access to container operation {operation} is denied by access policy engine: NoRuleFound"
# Errors from node missing reasons if request was forwarded. Commenting for now NO_RULE_FOUND_OBJECT = "access to object operation denied: ape denied request: method {operation}: NoRuleFound"
# NO_RULE_FOUND_OBJECT = "access to object operation denied: ape denied request: method {operation}: NoRuleFound"
NO_RULE_FOUND_OBJECT = "access to object operation denied: ape denied request"

View file

@ -16,10 +16,11 @@ OPTIONAL_NODE_UNDER_LOAD = os.getenv("OPTIONAL_NODE_UNDER_LOAD")
OPTIONAL_FAILOVER_ENABLED = str_to_bool(os.getenv("OPTIONAL_FAILOVER_ENABLED", "true")) OPTIONAL_FAILOVER_ENABLED = str_to_bool(os.getenv("OPTIONAL_FAILOVER_ENABLED", "true"))
# Set this to True to disable background load. I.E. node which supposed to be stopped will not be actually stopped. # Set this to True to disable background load. I.E. node which supposed to be stopped will not be actually stopped.
OPTIONAL_BACKGROUND_LOAD_ENABLED = str_to_bool(os.getenv("OPTIONAL_BACKGROUND_LOAD_ENABLED", "true")) OPTIONAL_BACKGROUND_LOAD_ENABLED = str_to_bool(
os.getenv("OPTIONAL_BACKGROUND_LOAD_ENABLED", "true")
)
# Set this to False for disable autouse fixture like node healthcheck during developing time. # Set this to False for disable autouse fixture like node healthcheck during developing time.
OPTIONAL_AUTOUSE_FIXTURES_ENABLED = str_to_bool(os.getenv("OPTIONAL_AUTOUSE_FIXTURES_ENABLED", "true")) OPTIONAL_AUTOUSE_FIXTURES_ENABLED = str_to_bool(
os.getenv("OPTIONAL_AUTOUSE_FIXTURES_ENABLED", "true")
# Use cache for fixtures with @cachec_fixture decorator )
OPTIONAL_CACHE_FIXTURES = str_to_bool(os.getenv("OPTIONAL_CACHE_FIXTURES", "false"))

View file

@ -0,0 +1,3 @@
from frostfs_testlib.s3.aws_cli_client import AwsCliClient
from frostfs_testlib.s3.boto3_client import Boto3ClientWrapper
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus

View file

@ -6,8 +6,8 @@ from time import sleep
from typing import Literal, Optional, Union from typing import Literal, Optional, Union
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.clients.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.shell import CommandOptions from frostfs_testlib.shell import CommandOptions
from frostfs_testlib.shell.local_shell import LocalShell from frostfs_testlib.shell.local_shell import LocalShell
from frostfs_testlib.utils import string_utils from frostfs_testlib.utils import string_utils
@ -171,7 +171,7 @@ class AwsCliClient(S3ClientWrapper):
return response.get("TagSet") return response.get("TagSet")
@reporter.step("Get bucket acl") @reporter.step("Get bucket acl")
def get_bucket_acl(self, bucket: str) -> dict: def get_bucket_acl(self, bucket: str) -> list:
if bucket.startswith("-") or " " in bucket: if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"' bucket = f'"{bucket}"'
@ -179,7 +179,8 @@ class AwsCliClient(S3ClientWrapper):
f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}" f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
) )
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
return self._to_json(output) response = self._to_json(output)
return response.get("Grants")
@reporter.step("Get bucket location") @reporter.step("Get bucket location")
def get_bucket_location(self, bucket: str) -> dict: def get_bucket_location(self, bucket: str) -> dict:
@ -195,20 +196,11 @@ class AwsCliClient(S3ClientWrapper):
return response.get("LocationConstraint") return response.get("LocationConstraint")
@reporter.step("List objects S3") @reporter.step("List objects S3")
def list_objects( def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None
) -> Union[dict, list[str]]:
if bucket.startswith("-") or " " in bucket: if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"' bucket = f'"{bucket}"'
cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} " cmd = f"aws {self.common_flags} s3api list-objects --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
if page_size:
cmd = cmd.replace("--no-paginate", "")
cmd += f" --page-size {page_size} "
if prefix:
cmd += f" --prefix {prefix}"
if self.profile:
cmd += f" --profile {self.profile} "
output = self.local_shell.exec(cmd).stdout output = self.local_shell.exec(cmd).stdout
response = self._to_json(output) response = self._to_json(output)
@ -860,7 +852,7 @@ class AwsCliClient(S3ClientWrapper):
return response["Parts"] return response["Parts"]
@reporter.step("Complete multipart upload S3") @reporter.step("Complete multipart upload S3")
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict: def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
if bucket.startswith("-") or " " in bucket: if bucket.startswith("-") or " " in bucket:
bucket = f'"{bucket}"' bucket = f'"{bucket}"'

View file

@ -13,8 +13,8 @@ from botocore.exceptions import ClientError
from mypy_boto3_s3 import S3Client from mypy_boto3_s3 import S3Client
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.clients.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.utils import string_utils from frostfs_testlib.utils import string_utils
# TODO: Refactor this code to use shell instead of _cmd_run # TODO: Refactor this code to use shell instead of _cmd_run
@ -86,7 +86,6 @@ class Boto3ClientWrapper(S3ClientWrapper):
service_name="iam", service_name="iam",
aws_access_key_id=self.access_key_id, aws_access_key_id=self.access_key_id,
aws_secret_access_key=self.secret_access_key, aws_secret_access_key=self.secret_access_key,
region_name=self.region,
endpoint_url=self.iam_endpoint, endpoint_url=self.iam_endpoint,
verify=False, verify=False,
) )
@ -230,13 +229,14 @@ class Boto3ClientWrapper(S3ClientWrapper):
return response.get("TagSet") return response.get("TagSet")
@reporter.step("Get bucket acl") @reporter.step("Get bucket acl")
def get_bucket_acl(self, bucket: str) -> dict: def get_bucket_acl(self, bucket: str) -> list:
return self._exec_request( response = self._exec_request(
self.boto3_client.get_bucket_acl, self.boto3_client.get_bucket_acl,
params={"Bucket": bucket}, params={"Bucket": bucket},
endpoint=self.s3gate_endpoint, endpoint=self.s3gate_endpoint,
profile=self.profile, profile=self.profile,
) )
return response.get("Grants")
@reporter.step("Delete bucket tagging") @reporter.step("Delete bucket tagging")
def delete_bucket_tagging(self, bucket: str) -> None: def delete_bucket_tagging(self, bucket: str) -> None:
@ -398,17 +398,10 @@ class Boto3ClientWrapper(S3ClientWrapper):
return response if full_output else obj_list return response if full_output else obj_list
@reporter.step("List objects S3") @reporter.step("List objects S3")
def list_objects( def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None
) -> Union[dict, list[str]]:
params = {"Bucket": bucket}
if page_size:
params["MaxKeys"] = page_size
if prefix:
params["Prefix"] = prefix
response = self._exec_request( response = self._exec_request(
self.boto3_client.list_objects, self.boto3_client.list_objects,
params, params={"Bucket": bucket},
endpoint=self.s3gate_endpoint, endpoint=self.s3gate_endpoint,
profile=self.profile, profile=self.profile,
) )
@ -704,7 +697,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
return response["Parts"] return response["Parts"]
@reporter.step("Complete multipart upload S3") @reporter.step("Complete multipart upload S3")
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict: def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
parts = [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts] parts = [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts]
params = self._convert_to_s3_params(locals(), exclude=["parts"]) params = self._convert_to_s3_params(locals(), exclude=["parts"])
params["MultipartUpload"] = {"Parts": parts} params["MultipartUpload"] = {"Parts": parts}

View file

@ -1,7 +1,7 @@
import re import re
from frostfs_testlib.cli.generic_cli import GenericCli from frostfs_testlib.cli.generic_cli import GenericCli
from frostfs_testlib.clients.s3 import BucketContainerResolver from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.cluster import ClusterNode

View file

@ -128,7 +128,7 @@ class S3ClientWrapper(HumanReadableABC):
"""Deletes the tags from the bucket.""" """Deletes the tags from the bucket."""
@abstractmethod @abstractmethod
def get_bucket_acl(self, bucket: str) -> dict: def get_bucket_acl(self, bucket: str) -> list:
"""This implementation of the GET action uses the acl subresource to return the access control list (ACL) of a bucket.""" """This implementation of the GET action uses the acl subresource to return the access control list (ACL) of a bucket."""
@abstractmethod @abstractmethod
@ -195,9 +195,7 @@ class S3ClientWrapper(HumanReadableABC):
""" """
@abstractmethod @abstractmethod
def list_objects( def list_objects(self, bucket: str, full_output: bool = False) -> Union[dict, list[str]]:
self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None
) -> Union[dict, list[str]]:
"""Returns some or all (up to 1,000) of the objects in a bucket with each request. """Returns some or all (up to 1,000) of the objects in a bucket with each request.
You can use the request parameters as selection criteria to return a subset of the objects in a bucket. You can use the request parameters as selection criteria to return a subset of the objects in a bucket.
A 200 OK response can contain valid or invalid XML. Make sure to design your application A 200 OK response can contain valid or invalid XML. Make sure to design your application
@ -336,7 +334,7 @@ class S3ClientWrapper(HumanReadableABC):
"""Lists the parts that have been uploaded for a specific multipart upload.""" """Lists the parts that have been uploaded for a specific multipart upload."""
@abstractmethod @abstractmethod
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict: def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
"""Completes a multipart upload by assembling previously uploaded parts.""" """Completes a multipart upload by assembling previously uploaded parts."""
@abstractmethod @abstractmethod

View file

@ -7,7 +7,9 @@ from typing import Optional, Union
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.cli import FrostfsCli from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.plugins import load_plugin
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC
from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.shell import Shell from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.object import put_object, put_object_to_random_node from frostfs_testlib.steps.cli.object import put_object, put_object_to_random_node
from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode
@ -109,8 +111,6 @@ def create_container(
options: Optional[dict] = None, options: Optional[dict] = None,
await_mode: bool = True, await_mode: bool = True,
wait_for_creation: bool = True, wait_for_creation: bool = True,
nns_zone: Optional[str] = None,
nns_name: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str: ) -> str:
""" """
@ -143,8 +143,6 @@ def create_container(
result = cli.container.create( result = cli.container.create(
rpc_endpoint=endpoint, rpc_endpoint=endpoint,
policy=rule, policy=rule,
nns_name=nns_name,
nns_zone=nns_zone,
basic_acl=basic_acl, basic_acl=basic_acl,
attributes=attributes, attributes=attributes,
name=name, name=name,

View file

@ -12,7 +12,6 @@ from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC,
from frostfs_testlib.resources.common import ASSETS_DIR from frostfs_testlib.resources.common import ASSETS_DIR
from frostfs_testlib.shell import Shell from frostfs_testlib.shell import Shell
from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing import wait_for_success from frostfs_testlib.testing import wait_for_success
from frostfs_testlib.utils import json_utils from frostfs_testlib.utils import json_utils
@ -753,10 +752,7 @@ def get_object_nodes(
] ]
object_nodes = [ object_nodes = [
cluster_node cluster_node for netmap_node in netmap_nodes for cluster_node in cluster.cluster_nodes if netmap_node.node == cluster_node.host_ip
for netmap_node in netmap_nodes
for cluster_node in cluster.cluster_nodes
if netmap_node.node == cluster_node.get_interface(Interfaces.MGMT)
] ]
return object_nodes return object_nodes

View file

@ -12,8 +12,8 @@ import requests
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.cli import GenericCli from frostfs_testlib.cli import GenericCli
from frostfs_testlib.clients.s3.aws_cli_client import command_options
from frostfs_testlib.resources.common import ASSETS_DIR, SIMPLE_OBJECT_SIZE from frostfs_testlib.resources.common import ASSETS_DIR, SIMPLE_OBJECT_SIZE
from frostfs_testlib.s3.aws_cli_client import command_options
from frostfs_testlib.shell import Shell from frostfs_testlib.shell import Shell
from frostfs_testlib.shell.local_shell import LocalShell from frostfs_testlib.shell.local_shell import LocalShell
from frostfs_testlib.steps.cli.object import get_object from frostfs_testlib.steps.cli.object import get_object
@ -38,34 +38,34 @@ def get_via_http_gate(
""" """
This function gets given object from HTTP gate This function gets given object from HTTP gate
cid: container id to get object from cid: container id to get object from
oid: object id / object key oid: object ID
node: node to make request node: node to make request
request_path: (optional) http request, if ommited - use default [{endpoint}/get/{cid}/{oid}] request_path: (optional) http request, if ommited - use default [{endpoint}/get/{cid}/{oid}]
""" """
request = f"{node.http_gate.get_endpoint()}/get/{cid}/{oid}" # if `request_path` parameter omitted, use default
if request_path: if request_path is None:
request = f"{node.http_gate.get_endpoint()}/get/{cid}/{oid}"
else:
request = f"{node.http_gate.get_endpoint()}{request_path}" request = f"{node.http_gate.get_endpoint()}{request_path}"
response = requests.get(request, stream=True, timeout=timeout, verify=False) resp = requests.get(request, stream=True, timeout=timeout, verify=False)
if not response.ok: if not resp.ok:
raise Exception( raise Exception(
f"""Failed to get object via HTTP gate: f"""Failed to get object via HTTP gate:
request: {response.request.path_url}, request: {resp.request.path_url},
response: {response.text}, response: {resp.text},
headers: {response.headers}, headers: {resp.headers},
status code: {response.status_code} {response.reason}""" status code: {resp.status_code} {resp.reason}"""
) )
logger.info(f"Request: {request}") logger.info(f"Request: {request}")
_attach_allure_step(request, response.status_code) _attach_allure_step(request, resp.status_code)
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, f"{cid}_{oid}")) test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, f"{cid}_{oid}"))
with open(test_file, "wb") as file: with open(test_file, "wb") as file:
for chunk in response.iter_content(chunk_size=8192): shutil.copyfileobj(resp.raw, file)
file.write(chunk)
return test_file return test_file
@ -117,12 +117,12 @@ def get_via_http_gate_by_attribute(
endpoint: http gate endpoint endpoint: http gate endpoint
request_path: (optional) http request path, if ommited - use default [{endpoint}/get_by_attribute/{Key}/{Value}] request_path: (optional) http request path, if ommited - use default [{endpoint}/get_by_attribute/{Key}/{Value}]
""" """
attr_name = list(attribute.keys())[0] attr_name = list(attribute.keys())[0]
attr_value = quote_plus(str(attribute.get(attr_name))) attr_value = quote_plus(str(attribute.get(attr_name)))
# if `request_path` parameter ommited, use default
request = f"{node.http_gate.get_endpoint()}/get_by_attribute/{cid}/{quote_plus(str(attr_name))}/{attr_value}" if request_path is None:
if request_path: request = f"{node.http_gate.get_endpoint()}/get_by_attribute/{cid}/{quote_plus(str(attr_name))}/{attr_value}"
else:
request = f"{node.http_gate.get_endpoint()}{request_path}" request = f"{node.http_gate.get_endpoint()}{request_path}"
resp = requests.get(request, stream=True, timeout=timeout, verify=False) resp = requests.get(request, stream=True, timeout=timeout, verify=False)
@ -357,9 +357,19 @@ def try_to_get_object_via_passed_request_and_expect_error(
) -> None: ) -> None:
try: try:
if attrs is None: if attrs is None:
get_via_http_gate(cid, oid, node, http_request_path) get_via_http_gate(
cid=cid,
oid=oid,
node=node,
request_path=http_request_path,
)
else: else:
get_via_http_gate_by_attribute(cid, attrs, node, http_request_path) get_via_http_gate_by_attribute(
cid=cid,
attribute=attrs,
node=node,
request_path=http_request_path,
)
raise AssertionError(f"Expected error on getting object with cid: {cid}") raise AssertionError(f"Expected error on getting object with cid: {cid}")
except Exception as err: except Exception as err:
match = error_pattern.casefold() in str(err).casefold() match = error_pattern.casefold() in str(err).casefold()

View file

@ -4,18 +4,16 @@ from frostfs_testlib.storage.cluster import ClusterNode
class IpHelper: class IpHelper:
@staticmethod @staticmethod
def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[tuple]) -> None: def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[str]) -> None:
shell = node.host.get_shell() shell = node.host.get_shell()
for ip, table in block_ip: for ip in block_ip:
if not table: shell.exec(f"ip route add blackhole {ip}")
shell.exec(f"ip r a blackhole {ip}")
continue
shell.exec(f"ip r a blackhole {ip} table {table}")
@staticmethod @staticmethod
def restore_input_traffic_to_node(node: ClusterNode) -> None: def restore_input_traffic_to_node(node: ClusterNode) -> None:
shell = node.host.get_shell() shell = node.host.get_shell()
unlock_ip = shell.exec("ip r l table all | grep blackhole", CommandOptions(check=False)).stdout unlock_ip = shell.exec("ip route list | grep blackhole", CommandOptions(check=False))
if unlock_ip.return_code != 0:
for active_blackhole in unlock_ip.strip().split("\n"): return
shell.exec(f"ip r d {active_blackhole}") for ip in unlock_ip.stdout.strip().split("\n"):
shell.exec(f"ip route del blackhole {ip.split(' ')[1]}")

View file

@ -6,7 +6,8 @@ from typing import Optional
from dateutil.parser import parse from dateutil.parser import parse
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.clients.s3 import BucketContainerResolver, S3ClientWrapper, VersioningStatus from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus
from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.shell import Shell from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import search_nodes_with_container from frostfs_testlib.steps.cli.container import search_nodes_with_container
from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode
@ -184,26 +185,3 @@ def search_nodes_with_bucket(
break break
nodes_list = search_nodes_with_container(wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster) nodes_list = search_nodes_with_container(wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster)
return nodes_list return nodes_list
def get_bytes_relative_to_object(value: int | str, object_size: int = None, part_size: int = None) -> int:
if isinstance(value, int):
return value
if "part" not in value and "object" not in value:
return int(value)
if object_size is not None:
value = value.replace("object", str(object_size))
if part_size is not None:
value = value.replace("part", str(part_size))
return int(eval(value))
def get_range_relative_to_object(rng: str, object_size: int = None, part_size: int = None, int_values: bool = False) -> str | int:
start, end = rng.split(":")
start = get_bytes_relative_to_object(start, object_size, part_size)
end = get_bytes_relative_to_object(end, object_size, part_size)
return (start, end) if int_values else f"bytes {start}-{end}/*"

View file

@ -11,10 +11,10 @@ from frostfs_testlib.storage import get_service_registry
from frostfs_testlib.storage.configuration.interfaces import ServiceConfigurationYml from frostfs_testlib.storage.configuration.interfaces import ServiceConfigurationYml
from frostfs_testlib.storage.constants import ConfigAttributes from frostfs_testlib.storage.constants import ConfigAttributes
from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode
from frostfs_testlib.storage.dataclasses.metrics import Metrics
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
from frostfs_testlib.storage.service_registry import ServiceRegistry from frostfs_testlib.storage.service_registry import ServiceRegistry
from frostfs_testlib.storage.dataclasses.metrics import Metrics
class ClusterNode: class ClusterNode:

View file

@ -23,6 +23,4 @@ class PlacementRule:
DEFAULT_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X" DEFAULT_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
SINGLE_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 4 FROM * AS X" SINGLE_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 4 FROM * AS X"
REP_2_FOR_3_NODES_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 3 FROM * AS X" REP_2_FOR_3_NODES_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 3 FROM * AS X"
REP_1_FOR_2_NODES_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 2 FROM * AS X"
DEFAULT_EC_PLACEMENT_RULE = "EC 3.1" DEFAULT_EC_PLACEMENT_RULE = "EC 3.1"
EC_1_1_FOR_2_NODES_PLACEMENT_RULE = "EC 1.1 IN X CBF 1 SELECT 2 FROM * AS X"

View file

@ -1,5 +1,4 @@
import datetime import datetime
import itertools
import logging import logging
import time import time
from typing import TypeVar from typing import TypeVar
@ -19,7 +18,7 @@ from frostfs_testlib.steps.node_management import include_node_to_network_map, r
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
from frostfs_testlib.storage.controllers.disk_controller import DiskController from frostfs_testlib.storage.controllers.disk_controller import DiskController
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces, NodeStatus from frostfs_testlib.storage.dataclasses.storage_object_info import NodeStatus
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing import parallel from frostfs_testlib.testing import parallel
from frostfs_testlib.testing.test_control import retry, run_optionally, wait_for_success from frostfs_testlib.testing.test_control import retry, run_optionally, wait_for_success
@ -40,7 +39,7 @@ class ClusterStateController:
def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None: def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None:
self.stopped_nodes: list[ClusterNode] = [] self.stopped_nodes: list[ClusterNode] = []
self.detached_disks: dict[str, DiskController] = {} self.detached_disks: dict[str, DiskController] = {}
self.dropped_traffic: set[ClusterNode] = set() self.dropped_traffic: list[ClusterNode] = []
self.excluded_from_netmap: list[StorageNode] = [] self.excluded_from_netmap: list[StorageNode] = []
self.stopped_services: set[NodeBase] = set() self.stopped_services: set[NodeBase] = set()
self.cluster = cluster self.cluster = cluster
@ -173,15 +172,6 @@ class ClusterStateController:
if service_type == StorageNode: if service_type == StorageNode:
self.wait_after_storage_startup() self.wait_after_storage_startup()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Send sighup to all {service_type} services")
def sighup_services_of_type(self, service_type: type[ServiceClass]):
services = self.cluster.services(service_type)
parallel([service.send_signal_to_service for service in services], signal="SIGHUP")
if service_type == StorageNode:
self.wait_after_storage_startup()
@wait_for_success(600, 60) @wait_for_success(600, 60)
def wait_s3gate(self, s3gate: S3Gate): def wait_s3gate(self, s3gate: S3Gate):
with reporter.step(f"Wait for {s3gate} reconnection"): with reporter.step(f"Wait for {s3gate} reconnection"):
@ -216,27 +206,21 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Stop {service_type} service on {node}") @reporter.step("Stop {service_type} service on {node}")
def stop_service_of_type(self, node: ClusterNode, service_type: ServiceClass, mask: bool = True): def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass], mask: bool = True):
service = node.service(service_type) service = node.service(service_type)
service.stop_service(mask) service.stop_service(mask)
self.stopped_services.add(service) self.stopped_services.add(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Send sighup to {service_type} service on {node}")
def sighup_service_of_type(self, node: ClusterNode, service_type: ServiceClass):
service = node.service(service_type)
service.send_signal_to_service("SIGHUP")
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Start {service_type} service on {node}") @reporter.step("Start {service_type} service on {node}")
def start_service_of_type(self, node: ClusterNode, service_type: ServiceClass): def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
service = node.service(service_type) service = node.service(service_type)
service.start_service() service.start_service()
self.stopped_services.discard(service) self.stopped_services.discard(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Start all stopped {service_type} services") @reporter.step("Start all stopped {service_type} services")
def start_stopped_services_of_type(self, service_type: ServiceClass): def start_stopped_services_of_type(self, service_type: type[ServiceClass]):
stopped_svc = self._get_stopped_by_type(service_type) stopped_svc = self._get_stopped_by_type(service_type)
if not stopped_svc: if not stopped_svc:
return return
@ -326,22 +310,22 @@ class ClusterStateController:
@reporter.step("Drop traffic to {node}, nodes - {block_nodes}") @reporter.step("Drop traffic to {node}, nodes - {block_nodes}")
def drop_traffic(self, node: ClusterNode, wakeup_timeout: int, name_interface: str, block_nodes: list[ClusterNode] = None) -> None: def drop_traffic(self, node: ClusterNode, wakeup_timeout: int, name_interface: str, block_nodes: list[ClusterNode] = None) -> None:
interfaces_tables = self._parse_interfaces(block_nodes, name_interface) list_ip = self._parse_interfaces(block_nodes, name_interface)
IpHelper.drop_input_traffic_to_node(node, interfaces_tables) IpHelper.drop_input_traffic_to_node(node, list_ip)
time.sleep(wakeup_timeout) time.sleep(wakeup_timeout)
self.dropped_traffic.add(node) self.dropped_traffic.append(node)
@reporter.step("Start traffic to {node}") @reporter.step("Start traffic to {node}")
def restore_traffic(self, node: ClusterNode) -> None: def restore_traffic(self, node: ClusterNode) -> None:
IpHelper.restore_input_traffic_to_node(node=node) IpHelper.restore_input_traffic_to_node(node=node)
self.dropped_traffic.discard(node) index = self.dropped_traffic.index(node)
self.dropped_traffic.pop(index)
@reporter.step("Restore blocked nodes") @reporter.step("Restore blocked nodes")
def restore_all_traffic(self): def restore_all_traffic(self):
if not self.dropped_traffic: if not self.dropped_traffic:
return return
parallel(self._restore_traffic_to_node, self.dropped_traffic) parallel(self._restore_traffic_to_node, self.dropped_traffic)
self.dropped_traffic.clear()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step("Hard reboot host {node} via magic SysRq option") @reporter.step("Hard reboot host {node} via magic SysRq option")
@ -454,11 +438,9 @@ class ClusterStateController:
if not checker_node: if not checker_node:
checker_node = cluster_node checker_node = cluster_node
netmap = NetmapParser.snapshot_all_nodes(frostfs_cli.netmap.snapshot(checker_node.storage_node.get_rpc_endpoint()).stdout) netmap = NetmapParser.snapshot_all_nodes(frostfs_cli.netmap.snapshot(checker_node.storage_node.get_rpc_endpoint()).stdout)
netmap = [node for node in netmap if cluster_node.get_interface(Interfaces.MGMT) == node.node] netmap = [node for node in netmap if cluster_node.host_ip == node.node]
if status == NodeStatus.OFFLINE: if status == NodeStatus.OFFLINE:
assert ( assert cluster_node.host_ip not in netmap, f"{cluster_node.host_ip} not in Offline"
cluster_node.get_interface(Interfaces.MGMT) not in netmap
), f"{cluster_node.get_interface(Interfaces.MGMT)} not in Offline"
else: else:
assert netmap[0].node_status == status, f"Node status should be '{status}', but was '{netmap[0].node_status}'" assert netmap[0].node_status == status, f"Node status should be '{status}', but was '{netmap[0].node_status}'"
@ -519,31 +501,17 @@ class ClusterStateController:
return disk_controller return disk_controller
@reporter.step("Restore traffic {node}")
def _restore_traffic_to_node(self, node): def _restore_traffic_to_node(self, node):
IpHelper.restore_input_traffic_to_node(node) IpHelper.restore_input_traffic_to_node(node)
def _parse_interfaces(self, nodes: list[ClusterNode], name_interface: str) -> list[tuple]: def _parse_interfaces(self, nodes: list[ClusterNode], name_interface: str):
interfaces_and_tables = set() interfaces = []
for node in nodes: for node in nodes:
shell = node.host.get_shell() dict_interfaces = node.host.config.interfaces
lines = shell.exec(f"ip r l table all | grep '{name_interface}'").stdout.splitlines() for type, ip in dict_interfaces.items():
if name_interface in type:
ips = [] interfaces.append(ip)
tables = [] return interfaces
for line in lines:
if "src" not in line or "table local" in line:
continue
parts = line.split()
ips.append(parts[-1])
if "table" in line:
tables.append(parts[parts.index("table") + 1])
tables.append(None)
[interfaces_and_tables.add((ip, table)) for ip, table in itertools.product(ips, tables)]
return interfaces_and_tables
@reporter.step("Ping node") @reporter.step("Ping node")
def _ping_host(self, node: ClusterNode): def _ping_host(self, node: ClusterNode):

View file

@ -14,19 +14,14 @@ class ConfigStateManager(StateManager):
self.cluster = self.csc.cluster self.cluster = self.csc.cluster
@reporter.step("Change configuration for {service_type} on all nodes") @reporter.step("Change configuration for {service_type} on all nodes")
def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any], sighup: bool = False): def set_on_all_nodes(self, service_type: type[ServiceClass], values: dict[str, Any]):
services = self.cluster.services(service_type) services = self.cluster.services(service_type)
nodes = self.cluster.nodes(services) nodes = self.cluster.nodes(services)
self.services_with_changed_config.update([(node, service_type) for node in nodes]) self.services_with_changed_config.update([(node, service_type) for node in nodes])
if not sighup: self.csc.stop_services_of_type(service_type)
self.csc.stop_services_of_type(service_type)
parallel([node.config(service_type).set for node in nodes], values=values) parallel([node.config(service_type).set for node in nodes], values=values)
if not sighup: self.csc.start_services_of_type(service_type)
self.csc.start_services_of_type(service_type)
else:
self.csc.sighup_services_of_type(service_type)
@reporter.step("Change configuration for {service_type} on {node}") @reporter.step("Change configuration for {service_type} on {node}")
def set_on_node(self, node: ClusterNode, service_type: type[ServiceClass], values: dict[str, Any]): def set_on_node(self, node: ClusterNode, service_type: type[ServiceClass], values: dict[str, Any]):
@ -37,26 +32,18 @@ class ConfigStateManager(StateManager):
self.csc.start_service_of_type(node, service_type) self.csc.start_service_of_type(node, service_type)
@reporter.step("Revert all configuration changes") @reporter.step("Revert all configuration changes")
def revert_all(self, sighup: bool = False): def revert_all(self):
if not self.services_with_changed_config: if not self.services_with_changed_config:
return return
parallel(self._revert_svc, self.services_with_changed_config, sighup) parallel(self._revert_svc, self.services_with_changed_config)
self.services_with_changed_config.clear() self.services_with_changed_config.clear()
if not sighup: self.csc.start_all_stopped_services()
self.csc.start_all_stopped_services()
# TODO: parallel can't have multiple parallel_items :( # TODO: parallel can't have multiple parallel_items :(
@reporter.step("Revert all configuration {node_and_service}") @reporter.step("Revert all configuration {node_and_service}")
def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass], sighup: bool = False): def _revert_svc(self, node_and_service: tuple[ClusterNode, ServiceClass]):
node, service_type = node_and_service node, service_type = node_and_service
service = node.service(service_type) self.csc.stop_service_of_type(node, service_type)
if not sighup:
self.csc.stop_service_of_type(node, service_type)
node.config(service_type).revert() node.config(service_type).revert()
if sighup:
service.send_signal_to_service("SIGHUP")

View file

@ -13,7 +13,6 @@ FROSTFS_CONTRACT_CACHE_TIMEOUT = 30
class ObjectOperations(HumanReadableEnum): class ObjectOperations(HumanReadableEnum):
PUT = "object.put" PUT = "object.put"
PATCH = "object.patch"
GET = "object.get" GET = "object.get"
HEAD = "object.head" HEAD = "object.head"
GET_RANGE = "object.range" GET_RANGE = "object.range"
@ -27,18 +26,6 @@ class ObjectOperations(HumanReadableEnum):
return [op for op in ObjectOperations if op != ObjectOperations.WILDCARD_ALL] return [op for op in ObjectOperations if op != ObjectOperations.WILDCARD_ALL]
class ContainerOperations(HumanReadableEnum):
PUT = "container.put"
GET = "container.get"
LIST = "container.list"
DELETE = "container.delete"
WILDCARD_ALL = "container.*"
@staticmethod
def get_all():
return [op for op in ObjectOperations if op != ObjectOperations.WILDCARD_ALL]
@dataclass @dataclass
class Operations: class Operations:
GET_CONTAINER = "GetContainer" GET_CONTAINER = "GetContainer"
@ -52,7 +39,6 @@ class Operations:
SEARCH_OBJECT = "SearchObject" SEARCH_OBJECT = "SearchObject"
HEAD_OBJECT = "HeadObject" HEAD_OBJECT = "HeadObject"
PUT_OBJECT = "PutObject" PUT_OBJECT = "PutObject"
PATCH_OBJECT = "PatchObject"
class Verb(HumanReadableEnum): class Verb(HumanReadableEnum):
@ -138,7 +124,7 @@ class Rule:
if not operations: if not operations:
self.operations = [] self.operations = []
elif isinstance(operations, (ObjectOperations, ContainerOperations)): elif isinstance(operations, ObjectOperations):
self.operations = [operations] self.operations = [operations]
else: else:
self.operations = operations self.operations = operations

View file

@ -65,10 +65,6 @@ class NodeBase(HumanReadableABC):
with reporter.step(f"Start {self.name} service on {self.host.config.address}"): with reporter.step(f"Start {self.name} service on {self.host.config.address}"):
self.host.start_service(self.name) self.host.start_service(self.name)
def send_signal_to_service(self, signal: str):
with reporter.step(f"Send -{signal} signal to {self.name} service on {self.host.config.address}"):
self.host.send_signal_to_service(self.name, signal)
@abstractmethod @abstractmethod
def service_healthcheck(self) -> bool: def service_healthcheck(self) -> bool:
"""Service healthcheck.""" """Service healthcheck."""
@ -189,7 +185,9 @@ class NodeBase(HumanReadableABC):
if attribute_name not in config.attributes: if attribute_name not in config.attributes:
if default_attribute_name is None: if default_attribute_name is None:
raise RuntimeError(f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either") raise RuntimeError(
f"Service {self.name} has no {attribute_name} in config and fallback attribute isn't set either"
)
return config.attributes[default_attribute_name] return config.attributes[default_attribute_name]
@ -199,7 +197,9 @@ class NodeBase(HumanReadableABC):
return self.host.get_service_config(self.name) return self.host.get_service_config(self.name)
def get_service_uptime(self, service: str) -> datetime: def get_service_uptime(self, service: str) -> datetime:
result = self.host.get_shell().exec(f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2") result = self.host.get_shell().exec(
f"systemctl show {service} --property ActiveEnterTimestamp | cut -d '=' -f 2"
)
start_time = parser.parse(result.stdout.strip()) start_time = parser.parse(result.stdout.strip())
current_time = datetime.now(tz=timezone.utc) current_time = datetime.now(tz=timezone.utc)
active_time = current_time - start_time active_time = current_time - start_time

View file

@ -6,7 +6,7 @@ from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.controllers.shards_watcher import ShardsWatcher from frostfs_testlib.storage.controllers.shards_watcher import ShardsWatcher
from frostfs_testlib.storage.dataclasses.storage_object_info import Chunk, Interfaces, NodeNetmapInfo from frostfs_testlib.storage.dataclasses.storage_object_info import Chunk, NodeNetmapInfo
from frostfs_testlib.storage.grpc_operations import interfaces from frostfs_testlib.storage.grpc_operations import interfaces
from frostfs_testlib.testing.test_control import wait_for_success from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils.cli_utils import parse_netmap_output from frostfs_testlib.utils.cli_utils import parse_netmap_output
@ -30,7 +30,7 @@ class ChunksOperations(interfaces.ChunksInterface):
result = [] result = []
for node_info in netmap: for node_info in netmap:
for cluster_node in cluster.cluster_nodes: for cluster_node in cluster.cluster_nodes:
if node_info.node == cluster_node.get_interface(Interfaces.MGMT): if node_info.node == cluster_node.host_ip:
result.append(cluster_node) result.append(cluster_node)
return result return result
@ -40,7 +40,7 @@ class ChunksOperations(interfaces.ChunksInterface):
for node_info in netmap: for node_info in netmap:
if node_info.node_id in chunk.confirmed_nodes: if node_info.node_id in chunk.confirmed_nodes:
for cluster_node in cluster.cluster_nodes: for cluster_node in cluster.cluster_nodes:
if cluster_node.get_interface(Interfaces.MGMT) == node_info.node: if cluster_node.host_ip == node_info.node:
return (cluster_node, node_info) return (cluster_node, node_info)
@wait_for_success(300, 5, fail_testcase=None) @wait_for_success(300, 5, fail_testcase=None)

View file

@ -5,9 +5,9 @@ from typing import List, Optional, Union
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.clients.s3 import BucketContainerResolver
from frostfs_testlib.plugins import load_plugin from frostfs_testlib.plugins import load_plugin
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.grpc_operations import interfaces from frostfs_testlib.storage.grpc_operations import interfaces
from frostfs_testlib.utils import json_utils from frostfs_testlib.utils import json_utils
@ -181,17 +181,20 @@ class ContainerOperations(interfaces.ContainerInterface):
force: bool = False, force: bool = False,
trace: bool = False, trace: bool = False,
): ):
return self.cli.container.delete( try:
rpc_endpoint=endpoint, return self.cli.container.delete(
cid=cid, rpc_endpoint=endpoint,
address=address, cid=cid,
await_mode=await_mode, address=address,
session=session, await_mode=await_mode,
ttl=ttl, session=session,
xhdr=xhdr, ttl=ttl,
force=force, xhdr=xhdr,
trace=trace, force=force,
).stdout trace=trace,
).stdout
except RuntimeError as e:
print(f"Error request:\n{e}")
@reporter.step("Get container") @reporter.step("Get container")
def get( def get(

View file

@ -11,7 +11,6 @@ from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
from frostfs_testlib.resources.common import ASSETS_DIR from frostfs_testlib.resources.common import ASSETS_DIR
from frostfs_testlib.shell.interfaces import CommandResult from frostfs_testlib.shell.interfaces import CommandResult
from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
from frostfs_testlib.storage.grpc_operations import interfaces from frostfs_testlib.storage.grpc_operations import interfaces
from frostfs_testlib.storage.grpc_operations.implementations.chunks import ChunksOperations from frostfs_testlib.storage.grpc_operations.implementations.chunks import ChunksOperations
from frostfs_testlib.testing.test_control import wait_for_success from frostfs_testlib.testing.test_control import wait_for_success
@ -207,11 +206,6 @@ class ObjectOperations(interfaces.ObjectInterface):
hash_type=hash_type, hash_type=hash_type,
timeout=timeout, timeout=timeout,
) )
if range:
# Cut off the range and return only hash
return result.stdout.split(":")[1].strip()
return result.stdout return result.stdout
@reporter.step("Head object") @reporter.step("Head object")
@ -413,57 +407,6 @@ class ObjectOperations(interfaces.ObjectInterface):
oid = id_str.split(":")[1] oid = id_str.split(":")[1]
return oid.strip() return oid.strip()
@reporter.step("Patch object")
def patch(
self,
cid: str,
oid: str,
endpoint: str,
ranges: list[str] = None,
payloads: list[str] = None,
new_attrs: Optional[str] = None,
replace_attrs: bool = False,
bearer: str = "",
xhdr: Optional[dict] = None,
session: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
trace: bool = False,
) -> str:
"""
PATCH an object.
Args:
cid: ID of Container where we get the Object from
oid: Object ID
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
ranges: An array of ranges in which to replace data in the format [offset1:length1, offset2:length2]
payloads: An array of file paths to be applied in each range
new_attrs: Attributes to be changed in the format "key1=value1,key2=value2"
replace_attrs: Replace all attributes completely with new ones specified in new_attrs
bearer: Path to Bearer Token file, appends to `--bearer` key
xhdr: Request X-Headers in form of Key=Value
session: Path to a JSON-encoded container session token
timeout: Timeout for the operation
trace: Generate trace ID and print it
Returns:
(str): ID of patched Object
"""
result = self.cli.object.patch(
rpc_endpoint=endpoint,
cid=cid,
oid=oid,
range=ranges,
payload=payloads,
new_attrs=new_attrs,
replace_attrs=replace_attrs,
bearer=bearer,
xhdr=xhdr,
session=session,
timeout=timeout,
trace=trace,
)
return result.stdout.split(":")[1].strip()
@reporter.step("Put object to random node") @reporter.step("Put object to random node")
def put_to_random_node( def put_to_random_node(
self, self,
@ -675,34 +618,7 @@ class ObjectOperations(interfaces.ObjectInterface):
cluster_node cluster_node
for netmap_node in netmap_nodes for netmap_node in netmap_nodes
for cluster_node in cluster.cluster_nodes for cluster_node in cluster.cluster_nodes
if netmap_node.node == cluster_node.get_interface(Interfaces.MGMT) if netmap_node.node == cluster_node.host_ip
] ]
return object_nodes return object_nodes
@reporter.step("Search parts of object")
def parts(
self,
cid: str,
oid: str,
alive_node: ClusterNode,
bearer: str = "",
xhdr: Optional[dict] = None,
is_direct: bool = False,
verify_presence_all: bool = False,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> list[str]:
endpoint = alive_node.storage_node.get_rpc_endpoint()
response = self.cli.object.nodes(
rpc_endpoint=endpoint,
cid=cid,
oid=oid,
bearer=bearer,
ttl=1 if is_direct else None,
json=True,
xhdr=xhdr,
timeout=timeout,
verify_presence_all=verify_presence_all,
)
response_json = json.loads(response.stdout)
return [data_object["object_id"] for data_object in response_json["data_objects"]]

View file

@ -198,24 +198,6 @@ class ObjectInterface(ABC):
) -> str: ) -> str:
pass pass
@abstractmethod
def patch(
self,
cid: str,
oid: str,
endpoint: str,
ranges: Optional[list[str]] = None,
payloads: Optional[list[str]] = None,
new_attrs: Optional[str] = None,
replace_attrs: bool = False,
bearer: Optional[str] = None,
xhdr: Optional[dict] = None,
session: Optional[str] = None,
timeout: Optional[str] = None,
trace: bool = False,
) -> str:
pass
@abstractmethod @abstractmethod
def put_to_random_node( def put_to_random_node(
self, self,
@ -282,20 +264,6 @@ class ObjectInterface(ABC):
) -> List[ClusterNode]: ) -> List[ClusterNode]:
pass pass
@abstractmethod
def parts(
self,
cid: str,
oid: str,
alive_node: ClusterNode,
bearer: str = "",
xhdr: Optional[dict] = None,
is_direct: bool = False,
verify_presence_all: bool = False,
timeout: Optional[str] = None,
) -> List[str]:
pass
class ContainerInterface(ABC): class ContainerInterface(ABC):
@abstractmethod @abstractmethod

View file

@ -1,5 +1,4 @@
import itertools import itertools
import traceback
from concurrent.futures import Future, ThreadPoolExecutor from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import contextmanager from contextlib import contextmanager
from typing import Callable, Collection, Optional, Union from typing import Callable, Collection, Optional, Union
@ -56,42 +55,7 @@ def parallel(
# Check for exceptions # Check for exceptions
exceptions = [future.exception() for future in futures if future.exception()] exceptions = [future.exception() for future in futures if future.exception()]
if exceptions: if exceptions:
# Prettify exception in parallel with all underlying stack traces message = "\n".join([str(e) for e in exceptions])
# For example, we had 3 RuntimeError exceptions during parallel. This format will give us something like
#
# RuntimeError: The following exceptions occured during parallel run:
# 1) Exception one text
# 2) Exception two text
# 3) Exception three text
# TRACES:
# ==== 1 ====
# Traceback (most recent call last):
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
# result = self.fn(*self.args, **self.kwargs)
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
# raise RuntimeError(f"Exception one text")
# RuntimeError: Exception one text
#
# ==== 2 ====
# Traceback (most recent call last):
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
# result = self.fn(*self.args, **self.kwargs)
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
# raise RuntimeError(f"Exception two text")
# RuntimeError: Exception two text
#
# ==== 3 ====
# Traceback (most recent call last):
# File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
# result = self.fn(*self.args, **self.kwargs)
# File "frostfs_testcases/pytest_tests/testsuites/object/test_object_tombstone.py", line 17, in check_service
# raise RuntimeError(f"Exception three text")
# RuntimeError: Exception three text
short_summary = "\n".join([f"{i}) {str(e)}" for i, e in enumerate(exceptions, 1)])
stack_traces = "\n".join(
[f"==== {i} ====\n{''.join(traceback.TracebackException.from_exception(e).format())}" for i, e in enumerate(exceptions, 1)]
)
message = f"{short_summary}\nTRACES:\n{stack_traces}"
raise RuntimeError(f"The following exceptions occured during parallel run:\n{message}") raise RuntimeError(f"The following exceptions occured during parallel run:\n{message}")
return futures return futures

View file

@ -1,16 +1,13 @@
import inspect import inspect
import logging import logging
import os
from functools import wraps from functools import wraps
from time import sleep, time from time import sleep, time
from typing import Any from typing import Any
import yaml
from _pytest.outcomes import Failed from _pytest.outcomes import Failed
from pytest import fail from pytest import fail
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.resources.common import ASSETS_DIR
from frostfs_testlib.utils.func_utils import format_by_args from frostfs_testlib.utils.func_utils import format_by_args
logger = logging.getLogger("NeoLogger") logger = logging.getLogger("NeoLogger")
@ -131,42 +128,6 @@ def run_optionally(enabled: bool, mock_value: Any = True):
return deco return deco
def cached_fixture(enabled: bool):
"""
Decorator to cache fixtures.
MUST be placed after @pytest.fixture and before @allure decorators.
Args:
enabled: if true, decorated func will be cached.
"""
def deco(func):
@wraps(func)
def func_impl(*a, **kw):
# TODO: *a and *kw should be parsed to some kind of hashsum and used in filename to prevent cache load from different parameters
cache_file = os.path.join(ASSETS_DIR, f"fixture_cache_{func.__name__}.yml")
if enabled and os.path.exists(cache_file):
with open(cache_file, "r") as cache_input:
return yaml.load(cache_input, Loader=yaml.Loader)
result = func(*a, **kw)
if enabled:
with open(cache_file, "w") as cache_output:
yaml.dump(result, cache_output)
return result
# TODO: cache yielding fixtures
@wraps(func)
def gen_impl(*a, **kw):
raise NotImplementedError("Not implemented for yielding fixtures")
return gen_impl if inspect.isgeneratorfunction(func) else func_impl
return deco
def wait_for_success( def wait_for_success(
max_wait_time: int = 60, max_wait_time: int = 60,
interval: int = 1, interval: int = 1,

View file

@ -80,9 +80,6 @@ def log_command_execution(cmd: str, output: Union[str, dict], params: Optional[d
if not params: if not params:
params = {} params = {}
if params.get("Body") and len(params.get("Body")) > 1000:
params["Body"] = "<large text data>"
output_params = params output_params = params
try: try:

View file

@ -2,7 +2,7 @@ from typing import Any
import pytest import pytest
from frostfs_testlib.clients import AwsCliClient, Boto3ClientWrapper from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper
from frostfs_testlib.storage.dataclasses.acl import EACLRole from frostfs_testlib.storage.dataclasses.acl import EACLRole
from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize from frostfs_testlib.storage.dataclasses.object_size import ObjectSize