diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go
index 0368e7586a..260bffba5e 100644
--- a/pkg/services/tree/service.go
+++ b/pkg/services/tree/service.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"sync"
 
 	"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
 	"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
@@ -28,7 +29,13 @@ type Service struct {
 
 	syncChan chan struct{}
 	syncPool *ants.Pool
-	cnrMap map[cidSDK.ID]struct{}
+
+	// cnrMap maps container and tree ID to the minimum height which was fetched from _each_ client.
+	// This allows us to better handle split-brain scenarios, because we always synchronize
+	// from the last seen height. The inner map is read-only and should not be modified in-place.
+	cnrMap map[cidSDK.ID]map[string]uint64
+	// cnrMapMtx protects cnrMap
+	cnrMapMtx sync.Mutex
 }
 
 var _ TreeServiceServer = (*Service)(nil)
@@ -54,7 +61,7 @@ func New(opts ...Option) *Service {
 	s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
 	s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)
 	s.containerCache.init(s.containerCacheSize)
-	s.cnrMap = make(map[cidSDK.ID]struct{})
+	s.cnrMap = make(map[cidSDK.ID]map[string]uint64)
 	s.syncChan = make(chan struct{})
 	s.syncPool, _ = ants.NewPool(defaultSyncWorkerCount)
 
diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go
index f2695e49df..1dbd336ab5 100644
--- a/pkg/services/tree/sync.go
+++ b/pkg/services/tree/sync.go
@@ -6,6 +6,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"math"
 	"math/rand"
 	"sync"
 
@@ -85,15 +86,31 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
 		return fmt.Errorf("could not fetch tree ID list: %w", outErr)
 	}
 
-	for _, tid := range treesToSync {
-		err = s.synchronizeTree(ctx, d, tid, nodes)
-		if err != nil {
-			s.log.Error("could not sync tree",
-				zap.Stringer("cid", cid),
-				zap.String("treeID", tid))
+	s.cnrMapMtx.Lock()
+	oldStatus := s.cnrMap[cid]
+	s.cnrMapMtx.Unlock()
+
+	syncStatus := map[string]uint64{}
+	for i := range treesToSync {
+		syncStatus[treesToSync[i]] = 0
+	}
+	for tid := range oldStatus {
+		if _, ok := syncStatus[tid]; ok {
+			syncStatus[tid] = oldStatus[tid]
 		}
 	}
 
+	for _, tid := range treesToSync {
+		h := s.synchronizeTree(ctx, d, syncStatus[tid], tid, nodes)
+		if syncStatus[tid] < h {
+			syncStatus[tid] = h
+		}
+	}
+
+	s.cnrMapMtx.Lock()
+	s.cnrMap[cid] = syncStatus
+	s.cnrMapMtx.Unlock()
+
 	return nil
 }
 
@@ -118,18 +135,20 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string
 		return nil
 	}
 
-	return s.synchronizeTree(ctx, d, treeID, nodes)
+	s.synchronizeTree(ctx, d, 0, treeID, nodes)
+	return nil
 }
 
-func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor,
-	treeID string, nodes []netmapSDK.NodeInfo) error {
-	lm, err := s.forest.TreeGetOpLog(d.CID, treeID, 0)
-	if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
-		return err
-	}
+func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor, from uint64,
+	treeID string, nodes []netmapSDK.NodeInfo) uint64 {
+	s.log.Debug("synchronize tree",
+		zap.Stringer("cid", d.CID),
+		zap.String("tree", treeID),
+		zap.Uint64("from", from))
 
-	height := lm.Time + 1
+	newHeight := uint64(math.MaxUint64)
 	for _, n := range nodes {
+		height := from
 		n.IterateNetworkEndpoints(func(addr string) bool {
 			var a network.Address
 			if err := a.FromString(addr); err != nil {
@@ -155,8 +174,16 @@ func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor,
 				}
 			}
 		})
+		if height <= from { // do not increase starting height on fail
+			newHeight = from
+		} else if height < newHeight { // take minimum across all clients
+			newHeight = height
+		}
 	}
-	return nil
+	if newHeight == math.MaxUint64 {
+		newHeight = from
+	}
+	return newHeight
 }
 
 func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescriptor, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
@@ -254,14 +281,14 @@ func (s *Service) syncLoop(ctx context.Context) {
 			newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
 			cnrsToSync := make([]cid.ID, 0, len(cnrs))
 
+			var removed []cid.ID
 			for _, cnr := range cnrs {
 				_, pos, err := s.getContainerNodes(cnr)
 				if err != nil {
 					s.log.Error("could not calculate container nodes",
 						zap.Stringer("cid", cnr),
 						zap.Error(err))
-					delete(s.cnrMap, cnr)
-
+					removed = append(removed, cnr)
 					continue
 				}
 
@@ -303,12 +330,19 @@ func (s *Service) syncLoop(ctx context.Context) {
 			}
 			wg.Wait()
 
-			// remove stored redundant trees
+			s.cnrMapMtx.Lock()
 			for cnr := range s.cnrMap {
 				if _, ok := newMap[cnr]; ok {
 					continue
 				}
+				removed = append(removed, cnr)
+			}
+			for i := range removed {
+				delete(s.cnrMap, removed[i])
+			}
+			s.cnrMapMtx.Unlock()
 
+			for _, cnr := range removed {
 				s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))
 
 				err = s.DropTree(ctx, cnr, "")
@@ -320,8 +354,6 @@ func (s *Service) syncLoop(ctx context.Context) {
 			}
 		}
 
-		s.cnrMap = newMap
-
 		s.log.Debug("trees have been synchronized")
 	}
 }
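Note for reviewers: the height bookkeeping added above can be summarized by the standalone sketch below. It is not part of the patch; mergeSyncHeights is a hypothetical helper that only mirrors the rule synchronizeTree now uses for its return value, namely take the minimum height reached across all clients, never move the starting height below the previous one, and fall back to the previous height when every client failed.

package main

import (
	"fmt"
	"math"
)

// mergeSyncHeights mirrors the return-value logic of synchronizeTree:
// perClientHeights holds the height reached with each client when the
// synchronization started at height `from`.
func mergeSyncHeights(from uint64, perClientHeights []uint64) uint64 {
	newHeight := uint64(math.MaxUint64)
	for _, h := range perClientHeights {
		if h <= from { // do not increase starting height on fail
			newHeight = from
		} else if h < newHeight { // take minimum across all clients
			newHeight = h
		}
	}
	if newHeight == math.MaxUint64 { // no clients were available at all
		newHeight = from
	}
	return newHeight
}

func main() {
	fmt.Println(mergeSyncHeights(10, []uint64{42, 15, 30})) // 15: slowest successful client wins
	fmt.Println(mergeSyncHeights(10, []uint64{42, 10}))     // 10: one client failed, height is not advanced
	fmt.Println(mergeSyncHeights(10, nil))                  // 10: nothing was synchronized
}

Taking the minimum rather than the maximum is deliberate: a tree is only considered synchronized up to a height once every reachable client has been drained up to that height, so restarting the next sync from the minimum never skips operations that a lagging or temporarily unreachable replica still holds.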