Compare commits

...

16 commits

Author SHA1 Message Date
c0e37c8138 [#210] Return response in complete_multipart_upload function 2024-04-23 23:51:42 +03:00
80c65b454e [#203] Remove hostnames cludges
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2024-04-22 12:31:35 +00:00
541a3e0636 [#208] Add await for search func
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-04-17 11:03:47 +03:00
70f0357960 [#207] Fix shards for disabled write_cache
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-04-15 16:50:54 +03:00
a85070e957 [#206] Change epoch in func set status node, to 2
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-04-15 12:35:33 +03:00
82a8f9bab3 [#205] Propagate SETUP_TIMEOUT option
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-04-11 11:46:04 +03:00
65ec50391e Interfaces for IAM in S3 client 2024-04-11 07:51:40 +00:00
863e74f161 [#204] Fix custom_registry for verify scenario
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2024-04-09 12:10:02 +03:00
6629b9bbaa [#202] .forgejo: Replace old DCO action
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-04-04 12:23:00 +00:00
e2a170d66e [#190] Introduce default EC placement policy
The default policy which is similar to REP 2, but uses EC instead.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-04-04 11:21:15 +00:00
338584069d [#190] Add PlacementPolicy dataclass
Allow to parametrize tests with placement policy.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-04-04 11:21:15 +00:00
9cfaf1a618 [#201] Add more time for node return
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2024-04-03 01:02:21 +03:00
076e444f84 [#198] Check only data disks for local safe-stopper
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2024-03-22 12:19:53 +03:00
653621fb7e [#197] Allow config_dir for local scenario 2024-03-20 18:59:22 +03:00
2dc5aa8a1e [#195] Update netmap parser and status check message
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2024-03-19 12:48:04 +00:00
11487e983d [#196] Removed profile name from Boto3 client 2024-03-18 20:12:40 +03:00
22 changed files with 1147 additions and 151 deletions

View file

@ -0,0 +1,21 @@
name: DCO action
on: [pull_request]
jobs:
dco:
name: DCO
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Setup Go
uses: actions/setup-go@v3
with:
go-version: '1.21'
- name: Run commit format checker
uses: https://git.frostfs.info/TrueCloudLab/dco-go@v3
with:
from: 'origin/${{ github.event.pull_request.base.ref }}'

1
.github/CODEOWNERS vendored
View file

@ -1 +0,0 @@
* @aprasolova @vdomnich-yadro @dansingjulia @yadro-vavdeev @abereziny

View file

@ -1,21 +0,0 @@
name: DCO check
on:
pull_request:
branches:
- master
jobs:
commits_check_job:
runs-on: ubuntu-latest
name: Commits Check
steps:
- name: Get PR Commits
id: 'get-pr-commits'
uses: tim-actions/get-pr-commits@master
with:
token: ${{ secrets.GITHUB_TOKEN }}
- name: DCO Check
uses: tim-actions/dco@master
with:
commits: ${{ steps.get-pr-commits.outputs.commits }}

View file

@ -1,7 +1,7 @@
import re
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetInfo, NodeNetmapInfo
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetInfo, NodeNetmapInfo, NodeStatus
class NetmapParser:
@ -44,7 +44,7 @@ class NetmapParser:
regexes = {
"node_id": r"\d+: (?P<node_id>\w+)",
"node_data_ips": r"(?P<node_data_ips>/ip4/.+?)$",
"node_status": r"(?P<node_status>ONLINE|OFFLINE)",
"node_status": r"(?P<node_status>ONLINE|MAINTENANCE|OFFLINE)",
"cluster_name": r"ClusterName: (?P<cluster_name>\w+)",
"continent": r"Continent: (?P<continent>\w+)",
"country": r"Country: (?P<country>\w+)",
@ -62,14 +62,17 @@ class NetmapParser:
for node in netmap_nodes:
for key, regex in regexes.items():
search_result = re.search(regex, node, flags=re.MULTILINE)
if search_result == None:
result_netmap[key] = None
continue
if key == "node_data_ips":
result_netmap[key] = search_result[key].strip().split(" ")
continue
if key == "external_address":
result_netmap[key] = search_result[key].strip().split(",")
continue
if search_result == None:
result_netmap[key] = None
if key == "node_status":
result_netmap[key] = NodeStatus(search_result[key].strip().lower())
continue
result_netmap[key] = search_result[key].strip()

View file

@ -4,6 +4,7 @@ import math
import os
from dataclasses import dataclass
from datetime import datetime
from threading import Event
from time import sleep
from typing import Any
from urllib.parse import urlparse
@ -69,7 +70,7 @@ class K6:
self._k6_process = RemoteProcess.create(command, self.shell, self.load_params.working_dir, remote_user, process_id)
def _get_fill_percents(self):
fill_percents = self.shell.exec("df -H --output=source,pcent,target | grep frostfs").stdout.split("\n")
fill_percents = self.shell.exec("df -H --output=source,pcent,target | grep frostfs | grep data").stdout.split("\n")
return [line.split() for line in fill_percents][:-1]
def check_fill_percent(self):
@ -149,7 +150,7 @@ class K6:
with reporter.step(f"Start load from loader {self.loader.ip} on endpoints {self.endpoints}"):
self._k6_process.start()
def wait_until_finished(self, event, soft_timeout: int = 0) -> None:
def wait_until_finished(self, event: Event, soft_timeout: int = 0) -> None:
with reporter.step(f"Wait until load is finished from loader {self.loader.ip} on endpoints {self.endpoints}"):
if self.load_params.scenario == LoadScenario.VERIFY:
timeout = self.load_params.verify_time or 0

View file

@ -177,13 +177,9 @@ class Preset:
@dataclass
class PrometheusParams:
# Prometheus server URL
server_url: Optional[str] = metadata_field(
all_load_scenarios, env_variable="K6_PROMETHEUS_RW_SERVER_URL", string_repr=False
)
server_url: Optional[str] = metadata_field(all_load_scenarios, env_variable="K6_PROMETHEUS_RW_SERVER_URL", string_repr=False)
# Prometheus trend stats
trend_stats: Optional[str] = metadata_field(
all_load_scenarios, env_variable="K6_PROMETHEUS_RW_TREND_STATS", string_repr=False
)
trend_stats: Optional[str] = metadata_field(all_load_scenarios, env_variable="K6_PROMETHEUS_RW_TREND_STATS", string_repr=False)
# Additional tags
metrics_tags: Optional[str] = metadata_field(all_load_scenarios, None, "METRIC_TAGS", False)
@ -246,9 +242,7 @@ class LoadParams:
# ------- COMMON SCENARIO PARAMS -------
# Load time is the maximum duration for k6 to give load. Default is the BACKGROUND_LOAD_DEFAULT_TIME value.
load_time: Optional[int] = metadata_field(
all_load_scenarios, None, "DURATION", False, formatter=convert_time_to_seconds
)
load_time: Optional[int] = metadata_field(all_load_scenarios, None, "DURATION", False, formatter=convert_time_to_seconds)
# Object size in KB for load and preset.
object_size: Optional[int] = metadata_field(all_load_scenarios, "size", "WRITE_OBJ_SIZE", False)
# For read operations, controls from which set get objects to read
@ -266,9 +260,7 @@ class LoadParams:
# sleep for the remainder of the time until the specified minimum duration is reached.
min_iteration_duration: Optional[str] = metadata_field(all_load_scenarios, None, "K6_MIN_ITERATION_DURATION", False)
# Prepare/cut objects locally on client before sending
prepare_locally: Optional[bool] = metadata_field(
[LoadScenario.gRPC, LoadScenario.gRPC_CAR], None, "PREPARE_LOCALLY", False
)
prepare_locally: Optional[bool] = metadata_field([LoadScenario.gRPC, LoadScenario.gRPC_CAR], None, "PREPARE_LOCALLY", False)
# Specifies K6 setupTimeout time. Currently hardcoded in xk6 as 5 seconds for all scenarios
# https://k6.io/docs/using-k6/k6-options/reference/#setup-timeout
setup_timeout: Optional[str] = metadata_field(all_scenarios, None, "K6_SETUP_TIMEOUT", False)
@ -298,35 +290,25 @@ class LoadParams:
delete_rate: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "DELETE_RATE", True, True)
# Amount of preAllocatedVUs for write operations.
preallocated_writers: Optional[int] = metadata_field(
constant_arrival_rate_scenarios, None, "PRE_ALLOC_WRITERS", True, True
)
preallocated_writers: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "PRE_ALLOC_WRITERS", True, True)
# Amount of maxVUs for write operations.
max_writers: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "MAX_WRITERS", False, True)
# Amount of preAllocatedVUs for read operations.
preallocated_readers: Optional[int] = metadata_field(
constant_arrival_rate_scenarios, None, "PRE_ALLOC_READERS", True, True
)
preallocated_readers: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "PRE_ALLOC_READERS", True, True)
# Amount of maxVUs for read operations.
max_readers: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "MAX_READERS", False, True)
# Amount of preAllocatedVUs for read operations.
preallocated_deleters: Optional[int] = metadata_field(
constant_arrival_rate_scenarios, None, "PRE_ALLOC_DELETERS", True, True
)
preallocated_deleters: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "PRE_ALLOC_DELETERS", True, True)
# Amount of maxVUs for delete operations.
max_deleters: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "MAX_DELETERS", False, True)
# Multipart
# Number of parts to upload in parallel
writers_multipart: Optional[int] = metadata_field(
[LoadScenario.S3_MULTIPART], None, "WRITERS_MULTIPART", False, True
)
writers_multipart: Optional[int] = metadata_field([LoadScenario.S3_MULTIPART], None, "WRITERS_MULTIPART", False, True)
# part size must be greater than (5 MB)
write_object_part_size: Optional[int] = metadata_field(
[LoadScenario.S3_MULTIPART], None, "WRITE_OBJ_PART_SIZE", False
)
write_object_part_size: Optional[int] = metadata_field([LoadScenario.S3_MULTIPART], None, "WRITE_OBJ_PART_SIZE", False)
# Period of time to apply the rate value.
time_unit: Optional[str] = metadata_field(constant_arrival_rate_scenarios, None, "TIME_UNIT", False)
@ -341,7 +323,7 @@ class LoadParams:
# Config file location (filled automatically)
config_file: Optional[str] = metadata_field([LoadScenario.LOCAL, LoadScenario.S3_LOCAL], None, "CONFIG_FILE", False)
# Config directory location (filled automatically)
config_dir: Optional[str] = metadata_field([LoadScenario.S3_LOCAL], None, "CONFIG_DIR", False)
config_dir: Optional[str] = metadata_field([LoadScenario.LOCAL, LoadScenario.S3_LOCAL], None, "CONFIG_DIR", False)
def set_id(self, load_id):
self.load_id = load_id
@ -474,9 +456,7 @@ class LoadParams:
static_params = [f"{load_type_str}"]
dynamic_params = [
f"{meta_field.name}={meta_field.value}"
for meta_field in self._get_applicable_fields()
if meta_field.metadata["string_repr"]
f"{meta_field.name}={meta_field.value}" for meta_field in self._get_applicable_fields() if meta_field.metadata["string_repr"]
]
params = ", ".join(static_params + dynamic_params)

View file

@ -29,13 +29,17 @@ class AwsCliClient(S3ClientWrapper):
@reporter.step("Configure S3 client (aws cli)")
def __init__(
self, access_key_id: str, secret_access_key: str, s3gate_endpoint: str, profile: str = "default"
self, access_key_id: str, secret_access_key: str, s3gate_endpoint: str, profile: str = "default", region: str = "us-east-1"
) -> None:
self.s3gate_endpoint = s3gate_endpoint
self.access_key_id: str = access_key_id
self.secret_access_key: str = secret_access_key
self.profile = profile
self.local_shell = LocalShell()
self.region = region
self.iam_endpoint = None
try:
_configure_aws_cli(f"aws configure --profile {profile}", access_key_id, secret_access_key)
_configure_aws_cli(f"aws configure --profile {profile}", access_key_id, secret_access_key, region)
self.local_shell.exec(f"aws configure set max_attempts {MAX_REQUEST_ATTEMPTS} --profile {profile}")
self.local_shell.exec(
f"aws configure set retry_mode {RETRY_MODE} --profile {profile}",
@ -43,10 +47,14 @@ class AwsCliClient(S3ClientWrapper):
except Exception as err:
raise RuntimeError("Error while configuring AwsCliClient") from err
@reporter.step("Set endpoint S3 to {s3gate_endpoint}")
@reporter.step("Set S3 endpoint to {s3gate_endpoint}")
def set_endpoint(self, s3gate_endpoint: str):
self.s3gate_endpoint = s3gate_endpoint
@reporter.step("Set IAM endpoint to {iam_endpoint}")
def set_iam_endpoint(self, iam_endpoint: str):
self.iam_endpoint = iam_endpoint
@reporter.step("Create bucket S3")
def create_bucket(
self,
@ -565,12 +573,13 @@ class AwsCliClient(S3ClientWrapper):
self.local_shell.exec(cmd)
@reporter.step("Put object tagging")
def put_object_tagging(self, bucket: str, key: str, tags: list) -> None:
def put_object_tagging(self, bucket: str, key: str, tags: list, version_id: Optional[str] = '') -> None:
tags = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
tagging = {"TagSet": tags}
version = f" --version-id {version_id}" if version_id else ""
cmd = (
f"aws {self.common_flags} s3api put-object-tagging --bucket {bucket} --key {key} "
f"--tagging '{json.dumps(tagging)}' --endpoint {self.s3gate_endpoint} --profile {self.profile}"
f"{version} --tagging '{json.dumps(tagging)}' --endpoint {self.s3gate_endpoint} --profile {self.profile}"
)
self.local_shell.exec(cmd)
@ -586,10 +595,11 @@ class AwsCliClient(S3ClientWrapper):
return response.get("TagSet")
@reporter.step("Delete object tagging")
def delete_object_tagging(self, bucket: str, key: str) -> None:
def delete_object_tagging(self, bucket: str, key: str, version_id: Optional[str] = None) -> None:
version = f" --version-id {version_id}" if version_id else ""
cmd = (
f"aws {self.common_flags} s3api delete-object-tagging --bucket {bucket} "
f"--key {key} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
f"--key {key} {version} --endpoint {self.s3gate_endpoint} --profile {self.profile}"
)
self.local_shell.exec(cmd)
@ -719,7 +729,10 @@ class AwsCliClient(S3ClientWrapper):
f"--key {key} --upload-id {upload_id} --multipart-upload file://{file_path} "
f"--endpoint-url {self.s3gate_endpoint} --profile {self.profile}"
)
self.local_shell.exec(cmd)
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Put object lock configuration")
def put_object_lock_configuration(self, bucket: str, configuration: dict) -> dict:
@ -750,3 +763,563 @@ class AwsCliClient(S3ClientWrapper):
json_output = json.loads(output[output.index("{") :])
return json_output
# IAM METHODS #
# Some methods don't have checks because AWS is silent in some cases (delete, attach, etc.)
@reporter.step("Adds the specified user to the specified group")
def iam_add_user_to_group(self, user_name: str, group_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam add-user-to-group --user-name {user_name} --group-name {group_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Attaches the specified managed policy to the specified IAM group")
def iam_attach_group_policy(self, group_name: str, policy_arn: str) -> dict:
cmd = (
f"aws {self.common_flags} iam attach-group-policy --group-name {group_name} --policy-arn {policy_arn} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
sleep(S3_SYNC_WAIT_TIME * 10)
return response
@reporter.step("Attaches the specified managed policy to the specified user")
def iam_attach_user_policy(self, user_name: str, policy_arn: str) -> dict:
cmd = (
f"aws {self.common_flags} iam attach-user-policy --user-name {user_name} --policy-arn {policy_arn} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
sleep(S3_SYNC_WAIT_TIME * 10)
return response
@reporter.step("Creates a new AWS secret access key and access key ID for the specified user")
def iam_create_access_key(self, user_name: Optional[str] = None) -> dict:
cmd = (
f"aws {self.common_flags} iam create-access-key --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
if user_name:
cmd += f" --user-name {user_name}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
access_key_id = response["AccessKey"].get("AccessKeyId")
secret_access_key = response["AccessKey"].get("SecretAccessKey")
assert access_key_id, f"Expected AccessKeyId in response:\n{response}"
assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"
return access_key_id, secret_access_key
@reporter.step("Creates a new group")
def iam_create_group(self, group_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam create-group --group-name {group_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("Group"), f"Expected Group in response:\n{response}"
assert response["Group"].get("GroupName") == group_name, f"GroupName should be equal to {group_name}"
return response
@reporter.step("Creates a new managed policy for your AWS account")
def iam_create_policy(self, policy_name: str, policy_document: dict) -> dict:
cmd = (
f"aws {self.common_flags} iam create-policy --endpoint {self.iam_endpoint}"
f" --policy-name {policy_name} --policy-document '{json.dumps(policy_document)}'"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("Policy"), f"Expected Policy in response:\n{response}"
assert response["Policy"].get("PolicyName") == policy_name, f"PolicyName should be equal to {policy_name}"
return response
@reporter.step("Creates a new IAM user for your AWS account")
def iam_create_user(self, user_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam create-user --user-name {user_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("User"), f"Expected User in response:\n{response}"
assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}"
return response
@reporter.step("Deletes the access key pair associated with the specified IAM user")
def iam_delete_access_key(self, access_key_id: str, user_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam delete-access-key --access-key-id {access_key_id} --user-name {user_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Deletes the specified IAM group")
def iam_delete_group(self, group_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam delete-group --group-name {group_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Deletes the specified inline policy that is embedded in the specified IAM group")
def iam_delete_group_policy(self, group_name: str, policy_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam delete-group-policy --group-name {group_name} --policy-name {policy_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Deletes the specified managed policy")
def iam_delete_policy(self, policy_arn: str) -> dict:
cmd = (
f"aws {self.common_flags} iam delete-policy --policy-arn {policy_arn} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Deletes the specified IAM user")
def iam_delete_user(self, user_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam delete-user --user-name {user_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Deletes the specified inline policy that is embedded in the specified IAM user")
def iam_delete_user_policy(self, user_name: str, policy_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam delete-user-policy --user-name {user_name} --policy-name {policy_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Removes the specified managed policy from the specified IAM group")
def iam_detach_group_policy(self, group_name: str, policy_arn: str) -> dict:
cmd = (
f"aws {self.common_flags} iam detach-group-policy --group-name {group_name} --policy-arn {policy_arn} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
sleep(S3_SYNC_WAIT_TIME * 10)
return response
@reporter.step("Removes the specified managed policy from the specified user")
def iam_detach_user_policy(self, user_name: str, policy_arn: str) -> dict:
cmd = (
f"aws {self.common_flags} iam detach-user-policy --user-name {user_name} --policy-arn {policy_arn} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
sleep(S3_SYNC_WAIT_TIME * 10)
return response
@reporter.step("Returns a list of IAM users that are in the specified IAM group")
def iam_get_group(self, group_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam get-group --group-name {group_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert "Users" in response.keys(), f"Expected Users in response:\n{response}"
assert response.get("Group").get("GroupName") == group_name, f"GroupName should be equal to {group_name}"
return response
@reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM group")
def iam_get_group_policy(self, group_name: str, policy_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam get-group-policy --group-name {group_name} --policy-name {policy_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Retrieves information about the specified managed policy")
def iam_get_policy(self, policy_arn: str) -> dict:
cmd = (
f"aws {self.common_flags} iam get-policy --policy-arn {policy_arn} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("Policy"), f"Expected Policy in response:\n{response}"
assert response["Policy"].get("PolicyName") == policy_name, f"PolicyName should be equal to {policy_name}"
return response
@reporter.step("Retrieves information about the specified version of the specified managed policy")
def iam_get_policy_version(self, policy_arn: str, version_id: str) -> dict:
cmd = (
f"aws {self.common_flags} iam get-policy-version --policy-arn {policy_arn} --version-id {version_id} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("PolicyVersion"), f"Expected PolicyVersion in response:\n{response}"
assert response["PolicyVersion"].get("VersionId") == version_id, f"VersionId should be equal to {version_id}"
return response
@reporter.step("Retrieves information about the specified IAM user")
def iam_get_user(self, user_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam get-user --user-name {user_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("User"), f"Expected User in response:\n{response}"
assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}"
return response
@reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM user")
def iam_get_user_policy(self, user_name: str, policy_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam get-user-policy --user-name {user_name} --policy-name {policy_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("UserName"), f"Expected User in response:\n{response}"
return response
@reporter.step("Returns information about the access key IDs associated with the specified IAM user")
def iam_list_access_keys(self, user_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam list-access-keys --user-name {user_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Lists all managed policies that are attached to the specified IAM group")
def iam_list_attached_group_policies(self, group_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam list-attached-group-policies --group-name {group_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}"
return response
@reporter.step("Lists all managed policies that are attached to the specified IAM user")
def iam_list_attached_user_policies(self, user_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam list-attached-user-policies --user-name {user_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}"
return response
@reporter.step("Lists all IAM users, groups, and roles that the specified managed policy is attached to")
def iam_list_entities_for_policy(self, policy_arn: str) -> dict:
cmd = (
f"aws {self.common_flags} iam list-entities-for-policy --policy-arn {policy_arn} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("PolicyGroups"), f"Expected PolicyGroups in response:\n{response}"
assert response.get("PolicyUsers"), f"Expected PolicyUsers in response:\n{response}"
return response
@reporter.step("Lists the names of the inline policies that are embedded in the specified IAM group")
def iam_list_group_policies(self, group_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam list-group-policies --group-name {group_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}"
return response
@reporter.step("Lists the IAM groups")
def iam_list_groups(self) -> dict:
cmd = (
f"aws {self.common_flags} iam list-groups --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("Groups"), f"Expected Groups in response:\n{response}"
return response
@reporter.step("Lists the IAM groups that the specified IAM user belongs to")
def iam_list_groups_for_user(self, user_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam list-groups-for-user --user-name {user_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("Groups"), f"Expected Groups in response:\n{response}"
return response
@reporter.step("Lists all the managed policies that are available in your AWS account")
def iam_list_policies(self) -> dict:
cmd = (
f"aws {self.common_flags} iam list-policies --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert 'Policies' in response.keys(), f"Expected Policies in response:\n{response}"
return response
@reporter.step("Lists information about the versions of the specified managed policy")
def iam_list_policy_versions(self, policy_arn: str) -> dict:
cmd = (
f"aws {self.common_flags} iam list-policy-versions --policy-arn {policy_arn} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("Versions"), f"Expected Versions in response:\n{response}"
return response
@reporter.step("Lists the names of the inline policies embedded in the specified IAM user")
def iam_list_user_policies(self, user_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam list-user-policies --user-name {user_name} --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}"
return response
@reporter.step("Lists the IAM users")
def iam_list_users(self) -> dict:
cmd = (
f"aws {self.common_flags} iam list-users --endpoint {self.iam_endpoint}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
assert "Users" in response.keys(), f"Expected Users in response:\n{response}"
return response
@reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM group")
def iam_put_group_policy(self, group_name: str, policy_name: str, policy_document: dict) -> dict:
cmd = (
f"aws {self.common_flags} iam put-group-policy --endpoint {self.iam_endpoint}"
f" --group-name {group_name} --policy-name {policy_name} --policy-document \'{json.dumps(policy_document)}\'"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
sleep(S3_SYNC_WAIT_TIME * 10)
return response
@reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM user")
def iam_put_user_policy(self, user_name: str, policy_name: str, policy_document: dict) -> dict:
cmd = (
f"aws {self.common_flags} iam put-user-policy --endpoint {self.iam_endpoint}"
f" --user-name {user_name} --policy-name {policy_name} --policy-document \'{json.dumps(policy_document)}\'"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
sleep(S3_SYNC_WAIT_TIME * 10)
return response
@reporter.step("Removes the specified user from the specified group")
def iam_remove_user_from_group(self, group_name: str, user_name: str) -> dict:
cmd = (
f"aws {self.common_flags} iam remove-user-from-group --endpoint {self.iam_endpoint}"
f" --group-name {group_name} --user-name {user_name}"
)
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Updates the name and/or the path of the specified IAM group")
def iam_update_group(self, group_name: str, new_name: Optional[str] = None, new_path: Optional[str] = None) -> dict:
cmd = (
f"aws {self.common_flags} iam update-group --group-name {group_name} --endpoint {self.iam_endpoint}"
)
if new_name:
cmd += f" --new-group-name {new_name}"
if new_path:
cmd += f" --new-path {new_path}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response
@reporter.step("Updates the name and/or the path of the specified IAM user")
def iam_update_user(self, user_name: str, new_name: Optional[str] = None, new_path: Optional[str] = None) -> dict:
cmd = (
f"aws {self.common_flags} iam update-user --user-name {user_name} --endpoint {self.iam_endpoint}"
)
if new_name:
cmd += f" --new-user-name {new_name}"
if new_path:
cmd += f" --new-path {new_path}"
if self.profile:
cmd += f" --profile {self.profile}"
output = self.local_shell.exec(cmd).stdout
response = self._to_json(output)
return response

View file

@ -18,6 +18,9 @@ from frostfs_testlib.resources.common import ASSETS_DIR, MAX_REQUEST_ATTEMPTS, R
from frostfs_testlib.s3.interfaces import S3ClientWrapper, VersioningStatus, _make_objs_dict
from frostfs_testlib.utils.cli_utils import log_command_execution
# TODO: Refactor this code to use shell instead of _cmd_run
from frostfs_testlib.utils.cli_utils import _configure_aws_cli
logger = logging.getLogger("NeoLogger")
# Disable warnings on self-signed certificate which the
@ -43,10 +46,11 @@ class Boto3ClientWrapper(S3ClientWrapper):
@reporter.step("Configure S3 client (boto3)")
@report_error
def __init__(
self, access_key_id: str, secret_access_key: str, s3gate_endpoint: str, profile: str = "default"
self, access_key_id: str, secret_access_key: str, s3gate_endpoint: str, profile: str = "default", region: str = "us-east-1"
) -> None:
self.boto3_client: S3Client = None
self.session = boto3.Session(profile_name=profile)
self.session = boto3.Session()
self.region = region
self.config = Config(
retries={
"max_attempts": MAX_REQUEST_ATTEMPTS,
@ -56,6 +60,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
self.access_key_id: str = access_key_id
self.secret_access_key: str = secret_access_key
self.s3gate_endpoint: str = ""
self.boto3_iam_client: S3Client = None
self.set_endpoint(s3gate_endpoint)
@reporter.step("Set endpoint S3 to {s3gate_endpoint}")
@ -69,11 +74,23 @@ class Boto3ClientWrapper(S3ClientWrapper):
service_name="s3",
aws_access_key_id=self.access_key_id,
aws_secret_access_key=self.secret_access_key,
region_name=self.region,
config=self.config,
endpoint_url=s3gate_endpoint,
verify=False,
)
@reporter.step("Set endpoint IAM to {iam_endpoint}")
def set_iam_endpoint(self, iam_endpoint: str):
self.boto3_iam_client = self.session.client(
service_name="iam",
aws_access_key_id=self.access_key_id,
aws_secret_access_key=self.secret_access_key,
endpoint_url=iam_endpoint,
verify=False,)
def _to_s3_param(self, param: str):
replacement_map = {
"Acl": "ACL",
@ -118,7 +135,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
s3_bucket = self.boto3_client.create_bucket(**params)
log_command_execution(f"Created S3 bucket {bucket}", s3_bucket)
sleep(S3_SYNC_WAIT_TIME)
sleep(S3_SYNC_WAIT_TIME * 10)
return bucket
@reporter.step("List buckets S3")
@ -139,7 +156,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
def delete_bucket(self, bucket: str) -> None:
response = self.boto3_client.delete_bucket(Bucket=bucket)
log_command_execution("S3 Delete bucket result", response)
sleep(S3_SYNC_WAIT_TIME)
sleep(S3_SYNC_WAIT_TIME * 10)
@reporter.step("Head bucket S3")
@report_error
@ -355,7 +372,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
}
response = self.boto3_client.delete_object(**params)
log_command_execution("S3 Delete object result", response)
sleep(S3_SYNC_WAIT_TIME)
sleep(S3_SYNC_WAIT_TIME * 10)
return response
@reporter.step("Delete objects S3")
@ -366,7 +383,7 @@ class Boto3ClientWrapper(S3ClientWrapper):
assert (
"Errors" not in response
), f'The following objects have not been deleted: {[err_info["Key"] for err_info in response["Errors"]]}.\nError Message: {response["Errors"]["Message"]}'
sleep(S3_SYNC_WAIT_TIME)
sleep(S3_SYNC_WAIT_TIME * 10)
return response
@reporter.step("Delete object versions S3")
@ -554,6 +571,8 @@ class Boto3ClientWrapper(S3ClientWrapper):
)
log_command_execution("S3 Complete multipart upload", response)
return response
@reporter.step("Put object retention")
@report_error
def put_object_retention(
@ -592,10 +611,10 @@ class Boto3ClientWrapper(S3ClientWrapper):
@reporter.step("Put object tagging")
@report_error
def put_object_tagging(self, bucket: str, key: str, tags: list) -> None:
def put_object_tagging(self, bucket: str, key: str, tags: list, version_id: Optional[str] = '') -> None:
tags = [{"Key": tag_key, "Value": tag_value} for tag_key, tag_value in tags]
tagging = {"TagSet": tags}
response = self.boto3_client.put_object_tagging(Bucket=bucket, Key=key, Tagging=tagging)
response = self.boto3_client.put_object_tagging(Bucket=bucket, Key=key, Tagging=tagging, VersionId=version_id)
log_command_execution("S3 Put object tagging", response)
@reporter.step("Get object tagging")
@ -654,3 +673,287 @@ class Boto3ClientWrapper(S3ClientWrapper):
raise NotImplementedError("Cp is not supported for boto3 client")
# END OBJECT METHODS #
# IAM METHODS #
# Some methods don't have checks because boto3 is silent in some cases (delete, attach, etc.)
@reporter.step("Adds the specified user to the specified group")
def iam_add_user_to_group(self, user_name: str, group_name: str) -> dict:
response = self.boto3_iam_client.add_user_to_group(UserName=user_name, GroupName=group_name)
return response
@reporter.step("Attaches the specified managed policy to the specified IAM group")
def iam_attach_group_policy(self, group_name: str, policy_arn: str) -> dict:
response = self.boto3_iam_client.attach_group_policy(GroupName=group_name, PolicyArn=policy_arn)
sleep(S3_SYNC_WAIT_TIME * 10)
return response
@reporter.step("Attaches the specified managed policy to the specified user")
def iam_attach_user_policy(self, user_name: str, policy_arn: str) -> dict:
response = self.boto3_iam_client.attach_user_policy(UserName=user_name, PolicyArn=policy_arn)
sleep(S3_SYNC_WAIT_TIME * 10)
return response
@reporter.step("Creates a new AWS secret access key and access key ID for the specified user")
def iam_create_access_key(self, user_name: str) -> dict:
response = self.boto3_iam_client.create_access_key(UserName=user_name)
access_key_id = response["AccessKey"].get("AccessKeyId")
secret_access_key = response["AccessKey"].get("SecretAccessKey")
assert access_key_id, f"Expected AccessKeyId in response:\n{response}"
assert secret_access_key, f"Expected SecretAccessKey in response:\n{response}"
return access_key_id, secret_access_key
@reporter.step("Creates a new group")
def iam_create_group(self, group_name: str) -> dict:
response = self.boto3_iam_client.create_group(GroupName=group_name)
assert response.get("Group"), f"Expected Group in response:\n{response}"
assert response["Group"].get("GroupName") == group_name, f"GroupName should be equal to {group_name}"
return response
@reporter.step("Creates a new managed policy for your AWS account")
def iam_create_policy(self, policy_name: str, policy_document: dict) -> dict:
response = self.boto3_iam_client.create_policy(PolicyName=policy_name, PolicyDocument=json.dumps(policy_document))
assert response.get("Policy"), f"Expected Policy in response:\n{response}"
assert response["Policy"].get("PolicyName") == policy_name, f"PolicyName should be equal to {policy_name}"
return response
@reporter.step("Creates a new IAM user for your AWS account")
def iam_create_user(self, user_name: str) -> dict:
response = self.boto3_iam_client.create_user(UserName=user_name)
assert response.get("User"), f"Expected User in response:\n{response}"
assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}"
return response
@reporter.step("Deletes the access key pair associated with the specified IAM user")
def iam_delete_access_key(self, access_key_id: str, user_name: str) -> dict:
response = self.boto3_iam_client.delete_access_key(AccessKeyId=access_key_id, UserName=user_name)
return response
@reporter.step("Deletes the specified IAM group")
def iam_delete_group(self, group_name: str) -> dict:
response = self.boto3_iam_client.delete_group(GroupName=group_name)
return response
@reporter.step("Deletes the specified inline policy that is embedded in the specified IAM group")
def iam_delete_group_policy(self, group_name: str, policy_name: str) -> dict:
response = self.boto3_iam_client.delete_group_policy(GroupName=group_name, PolicyName=policy_name)
return response
@reporter.step("Deletes the specified managed policy")
def iam_delete_policy(self, policy_arn: str) -> dict:
response = self.boto3_iam_client.delete_policy(PolicyArn=policy_arn)
return response
@reporter.step("Deletes the specified IAM user")
def iam_delete_user(self, user_name: str) -> dict:
response = self.boto3_iam_client.delete_user(UserName=user_name)
return response
@reporter.step("Deletes the specified inline policy that is embedded in the specified IAM user")
def iam_delete_user_policy(self, user_name: str, policy_name: str) -> dict:
response = self.boto3_iam_client.delete_user_policy(UserName=user_name, PolicyName=policy_name)
return response
@reporter.step("Removes the specified managed policy from the specified IAM group")
def iam_detach_group_policy(self, group_name: str, policy_arn: str) -> dict:
response = self.boto3_iam_client.detach_group_policy(GroupName=group_name, PolicyArn=policy_arn)
sleep(S3_SYNC_WAIT_TIME * 10)
return response
@reporter.step("Removes the specified managed policy from the specified user")
def iam_detach_user_policy(self, user_name: str, policy_arn: str) -> dict:
response = self.boto3_iam_client.detach_user_policy(UserName=user_name, PolicyArn=policy_arn)
sleep(S3_SYNC_WAIT_TIME * 10)
return response
@reporter.step("Returns a list of IAM users that are in the specified IAM group")
def iam_get_group(self, group_name: str) -> dict:
response = self.boto3_iam_client.get_group(GroupName=group_name)
assert response.get("Group").get("GroupName") == group_name, f"GroupName should be equal to {group_name}"
return response
@reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM group")
def iam_get_group_policy(self, group_name: str, policy_name: str) -> dict:
response = self.boto3_iam_client.get_group_policy(GroupName=group_name, PolicyName=policy_name)
return response
@reporter.step("Retrieves information about the specified managed policy")
def iam_get_policy(self, policy_arn: str) -> dict:
response = self.boto3_iam_client.get_policy(PolicyArn=policy_arn)
assert response.get("Policy"), f"Expected Policy in response:\n{response}"
assert response["Policy"].get("PolicyName") == policy_name, f"PolicyName should be equal to {policy_name}"
return response
@reporter.step("Retrieves information about the specified version of the specified managed policy")
def iam_get_policy_version(self, policy_arn: str, version_id: str) -> dict:
response = self.boto3_iam_client.get_policy_version(PolicyArn=policy_arn, VersionId=version_id)
assert response.get("PolicyVersion"), f"Expected PolicyVersion in response:\n{response}"
assert response["PolicyVersion"].get("VersionId") == version_id, f"VersionId should be equal to {version_id}"
return response
@reporter.step("Retrieves information about the specified IAM user")
def iam_get_user(self, user_name: str) -> dict:
response = self.boto3_iam_client.get_user(UserName=user_name)
assert response.get("User"), f"Expected User in response:\n{response}"
assert response["User"].get("UserName") == user_name, f"UserName should be equal to {user_name}"
return response
@reporter.step("Retrieves the specified inline policy document that is embedded in the specified IAM user")
def iam_get_user_policy(self, user_name: str, policy_name: str) -> dict:
response = self.boto3_iam_client.get_user_policy(UserName=user_name, PolicyName=policy_name)
assert response.get("UserName"), f"Expected UserName in response:\n{response}"
return response
@reporter.step("Returns information about the access key IDs associated with the specified IAM user")
def iam_list_access_keys(self, user_name: str) -> dict:
response = self.boto3_iam_client.list_access_keys(UserName=user_name)
return response
@reporter.step("Lists all managed policies that are attached to the specified IAM group")
def iam_list_attached_group_policies(self, group_name: str) -> dict:
response = self.boto3_iam_client.list_attached_group_policies(GroupName=group_name)
assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}"
return response
@reporter.step("Lists all managed policies that are attached to the specified IAM user")
def iam_list_attached_user_policies(self, user_name: str) -> dict:
response = self.boto3_iam_client.list_attached_user_policies(UserName=user_name)
assert response.get("AttachedPolicies"), f"Expected AttachedPolicies in response:\n{response}"
return response
@reporter.step("Lists all IAM users, groups, and roles that the specified managed policy is attached to")
def iam_list_entities_for_policy(self, policy_arn: str) -> dict:
response = self.boto3_iam_client.list_entities_for_policy(PolicyArn=policy_arn)
assert response.get("PolicyGroups"), f"Expected PolicyGroups in response:\n{response}"
assert response.get("PolicyUsers"), f"Expected PolicyUsers in response:\n{response}"
return response
@reporter.step("Lists the names of the inline policies that are embedded in the specified IAM group")
def iam_list_group_policies(self, group_name: str) -> dict:
response = self.boto3_iam_client.list_group_policies(GroupName=group_name)
assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}"
return response
@reporter.step("Lists the IAM groups")
def iam_list_groups(self) -> dict:
response = self.boto3_iam_client.list_groups()
assert response.get("Groups"), f"Expected Groups in response:\n{response}"
return response
@reporter.step("Lists the IAM groups that the specified IAM user belongs to")
def iam_list_groups_for_user(self, user_name: str) -> dict:
response = self.boto3_iam_client.list_groups_for_user(UserName=user_name)
assert response.get("Groups"), f"Expected Groups in response:\n{response}"
return response
@reporter.step("Lists all the managed policies that are available in your AWS account")
def iam_list_policies(self) -> dict:
response = self.boto3_iam_client.list_policies()
assert response.get("Policies"), f"Expected Policies in response:\n{response}"
return response
@reporter.step("Lists information about the versions of the specified managed policy")
def iam_list_policy_versions(self, policy_arn: str) -> dict:
response = self.boto3_iam_client.list_policy_versions(PolicyArn=policy_arn)
assert response.get("Versions"), f"Expected Versions in response:\n{response}"
return response
@reporter.step("Lists the names of the inline policies embedded in the specified IAM user")
def iam_list_user_policies(self, user_name: str) -> dict:
response = self.boto3_iam_client.list_user_policies(UserName=user_name)
assert response.get("PolicyNames"), f"Expected PolicyNames in response:\n{response}"
return response
@reporter.step("Lists the IAM users")
def iam_list_users(self) -> dict:
response = self.boto3_iam_client.list_users()
assert response.get("Users"), f"Expected Users in response:\n{response}"
return response
@reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM group")
def iam_put_group_policy(self, group_name: str, policy_name: str, policy_document: dict) -> dict:
response = self.boto3_iam_client.put_group_policy(GroupName=group_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document))
sleep(S3_SYNC_WAIT_TIME * 10)
return response
@reporter.step("Adds or updates an inline policy document that is embedded in the specified IAM user")
def iam_put_user_policy(self, user_name: str, policy_name: str, policy_document: dict) -> dict:
response = self.boto3_iam_client.put_user_policy(UserName=user_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document))
sleep(S3_SYNC_WAIT_TIME * 10)
return response
@reporter.step("Removes the specified user from the specified group")
def iam_remove_user_from_group(self, group_name: str, user_name: str) -> dict:
response = self.boto3_iam_client.remove_user_from_group(GroupName=group_name, UserName=user_name)
return response
@reporter.step("Updates the name and/or the path of the specified IAM group")
def iam_update_group(self, group_name: str, new_name: str, new_path: Optional[str] = None) -> dict:
response = self.boto3_iam_client.update_group(GroupName=group_name, NewGroupName=new_name, NewPath='/')
return response
@reporter.step("Updates the name and/or the path of the specified IAM user")
def iam_update_user(self, user_name: str, new_name: str, new_path: Optional[str] = None) -> dict:
response = self.boto3_iam_client.update_user(UserName=user_name, NewUserName=new_name, NewPath='/')
return response

View file

@ -50,7 +50,7 @@ class BucketContainerResolver(ABC):
class S3ClientWrapper(HumanReadableABC):
@abstractmethod
def __init__(self, access_key_id: str, secret_access_key: str, s3gate_endpoint: str, profile: str) -> None:
def __init__(self, access_key_id: str, secret_access_key: str, s3gate_endpoint: str, profile: str, region: str) -> None:
pass
@abstractmethod
@ -395,3 +395,154 @@ class S3ClientWrapper(HumanReadableABC):
"""cp directory TODO: Add proper description"""
# END OF OBJECT METHODS #
# IAM METHODS #
@abstractmethod
def iam_add_user_to_group(self, user_name: str, group_name: str) -> dict:
'''Adds the specified user to the specified group'''
@abstractmethod
def iam_attach_group_policy(self, group: str, policy_arn: str) -> dict:
'''Attaches the specified managed policy to the specified IAM group'''
@abstractmethod
def iam_attach_user_policy(self, user_name: str, policy_arn: str) -> dict:
'''Attaches the specified managed policy to the specified user'''
@abstractmethod
def iam_create_access_key(self, user_name: str) -> dict:
'''Creates a new AWS secret access key and access key ID for the specified user'''
@abstractmethod
def iam_create_group(self, group_name: str) -> dict:
'''Creates a new group'''
@abstractmethod
def iam_create_policy(self, policy_name: str, policy_document: dict) -> dict:
'''Creates a new managed policy for your AWS account'''
@abstractmethod
def iam_create_user(self, user_name: str) -> dict:
'''Creates a new IAM user for your AWS account'''
@abstractmethod
def iam_delete_access_key(self, access_key_id: str, user_name: str) -> dict:
'''Deletes the access key pair associated with the specified IAM user'''
@abstractmethod
def iam_delete_group(self, group_name: str) -> dict:
'''Deletes the specified IAM group'''
@abstractmethod
def iam_delete_group_policy(self, group_name: str, policy_name: str) -> dict:
'''Deletes the specified inline policy that is embedded in the specified IAM group'''
@abstractmethod
def iam_delete_policy(self, policy_arn: str) -> dict:
'''Deletes the specified managed policy'''
@abstractmethod
def iam_delete_user(self, user_name: str) -> dict:
'''Deletes the specified IAM user'''
@abstractmethod
def iam_delete_user_policy(self, user_name: str, policy_name: str) -> dict:
'''Deletes the specified inline policy that is embedded in the specified IAM user'''
@abstractmethod
def iam_detach_group_policy(self, group_name: str, policy_arn: str) -> dict:
'''Removes the specified managed policy from the specified IAM group'''
@abstractmethod
def iam_detach_user_policy(self, user_name: str, policy_arn: str) -> dict:
'''Removes the specified managed policy from the specified user'''
@abstractmethod
def iam_get_group(self, group_name: str) -> dict:
'''Returns a list of IAM users that are in the specified IAM group'''
@abstractmethod
def iam_get_group_policy(self, group_name: str, policy_name: str) -> dict:
'''Retrieves the specified inline policy document that is embedded in the specified IAM group'''
@abstractmethod
def iam_get_policy(self, policy_arn: str) -> dict:
'''Retrieves information about the specified managed policy'''
@abstractmethod
def iam_get_policy_version(self, policy_arn: str, version_id: str) -> dict:
'''Retrieves information about the specified version of the specified managed policy'''
@abstractmethod
def iam_get_user(self, user_name: str) -> dict:
'''Retrieves information about the specified IAM user'''
@abstractmethod
def iam_get_user_policy(self, user_name: str, policy_name: str) -> dict:
'''Retrieves the specified inline policy document that is embedded in the specified IAM user'''
@abstractmethod
def iam_list_access_keys(self, user_name: str) -> dict:
'''Returns information about the access key IDs associated with the specified IAM user'''
@abstractmethod
def iam_list_attached_group_policies(self, group_name: str) -> dict:
'''Lists all managed policies that are attached to the specified IAM group'''
@abstractmethod
def iam_list_attached_user_policies(self, user_name: str) -> dict:
'''Lists all managed policies that are attached to the specified IAM user'''
@abstractmethod
def iam_list_entities_for_policy(self, policy_arn: str) -> dict:
'''Lists all IAM users, groups, and roles that the specified managed policy is attached to'''
@abstractmethod
def iam_list_group_policies(self, group_name: str) -> dict:
'''Lists the names of the inline policies that are embedded in the specified IAM group'''
@abstractmethod
def iam_list_groups(self) -> dict:
'''Lists the IAM groups'''
@abstractmethod
def iam_list_groups_for_user(self, user_name: str) -> dict:
'''Lists the IAM groups that the specified IAM user belongs to'''
@abstractmethod
def iam_list_policies(self) -> dict:
'''Lists all the managed policies that are available in your AWS account'''
@abstractmethod
def iam_list_policy_versions(self, policy_arn: str) -> dict:
'''Lists information about the versions of the specified managed policy'''
@abstractmethod
def iam_list_user_policies(self, user_name: str) -> dict:
'''Lists the names of the inline policies embedded in the specified IAM user'''
@abstractmethod
def iam_list_users(self) -> dict:
'''Lists the IAM users'''
@abstractmethod
def iam_put_group_policy(self, group_name: str, policy_name: str, policy_document: dict) -> dict:
'''Adds or updates an inline policy document that is embedded in the specified IAM group'''
@abstractmethod
def iam_put_user_policy(self, user_name: str, policy_name: str, policy_document: dict) -> dict:
'''Adds or updates an inline policy document that is embedded in the specified IAM user'''
@abstractmethod
def iam_remove_user_from_group(self, group_name: str, user_name: str) -> dict:
'''Removes the specified user from the specified group'''
@abstractmethod
def iam_update_group(self, group_name: str, new_name: Optional[str] = None, new_path: Optional[str] = None) -> dict:
'''Updates the name and/or the path of the specified IAM group'''
@abstractmethod
def iam_update_user(self, user_name: str, new_name: Optional[str] = None, new_path: Optional[str] = None) -> dict:
'''Updates the name and/or the path of the specified IAM user'''

View file

@ -95,6 +95,7 @@ class StorageContainer:
DEFAULT_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
SINGLE_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 4 FROM * AS X"
REP_2_FOR_3_NODES_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 3 FROM * AS X"
DEFAULT_EC_PLACEMENT_RULE = "EC 3.1"
@reporter.step("Create Container")

View file

@ -13,6 +13,7 @@ from frostfs_testlib.resources.common import ASSETS_DIR
from frostfs_testlib.shell import Shell
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing import wait_for_success
from frostfs_testlib.utils import json_utils
from frostfs_testlib.utils.cli_utils import parse_cmd_table, parse_netmap_output
@ -695,6 +696,7 @@ def neo_go_query_height(shell: Shell, endpoint: str) -> dict:
}
@wait_for_success()
@reporter.step("Search object nodes")
def get_object_nodes(
cluster: Cluster,

View file

@ -50,9 +50,7 @@ def get_via_http_gate(
else:
request = f"{node.http_gate.get_endpoint()}{request_path}"
resp = requests.get(
request, headers={"Host": node.storage_node.get_http_hostname()[0]}, stream=True, timeout=timeout, verify=False
)
resp = requests.get(request, stream=True, timeout=timeout, verify=False)
if not resp.ok:
raise Exception(
@ -118,7 +116,6 @@ def get_via_http_gate_by_attribute(
cid: CID to get object from
attribute: attribute {name: attribute} value pair
endpoint: http gate endpoint
http_hostname: http host name on the node
request_path: (optional) http request path, if ommited - use default [{endpoint}/get_by_attribute/{Key}/{Value}]
"""
attr_name = list(attribute.keys())[0]
@ -129,9 +126,7 @@ def get_via_http_gate_by_attribute(
else:
request = f"{node.http_gate.get_endpoint()}{request_path}"
resp = requests.get(
request, stream=True, timeout=timeout, verify=False, headers={"Host": node.storage_node.get_http_hostname()[0]}
)
resp = requests.get(request, stream=True, timeout=timeout, verify=False)
if not resp.ok:
raise Exception(
@ -151,11 +146,8 @@ def get_via_http_gate_by_attribute(
return file_path
# TODO: pass http_hostname as a header
@reporter.step("Upload via HTTP Gate")
def upload_via_http_gate(
cid: str, path: str, endpoint: str, headers: Optional[dict] = None, timeout: Optional[int] = 300
) -> str:
def upload_via_http_gate(cid: str, path: str, endpoint: str, headers: Optional[dict] = None, timeout: Optional[int] = 300) -> str:
"""
This function upload given object through HTTP gate
cid: CID to get object from
@ -198,7 +190,6 @@ def is_object_large(filepath: str) -> bool:
return False
# TODO: pass http_hostname as a header
@reporter.step("Upload via HTTP Gate using Curl")
def upload_via_http_gate_curl(
cid: str,
@ -259,7 +250,7 @@ def get_via_http_curl(cid: str, oid: str, node: ClusterNode) -> str:
file_path = os.path.join(os.getcwd(), ASSETS_DIR, f"{cid}_{oid}_{str(uuid.uuid4())}")
curl = GenericCli("curl", node.host)
curl(f'-k -H "Host: {node.storage_node.get_http_hostname()[0]}"', f"{request} > {file_path}", shell=local_shell)
curl(f"-k ", f"{request} > {file_path}", shell=local_shell)
return file_path

View file

@ -263,7 +263,7 @@ def check_node_not_in_map(node: StorageNode, shell: Shell, alive_node: Optional[
@reporter.step("Wait for node {node} is ready")
def wait_for_node_to_be_ready(node: StorageNode) -> None:
timeout, attempts = 30, 6
timeout, attempts = 60, 15
for _ in range(attempts):
try:
health_check = storage_node_healthcheck(node)

View file

@ -141,30 +141,16 @@ class ClusterNode:
return self.host.config.interfaces[interface.value]
def get_data_interfaces(self) -> list[str]:
return [
ip_address for name_interface, ip_address in self.host.config.interfaces.items() if "data" in name_interface
]
return [ip_address for name_interface, ip_address in self.host.config.interfaces.items() if "data" in name_interface]
def get_data_interface(self, search_interface: str) -> list[str]:
return [
self.host.config.interfaces[interface]
for interface in self.host.config.interfaces.keys()
if search_interface == interface
]
return [self.host.config.interfaces[interface] for interface in self.host.config.interfaces.keys() if search_interface == interface]
def get_internal_interfaces(self) -> list[str]:
return [
ip_address
for name_interface, ip_address in self.host.config.interfaces.items()
if "internal" in name_interface
]
return [ip_address for name_interface, ip_address in self.host.config.interfaces.items() if "internal" in name_interface]
def get_internal_interface(self, search_internal: str) -> list[str]:
return [
self.host.config.interfaces[interface]
for interface in self.host.config.interfaces.keys()
if search_internal == interface
]
return [self.host.config.interfaces[interface] for interface in self.host.config.interfaces.keys() if search_internal == interface]
class Cluster:
@ -175,8 +161,6 @@ class Cluster:
default_rpc_endpoint: str
default_s3_gate_endpoint: str
default_http_gate_endpoint: str
default_http_hostname: str
default_s3_hostname: str
def __init__(self, hosting: Hosting) -> None:
self._hosting = hosting
@ -185,8 +169,6 @@ class Cluster:
self.default_rpc_endpoint = self.services(StorageNode)[0].get_rpc_endpoint()
self.default_s3_gate_endpoint = self.services(S3Gate)[0].get_endpoint()
self.default_http_gate_endpoint = self.services(HTTPGate)[0].get_endpoint()
self.default_http_hostname = self.services(StorageNode)[0].get_http_hostname()
self.default_s3_hostname = self.services(StorageNode)[0].get_s3_hostname()
@property
def hosts(self) -> list[Host]:

View file

@ -16,5 +16,3 @@ class ConfigAttributes:
ENDPOINT_PROMETHEUS = "endpoint_prometheus"
CONTROL_ENDPOINT = "control_endpoint"
UN_LOCODE = "un_locode"
HTTP_HOSTNAME = "http_hostname"
S3_HOSTNAME = "s3_hostname"

View file

@ -187,15 +187,19 @@ class BackgroundLoadController:
read_from=self.load_params.read_from,
registry_file=self.load_params.registry_file,
verify_time=self.load_params.verify_time,
custom_registry=self.load_params.custom_registry,
load_type=self.load_params.load_type,
load_id=self.load_params.load_id,
vu_init_time=0,
working_dir=self.load_params.working_dir,
endpoint_selection_strategy=self.load_params.endpoint_selection_strategy,
k6_process_allocation_strategy=self.load_params.k6_process_allocation_strategy,
setup_timeout="1s",
setup_timeout=self.load_params.setup_timeout,
)
if self.verification_params.custom_registry:
self.verification_params.registry_file = self.load_params.custom_registry
if self.verification_params.verify_time is None:
raise RuntimeError("verify_time should not be none")

View file

@ -17,6 +17,7 @@ from frostfs_testlib.steps.network import IpHelper
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
from frostfs_testlib.storage.controllers.disk_controller import DiskController
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeStatus
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing import parallel
from frostfs_testlib.testing.test_control import retry, run_optionally, wait_for_success
@ -413,41 +414,40 @@ class ClusterStateController:
)
frostfs_adm.morph.set_config(set_key_value=f"MaintenanceModeAllowed={status}")
@reporter.step("Set mode node to {status}")
def set_mode_node(self, cluster_node: ClusterNode, wallet: WalletInfo, status: str, await_tick: bool = True) -> None:
@reporter.step("Set node status to {status} in CSC")
def set_node_status(self, cluster_node: ClusterNode, wallet: WalletInfo, status: NodeStatus, await_tick: bool = True) -> None:
rpc_endpoint = cluster_node.storage_node.get_rpc_endpoint()
control_endpoint = cluster_node.service(StorageNode).get_control_endpoint()
frostfs_adm, frostfs_cli, frostfs_cli_remote = self._get_cli(local_shell=self.shell, local_wallet=wallet, cluster_node=cluster_node)
node_netinfo = NetmapParser.netinfo(frostfs_cli.netmap.netinfo(rpc_endpoint=rpc_endpoint).stdout)
frostfs_adm, frostfs_cli, frostfs_cli_remote = self._get_cli(self.shell, wallet, cluster_node)
node_netinfo = NetmapParser.netinfo(frostfs_cli.netmap.netinfo(rpc_endpoint).stdout)
with reporter.step("If status maintenance, then check that the option is enabled"):
if node_netinfo.maintenance_mode_allowed == "false":
frostfs_adm.morph.set_config(set_key_value="MaintenanceModeAllowed=true")
if node_netinfo.maintenance_mode_allowed == "false":
with reporter.step("Enable maintenance mode"):
frostfs_adm.morph.set_config("MaintenanceModeAllowed=true")
with reporter.step(f"Change the status to {status}"):
frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status=status)
with reporter.step(f"Set node status to {status} using FrostfsCli"):
frostfs_cli_remote.control.set_status(control_endpoint, status.value)
if not await_tick:
return
with reporter.step("Tick 1 epoch, and await 2 block"):
frostfs_adm.morph.force_new_epoch()
time.sleep(parse_time(MORPH_BLOCK_TIME) * 2)
with reporter.step("Tick 2 epoch with 2 block await."):
for _ in range(2):
frostfs_adm.morph.force_new_epoch()
time.sleep(parse_time(MORPH_BLOCK_TIME) * 2)
self.check_node_status(status=status, wallet=wallet, cluster_node=cluster_node)
self.await_node_status(status, wallet, cluster_node)
@wait_for_success(80, 8, title="Wait for storage status become {status}")
def check_node_status(self, status: str, wallet: WalletInfo, cluster_node: ClusterNode):
@wait_for_success(80, 8, title="Wait for node status become {status}")
def await_node_status(self, status: NodeStatus, wallet: WalletInfo, cluster_node: ClusterNode):
frostfs_cli = FrostfsCli(self.shell, FROSTFS_CLI_EXEC, wallet.config_path)
netmap = NetmapParser.snapshot_all_nodes(
frostfs_cli.netmap.snapshot(rpc_endpoint=cluster_node.storage_node.get_rpc_endpoint()).stdout
)
netmap = NetmapParser.snapshot_all_nodes(frostfs_cli.netmap.snapshot(cluster_node.storage_node.get_rpc_endpoint()).stdout)
netmap = [node for node in netmap if cluster_node.host_ip == node.node]
if status == "offline":
if status == NodeStatus.OFFLINE:
assert cluster_node.host_ip not in netmap, f"{cluster_node.host_ip} not in Offline"
else:
assert netmap[0].node_status == status.upper(), f"Node state - {netmap[0].node_status} != {status} expect"
assert netmap[0].node_status == status, f"Node status should be '{status}', but was '{netmap[0].node_status}'"
def _get_cli(
self, local_shell: Shell, local_wallet: WalletInfo, cluster_node: ClusterNode

View file

@ -154,15 +154,6 @@ class StorageNode(NodeBase):
def get_data_directory(self) -> str:
return self.host.get_data_directory(self.name)
def get_storage_config(self) -> str:
return self.host.get_storage_config(self.name)
def get_http_hostname(self) -> list[str]:
return self._get_attribute(ConfigAttributes.HTTP_HOSTNAME)
def get_s3_hostname(self) -> list[str]:
return self._get_attribute(ConfigAttributes.S3_HOSTNAME)
def delete_blobovnicza(self):
self.host.delete_blobovnicza(self.name)

View file

@ -0,0 +1,13 @@
from dataclasses import dataclass
@dataclass
class PlacementPolicy:
name: str
value: str
def __str__(self) -> str:
return self.name
def __repr__(self) -> str:
return self.__str__()

View file

@ -56,9 +56,7 @@ class Shard:
var_prefix = f"{SHARD_PREFIX}{shard_id}"
blobstor_count = Shard._get_blobstor_count_from_section(config_object, shard_id)
blobstors = [
Blobstor.from_config_object(config_object, shard_id, blobstor_id) for blobstor_id in range(blobstor_count)
]
blobstors = [Blobstor.from_config_object(config_object, shard_id, blobstor_id) for blobstor_id in range(blobstor_count)]
write_cache_enabled = config_object.as_bool(f"{var_prefix}_WRITECACHE_ENABLED")
@ -71,7 +69,13 @@ class Shard:
@staticmethod
def from_object(shard):
metabase = shard["metabase"]["path"] if "path" in shard["metabase"] else shard["metabase"]
writecache_enabled = True
if "enabled" in shard["writecache"]:
writecache_enabled = shard["writecache"]["enabled"]
writecache = shard["writecache"]["path"] if "path" in shard["writecache"] else shard["writecache"]
if not writecache_enabled:
writecache = ""
# Currently due to issue we need to check if pilorama exists in keys
# TODO: make pilorama mandatory after fix

View file

@ -28,7 +28,7 @@ class StorageObjectInfo(ObjectRef):
locks: Optional[list[LockObjectInfo]] = None
class ModeNode(HumanReadableEnum):
class NodeStatus(HumanReadableEnum):
MAINTENANCE: str = "maintenance"
ONLINE: str = "online"
OFFLINE: str = "offline"
@ -37,7 +37,7 @@ class ModeNode(HumanReadableEnum):
@dataclass
class NodeNetmapInfo:
node_id: str = None
node_status: ModeNode = None
node_status: NodeStatus = None
node_data_ips: list[str] = None
cluster_name: str = None
continent: str = None

View file

@ -41,7 +41,7 @@ def _run_with_passwd(cmd: str) -> str:
return cmd.decode()
def _configure_aws_cli(cmd: str, key_id: str, access_key: str, out_format: str = "json") -> str:
def _configure_aws_cli(cmd: str, key_id: str, access_key: str, region: str, out_format: str = "json") -> str:
child = pexpect.spawn(cmd)
child.delaybeforesend = 1
@ -52,7 +52,7 @@ def _configure_aws_cli(cmd: str, key_id: str, access_key: str, out_format: str =
child.sendline(access_key)
child.expect("Default region name.*")
child.sendline("")
child.sendline("region")
child.expect("Default output format.*")
child.sendline(out_format)