frostfs-testcases/pytest_tests/testsuites/object/test_object_lock.py
a.berezin a9e0ddfe66 [] Add marks
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-11-06 19:18:47 +03:00

717 lines
26 KiB
Python
Executable file

import logging
import re
from datetime import datetime
import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.credentials.interfaces import CredentialsProvider, User
from frostfs_testlib.resources.common import STORAGE_GC_TIME
from frostfs_testlib.resources.error_patterns import (
LIFETIME_REQUIRED,
LOCK_NON_REGULAR_OBJECT,
LOCK_OBJECT_EXPIRATION,
LOCK_OBJECT_REMOVAL,
OBJECT_ALREADY_REMOVED,
OBJECT_IS_LOCKED,
OBJECT_NOT_FOUND,
)
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import StorageContainer, StorageContainerInfo, create_container
from frostfs_testlib.steps.cli.object import delete_object, head_object, lock_object
from frostfs_testlib.steps.complex_object_actions import get_link_object, get_storage_object_chunks
from frostfs_testlib.steps.epoch import ensure_fresh_epoch, get_epoch, tick_epoch
from frostfs_testlib.steps.node_management import drop_object
from frostfs_testlib.steps.storage_object import delete_objects
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import LockObjectInfo, StorageObjectInfo
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import expect_not_raises, wait_for_success
from frostfs_testlib.utils import datetime_utils
from pytest_tests.helpers.utility import wait_for_gc_pass_on_storage_nodes
# Shared logger used by the fixtures of this module.
logger = logging.getLogger("NeoLogger")

# Lifetime, in epochs, passed to lock_object() by the fixtures below.
FIXTURE_LOCK_LIFETIME = 5
# Number of epochs added to the current epoch to get the expiration epoch of fixture objects.
FIXTURE_OBJECT_LIFETIME = 10
@pytest.fixture(scope="module")
def user_wallet(credentials_provider: CredentialsProvider, cluster: Cluster) -> WalletInfo:
    """
    Module-scoped wallet of a freshly named user.

    The user name is suffixed with the hex-encoded current timestamp (in microseconds),
    and the wallet is requested from the GRPC credentials provider for the first cluster node.
    """
    with reporter.step("Create user wallet with container"):
        microseconds_now = int(datetime.now().timestamp() * 1000000)
        test_user = User(f"user_{hex(microseconds_now)}")
        return credentials_provider.GRPC.provide(test_user, cluster.cluster_nodes[0])
@pytest.fixture(scope="module")
def user_container(user_wallet: WalletInfo, client_shell: Shell, cluster: Cluster):
    """
    Module-scoped container owned by user_wallet, created via the default RPC endpoint
    and wrapped into a StorageContainer.
    """
    cid = create_container(user_wallet, shell=client_shell, endpoint=cluster.default_rpc_endpoint)
    container_info = StorageContainerInfo(cid, user_wallet)
    return StorageContainer(container_info, client_shell, cluster)
