forked from TrueCloudLab/frostfs-node
WIP: Morph: Add unit tests #2
1 changed files with 20 additions and 9 deletions
|
@ -21,7 +21,11 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var ErrMustBeReadOnly = logicerr.New("shard must be in read-only mode")
|
var (
|
||||||
|
ErrMustBeReadOnly = logicerr.New("shard must be in read-only mode")
|
||||||
|
|
||||||
|
evacuationOperationLogField = zap.String("operation", "evacuation")
|
||||||
|
)
|
||||||
|
|
||||||
// EvacuateShardPrm represents parameters for the EvacuateShard operation.
|
// EvacuateShardPrm represents parameters for the EvacuateShard operation.
|
||||||
type EvacuateShardPrm struct {
|
type EvacuateShardPrm struct {
|
||||||
|
@ -196,22 +200,28 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
|
||||||
e.evacuateLimiter.Complete(err)
|
e.evacuateLimiter.Complete(err)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs))
|
e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField)
|
||||||
|
|
||||||
err = e.getTotalObjectsCount(ctx, shardsToEvacuate, res)
|
err = e.getTotalObjectsCount(ctx, shardsToEvacuate, res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err))
|
e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err), evacuationOperationLogField)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, shardID := range shardIDs {
|
for _, shardID := range shardIDs {
|
||||||
if err = e.evacuateShard(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
|
if err = e.evacuateShard(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
|
||||||
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs))
|
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, zap.Strings("shard_ids", shardIDs))
|
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
|
||||||
|
zap.Strings("shard_ids", shardIDs),
|
||||||
|
evacuationOperationLogField,
|
||||||
|
zap.Uint64("total", res.Total()),
|
||||||
|
zap.Uint64("evacuated", res.Evacuated()),
|
||||||
|
zap.Uint64("failed", res.Failed()),
|
||||||
|
)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -256,7 +266,7 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
|
||||||
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
|
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err))
|
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -332,7 +342,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
|
||||||
res.failed.Add(1)
|
res.failed.Add(1)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err))
|
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -353,7 +363,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
|
||||||
|
|
||||||
err = prm.handler(ctx, addr, getRes.Object())
|
err = prm.handler(ctx, addr, getRes.Object())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err))
|
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
res.evacuated.Add(1)
|
res.evacuated.Add(1)
|
||||||
|
@ -381,7 +391,8 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
|
||||||
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
|
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
|
||||||
zap.Stringer("from", sh.ID()),
|
zap.Stringer("from", sh.ID()),
|
||||||
zap.Stringer("to", shards[j].ID()),
|
zap.Stringer("to", shards[j].ID()),
|
||||||
zap.Stringer("addr", addr))
|
zap.Stringer("addr", addr),
|
||||||
|
evacuationOperationLogField)
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue