[#1731] engine: Add Evacuate command
Make it possible to move all data from one shard to other shards.

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
parent 7377979e12
commit a51b76056e
2 changed files with 265 additions and 0 deletions
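For context, here is a minimal usage sketch of the new API; it is not part of this commit. It assumes an already constructed and initialized StorageEngine and the *shard.ID of a shard that has been switched to read-only beforehand; the helper name drainReadOnlyShard is hypothetical.

package example

import (
    "fmt"

    "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
    "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
)

// drainReadOnlyShard is a hypothetical helper: it moves everything from the
// given read-only shard to the remaining shards and reports how many objects
// were evacuated.
func drainReadOnlyShard(e *engine.StorageEngine, id *shard.ID) (int, error) {
    var prm engine.EvacuateShardPrm
    prm.WithShardID(id)        // shard to drain; it must already be read-only
    prm.WithIgnoreErrors(true) // skip objects that cannot be read instead of aborting

    res, err := e.Evacuate(prm)
    if err != nil {
        return 0, err
    }
    fmt.Printf("evacuated %d objects from shard %s\n", res.Count(), id)
    return res.Count(), nil
}

Inside Evacuate, candidate target shards are tried in HRW order per object address, so an evacuated object normally lands on the shard a regular Put would have chosen; the evacuated shard itself is skipped by comparing shard IDs.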
pkg/local_object_storage/engine/evacuate.go (new file, 150 additions)
@@ -0,0 +1,150 @@
package engine

import (
    "errors"
    "fmt"

    "github.com/nspcc-dev/hrw"
    meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
    "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
    "github.com/nspcc-dev/neofs-node/pkg/util"
    "go.uber.org/zap"
)

// EvacuateShardPrm represents parameters for the EvacuateShard operation.
type EvacuateShardPrm struct {
    shardID      *shard.ID
    ignoreErrors bool
}

// EvacuateShardRes represents result of the EvacuateShard operation.
type EvacuateShardRes struct {
    count int
}

// WithShardID sets shard ID.
func (p *EvacuateShardPrm) WithShardID(id *shard.ID) {
    p.shardID = id
}

// WithIgnoreErrors sets flag to ignore errors.
func (p *EvacuateShardPrm) WithIgnoreErrors(ignore bool) {
    p.ignoreErrors = ignore
}

// Count returns amount of evacuated objects.
func (p EvacuateShardRes) Count() int {
    return p.count
}

const defaultEvacuateBatchSize = 100

type pooledShard struct {
    hashedShard
    pool util.WorkerPool
}

var errMustHaveTwoShards = errors.New("amount of shards must be >= 2")

// Evacuate moves data from one shard to the others.
// The shard being moved must be in read-only mode.
func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) {
    sid := prm.shardID.String()

    e.mtx.RLock()
    sh, ok := e.shards[sid]
    if !ok {
        e.mtx.RUnlock()
        return EvacuateShardRes{}, errShardNotFound
    }

    if len(e.shards) < 2 {
        e.mtx.RUnlock()
        return EvacuateShardRes{}, errMustHaveTwoShards
    }

    if !sh.GetMode().ReadOnly() {
        e.mtx.RUnlock()
        return EvacuateShardRes{}, shard.ErrMustBeReadOnly
    }

    // We must have all shards to have correct information about their
    // indexes in a sorted slice and to set appropriate marks in the metabase.
    // The evacuated shard is skipped during put.
    shards := make([]pooledShard, 0, len(e.shards))
    for id := range e.shards {
        shards = append(shards, pooledShard{
            hashedShard: hashedShard(e.shards[id]),
            pool:        e.shardPools[id],
        })
    }
    e.mtx.RUnlock()

    weights := make([]float64, 0, len(shards))
    for i := range shards {
        weights = append(weights, e.shardWeight(shards[i].Shard))
    }

    var listPrm shard.ListWithCursorPrm
    listPrm.WithCount(defaultEvacuateBatchSize)

    var c *meta.Cursor
    var res EvacuateShardRes
    for {
        listPrm.WithCursor(c)

        // TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
        // because ListWithCursor works only with the metabase.
        listRes, err := sh.Shard.ListWithCursor(listPrm)
        if err != nil {
            if errors.Is(err, meta.ErrEndOfListing) {
                return res, nil
            }
            return res, err
        }

        // TODO (@fyrchik): #1731 parallelize the loop
        lst := listRes.AddressList()

    loop:
        for i := range lst {
            var getPrm shard.GetPrm
            getPrm.SetAddress(lst[i])

            getRes, err := sh.Get(getPrm)
            if err != nil {
                if prm.ignoreErrors {
                    continue
                }
                return res, err
            }

            hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(lst[i].EncodeToString())))
            for j := range shards {
                if shards[j].ID().String() == sid {
                    continue
                }
                ok := e.putToShard(shards[j].Shard, j, shards[j].pool, lst[i], getRes.Object())
                if ok {
                    e.log.Debug("object is moved to another shard",
                        zap.String("from", sid),
                        zap.Stringer("to", shards[j].ID()),
                        zap.Stringer("addr", lst[i]))

                    res.count++
                    continue loop
                }
            }

            // TODO (@fyrchik): #1731 try replicating to another node.
            // The suggestion is to have prm.handler which is called
            // if a Put has failed.

            // Do not check the ignoreErrors flag here because
            // ignoring errors on put makes this command kinda useless.
            return res, fmt.Errorf("%w: %s", errPutShard, lst[i])
        }

        c = listRes.Cursor()
    }
}
pkg/local_object_storage/engine/evacuate_test.go (new file, 115 additions)
@@ -0,0 +1,115 @@
package engine

import (
    "fmt"
    "os"
    "path/filepath"
    "strconv"
    "testing"

    objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
    "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
    "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
    meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
    "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
    "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
    cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
    objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
    "github.com/stretchr/testify/require"
    "go.uber.org/zap/zaptest"
)

func TestEvacuateShard(t *testing.T) {
    dir, err := os.MkdirTemp("", "*")
    require.NoError(t, err)
    t.Cleanup(func() { _ = os.RemoveAll(dir) })

    e := New(
        WithLogger(zaptest.NewLogger(t)),
        WithShardPoolSize(1))

    var ids [3]*shard.ID
    var fsTree *fstree.FSTree

    for i := range ids {
        fsTree = fstree.New(
            fstree.WithPath(filepath.Join(dir, strconv.Itoa(i))),
            fstree.WithDepth(1))

        ids[i], err = e.AddShard(
            shard.WithLogger(zaptest.NewLogger(t)),
            shard.WithBlobStorOptions(
                blobstor.WithStorages([]blobstor.SubStorage{{
                    Storage: fsTree,
                }})),
            shard.WithMetaBaseOptions(
                meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
                meta.WithPermissions(0700),
                meta.WithEpochState(epochState{}),
            ))
        require.NoError(t, err)
    }
    require.NoError(t, e.Open())
    require.NoError(t, e.Init())

    const objPerShard = 3

    evacuateShardID := ids[2].String()

    objects := make([]*objectSDK.Object, 0, objPerShard*len(ids))
    for i := 0; ; i++ {
        objects = append(objects, generateObjectWithCID(t, cidtest.ID()))

        var putPrm PutPrm
        putPrm.WithObject(objects[i])

        _, err := e.Put(putPrm)
        require.NoError(t, err)

        res, err := e.shards[evacuateShardID].List()
        require.NoError(t, err)
        if len(res.AddressList()) == objPerShard {
            break
        }
    }

    checkHasObjects := func(t *testing.T) {
        for i := range objects {
            var prm GetPrm
            prm.WithAddress(objectCore.AddressOf(objects[i]))

            _, err := e.Get(prm)
            require.NoError(t, err)
        }
    }

    checkHasObjects(t)

    var prm EvacuateShardPrm
    prm.WithShardID(ids[2])

    t.Run("must be read-only", func(t *testing.T) {
        res, err := e.Evacuate(prm)
        require.ErrorIs(t, err, shard.ErrMustBeReadOnly)
        require.Equal(t, 0, res.Count())
    })

    require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))

    res, err := e.Evacuate(prm)
    require.NoError(t, err)
    require.Equal(t, objPerShard, res.count)

    // We check that all objects are available both before and after shard removal.
    // The first case is a real-world use case: it ensures that an object can be put in the presence
    // of all metabase checks/marks.
    // The second case ensures that all objects are indeed moved and available.
    checkHasObjects(t)

    e.mtx.Lock()
    delete(e.shards, evacuateShardID)
    delete(e.shardPools, evacuateShardID)
    e.mtx.Unlock()

    checkHasObjects(t)
}