forked from TrueCloudLab/frostfs-node
[#1731] engine: Allow to use user handler for evacuated objects
Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
parent
93ae3f0b19
commit
a49137349b
2 changed files with 102 additions and 20 deletions
|
@ -8,12 +8,15 @@ import (
|
|||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// EvacuateShardPrm represents parameters for the EvacuateShard operation.
|
||||
type EvacuateShardPrm struct {
|
||||
shardID *shard.ID
|
||||
handler func(oid.Address, *objectSDK.Object) error
|
||||
ignoreErrors bool
|
||||
}
|
||||
|
||||
|
@ -32,7 +35,13 @@ func (p *EvacuateShardPrm) WithIgnoreErrors(ignore bool) {
|
|||
p.ignoreErrors = ignore
|
||||
}
|
||||
|
||||
// WithFaultHandler sets handler to call for objects which cannot be saved on other shards.
|
||||
func (p *EvacuateShardPrm) WithFaultHandler(f func(oid.Address, *objectSDK.Object) error) {
|
||||
p.handler = f
|
||||
}
|
||||
|
||||
// Count returns amount of evacuated objects.
|
||||
// Objects for which handler returned no error are also assumed evacuated.
|
||||
func (p EvacuateShardRes) Count() int {
|
||||
return p.count
|
||||
}
|
||||
|
@ -58,7 +67,7 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error)
|
|||
return EvacuateShardRes{}, errShardNotFound
|
||||
}
|
||||
|
||||
if len(e.shards) < 2 {
|
||||
if len(e.shards) < 2 && prm.handler == nil {
|
||||
e.mtx.RUnlock()
|
||||
return EvacuateShardRes{}, errMustHaveTwoShards
|
||||
}
|
||||
|
@ -138,15 +147,19 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error)
|
|||
}
|
||||
}
|
||||
|
||||
// TODO (@fyrchik): #1731 try replicating to another node.
|
||||
// The suggestion is to have prm.handler which is called
|
||||
// if a Put has failed.
|
||||
|
||||
if prm.handler == nil {
|
||||
// Do not check ignoreErrors flag here because
|
||||
// ignoring errors on put make this command kinda useless.
|
||||
return res, fmt.Errorf("%w: %s", errPutShard, lst[i])
|
||||
}
|
||||
|
||||
err = prm.handler(lst[i], getRes.Object())
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
res.count++
|
||||
}
|
||||
|
||||
c = listRes.Cursor()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -15,11 +16,12 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func TestEvacuateShard(t *testing.T) {
|
||||
func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
|
||||
dir, err := os.MkdirTemp("", "*")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = os.RemoveAll(dir) })
|
||||
|
@ -28,19 +30,16 @@ func TestEvacuateShard(t *testing.T) {
|
|||
WithLogger(zaptest.NewLogger(t)),
|
||||
WithShardPoolSize(1))
|
||||
|
||||
var ids [3]*shard.ID
|
||||
var fsTree *fstree.FSTree
|
||||
ids := make([]*shard.ID, shardNum)
|
||||
|
||||
for i := range ids {
|
||||
fsTree = fstree.New(
|
||||
fstree.WithPath(filepath.Join(dir, strconv.Itoa(i))),
|
||||
fstree.WithDepth(1))
|
||||
|
||||
ids[i], err = e.AddShard(
|
||||
shard.WithLogger(zaptest.NewLogger(t)),
|
||||
shard.WithBlobStorOptions(
|
||||
blobstor.WithStorages([]blobstor.SubStorage{{
|
||||
Storage: fsTree,
|
||||
Storage: fstree.New(
|
||||
fstree.WithPath(filepath.Join(dir, strconv.Itoa(i))),
|
||||
fstree.WithDepth(1)),
|
||||
}})),
|
||||
shard.WithMetaBaseOptions(
|
||||
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
|
||||
|
@ -52,10 +51,6 @@ func TestEvacuateShard(t *testing.T) {
|
|||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Init())
|
||||
|
||||
const objPerShard = 3
|
||||
|
||||
evacuateShardID := ids[2].String()
|
||||
|
||||
objects := make([]*objectSDK.Object, 0, objPerShard*len(ids))
|
||||
for i := 0; ; i++ {
|
||||
objects = append(objects, generateObjectWithCID(t, cidtest.ID()))
|
||||
|
@ -66,12 +61,21 @@ func TestEvacuateShard(t *testing.T) {
|
|||
_, err := e.Put(putPrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := e.shards[evacuateShardID].List()
|
||||
res, err := e.shards[ids[len(ids)-1].String()].List()
|
||||
require.NoError(t, err)
|
||||
if len(res.AddressList()) == objPerShard {
|
||||
break
|
||||
}
|
||||
}
|
||||
return e, ids, objects
|
||||
}
|
||||
|
||||
func TestEvacuateShard(t *testing.T) {
|
||||
const objPerShard = 3
|
||||
|
||||
e, ids, objects := newEngineEvacuate(t, 3, objPerShard)
|
||||
|
||||
evacuateShardID := ids[2].String()
|
||||
|
||||
checkHasObjects := func(t *testing.T) {
|
||||
for i := range objects {
|
||||
|
@ -120,3 +124,68 @@ func TestEvacuateShard(t *testing.T) {
|
|||
|
||||
checkHasObjects(t)
|
||||
}
|
||||
|
||||
func TestEvacuateNetwork(t *testing.T) {
|
||||
var errReplication = errors.New("handler error")
|
||||
|
||||
acceptOneOf := func(objects []*objectSDK.Object, max int) func(oid.Address, *objectSDK.Object) error {
|
||||
var n int
|
||||
return func(addr oid.Address, obj *objectSDK.Object) error {
|
||||
if n == max {
|
||||
return errReplication
|
||||
}
|
||||
|
||||
n++
|
||||
for i := range objects {
|
||||
if addr == objectCore.AddressOf(objects[i]) {
|
||||
require.Equal(t, objects[i], obj)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
require.FailNow(t, "handler was called with an unexpected object: %s", addr)
|
||||
panic("unreachable")
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("single shard", func(t *testing.T) {
|
||||
e, ids, objects := newEngineEvacuate(t, 1, 3)
|
||||
evacuateShardID := ids[0].String()
|
||||
|
||||
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
|
||||
|
||||
var prm EvacuateShardPrm
|
||||
prm.shardID = ids[0]
|
||||
|
||||
res, err := e.Evacuate(prm)
|
||||
require.ErrorIs(t, err, errMustHaveTwoShards)
|
||||
require.Equal(t, 0, res.Count())
|
||||
|
||||
prm.handler = acceptOneOf(objects, 2)
|
||||
|
||||
res, err = e.Evacuate(prm)
|
||||
require.ErrorIs(t, err, errReplication)
|
||||
require.Equal(t, 2, res.Count())
|
||||
})
|
||||
t.Run("multiple shards", func(t *testing.T) {
|
||||
e, ids, objects := newEngineEvacuate(t, 2, 3)
|
||||
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
||||
|
||||
var prm EvacuateShardPrm
|
||||
prm.shardID = ids[1]
|
||||
prm.handler = acceptOneOf(objects, 2)
|
||||
|
||||
res, err := e.Evacuate(prm)
|
||||
require.ErrorIs(t, err, errReplication)
|
||||
require.Equal(t, 2, res.Count())
|
||||
|
||||
t.Run("no errors", func(t *testing.T) {
|
||||
prm.handler = acceptOneOf(objects, 3)
|
||||
|
||||
res, err := e.Evacuate(prm)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 3, res.Count())
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue