218 lines
6.2 KiB
Python
218 lines
6.2 KiB
Python
from typing import Optional
|
|
|
|
from frostfs_testlib import reporter
|
|
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
|
|
from frostfs_testlib.resources.error_patterns import OBJECT_ACCESS_DENIED
|
|
from frostfs_testlib.shell import Shell
|
|
from frostfs_testlib.steps.cli.object import (
|
|
delete_object,
|
|
get_object_from_random_node,
|
|
get_range,
|
|
get_range_hash,
|
|
head_object,
|
|
put_object_to_random_node,
|
|
search_object,
|
|
)
|
|
from frostfs_testlib.storage.cluster import Cluster
|
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
|
from frostfs_testlib.utils import string_utils
|
|
from frostfs_testlib.utils.file_utils import get_file_hash
|
|
|
|
# Exception type that every can_* helper in this module catches around its
# storage operation; the caught error is then checked against the
# OBJECT_ACCESS_DENIED pattern to decide whether access was refused.
OPERATION_ERROR_TYPE = RuntimeError
|
|
|
|
|
|
def can_get_object(
    wallet: WalletInfo,
    cid: str,
    oid: str,
    file_name: str,
    shell: Shell,
    cluster: Cluster,
    bearer: Optional[str] = None,
    xhdr: Optional[dict] = None,
) -> bool:
    """Check whether the object can be fetched from the container.

    Calls ``get_object_from_random_node`` inside a reporter step and compares
    the hash of the returned file with the hash of ``file_name``.

    Args:
        wallet: Wallet passed to the get operation.
        cid: Container ID passed to the get operation.
        oid: Object ID passed to the get operation.
        file_name: Path of the local file whose hash the fetched file must match.
        shell: Shell passed to the get operation.
        cluster: Cluster passed to the get operation.
        bearer: Optional bearer value forwarded to the get operation.
        xhdr: Optional extra headers forwarded to the get operation.

    Returns:
        True if the get succeeded and the hashes are equal; False if the get
        raised ``OPERATION_ERROR_TYPE`` matching ``OBJECT_ACCESS_DENIED``.

    Raises:
        AssertionError: If the caught error does not match
            ``OBJECT_ACCESS_DENIED`` or the file hashes differ.
    """
    request_options = {
        "bearer": bearer,
        "xhdr": xhdr,
        "shell": shell,
        "cluster": cluster,
    }

    with reporter.step("Try get object from container"):
        try:
            downloaded_path = get_object_from_random_node(wallet, cid, oid, **request_options)
        except OPERATION_ERROR_TYPE as err:
            # Only an access-denied failure counts as "cannot get"; any other
            # failure of this type is surfaced as an assertion error.
            assert string_utils.is_str_match_pattern(
                err, OBJECT_ACCESS_DENIED
            ), f"Expected {err} to match {OBJECT_ACCESS_DENIED}"
            return False
        else:
            assert get_file_hash(file_name) == get_file_hash(downloaded_path)

    return True
|
|
|
|
|
|
def can_put_object(
    wallet: WalletInfo,
    cid: str,
    file_name: str,
    shell: Shell,
    cluster: Cluster,
    bearer: Optional[str] = None,
    xhdr: Optional[dict] = None,
    attributes: Optional[dict] = None,
) -> bool:
    """Check whether an object can be put into the container.

    Calls ``put_object_to_random_node`` inside a reporter step.

    Args:
        wallet: Wallet passed to the put operation.
        cid: Container ID passed to the put operation.
        file_name: Path of the file passed to the put operation.
        shell: Shell passed to the put operation.
        cluster: Cluster passed to the put operation.
        bearer: Optional bearer value forwarded to the put operation.
        xhdr: Optional extra headers forwarded to the put operation.
        attributes: Optional attributes forwarded to the put operation.

    Returns:
        True if the put did not raise; False if it raised
        ``OPERATION_ERROR_TYPE`` matching ``OBJECT_ACCESS_DENIED``.

    Raises:
        AssertionError: If the caught error does not match
            ``OBJECT_ACCESS_DENIED``.
    """
    request_options = {
        "bearer": bearer,
        "xhdr": xhdr,
        "attributes": attributes,
        "shell": shell,
        "cluster": cluster,
    }

    with reporter.step("Try put object to container"):
        try:
            put_object_to_random_node(wallet, file_name, cid, **request_options)
        except OPERATION_ERROR_TYPE as err:
            # Only an access-denied failure counts as "cannot put".
            assert string_utils.is_str_match_pattern(
                err, OBJECT_ACCESS_DENIED
            ), f"Expected {err} to match {OBJECT_ACCESS_DENIED}"
            return False
        else:
            return True
