forked from TrueCloudLab/frostfs-testlib
Compare commits: master...support/v0 (19 commits)

| SHA1 |
| --- |
| c40b637768 |
| 1880f96277 |
| c1e5dd1007 |
| f4d71b664d |
| da1a4d0099 |
| 3e36defb90 |
| 6810765d46 |
| 2cffff3ffe |
| d9f4e88f94 |
| deb2f12bec |
| f236c1b083 |
| cc13a43bec |
| a74d1bff4f |
| 547f6106ec |
| c2aa41e5dc |
| 8e446ccb96 |
| 9c9fb7878a |
| 3a799afdcf |
| b610e04a7b |
19 changed files with 134 additions and 236 deletions
@@ -362,7 +362,6 @@ class FrostfsCliObject(CliCommand):
        trace: bool = False,
        root: bool = False,
        verify_presence_all: bool = False,
        json: bool = False,
        ttl: Optional[int] = None,
        xhdr: Optional[dict] = None,
        timeout: Optional[str] = None,

@@ -15,8 +15,6 @@ class NetmapParser:
            "epoch_duration": r"Epoch duration: (?P<epoch_duration>\d+)",
            "inner_ring_candidate_fee": r"Inner Ring candidate fee: (?P<inner_ring_candidate_fee>\d+)",
            "maximum_object_size": r"Maximum object size: (?P<maximum_object_size>\d+)",
            "maximum_count_of_data_shards": r"Maximum count of data shards: (?P<maximum_count_of_data_shards>\d+)",
            "maximum_count_of_parity_shards": r"Maximum count of parity shards: (?P<maximum_count_of_parity_shards>\d+)",
            "withdrawal_fee": r"Withdrawal fee: (?P<withdrawal_fee>\d+)",
            "homomorphic_hashing_disabled": r"Homomorphic hashing disabled: (?P<homomorphic_hashing_disabled>true|false)",
            "maintenance_mode_allowed": r"Maintenance mode allowed: (?P<maintenance_mode_allowed>true|false)",

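For readers skimming the hunk: each entry maps a NodeNetInfo field to a named-group regex over `netmap netinfo`-style CLI output. A minimal, self-contained sketch of that parsing pattern (the `parse_netinfo` helper and the sample string below are illustrative, not library code):

```python
import re

# Regex table mirroring the named-group patterns from the hunk above.
NETINFO_REGEXES = {
    "epoch_duration": r"Epoch duration: (?P<epoch_duration>\d+)",
    "maximum_object_size": r"Maximum object size: (?P<maximum_object_size>\d+)",
    "withdrawal_fee": r"Withdrawal fee: (?P<withdrawal_fee>\d+)",
    "homomorphic_hashing_disabled": r"Homomorphic hashing disabled: (?P<homomorphic_hashing_disabled>true|false)",
}


def parse_netinfo(output: str) -> dict:
    """Collect the first match of every known field from CLI output."""
    result = {}
    for field, pattern in NETINFO_REGEXES.items():
        match = re.search(pattern, output)
        if match:
            result[field] = match.group(field)
    return result


sample = "Epoch duration: 240\nMaximum object size: 67108864\nHomomorphic hashing disabled: true"
print(parse_netinfo(sample))
# {'epoch_duration': '240', 'maximum_object_size': '67108864', 'homomorphic_hashing_disabled': 'true'}
```
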
@@ -25,16 +25,6 @@ def convert_time_to_seconds(time: int | str | None) -> int:
    return seconds


def force_list(input: str | list[str]):
    if input is None:
        return None

    if isinstance(input, list):
        return list(map(str.strip, input))

    return [input.strip()]


class LoadType(Enum):
    gRPC = "grpc"
    S3 = "s3"

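For context on the helper touched here: `force_list` normalizes `None`, a single string, or a list of strings into a stripped list. The function body below is copied from the hunk, with a few example calls added for illustration:

```python
def force_list(input: str | list[str]):
    # Copied from the hunk above: normalize input into a list of stripped strings.
    if input is None:
        return None

    if isinstance(input, list):
        return list(map(str.strip, input))

    return [input.strip()]


print(force_list(" A "))           # ['A']
print(force_list(["A C ", " B"]))  # ['A C', 'B']
print(force_list(None))            # None
```
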
@@ -152,29 +142,8 @@ class K6ProcessAllocationStrategy(Enum):
    PER_ENDPOINT = "PER_ENDPOINT"


class MetaConfig:
    def _get_field_formatter(self, field_name: str) -> Callable | None:
        data_fields = fields(self)
        formatters = [
            field.metadata["formatter"]
            for field in data_fields
            if field.name == field_name and "formatter" in field.metadata and field.metadata["formatter"] != None
        ]
        if formatters:
            return formatters[0]

        return None

    def __setattr__(self, field_name, value):
        formatter = self._get_field_formatter(field_name)
        if formatter:
            value = formatter(value)

        super().__setattr__(field_name, value)


@dataclass
class Preset(MetaConfig):
class Preset:
    # ------ COMMON ------
    # Amount of objects which should be created
    objects_count: Optional[int] = metadata_field(all_load_scenarios, "preload_obj", None, False)

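To make the mechanism in this hunk easier to follow: `MetaConfig` looks up an optional `formatter` callable in the dataclass field metadata and applies it in `__setattr__`, so values are normalized on assignment. A self-contained sketch of that pattern, assuming simplified stand-in names (the classes and formatter below are illustrative, not the library's exact definitions):

```python
from dataclasses import dataclass, field, fields
from typing import Callable, Optional


def force_list(value):
    # Illustrative formatter: normalize a bare string into a stripped one-element list.
    if value is None:
        return None
    if isinstance(value, list):
        return [item.strip() for item in value]
    return [value.strip()]


class MetaConfigSketch:
    """Minimal re-creation of the formatter lookup + __setattr__ hook shown above."""

    def _get_field_formatter(self, field_name: str) -> Optional[Callable]:
        for data_field in fields(self):
            if data_field.name == field_name and data_field.metadata.get("formatter") is not None:
                return data_field.metadata["formatter"]
        return None

    def __setattr__(self, field_name, value):
        formatter = self._get_field_formatter(field_name)
        if formatter:
            value = formatter(value)
        super().__setattr__(field_name, value)


@dataclass
class PresetSketch(MetaConfigSketch):
    container_placement_policy: Optional[list[str]] = field(default=None, metadata={"formatter": force_list})


preset = PresetSketch()
preset.container_placement_policy = " REP 2 "
print(preset.container_placement_policy)  # ['REP 2']
```
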
@@ -189,13 +158,13 @@ class Preset(MetaConfig):
    # Amount of containers which should be created
    containers_count: Optional[int] = metadata_field(grpc_preset_scenarios, "containers", None, False)
    # Container placement policy for containers for gRPC
    container_placement_policy: Optional[list[str]] = metadata_field(grpc_preset_scenarios, "policy", None, False, formatter=force_list)
    container_placement_policy: Optional[str] = metadata_field(grpc_preset_scenarios, "policy", None, False)

    # ------ S3 ------
    # Amount of buckets which should be created
    buckets_count: Optional[int] = metadata_field(s3_preset_scenarios, "buckets", None, False)
    # S3 region (AKA placement policy for S3 buckets)
    s3_location: Optional[list[str]] = metadata_field(s3_preset_scenarios, "location", None, False, formatter=force_list)
    s3_location: Optional[str] = metadata_field(s3_preset_scenarios, "location", None, False)

    # Delay between containers creation and object upload for preset
    object_upload_delay: Optional[int] = metadata_field(all_load_scenarios, "sleep", None, False)

@@ -208,7 +177,7 @@ class Preset(MetaConfig):
@dataclass
class PrometheusParams(MetaConfig):
class PrometheusParams:
    # Prometheus server URL
    server_url: Optional[str] = metadata_field(all_load_scenarios, env_variable="K6_PROMETHEUS_RW_SERVER_URL", string_repr=False)
    # Prometheus trend stats

@@ -218,7 +187,7 @@ class PrometheusParams(MetaConfig):
@dataclass
class LoadParams(MetaConfig):
class LoadParams:
    # ------- CONTROL PARAMS -------
    # Load type can be gRPC, HTTP, S3.
    load_type: LoadType

@@ -443,11 +412,6 @@ class LoadParams(MetaConfig):
            # For preset calls, bool values are passed with just --<argument_name> if the value is True
            return f"--{meta_field.metadata['preset_argument']}" if meta_field.value else ""

        if isinstance(meta_field.value, list):
            return (
                " ".join(f"--{meta_field.metadata['preset_argument']} '{value}'" for value in meta_field.value) if meta_field.value else ""
            )

        return f"--{meta_field.metadata['preset_argument']} '{meta_field.value}'"

    @staticmethod

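The branch above is what turns a list-valued preset field into repeated CLI flags. A standalone sketch of the same formatting rule (the `preset_argument` helper is an illustrative stand-in, not the library function):

```python
# Mirrors the logic shown above: bools become bare flags, lists become repeated flags,
# everything else becomes a single quoted value.
def preset_argument(name: str, value) -> str:
    if isinstance(value, bool):
        return f"--{name}" if value else ""
    if isinstance(value, list):
        return " ".join(f"--{name} '{item}'" for item in value) if value else ""
    return f"--{name} '{value}'"


print(preset_argument("policy", ["REP 1", "REP 2"]))  # --policy 'REP 1' --policy 'REP 2'
print(preset_argument("local", True))                 # --local
print(preset_argument("containers", 16))              # --containers '16'
```
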
@@ -467,6 +431,25 @@ class LoadParams(MetaConfig):
        return fields_with_data or []

    def _get_field_formatter(self, field_name: str) -> Callable | None:
        data_fields = fields(self)
        formatters = [
            field.metadata["formatter"]
            for field in data_fields
            if field.name == field_name and "formatter" in field.metadata and field.metadata["formatter"] != None
        ]
        if formatters:
            return formatters[0]

        return None

    def __setattr__(self, field_name, value):
        formatter = self._get_field_formatter(field_name)
        if formatter:
            value = formatter(value)

        super().__setattr__(field_name, value)

    def __str__(self) -> str:
        load_type_str = self.scenario.value if self.scenario else self.load_type.value
        # TODO: migrate load_params defaults to testlib

@@ -4,6 +4,6 @@ ALL_USERS_GROUP_READ_GRANT = {"Grantee": {"Type": "Group", "URI": ALL_USERS_GROU
CANONICAL_USER_FULL_CONTROL_GRANT = {"Grantee": {"Type": "CanonicalUser"}, "Permission": "FULL_CONTROL"}

# https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#canned-acl
PRIVATE_GRANTS = []
PUBLIC_READ_GRANTS = [ALL_USERS_GROUP_READ_GRANT]
PUBLIC_READ_WRITE_GRANTS = [ALL_USERS_GROUP_WRITE_GRANT, ALL_USERS_GROUP_READ_GRANT]
PRIVATE_GRANTS = [CANONICAL_USER_FULL_CONTROL_GRANT]
PUBLIC_READ_GRANTS = [CANONICAL_USER_FULL_CONTROL_GRANT, ALL_USERS_GROUP_READ_GRANT]
PUBLIC_READ_WRITE_GRANTS = [CANONICAL_USER_FULL_CONTROL_GRANT, ALL_USERS_GROUP_WRITE_GRANT, ALL_USERS_GROUP_READ_GRANT]

@@ -1,6 +1,7 @@
import json
import logging
import os
import uuid
from datetime import datetime
from time import sleep
from typing import Literal, Optional, Union

@@ -10,7 +11,6 @@ from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, R
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.shell import CommandOptions
from frostfs_testlib.shell.local_shell import LocalShell
from frostfs_testlib.utils import string_utils

# TODO: Refactor this code to use shell instead of _cmd_run
from frostfs_testlib.utils.cli_utils import _configure_aws_cli

@@ -68,7 +68,7 @@ class AwsCliClient(S3ClientWrapper):
        location_constraint: Optional[str] = None,
    ) -> str:
        if bucket is None:
            bucket = string_utils.unique_name("bucket-")
            bucket = str(uuid.uuid4())

        if object_lock_enabled_for_bucket is None:
            object_lock = ""

@@ -91,6 +91,7 @@ class AwsCliClient(S3ClientWrapper):
        if location_constraint:
            cmd += f" --create-bucket-configuration LocationConstraint={location_constraint}"
        self.local_shell.exec(cmd)
        sleep(S3_SYNC_WAIT_TIME)

        return bucket

@@ -105,6 +106,7 @@ class AwsCliClient(S3ClientWrapper):
    def delete_bucket(self, bucket: str) -> None:
        cmd = f"aws {self.common_flags} s3api delete-bucket --bucket {bucket} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
        self.local_shell.exec(cmd, command_options)
        sleep(S3_SYNC_WAIT_TIME)

    @reporter.step("Head bucket S3")
    def head_bucket(self, bucket: str) -> None:

@@ -229,7 +231,7 @@ class AwsCliClient(S3ClientWrapper):
        if bucket is None:
            bucket = source_bucket
        if key is None:
            key = string_utils.unique_name("copy-object-")
            key = os.path.join(os.getcwd(), str(uuid.uuid4()))
        copy_source = f"{source_bucket}/{source_key}"

        cmd = (

@@ -315,7 +317,7 @@ class AwsCliClient(S3ClientWrapper):
        object_range: Optional[tuple[int, int]] = None,
        full_output: bool = False,
    ) -> dict | TestFile:
        test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, string_utils.unique_name("dl-object-")))
        test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, str(uuid.uuid4())))
        version = f" --version-id {version_id}" if version_id else ""
        cmd = (
            f"aws {self.common_flags} s3api get-object --bucket {bucket} --key {key} "

@@ -395,6 +397,7 @@ class AwsCliClient(S3ClientWrapper):
        )
        output = self.local_shell.exec(cmd, command_options).stdout
        response = self._to_json(output)
        sleep(S3_SYNC_WAIT_TIME)
        return response

    @reporter.step("Delete object S3")

@@ -405,6 +408,7 @@ class AwsCliClient(S3ClientWrapper):
            f"--key {key} {version} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
        )
        output = self.local_shell.exec(cmd, command_options).stdout
        sleep(S3_SYNC_WAIT_TIME)
        return self._to_json(output)

    @reporter.step("Delete object versions S3")

@@ -431,6 +435,7 @@ class AwsCliClient(S3ClientWrapper):
            f"--delete file://{file_path} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
        )
        output = self.local_shell.exec(cmd, command_options).stdout
        sleep(S3_SYNC_WAIT_TIME)
        return self._to_json(output)

    @reporter.step("Delete object versions S3 without delete markers")

@@ -1219,40 +1224,3 @@ class AwsCliClient(S3ClientWrapper):
        response = self._to_json(output)

        return response

    @reporter.step("Adds one or more tags to an IAM user")
    def iam_tag_user(self, user_name: str, tags: list) -> dict:
        tags_json = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
        cmd = (
            f"aws {self.common_flags} iam tag-user --user-name {user_name} --tags '{json.dumps(tags_json)}' --endpoint {self.iam_endpoint}"
        )
        if self.profile:
            cmd += f" --profile {self.profile}"

        output = self.local_shell.exec(cmd).stdout
        response = self._to_json(output)

        return response

    @reporter.step("List tags of IAM user")
    def iam_list_user_tags(self, user_name: str) -> dict:
        cmd = f"aws {self.common_flags} iam list-user-tags --user-name {user_name} --endpoint {self.iam_endpoint}"
        if self.profile:
            cmd += f" --profile {self.profile}"

        output = self.local_shell.exec(cmd).stdout
        response = self._to_json(output)

        return response

    @reporter.step("Removes the specified tags from the user")
    def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
        tag_keys_joined = " ".join(tag_keys)
        cmd = f"aws {self.common_flags} iam untag-user --user-name {user_name} --tag-keys {tag_keys_joined} --endpoint {self.iam_endpoint}"
        if self.profile:
            cmd += f" --profile {self.profile}"

        output = self.local_shell.exec(cmd).stdout
        response = self._to_json(output)

        return response

@@ -1,6 +1,7 @@
import json
import logging
import os
import uuid
from datetime import datetime
from functools import wraps
from time import sleep

@@ -15,7 +16,6 @@ from mypy_boto3_s3 import S3Client
from frostfs_testlib import reporter
from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, RETRY_MODE, S3_SYNC_WAIT_TIME
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.utils import string_utils

# TODO: Refactor this code to use shell instead of _cmd_run
from frostfs_testlib.utils.cli_utils import _configure_aws_cli, log_command_execution

@@ -115,7 +115,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
        location_constraint: Optional[str] = None,
    ) -> str:
        if bucket is None:
            bucket = string_utils.unique_name("bucket-")
            bucket = str(uuid.uuid4())

        params = {"Bucket": bucket}
        if object_lock_enabled_for_bucket is not None:

@@ -134,6 +134,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
        s3_bucket = self.boto3_client.create_bucket(**params)
        log_command_execution(f"Created S3 bucket {bucket}", s3_bucket)
        sleep(S3_SYNC_WAIT_TIME)
        return bucket

    @reporter.step("List buckets S3")

@@ -154,6 +155,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
    def delete_bucket(self, bucket: str) -> None:
        response = self.boto3_client.delete_bucket(Bucket=bucket)
        log_command_execution("S3 Delete bucket result", response)
        sleep(S3_SYNC_WAIT_TIME)

    @reporter.step("Head bucket S3")
    @report_error

@@ -362,6 +364,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
        params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None}
        response = self.boto3_client.delete_object(**params)
        log_command_execution("S3 Delete object result", response)
        sleep(S3_SYNC_WAIT_TIME)
        return response

    @reporter.step("Delete objects S3")

@@ -372,6 +375,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
        assert (
            "Errors" not in response
        ), f'The following objects have not been deleted: {[err_info["Key"] for err_info in response["Errors"]]}.\nError Message: {response["Errors"]["Message"]}'
        sleep(S3_SYNC_WAIT_TIME)
        return response

    @reporter.step("Delete object versions S3")

@@ -409,10 +413,8 @@ class Boto3ClientWrapper(S3ClientWrapper):
        grant_write: Optional[str] = None,
        grant_read: Optional[str] = None,
    ) -> list:
        params = {self._to_s3_param(param): value for param, value in locals().items() if param not in ["self"] and value is not None}
        response = self.boto3_client.put_object_acl(**params)
        log_command_execution("S3 put object ACL", response)
        return response.get("Grants")
        # pytest.skip("Method put_object_acl is not supported by boto3 client")
        raise NotImplementedError("Unsupported for boto3 client")

    @reporter.step("Get object ACL")
    @report_error

@@ -439,7 +441,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
        if bucket is None:
            bucket = source_bucket
        if key is None:
            key = string_utils.unique_name("copy-object-")
            key = os.path.join(os.getcwd(), str(uuid.uuid4()))
        copy_source = f"{source_bucket}/{source_key}"

        params = {

@@ -476,7 +478,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
        if full_output:
            return response

        test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, string_utils.unique_name("dl-object-")))
        test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, str(uuid.uuid4())))
        with open(test_file, "wb") as file:
            chunk = response["Body"].read(1024)
            while chunk:

@@ -901,19 +903,3 @@ class Boto3ClientWrapper(S3ClientWrapper):
    def iam_update_user(self, user_name: str, new_name: str, new_path: Optional[str] = None) -> dict:
        response = self.boto3_iam_client.update_user(UserName=user_name, NewUserName=new_name, NewPath="/")
        return response

    @reporter.step("Adds one or more tags to an IAM user")
    def iam_tag_user(self, user_name: str, tags: list) -> dict:
        tags_json = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
        response = self.boto3_iam_client.tag_user(UserName=user_name, Tags=tags_json)
        return response

    @reporter.step("List tags of IAM user")
    def iam_list_user_tags(self, user_name: str) -> dict:
        response = self.boto3_iam_client.list_user_tags(UserName=user_name)
        return response

    @reporter.step("Removes the specified tags from the user")
    def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
        response = self.boto3_iam_client.untag_user(UserName=user_name, TagKeys=tag_keys)
        return response

@@ -550,15 +550,3 @@ class S3ClientWrapper(HumanReadableABC):
    @abstractmethod
    def iam_update_user(self, user_name: str, new_name: Optional[str] = None, new_path: Optional[str] = None) -> dict:
        """Updates the name and/or the path of the specified IAM user"""

    @abstractmethod
    def iam_tag_user(self, user_name: str, tags: list) -> dict:
        """Adds one or more tags to an IAM user"""

    @abstractmethod
    def iam_list_user_tags(self, user_name: str) -> dict:
        """List tags of IAM user"""

    @abstractmethod
    def iam_untag_user(self, user_name: str, tag_keys: list) -> dict:
        """Removes the specified tags from the user"""

@@ -616,11 +616,6 @@ def head_object(
    fst_line_idx = result.stdout.find("\n")
    decoded = json.loads(result.stdout[fst_line_idx:])

    # if response
    if "chunks" in decoded.keys():
        logger.info("decoding ec chunks")
        return decoded["chunks"]

    # If response is Complex Object header, it has `splitId` key
    if "splitId" in decoded.keys():
        logger.info("decoding split header")

@@ -722,27 +717,21 @@ def get_object_nodes(
    cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config)

    response = cli.object.nodes(
    result_object_nodes = cli.object.nodes(
        rpc_endpoint=endpoint,
        cid=cid,
        oid=oid,
        bearer=bearer,
        ttl=1 if is_direct else None,
        json=True,
        xhdr=xhdr,
        timeout=timeout,
        verify_presence_all=verify_presence_all,
    )

    response_json = json.loads(response.stdout)
    # Currently, the command will show expected and confirmed nodes.
    # And we (currently) count only nodes which are both expected and confirmed
    object_nodes_id = {
        required_node
        for data_object in response_json["data_objects"]
        for required_node in data_object["required_nodes"]
        if required_node in data_object["confirmed_nodes"]
    }
    parsing_output = parse_cmd_table(result_object_nodes.stdout, "|")
    list_object_nodes = [
        node for node in parsing_output if node["should_contain_object"] == "true" and node["actually_contains_object"] == "true"
    ]

    netmap_nodes_list = parse_netmap_output(
        cli.netmap.snapshot(

@@ -751,11 +740,14 @@ def get_object_nodes(
        ).stdout
    )
    netmap_nodes = [
        netmap_node for object_node in object_nodes_id for netmap_node in netmap_nodes_list if object_node == netmap_node.node_id
        netmap_node
        for object_node in list_object_nodes
        for netmap_node in netmap_nodes_list
        if object_node["node_id"] == netmap_node.node_id
    ]

    object_nodes = [
    result = [
        cluster_node for netmap_node in netmap_nodes for cluster_node in cluster.cluster_nodes if netmap_node.node == cluster_node.host_ip
    ]

    return object_nodes
    return result

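The new logic above switches from the command's JSON output to `parse_cmd_table`, which splits a pipe-delimited table and then keeps only rows where both `should_contain_object` and `actually_contains_object` are `true`. A rough stand-in to illustrate the idea (the real `parse_cmd_table` lives in frostfs-testlib; `parse_pipe_table` and the sample output below are hypothetical):

```python
# Hypothetical minimal parser for a pipe-delimited CLI table: header row + data rows -> dicts.
def parse_pipe_table(output: str, delimiter: str = "|") -> list[dict]:
    rows = [line for line in output.splitlines() if delimiter in line]
    header = [cell.strip() for cell in rows[0].split(delimiter) if cell.strip()]
    parsed = []
    for row in rows[1:]:
        cells = [cell.strip() for cell in row.split(delimiter) if cell.strip()]
        parsed.append(dict(zip(header, cells)))
    return parsed


sample = (
    "node_id | should_contain_object | actually_contains_object\n"
    "0451abc | true | true\n"
    "9f02def | true | false\n"
)
nodes = parse_pipe_table(sample)
print([n["node_id"] for n in nodes if n["should_contain_object"] == "true" and n["actually_contains_object"] == "true"])
# ['0451abc']
```
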
@@ -49,7 +49,7 @@ def get_via_http_gate(
    else:
        request = f"{node.http_gate.get_endpoint()}{request_path}"

    resp = requests.get(request, stream=True, timeout=timeout, verify=False)
    resp = requests.get(request, headers={"Host": node.storage_node.get_http_hostname()[0]}, stream=True, timeout=timeout, verify=False)

    if not resp.ok:
        raise Exception(

@@ -115,6 +115,7 @@ def get_via_http_gate_by_attribute(
        cid: CID to get object from
        attribute: attribute {name: attribute} value pair
        endpoint: http gate endpoint
        http_hostname: http host name on the node
        request_path: (optional) http request path, if ommited - use default [{endpoint}/get_by_attribute/{Key}/{Value}]
    """
    attr_name = list(attribute.keys())[0]

@@ -125,7 +126,7 @@ def get_via_http_gate_by_attribute(
    else:
        request = f"{node.http_gate.get_endpoint()}{request_path}"

    resp = requests.get(request, stream=True, timeout=timeout, verify=False)
    resp = requests.get(request, stream=True, timeout=timeout, verify=False, headers={"Host": node.storage_node.get_http_hostname()[0]})

    if not resp.ok:
        raise Exception(

@@ -145,6 +146,7 @@ def get_via_http_gate_by_attribute(
    return test_file


# TODO: pass http_hostname as a header
@reporter.step("Upload via HTTP Gate")
def upload_via_http_gate(cid: str, path: str, endpoint: str, headers: Optional[dict] = None, timeout: Optional[int] = 300) -> str:
    """

@@ -189,6 +191,7 @@ def is_object_large(filepath: str) -> bool:
    return False


# TODO: pass http_hostname as a header
@reporter.step("Upload via HTTP Gate using Curl")
def upload_via_http_gate_curl(
    cid: str,

@@ -249,7 +252,7 @@ def get_via_http_curl(cid: str, oid: str, node: ClusterNode) -> TestFile:
    test_file = TestFile(os.path.join(os.getcwd(), ASSETS_DIR, f"{cid}_{oid}_{str(uuid.uuid4())}"))

    curl = GenericCli("curl", node.host)
    curl(f"-k ", f"{request} > {test_file}", shell=local_shell)
    curl(f'-k -H "Host: {node.storage_node.get_http_hostname()[0]}"', f"{request} > {test_file}", shell=local_shell)

    return test_file

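The recurring change in this file is adding a `Host` header (taken from the storage node's configured HTTP hostname) to requests that previously hit the gateway endpoint bare. A minimal sketch of that pattern with plain `requests`; the endpoint and hostname values are placeholders, not real infrastructure:

```python
import requests

# The gateway is addressed by its IP endpoint, while the virtual host name travels
# in the Host header so the gateway can resolve the right container/bucket.
endpoint = "http://127.0.0.1:8080/get/CID/OID"
virtual_host = "grpc.frostfs.example"

prepared = requests.Request("GET", endpoint, headers={"Host": virtual_host}).prepare()
print(prepared.url)              # http://127.0.0.1:8080/get/CID/OID
print(prepared.headers["Host"])  # grpc.frostfs.example

# Actually sending it mirrors the calls in the hunk above:
# resp = requests.Session().send(prepared, stream=True, timeout=30, verify=False)
# if not resp.ok:
#     raise Exception(f"HTTP gate returned {resp.status_code}")
```
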
@@ -47,6 +47,7 @@ def set_bucket_versioning(s3_client: S3ClientWrapper, bucket: str, status: Versi
    if status == VersioningStatus.UNDEFINED:
        return

    s3_client.get_bucket_versioning_status(bucket)
    s3_client.put_bucket_versioning(bucket, status=status)
    bucket_status = s3_client.get_bucket_versioning_status(bucket)
    assert bucket_status == status.value, f"Expected {bucket_status} status. Got {status.value}"

@@ -144,16 +144,30 @@ class ClusterNode:
        return self.host.config.interfaces[interface.value]

    def get_data_interfaces(self) -> list[str]:
        return [ip_address for name_interface, ip_address in self.host.config.interfaces.items() if "data" in name_interface]
        return [
            ip_address for name_interface, ip_address in self.host.config.interfaces.items() if "data" in name_interface
        ]

    def get_data_interface(self, search_interface: str) -> list[str]:
        return [self.host.config.interfaces[interface] for interface in self.host.config.interfaces.keys() if search_interface == interface]
        return [
            self.host.config.interfaces[interface]
            for interface in self.host.config.interfaces.keys()
            if search_interface == interface
        ]

    def get_internal_interfaces(self) -> list[str]:
        return [ip_address for name_interface, ip_address in self.host.config.interfaces.items() if "internal" in name_interface]
        return [
            ip_address
            for name_interface, ip_address in self.host.config.interfaces.items()
            if "internal" in name_interface
        ]

    def get_internal_interface(self, search_internal: str) -> list[str]:
        return [self.host.config.interfaces[interface] for interface in self.host.config.interfaces.keys() if search_internal == interface]
        return [
            self.host.config.interfaces[interface]
            for interface in self.host.config.interfaces.keys()
            if search_internal == interface
        ]


class Cluster:

@@ -164,6 +178,8 @@ class Cluster:
    default_rpc_endpoint: str
    default_s3_gate_endpoint: str
    default_http_gate_endpoint: str
    default_http_hostname: str
    default_s3_hostname: str

    def __init__(self, hosting: Hosting) -> None:
        self._hosting = hosting

@@ -172,6 +188,8 @@ class Cluster:
        self.default_rpc_endpoint = self.services(StorageNode)[0].get_rpc_endpoint()
        self.default_s3_gate_endpoint = self.services(S3Gate)[0].get_endpoint()
        self.default_http_gate_endpoint = self.services(HTTPGate)[0].get_endpoint()
        self.default_http_hostname = self.services(StorageNode)[0].get_http_hostname()
        self.default_s3_hostname = self.services(StorageNode)[0].get_s3_hostname()

    @property
    def hosts(self) -> list[Host]:

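The ClusterNode methods above all filter the host's `interfaces` mapping either by substring ("data"/"internal") or by exact name. A standalone sketch of the same filters over an assumed `{interface name: IP}` dict (names and addresses below are made up):

```python
# Assumed shape of the host interface config the methods above iterate over.
interfaces = {
    "data0": "192.168.1.10",
    "data1": "192.168.1.11",
    "internal0": "10.0.0.10",
    "mgmt": "172.16.0.10",
}


def get_data_interfaces(config: dict[str, str]) -> list[str]:
    # Same filter as ClusterNode.get_data_interfaces: names containing "data".
    return [ip for name, ip in config.items() if "data" in name]


def get_interface(config: dict[str, str], search: str) -> list[str]:
    # Same filter as get_data_interface / get_internal_interface: exact name match.
    return [config[name] for name in config if name == search]


print(get_data_interfaces(interfaces))         # ['192.168.1.10', '192.168.1.11']
print(get_interface(interfaces, "internal0"))  # ['10.0.0.10']
```
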
@@ -16,3 +16,5 @@ class ConfigAttributes:
    ENDPOINT_PROMETHEUS = "endpoint_prometheus"
    CONTROL_ENDPOINT = "control_endpoint"
    UN_LOCODE = "un_locode"
    HTTP_HOSTNAME = "http_hostname"
    S3_HOSTNAME = "s3_hostname"

@@ -440,11 +440,9 @@ class ClusterStateController:
        self.await_node_status(status, wallet, cluster_node)

    @wait_for_success(80, 8, title="Wait for node status become {status}")
    def await_node_status(self, status: NodeStatus, wallet: WalletInfo, cluster_node: ClusterNode, checker_node: ClusterNode = None):
    def await_node_status(self, status: NodeStatus, wallet: WalletInfo, cluster_node: ClusterNode):
        frostfs_cli = FrostfsCli(self.shell, FROSTFS_CLI_EXEC, wallet.config_path)
        if not checker_node:
            checker_node = cluster_node
        netmap = NetmapParser.snapshot_all_nodes(frostfs_cli.netmap.snapshot(checker_node.storage_node.get_rpc_endpoint()).stdout)
        netmap = NetmapParser.snapshot_all_nodes(frostfs_cli.netmap.snapshot(cluster_node.storage_node.get_rpc_endpoint()).stdout)
        netmap = [node for node in netmap if cluster_node.host_ip == node.node]
        if status == NodeStatus.OFFLINE:
            assert cluster_node.host_ip not in netmap, f"{cluster_node.host_ip} not in Offline"

@@ -154,6 +154,15 @@ class StorageNode(NodeBase):
    def get_data_directory(self) -> str:
        return self.host.get_data_directory(self.name)

    def get_storage_config(self) -> str:
        return self.host.get_storage_config(self.name)

    def get_http_hostname(self) -> list[str]:
        return self._get_attribute(ConfigAttributes.HTTP_HOSTNAME)

    def get_s3_hostname(self) -> list[str]:
        return self._get_attribute(ConfigAttributes.S3_HOSTNAME)

    def delete_blobovnicza(self):
        self.host.delete_blobovnicza(self.name)

@@ -70,8 +70,6 @@ class NodeNetInfo:
    epoch_duration: str = None
    inner_ring_candidate_fee: str = None
    maximum_object_size: str = None
    maximum_count_of_data_shards: str = None
    maximum_count_of_parity_shards: str = None
    withdrawal_fee: str = None
    homomorphic_hashing_disabled: str = None
    maintenance_mode_allowed: str = None

@@ -2,8 +2,6 @@ import itertools
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Collection, Optional, Union

MAX_WORKERS = 50


def parallel(
    fn: Union[Callable, list[Callable]],

@@ -56,7 +54,7 @@ def _run_by_fn_list(fn_list: list[Callable], *args, **kwargs) -> list[Future]:
    futures: list[Future] = []

    with ThreadPoolExecutor(max_workers=min(len(fn_list), MAX_WORKERS)) as executor:
    with ThreadPoolExecutor(max_workers=len(fn_list)) as executor:
        for fn in fn_list:
            task_args = _get_args(*args)
            task_kwargs = _get_kwargs(**kwargs)

@@ -69,7 +67,7 @@ def _run_by_fn_list(fn_list: list[Callable], *args, **kwargs) -> list[Future]:
def _run_by_items(fn: Callable, parallel_items: Collection, *args, **kwargs) -> list[Future]:
    futures: list[Future] = []

    with ThreadPoolExecutor(max_workers=min(len(parallel_items), MAX_WORKERS)) as executor:
    with ThreadPoolExecutor(max_workers=len(parallel_items)) as executor:
        for item in parallel_items:
            task_args = _get_args(*args)
            task_kwargs = _get_kwargs(**kwargs)

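The change above bounds the thread pool with `min(len(...), MAX_WORKERS)` instead of letting it grow with the number of tasks. A self-contained sketch of that pattern (the `run_parallel` helper is an illustrative stand-in for `_run_by_items`, not the library function):

```python
from concurrent.futures import Future, ThreadPoolExecutor

MAX_WORKERS = 50  # cap taken from the hunk above


def run_parallel(fn, items) -> list:
    """Run fn over items with a worker pool capped at MAX_WORKERS."""
    futures: list[Future] = []
    with ThreadPoolExecutor(max_workers=min(len(items), MAX_WORKERS)) as executor:
        for item in items:
            futures.append(executor.submit(fn, item))
    # The context manager waits for completion; collect results (re-raises errors).
    return [future.result() for future in futures]


print(run_parallel(lambda x: x * x, [1, 2, 3]))  # [1, 4, 9]
```
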
@@ -6,7 +6,6 @@ from typing import Any, Optional
from frostfs_testlib import reporter
from frostfs_testlib.resources.common import ASSETS_DIR
from frostfs_testlib.utils import string_utils

logger = logging.getLogger("NeoLogger")

@@ -42,9 +41,7 @@ def ensure_directory_opener(path, flags):
    return os.open(path, flags)


# TODO: Do not add {size} to title yet, since it produces dynamic info in top level steps
# Use object_size dt in future as argument
@reporter.step("Generate file")
@reporter.step("Generate file with size {size}")
def generate_file(size: int) -> TestFile:
    """Generates a binary file with the specified size in bytes.

@@ -54,7 +51,7 @@ def generate_file(size: int) -> TestFile:
    Returns:
        The path to the generated file.
    """
    test_file = TestFile(os.path.join(ASSETS_DIR, string_utils.unique_name("object-")))
    test_file = TestFile(os.path.join(ASSETS_DIR, str(uuid.uuid4())))
    with open(test_file, "wb", opener=ensure_directory_opener) as file:
        file.write(os.urandom(size))
    logger.info(f"File with size {size} bytes has been generated: {test_file}")

@@ -62,9 +59,7 @@ def generate_file(size: int) -> TestFile:
    return test_file


# TODO: Do not add {size} to title yet, since it produces dynamic info in top level steps
# Use object_size dt in future as argument
@reporter.step("Generate file with content")
@reporter.step("Generate file with content of size {size}")
def generate_file_with_content(
    size: int,
    file_path: Optional[str | TestFile] = None,

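As shown in the hunk, `generate_file` writes `size` random bytes into a test file opened with `ensure_directory_opener`. A rough, self-contained sketch of that pattern; the directory-creating body of the opener is an assumption, since only its `return os.open(path, flags)` tail is visible above:

```python
import os


def ensure_directory_opener(path, flags):
    # Assumed body: create the parent directory before opening; only the final
    # `return os.open(path, flags)` line appears in the hunk above.
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    return os.open(path, flags)


def generate_file(path: str, size: int) -> str:
    """Write `size` random bytes to `path`, creating parent directories as needed."""
    with open(path, "wb", opener=ensure_directory_opener) as file:
        file.write(os.urandom(size))
    return path


print(generate_file("/tmp/frostfs-testlib-demo/object-1", 1024))
```
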
@@ -1,26 +1,12 @@
import random
import re
import string
from datetime import datetime

ONLY_ASCII_LETTERS = string.ascii_letters
DIGITS_AND_ASCII_LETTERS = string.ascii_letters + string.digits
NON_DIGITS_AND_LETTERS = string.punctuation


def unique_name(prefix: str = ""):
    """
    Generate unique short name of anything with prefix.
    This should be unique in scope of multiple runs

    Args:
        prefix: prefix for unique name generation
    Returns:
        unique name string
    """
    return f"{prefix}{hex(int(datetime.now().timestamp() * 1000000))}"


def random_string(length: int = 5, source: str = ONLY_ASCII_LETTERS):
    """
    Generate random string from source letters list

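This `unique_name` helper is what the uuid4-based names elsewhere in the diff are swapped against: it derives a short hex suffix from a microsecond timestamp. Copied here with example calls added (the printed values are illustrative, since they depend on the current time):

```python
from datetime import datetime


def unique_name(prefix: str = ""):
    # Copied from the hunk above: microsecond timestamp rendered as hex, so names
    # stay short but are effectively unique across runs.
    return f"{prefix}{hex(int(datetime.now().timestamp() * 1000000))}"


print(unique_name("bucket-"))     # e.g. bucket-0x5f0e8c2a7d9b40
print(unique_name("dl-object-"))  # e.g. dl-object-0x5f0e8c2a7d9b41
```
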
@@ -3,7 +3,14 @@ from typing import Any, get_args

import pytest

from frostfs_testlib.load.load_config import EndpointSelectionStrategy, LoadParams, LoadScenario, LoadType, Preset, ReadFrom
from frostfs_testlib.load.load_config import (
    EndpointSelectionStrategy,
    LoadParams,
    LoadScenario,
    LoadType,
    Preset,
    ReadFrom,
)
from frostfs_testlib.load.runners import DefaultRunner
from frostfs_testlib.resources.load_params import BACKGROUND_LOAD_DEFAULT_VU_INIT_TIME
from frostfs_testlib.storage.cluster import ClusterNode

@@ -92,7 +99,9 @@ class TestLoadConfig:
    def test_load_controller_string_representation(self, load_params: LoadParams):
        load_params.endpoint_selection_strategy = EndpointSelectionStrategy.ALL
        load_params.object_size = 512
        background_load_controller = BackgroundLoadController("tmp", load_params, None, None, DefaultRunner(None))
        background_load_controller = BackgroundLoadController(
            "tmp", load_params, "wallet", None, None, DefaultRunner(None)
        )
        expected = "grpc 512 KiB, writers=7, readers=7, deleters=8"
        assert f"{background_load_controller}" == expected
        assert repr(background_load_controller) == expected

@@ -132,7 +141,7 @@ class TestLoadConfig:
            "--out 'pregen_json'",
            "--workers '7'",
            "--containers '16'",
            "--policy 'container_placement_policy' --policy 'container_placement_policy_2'",
            "--policy 'container_placement_policy'",
            "--ignore-errors",
            "--sleep '19'",
            "--local",

@@ -164,7 +173,7 @@ class TestLoadConfig:
            "--out 'pregen_json'",
            "--workers '7'",
            "--containers '16'",
            "--policy 'container_placement_policy' --policy 'container_placement_policy_2'",
            "--policy 'container_placement_policy'",
            "--ignore-errors",
            "--sleep '19'",
            "--local",

@@ -205,7 +214,7 @@ class TestLoadConfig:
            "--out 'pregen_json'",
            "--workers '7'",
            "--buckets '13'",
            "--location 's3_location' --location 's3_location_2'",
            "--location 's3_location'",
            "--ignore-errors",
            "--sleep '19'",
            "--acl 'acl'",

@@ -239,7 +248,7 @@ class TestLoadConfig:
            "--out 'pregen_json'",
            "--workers '7'",
            "--buckets '13'",
            "--location 's3_location' --location 's3_location_2'",
            "--location 's3_location'",
            "--ignore-errors",
            "--sleep '19'",
            "--acl 'acl'",

@@ -279,7 +288,7 @@ class TestLoadConfig:
            "--out 'pregen_json'",
            "--workers '7'",
            "--buckets '13'",
            "--location 's3_location' --location 's3_location_2'",
            "--location 's3_location'",
            "--ignore-errors",
            "--sleep '19'",
            "--acl 'acl'",

@@ -320,7 +329,7 @@ class TestLoadConfig:
            "--out 'pregen_json'",
            "--workers '7'",
            "--containers '16'",
            "--policy 'container_placement_policy' --policy 'container_placement_policy_2'",
            "--policy 'container_placement_policy'",
            "--ignore-errors",
            "--sleep '19'",
            "--acl 'acl'",

@@ -353,13 +362,12 @@ class TestLoadConfig:
            "--out 'pregen_json'",
            "--workers '7'",
            "--containers '16'",
            "--policy 'container_placement_policy' --policy 'container_placement_policy_2'",
            "--policy 'container_placement_policy'",
            "--ignore-errors",
            "--sleep '19'",
            "--acl 'acl'",
        ]
        expected_env_vars = {
            "CONFIG_DIR": "config_dir",
            "CONFIG_FILE": "config_file",
            "DURATION": 9,
            "WRITE_OBJ_SIZE": 11,

@@ -372,49 +380,12 @@ class TestLoadConfig:
            "DELETERS": 8,
            "READ_AGE": 8,
            "STREAMING": 9,
            "MAX_TOTAL_SIZE_GB": 17,
            "PREGEN_JSON": "pregen_json",
        }

        self._check_preset_params(load_params, expected_preset_args)
        self._check_env_vars(load_params, expected_env_vars)

    @pytest.mark.parametrize(
        "input, value, params",
        [
            (["A C ", " B"], ["A C", "B"], [f"--policy 'A C' --policy 'B'"]),
            (" A ", ["A"], ["--policy 'A'"]),
            (" A , B ", ["A , B"], ["--policy 'A , B'"]),
            ([" A", "B "], ["A", "B"], ["--policy 'A' --policy 'B'"]),
            (None, None, []),
        ],
    )
    def test_grpc_list_parsing_formatter(self, input, value, params):
        load_params = LoadParams(LoadType.gRPC)
        load_params.preset = Preset()
        load_params.preset.container_placement_policy = input
        assert load_params.preset.container_placement_policy == value

        self._check_preset_params(load_params, params)

    @pytest.mark.parametrize(
        "input, value, params",
        [
            (["A C ", " B"], ["A C", "B"], [f"--location 'A C' --location 'B'"]),
            (" A ", ["A"], ["--location 'A'"]),
            (" A , B ", ["A , B"], ["--location 'A , B'"]),
            ([" A", "B "], ["A", "B"], ["--location 'A' --location 'B'"]),
            (None, None, []),
        ],
    )
    def test_s3_list_parsing_formatter(self, input, value, params):
        load_params = LoadParams(LoadType.S3)
        load_params.preset = Preset()
        load_params.preset.s3_location = input
        assert load_params.preset.s3_location == value

        self._check_preset_params(load_params, params)

    @pytest.mark.parametrize("load_params, load_type", [(LoadScenario.VERIFY, LoadType.S3)], indirect=True)
    def test_argument_parsing_for_s3_verify_scenario(self, load_params: LoadParams):
        expected_env_vars = {

@@ -621,7 +592,6 @@ class TestLoadConfig:
            "--acl ''",
        ]
        expected_env_vars = {
            "CONFIG_DIR": "",
            "CONFIG_FILE": "",
            "DURATION": 0,
            "WRITE_OBJ_SIZE": 0,

@@ -629,7 +599,6 @@ class TestLoadConfig:
            "K6_OUT": "",
            "K6_MIN_ITERATION_DURATION": "",
            "K6_SETUP_TIMEOUT": "",
            "MAX_TOTAL_SIZE_GB": 0,
            "WRITERS": 0,
            "READERS": 0,
            "DELETERS": 0,

@@ -720,7 +689,9 @@ class TestLoadConfig:
            value = getattr(dataclass, field.name)
            assert value is not None, f"{field.name} is not None"

    def _get_filled_load_params(self, load_type: LoadType, load_scenario: LoadScenario, set_emtpy: bool = False) -> LoadParams:
    def _get_filled_load_params(
        self, load_type: LoadType, load_scenario: LoadScenario, set_emtpy: bool = False
    ) -> LoadParams:
        load_type_map = {
            LoadScenario.S3: LoadType.S3,
            LoadScenario.S3_CAR: LoadType.S3,

@@ -737,12 +708,13 @@ class TestLoadConfig:
        meta_fields = self._get_meta_fields(load_params)
        for field in meta_fields:
            if getattr(field.instance, field.field.name) is None and load_params.scenario in field.field.metadata["applicable_scenarios"]:
            if (
                getattr(field.instance, field.field.name) is None
                and load_params.scenario in field.field.metadata["applicable_scenarios"]
            ):
                value_to_set_map = {
                    int: 0 if set_emtpy else len(field.field.name),
                    float: 0 if set_emtpy else len(field.field.name),
                    str: "" if set_emtpy else field.field.name,
                    list[str]: "" if set_emtpy else [field.field.name, f"{field.field.name}_2"],
                    bool: False if set_emtpy else True,
                }
                value_to_set = value_to_set_map[field.field_type]

@@ -755,7 +727,11 @@ class TestLoadConfig:
    def _get_meta_fields(self, instance):
        data_fields = fields(instance)
        fields_with_data = [MetaTestField(field, self._get_actual_field_type(field), instance) for field in data_fields if field.metadata]
        fields_with_data = [
            MetaTestField(field, self._get_actual_field_type(field), instance)
            for field in data_fields
            if field.metadata
        ]

        for field in data_fields:
            actual_field_type = self._get_actual_field_type(field)