forked from TrueCloudLab/frostfs-testlib
Compare commits: 3dc7a5bdb0 ... 890e1288a9 (10 commits)
Commits:
- 890e1288a9
- cd15be3b7c
- 8ff1e72499
- 0ebb845329
- ee7d9df4a9
- 61353cb38c
- b3d05c5c28
- 8ec7e21e84
- 0e040d2722
- 7d6768c83f
16 changed files with 435 additions and 42 deletions
@@ -463,3 +463,26 @@ class FrostfsAdmMorph(CliCommand):
             "morph ape rm-rule-chain",
             **{param: value for param, value in locals().items() if param not in ["self"]},
         )
+
+    def get_nns_records(
+        self,
+        name: str,
+        type: Optional[str] = None,
+        rpc_endpoint: Optional[str] = None,
+        alphabet_wallets: Optional[str] = None,
+    ) -> CommandResult:
+        """Returns domain record of the specified type
+
+        Args:
+            name: Domain name
+            type: Domain name service record type(A|CNAME|SOA|TXT)
+            rpc_endpoint: N3 RPC node endpoint
+            alphabet_wallets: path to alphabet wallets dir
+
+        Returns:
+            Command's result
+        """
+        return self._execute(
+            "morph nns get-records",
+            **{param: value for param, value in locals().items() if param not in ["self"]},
+        )
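For orientation, a minimal usage sketch of the new wrapper; the `frostfs_adm` fixture, domain, endpoint, and paths below are illustrative assumptions, not part of the diff:

    # Assumed fixture wiring; only morph.get_nns_records comes from this change.
    result = frostfs_adm.morph.get_nns_records(
        name="domain.container",  # placeholder domain name
        type="TXT",               # one of A|CNAME|SOA|TXT
        rpc_endpoint="http://morph-chain.frostfs.devenv:30333",
        alphabet_wallets="/path/to/alphabet-wallets",
    )
    print(result.stdout)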
@@ -276,6 +276,53 @@ class FrostfsCliObject(CliCommand):
             **{param: value for param, value in locals().items() if param not in ["self"]},
         )
+
+    def patch(
+        self,
+        rpc_endpoint: str,
+        cid: str,
+        oid: str,
+        range: list[str] = None,
+        payload: list[str] = None,
+        new_attrs: Optional[str] = None,
+        replace_attrs: bool = False,
+        address: Optional[str] = None,
+        bearer: Optional[str] = None,
+        generate_key: Optional[bool] = None,
+        session: Optional[str] = None,
+        timeout: Optional[str] = None,
+        trace: bool = False,
+        ttl: Optional[int] = None,
+        wallet: Optional[str] = None,
+        xhdr: Optional[dict] = None,
+    ) -> CommandResult:
+        """
+        PATCH an object.
+
+        Args:
+            rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>')
+            cid: Container ID
+            oid: Object ID
+            range: An array of ranges in which to replace data in the format [offset1:length1, offset2:length2]
+            payload: An array of file paths to be applied in each range
+            new_attrs: Attributes to be changed in the format Key1=Value1,Key2=Value2
+            replace_attrs: Replace all attributes completely with new ones specified in new_attrs
+            address: Address of wallet account
+            bearer: File with signed JSON or binary encoded bearer token
+            generate_key: Generate new private key
+            session: Filepath to a JSON- or binary-encoded token of the object RANGE session
+            timeout: Timeout for the operation
+            trace: Generate trace ID and print it
+            ttl: TTL value in request meta header (default 2)
+            wallet: WIF (NEP-2) string or path to the wallet or binary key
+            xhdr: Dict with request X-Headers
+        Returns:
+            (str): ID of patched Object
+        """
+        return self._execute(
+            "object patch",
+            **{param: value for param, value in locals().items() if param not in ["self"]},
+        )
+
     def range(
         self,
         rpc_endpoint: str,
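A sketch of how the wrapper might be called; `cli`, the endpoint, and the IDs are assumed fixtures. Each entry in `range` is paired positionally with a file in `payload`:

    # Assumed FrostfsCli fixture; values are placeholders.
    result = cli.object.patch(
        rpc_endpoint="s01.frostfs.devenv:8080",
        cid=cid,
        oid=oid,
        range=["0:64", "128:32"],                  # byte ranges as offset:length
        payload=["/tmp/patch_a", "/tmp/patch_b"],  # one file per range
        timeout="30s",
    )
    new_oid = result.stdout.split(":")[1].strip()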
@@ -1,6 +1,8 @@
+import io
 import json
 import logging
 import logging.config
+from typing import Mapping, Sequence
 
 import httpx
@@ -40,58 +42,103 @@ class HttpClient:
         client = httpx.Client(timeout=timeout, transport=transport)
         response = client.request(method, url, **kwargs)
 
-        self._attach_response(response)
+        self._attach_response(response, **kwargs)
         logger.info(f"Response: {response.status_code} => {response.text}")
 
         if expected_status_code:
-            assert response.status_code == expected_status_code, (
-                f"Got {response.status_code} response code" f" while {expected_status_code} expected"
-            )
+            assert (
+                response.status_code == expected_status_code
+            ), f"Got {response.status_code} response code while {expected_status_code} expected"
 
         return response
 
     @classmethod
-    def _attach_response(cls, response: httpx.Response):
-        request = response.request
-
+    def _parse_body(cls, readable: httpx.Request | httpx.Response) -> str | None:
         try:
-            request_headers = json.dumps(dict(request.headers), indent=4)
-        except json.JSONDecodeError:
-            request_headers = str(request.headers)
-
-        try:
-            request_body = request.read()
-            try:
-                request_body = request_body.decode("utf-8")
-            except UnicodeDecodeError as e:
-                request_body = f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}"
+            content = readable.read()
         except Exception as e:
-            request_body = f"Error reading request body: {str(e)}"
+            logger.warning(f"Unable to read file: {str(e)}")
+            return None
 
-        request_body = "" if request_body is None else request_body
+        if not content:
+            return None
+
+        request_body = None
 
         try:
-            response_headers = json.dumps(dict(response.headers), indent=4)
-        except json.JSONDecodeError:
-            response_headers = str(response.headers)
+            request_body = json.loads(content)
+        except (json.JSONDecodeError, UnicodeDecodeError) as e:
+            logger.warning(f"Unable to convert body to json: {str(e)}")
+
+        if request_body is not None:
+            return json.dumps(request_body, default=str, indent=4)
+
+        try:
+            request_body = content.decode()
+        except UnicodeDecodeError as e:
+            logger.warning(f"Unable to decode binary data to text using UTF-8 encoding: {str(e)}")
+
+        request_body = content if request_body is None else request_body
+        request_body = "<large text data>" if len(request_body) > 1000 else request_body
+
+        return request_body
+
+    @classmethod
+    def _parse_files(cls, files: Mapping | Sequence | None) -> dict:
+        filepaths = {}
+
+        if not files:
+            return filepaths
+
+        if isinstance(files, Sequence):
+            items = files
+        elif isinstance(files, Mapping):
+            items = files.items()
+        else:
+            raise TypeError(f"'files' must be either Sequence or Mapping, got: {type(files).__name__}")
+
+        for name, file in items:
+            if isinstance(file, io.IOBase):
+                filepaths[name] = file.name
+            elif isinstance(file, Sequence):
+                filepaths[name] = file[1].name
+
+        return filepaths
+
+    @classmethod
+    def _attach_response(cls, response: httpx.Response, **kwargs):
+        request = response.request
+        request_headers = json.dumps(dict(request.headers), default=str, indent=4)
+        request_body = cls._parse_body(request)
+
+        files = kwargs.get("files")
+        request_files = cls._parse_files(files)
+
+        response_headers = json.dumps(dict(response.headers), default=str, indent=4)
+        response_body = cls._parse_body(response)
 
         report = (
             f"Method: {request.method}\n\n"
-            f"URL: {request.url}\n\n"
-            f"Request Headers: {request_headers}\n\n"
-            f"Request Body: {request_body}\n\n"
-            f"Response Status Code: {response.status_code}\n\n"
-            f"Response Headers: {response_headers}\n\n"
-            f"Response Body: {response.text}\n\n"
+            + f"URL: {request.url}\n\n"
+            + f"Request Headers: {request_headers}\n\n"
+            + (f"Request Body: {request_body}\n\n" if request_body else "")
+            + (f"Request Files: {request_files}\n\n" if request_files else "")
+            + f"Response Status Code: {response.status_code}\n\n"
+            + f"Response Headers: {response_headers}\n\n"
+            + (f"Response Body: {response_body}\n\n" if response_body else "")
         )
-        curl_request = cls._create_curl_request(request.url, request.method, request.headers, request_body)
+        curl_request = cls._create_curl_request(request.url, request.method, request.headers, request_body, request_files)
 
         reporter.attach(report, "Requests Info")
         reporter.attach(curl_request, "CURL")
 
     @classmethod
-    def _create_curl_request(cls, url: str, method: str, headers: httpx.Headers, data: str) -> str:
+    def _create_curl_request(cls, url: str, method: str, headers: httpx.Headers, data: str, files: dict) -> str:
         headers = " ".join(f'-H "{name.title()}: {value}"' for name, value in headers.items())
         data = f" -d '{data}'" if data else ""
+
+        for name, path in files.items():
+            data += f' -F "{name}=@{path}"'
+
         # Option -k means no verify SSL
         return f"curl {url} -X {method} {headers}{data} -k"
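The shapes `_parse_files` accepts mirror httpx's `files` argument; a small sketch (the path is a placeholder):

    # Mapping form: name -> open file object. Sequence form: (name, (filename, fileobj)) tuples.
    with open("/tmp/object.bin", "rb") as f:
        assert HttpClient._parse_files({"file": f}) == {"file": "/tmp/object.bin"}
        assert HttpClient._parse_files([("file", ("object.bin", f))]) == {"file": "/tmp/object.bin"}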
@@ -9,6 +9,7 @@ OBJECT_ALREADY_REMOVED = "code = 2052.*message = object already removed"
 SESSION_NOT_FOUND = "code = 4096.*message = session token not found"
 OUT_OF_RANGE = "code = 2053.*message = out of range"
 EXPIRED_SESSION_TOKEN = "code = 4097.*message = expired session token"
+ADD_CHAIN_ERROR = "code = 5120 message = apemanager access denied"
 # TODO: Change to codes with message
 # OBJECT_IS_LOCKED = "code = 2050.*message = object is locked"
 # LOCK_NON_REGULAR_OBJECT = "code = 2051.*message = ..." will be available once 2092 is fixed
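These constants are regular-expression fragments matched against CLI output; note the new entry uses a literal space between code and message where its neighbours use `.*`. A sketch of the intended match (the error string is illustrative):

    import re

    stderr = "status: code = 5120 message = apemanager access denied"
    assert re.search(ADD_CHAIN_ERROR, stderr)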
@@ -171,7 +171,7 @@ class AwsCliClient(S3ClientWrapper):
         return response.get("TagSet")
 
     @reporter.step("Get bucket acl")
-    def get_bucket_acl(self, bucket: str) -> list:
+    def get_bucket_acl(self, bucket: str) -> dict:
         if bucket.startswith("-") or " " in bucket:
             bucket = f'"{bucket}"'
 
@@ -179,8 +179,7 @@ class AwsCliClient(S3ClientWrapper):
             f"aws {self.common_flags} s3api get-bucket-acl --bucket {bucket} " f"--endpoint {self.s3gate_endpoint} --profile {self.profile}"
         )
         output = self.local_shell.exec(cmd).stdout
-        response = self._to_json(output)
-        return response.get("Grants")
+        return self._to_json(output)
 
     @reporter.step("Get bucket location")
     def get_bucket_location(self, bucket: str) -> dict:

@@ -861,7 +860,7 @@ class AwsCliClient(S3ClientWrapper):
         return response["Parts"]
 
     @reporter.step("Complete multipart upload S3")
-    def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
+    def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict:
         if bucket.startswith("-") or " " in bucket:
             bucket = f'"{bucket}"'
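With `get_bucket_acl` now returning the whole response rather than just the grants, callers extract the list themselves; a sketch (the `s3_client` fixture is assumed):

    acl = s3_client.get_bucket_acl(bucket)  # full response dict
    grants = acl.get("Grants")              # what the method used to return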
@@ -86,6 +86,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
             service_name="iam",
             aws_access_key_id=self.access_key_id,
             aws_secret_access_key=self.secret_access_key,
+            region_name=self.region,
             endpoint_url=self.iam_endpoint,
             verify=False,
         )

@@ -229,14 +230,13 @@ class Boto3ClientWrapper(S3ClientWrapper):
         return response.get("TagSet")
 
     @reporter.step("Get bucket acl")
-    def get_bucket_acl(self, bucket: str) -> list:
-        response = self._exec_request(
+    def get_bucket_acl(self, bucket: str) -> dict:
+        return self._exec_request(
             self.boto3_client.get_bucket_acl,
             params={"Bucket": bucket},
             endpoint=self.s3gate_endpoint,
             profile=self.profile,
         )
-        return response.get("Grants")
 
     @reporter.step("Delete bucket tagging")
     def delete_bucket_tagging(self, bucket: str) -> None:

@@ -704,7 +704,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
         return response["Parts"]
 
     @reporter.step("Complete multipart upload S3")
-    def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
+    def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict:
         parts = [{"ETag": etag, "PartNumber": part_num} for part_num, etag in parts]
         params = self._convert_to_s3_params(locals(), exclude=["parts"])
         params["MultipartUpload"] = {"Parts": parts}
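`complete_multipart_upload` changing from `-> None` to `-> dict` suggests tests can now assert on the completion response; a sketch under that assumption:

    response = s3_client.complete_multipart_upload(bucket, key, upload_id, parts)
    assert response.get("ETag")  # e.g. ETag of the assembled object, if the response carries it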
@@ -128,7 +128,7 @@ class S3ClientWrapper(HumanReadableABC):
         """Deletes the tags from the bucket."""
 
     @abstractmethod
-    def get_bucket_acl(self, bucket: str) -> list:
+    def get_bucket_acl(self, bucket: str) -> dict:
         """This implementation of the GET action uses the acl subresource to return the access control list (ACL) of a bucket."""
 
     @abstractmethod

@@ -336,7 +336,7 @@ class S3ClientWrapper(HumanReadableABC):
         """Lists the parts that have been uploaded for a specific multipart upload."""
 
     @abstractmethod
-    def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> None:
+    def complete_multipart_upload(self, bucket: str, key: str, upload_id: str, parts: list) -> dict:
         """Completes a multipart upload by assembling previously uploaded parts."""
 
     @abstractmethod
src/frostfs_testlib/s3/s3_http_client.py (new file, 127 lines)
@@ -0,0 +1,127 @@
+import hashlib
+import logging
+import xml.etree.ElementTree as ET
+
+import httpx
+from botocore.auth import SigV4Auth
+from botocore.awsrequest import AWSRequest
+from botocore.credentials import Credentials
+
+from frostfs_testlib import reporter
+from frostfs_testlib.http.http_client import HttpClient
+from frostfs_testlib.utils.file_utils import TestFile
+
+logger = logging.getLogger("NeoLogger")
+
+DEFAULT_TIMEOUT = 60.0
+
+
+class S3HttpClient:
+    def __init__(
+        self, s3gate_endpoint: str, access_key_id: str, secret_access_key: str, profile: str = "default", region: str = "us-east-1"
+    ) -> None:
+        self.http_client = HttpClient()
+        self.s3gate_endpoint = s3gate_endpoint
+        self.credentials = Credentials(access_key_id, secret_access_key)
+        self.profile = profile
+        self.region = region
+        self.service = "s3"
+        self.signature = SigV4Auth(self.credentials, self.service, self.region)
+
+    def _to_s3_header(self, header: str) -> dict:
+        replacement_map = {
+            "Acl": "ACL",
+            "_": "-",
+        }
+
+        result = header
+        if not header.startswith("x_amz"):
+            result = header.title()
+
+        for find, replace in replacement_map.items():
+            result = result.replace(find, replace)
+
+        return result
+
+    def _convert_to_s3_headers(self, scope: dict, exclude: list[str] = None):
+        exclude = ["self", "cls"] if not exclude else exclude + ["self", "cls"]
+        return {self._to_s3_header(header): value for header, value in scope.items() if header not in exclude and value is not None}
+
+    def _create_aws_request(
+        self, method: str, url: str, headers: dict, content: str | bytes | TestFile = None, params: dict = None
+    ) -> AWSRequest:
+        data = b""
+
+        if content is not None:
+            if isinstance(content, TestFile):
+                with open(content, "rb") as io_content:
+                    data = io_content.read()
+            elif isinstance(content, str):
+                data = bytes(content, encoding="utf-8")
+            elif isinstance(content, bytes):
+                data = content
+            else:
+                raise TypeError(f"Content expected as a string, bytes or TestFile object, got: {content}")
+
+        headers["X-Amz-Content-SHA256"] = hashlib.sha256(data).hexdigest()
+        aws_request = AWSRequest(method, url, headers, data, params)
+        self.signature.add_auth(aws_request)
+
+        return aws_request
+
+    def _exec_request(
+        self,
+        method: str,
+        url: str,
+        headers: dict,
+        content: str | bytes | TestFile = None,
+        params: dict = None,
+        timeout: float = DEFAULT_TIMEOUT,
+    ) -> dict:
+        aws_request = self._create_aws_request(method, url, headers, content, params)
+        response = self.http_client.send(
+            aws_request.method,
+            aws_request.url,
+            headers=dict(aws_request.headers),
+            data=aws_request.data,
+            params=aws_request.params,
+            timeout=timeout,
+        )
+
+        try:
+            response.raise_for_status()
+        except httpx.HTTPStatusError:
+            raise httpx.HTTPStatusError(response.text, request=response.request, response=response)
+
+        root = ET.fromstring(response.read())
+        data = {
+            "LastModified": root.find(".//LastModified").text,
+            "ETag": root.find(".//ETag").text,
+        }
+
+        if response.headers.get("x-amz-version-id"):
+            data["VersionId"] = response.headers.get("x-amz-version-id")
+
+        return data
+
+    @reporter.step("Patch object S3")
+    def patch_object(
+        self,
+        bucket: str,
+        key: str,
+        content: str | bytes | TestFile,
+        content_range: str,
+        version_id: str = None,
+        if_match: str = None,
+        if_unmodified_since: str = None,
+        x_amz_expected_bucket_owner: str = None,
+        timeout: float = DEFAULT_TIMEOUT,
+    ) -> dict:
+        if content_range and not content_range.startswith("bytes"):
+            content_range = f"bytes {content_range}/*"
+
+        url = f"{self.s3gate_endpoint}/{bucket}/{key}"
+        headers = self._convert_to_s3_headers(locals(), exclude=["bucket", "key", "content", "version_id", "timeout"])
+        params = {"VersionId": version_id} if version_id is not None else None
+
+        return self._exec_request("PATCH", url, headers, content, params, timeout=timeout)
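A usage sketch for the new client; endpoint and credentials are placeholders. `patch_object` normalizes a bare range, so "0-8" becomes the header value "bytes 0-8/*":

    s3 = S3HttpClient("http://s3.frostfs.devenv:8080", access_key_id, secret_access_key)
    result = s3.patch_object(
        bucket="test-bucket",  # placeholder
        key="object-1",        # placeholder
        content=b"new bytes",
        content_range="0-8",   # normalized to "bytes 0-8/*"
    )
    print(result)  # {"LastModified": ..., "ETag": ..., optionally "VersionId": ...}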
@@ -111,6 +111,8 @@ def create_container(
     options: Optional[dict] = None,
     await_mode: bool = True,
     wait_for_creation: bool = True,
+    nns_zone: str = None,
+    nns_name: str = None,
     timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
 ) -> str:
     """

@@ -143,6 +145,8 @@ def create_container(
     result = cli.container.create(
         rpc_endpoint=endpoint,
         policy=rule,
+        nns_name=nns_name,
+        nns_zone=nns_zone,
         basic_acl=basic_acl,
         attributes=attributes,
         name=name,
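A sketch of creating a container registered under an NNS name; the positional fixtures and values are assumptions based on the surrounding signature:

    cid = create_container(
        wallet,
        shell,
        endpoint,
        rule=DEFAULT_PLACEMENT_RULE,
        nns_name="my-container",        # placeholder
        nns_zone="container.frostfs",   # placeholder
    )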
@@ -12,6 +12,7 @@ from frostfs_testlib.shell import Shell
 from frostfs_testlib.steps.cli.container import search_nodes_with_container
 from frostfs_testlib.storage.cluster import Cluster, ClusterNode
 from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
+from frostfs_testlib.utils.file_utils import TestFile, get_file_hash
 
 logger = logging.getLogger("NeoLogger")

@@ -185,3 +186,26 @@ def search_nodes_with_bucket(
             break
     nodes_list = search_nodes_with_container(wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster)
     return nodes_list
+
+
+def get_bytes_relative_to_object(value: int | str, object_size: int = None, part_size: int = None) -> int:
+    if isinstance(value, int):
+        return value
+
+    if "part" not in value and "object" not in value:
+        return int(value)
+
+    if object_size is not None:
+        value = value.replace("object", str(object_size))
+
+    if part_size is not None:
+        value = value.replace("part", str(part_size))
+
+    return int(eval(value))
+
+
+def get_range_relative_to_object(rng: str, object_size: int = None, part_size: int = None, int_values: bool = False) -> str | int:
+    start, end = rng.split(":")
+    start = get_bytes_relative_to_object(start, object_size, part_size)
+    end = get_bytes_relative_to_object(end, object_size, part_size)
+    return (start, end) if int_values else f"bytes {start}-{end}/*"
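The helpers treat "object" and "part" as symbolic sizes: the words are substituted with the given numbers and the remaining expression is evaluated (via `eval`, so simple arithmetic works). A few illustrative calls:

    get_bytes_relative_to_object("object/2", object_size=1024)   # -> 512
    get_range_relative_to_object("0:part", part_size=256)        # -> "bytes 0-256/*"
    get_range_relative_to_object("object-10:object", object_size=100, int_values=True)  # -> (90, 100)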
@@ -91,10 +91,10 @@ class ClusterNode:
         config_str = yaml.dump(new_config)
         shell.exec(f"echo '{config_str}' | sudo tee {config_file_path}")
 
-    def config(self, service_type: ServiceClass) -> ServiceConfigurationYml:
+    def config(self, service_type: type[ServiceClass]) -> ServiceConfigurationYml:
         return self.service(service_type).config
 
-    def service(self, service_type: ServiceClass) -> ServiceClass:
+    def service(self, service_type: type[ServiceClass]) -> ServiceClass:
         """
         Get a service cluster node of specified type.
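The tightened hints reflect that callers pass the service class itself, not an instance; a sketch, with StorageNode standing in for an assumed ServiceClass subtype:

    storage_service = cluster_node.service(StorageNode)  # the class, not an instance
    storage_config = cluster_node.config(StorageNode)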
@@ -23,4 +23,6 @@ class PlacementRule:
     DEFAULT_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
     SINGLE_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 4 FROM * AS X"
     REP_2_FOR_3_NODES_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 3 FROM * AS X"
+    REP_1_FOR_2_NODES_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 2 FROM * AS X"
     DEFAULT_EC_PLACEMENT_RULE = "EC 3.1"
+    EC_1_1_FOR_2_NODES_PLACEMENT_RULE = "EC 1.1 IN X CBF 1 SELECT 2 FROM * AS X"
@@ -13,6 +13,7 @@ FROSTFS_CONTRACT_CACHE_TIMEOUT = 30
 
 class ObjectOperations(HumanReadableEnum):
     PUT = "object.put"
+    PATCH = "object.patch"
     GET = "object.get"
     HEAD = "object.head"
     GET_RANGE = "object.range"
@@ -206,6 +206,11 @@ class ObjectOperations(interfaces.ObjectInterface):
             hash_type=hash_type,
             timeout=timeout,
         )
+
+        if range:
+            # Cut off the range and return only hash
+            return result.stdout.split(":")[1].strip()
+
         return result.stdout
 
     @reporter.step("Head object")

@@ -407,6 +412,57 @@ class ObjectOperations(interfaces.ObjectInterface):
         oid = id_str.split(":")[1]
         return oid.strip()
 
+    @reporter.step("Patch object")
+    def patch(
+        self,
+        cid: str,
+        oid: str,
+        endpoint: str,
+        ranges: list[str] = None,
+        payloads: list[str] = None,
+        new_attrs: Optional[str] = None,
+        replace_attrs: bool = False,
+        bearer: str = "",
+        xhdr: Optional[dict] = None,
+        session: Optional[str] = None,
+        timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
+        trace: bool = False,
+    ) -> str:
+        """
+        PATCH an object.
+
+        Args:
+            cid: ID of Container where we get the Object from
+            oid: Object ID
+            endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
+            ranges: An array of ranges in which to replace data in the format [offset1:length1, offset2:length2]
+            payloads: An array of file paths to be applied in each range
+            new_attrs: Attributes to be changed in the format "key1=value1,key2=value2"
+            replace_attrs: Replace all attributes completely with new ones specified in new_attrs
+            bearer: Path to Bearer Token file, appends to `--bearer` key
+            xhdr: Request X-Headers in form of Key=Value
+            session: Path to a JSON-encoded container session token
+            timeout: Timeout for the operation
+            trace: Generate trace ID and print it
+        Returns:
+            (str): ID of patched Object
+        """
+        result = self.cli.object.patch(
+            rpc_endpoint=endpoint,
+            cid=cid,
+            oid=oid,
+            range=ranges,
+            payload=payloads,
+            new_attrs=new_attrs,
+            replace_attrs=replace_attrs,
+            bearer=bearer,
+            xhdr=xhdr,
+            session=session,
+            timeout=timeout,
+            trace=trace,
+        )
+        return result.stdout.split(":")[1].strip()
+
     @reporter.step("Put object to random node")
     def put_to_random_node(
         self,

@@ -622,3 +678,30 @@ class ObjectOperations(interfaces.ObjectInterface):
         ]
 
         return object_nodes
+
+    @reporter.step("Search parts of object")
+    def parts(
+        self,
+        cid: str,
+        oid: str,
+        alive_node: ClusterNode,
+        bearer: str = "",
+        xhdr: Optional[dict] = None,
+        is_direct: bool = False,
+        verify_presence_all: bool = False,
+        timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
+    ) -> list[str]:
+        endpoint = alive_node.storage_node.get_rpc_endpoint()
+        response = self.cli.object.nodes(
+            rpc_endpoint=endpoint,
+            cid=cid,
+            oid=oid,
+            bearer=bearer,
+            ttl=1 if is_direct else None,
+            json=True,
+            xhdr=xhdr,
+            timeout=timeout,
+            verify_presence_all=verify_presence_all,
+        )
+        response_json = json.loads(response.stdout)
+        return [data_object["object_id"] for data_object in response_json["data_objects"]]
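A sketch tying the two additions together: patch two ranges of an object, then list the parts of a complex object; the `grpc_client` fixture and values are assumptions:

    new_oid = grpc_client.patch(
        cid,
        oid,
        endpoint,
        ranges=["0:64", "512:128"],
        payloads=["/tmp/patch_a", "/tmp/patch_b"],
    )
    part_ids = grpc_client.parts(cid, new_oid, alive_node)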
@@ -198,6 +198,24 @@ class ObjectInterface(ABC):
     ) -> str:
         pass
 
+    @abstractmethod
+    def patch(
+        self,
+        cid: str,
+        oid: str,
+        endpoint: str,
+        ranges: Optional[list[str]] = None,
+        payloads: Optional[list[str]] = None,
+        new_attrs: Optional[str] = None,
+        replace_attrs: bool = False,
+        bearer: Optional[str] = None,
+        xhdr: Optional[dict] = None,
+        session: Optional[str] = None,
+        timeout: Optional[str] = None,
+        trace: bool = False,
+    ) -> str:
+        pass
+
     @abstractmethod
     def put_to_random_node(
         self,

@@ -264,6 +282,20 @@ class ObjectInterface(ABC):
     ) -> List[ClusterNode]:
         pass
 
+    @abstractmethod
+    def parts(
+        self,
+        cid: str,
+        oid: str,
+        alive_node: ClusterNode,
+        bearer: str = "",
+        xhdr: Optional[dict] = None,
+        is_direct: bool = False,
+        verify_presence_all: bool = False,
+        timeout: Optional[str] = None,
+    ) -> List[str]:
+        pass
+
 
 class ContainerInterface(ABC):
     @abstractmethod
@@ -80,6 +80,9 @@ def log_command_execution(cmd: str, output: Union[str, dict], params: Optional[d
     if not params:
         params = {}
 
+    if params.get("Body") and len(params.get("Body")) > 1000:
+        params["Body"] = "<large text data>"
+
     output_params = params
 
     try:
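A sketch of the masking effect (the threshold is 1000 characters, and `params` is mutated in place):

    params = {"Body": "x" * 5000, "Bucket": "test-bucket"}
    # after the new guard runs inside log_command_execution:
    # params["Body"] == "<large text data>"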