frostfs-node/pkg/local_object_storage/engine/evacuate_limiter.go
Dmitrii Stepanov 226e84d782 [] node: Add skipped objects count to evacuation result
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-11-07 12:17:11 +00:00

185 lines
3.5 KiB
Go

package engine
import (
"context"
"fmt"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"golang.org/x/sync/errgroup"
)
// EvacuateProcessState identifies the phase an evacuation process is in.
type EvacuateProcessState int

// Known evacuation phases. The numeric values are sequential starting at
// zero, so the zero value of EvacuateProcessState is "undefined".
const (
	// EvacuateProcessStateUndefined is the zero value; it is also what
	// ProcessingStatus reports for a nil EvacuationState.
	EvacuateProcessStateUndefined EvacuateProcessState = 0
	// EvacuateProcessStateRunning marks an evacuation that has been started
	// and not yet completed.
	EvacuateProcessStateRunning EvacuateProcessState = 1
	// EvacuateProcessStateCompleted marks an evacuation that has finished,
	// with or without an error.
	EvacuateProcessStateCompleted EvacuateProcessState = 2
)
// EvacuationState describes one evacuation run: which shards it covers,
// which phase it is in, when it started and finished, its counters and the
// error text (if any) it ended with.
//
// All accessor methods are safe to call on a nil *EvacuationState.
type EvacuationState struct {
	// shardIDs lists the shards the evacuation was started for.
	shardIDs []string
	// processState is the current phase of the evacuation.
	processState EvacuateProcessState
	// startedAt and finishedAt stay at the zero time until the respective
	// event has happened.
	startedAt, finishedAt time.Time
	// result holds the counters exposed through Evacuated, Total, Failed
	// and Skipped.
	result *EvacuateShardRes
	// errMessage is the text of the error the evacuation completed with;
	// empty when it completed without error.
	errMessage string
}
// ShardIDs returns the identifiers of the shards this evacuation covers.
// It returns nil for a nil receiver. The returned slice is the one stored
// in the state, not a copy.
func (s *EvacuationState) ShardIDs() []string {
	if s != nil {
		return s.shardIDs
	}
	return nil
}
// Evacuated returns the evacuated-objects counter of the attached result.
// It returns 0 for a nil receiver.
//
// NOTE(review): the call is forwarded to s.result without a nil check, so
// this relies on EvacuateShardRes.Evacuated tolerating a nil receiver —
// confirm against its definition.
func (s *EvacuationState) Evacuated() uint64 {
	var count uint64
	if s != nil {
		count = s.result.Evacuated()
	}
	return count
}
// Total returns the total-objects counter of the attached result.
// It returns 0 for a nil receiver.
//
// NOTE(review): the call is forwarded to s.result without a nil check, so
// this relies on EvacuateShardRes.Total tolerating a nil receiver —
// confirm against its definition.
func (s *EvacuationState) Total() uint64 {
	var count uint64
	if s != nil {
		count = s.result.Total()
	}
	return count
}
// Failed returns the failed-objects counter of the attached result.
// It returns 0 for a nil receiver.
//
// NOTE(review): the call is forwarded to s.result without a nil check, so
// this relies on EvacuateShardRes.Failed tolerating a nil receiver —
// confirm against its definition.
func (s *EvacuationState) Failed() uint64 {
	var count uint64
	if s != nil {
		count = s.result.Failed()
	}
	return count
}
// Skipped returns the skipped-objects counter of the attached result.
// It returns 0 for a nil receiver.
//
// NOTE(review): the call is forwarded to s.result without a nil check, so
// this relies on EvacuateShardRes.Skipped tolerating a nil receiver —
// confirm against its definition.
func (s *EvacuationState) Skipped() uint64 {
	var count uint64
	if s != nil {
		count = s.result.Skipped()
	}
	return count
}
// ProcessingStatus returns the phase the evacuation is in. A nil receiver
// is reported as EvacuateProcessStateUndefined.
func (s *EvacuationState) ProcessingStatus() EvacuateProcessState {
	if s != nil {
		return s.processState
	}
	return EvacuateProcessStateUndefined
}
// StartedAt returns the time the evacuation was started, or nil when the
// receiver is nil or no start time has been recorded yet.
//
// The returned pointer refers to the field inside s, not to a copy.
func (s *EvacuationState) StartedAt() *time.Time {
	// IsZero is used instead of comparing against time.Time{} with ==:
	// the == operator also compares the Location pointer and the monotonic
	// clock reading, so it is not a reliable "is this the zero instant" test.
	if s == nil || s.startedAt.IsZero() {
		return nil
	}
	return &s.startedAt
}
// FinishedAt returns the time the evacuation finished, or nil when the
// receiver is nil or no finish time has been recorded yet.
//
// The returned pointer refers to the field inside s, not to a copy.
func (s *EvacuationState) FinishedAt() *time.Time {
	// IsZero is used instead of comparing against time.Time{} with ==:
	// the == operator also compares the Location pointer and the monotonic
	// clock reading, so it is not a reliable "is this the zero instant" test.
	if s == nil || s.finishedAt.IsZero() {
		return nil
	}
	return &s.finishedAt
}
// ErrorMessage returns the text of the error the evacuation completed with.
// It is empty for a nil receiver or when no error has been recorded.
func (s *EvacuationState) ErrorMessage() string {
	if s != nil {
		return s.errMessage
	}
	return ""
}
// DeepCopy returns an independent copy of the state, or nil for a nil
// receiver. The shard ID slice is duplicated into a freshly allocated slice
// and the result is duplicated through its own DeepCopy method; all other
// fields are plain values and are copied as-is.
func (s *EvacuationState) DeepCopy() *EvacuationState {
	if s == nil {
		return nil
	}

	// Start from a shallow copy, then replace the two reference-typed
	// fields so the clone shares no memory with the original.
	clone := *s

	clone.shardIDs = make([]string, len(s.shardIDs))
	copy(clone.shardIDs, s.shardIDs)

	clone.result = s.result.DeepCopy()

	return &clone
}
// evacuationLimiter keeps track of the single evacuation that may be running
// at a time and remembers the state of the most recent one.
type evacuationLimiter struct {
	// guard is taken by every method before the fields below are touched.
	guard sync.RWMutex

	// state describes the current or most recently started evacuation.
	state EvacuationState
	// eg is the error group created for the current evacuation; it is reset
	// to nil on completion.
	eg *errgroup.Group
	// cancel cancels the context created for the current evacuation.
	cancel context.CancelFunc
}
// TryStart registers a new evacuation for shardIDs if none is running.
//
// On success it returns a new error group together with a context derived
// from ctx; that context can be cancelled later through CancelIfRunning.
// The limiter's state is reset to "running" with the current UTC time as the
// start time and result as the counters holder.
//
// It returns ctx.Err() if ctx is already done, and a logic error naming the
// shards of the current evacuation if one is already running.
func (l *evacuationLimiter) TryStart(ctx context.Context, shardIDs []string, result *EvacuateShardRes) (*errgroup.Group, context.Context, error) {
	l.guard.Lock()
	defer l.guard.Unlock()

	// Refuse to start anything on an already finished context.
	if err := ctx.Err(); err != nil {
		return nil, nil, err
	}

	if l.state.processState == EvacuateProcessStateRunning {
		msg := fmt.Sprintf("evacuate is already running for shard ids %v", l.state.shardIDs)
		return nil, nil, logicerr.New(msg)
	}

	// The cancellable layer sits between the caller's context and the error
	// group's context so the run can be stopped on demand.
	cancellable, cancel := context.WithCancel(ctx)
	group, groupCtx := errgroup.WithContext(cancellable)

	l.cancel = cancel
	l.eg = group
	l.state = EvacuationState{
		shardIDs:     shardIDs,
		processState: EvacuateProcessStateRunning,
		startedAt:    time.Now().UTC(),
		result:       result,
	}

	return group, groupCtx, nil
}
// Complete marks the current evacuation as completed.
//
// It records the text of err (empty when err is nil), stamps the finish time
// with the current UTC time, drops the error group and releases the context
// that TryStart created for the run.
//
// NOTE(review): because the run's context is cancelled here, Complete is
// expected to be called only after all work using that context has finished
// — confirm against the callers.
func (l *evacuationLimiter) Complete(err error) {
	l.guard.Lock()
	defer l.guard.Unlock()

	errMsg := ""
	if err != nil {
		errMsg = err.Error()
	}

	l.state.processState = EvacuateProcessStateCompleted
	l.state.errMessage = errMsg
	l.state.finishedAt = time.Now().UTC()
	l.eg = nil

	// A context obtained from context.WithCancel holds resources until its
	// cancel function is called; without this call they would only be
	// released when the parent context is cancelled. The nil check covers a
	// Complete call that was not preceded by a successful TryStart.
	if l.cancel != nil {
		l.cancel()
		l.cancel = nil
	}
}
// GetState returns a deep copy of the limiter's evacuation state, taken
// under the read lock, so the caller may use it without further locking.
func (l *evacuationLimiter) GetState() *EvacuationState {
	l.guard.RLock()
	defer l.guard.RUnlock()

	snapshot := l.state.DeepCopy()
	return snapshot
}
// CancelIfRunning cancels the context of the running evacuation.
// It returns a logic error when no evacuation is currently running.
func (l *evacuationLimiter) CancelIfRunning() error {
	l.guard.Lock()
	defer l.guard.Unlock()

	running := l.state.processState == EvacuateProcessStateRunning
	if !running {
		return logicerr.New("there is no running evacuation task")
	}

	// While the state is "running", cancel is the function stored by the
	// TryStart call that started this run.
	l.cancel()
	return nil
}