|
|
|
|
|
|
def can_delete_object(
    wallet: WalletInfo,
    cid: str,
    oid: str,
    shell: Shell,
    endpoint: str,
    bearer: Optional[str] = None,
    xhdr: Optional[dict] = None,
) -> bool:
    """Check whether the object can be deleted from the container.

    Calls ``delete_object`` inside a reporter step.

    Args:
        wallet: Wallet passed to the delete operation.
        cid: Container ID passed to the delete operation.
        oid: Object ID passed to the delete operation.
        shell: Shell passed to the delete operation.
        endpoint: Endpoint passed to the delete operation.
        bearer: Optional bearer value forwarded to the delete operation.
        xhdr: Optional extra headers forwarded to the delete operation.

    Returns:
        True if the delete did not raise; False if it raised
        ``OPERATION_ERROR_TYPE`` matching ``OBJECT_ACCESS_DENIED``.

    Raises:
        AssertionError: If the caught error does not match
            ``OBJECT_ACCESS_DENIED``.
    """
    request_options = {
        "bearer": bearer,
        "xhdr": xhdr,
        "shell": shell,
        "endpoint": endpoint,
    }

    with reporter.step("Try delete object from container"):
        try:
            delete_object(wallet, cid, oid, **request_options)
        except OPERATION_ERROR_TYPE as err:
            # Only an access-denied failure counts as "cannot delete".
            assert string_utils.is_str_match_pattern(
                err, OBJECT_ACCESS_DENIED
            ), f"Expected {err} to match {OBJECT_ACCESS_DENIED}"
            return False
        else:
            return True
