import logging
import os
import re
import uuid
from datetime import datetime, timedelta
from typing import Optional

from dateutil.parser import parse

from frostfs_testlib.cli import FrostfsAuthmate
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC
from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import (
    search_container_by_name,
    search_nodes_with_container,
)
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo

reporter = get_reporter()
logger = logging.getLogger("NeoLogger")
@reporter.step_deco("Expected all objects are presented in the bucket")
|
|
def check_objects_in_bucket(
|
|
s3_client: S3ClientWrapper,
|
|
bucket: str,
|
|
expected_objects: list,
|
|
unexpected_objects: Optional[list] = None,
|
|
) -> None:
|
|
unexpected_objects = unexpected_objects or []
|
|
bucket_objects = s3_client.list_objects(bucket)
|
|
assert len(bucket_objects) == len(
|
|
expected_objects
|
|
), f"Expected {len(expected_objects)} objects in the bucket"
|
|
for bucket_object in expected_objects:
|
|
assert (
|
|
bucket_object in bucket_objects
|
|
), f"Expected object {bucket_object} in objects list {bucket_objects}"
|
|
|
|
for bucket_object in unexpected_objects:
|
|
assert (
|
|
bucket_object not in bucket_objects
|
|
), f"Expected object {bucket_object} not in objects list {bucket_objects}"


@reporter.step_deco("Try to get objects and expect an error")
def try_to_get_objects_and_expect_error(
    s3_client: S3ClientWrapper, bucket: str, object_keys: list
) -> None:
    for obj in object_keys:
        try:
            s3_client.get_object(bucket, obj)
        except Exception as err:
            assert "The specified key does not exist" in str(
                err
            ), f"Expected 'The specified key does not exist' error, got: {err}"
        else:
            # get_object succeeded, so the object is unexpectedly present
            raise AssertionError(f"Object {obj} found in bucket {bucket}")


@reporter.step_deco("Set versioning status to '{status}' for bucket '{bucket}'")
def set_bucket_versioning(s3_client: S3ClientWrapper, bucket: str, status: VersioningStatus):
    if status == VersioningStatus.UNDEFINED:
        return

    s3_client.get_bucket_versioning_status(bucket)
    s3_client.put_bucket_versioning(bucket, status=status)
    bucket_status = s3_client.get_bucket_versioning_status(bucket)
    assert bucket_status == status.value, f"Expected {status.value} status, got {bucket_status}"
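

# Usage sketch (same hypothetical `s3_client`/`bucket` as above):
#
#   set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)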


def object_key_from_file_path(full_path: str) -> str:
    return os.path.basename(full_path)


def assert_tags(
    actual_tags: list, expected_tags: Optional[list] = None, unexpected_tags: Optional[list] = None
) -> None:
    expected_tags = (
        [{"Key": key, "Value": value} for key, value in expected_tags] if expected_tags else []
    )
    unexpected_tags = (
        [{"Key": key, "Value": value} for key, value in unexpected_tags] if unexpected_tags else []
    )
    if expected_tags == []:
        assert not actual_tags, f"Expected no tags, got {actual_tags}"
    assert len(expected_tags) == len(
        actual_tags
    ), f"Expected {len(expected_tags)} tags, got {len(actual_tags)}"
    for tag in expected_tags:
        assert tag in actual_tags, f"Tag {tag} must be in {actual_tags}"
    for tag in unexpected_tags:
        assert tag not in actual_tags, f"Tag {tag} should not be in {actual_tags}"
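

# Note on tag shapes: `expected_tags`/`unexpected_tags` are (key, value) pairs,
# while `actual_tags` is already in the S3 dict form. A minimal sketch:
#
#   assert_tags(
#       actual_tags=[{"Key": "env", "Value": "test"}],
#       expected_tags=[("env", "test")],
#   )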


@reporter.step_deco("Expect all tags to be present in the object")
def check_tags_by_object(
    s3_client: S3ClientWrapper,
    bucket: str,
    key: str,
    expected_tags: list,
    unexpected_tags: Optional[list] = None,
) -> None:
    actual_tags = s3_client.get_object_tagging(bucket, key)
    assert_tags(
        expected_tags=expected_tags, unexpected_tags=unexpected_tags, actual_tags=actual_tags
    )


@reporter.step_deco("Expect all tags to be present in the bucket")
def check_tags_by_bucket(
    s3_client: S3ClientWrapper,
    bucket: str,
    expected_tags: list,
    unexpected_tags: Optional[list] = None,
) -> None:
    actual_tags = s3_client.get_bucket_tagging(bucket)
    assert_tags(
        expected_tags=expected_tags, unexpected_tags=unexpected_tags, actual_tags=actual_tags
    )


def assert_object_lock_mode(
    s3_client: S3ClientWrapper,
    bucket: str,
    file_name: str,
    object_lock_mode: str,
    retain_until_date: datetime,
    legal_hold_status: str = "OFF",
    retain_period: Optional[int] = None,
):
    object_dict = s3_client.get_object(bucket, file_name, full_output=True)
    assert (
        object_dict.get("ObjectLockMode") == object_lock_mode
    ), f"Expected Object Lock Mode {object_lock_mode}, got {object_dict.get('ObjectLockMode')}"
    assert (
        object_dict.get("ObjectLockLegalHoldStatus") == legal_hold_status
    ), f"Expected Legal Hold Status {legal_hold_status}, got {object_dict.get('ObjectLockLegalHoldStatus')}"
    object_retain_date = object_dict.get("ObjectLockRetainUntilDate")
    retain_date = (
        parse(object_retain_date) if isinstance(object_retain_date, str) else object_retain_date
    )
    if retain_until_date:
        expected_date = retain_until_date.strftime("%Y-%m-%dT%H:%M:%S")
        actual_date = retain_date.strftime("%Y-%m-%dT%H:%M:%S")
        assert (
            actual_date == expected_date
        ), f"Expected Object Lock Retain Until Date {expected_date}, got {actual_date}"
    elif retain_period:
        last_modify_date = object_dict.get("LastModified")
        last_modify = (
            parse(last_modify_date) if isinstance(last_modify_date, str) else last_modify_date
        )
        # The extra second guards against sub-second differences between the two
        # timestamps, which would otherwise round the period down by a day
        assert (
            retain_date - last_modify + timedelta(seconds=1)
        ).days == retain_period, f"Expected retention period of {retain_period} days"
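

# Usage sketch (hypothetical values; checks a two-day COMPLIANCE lock):
#
#   assert_object_lock_mode(
#       s3_client, bucket, "file.txt", "COMPLIANCE",
#       retain_until_date=datetime.utcnow() + timedelta(days=2),
#   )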


def assert_s3_acl(acl_grants: list, permitted_users: str):
    if permitted_users == "AllUsers":
        grantees = {"AllUsers": 0, "CanonicalUser": 0}
        for acl_grant in acl_grants:
            if acl_grant.get("Grantee", {}).get("Type") == "Group":
                uri = acl_grant.get("Grantee", {}).get("URI")
                permission = acl_grant.get("Permission")
                assert (uri, permission) == (
                    "http://acs.amazonaws.com/groups/global/AllUsers",
                    "FULL_CONTROL",
                ), "AllUsers group should have FULL_CONTROL"
                grantees["AllUsers"] += 1
            if acl_grant.get("Grantee", {}).get("Type") == "CanonicalUser":
                permission = acl_grant.get("Permission")
                assert permission == "FULL_CONTROL", "Canonical User should have FULL_CONTROL"
                grantees["CanonicalUser"] += 1
        assert grantees["AllUsers"] >= 1, "All Users should have FULL_CONTROL"
        assert grantees["CanonicalUser"] >= 1, "Canonical User should have FULL_CONTROL"

    if permitted_users == "CanonicalUser":
        for acl_grant in acl_grants:
            if acl_grant.get("Grantee", {}).get("Type") == "CanonicalUser":
                permission = acl_grant.get("Permission")
                assert permission == "FULL_CONTROL", "Only CanonicalUser should have FULL_CONTROL"
            else:
                # Grant to a non-canonical grantee (e.g. the AllUsers group) is only logged here
                logger.error("FULL_CONTROL is given to All Users")


@reporter.step_deco("Init S3 Credentials")
def init_s3_credentials(
    wallet: WalletInfo,
    shell: Shell,
    cluster: Cluster,
    policy: Optional[dict] = None,
    s3gates: Optional[list[S3Gate]] = None,
    container_placement_policy: Optional[str] = None,
):
    gate_public_keys = []
    bucket = str(uuid.uuid4())
    if not s3gates:
        s3gates = [cluster.s3_gates[0]]
    for s3gate in s3gates:
        gate_public_keys.append(s3gate.get_wallet_public_key())
    frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate(shell, FROSTFS_AUTHMATE_EXEC)
    issue_secret_output = frostfs_authmate_exec.secret.issue(
        wallet=wallet.path,
        peer=cluster.default_rpc_endpoint,
        gate_public_key=gate_public_keys,
        wallet_password=wallet.password,
        container_policy=policy,
        container_friendly_name=bucket,
        container_placement_policy=container_placement_policy,
    ).stdout
    # Parse the credentials and container id out of the authmate report
    aws_access_key_id = str(
        re.search(r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)", issue_secret_output).group(
            "aws_access_key_id"
        )
    )
    aws_secret_access_key = str(
        re.search(
            r"secret_access_key.*:\s.(?P<aws_secret_access_key>\w*)", issue_secret_output
        ).group("aws_secret_access_key")
    )
    cid = str(
        re.search(r"container_id.*:\s.(?P<container_id>\w*)", issue_secret_output).group(
            "container_id"
        )
    )
    return cid, aws_access_key_id, aws_secret_access_key
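

# Usage sketch (hypothetical wiring; the returned triple would typically be fed
# into an S3ClientWrapper implementation):
#
#   cid, access_key_id, secret_access_key = init_s3_credentials(wallet, shell, cluster)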


@reporter.step_deco("Delete bucket with all objects")
def delete_bucket_with_objects(s3_client: S3ClientWrapper, bucket: str):
    versioning_status = s3_client.get_bucket_versioning_status(bucket)
    if versioning_status == VersioningStatus.ENABLED.value:
        # From a versioned bucket we should delete all versions and delete markers of all objects
        objects_versions = s3_client.list_objects_versions(bucket)
        if objects_versions:
            s3_client.delete_object_versions_without_dm(bucket, objects_versions)
        objects_delete_markers = s3_client.list_delete_markers(bucket)
        if objects_delete_markers:
            s3_client.delete_object_versions_without_dm(bucket, objects_delete_markers)
    else:
        # From a non-versioned bucket it is sufficient to delete objects by key
        objects = s3_client.list_objects(bucket)
        if objects:
            s3_client.delete_objects(bucket, objects)
        objects_delete_markers = s3_client.list_delete_markers(bucket)
        if objects_delete_markers:
            s3_client.delete_object_versions_without_dm(bucket, objects_delete_markers)

    # Delete the bucket itself
    s3_client.delete_bucket(bucket)


@reporter.step_deco("Search nodes with bucket")
def search_nodes_with_bucket(
    cluster: Cluster,
    bucket_name: str,
    wallet: str,
    shell: Shell,
    endpoint: str,
) -> list[ClusterNode]:
    cid = search_container_by_name(wallet=wallet, name=bucket_name, shell=shell, endpoint=endpoint)
    nodes_list = search_nodes_with_container(
        wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster
    )
    return nodes_list