[#947] controlSvc: Return tree evacuation stat

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-02-05 17:48:43 +03:00
parent b3f3505ada
commit 15d853ea22
9 changed files with 178 additions and 105 deletions

View file

@ -59,7 +59,7 @@ func (s EvacuateScope) String() string {
// EvacuateShardPrm represents parameters for the EvacuateShard operation.
type EvacuateShardPrm struct {
ShardID []*shard.ID
Handler func(context.Context, oid.Address, *objectSDK.Object) error
ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) error
IgnoreErrors bool
Async bool
Scope EvacuateScope
@ -67,53 +67,84 @@ type EvacuateShardPrm struct {
// EvacuateShardRes represents result of the EvacuateShard operation.
type EvacuateShardRes struct {
evacuated *atomic.Uint64
total *atomic.Uint64
failed *atomic.Uint64
skipped *atomic.Uint64
objEvacuated *atomic.Uint64
objTotal *atomic.Uint64
objFailed *atomic.Uint64
objSkipped *atomic.Uint64
trEvacuated *atomic.Uint64
trTotal *atomic.Uint64
trFailed *atomic.Uint64
}
// NewEvacuateShardRes creates new EvacuateShardRes instance.
func NewEvacuateShardRes() *EvacuateShardRes {
return &EvacuateShardRes{
evacuated: new(atomic.Uint64),
total: new(atomic.Uint64),
failed: new(atomic.Uint64),
skipped: new(atomic.Uint64),
objEvacuated: new(atomic.Uint64),
objTotal: new(atomic.Uint64),
objFailed: new(atomic.Uint64),
objSkipped: new(atomic.Uint64),
trEvacuated: new(atomic.Uint64),
trTotal: new(atomic.Uint64),
trFailed: new(atomic.Uint64),
}
}
// Evacuated returns amount of evacuated objects.
// ObjectsEvacuated returns amount of evacuated objects.
// Objects for which handler returned no error are also assumed evacuated.
func (p *EvacuateShardRes) Evacuated() uint64 {
func (p *EvacuateShardRes) ObjectsEvacuated() uint64 {
if p == nil {
return 0
}
return p.evacuated.Load()
return p.objEvacuated.Load()
}
// Total returns total count objects to evacuate.
func (p *EvacuateShardRes) Total() uint64 {
// ObjectsTotal returns total count objects to evacuate.
func (p *EvacuateShardRes) ObjectsTotal() uint64 {
if p == nil {
return 0
}
return p.total.Load()
return p.objTotal.Load()
}
// Failed returns count of failed objects to evacuate.
func (p *EvacuateShardRes) Failed() uint64 {
// ObjectsFailed returns count of failed objects to evacuate.
func (p *EvacuateShardRes) ObjectsFailed() uint64 {
if p == nil {
return 0
}
return p.failed.Load()
return p.objFailed.Load()
}
// Skipped returns count of skipped objects.
func (p *EvacuateShardRes) Skipped() uint64 {
// ObjectsSkipped returns count of skipped objects.
func (p *EvacuateShardRes) ObjectsSkipped() uint64 {
if p == nil {
return 0
}
return p.skipped.Load()
return p.objSkipped.Load()
}
// TreesEvacuated returns amount of evacuated trees.
func (p *EvacuateShardRes) TreesEvacuated() uint64 {
if p == nil {
return 0
}
return p.trEvacuated.Load()
}
// TreesTotal returns total count trees to evacuate.
func (p *EvacuateShardRes) TreesTotal() uint64 {
if p == nil {
return 0
}
return p.trTotal.Load()
}
// TreesFailed returns count of failed trees to evacuate.
func (p *EvacuateShardRes) TreesFailed() uint64 {
if p == nil {
return 0
}
return p.trFailed.Load()
}
// DeepCopy returns deep copy of result instance.
@ -123,16 +154,22 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
}
res := &EvacuateShardRes{
evacuated: new(atomic.Uint64),
total: new(atomic.Uint64),
failed: new(atomic.Uint64),
skipped: new(atomic.Uint64),
objEvacuated: new(atomic.Uint64),
objTotal: new(atomic.Uint64),
objFailed: new(atomic.Uint64),
objSkipped: new(atomic.Uint64),
trEvacuated: new(atomic.Uint64),
trTotal: new(atomic.Uint64),
trFailed: new(atomic.Uint64),
}
res.evacuated.Store(p.evacuated.Load())
res.total.Store(p.total.Load())
res.failed.Store(p.failed.Load())
res.skipped.Store(p.skipped.Load())
res.objEvacuated.Store(p.objEvacuated.Load())
res.objTotal.Store(p.objTotal.Load())
res.objFailed.Store(p.objFailed.Load())
res.objSkipped.Store(p.objSkipped.Load())
res.trTotal.Store(p.trTotal.Load())
res.trEvacuated.Store(p.trEvacuated.Load())
res.trFailed.Store(p.trFailed.Load())
return res
}
@ -168,7 +205,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
))
defer span.End()
shards, weights, err := e.getActualShards(shardIDs, prm.Handler != nil)
shards, weights, err := e.getActualShards(shardIDs, prm)
if err != nil {
return nil, err
}
@ -216,6 +253,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope),
))
defer func() {
@ -224,18 +262,19 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
}()
e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
err = e.getTotalObjectsCount(ctx, shardsToEvacuate, res)
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err
}
for _, shardID := range shardIDs {
if err = e.evacuateShard(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField)
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err
}
}
@ -243,10 +282,13 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
zap.Strings("shard_ids", shardIDs),
evacuationOperationLogField,
zap.Uint64("total", res.Total()),
zap.Uint64("evacuated", res.Evacuated()),
zap.Uint64("failed", res.Failed()),
zap.Uint64("skipped", res.Skipped()),
zap.Uint64("total_objects", res.ObjectsTotal()),
zap.Uint64("evacuated_objects", res.ObjectsEvacuated()),
zap.Uint64("failed_objects", res.ObjectsFailed()),
zap.Uint64("skipped_objects", res.ObjectsSkipped()),
zap.Uint64("total_trees", res.TreesTotal()),
zap.Uint64("evacuated_trees", res.TreesEvacuated()),
zap.Uint64("failed_trees", res.TreesFailed()),
)
return nil
}
@ -263,7 +305,7 @@ func (e *StorageEngine) getTotalObjectsCount(ctx context.Context, shardsToEvacua
}
return err
}
res.total.Add(cnt)
res.objTotal.Add(cnt)
}
return nil
}
@ -307,7 +349,7 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
return nil
}
func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool) ([]pooledShard, []float64, error) {
func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) ([]pooledShard, []float64, error) {
e.mtx.RLock()
defer e.mtx.RUnlock()
@ -322,7 +364,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool)
}
}
if len(e.shards)-len(shardIDs) < 1 && !handlerDefined {
if len(e.shards)-len(shardIDs) < 1 && prm.ObjectsHandler == nil {
return nil, nil, errMustHaveTwoShards
}
@ -368,7 +410,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
getRes, err := sh.Get(ctx, getPrm)
if err != nil {
if prm.IgnoreErrors {
res.failed.Add(1)
res.objFailed.Add(1)
continue
}
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
@ -385,19 +427,19 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
continue
}
if prm.Handler == nil {
if prm.ObjectsHandler == nil {
// Do not check ignoreErrors flag here because
// ignoring errors on put make this command kinda useless.
return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i])
}
err = prm.Handler(ctx, addr, getRes.Object())
err = prm.ObjectsHandler(ctx, addr, getRes.Object())
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
res.evacuated.Add(1)
res.objEvacuated.Add(1)
}
return nil
}
@ -418,7 +460,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
}
switch e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object).status {
case putToShardSuccess:
res.evacuated.Add(1)
res.objEvacuated.Add(1)
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
zap.Stringer("from", sh.ID()),
zap.Stringer("to", shards[j].ID()),
@ -427,7 +469,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return true, nil
case putToShardExists, putToShardRemoved:
res.skipped.Add(1)
res.objSkipped.Add(1)
return true, nil
default:
continue

View file

@ -34,32 +34,53 @@ func (s *EvacuationState) ShardIDs() []string {
return s.shardIDs
}
func (s *EvacuationState) Evacuated() uint64 {
func (s *EvacuationState) ObjectsEvacuated() uint64 {
if s == nil {
return 0
}
return s.result.Evacuated()
return s.result.ObjectsEvacuated()
}
func (s *EvacuationState) Total() uint64 {
func (s *EvacuationState) ObjectsTotal() uint64 {
if s == nil {
return 0
}
return s.result.Total()
return s.result.ObjectsTotal()
}
func (s *EvacuationState) Failed() uint64 {
func (s *EvacuationState) ObjectsFailed() uint64 {
if s == nil {
return 0
}
return s.result.Failed()
return s.result.ObjectsFailed()
}
func (s *EvacuationState) Skipped() uint64 {
func (s *EvacuationState) ObjectsSkipped() uint64 {
if s == nil {
return 0
}
return s.result.Skipped()
return s.result.ObjectsSkipped()
}
func (s *EvacuationState) TreesEvacuated() uint64 {
if s == nil {
return 0
}
return s.result.TreesEvacuated()
}
func (s *EvacuationState) TreesTotal() uint64 {
if s == nil {
return 0
}
return s.result.TreesTotal()
}
func (s *EvacuationState) TreesFailed() uint64 {
if s == nil {
return 0
}
return s.result.TreesFailed()
}
func (s *EvacuationState) ProcessingStatus() EvacuateProcessState {

View file

@ -107,14 +107,14 @@ func TestEvacuateShard(t *testing.T) {
t.Run("must be read-only", func(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, ErrMustBeReadOnly)
require.Equal(t, uint64(0), res.Evacuated())
require.Equal(t, uint64(0), res.ObjectsEvacuated())
})
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(objPerShard), res.Evacuated())
require.Equal(t, uint64(objPerShard), res.ObjectsEvacuated())
// We check that all objects are available both before and after shard removal.
// First case is a real-world use-case. It ensures that an object can be put in presense
@ -125,7 +125,7 @@ func TestEvacuateShard(t *testing.T) {
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
res, err = e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(0), res.Evacuated())
require.Equal(t, uint64(0), res.ObjectsEvacuated())
checkHasObjects(t)
@ -177,13 +177,13 @@ func TestEvacuateNetwork(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errMustHaveTwoShards)
require.Equal(t, uint64(0), res.Evacuated())
require.Equal(t, uint64(0), res.ObjectsEvacuated())
prm.Handler = acceptOneOf(objects, 2)
prm.ObjectsHandler = acceptOneOf(objects, 2)
res, err = e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, uint64(2), res.Evacuated())
require.Equal(t, uint64(2), res.ObjectsEvacuated())
})
t.Run("multiple shards, evacuate one", func(t *testing.T) {
t.Parallel()
@ -197,18 +197,18 @@ func TestEvacuateNetwork(t *testing.T) {
var prm EvacuateShardPrm
prm.ShardID = ids[1:2]
prm.Handler = acceptOneOf(objects, 2)
prm.ObjectsHandler = acceptOneOf(objects, 2)
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, uint64(2), res.Evacuated())
require.Equal(t, uint64(2), res.ObjectsEvacuated())
t.Run("no errors", func(t *testing.T) {
prm.Handler = acceptOneOf(objects, 3)
prm.ObjectsHandler = acceptOneOf(objects, 3)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(3), res.Evacuated())
require.Equal(t, uint64(3), res.ObjectsEvacuated())
})
})
t.Run("multiple shards, evacuate many", func(t *testing.T) {
@ -234,18 +234,18 @@ func TestEvacuateNetwork(t *testing.T) {
var prm EvacuateShardPrm
prm.ShardID = evacuateIDs
prm.Handler = acceptOneOf(objects, totalCount-1)
prm.ObjectsHandler = acceptOneOf(objects, totalCount-1)
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, totalCount-1, res.Evacuated())
require.Equal(t, totalCount-1, res.ObjectsEvacuated())
t.Run("no errors", func(t *testing.T) {
prm.Handler = acceptOneOf(objects, totalCount)
prm.ObjectsHandler = acceptOneOf(objects, totalCount)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, totalCount, res.Evacuated())
require.Equal(t, totalCount, res.ObjectsEvacuated())
})
})
}
@ -262,7 +262,7 @@ func TestEvacuateCancellation(t *testing.T) {
var prm EvacuateShardPrm
prm.ShardID = ids[1:2]
prm.Handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select {
case <-ctx.Done():
return ctx.Err()
@ -276,7 +276,7 @@ func TestEvacuateCancellation(t *testing.T) {
res, err := e.Evacuate(ctx, prm)
require.ErrorContains(t, err, "context canceled")
require.Equal(t, uint64(0), res.Evacuated())
require.Equal(t, uint64(0), res.ObjectsEvacuated())
}
func TestEvacuateSingleProcess(t *testing.T) {
@ -293,7 +293,7 @@ func TestEvacuateSingleProcess(t *testing.T) {
var prm EvacuateShardPrm
prm.ShardID = ids[1:2]
prm.Handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select {
case <-running:
default:
@ -307,14 +307,14 @@ func TestEvacuateSingleProcess(t *testing.T) {
eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm)
require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.Evacuated())
require.Equal(t, uint64(3), res.ObjectsEvacuated())
return nil
})
eg.Go(func() error {
<-running
res, err := e.Evacuate(egCtx, prm)
require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed")
require.Equal(t, uint64(0), res.Evacuated())
require.Equal(t, uint64(0), res.ObjectsEvacuated())
close(blocker)
return nil
})
@ -335,7 +335,7 @@ func TestEvacuateAsync(t *testing.T) {
var prm EvacuateShardPrm
prm.ShardID = ids[1:2]
prm.Handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select {
case <-running:
default:
@ -348,7 +348,7 @@ func TestEvacuateAsync(t *testing.T) {
st, err := e.GetEvacuationState(context.Background())
require.NoError(t, err, "get init state failed")
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
require.Equal(t, uint64(0), st.Evacuated(), "invalid init count")
require.Equal(t, uint64(0), st.ObjectsEvacuated(), "invalid init count")
require.Nil(t, st.StartedAt(), "invalid init started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at")
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
@ -358,7 +358,7 @@ func TestEvacuateAsync(t *testing.T) {
eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm)
require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.Evacuated())
require.Equal(t, uint64(3), res.ObjectsEvacuated())
return nil
})
@ -367,7 +367,7 @@ func TestEvacuateAsync(t *testing.T) {
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get running state failed")
require.Equal(t, EvacuateProcessStateRunning, st.ProcessingStatus(), "invalid running state")
require.Equal(t, uint64(0), st.Evacuated(), "invalid running count")
require.Equal(t, uint64(0), st.ObjectsEvacuated(), "invalid running count")
require.NotNil(t, st.StartedAt(), "invalid running started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at")
expectedShardIDs := make([]string, 0, 2)
@ -385,7 +385,7 @@ func TestEvacuateAsync(t *testing.T) {
}, 3*time.Second, 10*time.Millisecond, "invalid final state")
require.NoError(t, err, "get final state failed")
require.Equal(t, uint64(3), st.Evacuated(), "invalid final count")
require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count")
require.NotNil(t, st.StartedAt(), "invalid final started at")
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")

View file

@ -48,14 +48,17 @@ func stateToResponse(state *engine.EvacuationState) (*control.GetShardEvacuation
return &control.GetShardEvacuationStatusResponse{
Body: &control.GetShardEvacuationStatusResponse_Body{
Shard_ID: shardIDs,
Evacuated: state.Evacuated(),
Total: state.Total(),
Failed: state.Failed(),
EvacuatedObjects: state.ObjectsEvacuated(),
TotalObjects: state.ObjectsTotal(),
FailedObjects: state.ObjectsFailed(),
Status: evacStatus,
StartedAt: startedAt,
Duration: duration,
ErrorMessage: state.ErrorMessage(),
Skipped: state.Skipped(),
SkippedObjects: state.ObjectsSkipped(),
TotalTrees: state.TreesTotal(),
EvacuatedTrees: state.TreesEvacuated(),
FailedTrees: state.TreesFailed(),
},
}, nil
}

View file

@ -28,7 +28,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
prm := engine.EvacuateShardPrm{
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
Handler: s.replicate,
ObjectsHandler: s.replicate,
Scope: engine.EvacuateScopeObjects,
}
@ -39,7 +39,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
resp := &control.EvacuateShardResponse{
Body: &control.EvacuateShardResponse_Body{
Count: uint32(res.Evacuated()),
Count: uint32(res.ObjectsEvacuated()),
},
}

View file

@ -24,7 +24,7 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
prm := engine.EvacuateShardPrm{
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
Handler: s.replicate,
ObjectsHandler: s.replicate,
Async: true,
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
}

Binary file not shown.

View file

@ -394,11 +394,11 @@ message GetShardEvacuationStatusResponse {
}
// Total objects to evacuate count. The value is approximate, so evacuated + failed + skipped == total is not guaranteed after completion.
uint64 total = 1;
uint64 total_objects = 1;
// Evacuated objects count.
uint64 evacuated = 2;
uint64 evacuated_objects = 2;
// Failed objects count.
uint64 failed = 3;
uint64 failed_objects = 3;
// Shard IDs.
repeated bytes shard_ID = 4;
@ -412,7 +412,14 @@ message GetShardEvacuationStatusResponse {
string error_message = 8;
// Skipped objects count.
uint64 skipped = 9;
uint64 skipped_objects = 9;
// Total trees to evacuate count.
uint64 total_trees = 10;
// Evacuated trees count.
uint64 evacuated_trees = 11;
// Failed trees count.
uint64 failed_trees = 12;
}
Body body = 1;

Binary file not shown.