[#1318] engine: Change tombstone clear process
- Delete objects physically on tombstone's arrival; - Store information about tombstones in the Graveyard; - Clear Graveyard every epoch based on the information about TS in the network. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
e4cfeec449
commit
7799f8e4cf
4 changed files with 93 additions and 66 deletions
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
|
@ -153,15 +154,9 @@ func (e *StorageEngine) inhumeAddr(addr *addressSDK.Address, prm *shard.InhumePr
|
|||
return
|
||||
}
|
||||
|
||||
func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []*addressSDK.Address) {
|
||||
tss := make(map[string]*addressSDK.Address, len(addrs))
|
||||
|
||||
for i := range addrs {
|
||||
tss[addrs[i].String()] = addrs[i]
|
||||
}
|
||||
|
||||
func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {
|
||||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||
sh.HandleExpiredTombstones(tss)
|
||||
sh.HandleExpiredTombstones(addrs)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
|
@ -2,10 +2,10 @@ package meta
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
|
||||
"github.com/pkg/errors"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
|
|
|
@ -13,6 +13,15 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// TombstoneSource is an interface that checks
|
||||
// tombstone status in the NeoFS network.
|
||||
type TombstoneSource interface {
|
||||
// IsTombstoneAvailable must return boolean value that means
|
||||
// provided tombstone's presence in the NeoFS network at the
|
||||
// time of the passed epoch.
|
||||
IsTombstoneAvailable(ctx context.Context, addr *addressSDK.Address, epoch uint64) bool
|
||||
}
|
||||
|
||||
// Event represents class of external events.
|
||||
type Event interface {
|
||||
typ() eventType
|
||||
|
@ -238,17 +247,54 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
|
|||
}
|
||||
|
||||
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
||||
expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool {
|
||||
return typ == object.TypeTombstone
|
||||
})
|
||||
if err != nil || len(expired) == 0 {
|
||||
if err != nil {
|
||||
s.log.Warn("iterator over expired tombstones failed", zap.String("error", err.Error()))
|
||||
epoch := e.(newEpoch).epoch
|
||||
log := s.log.With(zap.Uint64("epoch", epoch))
|
||||
|
||||
log.Debug("started expired tombstones handling")
|
||||
|
||||
const tssDeleteBatch = 50
|
||||
tss := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
||||
tssExp := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
||||
|
||||
iterPrm := new(meta.GraveyardIterationPrm).SetHandler(func(deletedObject meta.TombstonedObject) error {
|
||||
tss = append(tss, deletedObject)
|
||||
|
||||
if len(tss) == tssDeleteBatch {
|
||||
return meta.ErrInterruptIterator
|
||||
}
|
||||
return
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
for {
|
||||
log.Debug("iterating tombstones")
|
||||
|
||||
err := s.metaBase.IterateOverGraveyard(iterPrm)
|
||||
if err != nil {
|
||||
log.Error("iterator over graveyard failed", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
tssLen := len(tss)
|
||||
if tssLen == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
for _, ts := range tss {
|
||||
if !s.tsSource.IsTombstoneAvailable(ctx, ts.Tombstone(), epoch) {
|
||||
tssExp = append(tssExp, ts)
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug("handling expired tombstones batch", zap.Int("number", tssLen))
|
||||
s.expiredTombstonesCallback(ctx, tss)
|
||||
|
||||
iterPrm.SetOffset(tss[tssLen-1].Address())
|
||||
tss = tss[:0]
|
||||
tssExp = tssExp[:0]
|
||||
}
|
||||
|
||||
s.expiredTombstonesCallback(ctx, expired)
|
||||
log.Debug("finished expired tombstones handling")
|
||||
}
|
||||
|
||||
func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
|
||||
|
@ -285,60 +331,24 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond fu
|
|||
return expired, ctx.Err()
|
||||
}
|
||||
|
||||
// HandleExpiredTombstones marks to be removed all objects that are
|
||||
// protected by tombstones with string addresses from tss.
|
||||
// If successful, marks tombstones themselves as garbage.
|
||||
// HandleExpiredTombstones marks tombstones themselves as garbage
|
||||
// and clears up corresponding graveyard records.
|
||||
//
|
||||
// Does not modify tss.
|
||||
func (s *Shard) HandleExpiredTombstones(tss map[string]*addressSDK.Address) {
|
||||
inhume := make([]*addressSDK.Address, 0, len(tss))
|
||||
|
||||
// Collect all objects covered by the tombstones.
|
||||
|
||||
err := s.metaBase.IterateCoveredByTombstones(tss, func(addr *addressSDK.Address) error {
|
||||
inhume = append(inhume, addr)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
s.log.Warn("iterator over expired objects failed",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Mark collected objects as garbage.
|
||||
|
||||
func (s *Shard) HandleExpiredTombstones(tss []meta.TombstonedObject) {
|
||||
// Mark tombstones as garbage.
|
||||
var pInhume meta.InhumePrm
|
||||
|
||||
tsAddrs := make([]*addressSDK.Address, 0, len(tss))
|
||||
for _, ts := range tss {
|
||||
tsAddrs = append(tsAddrs, ts.Tombstone())
|
||||
}
|
||||
|
||||
pInhume.WithGCMark()
|
||||
|
||||
if len(inhume) > 0 {
|
||||
// inhume objects
|
||||
pInhume.WithAddresses(inhume...)
|
||||
|
||||
_, err = s.metaBase.Inhume(&pInhume)
|
||||
if err != nil {
|
||||
s.log.Warn("could not inhume objects under the expired tombstone",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Mark the tombstones as garbage.
|
||||
|
||||
inhume = inhume[:0]
|
||||
|
||||
for _, addr := range tss {
|
||||
inhume = append(inhume, addr)
|
||||
}
|
||||
|
||||
pInhume.WithAddresses(inhume...) // GC mark is already set above
|
||||
pInhume.WithAddresses(tsAddrs...)
|
||||
|
||||
// inhume tombstones
|
||||
_, err = s.metaBase.Inhume(&pInhume)
|
||||
_, err := s.metaBase.Inhume(&pInhume)
|
||||
if err != nil {
|
||||
s.log.Warn("could not mark tombstones as garbage",
|
||||
zap.String("error", err.Error()),
|
||||
|
@ -346,6 +356,13 @@ func (s *Shard) HandleExpiredTombstones(tss map[string]*addressSDK.Address) {
|
|||
|
||||
return
|
||||
}
|
||||
|
||||
// drop just processed expired tombstones
|
||||
// from graveyard
|
||||
err = s.metaBase.DropGraves(tss)
|
||||
if err != nil {
|
||||
s.log.Warn("could not drop expired grave records", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// HandleExpiredLocks unlocks all objects which were locked by lockers.
|
||||
|
|
|
@ -25,11 +25,16 @@ type Shard struct {
|
|||
blobStor *blobstor.BlobStor
|
||||
|
||||
metaBase *meta.DB
|
||||
|
||||
tsSource TombstoneSource
|
||||
}
|
||||
|
||||
// Option represents Shard's constructor option.
|
||||
type Option func(*cfg)
|
||||
|
||||
// ExpiredTombstonesCallback is a callback handling list of expired tombstones.
|
||||
type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject)
|
||||
|
||||
// ExpiredObjectsCallback is a callback handling list of expired objects.
|
||||
type ExpiredObjectsCallback func(context.Context, []*addressSDK.Address)
|
||||
|
||||
|
@ -54,9 +59,11 @@ type cfg struct {
|
|||
|
||||
gcCfg *gcCfg
|
||||
|
||||
expiredTombstonesCallback ExpiredObjectsCallback
|
||||
expiredTombstonesCallback ExpiredTombstonesCallback
|
||||
|
||||
expiredLocksCallback ExpiredObjectsCallback
|
||||
|
||||
tsSource TombstoneSource
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
|
@ -91,6 +98,7 @@ func New(opts ...Option) *Shard {
|
|||
blobStor: bs,
|
||||
metaBase: mb,
|
||||
writeCache: writeCache,
|
||||
tsSource: c.tsSource,
|
||||
}
|
||||
|
||||
s.fillInfo()
|
||||
|
@ -184,7 +192,7 @@ func WithGCRemoverSleepInterval(dur time.Duration) Option {
|
|||
|
||||
// WithExpiredTombstonesCallback returns option to specify callback
|
||||
// of the expired tombstones handler.
|
||||
func WithExpiredTombstonesCallback(cb ExpiredObjectsCallback) Option {
|
||||
func WithExpiredTombstonesCallback(cb ExpiredTombstonesCallback) Option {
|
||||
return func(c *cfg) {
|
||||
c.expiredTombstonesCallback = cb
|
||||
}
|
||||
|
@ -214,6 +222,13 @@ func WithMode(v Mode) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithTombstoneSource returns option to set TombstoneSource.
|
||||
func WithTombstoneSource(v TombstoneSource) Option {
|
||||
return func(c *cfg) {
|
||||
c.tsSource = v
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Shard) fillInfo() {
|
||||
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
|
||||
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()
|
||||
|
|
Loading…
Reference in a new issue