frostfs-testcases/pytest_tests/testsuites/object/test_object_api.py
a.berezin b8d5706515
Some checks reported warnings
DCO check / Commits Check (pull_request) Has been cancelled
[#250] Add range tests for container and non-container node endpoints
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-06-06 21:32:36 +03:00

600 lines
24 KiB
Python
Executable file

import logging
import random
import sys
import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.resources.error_patterns import (
INVALID_LENGTH_SPECIFIER,
INVALID_OFFSET_SPECIFIER,
INVALID_RANGE_OVERFLOW,
INVALID_RANGE_ZERO_LENGTH,
OBJECT_ALREADY_REMOVED,
OUT_OF_RANGE,
)
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import create_container, search_nodes_with_container
from frostfs_testlib.steps.cli.object import (
get_object_from_random_node,
get_range,
get_range_hash,
head_object,
put_object,
put_object_to_random_node,
search_object,
)
from frostfs_testlib.steps.complex_object_actions import get_complex_object_split_ranges
from frostfs_testlib.steps.storage_object import delete_object, delete_objects
from frostfs_testlib.steps.storage_policy import get_complex_object_copies, get_simple_object_copies
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.policy import PlacementPolicy
from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils.file_utils import generate_file, get_file_content, get_file_hash
logger = logging.getLogger("NeoLogger")
CLEANUP_TIMEOUT = 10
COMMON_ATTRIBUTE = {"common_key": "common_value"}
# Will upload object for each attribute set
OBJECT_ATTRIBUTES = [
None,
{"key1": 1, "key2": "abc", "common_key": "common_value"},
{"key1": 2, "common_key": "common_value"},
]
# Config for Range tests
RANGES_COUNT = 4 # by quarters
RANGE_MIN_LEN = 10
RANGE_MAX_LEN = 500
# Used for static ranges found with issues
STATIC_RANGES = {}
def generate_ranges(
storage_object: StorageObjectInfo, max_object_size: int, shell: Shell, cluster: Cluster
) -> list[(int, int)]:
file_range_step = storage_object.size / RANGES_COUNT
file_ranges = []
file_ranges_to_test = []
for i in range(0, RANGES_COUNT):
file_ranges.append((int(file_range_step * i), int(file_range_step)))
# For simple object we can read all file ranges without too much time for testing
if storage_object.size < max_object_size:
file_ranges_to_test.extend(file_ranges)
# For complex object we need to fetch multiple child objects from different nodes.
else:
assert (
storage_object.size >= RANGE_MAX_LEN + max_object_size
), f"Complex object size should be at least {max_object_size + RANGE_MAX_LEN}. Current: {storage_object.size}"
file_ranges_to_test.append((RANGE_MAX_LEN, max_object_size - RANGE_MAX_LEN))
file_ranges_to_test.extend(get_complex_object_split_ranges(storage_object, shell, cluster))
# Special cases to read some bytes from start and some bytes from end of object
file_ranges_to_test.append((0, RANGE_MIN_LEN))
file_ranges_to_test.append((storage_object.size - RANGE_MIN_LEN, RANGE_MIN_LEN))
for offset, length in file_ranges:
range_length = random.randint(RANGE_MIN_LEN, RANGE_MAX_LEN)
range_start = random.randint(offset, offset + length)
file_ranges_to_test.append((range_start, min(range_length, storage_object.size - range_start)))
file_ranges_to_test.extend(STATIC_RANGES.get(storage_object.size, []))
return file_ranges_to_test
@pytest.fixture(scope="module")
def common_container(default_wallet: WalletInfo, client_shell: Shell, cluster: Cluster) -> str:
rule = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"
with reporter.step(f"Create container with {rule} and put object"):
cid = create_container(default_wallet, client_shell, cluster.default_rpc_endpoint, rule)
return cid
@pytest.fixture(scope="module")
def container_nodes(
default_wallet: WalletInfo, client_shell: Shell, cluster: Cluster, common_container: str
) -> list[ClusterNode]:
return search_nodes_with_container(
default_wallet, common_container, client_shell, cluster.default_rpc_endpoint, cluster
)
@pytest.fixture(scope="module")
def non_container_nodes(cluster: Cluster, container_nodes: list[ClusterNode]) -> list[ClusterNode]:
return list(set(cluster.cluster_nodes) - set(container_nodes))
@pytest.fixture(
# Scope session to upload/delete each files set only once
scope="module"
)
def storage_objects(
default_wallet: WalletInfo,
client_shell: Shell,
cluster: Cluster,
object_size: ObjectSize,
placement_policy: PlacementPolicy,
) -> list[StorageObjectInfo]:
wallet = default_wallet
# Separate containers for complex/simple objects to avoid side-effects
cid = create_container(
wallet, shell=client_shell, rule=placement_policy.value, endpoint=cluster.default_rpc_endpoint
)
file_path = generate_file(object_size.value)
file_hash = get_file_hash(file_path)
storage_objects = []
with reporter.step("Put objects"):
# We need to upload objects multiple times with different attributes
for attributes in OBJECT_ATTRIBUTES:
storage_object_id = put_object_to_random_node(
wallet=wallet,
path=file_path,
cid=cid,
shell=client_shell,
cluster=cluster,
attributes=attributes,
)
storage_object = StorageObjectInfo(cid, storage_object_id)
storage_object.size = object_size.value
storage_object.wallet = wallet
storage_object.file_path = file_path
storage_object.file_hash = file_hash
storage_object.attributes = attributes
storage_objects.append(storage_object)
yield storage_objects
# Teardown after all tests done with current param
delete_objects(storage_objects, client_shell, cluster)
@pytest.fixture()
def expected_object_copies(placement_policy: PlacementPolicy) -> int:
if placement_policy.name == "rep":
return 2
return 4
@pytest.mark.sanity
@pytest.mark.grpc_api
class TestObjectApi(ClusterTestBase):
@allure.title("Storage policy by native API (obj_size={object_size}, policy={placement_policy})")
def test_object_storage_policies(
self,
storage_objects: list[StorageObjectInfo],
simple_object_size: ObjectSize,
expected_object_copies: int,
):
"""
Validate object storage policy
"""
with reporter.step("Validate storage policy for objects"):
for storage_object in storage_objects:
if storage_object.size == simple_object_size.value:
copies = get_simple_object_copies(
storage_object.wallet,
storage_object.cid,
storage_object.oid,
shell=self.shell,
nodes=self.cluster.storage_nodes,
)
else:
copies = get_complex_object_copies(
storage_object.wallet,
storage_object.cid,
storage_object.oid,
shell=self.shell,
nodes=self.cluster.storage_nodes,
)
assert copies == expected_object_copies, f"Expected {expected_object_copies} copies"
@allure.title("Get object by native API (obj_size={object_size}, policy={placement_policy})")
def test_get_object_api(self, storage_objects: list[StorageObjectInfo]):
"""
Validate get object native API
"""
with reporter.step("Get objects and compare hashes"):
for storage_object in storage_objects:
file_path = get_object_from_random_node(
storage_object.wallet,
storage_object.cid,
storage_object.oid,
self.shell,
cluster=self.cluster,
)
file_hash = get_file_hash(file_path)
assert storage_object.file_hash == file_hash
@allure.title("Head object by native API (obj_size={object_size}, policy={placement_policy})")
def test_head_object_api(self, storage_objects: list[StorageObjectInfo]):
"""
Validate head object native API
"""
storage_object_1 = storage_objects[0]
storage_object_2 = storage_objects[1]
with reporter.step("Head object and validate"):
head_object(
storage_object_1.wallet,
storage_object_1.cid,
storage_object_1.oid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)
head_info = head_object(
storage_object_2.wallet,
storage_object_2.cid,
storage_object_2.oid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)
self.check_header_is_presented(head_info, storage_object_2.attributes)
@allure.title("Head deleted object with --raw arg (obj_size={object_size}, policy={placement_policy})")
def test_object_head_raw(self, default_wallet: str, object_size: ObjectSize, placement_policy: PlacementPolicy):
with reporter.step("Create container"):
cid = create_container(
default_wallet, self.shell, self.cluster.default_rpc_endpoint, placement_policy.value
)
with reporter.step("Upload object"):
file_path = generate_file(object_size.value)
oid = put_object_to_random_node(default_wallet, file_path, cid, self.shell, self.cluster)
with reporter.step("Delete object"):
delete_object(default_wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Call object head --raw and expect error"):
with pytest.raises(Exception, match=OBJECT_ALREADY_REMOVED):
head_object(default_wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint, is_raw=True)
@allure.title("Search objects by native API (obj_size={object_size}, policy={placement_policy})")
def test_search_object_api(self, storage_objects: list[StorageObjectInfo]):
"""
Validate object search by native API
"""
oids = [storage_object.oid for storage_object in storage_objects]
wallet = storage_objects[0].wallet
cid = storage_objects[0].cid
test_table = [
(OBJECT_ATTRIBUTES[1], oids[1:2]),
(OBJECT_ATTRIBUTES[2], oids[2:3]),
(COMMON_ATTRIBUTE, oids[1:3]),
]
with reporter.step("Search objects"):
# Search with no attributes
result = search_object(
wallet,
cid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
expected_objects_list=oids,
root=True,
)
assert sorted(oids) == sorted(result)
# search by test table
for filter, expected_oids in test_table:
result = search_object(
wallet,
cid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
filters=filter,
expected_objects_list=expected_oids,
root=True,
)
assert sorted(expected_oids) == sorted(result)
@allure.title("Search objects with removed items (obj_size={object_size})")
def test_object_search_should_return_tombstone_items(self, default_wallet: WalletInfo, object_size: ObjectSize):
"""
Validate object search with removed items
"""
wallet = default_wallet
cid = create_container(wallet, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Upload file"):
file_path = generate_file(object_size.value)
file_hash = get_file_hash(file_path)
storage_object = StorageObjectInfo(
cid=cid,
oid=put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster),
size=object_size.value,
wallet=wallet,
file_path=file_path,
file_hash=file_hash,
)
with reporter.step("Search object"):
# Root Search object should return root object oid
result = search_object(wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint, root=True)
assert result == [storage_object.oid]
with reporter.step("Delete file"):
delete_objects([storage_object], self.shell, self.cluster)
with reporter.step("Search deleted object with --root"):
# Root Search object should return nothing
result = search_object(wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint, root=True)
assert len(result) == 0
with reporter.step("Search deleted object with --phy should return only tombstones"):
# Physical Search object should return only tombstones
result = search_object(wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint, phy=True)
assert storage_object.tombstone in result, "Search result should contain tombstone of removed object"
assert storage_object.oid not in result, "Search result should not contain ObjectId of removed object"
for tombstone_oid in result:
header = head_object(
wallet,
cid,
tombstone_oid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)["header"]
object_type = header["objectType"]
assert (
object_type == "TOMBSTONE"
), f"Object wasn't deleted properly. Found object {tombstone_oid} with type {object_type}"
@allure.title("Get range hash by native API (obj_size={object_size}, policy={placement_policy})")
@pytest.mark.grpc_api
def test_object_get_range_hash(self, storage_objects: list[StorageObjectInfo], max_object_size):
"""
Validate get_range_hash for object by native gRPC API
"""
wallet = storage_objects[0].wallet
cid = storage_objects[0].cid
oids = [storage_object.oid for storage_object in storage_objects[:2]]
file_path = storage_objects[0].file_path
file_ranges_to_test = generate_ranges(storage_objects[0], max_object_size, self.shell, self.cluster)
logging.info(f"Ranges used in test {file_ranges_to_test}")
for range_start, range_len in file_ranges_to_test:
range_cut = f"{range_start}:{range_len}"
with reporter.step(f"Get range hash ({range_cut})"):
for oid in oids:
range_hash = get_range_hash(
wallet,
cid,
oid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
range_cut=range_cut,
)
assert (
get_file_hash(file_path, range_len, range_start) == range_hash
), f"Expected range hash to match {range_cut} slice of file payload"
@allure.title("Get range by native API (obj_size={object_size}, policy={placement_policy})")
@pytest.mark.grpc_api
def test_object_get_range(self, storage_objects: list[StorageObjectInfo], max_object_size):
"""
Validate get_range for object by native gRPC API
"""
wallet = storage_objects[0].wallet
cid = storage_objects[0].cid
oids = [storage_object.oid for storage_object in storage_objects[:2]]
file_path = storage_objects[0].file_path
file_ranges_to_test = generate_ranges(storage_objects[0], max_object_size, self.shell, self.cluster)
logging.info(f"Ranges used in test {file_ranges_to_test}")
for range_start, range_len in file_ranges_to_test:
range_cut = f"{range_start}:{range_len}"
with reporter.step(f"Get range ({range_cut})"):
for oid in oids:
_, range_content = get_range(
wallet,
cid,
oid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
range_cut=range_cut,
)
assert (
get_file_content(file_path, content_len=range_len, mode="rb", offset=range_start)
== range_content
), f"Expected range content to match {range_cut} slice of file payload"
@allure.title("[NEGATIVE] Get invalid range by native API (obj_size={object_size}, policy={placement_policy})")
@pytest.mark.grpc_api
def test_object_get_range_negatives(
self,
storage_objects: list[StorageObjectInfo],
):
"""
Validate get_range negative for object by native gRPC API
"""
wallet = storage_objects[0].wallet
cid = storage_objects[0].cid
oids = [storage_object.oid for storage_object in storage_objects[:2]]
file_size = storage_objects[0].size
assert (
RANGE_MIN_LEN < file_size
), f"Incorrect test setup. File size ({file_size}) is less than RANGE_MIN_LEN ({RANGE_MIN_LEN})"
file_ranges_to_test: list[tuple(int, int, str)] = [
# Offset is bigger than the file size, the length is small.
(file_size + 1, RANGE_MIN_LEN, OUT_OF_RANGE),
# Offset is ok, but offset+length is too big.
(file_size - RANGE_MIN_LEN, RANGE_MIN_LEN * 2, OUT_OF_RANGE),
# Offset is ok, and length is very-very big (e.g. MaxUint64) so that offset+length is wrapped and still "valid".
(RANGE_MIN_LEN, sys.maxsize * 2 + 1, INVALID_RANGE_OVERFLOW),
# Length is zero
(10, 0, INVALID_RANGE_ZERO_LENGTH),
# Negative values
(-1, 1, INVALID_OFFSET_SPECIFIER),
(10, -5, INVALID_LENGTH_SPECIFIER),
]
for range_start, range_len, expected_error in file_ranges_to_test:
range_cut = f"{range_start}:{range_len}"
expected_error = expected_error.format(range=range_cut) if "{range}" in expected_error else expected_error
with reporter.step(f"Get range ({range_cut})"):
for oid in oids:
with pytest.raises(Exception, match=expected_error):
get_range(
wallet,
cid,
oid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
range_cut=range_cut,
)
@allure.title("[NEGATIVE] Get invalid range hash by native API (obj_size={object_size}, policy={placement_policy})")
def test_object_get_range_hash_negatives(
self,
storage_objects: list[StorageObjectInfo],
):
"""
Validate get_range_hash negative for object by native gRPC API
"""
wallet = storage_objects[0].wallet
cid = storage_objects[0].cid
oids = [storage_object.oid for storage_object in storage_objects[:2]]
file_size = storage_objects[0].size
assert (
RANGE_MIN_LEN < file_size
), f"Incorrect test setup. File size ({file_size}) is less than RANGE_MIN_LEN ({RANGE_MIN_LEN})"
file_ranges_to_test: list[tuple(int, int, str)] = [
# Offset is bigger than the file size, the length is small.
(file_size + 1, RANGE_MIN_LEN, OUT_OF_RANGE),
# Offset is ok, but offset+length is too big.
(file_size - RANGE_MIN_LEN, RANGE_MIN_LEN * 2, OUT_OF_RANGE),
# Offset is ok, and length is very-very big (e.g. MaxUint64) so that offset+length is wrapped and still "valid".
(RANGE_MIN_LEN, sys.maxsize * 2 + 1, INVALID_RANGE_OVERFLOW),
# Length is zero
(10, 0, INVALID_RANGE_ZERO_LENGTH),
# Negative values
(-1, 1, INVALID_OFFSET_SPECIFIER),
(10, -5, INVALID_LENGTH_SPECIFIER),
]
for range_start, range_len, expected_error in file_ranges_to_test:
range_cut = f"{range_start}:{range_len}"
expected_error = expected_error.format(range=range_cut) if "{range}" in expected_error else expected_error
with reporter.step(f"Get range hash ({range_cut})"):
for oid in oids:
with pytest.raises(Exception, match=expected_error):
get_range_hash(
wallet,
cid,
oid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
range_cut=range_cut,
)
@allure.title("Get range from container and non-container nodes (object_size={object_size})")
def test_get_range_from_different_node(
self,
default_wallet: str,
common_container: str,
container_nodes: list[ClusterNode],
non_container_nodes: list[ClusterNode],
file_path: str,
):
with reporter.step("Put object to container"):
container_node = random.choice(container_nodes)
oid = put_object(
default_wallet, file_path, common_container, self.shell, container_node.storage_node.get_rpc_endpoint()
)
with reporter.step("Get range from container node endpoint"):
get_range(
default_wallet,
common_container,
oid,
"0:10",
self.shell,
container_node.storage_node.get_rpc_endpoint(),
)
with reporter.step("Get range from non-container node endpoint"):
non_container_node = random.choice(non_container_nodes)
get_range(
default_wallet,
common_container,
oid,
"0:10",
self.shell,
non_container_node.storage_node.get_rpc_endpoint(),
)
@allure.title("Get range hash from container and non-container nodes (object_size={object_size})")
def test_get_range_hash_from_different_node(
self,
default_wallet: str,
common_container: str,
container_nodes: list[ClusterNode],
non_container_nodes: list[ClusterNode],
file_path: str,
):
with reporter.step("Put object to container"):
container_node = random.choice(container_nodes)
oid = put_object(
default_wallet, file_path, common_container, self.shell, container_node.storage_node.get_rpc_endpoint()
)
with reporter.step("Get range hash from container node endpoint"):
get_range_hash(
default_wallet,
common_container,
oid,
"0:10",
self.shell,
container_node.storage_node.get_rpc_endpoint(),
)
with reporter.step("Get range hash from non-container node endpoint"):
non_container_node = random.choice(non_container_nodes)
get_range_hash(
default_wallet,
common_container,
oid,
"0:10",
self.shell,
non_container_node.storage_node.get_rpc_endpoint(),
)
def check_header_is_presented(self, head_info: dict, object_header: dict) -> None:
for key_to_check, val_to_check in object_header.items():
assert key_to_check in head_info["header"]["attributes"], f"Key {key_to_check} is found in {head_object}"
assert head_info["header"]["attributes"].get(key_to_check) == str(
val_to_check
), f"Value {val_to_check} is equal"