Do not fail evacuation in case of REP 1 objects #1222

Merged
dstepanov-yadro merged 2 commits from dstepanov-yadro/frostfs-node:fix/evacuate_rep1 into master 2024-09-04 19:51:09 +00:00
3 changed files with 61 additions and 35 deletions

View file

@ -74,8 +74,8 @@ func (s EvacuateScope) TreesOnly() bool {
// EvacuateShardPrm represents parameters for the EvacuateShard operation. // EvacuateShardPrm represents parameters for the EvacuateShard operation.
type EvacuateShardPrm struct { type EvacuateShardPrm struct {
ShardID []*shard.ID ShardID []*shard.ID
ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) error ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) (bool, error)
TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (string, error) TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (bool, string, error)
IgnoreErrors bool IgnoreErrors bool
Async bool Async bool
Scope EvacuateScope Scope EvacuateScope
@ -450,7 +450,7 @@ func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, tree
continue continue
} }
nodePK, err := e.evacuateTreeToOtherNode(ctx, sh, contTree, prm) moved, nodePK, err := e.evacuateTreeToOtherNode(ctx, sh, contTree, prm)
if err != nil { if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveTree, e.log.Error(logs.EngineShardsEvacuationFailedToMoveTree,
zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID), zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
@ -458,18 +458,32 @@ func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, tree
zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx))) zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err return err
} }
e.log.Debug(logs.EngineShardsEvacuationTreeEvacuatedRemote, if moved {
zap.String("cid", contTree.CID.EncodeToString()), zap.String("treeID", contTree.TreeID), e.log.Debug(logs.EngineShardsEvacuationTreeEvacuatedRemote,
zap.String("from_shardID", sh.ID().String()), zap.String("to_node", nodePK), zap.String("cid", contTree.CID.EncodeToString()), zap.String("treeID", contTree.TreeID),
evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) zap.String("from_shardID", sh.ID().String()), zap.String("to_node", nodePK),
res.trEvacuated.Add(1) evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
res.trEvacuated.Add(1)
} else if prm.IgnoreErrors {
res.trFailed.Add(1)
e.log.Warn(logs.EngineShardsEvacuationFailedToMoveTree,
zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField,
zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} else {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveTree,
zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField,
zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return fmt.Errorf("no remote nodes available to replicate tree '%s' of container %s", contTree.TreeID, contTree.CID)
}
} }
return nil return nil
} }
func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (string, error) { func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (bool, string, error) {
if prm.TreeHandler == nil { if prm.TreeHandler == nil {
return "", fmt.Errorf("failed to evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID()) return false, "", fmt.Errorf("failed to evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID())
} }
return prm.TreeHandler(ctx, tree.CID, tree.TreeID, sh) return prm.TreeHandler(ctx, tree.CID, tree.TreeID, sh)
@ -668,13 +682,21 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i]) return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i])
} }
err = prm.ObjectsHandler(ctx, addr, getRes.Object()) moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
if err != nil { if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx))) zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err return err
} }
res.objEvacuated.Add(1) if moved {
res.objEvacuated.Add(1)
} else if prm.IgnoreErrors {
res.objFailed.Add(1)
e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} else {
return fmt.Errorf("object %s was not replicated", addr)
}
} }
return nil return nil
} }

View file

