[#156] services/tree: Split syncLoop() in functions

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2023-03-21 16:51:21 +03:00 committed by Gitea
parent 47e8c5bf23
commit 5368c4207a

View file

@ -246,7 +246,6 @@ func (s *Service) SynchronizeAll() error {
} }
} }
// nolint: funlen, gocognit
func (s *Service) syncLoop(ctx context.Context) { func (s *Service) syncLoop(ctx context.Context) {
for { for {
select { select {
@ -263,86 +262,99 @@ func (s *Service) syncLoop(ctx context.Context) {
continue continue
} }
newMap := make(map[cid.ID]struct{}, len(s.cnrMap)) newMap, cnrsToSync := s.containersToSync(cnrs)
cnrsToSync := make([]cid.ID, 0, len(cnrs))
var removed []cid.ID s.syncContainers(ctx, cnrsToSync)
for _, cnr := range cnrs {
_, pos, err := s.getContainerNodes(cnr)
if err != nil {
s.log.Error("could not calculate container nodes",
zap.Stringer("cid", cnr),
zap.Error(err))
continue
}
if pos < 0 { s.removeContainers(ctx, newMap)
// node is not included in the container.
continue
}
newMap[cnr] = struct{}{}
cnrsToSync = append(cnrsToSync, cnr)
}
// sync new containers
var wg sync.WaitGroup
for _, cnr := range cnrsToSync {
wg.Add(1)
cnr := cnr
err := s.syncPool.Submit(func() {
defer wg.Done()
s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr))
err := s.synchronizeAllTrees(ctx, cnr)
if err != nil {
s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err))
return
}
s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr))
})
if err != nil {
wg.Done()
s.log.Error("could not query trees for synchronization",
zap.Stringer("cid", cnr),
zap.Error(err))
if errors.Is(err, ants.ErrPoolClosed) {
return
}
}
}
wg.Wait()
s.cnrMapMtx.Lock()
for cnr := range s.cnrMap {
if _, ok := newMap[cnr]; ok {
continue
}
removed = append(removed, cnr)
}
for i := range removed {
delete(s.cnrMap, removed[i])
}
s.cnrMapMtx.Unlock()
for _, cnr := range removed {
s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))
err = s.DropTree(ctx, cnr, "")
if err != nil {
s.log.Error("could not remove redundant tree",
zap.Stringer("cid", cnr),
zap.Error(err))
continue
}
}
s.log.Debug("trees have been synchronized") s.log.Debug("trees have been synchronized")
} }
} }
} }
func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
// sync new containers
var wg sync.WaitGroup
for _, cnr := range cnrs {
wg.Add(1)
cnr := cnr
err := s.syncPool.Submit(func() {
defer wg.Done()
s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr))
err := s.synchronizeAllTrees(ctx, cnr)
if err != nil {
s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err))
return
}
s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr))
})
if err != nil {
wg.Done()
s.log.Error("could not query trees for synchronization",
zap.Stringer("cid", cnr),
zap.Error(err))
if errors.Is(err, ants.ErrPoolClosed) {
return
}
}
}
wg.Wait()
}
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
s.cnrMapMtx.Lock()
defer s.cnrMapMtx.Unlock()
var removed []cid.ID
for cnr := range s.cnrMap {
if _, ok := newContainers[cnr]; ok {
continue
}
removed = append(removed, cnr)
}
for i := range removed {
delete(s.cnrMap, removed[i])
}
for _, cnr := range removed {
s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))
err := s.DropTree(ctx, cnr, "")
if err != nil {
s.log.Error("could not remove redundant tree",
zap.Stringer("cid", cnr),
zap.Error(err))
}
}
}
func (s *Service) containersToSync(cnrs []cid.ID) (map[cid.ID]struct{}, []cid.ID) {
newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
cnrsToSync := make([]cid.ID, 0, len(cnrs))
for _, cnr := range cnrs {
_, pos, err := s.getContainerNodes(cnr)
if err != nil {
s.log.Error("could not calculate container nodes",
zap.Stringer("cid", cnr),
zap.Error(err))
continue
}
if pos < 0 {
// node is not included in the container.
continue
}
newMap[cnr] = struct{}{}
cnrsToSync = append(cnrsToSync, cnr)
}
return newMap, cnrsToSync
}
// randomizeNodeOrder shuffles nodes and removes not a `pos` index. // randomizeNodeOrder shuffles nodes and removes not a `pos` index.
// It is assumed that 0 <= pos < len(nodes). // It is assumed that 0 <= pos < len(nodes).
func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo { func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo {