Add ACL and eACL PyTest tests
Signed-off-by: Vladimir Avdeev <v.avdeev@yadro.com>
This commit is contained in:
parent
590a5cfb0e
commit
6d040c6834
36 changed files with 979 additions and 1318 deletions
70
robot/resources/lib/python_keywords/container_access.py
Normal file
70
robot/resources/lib/python_keywords/container_access.py
Normal file
|
@ -0,0 +1,70 @@
|
|||
from typing import List, Optional
|
||||
|
||||
from acl import EACLOperation
|
||||
from python_keywords.object_access import (can_get_object, can_put_object, can_delete_object, can_get_head_object,
|
||||
can_get_range_hash_of_object, can_get_range_of_object, can_search_object)
|
||||
|
||||
|
||||
def check_full_access_to_container(wallet: str, cid: str, oid: str, file_name: str,
|
||||
bearer: Optional[str] = None, wallet_config: Optional[str] = None,
|
||||
xhdr: Optional[dict] = None):
|
||||
assert can_put_object(wallet, cid, file_name, bearer, wallet_config, xhdr)
|
||||
assert can_get_head_object(wallet, cid, oid, bearer, wallet_config, xhdr)
|
||||
assert can_get_range_of_object(wallet, cid, oid, bearer, wallet_config, xhdr)
|
||||
assert can_get_range_hash_of_object(wallet, cid, oid, bearer, wallet_config, xhdr)
|
||||
assert can_search_object(wallet, cid, oid, bearer, wallet_config, xhdr)
|
||||
assert can_get_object(wallet, cid, oid, file_name, bearer, wallet_config, xhdr)
|
||||
assert can_delete_object(wallet, cid, oid, bearer, wallet_config, xhdr)
|
||||
|
||||
|
||||
def check_no_access_to_container(wallet: str, cid: str, oid: str, file_name: str,
|
||||
bearer: Optional[str] = None, wallet_config: Optional[str] = None,
|
||||
xhdr: Optional[dict] = None):
|
||||
assert not can_put_object(wallet, cid, file_name, bearer, wallet_config, xhdr)
|
||||
assert not can_get_head_object(wallet, cid, oid, bearer, wallet_config, xhdr)
|
||||
assert not can_get_range_of_object(wallet, cid, oid, bearer, wallet_config, xhdr)
|
||||
assert not can_get_range_hash_of_object(wallet, cid, oid, bearer, wallet_config, xhdr)
|
||||
assert not can_search_object(wallet, cid, oid, bearer, wallet_config, xhdr)
|
||||
assert not can_get_object(wallet, cid, oid, file_name, bearer, wallet_config, xhdr)
|
||||
assert not can_delete_object(wallet, cid, oid, bearer, wallet_config, xhdr)
|
||||
|
||||
|
||||
def check_custom_access_to_container(wallet: str, cid: str, oid: str, file_name: str,
|
||||
deny_operations: Optional[List[EACLOperation]] = None,
|
||||
ignore_operations: Optional[List[EACLOperation]] = None,
|
||||
bearer: Optional[str] = None, wallet_config: Optional[str] = None,
|
||||
xhdr: Optional[dict] = None):
|
||||
deny_operations = [op.value for op in deny_operations or []]
|
||||
ignore_operations = [op.value for op in ignore_operations or []]
|
||||
checks: dict = {}
|
||||
if EACLOperation.PUT.value not in ignore_operations:
|
||||
checks[EACLOperation.PUT.value] = can_put_object(wallet, cid, file_name, bearer, wallet_config, xhdr)
|
||||
if EACLOperation.HEAD.value not in ignore_operations:
|
||||
checks[EACLOperation.HEAD.value] = can_get_head_object(wallet, cid, oid, bearer, wallet_config, xhdr)
|
||||
if EACLOperation.GET_RANGE.value not in ignore_operations:
|
||||
checks[EACLOperation.GET_RANGE.value] = can_get_range_of_object(wallet, cid, oid, bearer, wallet_config, xhdr)
|
||||
if EACLOperation.GET_RANGE_HASH.value not in ignore_operations:
|
||||
checks[EACLOperation.GET_RANGE_HASH.value] = can_get_range_hash_of_object(wallet, cid, oid, bearer,
|
||||
wallet_config, xhdr)
|
||||
if EACLOperation.SEARCH.value not in ignore_operations:
|
||||
checks[EACLOperation.SEARCH.value] = can_search_object(wallet, cid, oid, bearer, wallet_config, xhdr)
|
||||
if EACLOperation.GET.value not in ignore_operations:
|
||||
checks[EACLOperation.GET.value] = can_get_object(wallet, cid, oid, file_name, bearer, wallet_config, xhdr)
|
||||
if EACLOperation.DELETE.value not in ignore_operations:
|
||||
checks[EACLOperation.DELETE.value] = can_delete_object(wallet, cid, oid, bearer, wallet_config, xhdr)
|
||||
|
||||
failed_checks = (
|
||||
[f'allowed {action} failed' for action, success in checks.items() if
|
||||
not success and action not in deny_operations] +
|
||||
[f'denied {action} succeeded' for action, success in checks.items() if
|
||||
success and action in deny_operations])
|
||||
|
||||
assert not failed_checks, ", ".join(failed_checks)
|
||||
|
||||
|
||||
def check_read_only_container(wallet: str, cid: str, oid: str, file_name: str,
|
||||
bearer: Optional[str] = None, wallet_config: Optional[str] = None,
|
||||
xhdr: Optional[dict] = None):
|
||||
return check_custom_access_to_container(wallet, cid, oid, file_name,
|
||||
deny_operations=[EACLOperation.PUT, EACLOperation.DELETE],
|
||||
bearer=bearer, wallet_config=wallet_config, xhdr=xhdr)
|
Loading…
Add table
Add a link
Reference in a new issue