Fix multiple services start (copy array for upstream functions) #67
1 changed files with 7 additions and 20 deletions
|
@ -1,13 +1,13 @@
|
||||||
|
import copy
|
||||||
import time
|
import time
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
|
||||||
|
|
||||||
import frostfs_testlib.resources.optionals as optionals
|
import frostfs_testlib.resources.optionals as optionals
|
||||||
from frostfs_testlib.reporter import get_reporter
|
from frostfs_testlib.reporter import get_reporter
|
||||||
from frostfs_testlib.shell import CommandOptions, Shell
|
from frostfs_testlib.shell import CommandOptions, Shell
|
||||||
from frostfs_testlib.steps import epoch
|
|
||||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
|
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
|
||||||
from frostfs_testlib.storage.controllers.disk_controller import DiskController
|
from frostfs_testlib.storage.controllers.disk_controller import DiskController
|
||||||
from frostfs_testlib.testing.test_control import run_optionally, wait_for_success
|
from frostfs_testlib.testing import parallel
|
||||||
|
from frostfs_testlib.testing.test_control import run_optionally
|
||||||
from frostfs_testlib.utils.failover_utils import (
|
from frostfs_testlib.utils.failover_utils import (
|
||||||
wait_all_storage_nodes_returned,
|
wait_all_storage_nodes_returned,
|
||||||
wait_for_host_offline,
|
wait_for_host_offline,
|
||||||
|
@ -139,15 +139,8 @@ class ClusterStateController:
|
||||||
# In case if we stopped couple services, for example (s01-s04):
|
# In case if we stopped couple services, for example (s01-s04):
|
||||||
# After starting only s01, it may require connections to s02-s04, which is still down, and fail to start.
|
# After starting only s01, it may require connections to s02-s04, which is still down, and fail to start.
|
||||||
# Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state.
|
# Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state.
|
||||||
# So in order to make sure that services are at least attempted to be started, using threads here.
|
# So in order to make sure that services are at least attempted to be started, using parallel runs here.
|
||||||
with ThreadPoolExecutor(max_workers=len(self.stopped_storage_nodes)) as executor:
|
parallel(self.start_storage_service, copy.copy(self.stopped_storage_nodes))
|
||||||
start_result = executor.map(self.start_storage_service, self.stopped_storage_nodes)
|
|
||||||
|
|
||||||
# Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor,
|
|
||||||
# But will be thrown here.
|
|
||||||
# Not ideal solution, but okay for now
|
|
||||||
for _ in start_result:
|
|
||||||
pass
|
|
||||||
|
|
||||||
wait_all_storage_nodes_returned(self.shell, self.cluster)
|
wait_all_storage_nodes_returned(self.shell, self.cluster)
|
||||||
self.stopped_storage_nodes = []
|
self.stopped_storage_nodes = []
|
||||||
|
@ -170,14 +163,8 @@ class ClusterStateController:
|
||||||
if not self.stopped_s3_gates:
|
if not self.stopped_s3_gates:
|
||||||
return
|
return
|
||||||
|
|
||||||
with ThreadPoolExecutor(max_workers=len(self.stopped_s3_gates)) as executor:
|
parallel(self.start_s3_gate, copy.copy(self.stopped_s3_gates))
|
||||||
start_result = executor.map(self.start_s3_gate, self.stopped_s3_gates)
|
self.stopped_s3_gates = []
|
||||||
|
|
||||||
# Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor,
|
|
||||||
# But will be thrown here.
|
|
||||||
# Not ideal solution, but okay for now
|
|
||||||
for _ in start_result:
|
|
||||||
pass
|
|
||||||
|
|
||||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||||
@reporter.step_deco("Suspend {process_name} service in {node}")
|
@reporter.step_deco("Suspend {process_name} service in {node}")
|
||||||
|
|
Loading…
Reference in a new issue