Compare commits
3 commits
master
...
fix/evacua
Author | SHA1 | Date | |
---|---|---|---|
90a90c2820 | |||
9b51843aa5 | |||
01d9ed66e1 |
9 changed files with 34 additions and 6 deletions
|
@ -220,12 +220,12 @@ func appendEstimation(sb *strings.Builder, resp *control.GetShardEvacuationStatu
|
||||||
if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING ||
|
if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING ||
|
||||||
resp.GetBody().GetDuration() == nil ||
|
resp.GetBody().GetDuration() == nil ||
|
||||||
resp.GetBody().GetTotal() == 0 ||
|
resp.GetBody().GetTotal() == 0 ||
|
||||||
resp.GetBody().GetEvacuated()+resp.GetBody().GetFailed() == 0 {
|
resp.GetBody().GetEvacuated()+resp.GetBody().GetFailed()+resp.Body.GetSkipped() == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
durationSeconds := float64(resp.GetBody().GetDuration().GetSeconds())
|
durationSeconds := float64(resp.GetBody().GetDuration().GetSeconds())
|
||||||
evacuated := float64(resp.GetBody().GetEvacuated() + resp.GetBody().GetFailed())
|
evacuated := float64(resp.GetBody().GetEvacuated() + resp.GetBody().GetFailed() + resp.GetBody().GetSkipped())
|
||||||
avgObjEvacuationTimeSeconds := durationSeconds / evacuated
|
avgObjEvacuationTimeSeconds := durationSeconds / evacuated
|
||||||
objectsLeft := float64(resp.GetBody().GetTotal()) - evacuated
|
objectsLeft := float64(resp.GetBody().GetTotal()) - evacuated
|
||||||
leftSeconds := avgObjEvacuationTimeSeconds * objectsLeft
|
leftSeconds := avgObjEvacuationTimeSeconds * objectsLeft
|
||||||
|
@ -285,10 +285,11 @@ func appendShardIDs(sb *strings.Builder, resp *control.GetShardEvacuationStatusR
|
||||||
}
|
}
|
||||||
|
|
||||||
func appendCounts(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
func appendCounts(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
||||||
sb.WriteString(fmt.Sprintf(" Evacuated %d object out of %d, failed to evacuate %d objects.",
|
sb.WriteString(fmt.Sprintf(" Evacuated %d objects out of %d, failed to evacuate: %d, skipped: %d.",
|
||||||
resp.GetBody().GetEvacuated(),
|
resp.GetBody().GetEvacuated(),
|
||||||
resp.Body.GetTotal(),
|
resp.GetBody().GetTotal(),
|
||||||
resp.Body.GetFailed()))
|
resp.GetBody().GetFailed(),
|
||||||
|
resp.GetBody().GetSkipped()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func initControlEvacuationShardCmd() {
|
func initControlEvacuationShardCmd() {
|
||||||
|
|
|
@ -41,6 +41,7 @@ type EvacuateShardRes struct {
|
||||||
evacuated *atomic.Uint64
|
evacuated *atomic.Uint64
|
||||||
total *atomic.Uint64
|
total *atomic.Uint64
|
||||||
failed *atomic.Uint64
|
failed *atomic.Uint64
|
||||||
|
skipped *atomic.Uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEvacuateShardRes creates new EvacuateShardRes instance.
|
// NewEvacuateShardRes creates new EvacuateShardRes instance.
|
||||||
|
@ -49,6 +50,7 @@ func NewEvacuateShardRes() *EvacuateShardRes {
|
||||||
evacuated: new(atomic.Uint64),
|
evacuated: new(atomic.Uint64),
|
||||||
total: new(atomic.Uint64),
|
total: new(atomic.Uint64),
|
||||||
failed: new(atomic.Uint64),
|
failed: new(atomic.Uint64),
|
||||||
|
skipped: new(atomic.Uint64),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,6 +99,14 @@ func (p *EvacuateShardRes) Failed() uint64 {
|
||||||
return p.failed.Load()
|
return p.failed.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Skipped returns count of skipped objects.
|
||||||
|
func (p *EvacuateShardRes) Skipped() uint64 {
|
||||||
|
if p == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return p.skipped.Load()
|
||||||
|
}
|
||||||
|
|
||||||
// DeepCopy returns deep copy of result instance.
|
// DeepCopy returns deep copy of result instance.
|
||||||
func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
|
func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
|
||||||
if p == nil {
|
if p == nil {
|
||||||
|
@ -107,11 +117,13 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
|
||||||
evacuated: new(atomic.Uint64),
|
evacuated: new(atomic.Uint64),
|
||||||
total: new(atomic.Uint64),
|
total: new(atomic.Uint64),
|
||||||
failed: new(atomic.Uint64),
|
failed: new(atomic.Uint64),
|
||||||
|
skipped: new(atomic.Uint64),
|
||||||
}
|
}
|
||||||
|
|
||||||
res.evacuated.Store(p.evacuated.Load())
|
res.evacuated.Store(p.evacuated.Load())
|
||||||
res.total.Store(p.total.Load())
|
res.total.Store(p.total.Load())
|
||||||
res.failed.Store(p.failed.Load())
|
res.failed.Store(p.failed.Load())
|
||||||
|
res.skipped.Store(p.skipped.Load())
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,6 +236,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
|
||||||
zap.Uint64("total", res.Total()),
|
zap.Uint64("total", res.Total()),
|
||||||
zap.Uint64("evacuated", res.Evacuated()),
|
zap.Uint64("evacuated", res.Evacuated()),
|
||||||
zap.Uint64("failed", res.Failed()),
|
zap.Uint64("failed", res.Failed()),
|
||||||
|
zap.Uint64("skipped", res.Skipped()),
|
||||||
)
|
)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -404,6 +417,9 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
|
||||||
evacuationOperationLogField,
|
evacuationOperationLogField,
|
||||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
}
|
}
|
||||||
|
if exists {
|
||||||
|
res.skipped.Add(1)
|
||||||
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,6 +55,13 @@ func (s *EvacuationState) Failed() uint64 {
|
||||||
return s.result.Failed()
|
return s.result.Failed()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *EvacuationState) Skipped() uint64 {
|
||||||
|
if s == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return s.result.Skipped()
|
||||||
|
}
|
||||||
|
|
||||||
func (s *EvacuationState) ProcessingStatus() EvacuateProcessState {
|
func (s *EvacuationState) ProcessingStatus() EvacuateProcessState {
|
||||||
if s == nil {
|
if s == nil {
|
||||||
return EvacuateProcessStateUndefined
|
return EvacuateProcessStateUndefined
|
||||||
|
|
|
@ -55,6 +55,7 @@ func stateToResponse(state *engine.EvacuationState) (*control.GetShardEvacuation
|
||||||
StartedAt: startedAt,
|
StartedAt: startedAt,
|
||||||
Duration: duration,
|
Duration: duration,
|
||||||
ErrorMessage: state.ErrorMessage(),
|
ErrorMessage: state.ErrorMessage(),
|
||||||
|
Skipped: state.Skipped(),
|
||||||
},
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
BIN
pkg/services/control/service.pb.go
generated
BIN
pkg/services/control/service.pb.go
generated
Binary file not shown.
|
@ -362,7 +362,7 @@ message GetShardEvacuationStatusResponse {
|
||||||
int64 seconds = 1;
|
int64 seconds = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Total objects to evacuate count. The value is approximate, so evacuated + failed == total is not guaranteed after completion.
|
// Total objects to evacuate count. The value is approximate, so evacuated + failed + skipped == total is not guaranteed after completion.
|
||||||
uint64 total = 1;
|
uint64 total = 1;
|
||||||
// Evacuated objects count.
|
// Evacuated objects count.
|
||||||
uint64 evacuated = 2;
|
uint64 evacuated = 2;
|
||||||
|
@ -379,6 +379,9 @@ message GetShardEvacuationStatusResponse {
|
||||||
UnixTimestamp started_at = 7;
|
UnixTimestamp started_at = 7;
|
||||||
// Error message if evacuation failed.
|
// Error message if evacuation failed.
|
||||||
string error_message = 8;
|
string error_message = 8;
|
||||||
|
|
||||||
|
// Skipped objects count.
|
||||||
|
uint64 skipped = 9;
|
||||||
}
|
}
|
||||||
|
|
||||||
Body body = 1;
|
Body body = 1;
|
||||||
|
|
BIN
pkg/services/control/service_frostfs.pb.go
generated
BIN
pkg/services/control/service_frostfs.pb.go
generated
Binary file not shown.
BIN
pkg/services/control/service_grpc.pb.go
generated
BIN
pkg/services/control/service_grpc.pb.go
generated
Binary file not shown.
BIN
pkg/services/control/types.pb.go
generated
BIN
pkg/services/control/types.pb.go
generated
Binary file not shown.
Loading…
Reference in a new issue