@pytest.fixture(scope="module")
def locked_storage_object(
    user_container: StorageContainer,
    client_shell: Shell,
    cluster: Cluster,
    object_size: ObjectSize,
):
    """
    Intention of this fixture is to provide storage object which is NOT expected to be deleted during test act phase

    Setup: generates an object expiring FIXTURE_OBJECT_LIFETIME epochs from now, locks it for
    FIXTURE_LOCK_LIFETIME epochs and records the lock in ``storage_object.locks``.

    Teardown: ticks epochs until the recorded lock expiration epoch has passed, then deletes the object.
    A failure matching OBJECT_NOT_FOUND or OBJECT_ALREADY_REMOVED is only logged; any other failure
    is re-raised.
    """
    with reporter.step("Creating locked object"):
        current_epoch = ensure_fresh_epoch(client_shell, cluster)
        expiration_epoch = current_epoch + FIXTURE_LOCK_LIFETIME

        storage_object = user_container.generate_object(
            object_size.value, expire_at=current_epoch + FIXTURE_OBJECT_LIFETIME
        )
        lock_object_id = lock_object(
            storage_object.wallet,
            storage_object.cid,
            storage_object.oid,
            client_shell,
            cluster.default_rpc_endpoint,
            lifetime=FIXTURE_LOCK_LIFETIME,
        )
        storage_object.locks = [
            LockObjectInfo(storage_object.cid, lock_object_id, FIXTURE_LOCK_LIFETIME, expiration_epoch)
        ]

    yield storage_object

    with reporter.step("Delete created locked object"):
        current_epoch = get_epoch(client_shell, cluster)
        # One epoch past the recorded lock expiration, so the lock no longer protects the object.
        epoch_diff = expiration_epoch - current_epoch + 1

        if epoch_diff > 0:
            with reporter.step(f"Tick {epoch_diff} epochs"):
                for _ in range(epoch_diff):
                    tick_epoch(client_shell, cluster)

        try:
            delete_object(
                storage_object.wallet,
                storage_object.cid,
                storage_object.oid,
                client_shell,
                cluster.default_rpc_endpoint,
            )
        except Exception as ex:
            ex_message = str(ex)
            # It's okay if object already removed
            already_removed = any(
                re.search(pattern, ex_message) for pattern in (OBJECT_NOT_FOUND, OBJECT_ALREADY_REMOVED)
            )
            if not already_removed:
                # Bare raise keeps the original traceback intact.
                raise
            logger.debug(ex_message)
@wait_for_success(datetime_utils.parse_time(STORAGE_GC_TIME))
def check_object_not_found(wallet: WalletInfo, cid: str, oid: str, shell: Shell, rpc_endpoint: str):
    """
    Assert that HEAD of the object fails with an error matching OBJECT_NOT_FOUND.

    The check is wrapped in wait_for_success with a timeout parsed from STORAGE_GC_TIME
    (presumably it is retried until it passes or the timeout elapses).
    """
    expected_failure = pytest.raises(Exception, match=OBJECT_NOT_FOUND)
    with expected_failure:
        head_object(wallet, cid, oid, shell, rpc_endpoint)
def verify_object_available(wallet: WalletInfo, cid: str, oid: str, shell: Shell, rpc_endpoint: str):
    """
    Assert that HEAD of the object completes without raising.
    """
    no_error_expected = expect_not_raises()
    with no_error_expected:
        head_object(wallet, cid, oid, shell, rpc_endpoint)
