frostfs-testcases/pytest_tests/helpers/object_access.py

246 lines
6.9 KiB
Python
Raw Permalink Normal View History

from typing import Optional
from frostfs_testlib import reporter
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
from frostfs_testlib.resources.error_patterns import OBJECT_ACCESS_DENIED
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.object import (
delete_object,
get_object_from_random_node,
get_range,
get_range_hash,
head_object,
put_object_to_random_node,
search_object,
)
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.utils import string_utils
from frostfs_testlib.utils.file_utils import get_file_hash
OPERATION_ERROR_TYPE = RuntimeError
def can_get_object(
wallet: str,
cid: str,
oid: str,
file_name: str,
shell: Shell,
cluster: Cluster,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
) -> bool:
with reporter.step("Try get object from container"):
try:
got_file_path = get_object_from_random_node(
wallet,
cid,
oid,
bearer=bearer,
wallet_config=wallet_config,
xhdr=xhdr,
shell=shell,
cluster=cluster,
)
except OPERATION_ERROR_TYPE as err:
assert string_utils.is_str_match_pattern(
err, OBJECT_ACCESS_DENIED
), f"Expected {err} to match {OBJECT_ACCESS_DENIED}"
return False
assert get_file_hash(file_name) == get_file_hash(got_file_path)
return True
def can_put_object(
wallet: str,
cid: str,
file_name: str,
shell: Shell,
cluster: Cluster,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
attributes: Optional[dict] = None,
) -> bool:
with reporter.step("Try put object to container"):
try:
put_object_to_random_node(
wallet,
file_name,
cid,
bearer=bearer,
wallet_config=wallet_config,
xhdr=xhdr,
attributes=attributes,
shell=shell,
cluster=cluster,
)
except OPERATION_ERROR_TYPE as err:
assert string_utils.is_str_match_pattern(
err, OBJECT_ACCESS_DENIED
), f"Expected {err} to match {OBJECT_ACCESS_DENIED}"
return False
return True
def can_delete_object(
wallet: str,
cid: str,
oid: str,
shell: Shell,
endpoint: str,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
) -> bool:
with reporter.step("Try delete object from container"):
try:
delete_object(
wallet,
cid,
oid,
bearer=bearer,
wallet_config=wallet_config,
xhdr=xhdr,
shell=shell,
endpoint=endpoint,
)
except OPERATION_ERROR_TYPE as err:
assert string_utils.is_str_match_pattern(
err, OBJECT_ACCESS_DENIED
), f"Expected {err} to match {OBJECT_ACCESS_DENIED}"
return False
return True
def can_get_head_object(
wallet: str,
cid: str,
oid: str,
shell: Shell,
endpoint: str,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> bool:
with reporter.step("Try get head of object"):
try:
head_object(
wallet,
cid,
oid,
bearer=bearer,
wallet_config=wallet_config,
xhdr=xhdr,
shell=shell,
endpoint=endpoint,
timeout=timeout,
)
except OPERATION_ERROR_TYPE as err:
assert string_utils.is_str_match_pattern(
err, OBJECT_ACCESS_DENIED
), f"Expected {err} to match {OBJECT_ACCESS_DENIED}"
return False
return True
def can_get_range_of_object(
wallet: str,
cid: str,
oid: str,
shell: Shell,
endpoint: str,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> bool:
with reporter.step("Try get range of object"):
try:
get_range(
wallet,
cid,
oid,
bearer=bearer,
range_cut="0:10",
wallet_config=wallet_config,
xhdr=xhdr,
shell=shell,
endpoint=endpoint,
timeout=timeout,
)
except OPERATION_ERROR_TYPE as err:
assert string_utils.is_str_match_pattern(
err, OBJECT_ACCESS_DENIED
), f"Expected {err} to match {OBJECT_ACCESS_DENIED}"
return False
return True
def can_get_range_hash_of_object(
wallet: str,
cid: str,
oid: str,
shell: Shell,
endpoint: str,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> bool:
with reporter.step("Try get range hash of object"):
try:
get_range_hash(
wallet,
cid,
oid,
bearer=bearer,
range_cut="0:10",
wallet_config=wallet_config,
xhdr=xhdr,
shell=shell,
endpoint=endpoint,
timeout=timeout,
)
except OPERATION_ERROR_TYPE as err:
assert string_utils.is_str_match_pattern(
err, OBJECT_ACCESS_DENIED
), f"Expected {err} to match {OBJECT_ACCESS_DENIED}"
return False
return True
def can_search_object(
wallet: str,
cid: str,
shell: Shell,
endpoint: str,
oid: Optional[str] = None,
bearer: Optional[str] = None,
wallet_config: Optional[str] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> bool:
with reporter.step("Try search object in container"):
try:
oids = search_object(
wallet,
cid,
bearer=bearer,
wallet_config=wallet_config,
xhdr=xhdr,
shell=shell,
endpoint=endpoint,
timeout=timeout,
)
except OPERATION_ERROR_TYPE as err:
assert string_utils.is_str_match_pattern(
err, OBJECT_ACCESS_DENIED
), f"Expected {err} to match {OBJECT_ACCESS_DENIED}"
return False
if oid:
return oid in oids
return True