Compare commits
3 commits
master
...
fix/evacua
Author | SHA1 | Date | |
---|---|---|---|
90a90c2820 | |||
9b51843aa5 | |||
01d9ed66e1 |
9 changed files with 34 additions and 6 deletions
|
@ -220,12 +220,12 @@ func appendEstimation(sb *strings.Builder, resp *control.GetShardEvacuationStatu
|
|||
if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING ||
|
||||
resp.GetBody().GetDuration() == nil ||
|
||||
resp.GetBody().GetTotal() == 0 ||
|
||||
resp.GetBody().GetEvacuated()+resp.GetBody().GetFailed() == 0 {
|
||||
resp.GetBody().GetEvacuated()+resp.GetBody().GetFailed()+resp.Body.GetSkipped() == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
durationSeconds := float64(resp.GetBody().GetDuration().GetSeconds())
|
||||
evacuated := float64(resp.GetBody().GetEvacuated() + resp.GetBody().GetFailed())
|
||||
evacuated := float64(resp.GetBody().GetEvacuated() + resp.GetBody().GetFailed() + resp.GetBody().GetSkipped())
|
||||
avgObjEvacuationTimeSeconds := durationSeconds / evacuated
|
||||
objectsLeft := float64(resp.GetBody().GetTotal()) - evacuated
|
||||
leftSeconds := avgObjEvacuationTimeSeconds * objectsLeft
|
||||
|
@ -285,10 +285,11 @@ func appendShardIDs(sb *strings.Builder, resp *control.GetShardEvacuationStatusR
|
|||
}
|
||||
|
||||
func appendCounts(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
||||
sb.WriteString(fmt.Sprintf(" Evacuated %d object out of %d, failed to evacuate %d objects.",
|
||||
sb.WriteString(fmt.Sprintf(" Evacuated %d objects out of %d, failed to evacuate: %d, skipped: %d.",
|
||||
resp.GetBody().GetEvacuated(),
|
||||
resp.Body.GetTotal(),
|
||||
resp.Body.GetFailed()))
|
||||
resp.GetBody().GetTotal(),
|
||||
resp.GetBody().GetFailed(),
|
||||
resp.GetBody().GetSkipped()))
|
||||
}
|
||||
|
||||
func initControlEvacuationShardCmd() {
|
||||
|
|
|
@ -41,6 +41,7 @@ type EvacuateShardRes struct {
|
|||
evacuated *atomic.Uint64
|
||||
total *atomic.Uint64
|
||||
failed *atomic.Uint64
|
||||
skipped *atomic.Uint64
|
||||
}
|
||||
|
||||
// NewEvacuateShardRes creates new EvacuateShardRes instance.
|
||||
|
@ -49,6 +50,7 @@ func NewEvacuateShardRes() *EvacuateShardRes {
|
|||
evacuated: new(atomic.Uint64),
|
||||
total: new(atomic.Uint64),
|
||||
failed: new(atomic.Uint64),
|
||||
skipped: new(atomic.Uint64),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -97,6 +99,14 @@ func (p *EvacuateShardRes) Failed() uint64 {
|
|||
return p.failed.Load()
|
||||
}
|
||||
|
||||
// Skipped returns count of skipped objects.
|
||||
func (p *EvacuateShardRes) Skipped() uint64 {
|
||||
if p == nil {
|
||||
return 0
|
||||
}
|
||||
return p.skipped.Load()
|
||||
}
|
||||
|
||||
// DeepCopy returns deep copy of result instance.
|
||||
func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
|
||||
if p == nil {
|
||||
|
@ -107,11 +117,13 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
|
|||
evacuated: new(atomic.Uint64),
|
||||
total: new(atomic.Uint64),
|
||||
failed: new(atomic.Uint64),
|
||||
skipped: new(atomic.Uint64),
|
||||
}
|
||||
|
||||
res.evacuated.Store(p.evacuated.Load())
|
||||
res.total.Store(p.total.Load())
|
||||
res.failed.Store(p.failed.Load())
|
||||
res.skipped.Store(p.skipped.Load())
|
||||
return res
|
||||
}
|
||||
|
||||
|
@ -224,6 +236,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
|
|||
zap.Uint64("total", res.Total()),
|
||||
zap.Uint64("evacuated", res.Evacuated()),
|
||||
zap.Uint64("failed", res.Failed()),
|
||||
zap.Uint64("skipped", res.Skipped()),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
@ -404,6 +417,9 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
|
|||
evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
}
|
||||
if exists {
|
||||
res.skipped.Add(1)
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,6 +55,13 @@ func (s *EvacuationState) Failed() uint64 {
|
|||
return s.result.Failed()
|
||||
}
|
||||
|
||||
func (s *EvacuationState) Skipped() uint64 {
|
||||
if s == nil {
|
||||
return 0
|
||||
}
|
||||
return s.result.Skipped()
|
||||
}
|
||||
|
||||
func (s *EvacuationState) ProcessingStatus() EvacuateProcessState {
|
||||
if s == nil {
|
||||
return EvacuateProcessStateUndefined
|
||||
|
|
|
@ -55,6 +55,7 @@ func stateToResponse(state *engine.EvacuationState) (*control.GetShardEvacuation
|
|||
StartedAt: startedAt,
|
||||
Duration: duration,
|
||||
ErrorMessage: state.ErrorMessage(),
|
||||
Skipped: state.Skipped(),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
|
BIN
pkg/services/control/service.pb.go
generated
BIN
pkg/services/control/service.pb.go
generated
Binary file not shown.
|
@ -362,7 +362,7 @@ message GetShardEvacuationStatusResponse {
|
|||
int64 seconds = 1;
|
||||
}
|
||||
|
||||
// Total objects to evacuate count. The value is approximate, so evacuated + failed == total is not guaranteed after completion.
|
||||
// Total objects to evacuate count. The value is approximate, so evacuated + failed + skipped == total is not guaranteed after completion.
|
||||
uint64 total = 1;
|
||||
// Evacuated objects count.
|
||||
uint64 evacuated = 2;
|
||||
|
@ -379,6 +379,9 @@ message GetShardEvacuationStatusResponse {
|
|||
UnixTimestamp started_at = 7;
|
||||
// Error message if evacuation failed.
|
||||
string error_message = 8;
|
||||
|
||||
// Skipped objects count.
|
||||
uint64 skipped = 9;
|
||||
}
|
||||
|
||||
Body body = 1;
|
||||
|
|
BIN
pkg/services/control/service_frostfs.pb.go
generated
BIN
pkg/services/control/service_frostfs.pb.go
generated
Binary file not shown.
BIN
pkg/services/control/service_grpc.pb.go
generated
BIN
pkg/services/control/service_grpc.pb.go
generated
Binary file not shown.
BIN
pkg/services/control/types.pb.go
generated
BIN
pkg/services/control/types.pb.go
generated
Binary file not shown.
Loading…
Reference in a new issue