forked from TrueCloudLab/frostfs-node
[#1222] engine: Fix object evacuation
Do not fail evacuation if it is unable to evacuate an object to another node.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
parent bbe95dac8b
commit 2bac82cd6f
3 changed files with 31 additions and 19 deletions
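The core of the change is the ObjectsHandler callback, which now returns (bool, error) instead of error: the boolean reports whether the object was actually moved to another node. A handler can therefore report "nothing was placed" (for example, no container nodes to replicate to) as (false, nil); the engine then counts the object as failed when IgnoreErrors is set, or aborts otherwise, instead of treating every non-placement as a fatal error. Below is a minimal sketch of a handler written against the new contract; it is not part of the diff, the frostfs-sdk-go import paths are assumed, and tryReplicate is a hypothetical stand-in for the real replication call.

// Minimal sketch of an ObjectsHandler following the new (bool, error) contract.
// Not part of the commit; import paths are assumed to match the frostfs-sdk-go
// module used by frostfs-node.
package main

import (
	"context"
	"fmt"

	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// tryReplicate is a hypothetical placeholder for the real replication call;
// it reports how many copies were created on other nodes.
func tryReplicate(_ context.Context, _ oid.Address, _ *objectSDK.Object) (int, error) {
	return 0, nil // pretend no node accepted the object
}

// handler returns moved=false with a nil error when the object could not be
// placed anywhere, leaving it to the engine to count the object as failed
// (or to abort the evacuation when IgnoreErrors is not set).
func handler(ctx context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
	copies, err := tryReplicate(ctx, addr, obj)
	if err != nil {
		return false, err // hard error: stop evacuating this object
	}
	return copies > 0, nil // moved only if at least one replica was created
}

func main() {
	var addr oid.Address
	moved, err := handler(context.Background(), addr, new(objectSDK.Object))
	fmt.Println(moved, err)
}

This mirrors what the control server's replicateObject does after this commit: an object without container nodes yields (false, nil) rather than an error, and evacuateObjects records it via objFailed instead of failing the whole shard.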
@@ -74,7 +74,7 @@ func (s EvacuateScope) TreesOnly() bool {
 // EvacuateShardPrm represents parameters for the EvacuateShard operation.
 type EvacuateShardPrm struct {
 	ShardID        []*shard.ID
-	ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) error
+	ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) (bool, error)
 	TreeHandler    func(context.Context, cid.ID, string, pilorama.Forest) (string, error)
 	IgnoreErrors   bool
 	Async          bool

@@ -668,13 +668,21 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
 			return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i])
 		}
 
-		err = prm.ObjectsHandler(ctx, addr, getRes.Object())
+		moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
 		if err != nil {
 			e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
 				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
 			return err
 		}
-		res.objEvacuated.Add(1)
+		if moved {
+			res.objEvacuated.Add(1)
+		} else if prm.IgnoreErrors {
+			res.objFailed.Add(1)
+			e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
+				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+		} else {
+			return fmt.Errorf("object %s was not replicated", addr)
+		}
 	}
 	return nil
 }

@@ -145,18 +145,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
 
 	errReplication := errors.New("handler error")
 
-	acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) error {
+	acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
 		var n uint64
-		return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) error {
+		return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
 			if n == max {
-				return errReplication
+				return false, errReplication
 			}
 
 			n++
 			for i := range objects {
 				if addr == objectCore.AddressOf(objects[i]) {
 					require.Equal(t, objects[i], obj)
-					return nil
+					return true, nil
 				}
 			}
 			require.FailNow(t, "handler was called with an unexpected object: %s", addr)

@@ -268,13 +268,13 @@ func TestEvacuateCancellation(t *testing.T) {
 
 	var prm EvacuateShardPrm
 	prm.ShardID = ids[1:2]
-	prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
+	prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
 		select {
 		case <-ctx.Done():
-			return ctx.Err()
+			return false, ctx.Err()
 		default:
 		}
-		return nil
+		return true, nil
 	}
 	prm.Scope = EvacuateScopeObjects
 
@@ -301,14 +301,14 @@ func TestEvacuateSingleProcess(t *testing.T) {
 	var prm EvacuateShardPrm
 	prm.ShardID = ids[1:2]
 	prm.Scope = EvacuateScopeObjects
-	prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
+	prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
 		select {
 		case <-running:
 		default:
 			close(running)
 		}
 		<-blocker
-		return nil
+		return true, nil
 	}
 
 	eg, egCtx := errgroup.WithContext(context.Background())

@@ -344,14 +344,14 @@ func TestEvacuateObjectsAsync(t *testing.T) {
 	var prm EvacuateShardPrm
 	prm.ShardID = ids[1:2]
 	prm.Scope = EvacuateScopeObjects
-	prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
+	prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
 		select {
 		case <-running:
 		default:
 			close(running)
 		}
 		<-blocker
-		return nil
+		return true, nil
 	}
 
 	st, err := e.GetEvacuationState(context.Background())

@@ -57,17 +57,21 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
 	return resp, nil
 }
 
-func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) error {
+func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
 	cid, ok := obj.ContainerID()
 	if !ok {
 		// Return nil to prevent situations where a shard can't be evacuated
 		// because of a single bad/corrupted object.
-		return nil
+		return false, nil
 	}
 
 	nodes, err := s.getContainerNodes(cid)
 	if err != nil {
-		return err
+		return false, err
 	}
 
+	if len(nodes) == 0 {
+		return false, nil
+	}
+
 	var res replicatorResult

@@ -80,9 +84,9 @@ func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *obj
 	s.replicator.HandleReplicationTask(ctx, task, &res)
 
 	if res.count == 0 {
-		return errors.New("object was not replicated")
+		return false, errors.New("object was not replicated")
 	}
-	return nil
+	return true, nil
 }
 
 func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (string, error) {