frostfs-testcases/pytest_tests/testsuites/access/ape/test_ape.py
Kirill Sosnovskikh 3c26f892ac
All checks were successful
DCO check / DCO (pull_request) Successful in 2m53s
[#353] Extend testsuites for PATCH method
Expandable suites:
- TestApeContainer
- TestApeBearer
- TestApeLocalOverrideAllow
- TestApeLocalOverrideDeny
- TestObjectApiWithoutUser
- TestObjectApiWithBearerToken

Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2024-12-25 12:33:45 +03:00

397 lines
19 KiB
Python

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.resources.error_patterns import OBJECT_ACCESS_DENIED
from frostfs_testlib.steps.cli.object import put_object_to_random_node
from frostfs_testlib.steps.node_management import drop_object
from frostfs_testlib.storage.dataclasses import ape
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.storage.grpc_operations.interfaces import GrpcClientWrapper
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils import wallet_utils
from frostfs_testlib.utils.failover_utils import wait_object_replication
from frostfs_testlib.utils.file_utils import TestFile
from ....helpers.container_access import (
ALL_OBJECT_OPERATIONS,
assert_access_to_container,
assert_full_access_to_container,
assert_no_access_to_container,
)
from ....helpers.container_request import APE_EVERYONE_ALLOW_ALL, OWNER_ALLOW_ALL, ContainerRequest
@pytest.fixture
def denied_wallet(default_wallet: WalletInfo, other_wallet: WalletInfo, role: ape.Role) -> WalletInfo:
    """Pick the wallet that the deny rule for ``role`` is aimed at.

    Returns ``other_wallet`` when ``role`` is ``ape.Role.OTHERS`` and
    ``default_wallet`` for every other role.
    """
    if role == ape.Role.OTHERS:
        return other_wallet
    return default_wallet
@pytest.fixture
def allowed_wallet(default_wallet: WalletInfo, other_wallet: WalletInfo, role: ape.Role) -> WalletInfo:
    """Pick the wallet that is NOT targeted by the deny rule for ``role``.

    Returns ``default_wallet`` when ``role`` is ``ape.Role.OTHERS`` and
    ``other_wallet`` for every other role.
    """
    if role == ape.Role.OTHERS:
        return default_wallet
    return other_wallet
@pytest.mark.nightly
@pytest.mark.ape
class TestApeContainer(ClusterTestBase):
# TODO: Without PATCH operation,
# since it requires specific permissions that do not apply when testing all operations at once
@pytest.mark.sanity
@allure.title("Deny operations via APE by role (role={role}, obj_size={object_size})")
@pytest.mark.parametrize("role", [ape.Role.OWNER, ape.Role.OTHERS], indirect=True)
@pytest.mark.parametrize("objects", [4], indirect=True)
def test_deny_operations_via_ape_by_role(
    self,
    denied_wallet: WalletInfo,
    allowed_wallet: WalletInfo,
    frostfs_cli: FrostfsCli,
    container: str,
    objects: list[str],
    role: ape.Role,
    test_file: TestFile,
    rpc_endpoint: str,
):
    """Check that a container-level DENY rule for one role blocks only that role.

    The rule covers ``ALL_OBJECT_OPERATIONS`` and is conditioned on ``role``. While it is
    installed, ``denied_wallet`` must have no access and ``allowed_wallet`` must keep full
    access. After the rule is removed, both wallets must have full access.

    Four access checks are performed and each one takes its own object with
    ``objects.pop()``; ``objects`` is parametrized with ``4`` (presumably the number of
    objects the fixture creates - confirm against the fixture).
    """
    with reporter.step(f"Deny all operations for {role} via APE"):
        deny_rule = ape.Rule(ape.Verb.DENY, ALL_OBJECT_OPERATIONS, ape.Condition.by_role(role.value))
        frostfs_cli.ape_manager.add(
            rpc_endpoint, deny_rule.chain_id, target_name=container, target_type="container", rule=deny_rule.as_string()
        )

    with reporter.step("Wait for one block"):
        self.wait_for_blocks()

    with reporter.step("Assert denied role have no access to public container"):
        # access checks will try to remove object, so we use .pop() to ensure we have object before deletion
        assert_no_access_to_container(denied_wallet, container, objects.pop(), test_file, self.shell, self.cluster)

    with reporter.step("Assert allowed role have full access to public container"):
        assert_full_access_to_container(allowed_wallet, container, objects.pop(), test_file, self.shell, self.cluster)

    with reporter.step("Remove deny rule from APE"):
        frostfs_cli.ape_manager.remove(rpc_endpoint, deny_rule.chain_id, target_name=container, target_type="container")

    with reporter.step("Wait for one block"):
        self.wait_for_blocks()

    with reporter.step("Assert allowed role have full access to public container"):
        assert_full_access_to_container(allowed_wallet, container, objects.pop(), test_file, self.shell, self.cluster)

    with reporter.step("Assert denied role have full access to public container"):
        assert_full_access_to_container(denied_wallet, container, objects.pop(), test_file, self.shell, self.cluster)
# TODO: Without PATCH operation,
# since it requires specific permissions that do not apply when testing all operations at once
@allure.title("Deny operations for others via APE excluding single pubkey (obj_size={object_size})")
@pytest.mark.parametrize("objects", [2], indirect=True)
def test_deny_opeartions_excluding_pubkey(
    self,
    frostfs_cli: FrostfsCli,
    default_wallet: WalletInfo,
    other_wallet: WalletInfo,
    other_wallet_2: WalletInfo,
    container: str,
    objects: list[str],
    rpc_endpoint: str,
    test_file: TestFile,
):
    """Check a DENY rule for OTHERS that exempts one public key.

    The rule combines two conditions: the role is ``ape.Role.OTHERS`` and the key is
    NOT_EQUAL to the public key of ``other_wallet_2``. Afterwards ``other_wallet`` must
    have no access, while ``default_wallet`` and ``other_wallet_2`` must have full access.
    """
    with reporter.step("Add deny APE rules for others except single wallet"):
        others_condition = ape.Condition.by_role(ape.Role.OTHERS)
        exempt_pubkey = wallet_utils.get_wallet_public_key(other_wallet_2.path, other_wallet_2.password)
        not_exempt_key_condition = ape.Condition.by_key(exempt_pubkey, match_type=ape.MatchType.NOT_EQUAL)

        deny_rule = ape.Rule(ape.Verb.DENY, ALL_OBJECT_OPERATIONS, [others_condition, not_exempt_key_condition])
        frostfs_cli.ape_manager.add(
            rpc_endpoint,
            deny_rule.chain_id,
            target_name=container,
            target_type="container",
            rule=deny_rule.as_string(),
        )

    with reporter.step("Wait for one block"):
        self.wait_for_blocks()

    with reporter.step("Assert others have no access to public container"):
        # The object is indexed here, not popped: only two objects are requested, and
        # objects[0] is handed out again by the last .pop() below. Presumably the denied
        # wallet cannot delete it, so it is still present for that check - confirm.
        assert_no_access_to_container(other_wallet, container, objects[0], test_file, self.shell, self.cluster)

    with reporter.step("Assert owner have full access to public container"):
        # Full-access checks try to remove the object, so each one gets its own via .pop().
        assert_full_access_to_container(default_wallet, container, objects.pop(), test_file, self.shell, self.cluster)

    with reporter.step("Assert allowed wallet have full access to public container"):
        assert_full_access_to_container(other_wallet_2, container, objects.pop(), test_file, self.shell, self.cluster)
@allure.title("Replication works with APE deny rules on OWNER and OTHERS (obj_size={object_size})")
@pytest.mark.parametrize(
    "container_request",
    [ContainerRequest("REP %NODE_COUNT% IN X CBF 1 SELECT %NODE_COUNT% FROM * AS X", APE_EVERYONE_ALLOW_ALL, "custom")],
    indirect=True,
)
def test_replication_works_with_deny_rules(
    self,
    default_wallet: WalletInfo,
    frostfs_cli: FrostfsCli,
    container: str,
    rpc_endpoint: str,
    test_file: TestFile,
):
    """Check that a dropped object is replicated again while OWNER and OTHERS are denied.

    An object is put and fully replicated. Then separate DENY rules covering
    ``ALL_OBJECT_OPERATIONS`` are added for ``ape.Role.OWNER`` and ``ape.Role.OTHERS``.
    The object is dropped from the first storage node and the test waits until the
    expected number of copies is reached again.
    """
    with reporter.step("Put object to container"):
        oid = put_object_to_random_node(default_wallet, test_file, container, self.shell, self.cluster)

    with reporter.step("Wait for object replication after upload"):
        # NOTE(review): the expected copy count here is len(cluster_nodes), while the final
        # wait uses len(storage_nodes). Presumably both are equal - confirm and unify.
        wait_object_replication(container, oid, len(self.cluster.cluster_nodes), self.shell, self.cluster.storage_nodes)

    with reporter.step("Add deny APE rules for owner and others"):
        # One rule per role: each rule is registered under its own chain id.
        for denied_role in (ape.Role.OWNER, ape.Role.OTHERS):
            rule = ape.Rule(ape.Verb.DENY, ALL_OBJECT_OPERATIONS, ape.Condition.by_role(denied_role))
            frostfs_cli.ape_manager.add(
                rpc_endpoint, rule.chain_id, target_name=container, target_type="container", rule=rule.as_string()
            )

    with reporter.step("Wait for one block"):
        self.wait_for_blocks()

    with reporter.step("Drop object"):
        drop_object(self.cluster.storage_nodes[0], container, oid)

    with reporter.step("Wait for dropped object to be replicated"):
        wait_object_replication(container, oid, len(self.cluster.storage_nodes), self.shell, self.cluster.storage_nodes)
# TODO: Without PATCH operation,
# since it requires specific permissions that do not apply when testing all operations at once
@allure.title("Deny operations via APE by role (role=ir, obj_size={object_size})")
@pytest.mark.parametrize("container_request", [OWNER_ALLOW_ALL], indirect=True)
@pytest.mark.parametrize("objects", [3], indirect=True)
def test_deny_operations_via_ape_by_role_ir(
    self,
    frostfs_cli: FrostfsCli,
    ir_wallet: WalletInfo,
    container: str,
    objects: list[str],
    rpc_endpoint: str,
    test_file: TestFile,
):
    """Check that a DENY rule for the IR role does not change what the IR wallet can do.

    The same access matrix (read-type operations allowed; PUT, PATCH and DELETE not
    allowed) is asserted three times: before the rule is added, while it is active,
    and after it is removed.
    """
    expected_ir_access = {
        ape.ObjectOperations.PUT: False,
        ape.ObjectOperations.GET: True,
        ape.ObjectOperations.HEAD: True,
        ape.ObjectOperations.GET_RANGE: True,
        ape.ObjectOperations.GET_RANGE_HASH: True,
        ape.ObjectOperations.SEARCH: True,
        ape.ObjectOperations.PATCH: False,
        ape.ObjectOperations.DELETE: False,
    }

    def check_ir_access():
        # Every check takes a fresh object from the end of the list.
        assert_access_to_container(
            expected_ir_access, ir_wallet, container, objects.pop(), test_file, self.shell, self.cluster
        )

    with reporter.step("Assert IR wallet access in default state"):
        check_ir_access()

    with reporter.step("Add deny APE rule with deny all operations for IR role"):
        deny_rule = ape.Rule(ape.Verb.DENY, ALL_OBJECT_OPERATIONS, [ape.Condition.by_role(ape.Role.IR.value)])
        frostfs_cli.ape_manager.add(
            rpc_endpoint,
            deny_rule.chain_id,
            target_name=container,
            target_type="container",
            rule=deny_rule.as_string(),
        )

    with reporter.step("Wait for one block"):
        self.wait_for_blocks()

    with reporter.step("Assert IR wallet ignores APE rules"):
        check_ir_access()

    with reporter.step("Remove APE rule"):
        frostfs_cli.ape_manager.remove(rpc_endpoint, deny_rule.chain_id, target_name=container, target_type="container")

    with reporter.step("Wait for one block"):
        self.wait_for_blocks()

    with reporter.step("Assert IR wallet access is restored"):
        check_ir_access()
# TODO: Without PATCH operation,
# since it requires specific permissions that do not apply when testing all operations at once
@allure.title("Deny operations via APE by role (role=container, obj_size={object_size})")
@pytest.mark.parametrize("container_request", [OWNER_ALLOW_ALL], indirect=True)
@pytest.mark.parametrize("objects", [3], indirect=True)
def test_deny_operations_via_ape_by_role_container(
    self,
    frostfs_cli: FrostfsCli,
    container_node_wallet: WalletInfo,
    container: str,
    objects: list[str],
    rpc_endpoint: str,
    test_file: TestFile,
):
    """Check that a DENY rule for the CONTAINER role does not restrict the container node wallet.

    Every operation in the access matrix is expected to be allowed. The matrix is asserted
    three times, each time on a fresh object taken with ``objects.pop()``: before the rule
    is added, while it is active, and after it is removed.
    """
    access_matrix = {
        ape.ObjectOperations.PUT: True,
        ape.ObjectOperations.GET: True,
        ape.ObjectOperations.HEAD: True,
        ape.ObjectOperations.GET_RANGE: True,
        ape.ObjectOperations.GET_RANGE_HASH: True,
        ape.ObjectOperations.SEARCH: True,
        ape.ObjectOperations.PATCH: True,
        ape.ObjectOperations.DELETE: True,
    }

    with reporter.step("Assert CONTAINER wallet access in default state"):
        assert_access_to_container(access_matrix, container_node_wallet, container, objects.pop(), test_file, self.shell, self.cluster)

    rule = ape.Rule(ape.Verb.DENY, ALL_OBJECT_OPERATIONS, ape.Condition.by_role(ape.Role.CONTAINER.value))

    # The rule is conditioned on the CONTAINER role only, so the step title names just that role.
    with reporter.step("Add APE rule with deny all operations for CONTAINER role"):
        frostfs_cli.ape_manager.add(rpc_endpoint, rule.chain_id, target_name=container, target_type="container", rule=rule.as_string())

    with reporter.step("Wait for one block"):
        self.wait_for_blocks()

    with reporter.step("Assert CONTAINER wallet ignores APE rule"):
        assert_access_to_container(access_matrix, container_node_wallet, container, objects.pop(), test_file, self.shell, self.cluster)

    with reporter.step("Remove APE rule"):
        frostfs_cli.ape_manager.remove(rpc_endpoint, rule.chain_id, target_name=container, target_type="container")

    with reporter.step("Wait for one block"):
        self.wait_for_blocks()

    with reporter.step("Assert CONTAINER wallet access after rule was removed"):
        assert_access_to_container(access_matrix, container_node_wallet, container, objects.pop(), test_file, self.shell, self.cluster)
# ^
@allure.title("Deny PATCH operation via APE (object_size={object_size})")
@pytest.mark.parametrize("objects", [1], indirect=True)
def test_patch_object_with_deny_rule(
    self,
    frostfs_cli: FrostfsCli,
    grpc_client: GrpcClientWrapper,
    grpc_client_with_other_wallet: GrpcClientWrapper,
    grpc_client_with_container_wallet: GrpcClientWrapper,
    grpc_client_with_ir_wallet: GrpcClientWrapper,
    container: str,
    objects: list[str],
    test_file: TestFile,
):
    """Check PATCH availability before, during and after a container-level DENY PATCH rule.

    Phases:
      1. No rule: the owner, other, container and IR clients each patch the object.
      2. DENY PATCH rule active: owner, other and IR clients must fail with
         ``OBJECT_ACCESS_DENIED``; the container client must still succeed.
      3. Rule removed: all four clients patch the object again.

    Every successful patch must return a new OID, which becomes the target of the next
    patch, so the attributes accumulate along the chain. Finally the HEAD of the last
    object must contain every attribute set by a successful patch, each equal to "true".
    """
    patch_params = {
        "cid": container,
        "oid": objects[0],
        "endpoint": self.cluster.default_rpc_endpoint,
        "ranges": ["300:200"],
        "payloads": [test_file],
        "new_attrs": "owner=true",
        "timeout": "200s",
    }

    def assert_patch_allowed(client: GrpcClientWrapper, patch_range: str, new_attrs: str):
        # Patch the current object and make the resulting OID the target of the next patch.
        patch_params["ranges"] = [patch_range]
        patch_params["new_attrs"] = new_attrs
        patched_oid = client.object.patch(**patch_params)
        assert patched_oid != patch_params["oid"], "OID of patched object must be different from original one"
        patch_params["oid"] = patched_oid

    def assert_patch_denied(client: GrpcClientWrapper, patch_range: str, new_attrs: str):
        # The patch must be rejected; the current OID is left unchanged.
        patch_params["ranges"] = [patch_range]
        patch_params["new_attrs"] = new_attrs
        with pytest.raises(Exception, match=OBJECT_ACCESS_DENIED):
            client.object.patch(**patch_params)

    # (attribute tag, wallet name used in step titles, client, patch range)
    wallets = (
        ("owner", "owner wallet", grpc_client, "300:200"),
        ("other", "another wallet", grpc_client_with_other_wallet, "100:50"),
        ("container", "container wallet", grpc_client_with_container_wallet, "600:0"),
        ("ir", "ir wallet", grpc_client_with_ir_wallet, "0:1000"),
    )

    def assert_patch_allowed_for_all(attr_suffix: str):
        # Run one successful patch per wallet, tagging each with "<tag><suffix>=true".
        for tag, wallet_name, client, patch_range in wallets:
            with reporter.step(f"Check that PATCH is available with {wallet_name}"):
                assert_patch_allowed(client, patch_range, f"{tag}{attr_suffix}=true")

    # Phase 1: no rule installed yet.
    assert_patch_allowed_for_all("")

    rule = ape.Rule(ape.Verb.DENY, ape.ObjectOperations.PATCH)

    with reporter.step("Add APE rule with deny PATCH operation"):
        frostfs_cli.ape_manager.add(
            self.cluster.default_rpc_endpoint,
            rule.chain_id,
            target_name=container,
            target_type="container",
            rule=rule.as_string(),
        )

    with reporter.step("Wait for one block"):
        self.wait_for_blocks(1)

    # Phase 2: rule is active.
    with reporter.step("Check that PATCH is not allowed with owner wallet"):
        assert_patch_denied(grpc_client, "300:200", "owner_2=false")

    with reporter.step("Check that PATCH is not allowed with another wallet"):
        assert_patch_denied(grpc_client_with_other_wallet, "100:50", "other_2=false")

    with reporter.step("Check that PATCH is allowed with container wallet as rule is ignored"):
        assert_patch_allowed(grpc_client_with_container_wallet, "600:0", "container_2=true")

    with reporter.step("Check that PATCH is not allowed with ir wallet"):
        assert_patch_denied(grpc_client_with_ir_wallet, "0:1000", "ir_2=true")

    with reporter.step("Remove APE rule"):
        frostfs_cli.ape_manager.remove(
            self.cluster.default_rpc_endpoint,
            rule.chain_id,
            target_name=container,
            target_type="container",
        )

    with reporter.step("Wait for one block"):
        self.wait_for_blocks(1)

    # Phase 3: rule removed, everything is allowed again.
    assert_patch_allowed_for_all("_3")

    # Only attributes written by successful patches are expected on the final object.
    expected_attrs = {"owner", "other", "container", "ir", "container_2", "owner_3", "other_3", "container_3", "ir_3"}

    with reporter.step("Ensure that all attributes match expected values"):
        object_info: dict = grpc_client.object.head(container, patch_params["oid"], self.cluster.default_rpc_endpoint)
        object_attrs: dict = object_info["header"]["attributes"]
        mismatch_msg = f"Received attributes do not match expected ones: {object_attrs}"

        assert expected_attrs <= set(object_attrs), mismatch_msg
        assert all(object_attrs[name] == "true" for name in expected_attrs), mismatch_msg