Fix multiple services start (copy array for upstream functions) #67
1 changed files with 7 additions and 20 deletions
|
@ -1,13 +1,13 @@
|
|||
import copy
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
import frostfs_testlib.resources.optionals as optionals
|
||||
from frostfs_testlib.reporter import get_reporter
|
||||
from frostfs_testlib.shell import CommandOptions, Shell
|
||||
from frostfs_testlib.steps import epoch
|
||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
|
||||
from frostfs_testlib.storage.controllers.disk_controller import DiskController
|
||||
from frostfs_testlib.testing.test_control import run_optionally, wait_for_success
|
||||
from frostfs_testlib.testing import parallel
|
||||
from frostfs_testlib.testing.test_control import run_optionally
|
||||
from frostfs_testlib.utils.failover_utils import (
|
||||
wait_all_storage_nodes_returned,
|
||||
wait_for_host_offline,
|
||||
|
@ -139,15 +139,8 @@ class ClusterStateController:
|
|||
# In case if we stopped couple services, for example (s01-s04):
|
||||
# After starting only s01, it may require connections to s02-s04, which is still down, and fail to start.
|
||||
# Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state.
|
||||
# So in order to make sure that services are at least attempted to be started, using threads here.
|
||||
with ThreadPoolExecutor(max_workers=len(self.stopped_storage_nodes)) as executor:
|
||||
start_result = executor.map(self.start_storage_service, self.stopped_storage_nodes)
|
||||
|
||||
# Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor,
|
||||
# But will be thrown here.
|
||||
# Not ideal solution, but okay for now
|
||||
for _ in start_result:
|
||||
pass
|
||||
# So in order to make sure that services are at least attempted to be started, using parallel runs here.
|
||||
parallel(self.start_storage_service, copy.copy(self.stopped_storage_nodes))
|
||||
|
||||
wait_all_storage_nodes_returned(self.shell, self.cluster)
|
||||
self.stopped_storage_nodes = []
|
||||
|
@ -170,14 +163,8 @@ class ClusterStateController:
|
|||
if not self.stopped_s3_gates:
|
||||
return
|
||||
|
||||
with ThreadPoolExecutor(max_workers=len(self.stopped_s3_gates)) as executor:
|
||||
start_result = executor.map(self.start_s3_gate, self.stopped_s3_gates)
|
||||
|
||||
# Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor,
|
||||
# But will be thrown here.
|
||||
# Not ideal solution, but okay for now
|
||||
for _ in start_result:
|
||||
pass
|
||||
parallel(self.start_s3_gate, copy.copy(self.stopped_s3_gates))
|
||||
self.stopped_s3_gates = []
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Suspend {process_name} service in {node}")
|
||||
|
|
Loading…
Reference in a new issue