@ -145,18 +145,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
errReplication := errors.New("handler error") errReplication := errors.New("handler error")
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) error { acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
var n uint64 var n uint64
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) error { return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
if n == max { if n == max {
return errReplication return false, errReplication
} }
n++ n++
for i := range objects { for i := range objects {
if addr == objectCore.AddressOf(objects[i]) { if addr == objectCore.AddressOf(objects[i]) {
require.Equal(t, objects[i], obj) require.Equal(t, objects[i], obj)
return nil return true, nil
} }
} }
require.FailNow(t, "handler was called with an unexpected object: %s", addr) require.FailNow(t, "handler was called with an unexpected object: %s", addr)
@ -268,13 +268,13 @@ func TestEvacuateCancellation(t *testing.T) {
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.ShardID = ids[1:2] prm.ShardID = ids[1:2]
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return false, ctx.Err()
default: default:
} }
return nil return true, nil
} }
prm.Scope = EvacuateScopeObjects prm.Scope = EvacuateScopeObjects
@ -301,14 +301,14 @@ func TestEvacuateSingleProcess(t *testing.T) {
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.ShardID = ids[1:2] prm.ShardID = ids[1:2]
prm.Scope = EvacuateScopeObjects prm.Scope = EvacuateScopeObjects
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
select { select {
case <-running: case <-running:
default: default:
close(running) close(running)
} }
<-blocker <-blocker
return nil return true, nil
} }
eg, egCtx := errgroup.WithContext(context.Background()) eg, egCtx := errgroup.WithContext(context.Background())
@ -344,14 +344,14 @@ func TestEvacuateObjectsAsync(t *testing.T) {
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.ShardID = ids[1:2] prm.ShardID = ids[1:2]
prm.Scope = EvacuateScopeObjects prm.Scope = EvacuateScopeObjects
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
select { select {
case <-running: case <-running:
default: default:
close(running) close(running)
} }
<-blocker <-blocker
return nil return true, nil
} }
st, err := e.GetEvacuationState(context.Background()) st, err := e.GetEvacuationState(context.Background())
@ -507,7 +507,7 @@ func TestEvacuateTreesRemote(t *testing.T) {
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.ShardID = ids prm.ShardID = ids
prm.Scope = EvacuateScopeTrees prm.Scope = EvacuateScopeTrees
prm.TreeHandler = func(ctx context.Context, contID cid.ID, treeID string, f pilorama.Forest) (string, error) { prm.TreeHandler = func(ctx context.Context, contID cid.ID, treeID string, f pilorama.Forest) (bool, string, error) {
key := contID.String() + treeID key := contID.String() + treeID
var height uint64 var height uint64
for { for {
@ -515,7 +515,7 @@ func TestEvacuateTreesRemote(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
if op.Time == 0 { if op.Time == 0 {
return "", nil return true, "", nil
} }
evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op) evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op)
height = op.Time + 1 height = op.Time + 1

View file

@ -57,17 +57,21 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
return resp, nil return resp, nil
} }
func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) error { func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
cid, ok := obj.ContainerID() cid, ok := obj.ContainerID()
if !ok { if !ok {
// Return nil to prevent situations where a shard can't be evacuated // Return nil to prevent situations where a shard can't be evacuated
// because of a single bad/corrupted object. // because of a single bad/corrupted object.
return nil return false, nil
} }
nodes, err := s.getContainerNodes(cid) nodes, err := s.getContainerNodes(cid)
if err != nil { if err != nil {
return err return false, err
}
if len(nodes) == 0 {
return false, nil
} }
var res replicatorResult var res replicatorResult
@ -80,27 +84,27 @@ func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *obj
s.replicator.HandleReplicationTask(ctx, task, &res) s.replicator.HandleReplicationTask(ctx, task, &res)
if res.count == 0 { if res.count == 0 {
return errors.New("object was not replicated") return false, errors.New("object was not replicated")
} }
return nil return true, nil
} }
func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (string, error) { func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (bool, string, error) {
nodes, err := s.getContainerNodes(contID) nodes, err := s.getContainerNodes(contID)
if err != nil { if err != nil {
return "", err return false, "", err
} }
if len(nodes) == 0 { if len(nodes) == 0 {
return "", fmt.Errorf("no remote nodes available to replicate tree '%s' of container %s", treeID, contID) return false, "", nil
} }
for _, node := range nodes { for _, node := range nodes {
err = s.replicateTreeToNode(ctx, forest, contID, treeID, node) err = s.replicateTreeToNode(ctx, forest, contID, treeID, node)
if err == nil { if err == nil {
return hex.EncodeToString(node.PublicKey()), nil return true, hex.EncodeToString(node.PublicKey()), nil
} }
} }
return "", err return false, "", err
} }
func (s *Server) replicateTreeToNode(ctx context.Context, forest pilorama.Forest, contID cid.ID, treeID string, node netmap.NodeInfo) error { func (s *Server) replicateTreeToNode(ctx context.Context, forest pilorama.Forest, contID cid.ID, treeID string, node netmap.NodeInfo) error {