a.berezin
b8d5706515
Some checks reported warnings
DCO check / Commits Check (pull_request) Has been cancelled
Signed-off-by: a.berezin <a.berezin@yadro.com>
90 lines
3 KiB
Python
90 lines
3 KiB
Python
import os
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
|
|
import pytest
|
|
from frostfs_testlib import reporter
|
|
from frostfs_testlib.credentials.interfaces import CredentialsProvider, User
|
|
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
|
|
from frostfs_testlib.shell import Shell
|
|
from frostfs_testlib.steps.cli.container import create_container
|
|
from frostfs_testlib.steps.cli.object import put_object_to_random_node
|
|
from frostfs_testlib.storage.cluster import Cluster
|
|
from frostfs_testlib.storage.dataclasses.acl import EACLRole
|
|
from frostfs_testlib.storage.dataclasses.frostfs_services import InnerRing, StorageNode
|
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
|
|
|
OBJECT_COUNT = 5
|
|
|
|
|
|
@dataclass
|
|
class Wallets:
|
|
wallets: dict[EACLRole, list[WalletInfo]]
|
|
|
|
def get_wallet(self, role: EACLRole = EACLRole.USER) -> WalletInfo:
|
|
return self.wallets[role][0]
|
|
|
|
def get_wallets_list(self, role: EACLRole = EACLRole.USER) -> list[WalletInfo]:
|
|
return self.wallets[role]
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def wallets(default_wallet: WalletInfo, credentials_provider: CredentialsProvider, cluster: Cluster) -> Wallets:
|
|
other_wallets: list = []
|
|
for _ in range(2):
|
|
user = User(f"user_{hex(int(datetime.now().timestamp() * 1000000))}")
|
|
other_wallets.append(credentials_provider.GRPC.provide(user, cluster.cluster_nodes[0]))
|
|
|
|
ir_node: InnerRing = cluster.ir_nodes[0]
|
|
storage_node: StorageNode = cluster.storage_nodes[0]
|
|
|
|
wallets_collection = Wallets(
|
|
wallets={
|
|
EACLRole.USER: [default_wallet],
|
|
EACLRole.OTHERS: other_wallets,
|
|
EACLRole.SYSTEM: [
|
|
WalletInfo.from_node(ir_node),
|
|
WalletInfo.from_node(storage_node),
|
|
],
|
|
}
|
|
)
|
|
|
|
for role, wallets in wallets_collection.wallets.items():
|
|
if role == EACLRole.SYSTEM:
|
|
continue
|
|
for wallet in wallets:
|
|
reporter.attach(wallet.path, os.path.basename(wallet.path))
|
|
|
|
return wallets_collection
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def eacl_container_with_objects(
|
|
wallets: Wallets, client_shell: Shell, cluster: Cluster, file_path: str
|
|
) -> tuple[str, list[str], str]:
|
|
user_wallet = wallets.get_wallet()
|
|
with reporter.step("Create eACL public container"):
|
|
cid = create_container(
|
|
user_wallet,
|
|
basic_acl=PUBLIC_ACL,
|
|
shell=client_shell,
|
|
endpoint=cluster.default_rpc_endpoint,
|
|
)
|
|
|
|
with reporter.step("Add test objects to container"):
|
|
objects_oids = [
|
|
put_object_to_random_node(
|
|
user_wallet,
|
|
file_path,
|
|
cid,
|
|
attributes={"key1": "val1", "key": val, "key2": "abc"},
|
|
shell=client_shell,
|
|
cluster=cluster,
|
|
)
|
|
for val in range(OBJECT_COUNT)
|
|
]
|
|
|
|
yield cid, objects_oids, file_path
|
|
|
|
# with reporter.step('Delete eACL public container'):
|
|
# delete_container(user_wallet, cid)
|