frostfs-testcases/pytest_tests/testsuites/acl/conftest.py

113 lines
3.8 KiB
Python
Raw Normal View History

import os
import uuid
from dataclasses import dataclass
from typing import Optional
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG, DEFAULT_WALLET_PASS
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import create_container
from frostfs_testlib.steps.cli.object import put_object_to_random_node
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.dataclasses.acl import EACLRole
from frostfs_testlib.storage.dataclasses.frostfs_services import InnerRing, StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.utils import wallet_utils
from frostfs_testlib.utils.file_utils import generate_file
OBJECT_COUNT = 5
@dataclass
class Wallet:
wallet_path: Optional[str] = None
config_path: Optional[str] = None
@dataclass
class Wallets:
wallets: dict[EACLRole, list[Wallet]]
def get_wallet(self, role: EACLRole = EACLRole.USER) -> Wallet:
return self.wallets[role][0]
def get_wallets_list(self, role: EACLRole = EACLRole.USER) -> list[Wallet]:
return self.wallets[role]
@pytest.fixture(scope="module")
def wallets(default_wallet: str, temp_directory: str, cluster: Cluster) -> Wallets:
other_wallets_paths = [os.path.join(temp_directory, f"{str(uuid.uuid4())}.json") for _ in range(2)]
for other_wallet_path in other_wallets_paths:
wallet_utils.init_wallet(other_wallet_path, DEFAULT_WALLET_PASS)
ir_node: InnerRing = cluster.ir_nodes[0]
storage_node: StorageNode = cluster.storage_nodes[0]
ir_wallet_path = ir_node.get_wallet_path()
ir_wallet_config = ir_node.get_wallet_config_path()
storage_wallet_path = storage_node.get_wallet_path()
storage_wallet_config = storage_node.get_wallet_config_path()
wallets_collection = Wallets(
wallets={
EACLRole.USER: [Wallet(wallet_path=default_wallet, config_path=DEFAULT_WALLET_CONFIG)],
EACLRole.OTHERS: [
Wallet(wallet_path=other_wallet_path, config_path=DEFAULT_WALLET_CONFIG)
for other_wallet_path in other_wallets_paths
],
EACLRole.SYSTEM: [
Wallet(wallet_path=ir_wallet_path, config_path=ir_wallet_config),
Wallet(wallet_path=storage_wallet_path, config_path=storage_wallet_config),
],
}
)
for role, wallets in wallets_collection.wallets.items():
if role == EACLRole.SYSTEM:
continue
for wallet in wallets:
reporter.attach(wallet.wallet_path, os.path.basename(wallet.wallet_path))
return wallets_collection
@pytest.fixture()
2023-09-08 10:35:34 +00:00
def file_path(object_size: ObjectSize) -> str:
yield generate_file(object_size.value)
@pytest.fixture(scope="function")
def eacl_container_with_objects(
wallets: Wallets, client_shell: Shell, cluster: Cluster, file_path: str
) -> tuple[str, list[str], str]:
user_wallet = wallets.get_wallet()
with reporter.step("Create eACL public container"):
cid = create_container(
user_wallet.wallet_path,
basic_acl=PUBLIC_ACL,
shell=client_shell,
endpoint=cluster.default_rpc_endpoint,
)
with reporter.step("Add test objects to container"):
objects_oids = [
put_object_to_random_node(
user_wallet.wallet_path,
file_path,
cid,
attributes={"key1": "val1", "key": val, "key2": "abc"},
shell=client_shell,
cluster=cluster,
)
for val in range(OBJECT_COUNT)
]
yield cid, objects_oids, file_path
# with reporter.step('Delete eACL public container'):
# delete_container(user_wallet, cid)