frostfs-node/pkg/local_object_storage/shard/gc.go
Dmitrii Stepanov a7c79c773a [#168] node: Refactor node config
Resolve containedctx linter for cfg

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-03-31 09:32:59 +03:00

602 lines
13 KiB
Go

package shard
import (
"context"
"sync"
"time"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// Lower bounds applied by getExpiredObjectsParameters: a configured value
// is used only when it is greater than the corresponding minimum.
const (
// minExpiredWorkers is the smallest concurrency limit used by the
// expired objects/locks collectors.
minExpiredWorkers = 2
// minExpiredBatchSize is the smallest number of addresses that are
// accumulated before a batch is handed over for processing.
minExpiredBatchSize = 1
)
// TombstoneSource is an interface that checks
// tombstone status in the FrostFS network.
//
// The shard consults it while collecting expired tombstones: graveyard
// records whose tombstone is reported as unavailable are passed to the
// expired tombstones callback.
type TombstoneSource interface {
// IsTombstoneAvailable must report whether the tombstone stored at addr
// is present in the FrostFS network at the time of the passed epoch.
IsTombstoneAvailable(ctx context.Context, addr oid.Address, epoch uint64) bool
}
// Event is an external event delivered to the shard's GC.
// The unexported method restricts implementations to this package.
type Event interface {
	typ() eventType
}

// eventType identifies the kind of an Event; it is the key under which
// handlers are registered.
type eventType int

const (
	// eventNewEpoch marks the "new epoch" event. Numbering starts at 1,
	// leaving the zero value unassigned.
	eventNewEpoch eventType = iota + 1
)

// newEpoch is the Event carrying the number of the epoch that has begun.
type newEpoch struct {
	epoch uint64
}

// typ implements Event.
func (newEpoch) typ() eventType { return eventNewEpoch }

// EventNewEpoch returns an Event announcing the start of epoch e.
func EventNewEpoch(e uint64) Event {
	return newEpoch{epoch: e}
}
// eventHandler is a callback invoked with a handling context and the
// received event.
type eventHandler func(context.Context, Event)
// eventHandlers groups the handlers registered for one event type together
// with the state listenEvents uses to stop the previous run before starting
// a new one.
type eventHandlers struct {
// prevGroup counts handlers of the previously dispatched event that have
// not finished yet; listenEvents waits on it before dispatching again.
prevGroup sync.WaitGroup
// cancelFunc cancels the context handed to the previously dispatched
// handlers; it is called unconditionally by listenEvents.
cancelFunc context.CancelFunc
// handlers are submitted to the GC worker pool for every event of the type.
handlers []eventHandler
}
// gc is the shard's garbage collector: it periodically calls remover and
// dispatches incoming events to the registered handlers.
type gc struct {
// Embedded configuration (intervals, logger, pool factory, collector limits).
*gcCfg
// onceStop makes sure the stop signal is issued only once.
onceStop sync.Once
// stopChannel carries the stop signal consumed by tickRemover.
stopChannel chan struct{}
// wg tracks the tickRemover and listenEvents goroutines started by init.
wg sync.WaitGroup
// workerPool executes event handlers; init assigns it only when at least
// one handler is registered.
workerPool util.WorkerPool
// remover is called by tickRemover every removerInterval.
remover func()
// eventChan delivers events to listenEvents; tickRemover closes it on stop.
eventChan chan Event
// mEventHandler maps an event type to the handlers registered for it.
mEventHandler map[eventType]*eventHandlers
}
// gcCfg holds the tunable parameters of the GC.
type gcCfg struct {
// removerInterval is the delay between consecutive remover calls.
removerInterval time.Duration
// log is the logger used by the GC goroutines.
log *logger.Logger
// workerPoolInit builds the worker pool; init passes the total number of
// registered event handlers as the argument.
workerPoolInit func(int) util.WorkerPool
// expiredCollectorWorkersCount is the requested concurrency of the expired
// collectors; values not greater than minExpiredWorkers are replaced by it.
expiredCollectorWorkersCount int
// expiredCollectorBatchSize is the requested batch size of the expired
// collectors; values not greater than minExpiredBatchSize are replaced by it.
expiredCollectorBatchSize int
}
// defaultGCCfg returns the default GC configuration: a 10-second remover
// interval, a logger wrapping the global zap logger and a worker pool
// factory that yields no pool. The expired collector settings are left zero.
func defaultGCCfg() gcCfg {
	noPool := func(int) util.WorkerPool { return nil }

	var cfg gcCfg
	cfg.removerInterval = 10 * time.Second
	cfg.log = &logger.Logger{Logger: zap.L()}
	cfg.workerPoolInit = noPool

	return cfg
}
// init prepares the worker pool and starts the GC background goroutines.
//
// The pool is created only when at least one event handler is registered and
// is sized by the total number of handlers across all event types. Two
// goroutines are launched and tracked by gc.wg: the periodic remover and the
// event listener (which receives ctx).
func (gc *gc) init(ctx context.Context) {
	var handlersTotal int
	for _, group := range gc.mEventHandler {
		handlersTotal += len(group.handlers)
	}

	if handlersTotal > 0 {
		gc.workerPool = gc.workerPoolInit(handlersTotal)
	}

	gc.wg.Add(2)

	go gc.tickRemover()
	go gc.listenEvents(ctx)
}
// listenEvents receives events from gc.eventChan and dispatches each of them
// to the handlers registered for its type. It returns (and marks gc.wg done)
// once the channel is closed.
//
// Before a new event of some type is dispatched, the handlers still running
// for the previous event of that type are cancelled and awaited. Events
// without registered handlers are skipped.
//
// ctx is the parent of every per-event handler context and is never replaced.
func (gc *gc) listenEvents(ctx context.Context) {
	defer gc.wg.Done()

	for {
		event, ok := <-gc.eventChan
		if !ok {
			gc.log.Warn("stop event listener by closed channel")
			return
		}

		v, ok := gc.mEventHandler[event.typ()]
		if !ok {
			continue
		}

		// Stop the previous run of these handlers and wait until it is over.
		if v.cancelFunc != nil {
			v.cancelFunc()
		}
		v.prevGroup.Wait()

		// Derive the handler context from the listener's own ctx and keep it
		// in a separate variable. Reusing ctx here would make every following
		// event inherit from the context that has just been cancelled above,
		// so its handlers would start with an already-cancelled context.
		var runCtx context.Context
		runCtx, v.cancelFunc = context.WithCancel(ctx)

		v.prevGroup.Add(len(v.handlers))

		for i := range v.handlers {
			h := v.handlers[i]

			err := gc.workerPool.Submit(func() {
				// Deferred so the group is released even if the handler
				// panics and the pool recovers it.
				defer v.prevGroup.Done()

				h(runCtx, event)
			})
			if err != nil {
				gc.log.Warn("could not submit GC job to worker pool",
					zap.String("error", err.Error()),
				)

				// The job will never run, so release its slot right away.
				v.prevGroup.Done()
			}
		}
	}
}
// tickRemover calls gc.remover every gc.removerInterval until a stop signal
// arrives on gc.stopChannel.
//
// On stop it releases the worker pool (if one was created), closes
// gc.eventChan, logs the shutdown and returns, marking gc.wg done.
func (gc *gc) tickRemover() {
	defer gc.wg.Done()

	ticker := time.NewTimer(gc.removerInterval)
	defer ticker.Stop()

	// shutdown performs the teardown steps in their required order.
	shutdown := func() {
		if pool := gc.workerPool; pool != nil {
			pool.Release()
		}

		close(gc.eventChan)

		gc.log.Debug("GC is stopped")
	}

	for {
		select {
		case <-ticker.C:
			gc.remover()
			// Re-arm only after the removal pass has finished, so the
			// interval is counted between passes.
			ticker.Reset(gc.removerInterval)
		case <-gc.stopChannel:
			shutdown()
			return
		}
	}
}
// stop signals the GC goroutines to terminate and blocks until they exit.
// It is safe to call more than once; the signal is issued only the first time.
//
// The signal is delivered by closing gc.stopChannel rather than sending on
// it: a send blocks forever when no tickRemover goroutine is receiving (for
// example when init was never called), while closing never blocks and is
// observed by the receiver in the same way.
func (gc *gc) stop() {
	gc.onceStop.Do(func() {
		close(gc.stopChannel)
	})

	gc.log.Info("waiting for GC workers to stop...")
	gc.wg.Wait()
}
// removeGarbage deletes one batch of GC-marked objects.
//
// It collects up to s.rmBatchSize addresses from the metabase garbage
// iterator and passes them to s.delete. Nothing is done unless the shard is
// in read-write mode. Failures are logged and not returned. The shard's read
// lock is held for the whole call.
func (s *Shard) removeGarbage() {
	s.m.RLock()
	defer s.m.RUnlock()

	if s.info.Mode != mode.ReadWrite {
		return
	}

	garbage := make([]oid.Address, 0, s.rmBatchSize)

	var iterPrm meta.GarbageIterationPrm
	iterPrm.SetHandler(func(g meta.GarbageObject) error {
		garbage = append(garbage, g.Address())
		if len(garbage) != s.rmBatchSize {
			return nil
		}
		// The batch is full: stop the iteration.
		return meta.ErrInterruptIterator
	})

	// Gather GC-marked objects (no more than s.rmBatchSize of them).
	if err := s.metaBase.IterateOverGarbage(iterPrm); err != nil {
		s.log.Warn("iterator over metabase graveyard failed",
			zap.String("error", err.Error()),
		)
		return
	}

	if len(garbage) == 0 {
		return
	}

	var deletePrm DeletePrm
	deletePrm.SetAddresses(garbage...)

	// Remove everything that has been gathered.
	if _, err := s.delete(deletePrm); err != nil {
		s.log.Warn("could not delete the objects",
			zap.String("error", err.Error()),
		)
	}
}
// getExpiredObjectsParameters returns the concurrency limit and the batch
// size for the expired collectors: the configured values, raised to
// minExpiredWorkers and minExpiredBatchSize respectively when they are lower.
func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) {
	cfg := s.gc.gcCfg

	workersCount = cfg.expiredCollectorWorkersCount
	if workersCount < minExpiredWorkers {
		workersCount = minExpiredWorkers
	}

	batchSize = cfg.expiredCollectorBatchSize
	if batchSize < minExpiredBatchSize {
		batchSize = minExpiredBatchSize
	}

	return workersCount, batchSize
}
// collectExpiredObjects handles the new-epoch event e: it iterates over the
// objects expired by that epoch, skips tombstones and locks, and passes the
// remaining addresses in batches to handleExpiredObjects.
//
// The iteration and the batch handlers run in one errgroup whose concurrency
// is limited by the configured workers count. A failure is only logged.
func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
	workersCount, batchSize := s.getExpiredObjectsParameters()

	errGroup, egCtx := errgroup.WithContext(ctx)
	errGroup.SetLimit(workersCount)

	// process schedules handling of a completed batch in the group.
	process := func(addrs []oid.Address) {
		errGroup.Go(func() error {
			s.handleExpiredObjects(egCtx, addrs)
			return egCtx.Err()
		})
	}

	errGroup.Go(func() error {
		epoch := e.(newEpoch).epoch
		pending := make([]oid.Address, 0, batchSize)

		err := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) {
			if o.Type() == object.TypeTombstone || o.Type() == object.TypeLock {
				return
			}

			pending = append(pending, o.Address())
			if len(pending) == batchSize {
				process(pending)
				// The handler owns the submitted slice now; start a new one.
				pending = make([]oid.Address, 0, batchSize)
			}
		})
		if err != nil {
			return err
		}

		// Hand over the incomplete tail, if any.
		if len(pending) > 0 {
			process(pending)
		}

		return nil
	})

	if err := errGroup.Wait(); err != nil {
		s.log.Warn("iterator over expired objects failed", zap.String("error", err.Error()))
	}
}
// handleExpiredObjects puts the GC mark on the given expired objects in the
// metabase, then decreases the logical object counter and the sizes of the
// affected containers according to the inhume result.
//
// It returns immediately if ctx is already done or the shard works without
// the metabase. An inhume failure is logged. The shard's read lock is held
// while the metabase is accessed.
func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) {
	if ctx.Err() != nil {
		return
	}

	s.m.RLock()
	defer s.m.RUnlock()

	if s.info.Mode.NoMetabase() {
		return
	}

	var inhumePrm meta.InhumePrm
	inhumePrm.SetAddresses(expired...)
	inhumePrm.SetGCMark()

	// Mark the collected objects as garbage.
	res, err := s.metaBase.Inhume(inhumePrm)
	if err != nil {
		s.log.Warn("could not inhume the objects",
			zap.String("error", err.Error()),
		)
		return
	}

	s.decObjectCounterBy(logical, res.AvailableInhumed())

	for idx := 0; idx < res.GetDeletionInfoLength(); idx++ {
		info := res.GetDeletionInfoByIndex(idx)
		s.addToContainerSize(info.CID.EncodeToString(), -int64(info.Size))
	}
}
// collectExpiredTombstones handles the new-epoch event e: it walks the
// graveyard in batches of tssDeleteBatch records, selects the records whose
// tombstone is reported unavailable by s.tsSource for that epoch, and passes
// them to s.expiredTombstonesCallback.
//
// The shard's read lock is held only while a batch is read from the
// metabase. The walk ends when a batch comes back empty, when the shard works
// without the metabase, or when the graveyard iteration fails.
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
	epoch := e.(newEpoch).epoch
	log := s.log.With(zap.Uint64("epoch", epoch))

	log.Debug("started expired tombstones handling")

	const tssDeleteBatch = 50

	var (
		graves  = make([]meta.TombstonedObject, 0, tssDeleteBatch)
		expired = make([]meta.TombstonedObject, 0, tssDeleteBatch)
		iterPrm meta.GraveyardIterationPrm
	)

	iterPrm.SetHandler(func(deletedObject meta.TombstonedObject) error {
		graves = append(graves, deletedObject)
		if len(graves) == tssDeleteBatch {
			return meta.ErrInterruptIterator
		}
		return nil
	})

	// readBatch fills graves with the next portion of graveyard records
	// under the read lock; false means the walk must be aborted.
	readBatch := func() bool {
		s.m.RLock()
		defer s.m.RUnlock()

		if s.info.Mode.NoMetabase() {
			s.log.Debug("shard is in a degraded mode, skip collecting expired tombstones")
			return false
		}

		if err := s.metaBase.IterateOverGraveyard(iterPrm); err != nil {
			log.Error("iterator over graveyard failed", zap.Error(err))
			return false
		}

		return true
	}

	for {
		log.Debug("iterating tombstones")

		if !readBatch() {
			return
		}

		count := len(graves)
		if count == 0 {
			break
		}

		for i := range graves {
			if !s.tsSource.IsTombstoneAvailable(ctx, graves[i].Tombstone(), epoch) {
				expired = append(expired, graves[i])
			}
		}

		log.Debug("handling expired tombstones batch", zap.Int("number", len(expired)))
		s.expiredTombstonesCallback(ctx, expired)

		// Continue after the last record seen and reuse both buffers.
		iterPrm.SetOffset(graves[count-1].Address())
		graves = graves[:0]
		expired = expired[:0]
	}

	log.Debug("finished expired tombstones handling")
}
// collectExpiredLocks handles the new-epoch event e: it iterates over the
// objects expired by that epoch, picks the lock objects and passes their
// addresses in batches to s.expiredLocksCallback together with the epoch.
//
// The iteration and the callbacks run in one errgroup whose concurrency is
// limited by the configured workers count. A failure is only logged.
func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
	workersCount, batchSize := s.getExpiredObjectsParameters()

	errGroup, egCtx := errgroup.WithContext(ctx)
	errGroup.SetLimit(workersCount)

	errGroup.Go(func() error {
		epoch := e.(newEpoch).epoch

		// process schedules the callback for a completed batch in the group.
		process := func(lockers []oid.Address) {
			errGroup.Go(func() error {
				s.expiredLocksCallback(egCtx, epoch, lockers)
				return egCtx.Err()
			})
		}

		pending := make([]oid.Address, 0, batchSize)

		err := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) {
			if o.Type() != object.TypeLock {
				return
			}

			pending = append(pending, o.Address())
			if len(pending) == batchSize {
				process(pending)
				// The callback owns the submitted slice now; start a new one.
				pending = make([]oid.Address, 0, batchSize)
			}
		})
		if err != nil {
			return err
		}

		// Hand over the incomplete tail, if any.
		if len(pending) > 0 {
			process(pending)
		}

		return nil
	})

	if err := errGroup.Wait(); err != nil {
		s.log.Warn("iterator over expired locks failed", zap.String("error", err.Error()))
	}
}
// getExpiredObjects calls onExpiredFound for every object the metabase
// reports as expired at the given epoch.
//
// The iteration is interrupted as soon as ctx is done. It returns
// ErrDegradedMode when the shard works without the metabase, the iteration
// error if there is one, and ctx.Err() otherwise (nil when ctx is still
// active). The shard's read lock is held for the whole iteration.
func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFound func(*meta.ExpiredObject)) error {
	s.m.RLock()
	defer s.m.RUnlock()

	if s.info.Mode.NoMetabase() {
		return ErrDegradedMode
	}

	visit := func(expiredObject *meta.ExpiredObject) error {
		if ctx.Err() != nil {
			return meta.ErrInterruptIterator
		}

		onExpiredFound(expiredObject)
		return nil
	}

	if err := s.metaBase.IterateExpired(epoch, visit); err != nil {
		return err
	}

	return ctx.Err()
}
// selectExpired returns the result of the metabase FilterExpired call for
// the given addresses and epoch, or ErrDegradedMode when the shard works
// without the metabase. The shard's read lock is held during the call.
func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid.Address) ([]oid.Address, error) {
	s.m.RLock()
	defer s.m.RUnlock()

	if !s.info.Mode.NoMetabase() {
		return s.metaBase.FilterExpired(ctx, epoch, addresses)
	}

	return nil, ErrDegradedMode
}
// HandleExpiredTombstones puts the GC mark on the tombstones referenced by
// tss, updates the logical object counter and container sizes accordingly,
// and then drops the tss records from the graveyard.
//
// It does nothing when the shard works without the metabase. If marking
// fails, the error is logged and the graveyard is left untouched.
//
// Does not modify tss.
func (s *Shard) HandleExpiredTombstones(tss []meta.TombstonedObject) {
	if s.GetMode().NoMetabase() {
		return
	}

	// Collect the addresses of the tombstone objects themselves.
	tombstones := make([]oid.Address, len(tss))
	for i := range tss {
		tombstones[i] = tss[i].Tombstone()
	}

	var pInhume meta.InhumePrm
	pInhume.SetGCMark()
	pInhume.SetAddresses(tombstones...)

	// Mark the tombstones as garbage.
	res, err := s.metaBase.Inhume(pInhume)
	if err != nil {
		s.log.Warn("could not mark tombstones as garbage",
			zap.String("error", err.Error()),
		)
		return
	}

	s.decObjectCounterBy(logical, res.AvailableInhumed())

	for idx := 0; idx < res.GetDeletionInfoLength(); idx++ {
		info := res.GetDeletionInfoByIndex(idx)
		s.addToContainerSize(info.CID.EncodeToString(), -int64(info.Size))
	}

	// The grave records that have just been processed are no longer needed.
	if err := s.metaBase.DropGraves(tss); err != nil {
		s.log.Warn("could not drop expired grave records", zap.Error(err))
	}
}
// HandleExpiredLocks unlocks all objects which were locked by lockers and,
// if that succeeds, force-marks the lockers themselves as garbage, updating
// the logical object counter and container sizes. Finally, the unlocked
// objects that are already expired at the given epoch are inhumed as well.
//
// It does nothing when the shard works without the metabase. A failure at
// any step is logged and stops the processing.
func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []oid.Address) {
	if s.GetMode().NoMetabase() {
		return
	}

	released, err := s.metaBase.FreeLockedBy(lockers)
	if err != nil {
		s.log.Warn("failure to unlock objects",
			zap.String("error", err.Error()),
		)
		return
	}

	var pInhume meta.InhumePrm
	pInhume.SetAddresses(lockers...)
	pInhume.SetForceGCMark()

	res, err := s.metaBase.Inhume(pInhume)
	if err != nil {
		s.log.Warn("failure to mark lockers as garbage",
			zap.String("error", err.Error()),
		)
		return
	}

	s.decObjectCounterBy(logical, res.AvailableInhumed())

	for idx := 0; idx < res.GetDeletionInfoLength(); idx++ {
		info := res.GetDeletionInfoByIndex(idx)
		s.addToContainerSize(info.CID.EncodeToString(), -int64(info.Size))
	}

	s.inhumeUnlockedIfExpired(ctx, epoch, released)
}
// inhumeUnlockedIfExpired selects those of the just unlocked objects that
// are expired at the given epoch and passes them to handleExpiredObjects.
// A selection failure is logged; an empty selection is a no-op.
func (s *Shard) inhumeUnlockedIfExpired(ctx context.Context, epoch uint64, unlocked []oid.Address) {
	expiredUnlocked, err := s.selectExpired(ctx, epoch, unlocked)
	if err != nil {
		s.log.Warn("failure to get expired unlocked objects", zap.Error(err))
		return
	}

	if len(expiredUnlocked) > 0 {
		s.handleExpiredObjects(ctx, expiredUnlocked)
	}
}
// HandleDeletedLocks unlocks all objects which were locked by lockers.
// It does nothing when the shard works without the metabase; an unlock
// failure is logged.
func (s *Shard) HandleDeletedLocks(lockers []oid.Address) {
	if s.GetMode().NoMetabase() {
		return
	}

	if _, err := s.metaBase.FreeLockedBy(lockers); err != nil {
		s.log.Warn("failure to unlock objects",
			zap.String("error", err.Error()),
		)
	}
}
// NotificationChannel returns the send-only end of the channel from which
// the shard's GC reads events.
func (s *Shard) NotificationChannel() chan<- Event {
	var events chan<- Event = s.gc.eventChan
	return events
}