[#262] Added new test for EC policy
Some checks reported warnings
DCO check / Commits Check (pull_request) Has been cancelled
Some checks reported warnings
DCO check / Commits Check (pull_request) Has been cancelled
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
This commit is contained in:
parent
9756953b10
commit
8c20aa69c1
3 changed files with 581 additions and 2 deletions
|
@ -1,8 +1,8 @@
|
|||
[tool.isort]
|
||||
profile = "black"
|
||||
src_paths = ["pytest_tests"]
|
||||
line_length = 120
|
||||
line_length = 140
|
||||
|
||||
[tool.black]
|
||||
line-length = 120
|
||||
line-length = 140
|
||||
target-version = ["py310"]
|
||||
|
|
|
@ -64,6 +64,7 @@ markers =
|
|||
write_cache_loss: tests for write cache loss
|
||||
time: time tests
|
||||
replication: replication tests
|
||||
ec_replication: replication EC
|
||||
static_session_container: tests for a static session in a container
|
||||
shard: shard management tests
|
||||
logs_after_session: tests after a session with logs
|
578
pytest_tests/testsuites/replication/test_ec_replication.py
Normal file
578
pytest_tests/testsuites/replication/test_ec_replication.py
Normal file
|
@ -0,0 +1,578 @@
|
|||
import json
|
||||
from dataclasses import dataclass
|
||||
|
||||
import allure
|
||||
import pytest
|
||||
import yaml
|
||||
from frostfs_testlib import reporter
|
||||
from frostfs_testlib.cli import FrostfsAdm, FrostfsCli
|
||||
from frostfs_testlib.cli.netmap_parser import NetmapParser
|
||||
from frostfs_testlib.credentials.interfaces import User
|
||||
from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC
|
||||
from frostfs_testlib.shell import Shell
|
||||
from frostfs_testlib.steps.cli.object import get_object, put_object
|
||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
|
||||
from frostfs_testlib.storage.controllers import ClusterStateController
|
||||
from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
|
||||
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
||||
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo
|
||||
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
||||
from frostfs_testlib.utils.cli_utils import parse_netmap_output
|
||||
from frostfs_testlib.utils.file_utils import generate_file, get_file_hash
|
||||
|
||||
from pytest_tests.resources.common import HOSTING_CONFIG_FILE
|
||||
|
||||
|
||||
def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
|
||||
if "ec_policy" not in metafunc.fixturenames:
|
||||
return
|
||||
|
||||
with open(HOSTING_CONFIG_FILE, "r") as file:
|
||||
hosting_config = yaml.full_load(file)
|
||||
|
||||
node_count = len(hosting_config["hosts"])
|
||||
|
||||
ec_map = {
|
||||
4: ["EC 1.1", "EC 2.1", "EC 3.1"],
|
||||
8: ["EC 5.3", "EC 3.2", "EC 7.1", "EC 4.4"],
|
||||
16: ["EC 12.4", "EC 8.4", "EC 5.3", "EC 4.4"],
|
||||
100: ["EC 12.4", "EC 8.4", "EC 5.3", "EC 4.4"],
|
||||
}
|
||||
|
||||
metafunc.parametrize("ec_policy, node_count", ((ec_policy, node_count) for ec_policy in ec_map[node_count]))
|
||||
|
||||
|
||||
@dataclass
|
||||
class Chunk:
|
||||
def __init__(self, object_id: str, required_nodes: list, confirmed_nodes: list, ec_parent_object_id: str, ec_index: int) -> None:
|
||||
self.object_id = object_id
|
||||
self.required_nodes = required_nodes
|
||||
self.confirmed_nodes = confirmed_nodes
|
||||
self.ec_parent_object_id = ec_parent_object_id
|
||||
self.ec_index = ec_index
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.object_id
|
||||
|
||||
|
||||
@allure.title("Initialized local FrostfsCli")
|
||||
@pytest.fixture()
|
||||
def frostfs_local_cli(client_shell: Shell, default_user: User) -> FrostfsCli:
|
||||
return FrostfsCli(client_shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=default_user.wallet.config_path)
|
||||
|
||||
|
||||
@allure.title("Initialized remote FrostfsAdm")
|
||||
@pytest.fixture
|
||||
def frostfs_remote_adm(cluster: Cluster) -> FrostfsAdm:
|
||||
node = cluster.cluster_nodes[0]
|
||||
shell = node.host.get_shell()
|
||||
return FrostfsAdm(shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH)
|
||||
|
||||
|
||||
@pytest.mark.replication
|
||||
@pytest.mark.ec_replication
|
||||
class TestECReplication(ClusterTestBase):
|
||||
@allure.title("Restore chunk maximum params in network params ")
|
||||
@pytest.fixture
|
||||
def restore_network_config(self, frostfs_remote_adm: FrostfsAdm) -> None:
|
||||
yield
|
||||
frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=12" "MaxECParityCount=5"')
|
||||
|
||||
@reporter.step("Get object nodes output ")
|
||||
def get_object_nodes(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> dict:
|
||||
if not endpoint:
|
||||
endpoint = self.cluster.default_rpc_endpoint
|
||||
return json.loads(cli.object.nodes(endpoint, cid, oid=oid, json=True).stdout)
|
||||
|
||||
@reporter.step("Get all chunks object ")
|
||||
def get_all_chunks_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> list[Chunk]:
|
||||
chunks = self.get_object_nodes(cli, cid, oid, endpoint)
|
||||
return [Chunk(**chunk) for chunk in chunks["data_objects"]]
|
||||
|
||||
@reporter.step("Get parity chunk ")
|
||||
def get_parity_chunk_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> Chunk:
|
||||
chunks = self.get_object_nodes(cli, cid, oid, endpoint)["data_objects"]
|
||||
return Chunk(**chunks[-1])
|
||||
|
||||
@reporter.step("Get data chunk ")
|
||||
def get_data_chunk_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> Chunk:
|
||||
chunks = self.get_object_nodes(cli, cid, oid, endpoint)["data_objects"]
|
||||
return Chunk(**chunks[0])
|
||||
|
||||
@reporter.step("Search node without chunks ")
|
||||
def search_node_not_chunks(self, chunks: list[Chunk], local_cli: FrostfsCli, endpoint: str = None) -> list[ClusterNode]:
|
||||
if not endpoint:
|
||||
self.cluster.default_rpc_endpoint
|
||||
netmap = parse_netmap_output(local_cli.netmap.snapshot(endpoint).stdout)
|
||||
chunks_node_key = []
|
||||
for chunk in chunks:
|
||||
chunks_node_key.extend(chunk.confirmed_nodes)
|
||||
for node_info in netmap.copy():
|
||||
if node_info.node_id in chunks_node_key and node_info in netmap:
|
||||
netmap.remove(node_info)
|
||||
result = []
|
||||
for node_info in netmap:
|
||||
for cluster_node in self.cluster.cluster_nodes:
|
||||
if node_info.node == cluster_node.host_ip:
|
||||
result.append(cluster_node)
|
||||
return result
|
||||
|
||||
@reporter.step("Create container, policy={policy}")
|
||||
def create_container(self, user_cli: FrostfsCli, endpoint: str, policy: str) -> str:
|
||||
return user_cli.container.create(endpoint, policy=policy, await_mode=True).stdout.split(" ")[1].strip().split("\n")[0]
|
||||
|
||||
@reporter.step("Search node chunk {chunk}")
|
||||
def get_chunk_node(self, frostfs_cli: FrostfsCli, chunk: Chunk) -> tuple[ClusterNode, NodeNetmapInfo]:
|
||||
netmap = parse_netmap_output(frostfs_cli.netmap.snapshot(self.cluster.default_rpc_endpoint).stdout)
|
||||
for node_info in netmap:
|
||||
if node_info.node_id in chunk.confirmed_nodes:
|
||||
for cluster_node in self.cluster.cluster_nodes:
|
||||
if cluster_node.host_ip == node_info.node:
|
||||
return (cluster_node, node_info)
|
||||
|
||||
@reporter.step("Check replication chunks={total_chunks} chunks ")
|
||||
def check_replication(self, total_chunks: int, local_cli: FrostfsCli, cid: str, oid: str) -> bool:
|
||||
object_nodes_info = local_cli.object.nodes(self.cluster.default_rpc_endpoint, cid, oid=oid, json=True).stdout
|
||||
object_nodes_info = json.loads(object_nodes_info)
|
||||
return len(object_nodes_info["data_objects"]) == total_chunks
|
||||
|
||||
@allure.title("Disable Policer on all nodes")
|
||||
@pytest.fixture()
|
||||
def disable_policer(
|
||||
self,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
) -> None:
|
||||
with reporter.step(f"Disable policer for nodes"):
|
||||
cluster_state_controller.manager(ConfigStateManager).set_on_all_nodes(
|
||||
service_type=StorageNode, values={"policer": {"unsafe_disable": True}}
|
||||
)
|
||||
yield
|
||||
with reporter.step(f"Enable policer for nodes"):
|
||||
cluster_state_controller.start_stopped_hosts()
|
||||
cluster_state_controller.manager(ConfigStateManager).revert_all()
|
||||
|
||||
@allure.title("Create container with EC policy (size={object_size.value})")
|
||||
def test_create_container_with_ec_policy(
|
||||
self,
|
||||
default_user: User,
|
||||
frostfs_local_cli: FrostfsCli,
|
||||
object_size: ObjectSize,
|
||||
) -> None:
|
||||
test_file = generate_file(object_size.value)
|
||||
rep_count = 3
|
||||
if object_size.name == "complex":
|
||||
rep_count *= 4
|
||||
with reporter.step("Create container."):
|
||||
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
|
||||
|
||||
with reporter.step("Put object in container."):
|
||||
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check replication chunks."):
|
||||
assert self.check_replication(rep_count, frostfs_local_cli, cid, oid)
|
||||
|
||||
@allure.title("Lose node with chunk data")
|
||||
@pytest.mark.failover
|
||||
def test_lose_node_with_data_chunk(
|
||||
self,
|
||||
frostfs_local_cli: FrostfsCli,
|
||||
default_user: User,
|
||||
simple_object_size: ObjectSize,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
disable_policer: None,
|
||||
) -> None:
|
||||
with reporter.step("Create container."):
|
||||
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1")
|
||||
|
||||
with reporter.step("Put object in container."):
|
||||
test_file = generate_file(simple_object_size.value)
|
||||
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check chunk replication on 4 nodes."):
|
||||
assert self.check_replication(4, frostfs_local_cli, cid, oid)
|
||||
|
||||
with reporter.step("Search node data chunk"):
|
||||
chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid)
|
||||
chunk_node = self.get_chunk_node(frostfs_local_cli, chunk)[0]
|
||||
|
||||
with reporter.step("Stop node with data chunk."):
|
||||
cluster_state_controller.stop_node_host(chunk_node, "hard")
|
||||
|
||||
with reporter.step("Get object"):
|
||||
node = list(set(self.cluster.cluster_nodes) - {chunk_node})[0]
|
||||
get_object(default_user.wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint())
|
||||
|
||||
with reporter.step("Start stopped node, and check replication chunks."):
|
||||
cluster_state_controller.start_node_host(chunk_node)
|
||||
assert self.check_replication(4, frostfs_local_cli, cid, oid)
|
||||
|
||||
@allure.title("Lose node with chunk parity")
|
||||
@pytest.mark.failover
|
||||
def test_lose_node_with_parity_chunk(
|
||||
self,
|
||||
frostfs_local_cli: FrostfsCli,
|
||||
default_user: User,
|
||||
simple_object_size: ObjectSize,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
disable_policer: None,
|
||||
) -> None:
|
||||
with reporter.step("Create container."):
|
||||
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1")
|
||||
|
||||
with reporter.step("Put object in container."):
|
||||
test_file = generate_file(simple_object_size.value)
|
||||
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check chunk replication on 4 nodes."):
|
||||
assert self.check_replication(4, frostfs_local_cli, cid, oid)
|
||||
|
||||
with reporter.step("Search node with parity chunk"):
|
||||
chunk = self.get_parity_chunk_object(frostfs_local_cli, cid, oid)
|
||||
chunk_node = self.get_chunk_node(frostfs_local_cli, chunk)[0]
|
||||
|
||||
with reporter.step("Stop node parity chunk."):
|
||||
cluster_state_controller.stop_node_host(chunk_node, "hard")
|
||||
|
||||
with reporter.step("Get object, expect success."):
|
||||
node = list(set(self.cluster.cluster_nodes) - {chunk_node})[0]
|
||||
get_object(default_user.wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint())
|
||||
|
||||
with reporter.step("Start stoped node, and check replication chunks."):
|
||||
cluster_state_controller.start_node_host(chunk_node)
|
||||
assert self.check_replication(4, frostfs_local_cli, cid, oid)
|
||||
|
||||
@allure.title("Lose nodes with chunk data and parity")
|
||||
@pytest.mark.failover
|
||||
def test_lose_nodes_data_chunk_and_parity(
|
||||
self,
|
||||
frostfs_local_cli: FrostfsCli,
|
||||
default_user: User,
|
||||
simple_object_size: ObjectSize,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
disable_policer: None,
|
||||
) -> None:
|
||||
with reporter.step("Create container."):
|
||||
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1")
|
||||
|
||||
with reporter.step("Put object in container."):
|
||||
test_file = generate_file(simple_object_size.value)
|
||||
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check count chunks, expect 4."):
|
||||
assert self.check_replication(4, frostfs_local_cli, cid, oid)
|
||||
|
||||
with reporter.step("Search node data chunk and node parity chunk"):
|
||||
data_chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid)
|
||||
node_data_chunk = self.get_chunk_node(frostfs_local_cli, data_chunk)[0]
|
||||
parity_chunk = self.get_parity_chunk_object(frostfs_local_cli, cid, oid)
|
||||
node_parity_chunk = self.get_chunk_node(frostfs_local_cli, parity_chunk)[0]
|
||||
|
||||
with reporter.step("Stop node with data chunk."):
|
||||
cluster_state_controller.stop_node_host(node_data_chunk, "hard")
|
||||
|
||||
with reporter.step("Get object"):
|
||||
node = list(set(self.cluster.cluster_nodes) - {node_data_chunk, node_parity_chunk})[0]
|
||||
get_object(default_user.wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint())
|
||||
|
||||
with reporter.step("Start stopped host and check chunks."):
|
||||
cluster_state_controller.start_node_host(node_data_chunk)
|
||||
assert self.check_replication(4, frostfs_local_cli, cid, oid)
|
||||
|
||||
with reporter.step("Stop node with parity chunk and one all node."):
|
||||
cluster_state_controller.stop_node_host(node_data_chunk, "hard")
|
||||
cluster_state_controller.stop_node_host(node_parity_chunk, "hard")
|
||||
|
||||
with reporter.step("Get object, expect error."):
|
||||
with pytest.raises(RuntimeError):
|
||||
get_object(default_user.wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint())
|
||||
|
||||
with reporter.step("Start stopped nodes and check replication chunk."):
|
||||
cluster_state_controller.start_stopped_hosts()
|
||||
assert self.check_replication(4, frostfs_local_cli, cid, oid)
|
||||
|
||||
@allure.title("Policer work with chunk")
|
||||
@pytest.mark.failover
|
||||
def test_work_policer_with_nodes(
|
||||
self,
|
||||
simple_object_size: ObjectSize,
|
||||
frostfs_local_cli: FrostfsCli,
|
||||
default_user: User,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
) -> None:
|
||||
with reporter.step("Create container."):
|
||||
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
|
||||
|
||||
with reporter.step("Put object on container."):
|
||||
test_file = generate_file(simple_object_size.value)
|
||||
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check count chunks nodes on 3."):
|
||||
assert self.check_replication(3, frostfs_local_cli, cid, oid)
|
||||
|
||||
with reporter.step("Stop node with chunk."):
|
||||
data_chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid)
|
||||
first_all_chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid)
|
||||
node_data_chunk = self.get_chunk_node(frostfs_local_cli, data_chunk)[0]
|
||||
cluster_state_controller.stop_node_host(node_data_chunk, "hard")
|
||||
|
||||
with reporter.step("Check replication chunk with different node."):
|
||||
alive_endpoint = list(set(self.cluster.cluster_nodes) - {node_data_chunk})[0].storage_node.get_rpc_endpoint()
|
||||
node = self.search_node_not_chunks(first_all_chunks, frostfs_local_cli, endpoint=alive_endpoint)[0]
|
||||
second_all_chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, node.storage_node.get_rpc_endpoint())
|
||||
|
||||
with reporter.step("Check that oid no change."):
|
||||
oid_chunk_check = [chunk for chunk in second_all_chunks if data_chunk.object_id == chunk.object_id]
|
||||
assert len(oid_chunk_check) > 0
|
||||
|
||||
with reporter.step("Start stopped host, and check delete 4 chunk."):
|
||||
cluster_state_controller.start_node_host(node_data_chunk)
|
||||
all_chunks_after_start_node = self.get_all_chunks_object(frostfs_local_cli, cid, oid)
|
||||
assert len(all_chunks_after_start_node) == 3
|
||||
|
||||
@allure.title("EC X.Y combinations (nodes={node_count},policy={ec_policy},size={object_size.name})")
|
||||
def test_create_container_with_difference_count_nodes(
|
||||
self,
|
||||
node_count: int,
|
||||
ec_policy: str,
|
||||
object_size: ObjectSize,
|
||||
default_user: User,
|
||||
frostfs_local_cli: FrostfsCli,
|
||||
) -> None:
|
||||
with reporter.step("Create container."):
|
||||
expected_chunks = int(ec_policy.split(" ")[1].split(".")[0]) + int(ec_policy.split(" ")[1].split(".")[1])
|
||||
if "complex" in object_size.name:
|
||||
expected_chunks *= 4
|
||||
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, ec_policy)
|
||||
|
||||
with reporter.step("Put object in container."):
|
||||
test_file = generate_file(object_size.value)
|
||||
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check count object chunks."):
|
||||
chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
|
||||
assert len(chunks) == expected_chunks
|
||||
|
||||
with reporter.step("get object and check hash."):
|
||||
file_with_node = get_object(default_user.wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
assert get_file_hash(test_file) == get_file_hash(file_with_node)
|
||||
|
||||
@allure.title("Request PUT with copies_number flag")
|
||||
def test_put_object_with_copies_number(
|
||||
self,
|
||||
frostfs_local_cli: FrostfsCli,
|
||||
default_user: User,
|
||||
simple_object_size: ObjectSize,
|
||||
) -> None:
|
||||
with reporter.step("Create container."):
|
||||
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
|
||||
|
||||
with reporter.step("Put object in container with copies number = 1"):
|
||||
test_file = generate_file(simple_object_size.value)
|
||||
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint, copies_number=1)
|
||||
|
||||
with reporter.step("Check that count chunks > 1."):
|
||||
chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
|
||||
assert len(chunks) > 1
|
||||
|
||||
@allure.title("Request PUT and 1 node off")
|
||||
@pytest.mark.failover
|
||||
def test_put_object_with_off_cnr_node(
|
||||
self,
|
||||
frostfs_local_cli: FrostfsCli,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
default_user: User,
|
||||
simple_object_size: ObjectSize,
|
||||
) -> None:
|
||||
with reporter.step("Create container."):
|
||||
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1")
|
||||
|
||||
with reporter.step("Stop one node in container nodes"):
|
||||
cluster_state_controller.stop_node_host(self.cluster.cluster_nodes[1], "hard")
|
||||
|
||||
with reporter.step("Put object in container, expect error."):
|
||||
test_file = generate_file(simple_object_size.value)
|
||||
with pytest.raises(RuntimeError, match="put single object on client"):
|
||||
put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
|
||||
@allure.title("Request DELETE (size={object_size.name})")
|
||||
@pytest.mark.failover
|
||||
def test_delete_object_in_ec_cnr(
|
||||
self,
|
||||
default_user: User,
|
||||
frostfs_local_cli: FrostfsCli,
|
||||
object_size: ObjectSize,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
) -> None:
|
||||
with reporter.step("Create container."):
|
||||
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
|
||||
|
||||
with reporter.step("Put object in container."):
|
||||
test_file = generate_file(object_size.value)
|
||||
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check object chunks nodes."):
|
||||
chunks_object = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
|
||||
replication_count = 3 if object_size.name == "simple" else 3 * 4
|
||||
assert len(chunks_object) == replication_count
|
||||
|
||||
with reporter.step("Delete object"):
|
||||
frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid)
|
||||
|
||||
with reporter.step("Check that delete all chunks."):
|
||||
for chunk in chunks_object:
|
||||
with pytest.raises(RuntimeError, match="object already removed"):
|
||||
frostfs_local_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id)
|
||||
|
||||
with reporter.step("Put second object."):
|
||||
oid_second = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check second object chunks nodes."):
|
||||
chunks_second_object = self.get_all_chunks_object(frostfs_local_cli, cid, oid_second, self.cluster.default_rpc_endpoint)
|
||||
assert len(chunks_second_object) == replication_count
|
||||
|
||||
with reporter.step("Stop nodes with chunk."):
|
||||
chunk_node = self.get_chunk_node(frostfs_local_cli, chunks_second_object[0])
|
||||
cluster_state_controller.stop_node_host(chunk_node[0], "hard")
|
||||
|
||||
with reporter.step("Delete second object"):
|
||||
frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid_second)
|
||||
|
||||
with reporter.step("Check that delete all chunk second object."):
|
||||
for chunk in chunks_second_object:
|
||||
with pytest.raises(RuntimeError, match="object already removed"):
|
||||
frostfs_local_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id)
|
||||
|
||||
@allure.title("Request LOCK (size={object_size.name})")
|
||||
@pytest.mark.failover
|
||||
def test_lock_object_in_ec_cnr(
|
||||
self,
|
||||
frostfs_local_cli: FrostfsCli,
|
||||
object_size: ObjectSize,
|
||||
default_user: User,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
) -> None:
|
||||
with reporter.step("Create container."):
|
||||
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
|
||||
|
||||
with reporter.step("Put object in container."):
|
||||
test_file = generate_file(object_size.value)
|
||||
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check object chunks nodes."):
|
||||
chunks_object = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
|
||||
replication_count = 3 if object_size.name == "simple" else 3 * 4
|
||||
assert len(chunks_object) == replication_count
|
||||
|
||||
with reporter.step("Put LOCK in object."):
|
||||
epoch = frostfs_local_cli.netmap.epoch(self.cluster.default_rpc_endpoint).stdout.strip()
|
||||
frostfs_local_cli.object.lock(self.cluster.default_rpc_endpoint, cid, oid, expire_at=int(epoch) + 5).stdout
|
||||
|
||||
with reporter.step("Check LOCK in object"):
|
||||
chunks = frostfs_local_cli.object.head(self.cluster.default_rpc_endpoint, cid, oid, raw=True).stdout.strip().split(" ")
|
||||
oids_chunks = [chunk.strip() for chunk in chunks if len(chunk) > 35]
|
||||
for chunk_id in oids_chunks:
|
||||
with pytest.raises(RuntimeError, match="could not delete objects"):
|
||||
frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk_id)
|
||||
|
||||
with reporter.step("Stop chunk node."):
|
||||
chunk_node = self.get_chunk_node(frostfs_local_cli, chunks_object[0])
|
||||
cluster_state_controller.stop_node_host(chunk_node[0], "hard")
|
||||
cluster_state_controller.start_node_host(chunk_node[0])
|
||||
|
||||
with reporter.step("Check LOCK in object."):
|
||||
chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
|
||||
for chunk_id in oids_chunks:
|
||||
with pytest.raises(RuntimeError, match="could not delete objects"):
|
||||
frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk_id)
|
||||
|
||||
@allure.title("Output MaxEC* params in frostfscli (type={type_shards})")
|
||||
@pytest.mark.parametrize("type_shards", ["Maximum count of data shards", "Maximum count of parity shards"])
|
||||
def test_maxec_info_with_output_cli(self, frostfs_local_cli: FrostfsCli, type_shards: str) -> None:
|
||||
with reporter.step("Get and check params"):
|
||||
net_info = frostfs_local_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout
|
||||
assert type_shards in net_info
|
||||
|
||||
@allure.title("Change MaxEC*Count params")
|
||||
def test_change_max_data_shards_params(
|
||||
self,
|
||||
frostfs_remote_adm: FrostfsAdm,
|
||||
frostfs_local_cli: FrostfsCli,
|
||||
restore_network_config: None,
|
||||
) -> None:
|
||||
with reporter.step("Get now params MaxECDataCount and MaxECParityCount"):
|
||||
node_netinfo = NetmapParser.netinfo(frostfs_local_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout)
|
||||
|
||||
with reporter.step("Change params"):
|
||||
frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=5" "MaxECParityCount=3"')
|
||||
|
||||
with reporter.step("Get update params"):
|
||||
update_net_info = NetmapParser.netinfo(frostfs_local_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout)
|
||||
|
||||
with reporter.step("Check old and new params difference"):
|
||||
assert (
|
||||
update_net_info.maximum_count_of_data_shards not in node_netinfo.maximum_count_of_data_shards
|
||||
and update_net_info.maximum_count_of_parity_shards not in node_netinfo.maximum_count_of_parity_shards
|
||||
)
|
||||
|
||||
@allure.title("Check maximum count data and parity shards")
|
||||
def test_change_over_max_parity_shards_params(
|
||||
self,
|
||||
frostfs_remote_adm: FrostfsAdm,
|
||||
) -> None:
|
||||
with reporter.step("Change over maximum params shards count."):
|
||||
with pytest.raises(RuntimeError, match="MaxECDataCount and MaxECParityCount must be <= 256"):
|
||||
frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=130" "MaxECParityCount=130"')
|
||||
|
||||
@allure.title("Create container with EC policy and SELECT (SELECT={select})")
|
||||
@pytest.mark.parametrize("select", [2, 4])
|
||||
def test_create_container_with_select(
|
||||
self,
|
||||
select: int,
|
||||
frostfs_local_cli: FrostfsCli,
|
||||
) -> None:
|
||||
with reporter.step("Create container"):
|
||||
policy = f"EC 1.1 CBF 1 SELECT {select} FROM *"
|
||||
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, policy)
|
||||
|
||||
with reporter.step("Check container nodes decomposed"):
|
||||
container_nodes = frostfs_local_cli.container.search_node(self.cluster.default_rpc_endpoint, cid).stdout.strip().split("\n")[1:]
|
||||
assert len(container_nodes) == select
|
||||
|
||||
@allure.title("Create container with EC policy and CBF (CBF={cbf})")
|
||||
@pytest.mark.parametrize("cbf, expected_nodes", [(1, 2), (2, 4)])
|
||||
def test_create_container_with_cbf(
|
||||
self,
|
||||
cbf: int,
|
||||
expected_nodes: int,
|
||||
frostfs_local_cli: FrostfsCli,
|
||||
) -> None:
|
||||
with reporter.step("Create container."):
|
||||
policy = f"EC 1.1 CBF {cbf}"
|
||||
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, policy)
|
||||
|
||||
with reporter.step("Check expected container nodes."):
|
||||
container_nodes = frostfs_local_cli.container.search_node(self.cluster.default_rpc_endpoint, cid).stdout.strip().split("\n")[1:]
|
||||
assert len(container_nodes) == expected_nodes
|
||||
|
||||
@allure.title("Create container with EC policy and FILTER")
|
||||
def test_create_container_with_filter(
|
||||
self,
|
||||
default_user: User,
|
||||
frostfs_local_cli: FrostfsCli,
|
||||
simple_object_size: ObjectSize,
|
||||
) -> None:
|
||||
with reporter.step("Create Container."):
|
||||
policy = "EC 1.1 IN RUS SELECT 2 FROM RU AS RUS FILTER Country EQ Russia AS RU"
|
||||
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, policy)
|
||||
|
||||
with reporter.step("Put object in container."):
|
||||
test_file = generate_file(simple_object_size.value)
|
||||
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check object is decomposed exclusively on Russian nodes"):
|
||||
data_chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
|
||||
parity_chunk = self.get_parity_chunk_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
|
||||
node_data_chunk = self.get_chunk_node(frostfs_local_cli, data_chunk)
|
||||
node_parity_chunk = self.get_chunk_node(frostfs_local_cli, parity_chunk)
|
||||
for node in [node_data_chunk[1], node_parity_chunk[1]]:
|
||||
assert "Russia" in node.country
|
Loading…
Reference in a new issue