[#281] Added tests EC policy

Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
This commit is contained in:
Dmitriy Zayakin 2024-08-06 10:49:04 +03:00
parent 1f43aa4dc0
commit bbc1a20a4d
4 changed files with 534 additions and 153 deletions

View file

@ -1,5 +1,6 @@
{
"rep-3": "REP 3",
"rep-1": "REP 1",
"complex": "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"
"complex": "REP 1 IN X CBF 1 SELECT 1 FROM * AS X",
"ec3.1": "EC 3.1 CBF 1 SELECT 4 FROM *"
}

View file

@ -42,6 +42,7 @@ logger = logging.getLogger("NeoLogger")
SERVICE_ACTIVE_TIME = 20
WALLTETS_IN_POOL = 2
# Add logs check test even if it's not fit to mark selectors
def pytest_configure(config: pytest.Config):
markers = config.option.markexpr
@ -52,6 +53,8 @@ def pytest_configure(config: pytest.Config):
number_key = pytest.StashKey[str]()
start_time = pytest.StashKey[int]()
test_outcome = pytest.StashKey[str]()
# pytest hook. Do not rename
def pytest_collection_modifyitems(items: list[pytest.Item]):
# Change order of tests based on @pytest.mark.order(<int>) marker

View file

@ -1,11 +1,12 @@
import math
import time
import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.steps.cli.container import create_container
from frostfs_testlib.steps.cli.object import delete_object, put_object_to_random_node
from frostfs_testlib.steps.metrics import check_metrics_counter
from frostfs_testlib.steps.cli.container import create_container, search_nodes_with_container
from frostfs_testlib.steps.cli.object import delete_object, head_object, put_object_to_random_node
from frostfs_testlib.steps.metrics import check_metrics_counter, get_metrics_value
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
@ -16,23 +17,29 @@ from frostfs_testlib.utils.file_utils import generate_file
@pytest.mark.container
class TestContainerMetrics(ClusterTestBase):
@allure.title("Container metrics (obj_size={object_size})")
@allure.title("Container metrics (obj_size={object_size},policy={policy})")
@pytest.mark.parametrize("placement_policy, policy", [("REP 2 IN X CBF 2 SELECT 2 FROM * AS X", "REP"), ("EC 1.1 CBF 1", "EC")])
def test_container_metrics(
self, object_size: ObjectSize, max_object_size: int, default_wallet: WalletInfo, cluster: Cluster
self,
object_size: ObjectSize,
max_object_size: int,
default_wallet: WalletInfo,
cluster: Cluster,
placement_policy: str,
policy: str,
):
file_path = generate_file(object_size.value)
placement_policy = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
copies = 2
object_chunks = 0
head_object = 1
copies = 2 if policy == "REP" else 1
object_chunks = 1
link_object = 0
if object_size.value > max_object_size:
object_chunks = math.ceil(object_size.value / max_object_size)
link_object = 1
with reporter.step(f"Create container with policy {placement_policy}"):
cid = create_container(default_wallet, self.shell, cluster.default_rpc_endpoint, placement_policy)
if object_size.value > max_object_size:
object_chunks = math.ceil(object_size.value / max_object_size)
link_object = len(search_nodes_with_container(default_wallet, cid, self.shell, cluster.default_rpc_endpoint, cluster))
with reporter.step("Put object to random node"):
oid = put_object_to_random_node(
wallet=default_wallet,
@ -44,44 +51,72 @@ class TestContainerMetrics(ClusterTestBase):
with reporter.step("Get object nodes"):
object_storage_nodes = get_nodes_with_object(cid, oid, self.shell, cluster.storage_nodes)
object_nodes = [
cluster_node
for cluster_node in cluster.cluster_nodes
if cluster_node.storage_node in object_storage_nodes
]
object_nodes = [cluster_node for cluster_node in cluster.cluster_nodes if cluster_node.storage_node in object_storage_nodes]
with reporter.step("Check metric appears in node where the object is located"):
count_metrics = (object_chunks + head_object + link_object) * copies
check_metrics_counter(
object_nodes, counter_exp=count_metrics, command="container_objects_total", cid=cid, type="phy"
)
check_metrics_counter(
object_nodes, counter_exp=count_metrics, command="container_objects_total", cid=cid, type="logic"
)
check_metrics_counter(
object_nodes, counter_exp=copies, command="container_objects_total", cid=cid, type="user"
)
count_metrics = (object_chunks * copies) + link_object
if policy == "EC":
count_metrics = (object_chunks * 2) + link_object
check_metrics_counter(object_nodes, counter_exp=count_metrics, command="container_objects_total", cid=cid, type="phy")
check_metrics_counter(object_nodes, counter_exp=count_metrics, command="container_objects_total", cid=cid, type="logic")
check_metrics_counter(object_nodes, counter_exp=copies, command="container_objects_total", cid=cid, type="user")
with reporter.step("Delete file, wait until gc remove object"):
delete_object(default_wallet, cid, oid, self.shell, cluster.default_rpc_endpoint)
with reporter.step(f"Check container metrics 'the counter should equal {len(object_nodes)}' in object nodes"):
check_metrics_counter(
object_nodes, counter_exp=len(object_nodes), command="container_objects_total", cid=cid, type="phy"
)
check_metrics_counter(
object_nodes, counter_exp=len(object_nodes), command="container_objects_total", cid=cid, type="logic"
)
check_metrics_counter(object_nodes, counter_exp=len(object_nodes), command="container_objects_total", cid=cid, type="phy")
check_metrics_counter(object_nodes, counter_exp=len(object_nodes), command="container_objects_total", cid=cid, type="logic")
check_metrics_counter(object_nodes, counter_exp=0, command="container_objects_total", cid=cid, type="user")
with reporter.step("Check metrics(Phy, Logic, User) in each nodes"):
# Phy and Logic metrics are 4, because in rule 'CBF 2 SELECT 2 FROM', cbf2*sel2=4
expect_metrics = 4 if policy == "REP" else 2
check_metrics_counter(cluster.cluster_nodes, counter_exp=expect_metrics, command="container_objects_total", cid=cid, type="phy")
check_metrics_counter(
cluster.cluster_nodes, counter_exp=4, command="container_objects_total", cid=cid, type="phy"
cluster.cluster_nodes, counter_exp=expect_metrics, command="container_objects_total", cid=cid, type="logic"
)
check_metrics_counter(
cluster.cluster_nodes, counter_exp=4, command="container_objects_total", cid=cid, type="logic"
check_metrics_counter(cluster.cluster_nodes, counter_exp=0, command="container_objects_total", cid=cid, type="user")
@allure.title("Container size metrics (obj_size={object_size},policy={policy})")
@pytest.mark.parametrize("placement_policy, policy", [("REP 2 IN X CBF 2 SELECT 2 FROM * AS X", "REP"), ("EC 1.1 CBF 1", "EC")])
def test_container_size_metrics(
self,
object_size: ObjectSize,
default_wallet: WalletInfo,
placement_policy: str,
policy: str,
):
file_path = generate_file(object_size.value)
with reporter.step(f"Create container with policy {policy}"):
cid = create_container(default_wallet, self.shell, self.cluster.default_rpc_endpoint, placement_policy)
with reporter.step("Put object to random node"):
oid = put_object_to_random_node(
wallet=default_wallet,
path=file_path,
cid=cid,
shell=self.shell,
cluster=self.cluster,
)
check_metrics_counter(
cluster.cluster_nodes, counter_exp=0, command="container_objects_total", cid=cid, type="user"
with reporter.step("Get object nodes"):
object_storage_nodes = get_nodes_with_object(cid, oid, self.shell, self.cluster.storage_nodes)
object_nodes = [
cluster_node for cluster_node in self.cluster.cluster_nodes if cluster_node.storage_node in object_storage_nodes
]
with reporter.step("Check metric appears in all node where the object is located"):
act_metric = sum(
[get_metrics_value(node, command="frostfs_node_engine_container_size_bytes", cid=cid) for node in object_nodes]
)
assert (act_metric // 2) == object_size.value
with reporter.step("Delete file, wait until gc remove object"):
id_tombstone = delete_object(default_wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)
tombstone = head_object(default_wallet, cid, id_tombstone, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step(f"Check container size metrics"):
act_metric = get_metrics_value(object_nodes[0], command="frostfs_node_engine_container_size_bytes", cid=cid)
assert act_metric == int(tombstone["header"]["payloadLength"])

View file

@ -8,20 +8,22 @@ from frostfs_testlib import reporter
from frostfs_testlib.cli import FrostfsAdm, FrostfsCli
from frostfs_testlib.cli.netmap_parser import NetmapParser
from frostfs_testlib.credentials.interfaces import User
from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC
from frostfs_testlib.shell import Shell
from frostfs_testlib.plugins import load_plugin
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC
from frostfs_testlib.resources.common import COMPLEX_OBJECT_CHUNKS_COUNT, COMPLEX_OBJECT_TAIL_SIZE, HOSTING_CONFIG_FILE
from frostfs_testlib.s3 import AwsCliClient, S3ClientWrapper
from frostfs_testlib.s3.interfaces import BucketContainerResolver, VersioningStatus
from frostfs_testlib.steps.cli.object import get_object, put_object
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
from frostfs_testlib.storage.controllers import ClusterStateController
from frostfs_testlib.storage.controllers import ClusterStateController, ShardsWatcher
from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils.cli_utils import parse_netmap_output
from frostfs_testlib.utils.file_utils import generate_file, get_file_hash
from pytest_tests.resources.common import HOSTING_CONFIG_FILE
def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
if "ec_policy" not in metafunc.fixturenames:
@ -33,8 +35,8 @@ def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
node_count = len(hosting_config["hosts"])
ec_map = {
4: ["EC 1.1", "EC 2.1", "EC 3.1"],
8: ["EC 5.3", "EC 3.2", "EC 7.1", "EC 4.4"],
4: ["EC 1.1", "EC 2.1", "EC 3.1", "EC 2.2"],
8: ["EC 5.3", "EC 3.2", "EC 7.1", "EC 4.4", "EC 3.1"],
16: ["EC 12.4", "EC 8.4", "EC 5.3", "EC 4.4"],
100: ["EC 12.4", "EC 8.4", "EC 5.3", "EC 4.4"],
}
@ -55,10 +57,12 @@ class Chunk:
return self.object_id
@allure.title("Initialized local FrostfsCli")
@allure.title("Init bucket container resolver")
@pytest.fixture()
def frostfs_local_cli(client_shell: Shell, default_user: User) -> FrostfsCli:
return FrostfsCli(client_shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=default_user.wallet.config_path)
def bucket_container_resolver(node_under_test: ClusterNode) -> BucketContainerResolver:
resolver_cls = load_plugin("frostfs.testlib.bucket_cid_resolver", node_under_test.host.config.product)
resolver: BucketContainerResolver = resolver_cls()
return resolver
@allure.title("Initialized remote FrostfsAdm")
@ -72,6 +76,40 @@ def frostfs_remote_adm(cluster: Cluster) -> FrostfsAdm:
@pytest.mark.replication
@pytest.mark.ec_replication
class TestECReplication(ClusterTestBase):
@pytest.fixture()
def rep_count(self, object_size: ObjectSize) -> int:
rep_count = 3
if object_size.name == "complex":
rep_count *= int(COMPLEX_OBJECT_CHUNKS_COUNT) + 1 if COMPLEX_OBJECT_TAIL_SIZE else int(COMPLEX_OBJECT_CHUNKS_COUNT)
return rep_count
@wait_for_success(120, 5)
def wait_for_nodes_appears_in_map(self, frostfs_cli: FrostfsCli, alive_node: ClusterNode, desired_nodes_count: int) -> bool:
self.tick_epoch(alive_node, 2)
netmap = parse_netmap_output(
frostfs_cli.netmap.snapshot(alive_node.storage_node.get_rpc_endpoint(), timeout=CLI_DEFAULT_TIMEOUT).stdout
)
assert len(netmap) == desired_nodes_count
@wait_for_success(120, 5)
def wait_replication(self, total_chunks: int, local_cli: FrostfsCli, cid: str, oid: str, success: bool = True) -> None:
if not success:
assert not self.check_replication(total_chunks, local_cli, cid, oid)
else:
assert self.check_replication(total_chunks, local_cli, cid, oid)
@allure.title("Search shard")
def get_shard_chunk(self, node: ClusterNode, chunk: Chunk) -> str:
oid_path = f"{chunk.object_id[0]}/{chunk.object_id[1]}/{chunk.object_id[2]}/{chunk.object_id[3]}"
node_shell = node.storage_node.host.get_shell()
shards_watcher = ShardsWatcher(node)
with reporter.step("Search object file"):
for shard_id, shard_info in shards_watcher.shards_snapshots[-1].items():
check_dir = node_shell.exec(f" [ -d {shard_info['blobstor'][1]['path']}/{oid_path} ] && echo 1 || echo 0").stdout
if "1" in check_dir.strip():
return shard_id
@allure.title("Restore chunk maximum params in network params ")
@pytest.fixture
def restore_network_config(self, frostfs_remote_adm: FrostfsAdm) -> None:
@ -82,7 +120,10 @@ class TestECReplication(ClusterTestBase):
def get_object_nodes(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> dict:
if not endpoint:
endpoint = self.cluster.default_rpc_endpoint
return json.loads(cli.object.nodes(endpoint, cid, oid=oid, json=True).stdout)
object_nodes = json.loads(cli.object.nodes(endpoint, cid, oid=oid, json=True, timeout=CLI_DEFAULT_TIMEOUT).stdout)
if object_nodes.get("errors"):
raise object_nodes["errors"]
return object_nodes
@reporter.step("Get all chunks object ")
def get_all_chunks_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> list[Chunk]:
@ -103,7 +144,7 @@ class TestECReplication(ClusterTestBase):
def search_node_not_chunks(self, chunks: list[Chunk], local_cli: FrostfsCli, endpoint: str = None) -> list[ClusterNode]:
if not endpoint:
self.cluster.default_rpc_endpoint
netmap = parse_netmap_output(local_cli.netmap.snapshot(endpoint).stdout)
netmap = parse_netmap_output(local_cli.netmap.snapshot(endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout)
chunks_node_key = []
for chunk in chunks:
chunks_node_key.extend(chunk.confirmed_nodes)
@ -119,11 +160,16 @@ class TestECReplication(ClusterTestBase):
@reporter.step("Create container, policy={policy}")
def create_container(self, user_cli: FrostfsCli, endpoint: str, policy: str) -> str:
return user_cli.container.create(endpoint, policy=policy, await_mode=True).stdout.split(" ")[1].strip().split("\n")[0]
return (
user_cli.container.create(endpoint, policy=policy, await_mode=True, timeout=CLI_DEFAULT_TIMEOUT)
.stdout.split(" ")[1]
.strip()
.split("\n")[0]
)
@reporter.step("Search node chunk {chunk}")
def get_chunk_node(self, frostfs_cli: FrostfsCli, chunk: Chunk) -> tuple[ClusterNode, NodeNetmapInfo]:
netmap = parse_netmap_output(frostfs_cli.netmap.snapshot(self.cluster.default_rpc_endpoint).stdout)
netmap = parse_netmap_output(frostfs_cli.netmap.snapshot(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout)
for node_info in netmap:
if node_info.node_id in chunk.confirmed_nodes:
for cluster_node in self.cluster.cluster_nodes:
@ -132,7 +178,9 @@ class TestECReplication(ClusterTestBase):
@reporter.step("Check replication chunks={total_chunks} chunks ")
def check_replication(self, total_chunks: int, local_cli: FrostfsCli, cid: str, oid: str) -> bool:
object_nodes_info = local_cli.object.nodes(self.cluster.default_rpc_endpoint, cid, oid=oid, json=True).stdout
object_nodes_info = local_cli.object.nodes(
self.cluster.default_rpc_endpoint, cid, oid=oid, json=True, timeout=CLI_DEFAULT_TIMEOUT
).stdout
object_nodes_info = json.loads(object_nodes_info)
return len(object_nodes_info["data_objects"]) == total_chunks
@ -151,49 +199,48 @@ class TestECReplication(ClusterTestBase):
cluster_state_controller.start_stopped_hosts()
cluster_state_controller.manager(ConfigStateManager).revert_all()
@allure.title("Create container with EC policy (size={object_size.value})")
@allure.title("Create container with EC policy (size={object_size})")
def test_create_container_with_ec_policy(
self,
default_user: User,
frostfs_local_cli: FrostfsCli,
frostfs_cli: FrostfsCli,
object_size: ObjectSize,
rep_count: int,
) -> None:
test_file = generate_file(object_size.value)
rep_count = 3
if object_size.name == "complex":
rep_count *= 4
with reporter.step("Create container."):
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
with reporter.step("Put object in container."):
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Check replication chunks."):
assert self.check_replication(rep_count, frostfs_local_cli, cid, oid)
assert self.check_replication(rep_count, frostfs_cli, cid, oid)
@allure.title("Lose node with chunk data")
@pytest.mark.failover
def test_lose_node_with_data_chunk(
self,
frostfs_local_cli: FrostfsCli,
frostfs_cli: FrostfsCli,
default_user: User,
simple_object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
disable_policer: None,
) -> None:
with reporter.step("Create container."):
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1")
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 3.1")
with reporter.step("Put object in container."):
test_file = generate_file(simple_object_size.value)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Check chunk replication on 4 nodes."):
assert self.check_replication(4, frostfs_local_cli, cid, oid)
assert self.check_replication(4, frostfs_cli, cid, oid)
with reporter.step("Search node data chunk"):
chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid)
chunk_node = self.get_chunk_node(frostfs_local_cli, chunk)[0]
chunk = self.get_data_chunk_object(frostfs_cli, cid, oid)
chunk_node = self.get_chunk_node(frostfs_cli, chunk)[0]
with reporter.step("Stop node with data chunk."):
cluster_state_controller.stop_node_host(chunk_node, "hard")
@ -204,31 +251,31 @@ class TestECReplication(ClusterTestBase):
with reporter.step("Start stopped node, and check replication chunks."):
cluster_state_controller.start_node_host(chunk_node)
assert self.check_replication(4, frostfs_local_cli, cid, oid)
assert self.check_replication(4, frostfs_cli, cid, oid)
@allure.title("Lose node with chunk parity")
@pytest.mark.failover
def test_lose_node_with_parity_chunk(
self,
frostfs_local_cli: FrostfsCli,
frostfs_cli: FrostfsCli,
default_user: User,
simple_object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
disable_policer: None,
) -> None:
with reporter.step("Create container."):
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1")
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 3.1")
with reporter.step("Put object in container."):
test_file = generate_file(simple_object_size.value)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Check chunk replication on 4 nodes."):
assert self.check_replication(4, frostfs_local_cli, cid, oid)
assert self.check_replication(4, frostfs_cli, cid, oid)
with reporter.step("Search node with parity chunk"):
chunk = self.get_parity_chunk_object(frostfs_local_cli, cid, oid)
chunk_node = self.get_chunk_node(frostfs_local_cli, chunk)[0]
chunk = self.get_parity_chunk_object(frostfs_cli, cid, oid)
chunk_node = self.get_chunk_node(frostfs_cli, chunk)[0]
with reporter.step("Stop node parity chunk."):
cluster_state_controller.stop_node_host(chunk_node, "hard")
@ -239,33 +286,33 @@ class TestECReplication(ClusterTestBase):
with reporter.step("Start stoped node, and check replication chunks."):
cluster_state_controller.start_node_host(chunk_node)
assert self.check_replication(4, frostfs_local_cli, cid, oid)
assert self.check_replication(4, frostfs_cli, cid, oid)
@allure.title("Lose nodes with chunk data and parity")
@pytest.mark.failover
def test_lose_nodes_data_chunk_and_parity(
self,
frostfs_local_cli: FrostfsCli,
frostfs_cli: FrostfsCli,
default_user: User,
simple_object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
disable_policer: None,
) -> None:
with reporter.step("Create container."):
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1")
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 3.1")
with reporter.step("Put object in container."):
test_file = generate_file(simple_object_size.value)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Check count chunks, expect 4."):
assert self.check_replication(4, frostfs_local_cli, cid, oid)
assert self.check_replication(4, frostfs_cli, cid, oid)
with reporter.step("Search node data chunk and node parity chunk"):
data_chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid)
node_data_chunk = self.get_chunk_node(frostfs_local_cli, data_chunk)[0]
parity_chunk = self.get_parity_chunk_object(frostfs_local_cli, cid, oid)
node_parity_chunk = self.get_chunk_node(frostfs_local_cli, parity_chunk)[0]
data_chunk = self.get_data_chunk_object(frostfs_cli, cid, oid)
node_data_chunk = self.get_chunk_node(frostfs_cli, data_chunk)[0]
parity_chunk = self.get_parity_chunk_object(frostfs_cli, cid, oid)
node_parity_chunk = self.get_chunk_node(frostfs_cli, parity_chunk)[0]
with reporter.step("Stop node with data chunk."):
cluster_state_controller.stop_node_host(node_data_chunk, "hard")
@ -276,7 +323,7 @@ class TestECReplication(ClusterTestBase):
with reporter.step("Start stopped host and check chunks."):
cluster_state_controller.start_node_host(node_data_chunk)
assert self.check_replication(4, frostfs_local_cli, cid, oid)
assert self.check_replication(4, frostfs_cli, cid, oid)
with reporter.step("Stop node with parity chunk and one all node."):
cluster_state_controller.stop_node_host(node_data_chunk, "hard")
@ -288,68 +335,73 @@ class TestECReplication(ClusterTestBase):
with reporter.step("Start stopped nodes and check replication chunk."):
cluster_state_controller.start_stopped_hosts()
assert self.check_replication(4, frostfs_local_cli, cid, oid)
assert self.check_replication(4, frostfs_cli, cid, oid)
@allure.title("Policer work with chunk")
@pytest.mark.failover
def test_work_policer_with_nodes(
self,
simple_object_size: ObjectSize,
frostfs_local_cli: FrostfsCli,
frostfs_cli: FrostfsCli,
default_user: User,
cluster_state_controller: ClusterStateController,
) -> None:
with reporter.step("Create container."):
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
with reporter.step("Put object on container."):
test_file = generate_file(simple_object_size.value)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Check count chunks nodes on 3."):
assert self.check_replication(3, frostfs_local_cli, cid, oid)
assert self.check_replication(3, frostfs_cli, cid, oid)
with reporter.step("Stop node with chunk."):
data_chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid)
first_all_chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid)
node_data_chunk = self.get_chunk_node(frostfs_local_cli, data_chunk)[0]
data_chunk = self.get_data_chunk_object(frostfs_cli, cid, oid)
first_all_chunks = self.get_all_chunks_object(frostfs_cli, cid, oid)
node_data_chunk = self.get_chunk_node(frostfs_cli, data_chunk)[0]
cluster_state_controller.stop_node_host(node_data_chunk, "hard")
with reporter.step("Check replication chunk with different node."):
alive_endpoint = list(set(self.cluster.cluster_nodes) - {node_data_chunk})[0].storage_node.get_rpc_endpoint()
node = self.search_node_not_chunks(first_all_chunks, frostfs_local_cli, endpoint=alive_endpoint)[0]
second_all_chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, node.storage_node.get_rpc_endpoint())
with reporter.step("Tick epoch and wait update network map."):
alive_node = list(set(self.cluster.cluster_nodes) - {node_data_chunk})[0]
self.wait_for_nodes_appears_in_map(frostfs_cli, alive_node, 3)
with reporter.step("Wait replication chunk with different node."):
node = self.search_node_not_chunks(first_all_chunks, frostfs_cli, endpoint=alive_node.storage_node.get_rpc_endpoint())[0]
self.wait_replication(3, frostfs_cli, cid, oid)
with reporter.step("Get new chunks"):
second_all_chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, node.storage_node.get_rpc_endpoint())
with reporter.step("Check that oid no change."):
oid_chunk_check = [chunk for chunk in second_all_chunks if data_chunk.object_id == chunk.object_id]
assert len(oid_chunk_check) > 0
assert [chunk for chunk in second_all_chunks if data_chunk.object_id == chunk.object_id]
with reporter.step("Start stopped host, and check delete 4 chunk."):
cluster_state_controller.start_node_host(node_data_chunk)
all_chunks_after_start_node = self.get_all_chunks_object(frostfs_local_cli, cid, oid)
all_chunks_after_start_node = self.get_all_chunks_object(frostfs_cli, cid, oid)
assert len(all_chunks_after_start_node) == 3
@allure.title("EC X.Y combinations (nodes={node_count},policy={ec_policy},size={object_size.name})")
@allure.title("EC X.Y combinations (nodes={node_count},policy={ec_policy},size={object_size})")
def test_create_container_with_difference_count_nodes(
self,
node_count: int,
ec_policy: str,
object_size: ObjectSize,
default_user: User,
frostfs_local_cli: FrostfsCli,
frostfs_cli: FrostfsCli,
) -> None:
with reporter.step("Create container."):
expected_chunks = int(ec_policy.split(" ")[1].split(".")[0]) + int(ec_policy.split(" ")[1].split(".")[1])
if "complex" in object_size.name:
expected_chunks *= 4
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, ec_policy)
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, ec_policy)
with reporter.step("Put object in container."):
test_file = generate_file(object_size.value)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Check count object chunks."):
chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint)
assert len(chunks) == expected_chunks
with reporter.step("get object and check hash."):
@ -359,154 +411,299 @@ class TestECReplication(ClusterTestBase):
@allure.title("Request PUT with copies_number flag")
def test_put_object_with_copies_number(
self,
frostfs_local_cli: FrostfsCli,
frostfs_cli: FrostfsCli,
default_user: User,
simple_object_size: ObjectSize,
) -> None:
with reporter.step("Create container."):
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
with reporter.step("Put object in container with copies number = 1"):
test_file = generate_file(simple_object_size.value)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint, copies_number=1)
with reporter.step("Check that count chunks > 1."):
chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint)
assert len(chunks) > 1
@allure.title("Request PUT and 1 node off")
@pytest.mark.failover
def test_put_object_with_off_cnr_node(
self,
frostfs_local_cli: FrostfsCli,
frostfs_cli: FrostfsCli,
cluster_state_controller: ClusterStateController,
default_user: User,
simple_object_size: ObjectSize,
) -> None:
with reporter.step("Create container."):
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1")
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 3.1")
with reporter.step("Stop one node in container nodes"):
cluster_state_controller.stop_node_host(self.cluster.cluster_nodes[1], "hard")
with reporter.step("Put object in container, expect error."):
with reporter.step("Put object in container, expect success for EC container."):
test_file = generate_file(simple_object_size.value)
with pytest.raises(RuntimeError, match="put single object on client"):
put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
@allure.title("Request DELETE (size={object_size.name})")
@allure.title("Request PUT (size={object_size})")
def test_put_object_with_ec_cnr(
self,
frostfs_cli: FrostfsCli,
default_user: User,
object_size: ObjectSize,
) -> None:
with reporter.step("Create container."):
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
with reporter.step("Put object in container with --prepare-locally."):
test_file = generate_file(object_size.value)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Get chunks object."):
chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint)
with reporter.step("Check header chunks object"):
for chunk in chunks:
chunk_head = frostfs_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id, raw=True).stdout
assert "EC header:" in chunk_head
@allure.title("Request PUT with copies number")
def test_put_object_with_copies_number(
self,
frostfs_cli: FrostfsCli,
default_user: User,
simple_object_size: ObjectSize,
) -> None:
with reporter.step("Create container."):
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
with reporter.step("Put object in container with --copies-number=1."):
test_file = generate_file(simple_object_size.value)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint, copies_number=1)
with reporter.step("Check len chunks object."):
chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint)
assert len(chunks) > 1
@allure.title("Request GET (size={object_size})")
def test_get_object_in_ec_cnr(
self,
default_user: User,
frostfs_cli: FrostfsCli,
object_size: ObjectSize,
) -> None:
with reporter.step("Create container."):
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1 CBF 1")
with reporter.step("Put object in container"):
test_file = generate_file(object_size.value)
hash_origin_file = get_file_hash(test_file)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Get id all chunks."):
chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint)
with reporter.step("Search chunk node and not chunks node."):
chunk_node = self.get_chunk_node(frostfs_cli, chunks[0])
not_chunk_node = self.search_node_not_chunks(chunks, frostfs_cli, self.cluster.default_rpc_endpoint)
with reporter.step("GET request with chunk node, expect success"):
file_one = get_object(default_user.wallet, cid, oid, self.shell, chunk_node[0].storage_node.get_rpc_endpoint())
hash_file_one = get_file_hash(file_one)
assert hash_file_one == hash_origin_file
with reporter.step("Get request with not chunk node"):
file_two = get_object(default_user.wallet, cid, oid, self.shell, not_chunk_node[0].storage_node.get_rpc_endpoint())
hash_file_two = get_file_hash(file_two)
assert hash_file_two == hash_file_one == hash_origin_file
@allure.title("Request SEARCH with flags 'root' (size={object_size})")
def test_search_object_in_ec_cnr_root_flags(
self,
default_user: User,
frostfs_cli: FrostfsCli,
object_size: ObjectSize,
) -> None:
with reporter.step("Create container."):
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
with reporter.step("Put object in container"):
test_file = generate_file(object_size.value)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Search operation with --root flags"):
search_output = frostfs_cli.object.search(
self.cluster.default_rpc_endpoint, cid, root=True, timeout=CLI_DEFAULT_TIMEOUT
).stdout.split("\n")[1:]
assert search_output[0] == oid
@allure.title("Request SEARCH check valid chunk id (size={object_size})")
def test_search_object_in_ec_cnr_chunk_id(
self,
default_user: User,
frostfs_cli: FrostfsCli,
object_size: ObjectSize,
) -> None:
with reporter.step("Create container."):
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
with reporter.step("Put object in container"):
test_file = generate_file(object_size.value)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Search operation object"):
search_output = frostfs_cli.object.search(self.cluster.default_rpc_endpoint, cid, timeout=CLI_DEFAULT_TIMEOUT).stdout
chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint)
for chunk in chunks:
assert chunk.object_id in search_output
@allure.title("Request SEARCH check no chunk index info (size={object_size})")
def test_search_object_in_ec_cnr(
self,
default_user: User,
frostfs_cli: FrostfsCli,
object_size: ObjectSize,
) -> None:
with reporter.step("Create container."):
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
with reporter.step("Put object in container"):
test_file = generate_file(object_size.value)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Search operation all chunk"):
chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint)
for chunk in chunks:
chunk_search = frostfs_cli.object.search(
self.cluster.default_rpc_endpoint, cid, oid=chunk.object_id, timeout=CLI_DEFAULT_TIMEOUT
).stdout
assert "index" not in chunk_search
@allure.title("Request DELETE (size={object_size})")
@pytest.mark.failover
def test_delete_object_in_ec_cnr(
self,
default_user: User,
frostfs_local_cli: FrostfsCli,
frostfs_cli: FrostfsCli,
object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
) -> None:
with reporter.step("Create container."):
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
with reporter.step("Put object in container."):
test_file = generate_file(object_size.value)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Check object chunks nodes."):
chunks_object = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
chunks_object = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint)
replication_count = 3 if object_size.name == "simple" else 3 * 4
assert len(chunks_object) == replication_count
with reporter.step("Delete object"):
frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid)
frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid, timeout=CLI_DEFAULT_TIMEOUT)
with reporter.step("Check that delete all chunks."):
for chunk in chunks_object:
with pytest.raises(RuntimeError, match="object already removed"):
frostfs_local_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id)
frostfs_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id, timeout=CLI_DEFAULT_TIMEOUT)
with reporter.step("Put second object."):
oid_second = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Check second object chunks nodes."):
chunks_second_object = self.get_all_chunks_object(frostfs_local_cli, cid, oid_second, self.cluster.default_rpc_endpoint)
chunks_second_object = self.get_all_chunks_object(frostfs_cli, cid, oid_second, self.cluster.default_rpc_endpoint)
assert len(chunks_second_object) == replication_count
with reporter.step("Stop nodes with chunk."):
chunk_node = self.get_chunk_node(frostfs_local_cli, chunks_second_object[0])
chunk_node = self.get_chunk_node(frostfs_cli, chunks_second_object[0])
cluster_state_controller.stop_node_host(chunk_node[0], "hard")
with reporter.step("Delete second object"):
frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid_second)
cluster_nodes = list(set(self.cluster.cluster_nodes) - {chunk_node[0]})
frostfs_cli.object.delete(cluster_nodes[0].storage_node.get_rpc_endpoint(), cid, oid_second, timeout=CLI_DEFAULT_TIMEOUT)
with reporter.step("Check that delete all chunk second object."):
for chunk in chunks_second_object:
with pytest.raises(RuntimeError, match="object already removed"):
frostfs_local_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id)
frostfs_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id, timeout=CLI_DEFAULT_TIMEOUT)
@allure.title("Request LOCK (size={object_size.name})")
@allure.title("Request LOCK (size={object_size})")
@pytest.mark.failover
def test_lock_object_in_ec_cnr(
self,
frostfs_local_cli: FrostfsCli,
frostfs_cli: FrostfsCli,
object_size: ObjectSize,
default_user: User,
cluster_state_controller: ClusterStateController,
) -> None:
with reporter.step("Create container."):
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
with reporter.step("Put object in container."):
test_file = generate_file(object_size.value)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Check object chunks nodes."):
chunks_object = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
chunks_object = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint)
replication_count = 3 if object_size.name == "simple" else 3 * 4
assert len(chunks_object) == replication_count
with reporter.step("Put LOCK in object."):
epoch = frostfs_local_cli.netmap.epoch(self.cluster.default_rpc_endpoint).stdout.strip()
frostfs_local_cli.object.lock(self.cluster.default_rpc_endpoint, cid, oid, expire_at=int(epoch) + 5).stdout
epoch = frostfs_cli.netmap.epoch(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout.strip()
frostfs_cli.object.lock(
self.cluster.default_rpc_endpoint, cid, oid, timeout=CLI_DEFAULT_TIMEOUT, expire_at=(int(epoch) + 5)
).stdout
with reporter.step("Check LOCK in object"):
chunks = frostfs_local_cli.object.head(self.cluster.default_rpc_endpoint, cid, oid, raw=True).stdout.strip().split(" ")
oids_chunks = [chunk.strip() for chunk in chunks if len(chunk) > 35]
for chunk_id in oids_chunks:
with pytest.raises(RuntimeError, match="could not delete objects"):
frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk_id)
with reporter.step("Check don`t delete chunk"):
for chunk in chunks_object:
with pytest.raises(RuntimeError, match="Lock EC chunk failed"):
frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk.object_id, timeout=CLI_DEFAULT_TIMEOUT)
with reporter.step("Check enable LOCK object"):
with pytest.raises(RuntimeError, match="object is locked"):
frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid, timeout=CLI_DEFAULT_TIMEOUT)
with reporter.step("Stop chunk node."):
chunk_node = self.get_chunk_node(frostfs_local_cli, chunks_object[0])
chunk_node = self.get_chunk_node(frostfs_cli, chunks_object[0])
cluster_state_controller.stop_node_host(chunk_node[0], "hard")
cluster_state_controller.start_node_host(chunk_node[0])
with reporter.step("Check LOCK in object."):
chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
for chunk_id in oids_chunks:
with pytest.raises(RuntimeError, match="could not delete objects"):
frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk_id)
with reporter.step("Check don`t delete chunk."):
for chunk in chunks_object:
with pytest.raises(RuntimeError, match="Lock EC chunk failed"):
frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk.object_id, timeout=CLI_DEFAULT_TIMEOUT)
@allure.title("Output MaxEC* params in frostfscli (type={type_shards})")
with reporter.step("Check enable LOCK object"):
with pytest.raises(RuntimeError, match="object is locked"):
frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid, timeout=CLI_DEFAULT_TIMEOUT)
@allure.title("Output MaxEC* params in frostf-scli (type={type_shards})")
@pytest.mark.parametrize("type_shards", ["Maximum count of data shards", "Maximum count of parity shards"])
def test_maxec_info_with_output_cli(self, frostfs_local_cli: FrostfsCli, type_shards: str) -> None:
def test_maxec_info_with_output_cli(self, frostfs_cli: FrostfsCli, type_shards: str) -> None:
with reporter.step("Get and check params"):
net_info = frostfs_local_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout
net_info = frostfs_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout
assert type_shards in net_info
@allure.title("Change MaxEC*Count params")
def test_change_max_data_shards_params(
self,
frostfs_remote_adm: FrostfsAdm,
frostfs_local_cli: FrostfsCli,
frostfs_cli: FrostfsCli,
restore_network_config: None,
) -> None:
with reporter.step("Get now params MaxECDataCount and MaxECParityCount"):
node_netinfo = NetmapParser.netinfo(frostfs_local_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout)
node_netinfo = NetmapParser.netinfo(
frostfs_cli.netmap.netinfo(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout
)
with reporter.step("Change params"):
frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=5" "MaxECParityCount=3"')
with reporter.step("Get update params"):
update_net_info = NetmapParser.netinfo(frostfs_local_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout)
update_net_info = NetmapParser.netinfo(
frostfs_cli.netmap.netinfo(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout
)
with reporter.step("Check old and new params difference"):
assert (
@ -528,14 +725,18 @@ class TestECReplication(ClusterTestBase):
def test_create_container_with_select(
self,
select: int,
frostfs_local_cli: FrostfsCli,
frostfs_cli: FrostfsCli,
) -> None:
with reporter.step("Create container"):
policy = f"EC 1.1 CBF 1 SELECT {select} FROM *"
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, policy)
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, policy)
with reporter.step("Check container nodes decomposed"):
container_nodes = frostfs_local_cli.container.search_node(self.cluster.default_rpc_endpoint, cid).stdout.strip().split("\n")[1:]
container_nodes = (
frostfs_cli.container.search_node(self.cluster.default_rpc_endpoint, cid, timeout=CLI_DEFAULT_TIMEOUT)
.stdout.strip()
.split("\n")[1:]
)
assert len(container_nodes) == select
@allure.title("Create container with EC policy and CBF (CBF={cbf})")
@ -544,35 +745,176 @@ class TestECReplication(ClusterTestBase):
self,
cbf: int,
expected_nodes: int,
frostfs_local_cli: FrostfsCli,
frostfs_cli: FrostfsCli,
) -> None:
with reporter.step("Create container."):
policy = f"EC 1.1 CBF {cbf}"
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, policy)
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, policy)
with reporter.step("Check expected container nodes."):
container_nodes = frostfs_local_cli.container.search_node(self.cluster.default_rpc_endpoint, cid).stdout.strip().split("\n")[1:]
container_nodes = (
frostfs_cli.container.search_node(self.cluster.default_rpc_endpoint, cid, timeout=CLI_DEFAULT_TIMEOUT)
.stdout.strip()
.split("\n")[1:]
)
assert len(container_nodes) == expected_nodes
@allure.title("Create container with EC policy and FILTER")
def test_create_container_with_filter(
self,
default_user: User,
frostfs_local_cli: FrostfsCli,
frostfs_cli: FrostfsCli,
simple_object_size: ObjectSize,
) -> None:
with reporter.step("Create Container."):
policy = "EC 1.1 IN RUS SELECT 2 FROM RU AS RUS FILTER Country EQ Russia AS RU"
cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, policy)
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, policy)
with reporter.step("Put object in container."):
test_file = generate_file(simple_object_size.value)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Check object is decomposed exclusively on Russian nodes"):
data_chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
parity_chunk = self.get_parity_chunk_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
node_data_chunk = self.get_chunk_node(frostfs_local_cli, data_chunk)
node_parity_chunk = self.get_chunk_node(frostfs_local_cli, parity_chunk)
data_chunk = self.get_data_chunk_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint)
parity_chunk = self.get_parity_chunk_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint)
node_data_chunk = self.get_chunk_node(frostfs_cli, data_chunk)
node_parity_chunk = self.get_chunk_node(frostfs_cli, parity_chunk)
for node in [node_data_chunk[1], node_parity_chunk[1]]:
assert "Russia" in node.country
@allure.title("Evacuation shard with chunk (type={type})")
@pytest.mark.parametrize("type, get_chunk", [("data", get_data_chunk_object), ("parity", get_parity_chunk_object)])
def test_evacuation_data_shard(
self,
default_user: User,
frostfs_cli: FrostfsCli,
max_object_size: int,
type: str,
get_chunk,
) -> None:
with reporter.step("Create container."):
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 1.1 CBF 1")
with reporter.step("Put object in container."):
test_file = generate_file(max_object_size - 1000)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Get object chunks."):
chunk = get_chunk(self, frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint)
chunk_node = self.get_chunk_node(frostfs_cli, chunk)
frostfs_node_cli = FrostfsCli(
chunk_node[0].host.get_shell(),
frostfs_cli_exec_path=FROSTFS_CLI_EXEC,
config_file=chunk_node[0].storage_node.get_remote_wallet_config_path(),
)
with reporter.step("Search shards chunk"):
shard_id = self.get_shard_chunk(chunk_node[0], chunk)
with reporter.step("Enable evacuation for shard"):
frostfs_node_cli.shards.set_mode(chunk_node[0].storage_node.get_control_endpoint(), mode="read-only", id=shard_id)
frostfs_node_cli.shards.evacuation_start(chunk_node[0].storage_node.get_control_endpoint(), shard_id, await_mode=True)
with reporter.step("Get object after evacuation shard"):
get_object(default_user.wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)
@allure.title("[NEGATIVE] Don`t create more 1 EC policy")
def test_more_one_ec_policy(
self,
frostfs_cli: FrostfsCli,
) -> None:
with reporter.step("Create container with policy - 'EC 2.1 EC 1.1'"):
with pytest.raises(RuntimeError, match="can't parse placement policy"):
self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1 EC 1.1 CBF 1 SELECT 4 FROM *")
@allure.title("Create bucket with EC policy (s3_client={s3_client})")
@pytest.mark.parametrize("s3_policy, s3_client", [("pytest_tests/resources/files/policy.json", AwsCliClient)], indirect=True)
def test_create_bucket_with_ec_location(
self,
s3_client: S3ClientWrapper,
bucket_container_resolver: BucketContainerResolver,
frostfs_cli: FrostfsCli,
) -> None:
with reporter.step("Create bucket with EC location constrain"):
bucket = s3_client.create_bucket(location_constraint="ec3.1")
with reporter.step("Resolve container bucket"):
cid = bucket_container_resolver.resolve(self.cluster.cluster_nodes[0], bucket)
with reporter.step("Validate container policy"):
container = frostfs_cli.container.get(
self.cluster.default_rpc_endpoint, cid, json_mode=True, timeout=CLI_DEFAULT_TIMEOUT
).stdout
assert container
@allure.title("[NEGATIVE] Don`t create more 1 EC policy")
def test_more_one_ec_policy(
self,
frostfs_cli: FrostfsCli,
) -> None:
with reporter.step("Create container with policy - 'EC 2.1 EC 1.1'"):
with pytest.raises(RuntimeError, match="can't parse placement policy"):
self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1 EC 1.1 CBF 1 SELECT 4 FROM *")
@allure.title("Bucket object count chunks (s3_client={s3_client}, size={object_size})")
@pytest.mark.parametrize("s3_policy, s3_client", [("pytest_tests/resources/files/policy.json", AwsCliClient)], indirect=True)
def test_count_chunks_bucket_with_ec_location(
self,
s3_client: S3ClientWrapper,
bucket_container_resolver: BucketContainerResolver,
frostfs_cli: FrostfsCli,
object_size: ObjectSize,
) -> None:
with reporter.step("Create bucket with EC location constrain"):
bucket = s3_client.create_bucket(location_constraint="ec3.1")
with reporter.step("Enable versioning object"):
s3_client.put_bucket_versioning(bucket, VersioningStatus.ENABLED)
bucket_status = s3_client.get_bucket_versioning_status(bucket)
assert bucket_status == VersioningStatus.ENABLED.value
with reporter.step("Put object in bucket"):
test_file = generate_file(object_size.value)
bucket_object = s3_client.put_object(bucket, test_file)
with reporter.step("Watch replication count chunks"):
cid = bucket_container_resolver.resolve(self.cluster.cluster_nodes[0], bucket)
chunks = self.get_all_chunks_object(frostfs_cli, cid, bucket_object, self.cluster.default_rpc_endpoint)
expect_chunks = 4 if object_size.name == "simple" else 16
assert len(chunks) == expect_chunks
@allure.title("Replication chunk after drop (size={object_size})")
def test_drop_chunk_and_replication(
self,
frostfs_cli: FrostfsCli,
default_user: User,
object_size: ObjectSize,
rep_count: int,
) -> None:
with reporter.step("Create container"):
cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1 CBF 1")
with reporter.step("Put object"):
test_file = generate_file(object_size.value)
oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
with reporter.step("Get all chunks"):
chunk = self.get_data_chunk_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint)
with reporter.step("Search chunk node"):
chunk_node = self.get_chunk_node(frostfs_cli, chunk)
shell_chunk_node = chunk_node[0].host.get_shell()
with reporter.step("Get replication count"):
assert self.check_replication(rep_count, frostfs_cli, cid, oid)
with reporter.step("Delete chunk"):
frostfs_node_cli = FrostfsCli(
shell_chunk_node,
frostfs_cli_exec_path=FROSTFS_CLI_EXEC,
config_file=chunk_node[0].storage_node.get_remote_wallet_config_path(),
)
frostfs_node_cli.control.drop_objects(chunk_node[0].storage_node.get_control_endpoint(), f"{cid}/{chunk.object_id}")
with reporter.step("Wait replication count after drop one chunk"):
self.wait_replication(rep_count, frostfs_cli, cid, oid)