213 lines
8.3 KiB
Python
213 lines
8.3 KiB
Python
import allure
|
|
import pytest
|
|
from frostfs_testlib import reporter
|
|
from frostfs_testlib.steps.acl import create_eacl, form_bearertoken_file, set_eacl, wait_for_cache_expired
|
|
from frostfs_testlib.storage.dataclasses.acl import EACLAccess, EACLOperation, EACLRole, EACLRule
|
|
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
|
|
|
from pytest_tests.helpers.container_access import (
|
|
check_custom_access_to_container,
|
|
check_full_access_to_container,
|
|
check_no_access_to_container,
|
|
)
|
|
from pytest_tests.testsuites.acl.conftest import Wallets
|
|
|
|
|
|
@pytest.mark.sanity
|
|
@pytest.mark.acl
|
|
@pytest.mark.acl_bearer
|
|
class TestACLBearer(ClusterTestBase):
|
|
@allure.title("Operations with BearerToken (role={role.value}, obj_size={object_size})")
|
|
@pytest.mark.parametrize("role", [EACLRole.USER, EACLRole.OTHERS])
|
|
def test_bearer_token_operations(
|
|
self,
|
|
wallets: Wallets,
|
|
eacl_container_with_objects: tuple[str, list[str], str],
|
|
role: EACLRole,
|
|
):
|
|
cid, objects_oids, file_path = eacl_container_with_objects
|
|
user_wallet = wallets.get_wallet()
|
|
deny_wallet = wallets.get_wallet(role)
|
|
endpoint = self.cluster.default_rpc_endpoint
|
|
|
|
with reporter.step(f"Check {role.value} has full access to container without bearer token"):
|
|
check_full_access_to_container(
|
|
deny_wallet.wallet_path,
|
|
cid,
|
|
objects_oids.pop(),
|
|
file_path,
|
|
wallet_config=deny_wallet.config_path,
|
|
shell=self.shell,
|
|
cluster=self.cluster,
|
|
)
|
|
|
|
with reporter.step(f"Set deny all operations for {role.value} via eACL"):
|
|
eacl = [EACLRule(access=EACLAccess.DENY, role=role, operation=op) for op in EACLOperation]
|
|
eacl_file = create_eacl(cid, eacl, shell=self.shell)
|
|
set_eacl(user_wallet.wallet_path, cid, eacl_file, shell=self.shell, endpoint=endpoint)
|
|
wait_for_cache_expired()
|
|
|
|
with reporter.step(f"Create bearer token for {role.value} with all operations allowed"):
|
|
bearer = form_bearertoken_file(
|
|
user_wallet.wallet_path,
|
|
cid,
|
|
[EACLRule(operation=op, access=EACLAccess.ALLOW, role=role) for op in EACLOperation],
|
|
shell=self.shell,
|
|
endpoint=self.cluster.default_rpc_endpoint,
|
|
)
|
|
|
|
with reporter.step(f"Check {role.value} without token has no access to all operations with container"):
|
|
check_no_access_to_container(
|
|
deny_wallet.wallet_path,
|
|
cid,
|
|
objects_oids.pop(),
|
|
file_path,
|
|
wallet_config=deny_wallet.config_path,
|
|
shell=self.shell,
|
|
cluster=self.cluster,
|
|
)
|
|
|
|
with reporter.step(f"Check {role.value} with token has access to all operations with container"):
|
|
check_full_access_to_container(
|
|
deny_wallet.wallet_path,
|
|
cid,
|
|
objects_oids.pop(),
|
|
file_path,
|
|
bearer=bearer,
|
|
wallet_config=deny_wallet.config_path,
|
|
shell=self.shell,
|
|
cluster=self.cluster,
|
|
)
|
|
|
|
with reporter.step(f"Set allow all operations for {role.value} via eACL"):
|
|
eacl = [EACLRule(access=EACLAccess.ALLOW, role=role, operation=op) for op in EACLOperation]
|
|
eacl_file = create_eacl(cid, eacl, shell=self.shell)
|
|
set_eacl(user_wallet.wallet_path, cid, eacl_file, shell=self.shell, endpoint=endpoint)
|
|
wait_for_cache_expired()
|
|
|
|
with reporter.step(f"Check {role.value} without token has access to all operations with container"):
|
|
check_full_access_to_container(
|
|
deny_wallet.wallet_path,
|
|
cid,
|
|
objects_oids.pop(),
|
|
file_path,
|
|
wallet_config=deny_wallet.config_path,
|
|
shell=self.shell,
|
|
cluster=self.cluster,
|
|
)
|
|
|
|
@allure.title("BearerToken for compound operations (obj_size={object_size})")
|
|
def test_bearer_token_compound_operations(self, wallets, eacl_container_with_objects):
|
|
endpoint = self.cluster.default_rpc_endpoint
|
|
cid, objects_oids, file_path = eacl_container_with_objects
|
|
user_wallet = wallets.get_wallet()
|
|
other_wallet = wallets.get_wallet(role=EACLRole.OTHERS)
|
|
|
|
# Operations that we will deny for each role via eACL
|
|
deny_map = {
|
|
EACLRole.USER: [EACLOperation.DELETE],
|
|
EACLRole.OTHERS: [
|
|
EACLOperation.SEARCH,
|
|
EACLOperation.GET_RANGE_HASH,
|
|
EACLOperation.GET_RANGE,
|
|
],
|
|
}
|
|
|
|
# Operations that we will allow for each role with bearer token
|
|
bearer_map = {
|
|
EACLRole.USER: [
|
|
EACLOperation.DELETE,
|
|
EACLOperation.PUT,
|
|
EACLOperation.GET_RANGE,
|
|
],
|
|
EACLRole.OTHERS: [
|
|
EACLOperation.GET,
|
|
EACLOperation.GET_RANGE,
|
|
EACLOperation.GET_RANGE_HASH,
|
|
],
|
|
}
|
|
|
|
deny_map_with_bearer = {
|
|
EACLRole.USER: [op for op in deny_map[EACLRole.USER] if op not in bearer_map[EACLRole.USER]],
|
|
EACLRole.OTHERS: [op for op in deny_map[EACLRole.OTHERS] if op not in bearer_map[EACLRole.OTHERS]],
|
|
}
|
|
|
|
eacl_deny = []
|
|
for role, operations in deny_map.items():
|
|
eacl_deny += [EACLRule(access=EACLAccess.DENY, role=role, operation=op) for op in operations]
|
|
set_eacl(
|
|
user_wallet.wallet_path,
|
|
cid,
|
|
eacl_table_path=create_eacl(cid, eacl_deny, shell=self.shell),
|
|
shell=self.shell,
|
|
endpoint=endpoint,
|
|
)
|
|
wait_for_cache_expired()
|
|
|
|
with reporter.step("Check rule consistency without bearer"):
|
|
check_custom_access_to_container(
|
|
user_wallet.wallet_path,
|
|
cid,
|
|
objects_oids.pop(),
|
|
file_path,
|
|
deny_operations=deny_map[EACLRole.USER],
|
|
wallet_config=user_wallet.config_path,
|
|
shell=self.shell,
|
|
cluster=self.cluster,
|
|
)
|
|
check_custom_access_to_container(
|
|
other_wallet.wallet_path,
|
|
cid,
|
|
objects_oids.pop(),
|
|
file_path,
|
|
deny_operations=deny_map[EACLRole.OTHERS],
|
|
wallet_config=other_wallet.config_path,
|
|
shell=self.shell,
|
|
cluster=self.cluster,
|
|
)
|
|
|
|
with reporter.step("Check rule consistency using bearer token"):
|
|
bearer_user = form_bearertoken_file(
|
|
user_wallet.wallet_path,
|
|
cid,
|
|
[
|
|
EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.USER)
|
|
for op in bearer_map[EACLRole.USER]
|
|
],
|
|
shell=self.shell,
|
|
endpoint=self.cluster.default_rpc_endpoint,
|
|
)
|
|
|
|
bearer_other = form_bearertoken_file(
|
|
user_wallet.wallet_path,
|
|
cid,
|
|
[
|
|
EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.OTHERS)
|
|
for op in bearer_map[EACLRole.OTHERS]
|
|
],
|
|
shell=self.shell,
|
|
endpoint=self.cluster.default_rpc_endpoint,
|
|
)
|
|
|
|
check_custom_access_to_container(
|
|
user_wallet.wallet_path,
|
|
cid,
|
|
objects_oids.pop(),
|
|
file_path,
|
|
deny_operations=deny_map_with_bearer[EACLRole.USER],
|
|
bearer=bearer_user,
|
|
wallet_config=user_wallet.config_path,
|
|
shell=self.shell,
|
|
cluster=self.cluster,
|
|
)
|
|
check_custom_access_to_container(
|
|
other_wallet.wallet_path,
|
|
cid,
|
|
objects_oids.pop(),
|
|
file_path,
|
|
deny_operations=deny_map_with_bearer[EACLRole.OTHERS],
|
|
bearer=bearer_other,
|
|
wallet_config=other_wallet.config_path,
|
|
shell=self.shell,
|
|
cluster=self.cluster,
|
|
)
|