190 lines
8 KiB
Python
190 lines
8 KiB
Python
import allure
|
|
import pytest
|
|
from frostfs_testlib import reporter
|
|
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
|
|
from frostfs_testlib.storage.dataclasses import ape
|
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
|
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
|
|
|
from pytest_tests.helpers.bearer_token import create_bearer_token
|
|
from pytest_tests.helpers.container_access import (
|
|
ALL_OBJECT_OPERATIONS,
|
|
assert_access_to_container,
|
|
assert_full_access_to_container,
|
|
assert_no_access_to_container,
|
|
)
|
|
|
|
|
|
@pytest.mark.sanity
|
|
@pytest.mark.bearer
|
|
@pytest.mark.ape
|
|
class TestApeBearer(ClusterTestBase):
|
|
@allure.title("Operations with BearerToken (role={role}, obj_size={object_size})")
|
|
@pytest.mark.parametrize("role", [ape.Role.OWNER, ape.Role.OTHERS], indirect=True)
|
|
def test_bearer_token_operations(
|
|
self,
|
|
container_with_objects: tuple[str, list[str], str],
|
|
frostfs_cli: FrostfsCli,
|
|
temp_directory: str,
|
|
test_wallet: WalletInfo,
|
|
role: ape.Role,
|
|
):
|
|
cid, objects_oids, file_path = container_with_objects
|
|
endpoint = self.cluster.default_rpc_endpoint
|
|
|
|
with reporter.step(f"Check {role} has full access to container without bearer token"):
|
|
assert_full_access_to_container(test_wallet, cid, objects_oids.pop(), file_path, self.shell, self.cluster)
|
|
|
|
with reporter.step(f"Deny all operations for everyone via APE"):
|
|
rule = ape.Rule(ape.Verb.DENY, ALL_OBJECT_OPERATIONS)
|
|
frostfs_cli.ape_manager.add(endpoint, rule.chain_id, target_name=cid, target_type="container", rule=rule.as_string())
|
|
|
|
with reporter.step("Wait for one block"):
|
|
self.wait_for_blocks()
|
|
|
|
with reporter.step(f"Create bearer token with all operations allowed"):
|
|
bearer = create_bearer_token(
|
|
frostfs_cli,
|
|
temp_directory,
|
|
cid,
|
|
rule=ape.Rule(ape.Verb.ALLOW, ALL_OBJECT_OPERATIONS),
|
|
endpoint=self.cluster.default_rpc_endpoint,
|
|
)
|
|
|
|
with reporter.step(f"Check {role} without token has no access to all operations with container"):
|
|
assert_no_access_to_container(test_wallet, cid, objects_oids.pop(), file_path, self.shell, self.cluster)
|
|
|
|
with reporter.step(f"Check {role} with token has access to all operations with container"):
|
|
assert_full_access_to_container(test_wallet, cid, objects_oids.pop(), file_path, self.shell, self.cluster, bearer)
|
|
|
|
with reporter.step(f"Remove deny rule from APE"):
|
|
frostfs_cli.ape_manager.remove(endpoint, rule.chain_id, target_name=cid, target_type="container")
|
|
|
|
with reporter.step("Wait for one block"):
|
|
self.wait_for_blocks()
|
|
|
|
with reporter.step(f"Check {role} without token has access to all operations with container"):
|
|
assert_full_access_to_container(test_wallet, cid, objects_oids.pop(), file_path, self.shell, self.cluster)
|
|
|
|
@allure.title("BearerToken for compound operations (obj_size={object_size})")
|
|
def test_bearer_token_compound_operations(
|
|
self,
|
|
frostfs_cli: FrostfsCli,
|
|
temp_directory: str,
|
|
default_wallet: WalletInfo,
|
|
other_wallet: WalletInfo,
|
|
container_with_objects: tuple[str, list[str], str],
|
|
):
|
|
"""
|
|
Bearer Token COMPLETLY overrides chains set for the specific target.
|
|
Thus, any restictions or permissions should be explicitly defined in BT.
|
|
"""
|
|
|
|
endpoint = self.cluster.default_rpc_endpoint
|
|
cid, objects_oids, file_path = container_with_objects
|
|
|
|
wallets_map = {
|
|
ape.Role.OWNER: default_wallet,
|
|
ape.Role.OTHERS: other_wallet,
|
|
}
|
|
|
|
access_map = {
|
|
ape.Role.OWNER: {
|
|
ape.ObjectOperations.PUT: True,
|
|
ape.ObjectOperations.GET: True,
|
|
ape.ObjectOperations.HEAD: True,
|
|
ape.ObjectOperations.GET_RANGE: True,
|
|
ape.ObjectOperations.GET_RANGE_HASH: True,
|
|
ape.ObjectOperations.SEARCH: True,
|
|
ape.ObjectOperations.DELETE: False,
|
|
},
|
|
ape.Role.OTHERS: {
|
|
ape.ObjectOperations.PUT: True,
|
|
ape.ObjectOperations.GET: True,
|
|
ape.ObjectOperations.HEAD: True,
|
|
ape.ObjectOperations.GET_RANGE: False,
|
|
ape.ObjectOperations.GET_RANGE_HASH: False,
|
|
ape.ObjectOperations.SEARCH: False,
|
|
ape.ObjectOperations.DELETE: True,
|
|
},
|
|
}
|
|
|
|
bt_access_map = {
|
|
ape.Role.OWNER: {
|
|
ape.ObjectOperations.PUT: True,
|
|
ape.ObjectOperations.GET: True,
|
|
ape.ObjectOperations.HEAD: True,
|
|
ape.ObjectOperations.GET_RANGE: True,
|
|
ape.ObjectOperations.GET_RANGE_HASH: True,
|
|
ape.ObjectOperations.SEARCH: True,
|
|
ape.ObjectOperations.DELETE: True,
|
|
},
|
|
ape.Role.OTHERS: {
|
|
ape.ObjectOperations.PUT: True,
|
|
ape.ObjectOperations.GET: False,
|
|
ape.ObjectOperations.HEAD: True,
|
|
ape.ObjectOperations.GET_RANGE: False,
|
|
ape.ObjectOperations.GET_RANGE_HASH: False,
|
|
# Although SEARCH is denied by the APE chain defined in Policy contract,
|
|
# Bearer Token COMPLETLY overrides chains set for the specific target.
|
|
# Thus, any restictions or permissions should be explicitly defined in BT.
|
|
ape.ObjectOperations.SEARCH: True,
|
|
ape.ObjectOperations.DELETE: True,
|
|
},
|
|
}
|
|
|
|
# Operations that we will deny for each role via APE
|
|
deny_map = {
|
|
ape.Role.OWNER: [ape.ObjectOperations.DELETE],
|
|
ape.Role.OTHERS: [
|
|
ape.ObjectOperations.SEARCH,
|
|
ape.ObjectOperations.GET_RANGE_HASH,
|
|
ape.ObjectOperations.GET_RANGE,
|
|
],
|
|
}
|
|
|
|
# Operations that we will allow for each role with bearer token
|
|
bearer_map = {
|
|
ape.Role.OWNER: [
|
|
ape.ObjectOperations.DELETE,
|
|
ape.ObjectOperations.PUT,
|
|
ape.ObjectOperations.GET_RANGE,
|
|
],
|
|
ape.Role.OTHERS: [
|
|
ape.ObjectOperations.GET,
|
|
ape.ObjectOperations.GET_RANGE,
|
|
ape.ObjectOperations.GET_RANGE_HASH,
|
|
],
|
|
}
|
|
|
|
conditions_map = {
|
|
ape.Role.OWNER: ape.Condition.by_role(ape.Role.OWNER),
|
|
ape.Role.OTHERS: ape.Condition.by_role(ape.Role.OTHERS),
|
|
}
|
|
|
|
verb_map = {ape.Role.OWNER: ape.Verb.ALLOW, ape.Role.OTHERS: ape.Verb.DENY}
|
|
|
|
for role, operations in deny_map.items():
|
|
with reporter.step(f"Add APE deny rule for {role}"):
|
|
rule = ape.Rule(ape.Verb.DENY, operations, conditions_map[role])
|
|
frostfs_cli.ape_manager.add(endpoint, rule.chain_id, target_name=cid, target_type="container", rule=rule.as_string())
|
|
|
|
with reporter.step("Wait for one block"):
|
|
self.wait_for_blocks()
|
|
|
|
for role, wallet in wallets_map.items():
|
|
with reporter.step(f"Assert access to container without bearer token for {role}"):
|
|
assert_access_to_container(access_map[role], wallet, cid, objects_oids.pop(), file_path, self.shell, self.cluster)
|
|
|
|
bearer_tokens = {}
|
|
for role in wallets_map.keys():
|
|
with reporter.step(f"Create bearer token for {role}"):
|
|
rule = ape.Rule(verb_map[role], bearer_map[role], conditions_map[role])
|
|
bt = create_bearer_token(frostfs_cli, temp_directory, cid, rule, endpoint)
|
|
bearer_tokens[role] = bt
|
|
|
|
for role, wallet in wallets_map.items():
|
|
with reporter.step(f"Assert access to container with bearer token for {role}"):
|
|
assert_access_to_container(
|
|
bt_access_map[role], wallet, cid, objects_oids.pop(), file_path, self.shell, self.cluster, bearer_tokens[role]
|
|
)
|