Compare commits
1 commit
master
...
feature--1
Author | SHA1 | Date | |
---|---|---|---|
b2a33656e2 |
35 changed files with 204 additions and 653 deletions
|
@ -1,3 +1 @@
|
||||||
.* @TrueCloudLab/qa-committers
|
* @JuliaKovshova @abereziny @d.zayakin @anikeev-yadro @anurindm @ylukoyan @i.niyazov
|
||||||
.forgejo/.* @potyarkin
|
|
||||||
Makefile @potyarkin
|
|
||||||
|
|
|
@ -62,7 +62,7 @@ authmate = "frostfs_testlib.credentials.authmate_s3_provider:AuthmateS3Credentia
|
||||||
wallet_factory = "frostfs_testlib.credentials.wallet_factory_provider:WalletFactoryProvider"
|
wallet_factory = "frostfs_testlib.credentials.wallet_factory_provider:WalletFactoryProvider"
|
||||||
|
|
||||||
[project.entry-points."frostfs.testlib.bucket_cid_resolver"]
|
[project.entry-points."frostfs.testlib.bucket_cid_resolver"]
|
||||||
frostfs = "frostfs_testlib.clients.s3.curl_bucket_resolver:CurlBucketContainerResolver"
|
frostfs = "frostfs_testlib.s3.curl_bucket_resolver:CurlBucketContainerResolver"
|
||||||
|
|
||||||
[tool.isort]
|
[tool.isort]
|
||||||
profile = "black"
|
profile = "black"
|
||||||
|
|
|
@ -463,26 +463,3 @@ class FrostfsAdmMorph(CliCommand):
|
||||||
"morph ape rm-rule-chain",
|
"morph ape rm-rule-chain",
|
||||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_nns_records(
|
|
||||||
self,
|
|
||||||
name: str,
|
|
||||||
type: Optional[str] = None,
|
|
||||||
rpc_endpoint: Optional[str] = None,
|
|
||||||
alphabet_wallets: Optional[str] = None,
|
|
||||||
) -> CommandResult:
|
|
||||||
"""Returns domain record of the specified type
|
|
||||||
|
|
||||||
Args:
|
|
||||||
name: Domain name
|
|
||||||
type: Domain name service record type(A|CNAME|SOA|TXT)
|
|
||||||
rpc_endpoint: N3 RPC node endpoint
|
|
||||||
alphabet_wallets: path to alphabet wallets dir
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Command's result
|
|
||||||
"""
|
|
||||||
return self._execute(
|
|
||||||
"morph nns get-records",
|
|
||||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
|
||||||
)
|
|
||||||
|
|
|
@ -276,54 +276,6 @@ class FrostfsCliObject(CliCommand):
|
||||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||||
)
|
)
|
||||||
|
|
||||||
def patch(
|
|
||||||
self,
|
|
||||||
rpc_endpoint: str,
|
|
||||||
cid: str,
|
|
||||||
oid: str,
|
|
||||||
range: list[str] = None,
|
|
||||||
payload: list[str] = None,
|
|
||||||
new_attrs: Optional[str] = None,
|
|
||||||
replace_attrs: bool = False,
|
|
||||||
address: Optional[str] = None,
|
|
||||||
bearer: Optional[str] = None,
|
|
||||||
generate_key: Optional[bool] = None,
|
|
||||||
session: Optional[str] = None,
|
|
||||||
timeout: Optional[str] = None,
|
|
||||||
trace: bool = False,
|
|
||||||
ttl: Optional[int] = None,
|
|
||||||
wallet: Optional[str] = None,
|
|
||||||
xhdr: Optional[dict] = None,
|
|
||||||
) -> CommandResult:
|
|
||||||
"""
|
|
||||||
PATCH an object.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>')
|
|
||||||
cid: Container ID
|
|
||||||
oid: Object ID
|
|
||||||
range: An array of ranges in which to replace data in the format [offset1:length1, offset2:length2]
|
|
||||||
payload: An array of file paths to be applied in each range
|
|
||||||
new_attrs: Attributes to be changed in the format Key1=Value1,Key2=Value2
|
|
||||||
replace_attrs: Replace all attributes completely with new ones specified in new_attrs
|
|
||||||
address: Address of wallet account
|
|
||||||
bearer: File with signed JSON or binary encoded bearer token
|
|
||||||
generate_key: Generate new private key
|
|
||||||
session: Filepath to a JSON- or binary-encoded token of the object RANGE session
|
|
||||||
timeout: Timeout for the operation
|
|
||||||
trace: Generate trace ID and print it
|
|
||||||
ttl: TTL value in request meta header (default 2)
|
|
||||||
wallet: WIF (NEP-2) string or path to the wallet or binary key
|
|
||||||
xhdr: Dict with request X-Headers
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Command's result.
|
|
||||||
"""
|
|
||||||
return self._execute(
|
|
||||||
"object patch",
|
|
||||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
|
||||||
)
|
|
||||||
|
|
||||||
def range(
|
def range(
|
||||||
self,
|
self,
|
||||||
rpc_endpoint: str,
|
rpc_endpoint: str,
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from frostfs_testlib.storage.cluster import ClusterNode
|
from frostfs_testlib.storage.cluster import ClusterNode
|
||||||
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces, NodeNetInfo, NodeNetmapInfo, NodeStatus
|
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetInfo, NodeNetmapInfo, NodeStatus
|
||||||
|
|
||||||
|
|
||||||
class NetmapParser:
|
class NetmapParser:
|
||||||
|
@ -85,7 +85,7 @@ class NetmapParser:
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def snapshot_one_node(output: str, cluster_node: ClusterNode) -> NodeNetmapInfo | None:
|
def snapshot_one_node(output: str, cluster_node: ClusterNode) -> NodeNetmapInfo | None:
|
||||||
snapshot_nodes = NetmapParser.snapshot_all_nodes(output=output)
|
snapshot_nodes = NetmapParser.snapshot_all_nodes(output=output)
|
||||||
snapshot_node = [node for node in snapshot_nodes if node.node == cluster_node.get_interface(Interfaces.MGMT)]
|
snapshot_node = [node for node in snapshot_nodes if node.node == cluster_node.host_ip]
|
||||||
if not snapshot_node:
|
if not snapshot_node:
|
||||||
return None
|
return None
|
||||||
return snapshot_node[0]
|
return snapshot_node[0]
|
||||||
|
|
|
@ -1,5 +0,0 @@
|
||||||
from frostfs_testlib.clients.http.http_client import HttpClient
|
|
||||||
from frostfs_testlib.clients.s3.aws_cli_client import AwsCliClient
|
|
||||||
from frostfs_testlib.clients.s3.boto3_client import Boto3ClientWrapper
|
|
||||||
from frostfs_testlib.clients.s3.interfaces import S3ClientWrapper
|
|
||||||
from frostfs_testlib.clients.s3.s3_http_client import S3HttpClient
|
|
|
@ -1,144 +0,0 @@
|
||||||
import io
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
import logging.config
|
|
||||||
from typing import Mapping, Sequence
|
|
||||||
|
|
||||||
import httpx
|
|
||||||
|
|
||||||
from frostfs_testlib import reporter
|
|
||||||
|
|
||||||
timeout = httpx.Timeout(60, read=150)
|
|
||||||
LOGGING_CONFIG = {
|
|
||||||
"disable_existing_loggers": False,
|
|
||||||
"version": 1,
|
|
||||||
"handlers": {"default": {"class": "logging.StreamHandler", "formatter": "http", "stream": "ext://sys.stderr"}},
|
|
||||||
"formatters": {
|
|
||||||
"http": {
|
|
||||||
"format": "%(levelname)s [%(asctime)s] %(name)s - %(message)s",
|
|
||||||
"datefmt": "%Y-%m-%d %H:%M:%S",
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"loggers": {
|
|
||||||
"httpx": {
|
|
||||||
"handlers": ["default"],
|
|
||||||
"level": "DEBUG",
|
|
||||||
},
|
|
||||||
"httpcore": {
|
|
||||||
"handlers": ["default"],
|
|
||||||
"level": "ERROR",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
logging.config.dictConfig(LOGGING_CONFIG)
|
|
||||||
logger = logging.getLogger("NeoLogger")
|
|
||||||
|
|
||||||
|
|
||||||
class HttpClient:
|
|
||||||
@reporter.step("Send {method} request to {url}")
|
|
||||||
def send(self, method: str, url: str, expected_status_code: int = None, **kwargs: dict) -> httpx.Response:
|
|
||||||
transport = httpx.HTTPTransport(verify=False, retries=5)
|
|
||||||
client = httpx.Client(timeout=timeout, transport=transport)
|
|
||||||
response = client.request(method, url, **kwargs)
|
|
||||||
|
|
||||||
self._attach_response(response, **kwargs)
|
|
||||||
logger.info(f"Response: {response.status_code} => {response.text}")
|
|
||||||
|
|
||||||
if expected_status_code:
|
|
||||||
assert (
|
|
||||||
response.status_code == expected_status_code
|
|
||||||
), f"Got {response.status_code} response code while {expected_status_code} expected"
|
|
||||||
|
|
||||||
return response
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _parse_body(cls, readable: httpx.Request | httpx.Response) -> str | None:
|
|
||||||
try:
|
|
||||||
content = readable.read()
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Unable to read file: {str(e)}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
if not content:
|
|
||||||
return None
|
|
||||||
|
|
||||||
request_body = None
|
|
||||||
|
|
||||||
try:
|
|
||||||
request_body = json.loads(content)
|
|
||||||
except (json.JSONDecodeError, UnicodeDecodeError) as e:
|
|
||||||
logger.warning(f"Unable to convert body to json: {str(e)}")
|
|
||||||
|
|
||||||
if request_body is not None:
|
|
||||||
return json.dumps(request_body, default=str, indent=4)
|
|
||||||
|
|
||||||
try:
|
|
||||||
request_body = content.decode()
|
|
||||||
except UnicodeDecodeError as e:
|
|
||||||
logger.warning(f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}")
|
|
||||||
|
|
||||||
request_body = content if request_body is None else request_body
|
|
||||||
request_body = "<large text data>" if len(request_body) > 1000 else request_body
|
|
||||||
|
|
||||||
return request_body
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _parse_files(cls, files: Mapping | Sequence | None) -> dict:
|
|
||||||
filepaths = {}
|
|
||||||
|
|
||||||
if not files:
|
|
||||||
return filepaths
|
|
||||||
|
|
||||||
if isinstance(files, Sequence):
|
|
||||||
items = files
|
|
||||||
elif isinstance(files, Mapping):
|
|
||||||
items = files.items()
|
|
||||||
else:
|
|
||||||
raise TypeError(f"'files' must be either Sequence or Mapping, got: {type(files).__name__}")
|
|
||||||
|
|
||||||
for name, file in items:
|
|
||||||
if isinstance(file, io.IOBase):
|
|
||||||
filepaths[name] = file.name
|
|
||||||
elif isinstance(file, Sequence):
|
|
||||||
filepaths[name] = file[1].name
|
|
||||||
|
|
||||||
return filepaths
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _attach_response(cls, response: httpx.Response, **kwargs):
|
|
||||||
request = response.request
|
|
||||||
request_headers = json.dumps(dict(request.headers), default=str, indent=4)
|
|
||||||
request_body = cls._parse_body(request)
|
|
||||||
|
|
||||||
files = kwargs.get("files")
|
|
||||||
request_files = cls._parse_files(files)
|
|
||||||
|
|
||||||
response_headers = json.dumps(dict(response.headers), default=str, indent=4)
|
|
||||||
response_body = cls._parse_body(response)
|
|
||||||
|
|
||||||
report = (
|
|
||||||
f"Method: {request.method}\n\n"
|
|
||||||
+ f"URL: {request.url}\n\n"
|
|
||||||
+ f"Request Headers: {request_headers}\n\n"
|
|
||||||
+ (f"Request Body: {request_body}\n\n" if request_body else "")
|
|
||||||
+ (f"Request Files: {request_files}\n\n" if request_files else "")
|
|
||||||
+ f"Response Status Code: {response.status_code}\n\n"
|
|
||||||
+ f"Response Headers: {response_headers}\n\n"
|
|
||||||
+ (f"Response Body: {response_body}\n\n" if response_body else "")
|
|
||||||
)
|
|
||||||
curl_request = cls._create_curl_request(request.url, request.method, request.headers, request_body, request_files)
|
|
||||||
|
|
||||||
reporter.attach(report, "Requests Info")
|
|
||||||
reporter.attach(curl_request, "CURL")
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _create_curl_request(cls, url: str, method: str, headers: httpx.Headers, data: str, files: dict) -> str:
|
|
||||||
headers = " ".join(f'-H "{name.title()}: {value}"' for name, value in headers.items())
|
|
||||||
data = f" -d '{data}'" if data else ""
|
|
||||||
|
|
||||||
for name, path in files.items():
|
|
||||||
data += f' -F "{name}=@{path}"'
|
|
||||||
|
|
||||||
# Option -k means no verify SSL
|
|
||||||
return f"curl {url} -X {method} {headers}{data} -k"
|
|
|
@ -1 +0,0 @@
|
||||||
from frostfs_testlib.clients.s3.interfaces import BucketContainerResolver, S3ClientWrapper, VersioningStatus
|
|
|
@ -1,149 +0,0 @@
|
||||||
import hashlib
|
|
||||||
import logging
|
|
||||||
import xml.etree.ElementTree as ET
|
|
||||||
|
|
||||||
import httpx
|
|
||||||
from botocore.auth import SigV4Auth
|
|
||||||
from botocore.awsrequest import AWSRequest
|
|
||||||
from botocore.credentials import Credentials
|
|
||||||
|
|
||||||
from frostfs_testlib import reporter
|
|
||||||
from frostfs_testlib.clients import HttpClient
|
|
||||||
from frostfs_testlib.utils.file_utils import TestFile
|
|
||||||
|
|
||||||
logger = logging.getLogger("NeoLogger")
|
|
||||||
|
|
||||||
DEFAULT_TIMEOUT = 60.0
|
|
||||||
|
|
||||||
|
|
||||||
class S3HttpClient:
|
|
||||||
def __init__(
|
|
||||||
self, s3gate_endpoint: str, access_key_id: str, secret_access_key: str, profile: str = "default", region: str = "us-east-1"
|
|
||||||
) -> None:
|
|
||||||
self.http_client = HttpClient()
|
|
||||||
self.credentials = Credentials(access_key_id, secret_access_key)
|
|
||||||
self.profile = profile
|
|
||||||
self.region = region
|
|
||||||
|
|
||||||
self.iam_endpoint: str = None
|
|
||||||
self.s3gate_endpoint: str = None
|
|
||||||
self.service: str = None
|
|
||||||
self.signature: SigV4Auth = None
|
|
||||||
|
|
||||||
self.set_endpoint(s3gate_endpoint)
|
|
||||||
|
|
||||||
def _to_s3_header(self, header: str) -> dict:
|
|
||||||
replacement_map = {
|
|
||||||
"Acl": "ACL",
|
|
||||||
"_": "-",
|
|
||||||
}
|
|
||||||
|
|
||||||
result = header
|
|
||||||
if not header.startswith("x_amz"):
|
|
||||||
result = header.title()
|
|
||||||
|
|
||||||
for find, replace in replacement_map.items():
|
|
||||||
result = result.replace(find, replace)
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
def _convert_to_s3_headers(self, scope: dict, exclude: list[str] = None):
|
|
||||||
exclude = ["self", "cls"] if not exclude else exclude + ["self", "cls"]
|
|
||||||
return {self._to_s3_header(header): value for header, value in scope.items() if header not in exclude and value is not None}
|
|
||||||
|
|
||||||
def _create_aws_request(
|
|
||||||
self, method: str, url: str, headers: dict, content: str | bytes | TestFile = None, params: dict = None
|
|
||||||
) -> AWSRequest:
|
|
||||||
data = b""
|
|
||||||
|
|
||||||
if content is not None:
|
|
||||||
if isinstance(content, TestFile):
|
|
||||||
with open(content, "rb") as io_content:
|
|
||||||
data = io_content.read()
|
|
||||||
elif isinstance(content, str):
|
|
||||||
data = bytes(content, encoding="utf-8")
|
|
||||||
elif isinstance(content, bytes):
|
|
||||||
data = content
|
|
||||||
else:
|
|
||||||
raise TypeError(f"Content expected as a string, bytes or TestFile object, got: {content}")
|
|
||||||
|
|
||||||
headers["X-Amz-Content-SHA256"] = hashlib.sha256(data).hexdigest()
|
|
||||||
aws_request = AWSRequest(method, url, headers, data, params)
|
|
||||||
self.signature.add_auth(aws_request)
|
|
||||||
|
|
||||||
return aws_request
|
|
||||||
|
|
||||||
def _exec_request(
|
|
||||||
self,
|
|
||||||
method: str,
|
|
||||||
url: str,
|
|
||||||
headers: dict,
|
|
||||||
content: str | bytes | TestFile = None,
|
|
||||||
params: dict = None,
|
|
||||||
timeout: float = DEFAULT_TIMEOUT,
|
|
||||||
) -> dict:
|
|
||||||
aws_request = self._create_aws_request(method, url, headers, content, params)
|
|
||||||
response = self.http_client.send(
|
|
||||||
aws_request.method,
|
|
||||||
aws_request.url,
|
|
||||||
headers=dict(aws_request.headers),
|
|
||||||
data=aws_request.data,
|
|
||||||
params=aws_request.params,
|
|
||||||
timeout=timeout,
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
response.raise_for_status()
|
|
||||||
except httpx.HTTPStatusError:
|
|
||||||
raise httpx.HTTPStatusError(response.text, request=response.request, response=response)
|
|
||||||
|
|
||||||
root = ET.fromstring(response.read())
|
|
||||||
data = {
|
|
||||||
"LastModified": root.find(".//LastModified").text,
|
|
||||||
"ETag": root.find(".//ETag").text,
|
|
||||||
}
|
|
||||||
|
|
||||||
if response.headers.get("x-amz-version-id"):
|
|
||||||
data["VersionId"] = response.headers.get("x-amz-version-id")
|
|
||||||
|
|
||||||
return data
|
|
||||||
|
|
||||||
@reporter.step("Set endpoint S3 to {s3gate_endpoint}")
|
|
||||||
def set_endpoint(self, s3gate_endpoint: str):
|
|
||||||
if self.s3gate_endpoint == s3gate_endpoint:
|
|
||||||
return
|
|
||||||
|
|
||||||
self.s3gate_endpoint = s3gate_endpoint
|
|
||||||
self.service = "s3"
|
|
||||||
self.signature = SigV4Auth(self.credentials, self.service, self.region)
|
|
||||||
|
|
||||||
@reporter.step("Set endpoint IAM to {iam_endpoint}")
|
|
||||||
def set_iam_endpoint(self, iam_endpoint: str):
|
|
||||||
if self.iam_endpoint == iam_endpoint:
|
|
||||||
return
|
|
||||||
|
|
||||||
self.iam_endpoint = iam_endpoint
|
|
||||||
self.service = "iam"
|
|
||||||
self.signature = SigV4Auth(self.credentials, self.service, self.region)
|
|
||||||
|
|
||||||
@reporter.step("Patch object S3")
|
|
||||||
def patch_object(
|
|
||||||
self,
|
|
||||||
bucket: str,
|
|
||||||
key: str,
|
|
||||||
content: str | bytes | TestFile,
|
|
||||||
content_range: str,
|
|
||||||
version_id: str = None,
|
|
||||||
if_match: str = None,
|
|
||||||
if_unmodified_since: str = None,
|
|
||||||
x_amz_expected_bucket_owner: str = None,
|
|
||||||
timeout: float = DEFAULT_TIMEOUT,
|
|
||||||
) -> dict:
|
|
||||||
if content_range and not content_range.startswith("bytes"):
|
|
||||||
content_range = f"bytes {content_range}/*"
|
|
||||||
|
|
||||||
url = f"{self.s3gate_endpoint}/{bucket}/{key}"
|
|
||||||
headers = self._convert_to_s3_headers(locals(), exclude=["bucket", "key", "content", "version_id", "timeout"])
|
|
||||||
params = {"VersionId": version_id} if version_id is not None else None
|
|
||||||
|
|
||||||
return self._exec_request("PATCH", url, headers, content, params, timeout=timeout)
|
|
|
@ -250,7 +250,6 @@ class DockerHost(Host):
|
||||||
unit: Optional[str] = None,
|
unit: Optional[str] = None,
|
||||||
exclude_filter: Optional[str] = None,
|
exclude_filter: Optional[str] = None,
|
||||||
priority: Optional[str] = None,
|
priority: Optional[str] = None,
|
||||||
word_count: bool = None,
|
|
||||||
) -> str:
|
) -> str:
|
||||||
client = self._get_docker_client()
|
client = self._get_docker_client()
|
||||||
filtered_logs = ""
|
filtered_logs = ""
|
||||||
|
|
|
@ -324,7 +324,6 @@ class Host(ABC):
|
||||||
unit: Optional[str] = None,
|
unit: Optional[str] = None,
|
||||||
exclude_filter: Optional[str] = None,
|
exclude_filter: Optional[str] = None,
|
||||||
priority: Optional[str] = None,
|
priority: Optional[str] = None,
|
||||||
word_count: bool = None,
|
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Get logs from host filtered by regex.
|
"""Get logs from host filtered by regex.
|
||||||
|
|
||||||
|
@ -335,7 +334,6 @@ class Host(ABC):
|
||||||
unit: required unit.
|
unit: required unit.
|
||||||
priority: logs level, 0 - emergency, 7 - debug. All messages with that code and higher.
|
priority: logs level, 0 - emergency, 7 - debug. All messages with that code and higher.
|
||||||
For example, if we specify the -p 2 option, journalctl will show all messages with levels 2, 1 and 0.
|
For example, if we specify the -p 2 option, journalctl will show all messages with levels 2, 1 and 0.
|
||||||
word_count: output type, expected values: lines, bytes, json
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Found entries as str if any found.
|
Found entries as str if any found.
|
||||||
|
|
95
src/frostfs_testlib/http/http_client.py
Normal file
95
src/frostfs_testlib/http/http_client.py
Normal file
|
@ -0,0 +1,95 @@
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import logging.config
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from frostfs_testlib import reporter
|
||||||
|
|
||||||
|
timeout = httpx.Timeout(60, read=150)
|
||||||
|
LOGGING_CONFIG = {
|
||||||
|
"disable_existing_loggers": False,
|
||||||
|
"version": 1,
|
||||||
|
"handlers": {"default": {"class": "logging.StreamHandler", "formatter": "http", "stream": "ext://sys.stderr"}},
|
||||||
|
"formatters": {
|
||||||
|
"http": {
|
||||||
|
"format": "%(levelname)s [%(asctime)s] %(name)s - %(message)s",
|
||||||
|
"datefmt": "%Y-%m-%d %H:%M:%S",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"loggers": {
|
||||||
|
"httpx": {
|
||||||
|
"handlers": ["default"],
|
||||||
|
"level": "DEBUG",
|
||||||
|
},
|
||||||
|
"httpcore": {
|
||||||
|
"handlers": ["default"],
|
||||||
|
"level": "ERROR",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
logging.config.dictConfig(LOGGING_CONFIG)
|
||||||
|
logger = logging.getLogger("NeoLogger")
|
||||||
|
|
||||||
|
|
||||||
|
class HttpClient:
|
||||||
|
@reporter.step("Send {method} request to {url}")
|
||||||
|
def send(self, method: str, url: str, expected_status_code: int = None, **kwargs: dict) -> httpx.Response:
|
||||||
|
transport = httpx.HTTPTransport(verify=False, retries=5)
|
||||||
|
client = httpx.Client(timeout=timeout, transport=transport)
|
||||||
|
response = client.request(method, url, **kwargs)
|
||||||
|
|
||||||
|
self._attach_response(response)
|
||||||
|
logger.info(f"Response: {response.status_code} => {response.text}")
|
||||||
|
|
||||||
|
if expected_status_code:
|
||||||
|
assert response.status_code == expected_status_code, (
|
||||||
|
f"Got {response.status_code} response code" f" while {expected_status_code} expected"
|
||||||
|
)
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
def _attach_response(self, response: httpx.Response):
|
||||||
|
request = response.request
|
||||||
|
|
||||||
|
try:
|
||||||
|
request_headers = json.dumps(dict(request.headers), indent=4)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
request_headers = str(request.headers)
|
||||||
|
|
||||||
|
try:
|
||||||
|
request_body = request.read()
|
||||||
|
try:
|
||||||
|
request_body = request_body.decode("utf-8")
|
||||||
|
except UnicodeDecodeError as e:
|
||||||
|
request_body = f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}"
|
||||||
|
except Exception as e:
|
||||||
|
request_body = f"Error reading request body: {str(e)}"
|
||||||
|
|
||||||
|
request_body = "" if request_body is None else request_body
|
||||||
|
|
||||||
|
try:
|
||||||
|
response_headers = json.dumps(dict(response.headers), indent=4)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
response_headers = str(response.headers)
|
||||||
|
|
||||||
|
report = (
|
||||||
|
f"Method: {request.method}\n\n"
|
||||||
|
f"URL: {request.url}\n\n"
|
||||||
|
f"Request Headers: {request_headers}\n\n"
|
||||||
|
f"Request Body: {request_body}\n\n"
|
||||||
|
f"Response Status Code: {response.status_code}\n\n"
|
||||||
|
f"Response Headers: {response_headers}\n\n"
|
||||||
|
f"Response Body: {response.text}\n\n"
|
||||||
|
)
|
||||||
|
curl_request = self._create_curl_request(request.url, request.method, request.headers, request_body)
|
||||||
|
|
||||||
|
reporter.attach(report, "Requests Info")
|
||||||
|
reporter.attach(curl_request, "CURL")
|
||||||
|
|
||||||
|
def _create_curl_request(self, url: str, method: str, headers: httpx.Headers, data: str) -> str:
|
||||||
|
headers = " ".join(f'-H "{name.title()}: {value}"' for name, value in headers.items())
|
||||||
|
data = f" -d '{data}'" if data else ""
|
||||||
|
# Option -k means no verify SSL
|
||||||
|
return f"curl {url} -X {method} {headers}{data} -k"
|
|
@ -9,7 +9,6 @@ OBJECT_ALREADY_REMOVED = "code = 2052.*message = object already removed"
|
||||||
SESSION_NOT_FOUND = "code = 4096.*message = session token not found"
|
SESSION_NOT_FOUND = "code = 4096.*message = session token not found"
|
||||||
OUT_OF_RANGE = "code = 2053.*message = out of range"
|
OUT_OF_RANGE = "code = 2053.*message = out of range"
|
||||||
EXPIRED_SESSION_TOKEN = "code = 4097.*message = expired session token"
|
EXPIRED_SESSION_TOKEN = "code = 4097.*message = expired session token"
|
||||||
ADD_CHAIN_ERROR = "code = 5120 message = apemanager access denied"
|
|
||||||
# TODO: Change to codes with message
|
# TODO: Change to codes with message
|
||||||
# OBJECT_IS_LOCKED = "code = 2050.*message = object is locked"
|
# OBJECT_IS_LOCKED = "code = 2050.*message = object is locked"
|
||||||
# LOCK_NON_REGULAR_OBJECT = "code = 2051.*message = ..." will be available once 2092 is fixed
|
# LOCK_NON_REGULAR_OBJECT = "code = 2051.*message = ..." will be available once 2092 is fixed
|
||||||
|
@ -28,10 +27,6 @@ S3_BUCKET_DOES_NOT_ALLOW_ACL = "The bucket does not allow ACLs"
|
||||||
S3_MALFORMED_XML_REQUEST = "The XML you provided was not well-formed or did not validate against our published schema."
|
S3_MALFORMED_XML_REQUEST = "The XML you provided was not well-formed or did not validate against our published schema."
|
||||||
|
|
||||||
RULE_ACCESS_DENIED_CONTAINER = "access to container operation {operation} is denied by access policy engine: Access denied"
|
RULE_ACCESS_DENIED_CONTAINER = "access to container operation {operation} is denied by access policy engine: Access denied"
|
||||||
# Errors from node missing reasons if request was forwarded. Commenting for now
|
RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request: method {operation}: Access denied"
|
||||||
# RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request: method {operation}: Access denied"
|
|
||||||
RULE_ACCESS_DENIED_OBJECT = "access to object operation denied: ape denied request"
|
|
||||||
NO_RULE_FOUND_CONTAINER = "access to container operation {operation} is denied by access policy engine: NoRuleFound"
|
NO_RULE_FOUND_CONTAINER = "access to container operation {operation} is denied by access policy engine: NoRuleFound"
|
||||||
# Errors from node missing reasons if request was forwarded. Commenting for now
|
NO_RULE_FOUND_OBJECT = "access to object operation denied: ape denied request: method {operation}: NoRuleFound"
|
||||||
# NO_RULE_FOUND_OBJECT = "access to object operation denied: ape denied request: method {operation}: NoRuleFound"
|
|
||||||
NO_RULE_FOUND_OBJECT = "access to object operation denied: ape denied request"
|
|
||||||
|
|
3
src/frostfs_testlib/s3/__init__.py
Normal file
3
src/frostfs_testlib/s3/__init__.py
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
from frostfs_testlib.s3.aws_cli_client import AwsCliClient
|
||||||
|
from frostfs_testlib.s3.boto3_client import Boto3ClientWrapper
|
||||||
|
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus
|
|
@ -6,8 +6,8 @@ from time import sleep
|
||||||
from typing import Literal, Optional, Union
|
from typing import Literal, Optional, Union
|
||||||
|
|
||||||
from frostfs_testlib import reporter
|
from frostfs_testlib import reporter
|
||||||
from frostfs_testlib.clients.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
|
|
||||||
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
|
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
|
||||||
|
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
|
||||||
from frostfs_testlib.shell import CommandOptions
|
from frostfs_testlib.shell import CommandOptions
|
||||||
from frostfs_testlib.shell.local_shell import LocalShell
|
from frostfs_testlib.shell.local_shell import LocalShell
|
||||||
from frostfs_testlib.utils import string_utils
|
from frostfs_testlib.utils import string_utils
|
||||||
|
@ -171,7 +171,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
return response.get("TagSet")
|
return response.get("TagSet")
|
||||||
|
|
||||||
@reporter.step("Get bucket acl")
|
@reporter.step("Get bucket acl")
|
||||||
def get_bucket_acl(self, bucket: str) -> dict:
|
def get_bucket_acl(self, bucket: str) -> list:
|
||||||
if bucket.startswith("-") or " " in bucket:
|
if bucket.startswith("-") or " " in bucket:
|
||||||
bucket = f'"{bucket}"'
|
bucket = f'"{bucket}"'
|
||||||
|
|
||||||
|
@ -179,7 +179,8 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
|
||||||
)
|
)
|
||||||
output = self.local_shell.exec(cmd).stdout
|
output = self.local_shell.exec(cmd).stdout
|
||||||
return self._to_json(output)
|
response = self._to_json(output)
|
||||||
|
return response.get("Grants")
|
||||||
|
|
||||||
@reporter.step("Get bucket location")
|
@reporter.step("Get bucket location")
|
||||||
def get_bucket_location(self, bucket: str) -> dict:
|
def get_bucket_location(self, bucket: str) -> dict:
|
||||||
|
@ -196,7 +197,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
|
|
||||||
@reporter.step("List objects S3")
|
@reporter.step("List objects S3")
|
||||||
def list_objects(
|
def list_objects(
|
||||||
self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None
|
self, bucket: str, page_size: Optional[int] = None, prefix: Optional[str] = None, full_output: bool = False
|
||||||
) -> Union[dict, list[str]]:
|
) -> Union[dict, list[str]]:
|
||||||
if bucket.startswith("-") or " " in bucket:
|
if bucket.startswith("-") or " " in bucket:
|
||||||
bucket = f'"{bucket}"'
|
bucket = f'"{bucket}"'
|
||||||
|
@ -860,7 +861,7 @@ class AwsCliClient(S3ClientWrapper):
|
||||||
return response["Parts"]
|
return response["Parts"]
|
||||||
|
|
||||||
@reporter.step("Complete multipart upload S3")
|
@reporter.step("Complete multipart upload S3")
|
||||||
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict:
|
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
|
||||||
if bucket.startswith("-") or " " in bucket:
|
if bucket.startswith("-") or " " in bucket:
|
||||||
bucket = f'"{bucket}"'
|
bucket = f'"{bucket}"'
|
||||||
|
|
|
@ -13,8 +13,8 @@ from botocore.exceptions import ClientError
|
||||||
from mypy_boto3_s3 import S3Client
|
from mypy_boto3_s3 import S3Client
|
||||||
|
|
||||||
from frostfs_testlib import reporter
|
from frostfs_testlib import reporter
|
||||||
from frostfs_testlib.clients.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
|
|
||||||
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
|
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
|
||||||
|
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
|
||||||
from frostfs_testlib.utils import string_utils
|
from frostfs_testlib.utils import string_utils
|
||||||
|
|
||||||
# TODO: Refactor this code to use shell instead of _cmd_run
|
# TODO: Refactor this code to use shell instead of _cmd_run
|
||||||
|
@ -86,7 +86,6 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
||||||
service_name="iam",
|
service_name="iam",
|
||||||
aws_access_key_id=self.access_key_id,
|
aws_access_key_id=self.access_key_id,
|
||||||
aws_secret_access_key=self.secret_access_key,
|
aws_secret_access_key=self.secret_access_key,
|
||||||
region_name=self.region,
|
|
||||||
endpoint_url=self.iam_endpoint,
|
endpoint_url=self.iam_endpoint,
|
||||||
verify=False,
|
verify=False,
|
||||||
)
|
)
|
||||||
|
@ -230,13 +229,14 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
||||||
return response.get("TagSet")
|
return response.get("TagSet")
|
||||||
|
|
||||||
@reporter.step("Get bucket acl")
|
@reporter.step("Get bucket acl")
|
||||||
def get_bucket_acl(self, bucket: str) -> dict:
|
def get_bucket_acl(self, bucket: str) -> list:
|
||||||
return self._exec_request(
|
response = self._exec_request(
|
||||||
self.boto3_client.get_bucket_acl,
|
self.boto3_client.get_bucket_acl,
|
||||||
params={"Bucket": bucket},
|
params={"Bucket": bucket},
|
||||||
endpoint=self.s3gate_endpoint,
|
endpoint=self.s3gate_endpoint,
|
||||||
profile=self.profile,
|
profile=self.profile,
|
||||||
)
|
)
|
||||||
|
return response.get("Grants")
|
||||||
|
|
||||||
@reporter.step("Delete bucket tagging")
|
@reporter.step("Delete bucket tagging")
|
||||||
def delete_bucket_tagging(self, bucket: str) -> None:
|
def delete_bucket_tagging(self, bucket: str) -> None:
|
||||||
|
@ -399,7 +399,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
||||||
|
|
||||||
@reporter.step("List objects S3")
|
@reporter.step("List objects S3")
|
||||||
def list_objects(
|
def list_objects(
|
||||||
self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None
|
self, bucket: str, page_size: Optional[int] = None, prefix: Optional[str] = None, full_output: bool = False
|
||||||
) -> Union[dict, list[str]]:
|
) -> Union[dict, list[str]]:
|
||||||
params = {"Bucket": bucket}
|
params = {"Bucket": bucket}
|
||||||
if page_size:
|
if page_size:
|
||||||
|
@ -704,7 +704,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
||||||
return response["Parts"]
|
return response["Parts"]
|
||||||
|
|
||||||
@reporter.step("Complete multipart upload S3")
|
@reporter.step("Complete multipart upload S3")
|
||||||
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict:
|
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
|
||||||
parts = [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts]
|
parts = [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts]
|
||||||
params = self._convert_to_s3_params(locals(), exclude=["parts"])
|
params = self._convert_to_s3_params(locals(), exclude=["parts"])
|
||||||
params["MultipartUpload"] = {"Parts": parts}
|
params["MultipartUpload"] = {"Parts": parts}
|
||||||
|
@ -1283,7 +1283,8 @@ class Boto3ClientWrapper(S3ClientWrapper):
|
||||||
profile=self.profile,
|
profile=self.profile,
|
||||||
)
|
)
|
||||||
|
|
||||||
# MFA methods
|
# MFA methods
|
||||||
|
|
||||||
@reporter.step("Creates a new virtual MFA device")
|
@reporter.step("Creates a new virtual MFA device")
|
||||||
def iam_create_virtual_mfa_device(
|
def iam_create_virtual_mfa_device(
|
||||||
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
|
self, virtual_mfa_device_name: str, outfile: Optional[str] = None, bootstrap_method: Optional[str] = None
|
|
@ -1,7 +1,7 @@
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from frostfs_testlib.cli.generic_cli import GenericCli
|
from frostfs_testlib.cli.generic_cli import GenericCli
|
||||||
from frostfs_testlib.clients.s3 import BucketContainerResolver
|
from frostfs_testlib.s3.interfaces import BucketContainerResolver
|
||||||
from frostfs_testlib.storage.cluster import ClusterNode
|
from frostfs_testlib.storage.cluster import ClusterNode
|
||||||
|
|
||||||
|
|
|
@ -128,7 +128,7 @@ class S3ClientWrapper(HumanReadableABC):
|
||||||
"""Deletes the tags from the bucket."""
|
"""Deletes the tags from the bucket."""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def get_bucket_acl(self, bucket: str) -> dict:
|
def get_bucket_acl(self, bucket: str) -> list:
|
||||||
"""This implementation of the GET action uses the acl subresource to return the access control list (ACL) of a bucket."""
|
"""This implementation of the GET action uses the acl subresource to return the access control list (ACL) of a bucket."""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
|
@ -196,7 +196,7 @@ class S3ClientWrapper(HumanReadableABC):
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def list_objects(
|
def list_objects(
|
||||||
self, bucket: str, full_output: bool = False, page_size: Optional[int] = None, prefix: Optional[str] = None
|
self, bucket: str, page_size: Optional[int] = None, prefix: Optional[str] = None, full_output: bool = False
|
||||||
) -> Union[dict, list[str]]:
|
) -> Union[dict, list[str]]:
|
||||||
"""Returns some or all (up to 1,000) of the objects in a bucket with each request.
|
"""Returns some or all (up to 1,000) of the objects in a bucket with each request.
|
||||||
You can use the request parameters as selection criteria to return a subset of the objects in a bucket.
|
You can use the request parameters as selection criteria to return a subset of the objects in a bucket.
|
||||||
|
@ -336,7 +336,7 @@ class S3ClientWrapper(HumanReadableABC):
|
||||||
"""Lists the parts that have been uploaded for a specific multipart upload."""
|
"""Lists the parts that have been uploaded for a specific multipart upload."""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict:
|
def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
|
||||||
"""Completes a multipart upload by assembling previously uploaded parts."""
|
"""Completes a multipart upload by assembling previously uploaded parts."""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
|
@ -7,7 +7,9 @@ from typing import Optional, Union
|
||||||
|
|
||||||
from frostfs_testlib import reporter
|
from frostfs_testlib import reporter
|
||||||
from frostfs_testlib.cli import FrostfsCli
|
from frostfs_testlib.cli import FrostfsCli
|
||||||
|
from frostfs_testlib.plugins import load_plugin
|
||||||
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC
|
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC
|
||||||
|
from frostfs_testlib.s3.interfaces import BucketContainerResolver
|
||||||
from frostfs_testlib.shell import Shell
|
from frostfs_testlib.shell import Shell
|
||||||
from frostfs_testlib.steps.cli.object import put_object, put_object_to_random_node
|
from frostfs_testlib.steps.cli.object import put_object, put_object_to_random_node
|
||||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||||
|
@ -109,8 +111,6 @@ def create_container(
|
||||||
options: Optional[dict] = None,
|
options: Optional[dict] = None,
|
||||||
await_mode: bool = True,
|
await_mode: bool = True,
|
||||||
wait_for_creation: bool = True,
|
wait_for_creation: bool = True,
|
||||||
nns_zone: Optional[str] = None,
|
|
||||||
nns_name: Optional[str] = None,
|
|
||||||
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
|
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""
|
"""
|
||||||
|
@ -143,8 +143,6 @@ def create_container(
|
||||||
result = cli.container.create(
|
result = cli.container.create(
|
||||||
rpc_endpoint=endpoint,
|
rpc_endpoint=endpoint,
|
||||||
policy=rule,
|
policy=rule,
|
||||||
nns_name=nns_name,
|
|
||||||
nns_zone=nns_zone,
|
|
||||||
basic_acl=basic_acl,
|
basic_acl=basic_acl,
|
||||||
attributes=attributes,
|
attributes=attributes,
|
||||||
name=name,
|
name=name,
|
||||||
|
|
|
@ -12,7 +12,6 @@ from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC,
|
||||||
from frostfs_testlib.resources.common import ASSETS_DIR
|
from frostfs_testlib.resources.common import ASSETS_DIR
|
||||||
from frostfs_testlib.shell import Shell
|
from frostfs_testlib.shell import Shell
|
||||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||||
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
|
|
||||||
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||||
from frostfs_testlib.testing import wait_for_success
|
from frostfs_testlib.testing import wait_for_success
|
||||||
from frostfs_testlib.utils import json_utils
|
from frostfs_testlib.utils import json_utils
|
||||||
|
@ -753,10 +752,7 @@ def get_object_nodes(
|
||||||
]
|
]
|
||||||
|
|
||||||
object_nodes = [
|
object_nodes = [
|
||||||
cluster_node
|
cluster_node for netmap_node in netmap_nodes for cluster_node in cluster.cluster_nodes if netmap_node.node == cluster_node.host_ip
|
||||||
for netmap_node in netmap_nodes
|
|
||||||
for cluster_node in cluster.cluster_nodes
|
|
||||||
if netmap_node.node == cluster_node.get_interface(Interfaces.MGMT)
|
|
||||||
]
|
]
|
||||||
|
|
||||||
return object_nodes
|
return object_nodes
|
||||||
|
|
0
src/frostfs_testlib/steps/http/__init__.py
Normal file
0
src/frostfs_testlib/steps/http/__init__.py
Normal file
|
@ -12,8 +12,8 @@ import requests
|
||||||
|
|
||||||
from frostfs_testlib import reporter
|
from frostfs_testlib import reporter
|
||||||
from frostfs_testlib.cli import GenericCli
|
from frostfs_testlib.cli import GenericCli
|
||||||
from frostfs_testlib.clients.s3.aws_cli_client import command_options
|
|
||||||
from frostfs_testlib.resources.common import ASSETS_DIR, SIMPLE_OBJECT_SIZE
|
from frostfs_testlib.resources.common import ASSETS_DIR, SIMPLE_OBJECT_SIZE
|
||||||
|
from frostfs_testlib.s3.aws_cli_client import command_options
|
||||||
from frostfs_testlib.shell import Shell
|
from frostfs_testlib.shell import Shell
|
||||||
from frostfs_testlib.shell.local_shell import LocalShell
|
from frostfs_testlib.shell.local_shell import LocalShell
|
||||||
from frostfs_testlib.steps.cli.object import get_object
|
from frostfs_testlib.steps.cli.object import get_object
|
||||||
|
@ -38,34 +38,34 @@ def get_via_http_gate(
|
||||||
"""
|
"""
|
||||||
This function gets given object from HTTP gate
|
This function gets given object from HTTP gate
|
||||||
cid: container id to get object from
|
cid: container id to get object from
|
||||||
oid: object id / object key
|
oid: object ID
|
||||||
node: node to make request
|
node: node to make request
|
||||||
request_path: (optional) http request, if ommited - use default [{endpoint}/get/{cid}/{oid}]
|
request_path: (optional) http request, if ommited - use default [{endpoint}/get/{cid}/{oid}]
|
||||||
"""
|
"""
|
||||||
|
|
||||||
request = f"{node.http_gate.get_endpoint()}/get/{cid}/{oid}"
|
# if `request_path` parameter omitted, use default
|
||||||
if request_path:
|
if request_path is None:
|
||||||
|
request = f"{node.http_gate.get_endpoint()}/get/{cid}/{oid}"
|
||||||
|
else:
|
||||||
request = f"{node.http_gate.get_endpoint()}{request_path}"
|
request = f"{node.http_gate.get_endpoint()}{request_path}"
|
||||||
|
|
||||||
response = requests.get(request, stream=True, timeout=timeout, verify=False)
|
resp = requests.get(request, stream=True, timeout=timeout, verify=False)
|
||||||
|
|
||||||
if not response.ok:
|
if not resp.ok:
|
||||||
raise Exception(
|
raise Exception(
|
||||||
f"""Failed to get object via HTTP gate:
|
f"""Failed to get object via HTTP gate:
|
||||||
request: {response.request.path_url},
|
request: {resp.request.path_url},
|
||||||
response: {response.text},
|
response: {resp.text},
|
||||||
headers: {response.headers},
|
headers: {resp.headers},
|
||||||
status code: {response.status_code} {response.reason}"""
|
status code: {resp.status_code} {resp.reason}"""
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(f"Request: {request}")
|
logger.info(f"Request: {request}")
|
||||||
_attach_allure_step(request, response.status_code)
|
_attach_allure_step(request, resp.status_code)
|
||||||
|
|
||||||
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, f"{cid}_{oid}"))
|
test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, f"{cid}_{oid}"))
|
||||||
with open(test_file, "wb") as file:
|
with open(test_file, "wb") as file:
|
||||||
for chunk in response.iter_content(chunk_size=8192):
|
shutil.copyfileobj(resp.raw, file)
|
||||||
file.write(chunk)
|
|
||||||
|
|
||||||
return test_file
|
return test_file
|
||||||
|
|
||||||
|
|
||||||
|
@ -117,12 +117,12 @@ def get_via_http_gate_by_attribute(
|
||||||
endpoint: http gate endpoint
|
endpoint: http gate endpoint
|
||||||
request_path: (optional) http request path, if ommited - use default [{endpoint}/get_by_attribute/{Key}/{Value}]
|
request_path: (optional) http request path, if ommited - use default [{endpoint}/get_by_attribute/{Key}/{Value}]
|
||||||
"""
|
"""
|
||||||
|
|
||||||
attr_name = list(attribute.keys())[0]
|
attr_name = list(attribute.keys())[0]
|
||||||
attr_value = quote_plus(str(attribute.get(attr_name)))
|
attr_value = quote_plus(str(attribute.get(attr_name)))
|
||||||
|
# if `request_path` parameter ommited, use default
|
||||||
request = f"{node.http_gate.get_endpoint()}/get_by_attribute/{cid}/{quote_plus(str(attr_name))}/{attr_value}"
|
if request_path is None:
|
||||||
if request_path:
|
request = f"{node.http_gate.get_endpoint()}/get_by_attribute/{cid}/{quote_plus(str(attr_name))}/{attr_value}"
|
||||||
|
else:
|
||||||
request = f"{node.http_gate.get_endpoint()}{request_path}"
|
request = f"{node.http_gate.get_endpoint()}{request_path}"
|
||||||
|
|
||||||
resp = requests.get(request, stream=True, timeout=timeout, verify=False)
|
resp = requests.get(request, stream=True, timeout=timeout, verify=False)
|
||||||
|
@ -357,9 +357,19 @@ def try_to_get_object_via_passed_request_and_expect_error(
|
||||||
) -> None:
|
) -> None:
|
||||||
try:
|
try:
|
||||||
if attrs is None:
|
if attrs is None:
|
||||||
get_via_http_gate(cid, oid, node, http_request_path)
|
get_via_http_gate(
|
||||||
|
cid=cid,
|
||||||
|
oid=oid,
|
||||||
|
node=node,
|
||||||
|
request_path=http_request_path,
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
get_via_http_gate_by_attribute(cid, attrs, node, http_request_path)
|
get_via_http_gate_by_attribute(
|
||||||
|
cid=cid,
|
||||||
|
attribute=attrs,
|
||||||
|
node=node,
|
||||||
|
request_path=http_request_path,
|
||||||
|
)
|
||||||
raise AssertionError(f"Expected error on getting object with cid: {cid}")
|
raise AssertionError(f"Expected error on getting object with cid: {cid}")
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
match = error_pattern.casefold() in str(err).casefold()
|
match = error_pattern.casefold() in str(err).casefold()
|
|
@ -4,18 +4,16 @@ from frostfs_testlib.storage.cluster import ClusterNode
|
||||||
|
|
||||||
class IpHelper:
|
class IpHelper:
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[tuple]) -> None:
|
def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[str]) -> None:
|
||||||
shell = node.host.get_shell()
|
shell = node.host.get_shell()
|
||||||
for ip, table in block_ip:
|
for ip in block_ip:
|
||||||
if not table:
|
shell.exec(f"ip route add blackhole {ip}")
|
||||||
shell.exec(f"ip r a blackhole {ip}")
|
|
||||||
continue
|
|
||||||
shell.exec(f"ip r a blackhole {ip} table {table}")
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def restore_input_traffic_to_node(node: ClusterNode) -> None:
|
def restore_input_traffic_to_node(node: ClusterNode) -> None:
|
||||||
shell = node.host.get_shell()
|
shell = node.host.get_shell()
|
||||||
unlock_ip = shell.exec("ip r l table all | grep blackhole", CommandOptions(check=False)).stdout
|
unlock_ip = shell.exec("ip route list | grep blackhole", CommandOptions(check=False))
|
||||||
|
if unlock_ip.return_code != 0:
|
||||||
for active_blackhole in unlock_ip.strip().split("\n"):
|
return
|
||||||
shell.exec(f"ip r d {active_blackhole}")
|
for ip in unlock_ip.stdout.strip().split("\n"):
|
||||||
|
shell.exec(f"ip route del blackhole {ip.split(' ')[1]}")
|
||||||
|
|
|
@ -6,7 +6,8 @@ from typing import Optional
|
||||||
from dateutil.parser import parse
|
from dateutil.parser import parse
|
||||||
|
|
||||||
from frostfs_testlib import reporter
|
from frostfs_testlib import reporter
|
||||||
from frostfs_testlib.clients.s3 import BucketContainerResolver, S3ClientWrapper, VersioningStatus
|
from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus
|
||||||
|
from frostfs_testlib.s3.interfaces import BucketContainerResolver
|
||||||
from frostfs_testlib.shell import Shell
|
from frostfs_testlib.shell import Shell
|
||||||
from frostfs_testlib.steps.cli.container import search_nodes_with_container
|
from frostfs_testlib.steps.cli.container import search_nodes_with_container
|
||||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||||
|
@ -184,26 +185,3 @@ def search_nodes_with_bucket(
|
||||||
break
|
break
|
||||||
nodes_list = search_nodes_with_container(wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster)
|
nodes_list = search_nodes_with_container(wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster)
|
||||||
return nodes_list
|
return nodes_list
|
||||||
|
|
||||||
|
|
||||||
def get_bytes_relative_to_object(value: int | str, object_size: int = None, part_size: int = None) -> int:
|
|
||||||
if isinstance(value, int):
|
|
||||||
return value
|
|
||||||
|
|
||||||
if "part" not in value and "object" not in value:
|
|
||||||
return int(value)
|
|
||||||
|
|
||||||
if object_size is not None:
|
|
||||||
value = value.replace("object", str(object_size))
|
|
||||||
|
|
||||||
if part_size is not None:
|
|
||||||
value = value.replace("part", str(part_size))
|
|
||||||
|
|
||||||
return int(eval(value))
|
|
||||||
|
|
||||||
|
|
||||||
def get_range_relative_to_object(rng: str, object_size: int = None, part_size: int = None, int_values: bool = False) -> str | int:
|
|
||||||
start, end = rng.split(":")
|
|
||||||
start = get_bytes_relative_to_object(start, object_size, part_size)
|
|
||||||
end = get_bytes_relative_to_object(end, object_size, part_size)
|
|
||||||
return (start, end) if int_values else f"bytes {start}-{end}/*"
|
|
|
@ -91,10 +91,10 @@ class ClusterNode:
|
||||||
config_str = yaml.dump(new_config)
|
config_str = yaml.dump(new_config)
|
||||||
shell.exec(f"echo '{config_str}' | sudo tee {config_file_path}")
|
shell.exec(f"echo '{config_str}' | sudo tee {config_file_path}")
|
||||||
|
|
||||||
def config(self, service_type: type[ServiceClass]) -> ServiceConfigurationYml:
|
def config(self, service_type: ServiceClass) -> ServiceConfigurationYml:
|
||||||
return self.service(service_type).config
|
return self.service(service_type).config
|
||||||
|
|
||||||
def service(self, service_type: type[ServiceClass]) -> ServiceClass:
|
def service(self, service_type: ServiceClass) -> ServiceClass:
|
||||||
"""
|
"""
|
||||||
Get a service cluster node of specified type.
|
Get a service cluster node of specified type.
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,4 @@ class PlacementRule:
|
||||||
DEFAULT_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
|
DEFAULT_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
|
||||||
SINGLE_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 4 FROM * AS X"
|
SINGLE_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 4 FROM * AS X"
|
||||||
REP_2_FOR_3_NODES_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 3 FROM * AS X"
|
REP_2_FOR_3_NODES_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 3 FROM * AS X"
|
||||||
REP_1_FOR_2_NODES_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 2 FROM * AS X"
|
|
||||||
DEFAULT_EC_PLACEMENT_RULE = "EC 3.1"
|
DEFAULT_EC_PLACEMENT_RULE = "EC 3.1"
|
||||||
EC_1_1_FOR_2_NODES_PLACEMENT_RULE = "EC 1.1 IN X CBF 1 SELECT 2 FROM * AS X"
|
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
import datetime
|
import datetime
|
||||||
import itertools
|
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
from typing import TypeVar
|
from typing import TypeVar
|
||||||
|
@ -19,7 +18,7 @@ from frostfs_testlib.steps.node_management import include_node_to_network_map, r
|
||||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
|
||||||
from frostfs_testlib.storage.controllers.disk_controller import DiskController
|
from frostfs_testlib.storage.controllers.disk_controller import DiskController
|
||||||
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
|
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
|
||||||
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces, NodeStatus
|
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeStatus
|
||||||
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||||
from frostfs_testlib.testing import parallel
|
from frostfs_testlib.testing import parallel
|
||||||
from frostfs_testlib.testing.test_control import retry, run_optionally, wait_for_success
|
from frostfs_testlib.testing.test_control import retry, run_optionally, wait_for_success
|
||||||
|
@ -40,7 +39,7 @@ class ClusterStateController:
|
||||||
def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None:
|
def __init__(self, shell: Shell, cluster: Cluster, healthcheck: Healthcheck) -> None:
|
||||||
self.stopped_nodes: list[ClusterNode] = []
|
self.stopped_nodes: list[ClusterNode] = []
|
||||||
self.detached_disks: dict[str, DiskController] = {}
|
self.detached_disks: dict[str, DiskController] = {}
|
||||||
self.dropped_traffic: set[ClusterNode] = set()
|
self.dropped_traffic: list[ClusterNode] = []
|
||||||
self.excluded_from_netmap: list[StorageNode] = []
|
self.excluded_from_netmap: list[StorageNode] = []
|
||||||
self.stopped_services: set[NodeBase] = set()
|
self.stopped_services: set[NodeBase] = set()
|
||||||
self.cluster = cluster
|
self.cluster = cluster
|
||||||
|
@ -326,22 +325,22 @@ class ClusterStateController:
|
||||||
|
|
||||||
@reporter.step("Drop traffic to {node}, nodes - {block_nodes}")
|
@reporter.step("Drop traffic to {node}, nodes - {block_nodes}")
|
||||||
def drop_traffic(self, node: ClusterNode, wakeup_timeout: int, name_interface: str, block_nodes: list[ClusterNode] = None) -> None:
|
def drop_traffic(self, node: ClusterNode, wakeup_timeout: int, name_interface: str, block_nodes: list[ClusterNode] = None) -> None:
|
||||||
interfaces_tables = self._parse_interfaces(block_nodes, name_interface)
|
list_ip = self._parse_interfaces(block_nodes, name_interface)
|
||||||
IpHelper.drop_input_traffic_to_node(node, interfaces_tables)
|
IpHelper.drop_input_traffic_to_node(node, list_ip)
|
||||||
time.sleep(wakeup_timeout)
|
time.sleep(wakeup_timeout)
|
||||||
self.dropped_traffic.add(node)
|
self.dropped_traffic.append(node)
|
||||||
|
|
||||||
@reporter.step("Start traffic to {node}")
|
@reporter.step("Start traffic to {node}")
|
||||||
def restore_traffic(self, node: ClusterNode) -> None:
|
def restore_traffic(self, node: ClusterNode) -> None:
|
||||||
IpHelper.restore_input_traffic_to_node(node=node)
|
IpHelper.restore_input_traffic_to_node(node=node)
|
||||||
self.dropped_traffic.discard(node)
|
index = self.dropped_traffic.index(node)
|
||||||
|
self.dropped_traffic.pop(index)
|
||||||
|
|
||||||
@reporter.step("Restore blocked nodes")
|
@reporter.step("Restore blocked nodes")
|
||||||
def restore_all_traffic(self):
|
def restore_all_traffic(self):
|
||||||
if not self.dropped_traffic:
|
if not self.dropped_traffic:
|
||||||
return
|
return
|
||||||
parallel(self._restore_traffic_to_node, self.dropped_traffic)
|
parallel(self._restore_traffic_to_node, self.dropped_traffic)
|
||||||
self.dropped_traffic.clear()
|
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step("Hard reboot host {node} via magic SysRq option")
|
@reporter.step("Hard reboot host {node} via magic SysRq option")
|
||||||
|
@ -454,11 +453,9 @@ class ClusterStateController:
|
||||||
if not checker_node:
|
if not checker_node:
|
||||||
checker_node = cluster_node
|
checker_node = cluster_node
|
||||||
netmap = NetmapParser.snapshot_all_nodes(frostfs_cli.netmap.snapshot(checker_node.storage_node.get_rpc_endpoint()).stdout)
|
netmap = NetmapParser.snapshot_all_nodes(frostfs_cli.netmap.snapshot(checker_node.storage_node.get_rpc_endpoint()).stdout)
|
||||||
netmap = [node for node in netmap if cluster_node.get_interface(Interfaces.MGMT) == node.node]
|
netmap = [node for node in netmap if cluster_node.host_ip == node.node]
|
||||||
if status == NodeStatus.OFFLINE:
|
if status == NodeStatus.OFFLINE:
|
||||||
assert (
|
assert cluster_node.host_ip not in netmap, f"{cluster_node.host_ip} not in Offline"
|
||||||
cluster_node.get_interface(Interfaces.MGMT) not in netmap
|
|
||||||
), f"{cluster_node.get_interface(Interfaces.MGMT)} not in Offline"
|
|
||||||
else:
|
else:
|
||||||
assert netmap[0].node_status == status, f"Node status should be '{status}', but was '{netmap[0].node_status}'"
|
assert netmap[0].node_status == status, f"Node status should be '{status}', but was '{netmap[0].node_status}'"
|
||||||
|
|
||||||
|
@ -519,31 +516,17 @@ class ClusterStateController:
|
||||||
|
|
||||||
return disk_controller
|
return disk_controller
|
||||||
|
|
||||||
@reporter.step("Restore traffic {node}")
|
|
||||||
def _restore_traffic_to_node(self, node):
|
def _restore_traffic_to_node(self, node):
|
||||||
IpHelper.restore_input_traffic_to_node(node)
|
IpHelper.restore_input_traffic_to_node(node)
|
||||||
|
|
||||||
def _parse_interfaces(self, nodes: list[ClusterNode], name_interface: str) -> list[tuple]:
|
def _parse_interfaces(self, nodes: list[ClusterNode], name_interface: str):
|
||||||
interfaces_and_tables = set()
|
interfaces = []
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
shell = node.host.get_shell()
|
dict_interfaces = node.host.config.interfaces
|
||||||
lines = shell.exec(f"ip r l table all | grep '{name_interface}'").stdout.splitlines()
|
for type, ip in dict_interfaces.items():
|
||||||
|
if name_interface in type:
|
||||||
ips = []
|
interfaces.append(ip)
|
||||||
tables = []
|
return interfaces
|
||||||
|
|
||||||
for line in lines:
|
|
||||||
if "src" not in line or "table local" in line:
|
|
||||||
continue
|
|
||||||
parts = line.split()
|
|
||||||
ips.append(parts[-1])
|
|
||||||
if "table" in line:
|
|
||||||
tables.append(parts[parts.index("table") + 1])
|
|
||||||
tables.append(None)
|
|
||||||
|
|
||||||
[interfaces_and_tables.add((ip, table)) for ip, table in itertools.product(ips, tables)]
|
|
||||||
|
|
||||||
return interfaces_and_tables
|
|
||||||
|
|
||||||
@reporter.step("Ping node")
|
@reporter.step("Ping node")
|
||||||
def _ping_host(self, node: ClusterNode):
|
def _ping_host(self, node: ClusterNode):
|
||||||
|
|
|
@ -13,7 +13,6 @@ FROSTFS_CONTRACT_CACHE_TIMEOUT = 30
|
||||||
|
|
||||||
class ObjectOperations(HumanReadableEnum):
|
class ObjectOperations(HumanReadableEnum):
|
||||||
PUT = "object.put"
|
PUT = "object.put"
|
||||||
PATCH = "object.patch"
|
|
||||||
GET = "object.get"
|
GET = "object.get"
|
||||||
HEAD = "object.head"
|
HEAD = "object.head"
|
||||||
GET_RANGE = "object.range"
|
GET_RANGE = "object.range"
|
||||||
|
@ -27,18 +26,6 @@ class ObjectOperations(HumanReadableEnum):
|
||||||
return [op for op in ObjectOperations if op != ObjectOperations.WILDCARD_ALL]
|
return [op for op in ObjectOperations if op != ObjectOperations.WILDCARD_ALL]
|
||||||
|
|
||||||
|
|
||||||
class ContainerOperations(HumanReadableEnum):
|
|
||||||
PUT = "container.put"
|
|
||||||
GET = "container.get"
|
|
||||||
LIST = "container.list"
|
|
||||||
DELETE = "container.delete"
|
|
||||||
WILDCARD_ALL = "container.*"
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def get_all():
|
|
||||||
return [op for op in ObjectOperations if op != ObjectOperations.WILDCARD_ALL]
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Operations:
|
class Operations:
|
||||||
GET_CONTAINER = "GetContainer"
|
GET_CONTAINER = "GetContainer"
|
||||||
|
@ -52,7 +39,6 @@ class Operations:
|
||||||
SEARCH_OBJECT = "SearchObject"
|
SEARCH_OBJECT = "SearchObject"
|
||||||
HEAD_OBJECT = "HeadObject"
|
HEAD_OBJECT = "HeadObject"
|
||||||
PUT_OBJECT = "PutObject"
|
PUT_OBJECT = "PutObject"
|
||||||
PATCH_OBJECT = "PatchObject"
|
|
||||||
|
|
||||||
|
|
||||||
class Verb(HumanReadableEnum):
|
class Verb(HumanReadableEnum):
|
||||||
|
@ -138,7 +124,7 @@ class Rule:
|
||||||
|
|
||||||
if not operations:
|
if not operations:
|
||||||
self.operations = []
|
self.operations = []
|
||||||
elif isinstance(operations, (ObjectOperations, ContainerOperations)):
|
elif isinstance(operations, ObjectOperations):
|
||||||
self.operations = [operations]
|
self.operations = [operations]
|
||||||
else:
|
else:
|
||||||
self.operations = operations
|
self.operations = operations
|
||||||
|
|
|
@ -6,7 +6,7 @@ from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
|
||||||
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
|
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
|
||||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||||
from frostfs_testlib.storage.controllers.shards_watcher import ShardsWatcher
|
from frostfs_testlib.storage.controllers.shards_watcher import ShardsWatcher
|
||||||
from frostfs_testlib.storage.dataclasses.storage_object_info import Chunk, Interfaces, NodeNetmapInfo
|
from frostfs_testlib.storage.dataclasses.storage_object_info import Chunk, NodeNetmapInfo
|
||||||
from frostfs_testlib.storage.grpc_operations import interfaces
|
from frostfs_testlib.storage.grpc_operations import interfaces
|
||||||
from frostfs_testlib.testing.test_control import wait_for_success
|
from frostfs_testlib.testing.test_control import wait_for_success
|
||||||
from frostfs_testlib.utils.cli_utils import parse_netmap_output
|
from frostfs_testlib.utils.cli_utils import parse_netmap_output
|
||||||
|
@ -30,7 +30,7 @@ class ChunksOperations(interfaces.ChunksInterface):
|
||||||
result = []
|
result = []
|
||||||
for node_info in netmap:
|
for node_info in netmap:
|
||||||
for cluster_node in cluster.cluster_nodes:
|
for cluster_node in cluster.cluster_nodes:
|
||||||
if node_info.node == cluster_node.get_interface(Interfaces.MGMT):
|
if node_info.node == cluster_node.host_ip:
|
||||||
result.append(cluster_node)
|
result.append(cluster_node)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ class ChunksOperations(interfaces.ChunksInterface):
|
||||||
for node_info in netmap:
|
for node_info in netmap:
|
||||||
if node_info.node_id in chunk.confirmed_nodes:
|
if node_info.node_id in chunk.confirmed_nodes:
|
||||||
for cluster_node in cluster.cluster_nodes:
|
for cluster_node in cluster.cluster_nodes:
|
||||||
if cluster_node.get_interface(Interfaces.MGMT) == node_info.node:
|
if cluster_node.host_ip == node_info.node:
|
||||||
return (cluster_node, node_info)
|
return (cluster_node, node_info)
|
||||||
|
|
||||||
@wait_for_success(300, 5, fail_testcase=None)
|
@wait_for_success(300, 5, fail_testcase=None)
|
||||||
|
|
|
@ -5,9 +5,9 @@ from typing import List, Optional, Union
|
||||||
|
|
||||||
from frostfs_testlib import reporter
|
from frostfs_testlib import reporter
|
||||||
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
|
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
|
||||||
from frostfs_testlib.clients.s3 import BucketContainerResolver
|
|
||||||
from frostfs_testlib.plugins import load_plugin
|
from frostfs_testlib.plugins import load_plugin
|
||||||
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
|
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
|
||||||
|
from frostfs_testlib.s3.interfaces import BucketContainerResolver
|
||||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||||
from frostfs_testlib.storage.grpc_operations import interfaces
|
from frostfs_testlib.storage.grpc_operations import interfaces
|
||||||
from frostfs_testlib.utils import json_utils
|
from frostfs_testlib.utils import json_utils
|
||||||
|
@ -181,17 +181,20 @@ class ContainerOperations(interfaces.ContainerInterface):
|
||||||
force: bool = False,
|
force: bool = False,
|
||||||
trace: bool = False,
|
trace: bool = False,
|
||||||
):
|
):
|
||||||
return self.cli.container.delete(
|
try:
|
||||||
rpc_endpoint=endpoint,
|
return self.cli.container.delete(
|
||||||
cid=cid,
|
rpc_endpoint=endpoint,
|
||||||
address=address,
|
cid=cid,
|
||||||
await_mode=await_mode,
|
address=address,
|
||||||
session=session,
|
await_mode=await_mode,
|
||||||
ttl=ttl,
|
session=session,
|
||||||
xhdr=xhdr,
|
ttl=ttl,
|
||||||
force=force,
|
xhdr=xhdr,
|
||||||
trace=trace,
|
force=force,
|
||||||
).stdout
|
trace=trace,
|
||||||
|
).stdout
|
||||||
|
except RuntimeError as e:
|
||||||
|
print(f"Error request:\n{e}")
|
||||||
|
|
||||||
@reporter.step("Get container")
|
@reporter.step("Get container")
|
||||||
def get(
|
def get(
|
||||||
|
|
|
@ -11,7 +11,6 @@ from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
|
||||||
from frostfs_testlib.resources.common import ASSETS_DIR
|
from frostfs_testlib.resources.common import ASSETS_DIR
|
||||||
from frostfs_testlib.shell.interfaces import CommandResult
|
from frostfs_testlib.shell.interfaces import CommandResult
|
||||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||||
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
|
|
||||||
from frostfs_testlib.storage.grpc_operations import interfaces
|
from frostfs_testlib.storage.grpc_operations import interfaces
|
||||||
from frostfs_testlib.storage.grpc_operations.implementations.chunks import ChunksOperations
|
from frostfs_testlib.storage.grpc_operations.implementations.chunks import ChunksOperations
|
||||||
from frostfs_testlib.testing.test_control import wait_for_success
|
from frostfs_testlib.testing.test_control import wait_for_success
|
||||||
|
@ -207,11 +206,6 @@ class ObjectOperations(interfaces.ObjectInterface):
|
||||||
hash_type=hash_type,
|
hash_type=hash_type,
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
)
|
)
|
||||||
|
|
||||||
if range:
|
|
||||||
# Cut off the range and return only hash
|
|
||||||
return result.stdout.split(":")[1].strip()
|
|
||||||
|
|
||||||
return result.stdout
|
return result.stdout
|
||||||
|
|
||||||
@reporter.step("Head object")
|
@reporter.step("Head object")
|
||||||
|
@ -413,57 +407,6 @@ class ObjectOperations(interfaces.ObjectInterface):
|
||||||
oid = id_str.split(":")[1]
|
oid = id_str.split(":")[1]
|
||||||
return oid.strip()
|
return oid.strip()
|
||||||
|
|
||||||
@reporter.step("Patch object")
|
|
||||||
def patch(
|
|
||||||
self,
|
|
||||||
cid: str,
|
|
||||||
oid: str,
|
|
||||||
endpoint: str,
|
|
||||||
ranges: list[str] = None,
|
|
||||||
payloads: list[str] = None,
|
|
||||||
new_attrs: Optional[str] = None,
|
|
||||||
replace_attrs: bool = False,
|
|
||||||
bearer: str = "",
|
|
||||||
xhdr: Optional[dict] = None,
|
|
||||||
session: Optional[str] = None,
|
|
||||||
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
|
|
||||||
trace: bool = False,
|
|
||||||
) -> str:
|
|
||||||
"""
|
|
||||||
PATCH an object.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
cid: ID of Container where we get the Object from
|
|
||||||
oid: Object ID
|
|
||||||
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
|
|
||||||
ranges: An array of ranges in which to replace data in the format [offset1:length1, offset2:length2]
|
|
||||||
payloads: An array of file paths to be applied in each range
|
|
||||||
new_attrs: Attributes to be changed in the format "key1=value1,key2=value2"
|
|
||||||
replace_attrs: Replace all attributes completely with new ones specified in new_attrs
|
|
||||||
bearer: Path to Bearer Token file, appends to `--bearer` key
|
|
||||||
xhdr: Request X-Headers in form of Key=Value
|
|
||||||
session: Path to a JSON-encoded container session token
|
|
||||||
timeout: Timeout for the operation
|
|
||||||
trace: Generate trace ID and print it
|
|
||||||
Returns:
|
|
||||||
(str): ID of patched Object
|
|
||||||
"""
|
|
||||||
result = self.cli.object.patch(
|
|
||||||
rpc_endpoint=endpoint,
|
|
||||||
cid=cid,
|
|
||||||
oid=oid,
|
|
||||||
range=ranges,
|
|
||||||
payload=payloads,
|
|
||||||
new_attrs=new_attrs,
|
|
||||||
replace_attrs=replace_attrs,
|
|
||||||
bearer=bearer,
|
|
||||||
xhdr=xhdr,
|
|
||||||
session=session,
|
|
||||||
timeout=timeout,
|
|
||||||
trace=trace,
|
|
||||||
)
|
|
||||||
return result.stdout.split(":")[1].strip()
|
|
||||||
|
|
||||||
@reporter.step("Put object to random node")
|
@reporter.step("Put object to random node")
|
||||||
def put_to_random_node(
|
def put_to_random_node(
|
||||||
self,
|
self,
|
||||||
|
@ -675,34 +618,7 @@ class ObjectOperations(interfaces.ObjectInterface):
|
||||||
cluster_node
|
cluster_node
|
||||||
for netmap_node in netmap_nodes
|
for netmap_node in netmap_nodes
|
||||||
for cluster_node in cluster.cluster_nodes
|
for cluster_node in cluster.cluster_nodes
|
||||||
if netmap_node.node == cluster_node.get_interface(Interfaces.MGMT)
|
if netmap_node.node == cluster_node.host_ip
|
||||||
]
|
]
|
||||||
|
|
||||||
return object_nodes
|
return object_nodes
|
||||||
|
|
||||||
@reporter.step("Search parts of object")
|
|
||||||
def parts(
|
|
||||||
self,
|
|
||||||
cid: str,
|
|
||||||
oid: str,
|
|
||||||
alive_node: ClusterNode,
|
|
||||||
bearer: str = "",
|
|
||||||
xhdr: Optional[dict] = None,
|
|
||||||
is_direct: bool = False,
|
|
||||||
verify_presence_all: bool = False,
|
|
||||||
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
|
|
||||||
) -> list[str]:
|
|
||||||
endpoint = alive_node.storage_node.get_rpc_endpoint()
|
|
||||||
response = self.cli.object.nodes(
|
|
||||||
rpc_endpoint=endpoint,
|
|
||||||
cid=cid,
|
|
||||||
oid=oid,
|
|
||||||
bearer=bearer,
|
|
||||||
ttl=1 if is_direct else None,
|
|
||||||
json=True,
|
|
||||||
xhdr=xhdr,
|
|
||||||
timeout=timeout,
|
|
||||||
verify_presence_all=verify_presence_all,
|
|
||||||
)
|
|
||||||
response_json = json.loads(response.stdout)
|
|
||||||
return [data_object["object_id"] for data_object in response_json["data_objects"]]
|
|
||||||
|
|
|
@ -198,24 +198,6 @@ class ObjectInterface(ABC):
|
||||||
) -> str:
|
) -> str:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def patch(
|
|
||||||
self,
|
|
||||||
cid: str,
|
|
||||||
oid: str,
|
|
||||||
endpoint: str,
|
|
||||||
ranges: Optional[list[str]] = None,
|
|
||||||
payloads: Optional[list[str]] = None,
|
|
||||||
new_attrs: Optional[str] = None,
|
|
||||||
replace_attrs: bool = False,
|
|
||||||
bearer: Optional[str] = None,
|
|
||||||
xhdr: Optional[dict] = None,
|
|
||||||
session: Optional[str] = None,
|
|
||||||
timeout: Optional[str] = None,
|
|
||||||
trace: bool = False,
|
|
||||||
) -> str:
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def put_to_random_node(
|
def put_to_random_node(
|
||||||
self,
|
self,
|
||||||
|
@ -282,20 +264,6 @@ class ObjectInterface(ABC):
|
||||||
) -> List[ClusterNode]:
|
) -> List[ClusterNode]:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def parts(
|
|
||||||
self,
|
|
||||||
cid: str,
|
|
||||||
oid: str,
|
|
||||||
alive_node: ClusterNode,
|
|
||||||
bearer: str = "",
|
|
||||||
xhdr: Optional[dict] = None,
|
|
||||||
is_direct: bool = False,
|
|
||||||
verify_presence_all: bool = False,
|
|
||||||
timeout: Optional[str] = None,
|
|
||||||
) -> List[str]:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class ContainerInterface(ABC):
|
class ContainerInterface(ABC):
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
|
|
|
@ -80,9 +80,6 @@ def log_command_execution(cmd: str, output: Union[str, dict], params: Optional[d
|
||||||
if not params:
|
if not params:
|
||||||
params = {}
|
params = {}
|
||||||
|
|
||||||
if params.get("Body") and len(params.get("Body")) > 1000:
|
|
||||||
params["Body"] = "<large text data>"
|
|
||||||
|
|
||||||
output_params = params
|
output_params = params
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -2,7 +2,7 @@ from typing import Any
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from frostfs_testlib.clients import AwsCliClient, Boto3ClientWrapper
|
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper
|
||||||
from frostfs_testlib.storage.dataclasses.acl import EACLRole
|
from frostfs_testlib.storage.dataclasses.acl import EACLRole
|
||||||
from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode
|
from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode
|
||||||
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
||||||
|
|
Loading…
Reference in a new issue