From 5368c4207a710919bb8fea902a6c632baa8ca431 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Tue, 21 Mar 2023 16:51:21 +0300 Subject: [PATCH] [#156] services/tree: Split syncLoop() in functions Signed-off-by: Evgenii Stratonikov --- pkg/services/tree/sync.go | 158 ++++++++++++++++++++------------------ 1 file changed, 85 insertions(+), 73 deletions(-) diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index 9fe24fed..d4ef7df5 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -246,7 +246,6 @@ func (s *Service) SynchronizeAll() error { } } -// nolint: funlen, gocognit func (s *Service) syncLoop(ctx context.Context) { for { select { @@ -263,86 +262,99 @@ func (s *Service) syncLoop(ctx context.Context) { continue } - newMap := make(map[cid.ID]struct{}, len(s.cnrMap)) - cnrsToSync := make([]cid.ID, 0, len(cnrs)) + newMap, cnrsToSync := s.containersToSync(cnrs) - var removed []cid.ID - for _, cnr := range cnrs { - _, pos, err := s.getContainerNodes(cnr) - if err != nil { - s.log.Error("could not calculate container nodes", - zap.Stringer("cid", cnr), - zap.Error(err)) - continue - } + s.syncContainers(ctx, cnrsToSync) - if pos < 0 { - // node is not included in the container. - continue - } - - newMap[cnr] = struct{}{} - cnrsToSync = append(cnrsToSync, cnr) - } - - // sync new containers - var wg sync.WaitGroup - for _, cnr := range cnrsToSync { - wg.Add(1) - cnr := cnr - err := s.syncPool.Submit(func() { - defer wg.Done() - s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr)) - - err := s.synchronizeAllTrees(ctx, cnr) - if err != nil { - s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err)) - return - } - - s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr)) - }) - if err != nil { - wg.Done() - s.log.Error("could not query trees for synchronization", - zap.Stringer("cid", cnr), - zap.Error(err)) - if errors.Is(err, ants.ErrPoolClosed) { - return - } - } - } - wg.Wait() - - s.cnrMapMtx.Lock() - for cnr := range s.cnrMap { - if _, ok := newMap[cnr]; ok { - continue - } - removed = append(removed, cnr) - } - for i := range removed { - delete(s.cnrMap, removed[i]) - } - s.cnrMapMtx.Unlock() - - for _, cnr := range removed { - s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr)) - - err = s.DropTree(ctx, cnr, "") - if err != nil { - s.log.Error("could not remove redundant tree", - zap.Stringer("cid", cnr), - zap.Error(err)) - continue - } - } + s.removeContainers(ctx, newMap) s.log.Debug("trees have been synchronized") } } } +func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) { + // sync new containers + var wg sync.WaitGroup + for _, cnr := range cnrs { + wg.Add(1) + cnr := cnr + err := s.syncPool.Submit(func() { + defer wg.Done() + s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr)) + + err := s.synchronizeAllTrees(ctx, cnr) + if err != nil { + s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err)) + return + } + + s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr)) + }) + if err != nil { + wg.Done() + s.log.Error("could not query trees for synchronization", + zap.Stringer("cid", cnr), + zap.Error(err)) + if errors.Is(err, ants.ErrPoolClosed) { + return + } + } + } + wg.Wait() +} + +func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) { + s.cnrMapMtx.Lock() + defer s.cnrMapMtx.Unlock() + + var removed []cid.ID + for cnr := range s.cnrMap { + if _, ok := newContainers[cnr]; ok { + continue + } + removed = append(removed, cnr) + } + for i := range removed { + delete(s.cnrMap, removed[i]) + } + + for _, cnr := range removed { + s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr)) + + err := s.DropTree(ctx, cnr, "") + if err != nil { + s.log.Error("could not remove redundant tree", + zap.Stringer("cid", cnr), + zap.Error(err)) + } + } +} + +func (s *Service) containersToSync(cnrs []cid.ID) (map[cid.ID]struct{}, []cid.ID) { + newMap := make(map[cid.ID]struct{}, len(s.cnrMap)) + cnrsToSync := make([]cid.ID, 0, len(cnrs)) + + for _, cnr := range cnrs { + _, pos, err := s.getContainerNodes(cnr) + if err != nil { + s.log.Error("could not calculate container nodes", + zap.Stringer("cid", cnr), + zap.Error(err)) + continue + } + + if pos < 0 { + // node is not included in the container. + continue + } + + newMap[cnr] = struct{}{} + cnrsToSync = append(cnrsToSync, cnr) + } + return newMap, cnrsToSync +} + // randomizeNodeOrder shuffles nodes and removes not a `pos` index. // It is assumed that 0 <= pos < len(nodes). func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo {