[#2165] services/tree: Remember starting height for the synchronization
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
06137dbf8e
commit
f9fcd85363
2 changed files with 61 additions and 22 deletions
|
@ -5,6 +5,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
@ -28,7 +29,13 @@ type Service struct {
|
||||||
|
|
||||||
syncChan chan struct{}
|
syncChan chan struct{}
|
||||||
syncPool *ants.Pool
|
syncPool *ants.Pool
|
||||||
cnrMap map[cidSDK.ID]struct{}
|
|
||||||
|
// cnrMap maps container and tree ID to the minimum height which was fetched from _each_ client.
|
||||||
|
// This allows us to better handle a split-brain scenario, because we always synchronize
|
||||||
|
// from the last seen height. The inner map is read-only and should not be modified in-place.
|
||||||
|
cnrMap map[cidSDK.ID]map[string]uint64
|
||||||
|
// cnrMapMtx protects cnrMap
|
||||||
|
cnrMapMtx sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ TreeServiceServer = (*Service)(nil)
|
var _ TreeServiceServer = (*Service)(nil)
|
||||||
|
@ -54,7 +61,7 @@ func New(opts ...Option) *Service {
|
||||||
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
|
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
|
||||||
s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)
|
s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)
|
||||||
s.containerCache.init(s.containerCacheSize)
|
s.containerCache.init(s.containerCacheSize)
|
||||||
s.cnrMap = make(map[cidSDK.ID]struct{})
|
s.cnrMap = make(map[cidSDK.ID]map[string]uint64)
|
||||||
s.syncChan = make(chan struct{})
|
s.syncChan = make(chan struct{})
|
||||||
s.syncPool, _ = ants.NewPool(defaultSyncWorkerCount)
|
s.syncPool, _ = ants.NewPool(defaultSyncWorkerCount)
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
@ -85,15 +86,31 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
|
||||||
return fmt.Errorf("could not fetch tree ID list: %w", outErr)
|
return fmt.Errorf("could not fetch tree ID list: %w", outErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tid := range treesToSync {
|
s.cnrMapMtx.Lock()
|
||||||
err = s.synchronizeTree(ctx, d, tid, nodes)
|
oldStatus := s.cnrMap[cid]
|
||||||
if err != nil {
|
s.cnrMapMtx.Unlock()
|
||||||
s.log.Error("could not sync tree",
|
|
||||||
zap.Stringer("cid", cid),
|
syncStatus := map[string]uint64{}
|
||||||
zap.String("treeID", tid))
|
for i := range treesToSync {
|
||||||
|
syncStatus[treesToSync[i]] = 0
|
||||||
|
}
|
||||||
|
for tid := range oldStatus {
|
||||||
|
if _, ok := syncStatus[tid]; ok {
|
||||||
|
syncStatus[tid] = oldStatus[tid]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, tid := range treesToSync {
|
||||||
|
h := s.synchronizeTree(ctx, d, syncStatus[tid], tid, nodes)
|
||||||
|
if syncStatus[tid] < h {
|
||||||
|
syncStatus[tid] = h
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
s.cnrMapMtx.Lock()
|
||||||
|
s.cnrMap[cid] = syncStatus
|
||||||
|
s.cnrMapMtx.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,18 +135,20 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.synchronizeTree(ctx, d, treeID, nodes)
|
s.synchronizeTree(ctx, d, 0, treeID, nodes)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor,
|
func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor, from uint64,
|
||||||
treeID string, nodes []netmapSDK.NodeInfo) error {
|
treeID string, nodes []netmapSDK.NodeInfo) uint64 {
|
||||||
lm, err := s.forest.TreeGetOpLog(d.CID, treeID, 0)
|
s.log.Debug("synchronize tree",
|
||||||
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
zap.Stringer("cid", d.CID),
|
||||||
return err
|
zap.String("tree", treeID),
|
||||||
}
|
zap.Uint64("from", from))
|
||||||
|
|
||||||
height := lm.Time + 1
|
newHeight := uint64(math.MaxUint64)
|
||||||
for _, n := range nodes {
|
for _, n := range nodes {
|
||||||
|
height := from
|
||||||
n.IterateNetworkEndpoints(func(addr string) bool {
|
n.IterateNetworkEndpoints(func(addr string) bool {
|
||||||
var a network.Address
|
var a network.Address
|
||||||
if err := a.FromString(addr); err != nil {
|
if err := a.FromString(addr); err != nil {
|
||||||
|
@ -155,8 +174,16 @@ func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
if height <= from { // do not increase starting height on fail
|
||||||
|
newHeight = from
|
||||||
|
} else if height < newHeight { // take minimum across all clients
|
||||||
|
newHeight = height
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
if newHeight == math.MaxUint64 {
|
||||||
|
newHeight = from
|
||||||
|
}
|
||||||
|
return newHeight
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescriptor, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
|
func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescriptor, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
|
||||||
|
@ -254,14 +281,14 @@ func (s *Service) syncLoop(ctx context.Context) {
|
||||||
newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
|
newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
|
||||||
cnrsToSync := make([]cid.ID, 0, len(cnrs))
|
cnrsToSync := make([]cid.ID, 0, len(cnrs))
|
||||||
|
|
||||||
|
var removed []cid.ID
|
||||||
for _, cnr := range cnrs {
|
for _, cnr := range cnrs {
|
||||||
_, pos, err := s.getContainerNodes(cnr)
|
_, pos, err := s.getContainerNodes(cnr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error("could not calculate container nodes",
|
s.log.Error("could not calculate container nodes",
|
||||||
zap.Stringer("cid", cnr),
|
zap.Stringer("cid", cnr),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
delete(s.cnrMap, cnr)
|
removed = append(removed, cnr)
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -303,12 +330,19 @@ func (s *Service) syncLoop(ctx context.Context) {
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
// remove stored redundant trees
|
s.cnrMapMtx.Lock()
|
||||||
for cnr := range s.cnrMap {
|
for cnr := range s.cnrMap {
|
||||||
if _, ok := newMap[cnr]; ok {
|
if _, ok := newMap[cnr]; ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
removed = append(removed, cnr)
|
||||||
|
}
|
||||||
|
for i := range removed {
|
||||||
|
delete(s.cnrMap, removed[i])
|
||||||
|
}
|
||||||
|
s.cnrMapMtx.Unlock()
|
||||||
|
|
||||||
|
for _, cnr := range removed {
|
||||||
s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))
|
s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))
|
||||||
|
|
||||||
err = s.DropTree(ctx, cnr, "")
|
err = s.DropTree(ctx, cnr, "")
|
||||||
|
@ -320,8 +354,6 @@ func (s *Service) syncLoop(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.cnrMap = newMap
|
|
||||||
|
|
||||||
s.log.Debug("trees have been synchronized")
|
s.log.Debug("trees have been synchronized")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue