206 lines
3.9 KiB
Go
206 lines
3.9 KiB
Go
package engine
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// EvacuateProcessState identifies the lifecycle phase of an evacuation process.
type EvacuateProcessState int
|
|
|
|
const (
|
|
EvacuateProcessStateUndefined EvacuateProcessState = iota
|
|
EvacuateProcessStateRunning
|
|
EvacuateProcessStateCompleted
|
|
)
|
|
|
|
type EvacuationState struct {
|
|
shardIDs []string
|
|
processState EvacuateProcessState
|
|
startedAt time.Time
|
|
finishedAt time.Time
|
|
result *EvacuateShardRes
|
|
errMessage string
|
|
}
|
|
|
|
func (s *EvacuationState) ShardIDs() []string {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
return s.shardIDs
|
|
}
|
|
|
|
func (s *EvacuationState) ObjectsEvacuated() uint64 {
|
|
if s == nil {
|
|
return 0
|
|
}
|
|
return s.result.ObjectsEvacuated()
|
|
}
|
|
|
|
func (s *EvacuationState) ObjectsTotal() uint64 {
|
|
if s == nil {
|
|
return 0
|
|
}
|
|
return s.result.ObjectsTotal()
|
|
}
|
|
|
|
func (s *EvacuationState) ObjectsFailed() uint64 {
|
|
if s == nil {
|
|
return 0
|
|
}
|
|
return s.result.ObjectsFailed()
|
|
}
|
|
|
|
func (s *EvacuationState) ObjectsSkipped() uint64 {
|
|
if s == nil {
|
|
return 0
|
|
}
|
|
return s.result.ObjectsSkipped()
|
|
}
|
|
|
|
func (s *EvacuationState) TreesEvacuated() uint64 {
|
|
if s == nil {
|
|
return 0
|
|
}
|
|
return s.result.TreesEvacuated()
|
|
}
|
|
|
|
func (s *EvacuationState) TreesTotal() uint64 {
|
|
if s == nil {
|
|
return 0
|
|
}
|
|
return s.result.TreesTotal()
|
|
}
|
|
|
|
func (s *EvacuationState) TreesFailed() uint64 {
|
|
if s == nil {
|
|
return 0
|
|
}
|
|
return s.result.TreesFailed()
|
|
}
|
|
|
|
func (s *EvacuationState) ProcessingStatus() EvacuateProcessState {
|
|
if s == nil {
|
|
return EvacuateProcessStateUndefined
|
|
}
|
|
return s.processState
|
|
}
|
|
|
|
func (s *EvacuationState) StartedAt() *time.Time {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
defaultTime := time.Time{}
|
|
if s.startedAt == defaultTime {
|
|
return nil
|
|
}
|
|
return &s.startedAt
|
|
}
|
|
|
|
func (s *EvacuationState) FinishedAt() *time.Time {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
defaultTime := time.Time{}
|
|
if s.finishedAt == defaultTime {
|
|
return nil
|
|
}
|
|
return &s.finishedAt
|
|
}
|
|
|
|
func (s *EvacuationState) ErrorMessage() string {
|
|
if s == nil {
|
|
return ""
|
|
}
|
|
return s.errMessage
|
|
}
|
|
|
|
func (s *EvacuationState) DeepCopy() *EvacuationState {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
shardIDs := make([]string, len(s.shardIDs))
|
|
copy(shardIDs, s.shardIDs)
|
|
|
|
return &EvacuationState{
|
|
shardIDs: shardIDs,
|
|
processState: s.processState,
|
|
startedAt: s.startedAt,
|
|
finishedAt: s.finishedAt,
|
|
errMessage: s.errMessage,
|
|
result: s.result.DeepCopy(),
|
|
}
|
|
}
|
|
|
|
type evacuationLimiter struct {
|
|
state EvacuationState
|
|
eg *errgroup.Group
|
|
cancel context.CancelFunc
|
|
|
|
guard sync.RWMutex
|
|
}
|
|
|
|
func (l *evacuationLimiter) TryStart(ctx context.Context, shardIDs []string, result *EvacuateShardRes) (*errgroup.Group, context.Context, error) {
|
|
l.guard.Lock()
|
|
defer l.guard.Unlock()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, nil, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
if l.state.processState == EvacuateProcessStateRunning {
|
|
return nil, nil, logicerr.New(fmt.Sprintf("evacuate is already running for shard ids %v", l.state.shardIDs))
|
|
}
|
|
|
|
var egCtx context.Context
|
|
egCtx, l.cancel = context.WithCancel(ctx)
|
|
l.eg, egCtx = errgroup.WithContext(egCtx)
|
|
l.state = EvacuationState{
|
|
shardIDs: shardIDs,
|
|
processState: EvacuateProcessStateRunning,
|
|
startedAt: time.Now().UTC(),
|
|
result: result,
|
|
}
|
|
|
|
return l.eg, egCtx, nil
|
|
}
|
|
|
|
func (l *evacuationLimiter) Complete(err error) {
|
|
l.guard.Lock()
|
|
defer l.guard.Unlock()
|
|
|
|
errMsq := ""
|
|
if err != nil {
|
|
errMsq = err.Error()
|
|
}
|
|
l.state.processState = EvacuateProcessStateCompleted
|
|
l.state.errMessage = errMsq
|
|
l.state.finishedAt = time.Now().UTC()
|
|
|
|
l.eg = nil
|
|
}
|
|
|
|
func (l *evacuationLimiter) GetState() *EvacuationState {
|
|
l.guard.RLock()
|
|
defer l.guard.RUnlock()
|
|
|
|
return l.state.DeepCopy()
|
|
}
|
|
|
|
func (l *evacuationLimiter) CancelIfRunning() error {
|
|
l.guard.Lock()
|
|
defer l.guard.Unlock()
|
|
|
|
if l.state.processState != EvacuateProcessStateRunning {
|
|
return logicerr.New("there is no running evacuation task")
|
|
}
|
|
|
|
l.cancel()
|
|
return nil
|
|
}
|