frostfs-testlib/src/frostfs_testlib/component_tests/fixtures.py
Vitaliy Potyarkin 211f9a0abd Implement fixtures for deploying FrostFS components
Exported from a private playground repo @ commit
ba8c88d7e11e8e8c17e54ca1317bc2dbf8b52204

Signed-off-by: Vitaliy Potyarkin <v.potyarkin@yadro.com>
2025-05-07 15:35:07 +03:00

707 lines
24 KiB
Python

"""
Reusable fixtures for deploying FrostFS components with all the dependencies.
"""
# TODO: This file is larger that desirable.
# TODO: If anyone knows how to break it into fixtures/base.py, fixtures/alphabet.py, fixtures/... - be my guest
import gzip
import importlib.resources
import json
import random
import re
import shlex
import shutil
import string
import subprocess
import tarfile
import tempfile
from base64 import b64decode
from collections.abc import Mapping
from enum import Enum
from itertools import chain
from pathlib import Path
from types import SimpleNamespace
from typing import List
from urllib.request import urlopen
import pytest
import yaml
from neo3.wallet.account import Account
from neo3.wallet.wallet import Wallet
from testcontainers.core.network import Network
from .container import ContainerizedService, ExecResult
_SCOPE = "session"
_PREFIX = "frostfs-"
glagolic = [
"az",
"buky",
"vedi",
"glagoli",
"dobro",
"yest",
"zhivete",
"dzelo",
"zemlja",
"izhe",
"izhei",
"gerv",
"kako",
"ljudi",
"mislete",
"nash",
"on",
"pokoj",
"rtsi",
"slovo",
"tverdo",
"uk",
]
class Component(Enum):
ADM = "adm"
ALPHABET = "alphabet"
CONTRACT = "contract"
HTTPGW = "httpgw"
INNERRING = "innerring"
LOCODE = "locode"
NEOGO = "neogo"
S3GW = "s3gw"
STORAGE = "storage"
def __str__(self):
return self.value
def __len__(self):
return len(self.value)
@pytest.fixture(scope=_SCOPE)
def _deployment(_deployment_dir) -> dict:
"""
Read deployment options from environment.
DO NOT REFERENCE THIS FIXTURE DIRECTLY FROM TESTS!
This fixture is to be referenced only from other *_deployment fixtures.
Runtime overrides will not be applied to _deployment() -
only to specific service fixtures, e.g. alphabet_deployment().
"""
with importlib.resources.path("frostfs_testlib.component_tests.templates", ".") as template_dir:
default = {
"dir": _deployment_dir,
"template": template_dir,
}
for service in Component:
default[f"{service}_dir"] = _deployment_dir / str(service)
default[f"{service}_template"] = template_dir / f"{service}.yml"
config = { # TODO: replace hardcoded values with reading from a config file
"adm_image": "git.frostfs.info/truecloudlab/frostfs-adm",
"adm_version": "0.44.9",
"alphabet_foo": "bar", # FIXME
"alphabet_node_count": 4,
"contract_archive_url": "https://git.frostfs.info/TrueCloudLab/frostfs-contract/releases/download/v{version}/frostfs-contract-v{version}.tar.gz",
"contract_version": "0.21.1",
"httpgw_image": "git.frostfs.info/truecloudlab/frostfs-http-gw",
"httpgw_node_count": 3,
"httpgw_version": "0.32.1-debian", # TODO: none of the published images work: either POSIX shell is missing or CORS container is required
"innerring_image": "git.frostfs.info/truecloudlab/frostfs-ir",
"innerring_version": "0.44.9",
"locode_archive_url": "https://git.frostfs.info/TrueCloudLab/frostfs-locode-db/releases/download/v{version}/locode_db.gz",
"locode_version": "0.5.2",
"neogo_image": "nspccdev/neo-go",
"neogo_min_peers": 3,
"neogo_version": "0.106.3",
"storage_image": "git.frostfs.info/truecloudlab/frostfs-storage",
"storage_node_count": 2,
"storage_version": "0.44.9",
}
default.update(config)
yield default
def _customizable_deployment(service: Component, _deployment, request):
"""Test fixture builder that allows overriding some deployment parameters later."""
config = {}
for key, value in _deployment.items():
if not key.startswith(f"{service}_"):
continue
config[key[len(service) + 1 :]] = value
override = getattr(request, "param", {})
config.update(override)
directory = config.get("dir")
if directory:
directory = Path(directory)
basename = directory.name
index = 0
while directory.exists():
index += 1
directory = directory.with_name(basename + f"-{index}")
config["dir"] = directory
config["prefix"] = f"{directory.parent.name}-{directory.name}-"
for key in ["dir", "template"]:
if isinstance(config[key], str):
config[key] = Path(config[key])
return SimpleNamespace(**config)
def _customize_decorator(service: Component, options):
"""
Test decorator that overrides deployment options for the specific service.
Docs: https://docs.pytest.org/en/latest/example/parametrize.html#indirect-parametrization
"""
return pytest.mark.parametrize(f"{service}_deployment", [options], indirect=[f"{service}_deployment"], ids=[f"custom_{service}"])
@pytest.fixture(scope=_SCOPE)
def alphabet_deployment(_deployment, request):
"""Alphabet node parameters."""
return _customizable_deployment(Component.ALPHABET, _deployment, request)
def alphabet_customize(**options):
"""Test decorator that overrides deployment options for alphabet nodes."""
return _customize_decorator(Component.ALPHABET, options)
@pytest.fixture(scope=_SCOPE)
def contract_deployment(_deployment, request):
"""Contract deployment parameters."""
return _customizable_deployment(Component.CONTRACT, _deployment, request)
def contract_customize(**options):
"""Test decorator that overrides deployment options for frostfs-contracts."""
return _customize_decorator(Component.CONTRACT, options)
@pytest.fixture(scope=_SCOPE)
def neogo_deployment(_deployment, request, alphabet_deployment):
"""neo-go deployment parameters."""
deployment = _customizable_deployment(Component.NEOGO, _deployment, request)
deployment.node_count = alphabet_deployment.node_count
return deployment
def neogo_customize(**options):
"""Test decorator that overrides deployment options for neo-go nodes."""
return _customize_decorator(Component.NEOGO, options)
@pytest.fixture(scope=_SCOPE)
def adm_deployment(_deployment, request):
"""Frostfs-adm container parameters."""
return _customizable_deployment(Component.ADM, _deployment, request)
def adm_customize(**options):
"""Test decorator that overrides deployment options for frostfs-adm container."""
return _customize_decorator(Component.ADM, options)
@pytest.fixture(scope=_SCOPE)
def locode_deployment(_deployment, request):
"""Frostfs locode database parameters."""
return _customizable_deployment(Component.LOCODE, _deployment, request)
def locode_customize(**options):
"""Test decorator that overrides deployment options for frostfs-locode-db archive."""
return _customize_decorator(Component.LOCODE, options)
@pytest.fixture(scope=_SCOPE)
def innerring_deployment(_deployment, request):
"""Innerring node parameters."""
return _customizable_deployment(Component.INNERRING, _deployment, request)
def innerring_customize(**options):
"""Test decorator that overrides deployment options for innerring nodes."""
return _customize_decorator(Component.INNERRING, options)
@pytest.fixture(scope=_SCOPE)
def storage_deployment(_deployment, request):
"""Storage node parameters."""
return _customizable_deployment(Component.STORAGE, _deployment, request)
def storage_customize(**options):
"""Test decorator that overrides deployment options for storage nodes."""
return _customize_decorator(Component.STORAGE, options)
@pytest.fixture(scope=_SCOPE)
def httpgw_deployment(_deployment, request):
"""HTTP gateway deployment parameters."""
return _customizable_deployment(Component.HTTPGW, _deployment, request)
def httpgw_customize(**options):
"""Test decorator that overrides deployment options for HTTP gateways."""
return _customize_decorator(Component.HTTPGW, options)
@pytest.fixture(scope=_SCOPE)
def _network():
"""Docker container network fixture. Should not be referenced directly from tests."""
network = Network()
network.name = f"{_PREFIX}{network.name}"
network.create()
yield network
network.remove()
@pytest.fixture(scope=_SCOPE)
def _deployment_dir() -> Path:
"""Temporary directory for a dynamic deployment. Should not be referenced directly from tests."""
tmp = Path(tempfile.mkdtemp(prefix=f"{_PREFIX}test-")).absolute()
yield tmp
shutil.rmtree(tmp)
@pytest.fixture(scope=_SCOPE)
def adm_config(alphabet_deployment):
alphabet_deployment.dir.mkdir(mode=0o700, exist_ok=False)
file = alphabet_deployment.dir / "_frostfs_adm.json"
tree = {
"alphabet-wallets": str(alphabet_deployment.dir),
"credentials": {},
}
rnd = random.SystemRandom()
for key in chain(["contract"], glagolic[: alphabet_deployment.node_count]):
tree["credentials"][key] = "".join(rnd.choice(string.ascii_letters) for _ in range(12))
with open(file, "w") as f:
json.dump(tree, f, indent=True, sort_keys=True, ensure_ascii=False)
yield tree, file
shutil.rmtree(alphabet_deployment.dir)
@pytest.fixture(scope=_SCOPE)
def alphabet_wallets(alphabet_deployment, frostfs_adm):
dest = alphabet_deployment.dir
count = alphabet_deployment.node_count
frostfs_adm(f"morph generate-alphabet --size {count}")
frostfs_adm.fetch(dest, dest)
pubkeys = _read_alphabet_public_keys(dest, count)
return pubkeys, dest
def _read_alphabet_public_keys(directory, count) -> List[str]:
public = []
for index in range(count):
letter = glagolic[index]
file = directory / f"{letter}.json"
public.append(_wallet_public_key(file))
return public
def _wallet_address(path: Path, account=0) -> str:
"""Read account address from Neo NEP-6 wallet."""
with open(path) as f:
wallet = json.load(f)
account = wallet["accounts"][account]
return account["address"]
def _wallet_public_key(path: Path, account=0) -> str:
"""Read public key from Neo NEP-6 wallet."""
with open(path) as f:
wallet = json.load(f)
account = wallet["accounts"][account]
script = b64decode(account["contract"]["script"])
if not _is_signature_contract(script):
raise ValueError(f"not a signature contract: {account['contract']['script']}")
return script[2:35].hex()
def _is_signature_contract(script: bytes) -> bool:
"""
Test if the provided script is a (single) signature contract.
Args:
script: contract script.
Copied from neo-mamba (neo3.contracts.utils.is_signature_contract).
"""
PUSHDATA1 = 0x0C
SYSCALL = 0x41
SYSTEM_CRYPTO_CHECK_STANDARD_ACCOUNT = bytes((0x56, 0xE7, 0xB3, 0x27))
if len(script) != 40:
return False
if script[0] != PUSHDATA1 or script[1] != 33 or script[35] != SYSCALL or script[36:40] != SYSTEM_CRYPTO_CHECK_STANDARD_ACCOUNT:
return False
return True
@pytest.fixture(scope=_SCOPE)
def neogo_config(neogo_deployment, adm_config, alphabet_wallets):
neogo_deployment.dir.mkdir(mode=0o700, exist_ok=False)
with open(neogo_deployment.template) as f:
template = f.read()
alphabet, alphabet_dir = alphabet_wallets
adm, _ = adm_config
credentials = adm["credentials"]
seedlist = [f"{neogo_deployment.prefix}{index}:20333" for index in range(len(alphabet))]
override = getattr(neogo_deployment, "override", {})
fields = vars(neogo_deployment)
configs = {}
for index in range(len(alphabet)):
letter = glagolic[index]
fields.update(
dict(
letter=letter,
index=index,
password=credentials[letter],
)
)
config = yaml.load(template.format(**fields), yaml.SafeLoader)
config["ProtocolConfiguration"]["Hardforks"] = {} # kludge: templating collision for {}
config["ProtocolConfiguration"]["StandbyCommittee"] = alphabet
config["ProtocolConfiguration"]["SeedList"] = seedlist[:index] + seedlist[index + 1 :]
_update(config, override)
with open(neogo_deployment.dir / f"{letter}.json", "w") as c:
json.dump(config, c, ensure_ascii=False, indent=True, sort_keys=True)
configs[letter] = config
yield configs, neogo_deployment.dir
shutil.rmtree(neogo_deployment.dir)
def _update(old: Mapping, new: Mapping) -> None:
"""Recursive version of dict.update."""
for key in new:
if key in old and isinstance(old[key], Mapping) and isinstance(new[key], Mapping):
_update(old[key], new[key])
continue
old[key] = new[key]
@pytest.fixture(scope=_SCOPE)
def neogo(neogo_deployment, neogo_config, alphabet_deployment, frostfs_adm, _network):
wallet_dir = alphabet_deployment.dir
_, config_dir = neogo_config
nodes = []
for index in range(neogo_deployment.node_count):
letter = glagolic[index]
node = ContainerizedService(
command=f"neo-go node --config-file /neogo/{letter}.json --privnet --debug",
image=f"{neogo_deployment.image}:{neogo_deployment.version}",
name=f"{neogo_deployment.prefix}{index+1}",
network=_network,
)
node.add_file(wallet_dir / f"{letter}.json", f"/wallet/{letter}.json")
node.add_file(config_dir / f"{letter}.json", f"/neogo/{letter}.json")
node.start()
nodes.append(node)
def add_rpc_endpoint(command):
for arg in shlex.split(command):
# Check that there is at least one non-flag argument
# (--version does not work with --rpc-endpoint)
if not arg.startswith("-"):
break
else:
return command
return f"--rpc-endpoint 'http://{random.choice(nodes).name}:30333' {command}"
frostfs_adm._default_cmd_rewrite = add_rpc_endpoint
yield nodes
for node in nodes:
node.destroy()
@pytest.fixture(scope=_SCOPE)
def frostfs_adm(adm_deployment, adm_config, alphabet_deployment, _network):
_, config_file = adm_config
adm = ContainerizedService(
command="sleep infinity",
default_command=f'frostfs-adm --config "{config_file}" ' "{command}",
image=f"{adm_deployment.image}:{adm_deployment.version}",
name=f"{adm_deployment.prefix.strip('-')}",
network=_network,
)
wallet_dir = alphabet_deployment.dir
adm.add_directory(wallet_dir, wallet_dir)
adm.add_file(config_file, config_file)
yield adm
adm.destroy()
@pytest.fixture(scope=_SCOPE)
def frostfs_bootstrap(frostfs_contract, frostfs_adm, neogo) -> Mapping[str, str]:
output = {}
def morph(command: str) -> str:
output[command] = frostfs_adm(f"morph {command}")
frostfs_adm.add_directory(frostfs_contract, frostfs_contract)
morph(f"init --contracts '{frostfs_contract}'")
morph(
"ape add-rule-chain "
"--target-type namespace "
"--target-name '' "
"--rule 'allow Container.* *' "
"--chain-id 'allow_container_ops'"
)
morph("set-config ContainerFee=0")
morph("set-config ContainerAliasFee=0")
morph("set-config InnerRingCandidateFee=13")
morph("set-config WithdrawFee=17")
return output
@pytest.fixture(scope=_SCOPE)
def frostfs_contract(contract_deployment):
if contract_deployment.dir.exists():
return contract_deployment.dir
contract_deployment.dir.mkdir(mode=0o700, exist_ok=False)
with urlopen(
contract_deployment.archive_url.format(
version=contract_deployment.version,
)
) as request:
with tarfile.open(fileobj=request, mode="r|*") as tar:
tar.extractall(path=contract_deployment.dir, filter=_tar_strip_components(1))
return contract_deployment.dir
@pytest.fixture(scope=_SCOPE)
def frostfs_locode(locode_deployment):
locode = locode_deployment.dir / "locode_db"
if locode.exists():
return locode
locode_deployment.dir.mkdir(mode=0o700, exist_ok=False)
with urlopen(
locode_deployment.archive_url.format(
version=locode_deployment.version,
)
) as request:
with gzip.GzipFile(fileobj=request, mode="rb") as archive:
with open(locode, "wb") as destination:
shutil.copyfileobj(archive, destination)
return locode
@pytest.fixture(scope=_SCOPE)
def innerring_config(innerring_deployment, neogo, adm_config, alphabet_wallets):
innerring_deployment.dir.mkdir(mode=0o700, exist_ok=False)
with open(innerring_deployment.template) as f:
template = f.read()
alphabet, alphabet_dir = alphabet_wallets
adm, _ = adm_config
credentials = adm["credentials"]
override = getattr(innerring_deployment, "override", {})
fields = vars(innerring_deployment)
configs = {}
for index in range(len(alphabet)):
letter = glagolic[index]
fields.update(
dict(
letter=letter,
index=index,
password=credentials[letter],
neogo=neogo[index].name,
)
)
config = yaml.load(template.format(**fields), yaml.SafeLoader)
config["morph"]["validators"] = alphabet
_update(config, override)
with open(innerring_deployment.dir / f"{letter}.json", "w") as c:
json.dump(config, c, ensure_ascii=False, indent=True, sort_keys=True)
configs[letter] = config
yield configs, innerring_deployment.dir
shutil.rmtree(innerring_deployment.dir)
@pytest.fixture(scope=_SCOPE)
def innerring(innerring_deployment, innerring_config, frostfs_locode, frostfs_bootstrap, alphabet_deployment, _network):
wallet_dir = alphabet_deployment.dir
_, config_dir = innerring_config
nodes = []
for index in range(alphabet_deployment.node_count):
letter = glagolic[index]
node = ContainerizedService(
command=f"frostfs-ir --config /innerring/{letter}.json",
image=f"{innerring_deployment.image}:{innerring_deployment.version}",
name=f"{innerring_deployment.prefix}{index+1}",
network=_network,
)
node.add_file(wallet_dir / f"{letter}.json", f"/wallet/{letter}.json")
node.add_file(config_dir / f"{letter}.json", f"/innerring/{letter}.json")
node.add_file(frostfs_locode, f"/innerring/locode.db")
node.start()
nodes.append(node)
yield nodes
for node in nodes:
node.destroy()
@pytest.fixture(scope=_SCOPE)
def storage_config(storage_deployment, neogo, frostfs_adm, innerring):
storage_deployment.dir.mkdir(mode=0o700, exist_ok=False)
with open(storage_deployment.template) as f:
template = f.read()
sidechain = []
for node in neogo:
sidechain.append(
dict(
address=f"ws://{node.name}:30333/ws",
priority=0,
)
)
override = getattr(storage_deployment, "override", {})
fields = vars(storage_deployment)
configs = []
for index in range(storage_deployment.node_count):
wallet = storage_deployment.dir / f"wallet-{index}.json"
_, password = _new_wallet(wallet)
fields.update(
dict(
index=index,
prefix=storage_deployment.prefix,
wallet=str(wallet),
password=password,
price=42,
)
)
config = yaml.load(template.format(**fields), yaml.SafeLoader)
config["morph"]["rpc_endpoint"] = sidechain
_update(config, override)
with open(storage_deployment.dir / f"config-{index}.json", "w") as c:
json.dump(config, c, ensure_ascii=False, indent=True, sort_keys=True)
frostfs_adm.add_file(wallet, wallet)
frostfs_adm(f"morph refill-gas --storage-wallet '{wallet}' --gas 50.0")
configs.append(config)
yield configs, storage_deployment.dir
shutil.rmtree(storage_deployment.dir)
@pytest.fixture(scope=_SCOPE)
def storage(storage_deployment, storage_config, frostfs_adm, _network):
nodes = []
configs, _ = storage_config
for index, config in enumerate(configs):
node = ContainerizedService(
command=f"frostfs-node --config /storage/config.json",
image=f"{storage_deployment.image}:{storage_deployment.version}",
name=f"{storage_deployment.prefix}{index+1}",
network=_network,
)
node.add_file(config["node"]["wallet"]["path"], config["node"]["wallet"]["path"])
node.add_file(storage_deployment.dir / f"config-{index}.json", f"/storage/config.json")
node.start()
nodes.append(node)
for index, node in enumerate(nodes):
# Adding storage node account to proxy contract is required to be able to use apemanager:
# https://chat.yadro.com/yadro/pl/eet5jxiuabn1i8omg6jz4yeeso
address = _wallet_address(configs[index]["node"]["wallet"]["path"])
frostfs_adm(f"morph proxy-add-account --account {address}")
frostfs_adm("morph force-new-epoch")
yield nodes
for node in nodes:
node.destroy()
@pytest.fixture(scope=_SCOPE)
def httpgw_config(httpgw_deployment, storage, neogo):
httpgw_deployment.dir.mkdir(mode=0o700, exist_ok=False)
with open(httpgw_deployment.template) as f:
template = f.read()
peers = {}
for index, node in enumerate(storage):
peers[index] = dict(
address=f"grpc://{node.name}:8802",
priority=1,
weight=1,
)
override = getattr(httpgw_deployment, "override", {})
fields = vars(httpgw_deployment)
configs = []
for index in range(httpgw_deployment.node_count):
wallet = httpgw_deployment.dir / f"wallet-{index}.json"
_, password = _new_wallet(wallet)
fields.update(
dict(
wallet=str(wallet),
password=password,
morph=neogo[index % len(neogo)].name,
)
)
config = yaml.load(template.format(**fields), yaml.SafeLoader)
config["peers"] = peers
_update(config, override)
with open(httpgw_deployment.dir / f"config-{index}.json", "w") as c:
json.dump(config, c, ensure_ascii=False, indent=True, sort_keys=True)
configs.append(config)
yield configs, httpgw_deployment.dir
shutil.rmtree(httpgw_deployment.dir)
@pytest.fixture(scope=_SCOPE)
def httpgw(httpgw_deployment, httpgw_config, _network):
nodes = []
configs, _ = httpgw_config
for index, config in enumerate(configs):
node = ContainerizedService(
command=f"frostfs-http-gw --config /httpgw/config.json",
image=f"{httpgw_deployment.image}:{httpgw_deployment.version}",
name=f"{httpgw_deployment.prefix}{index+1}",
network=_network,
)
node.add_file(config["wallet"]["path"], config["wallet"]["path"])
node.add_file(httpgw_deployment.dir / f"config-{index}.json", f"/httpgw/config.json")
node.start()
nodes.append(node)
ready = re.compile(r"starting server.*\:80")
for node in nodes:
node.wait(ready, timeout=10)
yield nodes
for node in nodes:
node.destroy()
def _new_wallet(path: Path, password: str = None) -> (Wallet, str):
"""
Create new wallet and new account.
"""
wallet = Wallet()
if password is None:
password = "".join(random.choice(string.ascii_letters) for _ in range(12))
account = Account.create_new(password)
wallet.account_add(account)
with open(path, "w") as out:
json.dump(wallet.to_json(), out)
return wallet, password
def _tar_strip_components(number=1):
"""
See --strip-components in `man tar`.
"""
sep = "/"
def _filter(member: tarfile.TarInfo, path: str) -> tarfile.TarInfo | None:
components = member.name.split(sep)
for _ in range(number):
if not components:
break
components.pop(0)
if not components:
return None
member = member.replace(name=sep.join(components))
return tarfile.data_filter(member, path)
return _filter