@pytest.mark.nightly
@pytest.mark.grpc_object_lock
class TestObjectLockWithGrpc(ClusterTestBase):
    """
    Object lock tests: deletion protection of locked objects and their parts,
    validation of lock parameters, and removal of objects once their locks expire.
    """

    # ------------------------------------------------------------------ #
    # Private helpers: every call goes through self.shell and the        #
    # cluster's default RPC endpoint, exactly as the tests did inline.   #
    # ------------------------------------------------------------------ #

    def _lock(self, wallet: WalletInfo, cid: str, oid: str, *args, **kwargs) -> str:
        """Call lock_object for the given object; extra arguments (lifetime / expire_at) are forwarded unchanged."""
        return lock_object(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint, *args, **kwargs)

    def _delete(self, wallet: WalletInfo, cid: str, oid: str):
        """Call delete_object for the given object."""
        return delete_object(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)

    def _check_available(self, wallet: WalletInfo, cid: str, oid: str) -> None:
        """Assert that the object can be HEAD-ed without errors."""
        verify_object_available(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)

    def _check_not_found(self, wallet: WalletInfo, cid: str, oid: str) -> None:
        """Assert that HEAD of the object ends with the OBJECT_NOT_FOUND error."""
        check_object_not_found(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)

    def _drop_from_nodes(self, cid: str, oid: str) -> None:
        """Drop the object from every storage node reported to hold it; none of the drops may raise."""
        nodes_with_object = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        for node in nodes_with_object:
            with expect_not_raises():
                drop_object(node, cid, oid)

    # ------------------------------------------------------------------ #
    # Fixtures                                                           #
    # ------------------------------------------------------------------ #

    @pytest.fixture()
    def new_locked_storage_object(self, user_container: StorageContainer, object_size: ObjectSize) -> StorageObjectInfo:
        """
        Intention of this fixture is to provide new storage object for tests which may delete or corrupt
        the object or its complementary objects.
        So we need a new one each time we ask for it
        """
        with reporter.step("Creating locked object"):
            current_epoch = self.get_epoch()
            storage_object = user_container.generate_object(
                object_size.value, expire_at=current_epoch + FIXTURE_OBJECT_LIFETIME
            )
            self._lock(
                storage_object.wallet,
                storage_object.cid,
                storage_object.oid,
                lifetime=FIXTURE_LOCK_LIFETIME,
            )
        return storage_object

    # ------------------------------------------------------------------ #
    # Tests                                                              #
    # ------------------------------------------------------------------ #

    @allure.title("Locked object is protected from deletion (obj_size={object_size})")
    def test_locked_object_cannot_be_deleted(
        self,
        locked_storage_object: StorageObjectInfo,
    ):
        """
        Locked object should be protected from deletion
        """
        with pytest.raises(Exception, match=OBJECT_IS_LOCKED):
            self._delete(locked_storage_object.wallet, locked_storage_object.cid, locked_storage_object.oid)

    @allure.title("Lock object itself is protected from deletion")
    # We operate with only lock object here so no complex object needed in this test
    @pytest.mark.parametrize("object_size", ["simple"], indirect=True)
    def test_lock_object_itself_cannot_be_deleted(
        self,
        locked_storage_object: StorageObjectInfo,
    ):
        """
        Lock object itself should be protected from deletion
        """
        # Named lock_object_info so the imported lock_object() function is not shadowed.
        lock_object_info = locked_storage_object.locks[0]
        wallet = locked_storage_object.wallet

        with pytest.raises(Exception, match=LOCK_OBJECT_REMOVAL):
            self._delete(wallet, lock_object_info.cid, lock_object_info.oid)

    @allure.title("Lock object itself cannot be locked")
    # We operate with only lock object here so no complex object needed in this test
    @pytest.mark.parametrize("object_size", ["simple"], indirect=True)
    def test_lock_object_cannot_be_locked(
        self,
        locked_storage_object: StorageObjectInfo,
    ):
        """
        Lock object itself cannot be locked
        """
        lock_object_info = locked_storage_object.locks[0]
        wallet = locked_storage_object.wallet

        with pytest.raises(Exception, match=LOCK_NON_REGULAR_OBJECT):
            self._lock(wallet, lock_object_info.cid, lock_object_info.oid, 1)

    @allure.title(
        "Lock must contain valid lifetime or expire_at field: (lifetime={wrong_lifetime}, expire-at={wrong_expire_at})"
    )
    # We operate with only lock object here so no complex object needed in this test
    @pytest.mark.parametrize("object_size", ["simple"], indirect=True)
    @pytest.mark.parametrize(
        "wrong_lifetime,wrong_expire_at,expected_error",
        [
            (None, None, LIFETIME_REQUIRED),
            (0, 0, LIFETIME_REQUIRED),
            (0, None, LIFETIME_REQUIRED),
            (None, 0, LIFETIME_REQUIRED),
            (-1, None, 'invalid argument "-1" for "--lifetime" flag'),
            (None, -1, 'invalid argument "-1" for "-e, --expire-at" flag'),
        ],
    )
    def test_cannot_lock_object_without_lifetime(
        self,
        locked_storage_object: StorageObjectInfo,
        wrong_lifetime: int,
        wrong_expire_at: int,
        expected_error: str,
    ):
        """
        Cannot lock object without lifetime and expire_at fields
        """
        lock_object_info = locked_storage_object.locks[0]
        wallet = locked_storage_object.wallet

        with pytest.raises(Exception, match=expected_error):
            self._lock(
                wallet,
                lock_object_info.cid,
                lock_object_info.oid,
                lifetime=wrong_lifetime,
                expire_at=wrong_expire_at,
            )

    @pytest.mark.sanity
    @allure.title("Expired object is deleted when locks are expired (obj_size={object_size})")
    def test_expired_object_should_be_deleted_after_locks_are_expired(
        self,
        user_container: StorageContainer,
        object_size: ObjectSize,
    ):
        """
        Expired object should be deleted after locks are expired
        """
        current_epoch = self.ensure_fresh_epoch()
        storage_object = user_container.generate_object(object_size.value, expire_at=current_epoch + 1)
        object_address = (storage_object.wallet, storage_object.cid, storage_object.oid)

        with reporter.step("Lock object for couple epochs"):
            self._lock(*object_address, lifetime=2)
            self._lock(*object_address, expire_at=current_epoch + 2)

        with reporter.step("Check object is not deleted at expiration time"):
            self.tick_epochs(2)
            # Must wait to ensure object is not deleted
            wait_for_gc_pass_on_storage_nodes()
            self._check_available(*object_address)

        with reporter.step("Wait for object to be deleted after third epoch"):
            self.tick_epoch()
            self._check_not_found(*object_address)

    @allure.title("Lock multiple objects at once (obj_size={object_size})")
    def test_should_be_possible_to_lock_multiple_objects_at_once(
        self,
        user_container: StorageContainer,
        object_size: ObjectSize,
    ):
        """
        Should be possible to lock multiple objects at once
        """
        current_epoch = ensure_fresh_epoch(self.shell, self.cluster)

        with reporter.step("Generate three objects"):
            storage_objects: list[StorageObjectInfo] = [
                user_container.generate_object(object_size.value, expire_at=current_epoch + 5) for _ in range(3)
            ]

        # All object ids are passed to a single lock call as a comma-separated list.
        self._lock(
            storage_objects[0].wallet,
            storage_objects[0].cid,
            ",".join(storage_object.oid for storage_object in storage_objects),
            expire_at=current_epoch + 1,
        )

        for storage_object in storage_objects:
            with reporter.step(f"Try to delete object {storage_object.oid}"):
                with pytest.raises(Exception, match=OBJECT_IS_LOCKED):
                    self._delete(storage_object.wallet, storage_object.cid, storage_object.oid)

        with reporter.step("Tick two epochs"):
            self.tick_epoch()
            self.tick_epoch()

        with expect_not_raises():
            delete_objects(storage_objects, self.shell, self.cluster)

    @allure.title("Outdated lock cannot be applied (obj_size={object_size})")
    def test_already_outdated_lock_should_not_be_applied(
        self,
        user_container: StorageContainer,
        object_size: ObjectSize,
    ):
        """
        Already outdated lock should not be applied
        """
        current_epoch = self.ensure_fresh_epoch()
        storage_object = user_container.generate_object(object_size.value, expire_at=current_epoch + 1)

        # Expiration in the past relative to the current epoch.
        expiration_epoch = current_epoch - 1
        expected_error = LOCK_OBJECT_EXPIRATION.format(
            expiration_epoch=expiration_epoch, current_epoch=current_epoch
        )

        with pytest.raises(Exception, match=expected_error):
            self._lock(
                storage_object.wallet,
                storage_object.cid,
                storage_object.oid,
                expire_at=expiration_epoch,
            )

    @pytest.mark.sanity
    @allure.title("Delete object when lock is expired by lifetime (obj_size={object_size})")
    @expect_not_raises()
    def test_after_lock_expiration_with_lifetime_user_should_be_able_to_delete_object(
        self,
        user_container: StorageContainer,
        object_size: ObjectSize,
    ):
        """
        After lock expiration with lifetime user should be able to delete object
        """
        current_epoch = self.ensure_fresh_epoch()
        storage_object = user_container.generate_object(object_size.value, expire_at=current_epoch + 5)
        object_address = (storage_object.wallet, storage_object.cid, storage_object.oid)

        self._lock(*object_address, lifetime=1)
        self.tick_epochs(2)

        with expect_not_raises():
            self._delete(*object_address)

    @allure.title("Delete object when lock is expired by expire_at (obj_size={object_size})")
    @expect_not_raises()
    def test_after_lock_expiration_with_expire_at_user_should_be_able_to_delete_object(
        self,
        user_container: StorageContainer,
        object_size: ObjectSize,
    ):
        """
        After lock expiration with expire_at user should be able to delete object
        """
        current_epoch = self.ensure_fresh_epoch()
        storage_object = user_container.generate_object(object_size.value, expire_at=current_epoch + 5)
        object_address = (storage_object.wallet, storage_object.cid, storage_object.oid)

        self._lock(*object_address, expire_at=current_epoch + 1)
        self.tick_epochs(2)

        with expect_not_raises():
            self._delete(*object_address)

    @allure.title("Complex object chunks are protected from deletion")
    @pytest.mark.parametrize(
        # Only complex objects are required for this test
        "object_size",
        ["complex"],
        indirect=True,
    )
    def test_complex_object_chunks_should_also_be_protected_from_deletion(
        self,
        locked_storage_object: StorageObjectInfo,
    ):
        """
        Complex object chunks should also be protected from deletion
        """
        chunk_object_ids = get_storage_object_chunks(locked_storage_object, self.shell, self.cluster)

        for chunk_object_id in chunk_object_ids:
            with reporter.step(f"Try to delete chunk object {chunk_object_id}"):
                with pytest.raises(Exception, match=OBJECT_IS_LOCKED):
                    self._delete(locked_storage_object.wallet, locked_storage_object.cid, chunk_object_id)

    @allure.title("Drop link object of locked complex object")
    @pytest.mark.grpc_control
    @pytest.mark.parametrize(
        "object_size",
        # Only complex object is required
        ["complex"],
        indirect=True,
    )
    def test_link_object_of_locked_complex_object_can_be_dropped(self, new_locked_storage_object: StorageObjectInfo):
        """
        Link object of locked complex object can be dropped from the nodes holding it
        """
        link_object_id = get_link_object(
            new_locked_storage_object.wallet,
            new_locked_storage_object.cid,
            new_locked_storage_object.oid,
            self.shell,
            self.cluster.storage_nodes,
        )

        with reporter.step(f"Drop link object with id {link_object_id} from nodes"):
            self._drop_from_nodes(new_locked_storage_object.cid, link_object_id)

    @allure.title("Drop chunks of locked complex object")
    @pytest.mark.grpc_control
    @pytest.mark.parametrize(
        "object_size",
        # Only complex object is required
        ["complex"],
        indirect=True,
    )
    def test_chunks_of_locked_complex_object_can_be_dropped(self, new_locked_storage_object: StorageObjectInfo):
        """
        Chunks of locked complex object can be dropped from the nodes holding them
        """
        chunk_objects = get_storage_object_chunks(new_locked_storage_object, self.shell, self.cluster)

        for chunk_object_id in chunk_objects:
            with reporter.step(f"Drop chunk object with id {chunk_object_id} from nodes"):
                self._drop_from_nodes(new_locked_storage_object.cid, chunk_object_id)

    @allure.title("Drop locked object (obj_size={object_size})")
    @pytest.mark.grpc_control
    def test_locked_object_can_be_dropped(self, new_locked_storage_object: StorageObjectInfo):
        """
        Locked object can be dropped from the nodes holding it
        """
        self._drop_from_nodes(new_locked_storage_object.cid, new_locked_storage_object.oid)

    @allure.title("Link object of complex object is protected from deletion")
    @pytest.mark.parametrize(
        # Only complex objects are required for this test
        "object_size",
        ["complex"],
        indirect=True,
    )
    def test_link_object_of_complex_object_should_also_be_protected_from_deletion(
        self,
        locked_storage_object: StorageObjectInfo,
    ):
        """
        Link object of complex object should also be protected from deletion
        """
        link_object_id = get_link_object(
            locked_storage_object.wallet,
            locked_storage_object.cid,
            locked_storage_object.oid,
            self.shell,
            self.cluster.storage_nodes,
            is_direct=False,
        )

        with reporter.step(f"Try to delete link object {link_object_id}"):
            with pytest.raises(Exception, match=OBJECT_IS_LOCKED):
                self._delete(locked_storage_object.wallet, locked_storage_object.cid, link_object_id)

    @allure.title("Expired object is removed after all locks are expired (obj_size={object_size})")
    def test_expired_object_should_be_removed_after_relocks_expare_at(
        self,
        user_container: StorageContainer,
        object_size: ObjectSize,
    ):
        """
        Expired object should stay while successive locks keep covering it and be removed after the last one expires
        """
        current_epoch = self.ensure_fresh_epoch()
        storage_object = user_container.generate_object(object_size.value, expire_at=current_epoch + 1)
        wallet, cid, oid = storage_object.wallet, storage_object.cid, storage_object.oid

        with reporter.step("Apply first lock to object for 3 epochs"):
            lock_object_id_0 = self._lock(wallet, cid, oid, expire_at=current_epoch + 3)

        self.tick_epochs(2)

        with reporter.step("Check first lock is still available"):
            self._check_available(wallet, cid, lock_object_id_0)

        with reporter.step("Apply second lock to object for 3 more epochs"):
            lock_object_id_1 = self._lock(wallet, cid, oid, expire_at=current_epoch + 5)

        self.tick_epochs(2)

        with reporter.step("Verify first lock is expired and removed"):
            self._check_not_found(wallet, cid, lock_object_id_0)

        with reporter.step("Verify second lock is still available"):
            self._check_available(wallet, cid, lock_object_id_1)

        with reporter.step("Apply third lock to object for 3 more epochs"):
            self._lock(wallet, cid, oid, expire_at=current_epoch + 7)

        with reporter.step("Verify object is deleted after all locks are expired"):
            self.tick_epochs(4)
            self._check_not_found(wallet, cid, oid)

    @pytest.mark.sanity
    @allure.title("Two expired objects with one lock are deleted after lock expiration (obj_size={object_size})")
    def test_two_objects_expiration_with_one_lock(
        self,
        user_container: StorageContainer,
        object_size: ObjectSize,
    ):
        """
        Two objects with different expiration epochs covered by one lock should be removed after the lock expires
        """
        current_epoch = self.ensure_fresh_epoch()

        with reporter.step("Generate two objects"):
            # Objects expire at current_epoch + 3 and current_epoch + 4 respectively.
            storage_objects: list[StorageObjectInfo] = [
                user_container.generate_object(object_size.value, expire_at=current_epoch + epoch_i + 3)
                for epoch_i in range(2)
            ]

        self.tick_epoch()

        with reporter.step("Lock objects for 4 epochs"):
            self._lock(
                storage_objects[0].wallet,
                storage_objects[0].cid,
                ",".join(storage_object.oid for storage_object in storage_objects),
                expire_at=current_epoch + 4,
            )

        with reporter.step("Verify objects are available during next three epochs"):
            for epoch_i in range(3):
                self.tick_epoch()
                with reporter.step(f"Check objects at epoch {current_epoch + epoch_i + 2}"):
                    for storage_object in storage_objects:
                        self._check_available(storage_object.wallet, storage_object.cid, storage_object.oid)

        with reporter.step("Verify objects are deleted after lock was expired"):
            self.tick_epoch()
            for storage_object in storage_objects:
                self._check_not_found(storage_object.wallet, storage_object.cid, storage_object.oid)