[#188] engine: Refactor shard evacuation

Resolve the funlen and gocognit linter warnings for the StorageEngine.Evacuate method

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
Author: Dmitrii Stepanov
Date:   2023-03-31 11:33:08 +03:00
Commit: 5a66db80c5
Parent: 456bc097f7

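For orientation before reading the diff: the commit splits the former monolithic Evacuate into a coordinator plus four helpers. Their signatures, collected from the added lines below:

func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error)
func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool) ([]pooledShard, []float64, error)
func (e *StorageEngine) evacuateShard(shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error
func (e *StorageEngine) evacuateObjects(sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error
func (e *StorageEngine) tryEvacuateObject(addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes,
	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) bool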

@@ -4,6 +4,7 @@ import (
 	"errors"
 	"fmt"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
@@ -57,35 +58,89 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
 // Evacuate moves data from one shard to the others.
 // The shard being moved must be in read-only mode.
-//
-// nolint: funlen, gocognit
 func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) {
-	sidList := make([]string, len(prm.shardID))
+	shardIDs := make([]string, len(prm.shardID))
 	for i := range prm.shardID {
-		sidList[i] = prm.shardID[i].String()
+		shardIDs[i] = prm.shardID[i].String()
 	}
+
+	shards, weights, err := e.getActualShards(shardIDs, prm.handler != nil)
+	if err != nil {
+		return EvacuateShardRes{}, err
+	}
+
+	shardsToEvacuate := make(map[string]*shard.Shard)
+	for i := range shardIDs {
+		for j := range shards {
+			if shards[j].ID().String() == shardIDs[i] {
+				shardsToEvacuate[shardIDs[i]] = shards[j].Shard
+			}
+		}
+	}
+
+	e.log.Info("started shards evacuation", zap.Strings("shard_ids", shardIDs))
+
+	var res EvacuateShardRes
+	for _, shardID := range shardIDs {
+		if err = e.evacuateShard(shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil {
+			return res, err
+		}
+	}
+
+	e.log.Info("finished shards evacuation", zap.Strings("shard_ids", shardIDs))
+	return res, nil
+}
+
+func (e *StorageEngine) evacuateShard(shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
+	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
+	var listPrm shard.ListWithCursorPrm
+	listPrm.WithCount(defaultEvacuateBatchSize)
+
+	sh := shardsToEvacuate[shardID]
+
+	var c *meta.Cursor
+	for {
+		listPrm.WithCursor(c)
+
+		// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
+		// because ListWithCursor works only with the metabase.
+		listRes, err := sh.ListWithCursor(listPrm)
+		if err != nil {
+			if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
+				break
+			}
+			return err
+		}
+
+		if err = e.evacuateObjects(sh, listRes.AddressList(), prm, res, shards, weights, shardsToEvacuate); err != nil {
+			return err
+		}
+
+		c = listRes.Cursor()
+	}
+	return nil
+}
+
+func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool) ([]pooledShard, []float64, error) {
 	e.mtx.RLock()
+	defer e.mtx.RUnlock()
 
-	for i := range sidList {
-		sh, ok := e.shards[sidList[i]]
+	for i := range shardIDs {
+		sh, ok := e.shards[shardIDs[i]]
 		if !ok {
-			e.mtx.RUnlock()
-			return EvacuateShardRes{}, errShardNotFound
+			return nil, nil, errShardNotFound
 		}
 
 		if !sh.GetMode().ReadOnly() {
-			e.mtx.RUnlock()
-			return EvacuateShardRes{}, shard.ErrMustBeReadOnly
+			return nil, nil, shard.ErrMustBeReadOnly
 		}
 	}
 
-	if len(e.shards)-len(sidList) < 1 && prm.handler == nil {
-		e.mtx.RUnlock()
-		return EvacuateShardRes{}, errMustHaveTwoShards
+	if len(e.shards)-len(shardIDs) < 1 && !handlerDefined {
+		return nil, nil, errMustHaveTwoShards
 	}
 
-	e.log.Info("started shards evacuation", zap.Strings("shard_ids", sidList))
-
 	// We must have all shards, to have correct information about their
 	// indexes in a sorted slice and set appropriate marks in the metabase.
 	// Evacuated shard is skipped during put.
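The listing loop that moved into evacuateShard is a standard cursor-pagination pattern: request a batch, process it, and resume from the returned cursor until an end-of-listing error. A minimal self-contained sketch of the same control flow (the Cursor type and listBatch function are illustrative stand-ins, not the frostfs-node API):

package main

import (
	"errors"
	"fmt"
)

// Cursor is an opaque listing position (a stand-in for meta.Cursor).
type Cursor struct{ next int }

// errEndOfListing mirrors the role of meta.ErrEndOfListing.
var errEndOfListing = errors.New("end of listing")

// listBatch returns up to count items starting at c; nil c means start over.
func listBatch(items []string, c *Cursor, count int) ([]string, *Cursor, error) {
	start := 0
	if c != nil {
		start = c.next
	}
	if start >= len(items) {
		return nil, nil, errEndOfListing
	}
	end := start + count
	if end > len(items) {
		end = len(items)
	}
	return items[start:end], &Cursor{next: end}, nil
}

func main() {
	objects := []string{"obj-1", "obj-2", "obj-3", "obj-4", "obj-5"}

	var c *Cursor
	for {
		// 2 plays the role of defaultEvacuateBatchSize.
		batch, next, err := listBatch(objects, c, 2)
		if errors.Is(err, errEndOfListing) {
			break // the same exit condition evacuateShard uses
		}
		fmt.Println("processing batch:", batch)
		c = next
	}
}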
@@ -96,51 +151,19 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error)
 			pool: e.shardPools[id],
 		})
 	}
-	e.mtx.RUnlock()
 
 	weights := make([]float64, 0, len(shards))
 	for i := range shards {
 		weights = append(weights, e.shardWeight(shards[i].Shard))
 	}
 
-	shardMap := make(map[string]*shard.Shard)
-	for i := range sidList {
-		for j := range shards {
-			if shards[j].ID().String() == sidList[i] {
-				shardMap[sidList[i]] = shards[j].Shard
-			}
-		}
-	}
-
-	var listPrm shard.ListWithCursorPrm
-	listPrm.WithCount(defaultEvacuateBatchSize)
-
-	var res EvacuateShardRes
-
-mainLoop:
-	for n := range sidList {
-		sh := shardMap[sidList[n]]
-
-		var c *meta.Cursor
-		for {
-			listPrm.WithCursor(c)
-
-			// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
-			// because ListWithCursor works only with the metabase.
-			listRes, err := sh.ListWithCursor(listPrm)
-			if err != nil {
-				if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
-					continue mainLoop
-				}
-				return res, err
-			}
-
-			// TODO (@fyrchik): #1731 parallelize the loop
-			lst := listRes.AddressList()
-
-		loop:
-			for i := range lst {
-				addr := lst[i].Address
+	return shards, weights, nil
+}
+
+func (e *StorageEngine) evacuateObjects(sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
+	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
+	for i := range toEvacuate {
+		addr := toEvacuate[i].Address
 
 		var getPrm shard.GetPrm
 		getPrm.SetAddress(addr)
@@ -150,46 +173,47 @@ mainLoop:
 			if prm.ignoreErrors {
 				continue
 			}
-			return res, err
+			return err
 		}
 
-		hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
-		for j := range shards {
-			if _, ok := shardMap[shards[j].ID().String()]; ok {
-				continue
-			}
-			putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, getRes.Object())
-			if putDone || exists {
-				if putDone {
-					e.log.Debug("object is moved to another shard",
-						zap.String("from", sidList[n]),
-						zap.Stringer("to", shards[j].ID()),
-						zap.Stringer("addr", addr))
-					res.count++
-				}
-				continue loop
-			}
-		}
+		if e.tryEvacuateObject(addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate) {
+			continue
+		}
 
 		if prm.handler == nil {
 			// Do not check ignoreErrors flag here because
 			// ignoring errors on put make this command kinda useless.
-			return res, fmt.Errorf("%w: %s", errPutShard, lst[i])
+			return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i])
 		}
 
 		err = prm.handler(addr, getRes.Object())
 		if err != nil {
-			return res, err
+			return err
 		}
 
 		res.count++
 	}
-
-			c = listRes.Cursor()
-		}
-	}
-
-	e.log.Info("finished shards evacuation",
-		zap.Strings("shard_ids", sidList))
-
-	return res, nil
+	return nil
+}
+
+func (e *StorageEngine) tryEvacuateObject(addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes,
+	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) bool {
+	hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
+	for j := range shards {
+		if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
+			continue
+		}
+		putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, object)
+		if putDone || exists {
+			if putDone {
+				e.log.Debug("object is moved to another shard",
+					zap.Stringer("from", sh.ID()),
+					zap.Stringer("to", shards[j].ID()),
+					zap.Stringer("addr", addr))
+				res.count++
+			}
+			return true
+		}
+	}
+	return false
 }
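tryEvacuateObject picks destinations by sorting shards with rendezvous (HRW) hashing, so each address maps to a stable preference order on every node that shares the same shard list. A self-contained sketch of weighted rendezvous selection (illustrative only; the real code delegates this to hrw.SortHasherSliceByWeightValue, whose exact transform may differ):

package main

import (
	"fmt"
	"hash/fnv"
	"math"
)

// score computes a weighted rendezvous (HRW) score for a (key, shard) pair.
// Each shard scores the key independently; the highest score wins, and
// sorting by score yields the preference order evacuation would try.
func score(key, shardID string, weight float64) float64 {
	h := fnv.New64a()
	h.Write([]byte(shardID))
	h.Write([]byte(key))
	x := h.Sum64() >> 11                // top 53 bits: exactly representable in float64
	u := (float64(x) + 0.5) / (1 << 53) // uniform in (0,1)
	return weight / -math.Log(u)        // classic weighted-rendezvous transform
}

func main() {
	shards := []string{"shard-a", "shard-b", "shard-c"}
	weights := []float64{1, 1, 2} // e.g. proportional to capacity
	addr := "container-1/object-1"

	best, bestScore := "", math.Inf(-1)
	for i, id := range shards {
		if s := score(addr, id, weights[i]); s > bestScore {
			best, bestScore = id, s
		}
	}
	// The winner is stable for a given addr and shard set, which is why
	// nodes can agree on placement without extra coordination.
	fmt.Println("preferred destination:", best)
}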