Do not fail evacuation in case of REP 1 objects #1222
3 changed files with 61 additions and 35 deletions
|
@ -74,8 +74,8 @@ func (s EvacuateScope) TreesOnly() bool {
|
||||||
// EvacuateShardPrm represents parameters for the EvacuateShard operation.
|
// EvacuateShardPrm represents parameters for the EvacuateShard operation.
|
||||||
type EvacuateShardPrm struct {
|
type EvacuateShardPrm struct {
|
||||||
ShardID []*shard.ID
|
ShardID []*shard.ID
|
||||||
ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) error
|
ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) (bool, error)
|
||||||
TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (string, error)
|
TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (bool, string, error)
|
||||||
IgnoreErrors bool
|
IgnoreErrors bool
|
||||||
Async bool
|
Async bool
|
||||||
Scope EvacuateScope
|
Scope EvacuateScope
|
||||||
|
@ -450,7 +450,7 @@ func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, tree
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
nodePK, err := e.evacuateTreeToOtherNode(ctx, sh, contTree, prm)
|
moved, nodePK, err := e.evacuateTreeToOtherNode(ctx, sh, contTree, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.log.Error(logs.EngineShardsEvacuationFailedToMoveTree,
|
e.log.Error(logs.EngineShardsEvacuationFailedToMoveTree,
|
||||||
zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
|
zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
|
||||||
|
@ -458,18 +458,32 @@ func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, tree
|
||||||
zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
e.log.Debug(logs.EngineShardsEvacuationTreeEvacuatedRemote,
|
if moved {
|
||||||
zap.String("cid", contTree.CID.EncodeToString()), zap.String("treeID", contTree.TreeID),
|
e.log.Debug(logs.EngineShardsEvacuationTreeEvacuatedRemote,
|
||||||
zap.String("from_shardID", sh.ID().String()), zap.String("to_node", nodePK),
|
zap.String("cid", contTree.CID.EncodeToString()), zap.String("treeID", contTree.TreeID),
|
||||||
evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
zap.String("from_shardID", sh.ID().String()), zap.String("to_node", nodePK),
|
||||||
res.trEvacuated.Add(1)
|
evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
res.trEvacuated.Add(1)
|
||||||
|
} else if prm.IgnoreErrors {
|
||||||
|
res.trFailed.Add(1)
|
||||||
|
e.log.Warn(logs.EngineShardsEvacuationFailedToMoveTree,
|
||||||
|
zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
|
||||||
|
zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField,
|
||||||
|
zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
} else {
|
||||||
|
e.log.Error(logs.EngineShardsEvacuationFailedToMoveTree,
|
||||||
|
zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
|
||||||
|
zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField,
|
||||||
|
zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
return fmt.Errorf("no remote nodes available to replicate tree '%s' of container %s", contTree.TreeID, contTree.CID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (string, error) {
|
func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (bool, string, error) {
|
||||||
if prm.TreeHandler == nil {
|
if prm.TreeHandler == nil {
|
||||||
return "", fmt.Errorf("failed to evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID())
|
return false, "", fmt.Errorf("failed to evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID())
|
||||||
}
|
}
|
||||||
|
|
||||||
return prm.TreeHandler(ctx, tree.CID, tree.TreeID, sh)
|
return prm.TreeHandler(ctx, tree.CID, tree.TreeID, sh)
|
||||||
|
@ -668,13 +682,21 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
|
||||||
return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i])
|
return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
err = prm.ObjectsHandler(ctx, addr, getRes.Object())
|
moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
||||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
res.objEvacuated.Add(1)
|
if moved {
|
||||||
|
res.objEvacuated.Add(1)
|
||||||
|
} else if prm.IgnoreErrors {
|
||||||
|
res.objFailed.Add(1)
|
||||||
|
e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
||||||
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("object %s was not replicated", addr)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -145,18 +145,18 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
|
||||||
|
|
||||||
errReplication := errors.New("handler error")
|
errReplication := errors.New("handler error")
|
||||||
|
|
||||||
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) error {
|
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
|
||||||
var n uint64
|
var n uint64
|
||||||
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) error {
|
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
|
||||||
if n == max {
|
if n == max {
|
||||||
return errReplication
|
return false, errReplication
|
||||||
}
|
}
|
||||||
|
|
||||||
n++
|
n++
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
if addr == objectCore.AddressOf(objects[i]) {
|
if addr == objectCore.AddressOf(objects[i]) {
|
||||||
require.Equal(t, objects[i], obj)
|
require.Equal(t, objects[i], obj)
|
||||||
return nil
|
return true, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
require.FailNow(t, "handler was called with an unexpected object: %s", addr)
|
require.FailNow(t, "handler was called with an unexpected object: %s", addr)
|
||||||
|
@ -268,13 +268,13 @@ func TestEvacuateCancellation(t *testing.T) {
|
||||||
|
|
||||||
var prm EvacuateShardPrm
|
var prm EvacuateShardPrm
|
||||||
prm.ShardID = ids[1:2]
|
prm.ShardID = ids[1:2]
|
||||||
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
|
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return false, ctx.Err()
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
return nil
|
return true, nil
|
||||||
}
|
}
|
||||||
prm.Scope = EvacuateScopeObjects
|
prm.Scope = EvacuateScopeObjects
|
||||||
|
|
||||||
|
@ -301,14 +301,14 @@ func TestEvacuateSingleProcess(t *testing.T) {
|
||||||
var prm EvacuateShardPrm
|
var prm EvacuateShardPrm
|
||||||
prm.ShardID = ids[1:2]
|
prm.ShardID = ids[1:2]
|
||||||
prm.Scope = EvacuateScopeObjects
|
prm.Scope = EvacuateScopeObjects
|
||||||
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
|
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
|
||||||
select {
|
select {
|
||||||
case <-running:
|
case <-running:
|
||||||
default:
|
default:
|
||||||
close(running)
|
close(running)
|
||||||
}
|
}
|
||||||
<-blocker
|
<-blocker
|
||||||
return nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
eg, egCtx := errgroup.WithContext(context.Background())
|
eg, egCtx := errgroup.WithContext(context.Background())
|
||||||
|
@ -344,14 +344,14 @@ func TestEvacuateObjectsAsync(t *testing.T) {
|
||||||
var prm EvacuateShardPrm
|
var prm EvacuateShardPrm
|
||||||
prm.ShardID = ids[1:2]
|
prm.ShardID = ids[1:2]
|
||||||
prm.Scope = EvacuateScopeObjects
|
prm.Scope = EvacuateScopeObjects
|
||||||
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
|
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
|
||||||
select {
|
select {
|
||||||
case <-running:
|
case <-running:
|
||||||
default:
|
default:
|
||||||
close(running)
|
close(running)
|
||||||
}
|
}
|
||||||
<-blocker
|
<-blocker
|
||||||
return nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
st, err := e.GetEvacuationState(context.Background())
|
st, err := e.GetEvacuationState(context.Background())
|
||||||
|
@ -507,7 +507,7 @@ func TestEvacuateTreesRemote(t *testing.T) {
|
||||||
var prm EvacuateShardPrm
|
var prm EvacuateShardPrm
|
||||||
prm.ShardID = ids
|
prm.ShardID = ids
|
||||||
prm.Scope = EvacuateScopeTrees
|
prm.Scope = EvacuateScopeTrees
|
||||||
prm.TreeHandler = func(ctx context.Context, contID cid.ID, treeID string, f pilorama.Forest) (string, error) {
|
prm.TreeHandler = func(ctx context.Context, contID cid.ID, treeID string, f pilorama.Forest) (bool, string, error) {
|
||||||
key := contID.String() + treeID
|
key := contID.String() + treeID
|
||||||
var height uint64
|
var height uint64
|
||||||
for {
|
for {
|
||||||
|
@ -515,7 +515,7 @@ func TestEvacuateTreesRemote(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
if op.Time == 0 {
|
if op.Time == 0 {
|
||||||
return "", nil
|
return true, "", nil
|
||||||
}
|
}
|
||||||
evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op)
|
evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op)
|
||||||
height = op.Time + 1
|
height = op.Time + 1
|
||||||
|
|
|
@ -57,17 +57,21 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) error {
|
func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
|
||||||
cid, ok := obj.ContainerID()
|
cid, ok := obj.ContainerID()
|
||||||
if !ok {
|
if !ok {
|
||||||
// Return nil to prevent situations where a shard can't be evacuated
|
// Return a nil error (reporting the object as not moved) to prevent situations where a shard can't be evacuated
|
||||||
// because of a single bad/corrupted object.
|
// because of a single bad/corrupted object.
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
nodes, err := s.getContainerNodes(cid)
|
nodes, err := s.getContainerNodes(cid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(nodes) == 0 {
|
||||||
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var res replicatorResult
|
var res replicatorResult
|
||||||
|
@ -80,27 +84,27 @@ func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *obj
|
||||||
s.replicator.HandleReplicationTask(ctx, task, &res)
|
s.replicator.HandleReplicationTask(ctx, task, &res)
|
||||||
|
|
||||||
if res.count == 0 {
|
if res.count == 0 {
|
||||||
return errors.New("object was not replicated")
|
return false, errors.New("object was not replicated")
|
||||||
}
|
}
|
||||||
return nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (string, error) {
|
func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (bool, string, error) {
|
||||||
nodes, err := s.getContainerNodes(contID)
|
nodes, err := s.getContainerNodes(contID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return false, "", err
|
||||||
}
|
}
|
||||||
if len(nodes) == 0 {
|
if len(nodes) == 0 {
|
||||||
return "", fmt.Errorf("no remote nodes available to replicate tree '%s' of container %s", treeID, contID)
|
return false, "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, node := range nodes {
|
for _, node := range nodes {
|
||||||
err = s.replicateTreeToNode(ctx, forest, contID, treeID, node)
|
err = s.replicateTreeToNode(ctx, forest, contID, treeID, node)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return hex.EncodeToString(node.PublicKey()), nil
|
return true, hex.EncodeToString(node.PublicKey()), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return "", err
|
return false, "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) replicateTreeToNode(ctx context.Context, forest pilorama.Forest, contID cid.ID, treeID string, node netmap.NodeInfo) error {
|
func (s *Server) replicateTreeToNode(ctx context.Context, forest pilorama.Forest, contID cid.ID, treeID string, node netmap.NodeInfo) error {
|
||||||
|
|
Loading…
Add table
Reference in a new issue