diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 4bc7eac1..457228bb 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" @@ -57,35 +58,89 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard") // Evacuate moves data from one shard to the others. // The shard being moved must be in read-only mode. -// -// nolint: funlen, gocognit func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) { - sidList := make([]string, len(prm.shardID)) + shardIDs := make([]string, len(prm.shardID)) for i := range prm.shardID { - sidList[i] = prm.shardID[i].String() + shardIDs[i] = prm.shardID[i].String() } + shards, weights, err := e.getActualShards(shardIDs, prm.handler != nil) + if err != nil { + return EvacuateShardRes{}, err + } + + shardsToEvacuate := make(map[string]*shard.Shard) + for i := range shardIDs { + for j := range shards { + if shards[j].ID().String() == shardIDs[i] { + shardsToEvacuate[shardIDs[i]] = shards[j].Shard + } + } + } + + e.log.Info("started shards evacuation", zap.Strings("shard_ids", shardIDs)) + + var res EvacuateShardRes + + for _, shardID := range shardIDs { + if err = e.evacuateShard(shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil { + return res, err + } + } + + e.log.Info("finished shards evacuation", zap.Strings("shard_ids", shardIDs)) + return res, nil +} + +func (e *StorageEngine) evacuateShard(shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, + shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error { + var listPrm shard.ListWithCursorPrm + listPrm.WithCount(defaultEvacuateBatchSize) + + sh := shardsToEvacuate[shardID] + + var c *meta.Cursor + for { + listPrm.WithCursor(c) + + // TODO (@fyrchik): #1731 this approach doesn't work in degraded modes + // because ListWithCursor works only with the metabase. + listRes, err := sh.ListWithCursor(listPrm) + if err != nil { + if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) { + break + } + return err + } + + if err = e.evacuateObjects(sh, listRes.AddressList(), prm, res, shards, weights, shardsToEvacuate); err != nil { + return err + } + + c = listRes.Cursor() + } + return nil +} + +func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool) ([]pooledShard, []float64, error) { e.mtx.RLock() - for i := range sidList { - sh, ok := e.shards[sidList[i]] + defer e.mtx.RUnlock() + + for i := range shardIDs { + sh, ok := e.shards[shardIDs[i]] if !ok { - e.mtx.RUnlock() - return EvacuateShardRes{}, errShardNotFound + return nil, nil, errShardNotFound } if !sh.GetMode().ReadOnly() { - e.mtx.RUnlock() - return EvacuateShardRes{}, shard.ErrMustBeReadOnly + return nil, nil, shard.ErrMustBeReadOnly } } - if len(e.shards)-len(sidList) < 1 && prm.handler == nil { - e.mtx.RUnlock() - return EvacuateShardRes{}, errMustHaveTwoShards + if len(e.shards)-len(shardIDs) < 1 && !handlerDefined { + return nil, nil, errMustHaveTwoShards } - e.log.Info("started shards evacuation", zap.Strings("shard_ids", sidList)) - // We must have all shards, to have correct information about their // indexes in a sorted slice and set appropriate marks in the metabase. // Evacuated shard is skipped during put. @@ -96,100 +151,69 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) pool: e.shardPools[id], }) } - e.mtx.RUnlock() weights := make([]float64, 0, len(shards)) for i := range shards { weights = append(weights, e.shardWeight(shards[i].Shard)) } - shardMap := make(map[string]*shard.Shard) - for i := range sidList { - for j := range shards { - if shards[j].ID().String() == sidList[i] { - shardMap[sidList[i]] = shards[j].Shard + return shards, weights, nil +} + +func (e *StorageEngine) evacuateObjects(sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes, + shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error { + for i := range toEvacuate { + addr := toEvacuate[i].Address + + var getPrm shard.GetPrm + getPrm.SetAddress(addr) + + getRes, err := sh.Get(getPrm) + if err != nil { + if prm.ignoreErrors { + continue } + return err } + + if e.tryEvacuateObject(addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate) { + continue + } + + if prm.handler == nil { + // Do not check ignoreErrors flag here because + // ignoring errors on put make this command kinda useless. + return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i]) + } + + err = prm.handler(addr, getRes.Object()) + if err != nil { + return err + } + res.count++ } + return nil +} - var listPrm shard.ListWithCursorPrm - listPrm.WithCount(defaultEvacuateBatchSize) - - var res EvacuateShardRes - -mainLoop: - for n := range sidList { - sh := shardMap[sidList[n]] - - var c *meta.Cursor - for { - listPrm.WithCursor(c) - - // TODO (@fyrchik): #1731 this approach doesn't work in degraded modes - // because ListWithCursor works only with the metabase. - listRes, err := sh.ListWithCursor(listPrm) - if err != nil { - if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) { - continue mainLoop - } - return res, err - } - - // TODO (@fyrchik): #1731 parallelize the loop - lst := listRes.AddressList() - - loop: - for i := range lst { - addr := lst[i].Address - - var getPrm shard.GetPrm - getPrm.SetAddress(addr) - - getRes, err := sh.Get(getPrm) - if err != nil { - if prm.ignoreErrors { - continue - } - return res, err - } - - hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString()))) - for j := range shards { - if _, ok := shardMap[shards[j].ID().String()]; ok { - continue - } - putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, getRes.Object()) - if putDone || exists { - if putDone { - e.log.Debug("object is moved to another shard", - zap.String("from", sidList[n]), - zap.Stringer("to", shards[j].ID()), - zap.Stringer("addr", addr)) - - res.count++ - } - continue loop - } - } - - if prm.handler == nil { - // Do not check ignoreErrors flag here because - // ignoring errors on put make this command kinda useless. - return res, fmt.Errorf("%w: %s", errPutShard, lst[i]) - } - - err = prm.handler(addr, getRes.Object()) - if err != nil { - return res, err - } +func (e *StorageEngine) tryEvacuateObject(addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes, + shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) bool { + hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString()))) + for j := range shards { + if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok { + continue + } + putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, object) + if putDone || exists { + if putDone { + e.log.Debug("object is moved to another shard", + zap.Stringer("from", sh.ID()), + zap.Stringer("to", shards[j].ID()), + zap.Stringer("addr", addr)) res.count++ } - - c = listRes.Cursor() + return true } } - e.log.Info("finished shards evacuation", - zap.Strings("shard_ids", sidList)) - return res, nil + return false }