# Forked from TrueCloudLab/frostfs-testcases (151 lines, 6.2 KiB, Python)
import allure
|
|
import pytest
|
|
from frostfs_testlib import reporter
|
|
from frostfs_testlib.resources.wellknown_acl import PRIVATE_ACL_F, PUBLIC_ACL_F, READONLY_ACL_F
|
|
from frostfs_testlib.shell import Shell
|
|
from frostfs_testlib.steps.cli.container import create_container
|
|
from frostfs_testlib.steps.cli.object import put_object_to_random_node
|
|
from frostfs_testlib.storage.dataclasses.acl import EACLRole
|
|
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
|
|
|
from pytest_tests.helpers.container_access import (
|
|
check_full_access_to_container,
|
|
check_no_access_to_container,
|
|
check_read_only_container,
|
|
)
|
|
from pytest_tests.testsuites.acl.conftest import Wallets
|
|
|
|
|
|
@pytest.mark.sanity
@pytest.mark.smoke
@pytest.mark.acl
@pytest.mark.acl_basic
class TestACLBasic(ClusterTestBase):
    """Basic ACL checks against containers created with the public, private and read-only ACL constants."""

    def _create_container_as_owner(self, wallets: Wallets, basic_acl, step_title: str) -> str:
        """
        Create a container with the given basic ACL and return its id.

        The wallet comes from ``wallets.get_wallet()`` (no role argument) and the
        creation call is wrapped in a report step named ``step_title``.
        """
        owner = wallets.get_wallet()
        with reporter.step(step_title):
            container_id = create_container(
                owner,
                basic_acl=basic_acl,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
            )
        return container_id

    @pytest.fixture(scope="function")
    def public_container(self, wallets: Wallets):
        """Yield the id of a new container created with ``PUBLIC_ACL_F``."""
        yield self._create_container_as_owner(wallets, PUBLIC_ACL_F, "Create public container")
        # No teardown runs here: the container is not deleted after the test.

    @pytest.fixture(scope="function")
    def private_container(self, wallets: Wallets):
        """Yield the id of a new container created with ``PRIVATE_ACL_F``."""
        yield self._create_container_as_owner(wallets, PRIVATE_ACL_F, "Create private container")
        # No teardown runs here: the container is not deleted after the test.

    @pytest.fixture(scope="function")
    def read_only_container(self, wallets: Wallets):
        """Yield the id of a new container created with ``READONLY_ACL_F``."""
        yield self._create_container_as_owner(wallets, READONLY_ACL_F, "Create public readonly container")
        # No teardown runs here: the container is not deleted after the test.

    @allure.title("Operations with basic ACL on public container (obj_size={object_size})")
    def test_basic_acl_public(self, wallets: Wallets, public_container: str, file_path: str):
        """
        Test basic ACL set during public container creation.

        For the owner wallet and then for the OTHERS wallet, two objects are put
        (one by each wallet) and the wallet under test is checked for full
        access to both of them.
        """
        owner = wallets.get_wallet()
        outsider = wallets.get_wallet(role=EACLRole.OTHERS)
        # Each entry is (wallet that uploads the object, value of its "created" attribute).
        uploaders = ((owner, "owner"), (outsider, "other"))
        actors = {"owner": owner, "other users": outsider}

        for desc, actor in actors.items():
            with reporter.step("Add test objects to container"):
                # A fresh pair of objects is uploaded for every actor, since
                # check_full_access_to_container deletes the object it is given.
                object_ids = [
                    put_object_to_random_node(
                        uploader,
                        file_path,
                        public_container,
                        shell=self.shell,
                        cluster=self.cluster,
                        attributes={"created": creator_tag},
                    )
                    for uploader, creator_tag in uploaders
                ]
            with reporter.step(f"Check {desc} has full access to public container"):
                for object_id in object_ids:
                    check_full_access_to_container(
                        actor,
                        public_container,
                        object_id,
                        file_path,
                        shell=self.shell,
                        cluster=self.cluster,
                    )

    @allure.title("Operations with basic ACL on PRIVATE container (obj_size={object_size})")
    def test_basic_acl_private(self, wallets: Wallets, private_container: str, file_path: str):
        """
        Test basic ACL set during private container creation.

        The owner puts one object; the OTHERS wallet is checked for no access
        first, then the owner is checked for full access to the same object.
        """
        owner = wallets.get_wallet()
        outsider = wallets.get_wallet(role=EACLRole.OTHERS)
        shell, cluster = self.shell, self.cluster

        with reporter.step("Add test objects to container"):
            object_id = put_object_to_random_node(owner, file_path, private_container, shell, cluster)

        with reporter.step("Check no one except owner has access to operations with container"):
            check_no_access_to_container(outsider, private_container, object_id, file_path, shell, cluster)

        with reporter.step("Check owner has full access to private container"):
            check_full_access_to_container(owner, private_container, object_id, file_path, shell, cluster)

    @allure.title("Operations with basic ACL on READONLY container (obj_size={object_size})")
    def test_basic_acl_readonly(self, wallets: Wallets, client_shell: Shell, read_only_container: str, file_path: str):
        """
        Test basic ACL Operations for Read-Only Container.

        The owner puts one object through ``client_shell``; the OTHERS wallet is
        checked for read-only access, then the owner is checked for full access.
        """
        owner = wallets.get_wallet()
        outsider = wallets.get_wallet(role=EACLRole.OTHERS)
        cluster = self.cluster

        with reporter.step("Add test objects to container"):
            object_id = put_object_to_random_node(owner, file_path, read_only_container, client_shell, cluster)

        with reporter.step("Check other has read-only access to operations with container"):
            check_read_only_container(outsider, read_only_container, object_id, file_path, client_shell, cluster)

        with reporter.step("Check owner has full access to public container"):
            check_full_access_to_container(owner, read_only_container, object_id, file_path, client_shell, cluster)