frostfs-node/pkg/local_object_storage/engine/evacuate.go

242 lines
6.8 KiB
Go

package engine
import (
"context"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/hrw"
"go.uber.org/zap"
)
var ErrMustBeReadOnly = logicerr.New("shard must be in read-only mode")
// EvacuateShardPrm represents parameters for the EvacuateShard operation.
type EvacuateShardPrm struct {
shardID []*shard.ID
handler func(context.Context, oid.Address, *objectSDK.Object) error
ignoreErrors bool
}
// EvacuateShardRes represents result of the EvacuateShard operation.
type EvacuateShardRes struct {
count int
}
// WithShardIDList sets shard ID.
func (p *EvacuateShardPrm) WithShardIDList(id []*shard.ID) {
p.shardID = id
}
// WithIgnoreErrors sets flag to ignore errors.
func (p *EvacuateShardPrm) WithIgnoreErrors(ignore bool) {
p.ignoreErrors = ignore
}
// WithFaultHandler sets handler to call for objects which cannot be saved on other shards.
func (p *EvacuateShardPrm) WithFaultHandler(f func(context.Context, oid.Address, *objectSDK.Object) error) {
p.handler = f
}
// Count returns amount of evacuated objects.
// Objects for which handler returned no error are also assumed evacuated.
func (p EvacuateShardRes) Count() int {
return p.count
}
const defaultEvacuateBatchSize = 100
type pooledShard struct {
hashedShard
pool util.WorkerPool
}
var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
// Evacuate moves data from one shard to the others.
// The shard being moved must be in read-only mode.
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (EvacuateShardRes, error) {
shardIDs := make([]string, len(prm.shardID))
for i := range prm.shardID {
shardIDs[i] = prm.shardID[i].String()
}
shards, weights, err := e.getActualShards(shardIDs, prm.handler != nil)
if err != nil {
return EvacuateShardRes{}, err
}
shardsToEvacuate := make(map[string]*shard.Shard)
for i := range shardIDs {
for j := range shards {
if shards[j].ID().String() == shardIDs[i] {
shardsToEvacuate[shardIDs[i]] = shards[j].Shard
}
}
}
e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs))
var res EvacuateShardRes
for _, shardID := range shardIDs {
if err = e.evacuateShard(ctx, shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs))
return res, err
}
}
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, zap.Strings("shard_ids", shardIDs))
return res, nil
}
func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(defaultEvacuateBatchSize)
sh := shardsToEvacuate[shardID]
var c *meta.Cursor
for {
listPrm.WithCursor(c)
// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
// because ListWithCursor works only with the metabase.
listRes, err := sh.ListWithCursor(listPrm)
if err != nil {
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
break
}
return err
}
if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, weights, shardsToEvacuate); err != nil {
return err
}
c = listRes.Cursor()
}
return nil
}
func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool) ([]pooledShard, []float64, error) {
e.mtx.RLock()
defer e.mtx.RUnlock()
for i := range shardIDs {
sh, ok := e.shards[shardIDs[i]]
if !ok {
return nil, nil, errShardNotFound
}
if !sh.GetMode().ReadOnly() {
return nil, nil, ErrMustBeReadOnly
}
}
if len(e.shards)-len(shardIDs) < 1 && !handlerDefined {
return nil, nil, errMustHaveTwoShards
}
// We must have all shards, to have correct information about their
// indexes in a sorted slice and set appropriate marks in the metabase.
// Evacuated shard is skipped during put.
shards := make([]pooledShard, 0, len(e.shards))
for id := range e.shards {
shards = append(shards, pooledShard{
hashedShard: hashedShard(e.shards[id]),
pool: e.shardPools[id],
})
}
weights := make([]float64, 0, len(shards))
for i := range shards {
weights = append(weights, e.shardWeight(shards[i].Shard))
}
return shards, weights, nil
}
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
for i := range toEvacuate {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
addr := toEvacuate[i].Address
var getPrm shard.GetPrm
getPrm.SetAddress(addr)
getRes, err := sh.Get(ctx, getPrm)
if err != nil {
if prm.ignoreErrors {
continue
}
return err
}
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate)
if err != nil {
return err
}
if evacuatedLocal {
continue
}
if prm.handler == nil {
// Do not check ignoreErrors flag here because
// ignoring errors on put make this command kinda useless.
return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i])
}
err = prm.handler(ctx, addr, getRes.Object())
if err != nil {
return err
}
res.count++
}
return nil
}
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) (bool, error) {
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
for j := range shards {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
continue
}
putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object)
if putDone || exists {
if putDone {
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
zap.Stringer("from", sh.ID()),
zap.Stringer("to", shards[j].ID()),
zap.Stringer("addr", addr))
res.count++
}
return true, nil
}
}
return false, nil
}