|
|
|
|
|
|
def can_get_head_object(
    wallet: WalletInfo,
    cid: str,
    oid: str,
    shell: Shell,
    endpoint: str,
    bearer: Optional[str] = None,
    xhdr: Optional[dict] = None,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> bool:
    """Check whether the head of the object can be fetched.

    Calls ``head_object`` inside a reporter step.

    Args:
        wallet: Wallet passed to the head operation.
        cid: Container ID passed to the head operation.
        oid: Object ID passed to the head operation.
        shell: Shell passed to the head operation.
        endpoint: Endpoint passed to the head operation.
        bearer: Optional bearer value forwarded to the head operation.
        xhdr: Optional extra headers forwarded to the head operation.
        timeout: Timeout forwarded to the head operation; defaults to
            ``CLI_DEFAULT_TIMEOUT``.

    Returns:
        True if the head request did not raise; False if it raised
        ``OPERATION_ERROR_TYPE`` matching ``OBJECT_ACCESS_DENIED``.

    Raises:
        AssertionError: If the caught error does not match
            ``OBJECT_ACCESS_DENIED``.
    """
    request_options = {
        "bearer": bearer,
        "xhdr": xhdr,
        "shell": shell,
        "endpoint": endpoint,
        "timeout": timeout,
    }

    with reporter.step("Try get head of object"):
        try:
            head_object(wallet, cid, oid, **request_options)
        except OPERATION_ERROR_TYPE as err:
            # Only an access-denied failure counts as "cannot get head".
            assert string_utils.is_str_match_pattern(
                err, OBJECT_ACCESS_DENIED
            ), f"Expected {err} to match {OBJECT_ACCESS_DENIED}"
            return False
        else:
            return True
|
|
|
|
|
|
def can_get_range_of_object(
    wallet: WalletInfo,
    cid: str,
    oid: str,
    shell: Shell,
    endpoint: str,
    bearer: Optional[str] = None,
    xhdr: Optional[dict] = None,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> bool:
    """Check whether a range of the object can be fetched.

    Calls ``get_range`` inside a reporter step with the fixed range cut
    ``"0:10"``.

    Args:
        wallet: Wallet passed to the range operation.
        cid: Container ID passed to the range operation.
        oid: Object ID passed to the range operation.
        shell: Shell passed to the range operation.
        endpoint: Endpoint passed to the range operation.
        bearer: Optional bearer value forwarded to the range operation.
        xhdr: Optional extra headers forwarded to the range operation.
        timeout: Timeout forwarded to the range operation; defaults to
            ``CLI_DEFAULT_TIMEOUT``.

    Returns:
        True if the range request did not raise; False if it raised
        ``OPERATION_ERROR_TYPE`` matching ``OBJECT_ACCESS_DENIED``.

    Raises:
        AssertionError: If the caught error does not match
            ``OBJECT_ACCESS_DENIED``.
    """
    request_options = {
        "bearer": bearer,
        "range_cut": "0:10",
        "xhdr": xhdr,
        "shell": shell,
        "endpoint": endpoint,
        "timeout": timeout,
    }

    with reporter.step("Try get range of object"):
        try:
            get_range(wallet, cid, oid, **request_options)
        except OPERATION_ERROR_TYPE as err:
            # Only an access-denied failure counts as "cannot get range".
            assert string_utils.is_str_match_pattern(
                err, OBJECT_ACCESS_DENIED
            ), f"Expected {err} to match {OBJECT_ACCESS_DENIED}"
            return False
        else:
            return True
|
|
|
|
|
|
def can_get_range_hash_of_object(
    wallet: WalletInfo,
    cid: str,
    oid: str,
    shell: Shell,
    endpoint: str,
    bearer: Optional[str] = None,
    xhdr: Optional[dict] = None,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> bool:
    """Check whether the hash of an object range can be fetched.

    Calls ``get_range_hash`` inside a reporter step with the fixed range cut
    ``"0:10"``.

    Args:
        wallet: Wallet passed to the range-hash operation.
        cid: Container ID passed to the range-hash operation.
        oid: Object ID passed to the range-hash operation.
        shell: Shell passed to the range-hash operation.
        endpoint: Endpoint passed to the range-hash operation.
        bearer: Optional bearer value forwarded to the range-hash operation.
        xhdr: Optional extra headers forwarded to the range-hash operation.
        timeout: Timeout forwarded to the range-hash operation; defaults to
            ``CLI_DEFAULT_TIMEOUT``.

    Returns:
        True if the range-hash request did not raise; False if it raised
        ``OPERATION_ERROR_TYPE`` matching ``OBJECT_ACCESS_DENIED``.

    Raises:
        AssertionError: If the caught error does not match
            ``OBJECT_ACCESS_DENIED``.
    """
    request_options = {
        "bearer": bearer,
        "range_cut": "0:10",
        "xhdr": xhdr,
        "shell": shell,
        "endpoint": endpoint,
        "timeout": timeout,
    }

    with reporter.step("Try get range hash of object"):
        try:
            get_range_hash(wallet, cid, oid, **request_options)
        except OPERATION_ERROR_TYPE as err:
            # Only an access-denied failure counts as "cannot get range hash".
            assert string_utils.is_str_match_pattern(
                err, OBJECT_ACCESS_DENIED
            ), f"Expected {err} to match {OBJECT_ACCESS_DENIED}"
            return False
        else:
            return True
|
|
|
|
|
|
def can_search_object(
    wallet: WalletInfo,
    cid: str,
    shell: Shell,
    endpoint: str,
    oid: Optional[str] = None,
    bearer: Optional[str] = None,
    xhdr: Optional[dict] = None,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> bool:
    """Check whether objects in the container can be searched.

    Calls ``search_object`` inside a reporter step. When ``oid`` is given
    (truthy), the result additionally depends on whether that ID is present
    in the search result.

    Args:
        wallet: Wallet passed to the search operation.
        cid: Container ID passed to the search operation.
        shell: Shell passed to the search operation.
        endpoint: Endpoint passed to the search operation.
        oid: Optional object ID expected to be among the search results.
        bearer: Optional bearer value forwarded to the search operation.
        xhdr: Optional extra headers forwarded to the search operation.
        timeout: Timeout forwarded to the search operation; defaults to
            ``CLI_DEFAULT_TIMEOUT``.

    Returns:
        False if the search raised ``OPERATION_ERROR_TYPE`` matching
        ``OBJECT_ACCESS_DENIED``. Otherwise, ``oid in <search result>`` when
        ``oid`` is truthy, and True when it is not.

    Raises:
        AssertionError: If the caught error does not match
            ``OBJECT_ACCESS_DENIED``.
    """
    request_options = {
        "bearer": bearer,
        "xhdr": xhdr,
        "shell": shell,
        "endpoint": endpoint,
        "timeout": timeout,
    }

    with reporter.step("Try search object in container"):
        try:
            found_oids = search_object(wallet, cid, **request_options)
        except OPERATION_ERROR_TYPE as err:
            # Only an access-denied failure counts as "cannot search".
            assert string_utils.is_str_match_pattern(
                err, OBJECT_ACCESS_DENIED
            ), f"Expected {err} to match {OBJECT_ACCESS_DENIED}"
            return False

        # Without a specific object ID, a successful search is enough.
        return (oid in found_oids) if oid else True
|