[#1318] engine: Change tombstone clear process
- Delete objects physically on tombstone's arrival; - Store information about tombstones in the Graveyard; - Clear Graveyard every epoch based on the information about TS in the network. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
e4cfeec449
commit
7799f8e4cf
4 changed files with 93 additions and 66 deletions
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
|
@ -153,15 +154,9 @@ func (e *StorageEngine) inhumeAddr(addr *addressSDK.Address, prm *shard.InhumePr
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []*addressSDK.Address) {
|
func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []meta.TombstonedObject) {
|
||||||
tss := make(map[string]*addressSDK.Address, len(addrs))
|
|
||||||
|
|
||||||
for i := range addrs {
|
|
||||||
tss[addrs[i].String()] = addrs[i]
|
|
||||||
}
|
|
||||||
|
|
||||||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||||
sh.HandleExpiredTombstones(tss)
|
sh.HandleExpiredTombstones(addrs)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
|
@ -2,10 +2,10 @@ package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
|
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
|
||||||
"github.com/pkg/errors"
|
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,15 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// TombstoneSource is an interface that checks
|
||||||
|
// tombstone status in the NeoFS network.
|
||||||
|
type TombstoneSource interface {
|
||||||
|
// IsTombstoneAvailable must return boolean value that means
|
||||||
|
// provided tombstone's presence in the NeoFS network at the
|
||||||
|
// time of the passed epoch.
|
||||||
|
IsTombstoneAvailable(ctx context.Context, addr *addressSDK.Address, epoch uint64) bool
|
||||||
|
}
|
||||||
|
|
||||||
// Event represents class of external events.
|
// Event represents class of external events.
|
||||||
type Event interface {
|
type Event interface {
|
||||||
typ() eventType
|
typ() eventType
|
||||||
|
@ -238,17 +247,54 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
||||||
expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool {
|
epoch := e.(newEpoch).epoch
|
||||||
return typ == object.TypeTombstone
|
log := s.log.With(zap.Uint64("epoch", epoch))
|
||||||
})
|
|
||||||
if err != nil || len(expired) == 0 {
|
log.Debug("started expired tombstones handling")
|
||||||
if err != nil {
|
|
||||||
s.log.Warn("iterator over expired tombstones failed", zap.String("error", err.Error()))
|
const tssDeleteBatch = 50
|
||||||
|
tss := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
||||||
|
tssExp := make([]meta.TombstonedObject, 0, tssDeleteBatch)
|
||||||
|
|
||||||
|
iterPrm := new(meta.GraveyardIterationPrm).SetHandler(func(deletedObject meta.TombstonedObject) error {
|
||||||
|
tss = append(tss, deletedObject)
|
||||||
|
|
||||||
|
if len(tss) == tssDeleteBatch {
|
||||||
|
return meta.ErrInterruptIterator
|
||||||
}
|
}
|
||||||
return
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
for {
|
||||||
|
log.Debug("iterating tombstones")
|
||||||
|
|
||||||
|
err := s.metaBase.IterateOverGraveyard(iterPrm)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("iterator over graveyard failed", zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
tssLen := len(tss)
|
||||||
|
if tssLen == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ts := range tss {
|
||||||
|
if !s.tsSource.IsTombstoneAvailable(ctx, ts.Tombstone(), epoch) {
|
||||||
|
tssExp = append(tssExp, ts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug("handling expired tombstones batch", zap.Int("number", tssLen))
|
||||||
|
s.expiredTombstonesCallback(ctx, tss)
|
||||||
|
|
||||||
|
iterPrm.SetOffset(tss[tssLen-1].Address())
|
||||||
|
tss = tss[:0]
|
||||||
|
tssExp = tssExp[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
s.expiredTombstonesCallback(ctx, expired)
|
log.Debug("finished expired tombstones handling")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
|
func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
|
||||||
|
@ -285,60 +331,24 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond fu
|
||||||
return expired, ctx.Err()
|
return expired, ctx.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleExpiredTombstones marks to be removed all objects that are
|
// HandleExpiredTombstones marks tombstones themselves as garbage
|
||||||
// protected by tombstones with string addresses from tss.
|
// and clears up corresponding graveyard records.
|
||||||
// If successful, marks tombstones themselves as garbage.
|
|
||||||
//
|
//
|
||||||
// Does not modify tss.
|
// Does not modify tss.
|
||||||
func (s *Shard) HandleExpiredTombstones(tss map[string]*addressSDK.Address) {
|
func (s *Shard) HandleExpiredTombstones(tss []meta.TombstonedObject) {
|
||||||
inhume := make([]*addressSDK.Address, 0, len(tss))
|
// Mark tombstones as garbage.
|
||||||
|
|
||||||
// Collect all objects covered by the tombstones.
|
|
||||||
|
|
||||||
err := s.metaBase.IterateCoveredByTombstones(tss, func(addr *addressSDK.Address) error {
|
|
||||||
inhume = append(inhume, addr)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
s.log.Warn("iterator over expired objects failed",
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mark collected objects as garbage.
|
|
||||||
|
|
||||||
var pInhume meta.InhumePrm
|
var pInhume meta.InhumePrm
|
||||||
|
|
||||||
|
tsAddrs := make([]*addressSDK.Address, 0, len(tss))
|
||||||
|
for _, ts := range tss {
|
||||||
|
tsAddrs = append(tsAddrs, ts.Tombstone())
|
||||||
|
}
|
||||||
|
|
||||||
pInhume.WithGCMark()
|
pInhume.WithGCMark()
|
||||||
|
pInhume.WithAddresses(tsAddrs...)
|
||||||
if len(inhume) > 0 {
|
|
||||||
// inhume objects
|
|
||||||
pInhume.WithAddresses(inhume...)
|
|
||||||
|
|
||||||
_, err = s.metaBase.Inhume(&pInhume)
|
|
||||||
if err != nil {
|
|
||||||
s.log.Warn("could not inhume objects under the expired tombstone",
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mark the tombstones as garbage.
|
|
||||||
|
|
||||||
inhume = inhume[:0]
|
|
||||||
|
|
||||||
for _, addr := range tss {
|
|
||||||
inhume = append(inhume, addr)
|
|
||||||
}
|
|
||||||
|
|
||||||
pInhume.WithAddresses(inhume...) // GC mark is already set above
|
|
||||||
|
|
||||||
// inhume tombstones
|
// inhume tombstones
|
||||||
_, err = s.metaBase.Inhume(&pInhume)
|
_, err := s.metaBase.Inhume(&pInhume)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Warn("could not mark tombstones as garbage",
|
s.log.Warn("could not mark tombstones as garbage",
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
@ -346,6 +356,13 @@ func (s *Shard) HandleExpiredTombstones(tss map[string]*addressSDK.Address) {
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// drop just processed expired tombstones
|
||||||
|
// from graveyard
|
||||||
|
err = s.metaBase.DropGraves(tss)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Warn("could not drop expired grave records", zap.Error(err))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleExpiredLocks unlocks all objects which were locked by lockers.
|
// HandleExpiredLocks unlocks all objects which were locked by lockers.
|
||||||
|
|
|
@ -25,11 +25,16 @@ type Shard struct {
|
||||||
blobStor *blobstor.BlobStor
|
blobStor *blobstor.BlobStor
|
||||||
|
|
||||||
metaBase *meta.DB
|
metaBase *meta.DB
|
||||||
|
|
||||||
|
tsSource TombstoneSource
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option represents Shard's constructor option.
|
// Option represents Shard's constructor option.
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
|
||||||
|
// ExpiredTombstonesCallback is a callback handling list of expired tombstones.
|
||||||
|
type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject)
|
||||||
|
|
||||||
// ExpiredObjectsCallback is a callback handling list of expired objects.
|
// ExpiredObjectsCallback is a callback handling list of expired objects.
|
||||||
type ExpiredObjectsCallback func(context.Context, []*addressSDK.Address)
|
type ExpiredObjectsCallback func(context.Context, []*addressSDK.Address)
|
||||||
|
|
||||||
|
@ -54,9 +59,11 @@ type cfg struct {
|
||||||
|
|
||||||
gcCfg *gcCfg
|
gcCfg *gcCfg
|
||||||
|
|
||||||
expiredTombstonesCallback ExpiredObjectsCallback
|
expiredTombstonesCallback ExpiredTombstonesCallback
|
||||||
|
|
||||||
expiredLocksCallback ExpiredObjectsCallback
|
expiredLocksCallback ExpiredObjectsCallback
|
||||||
|
|
||||||
|
tsSource TombstoneSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
func defaultCfg() *cfg {
|
||||||
|
@ -91,6 +98,7 @@ func New(opts ...Option) *Shard {
|
||||||
blobStor: bs,
|
blobStor: bs,
|
||||||
metaBase: mb,
|
metaBase: mb,
|
||||||
writeCache: writeCache,
|
writeCache: writeCache,
|
||||||
|
tsSource: c.tsSource,
|
||||||
}
|
}
|
||||||
|
|
||||||
s.fillInfo()
|
s.fillInfo()
|
||||||
|
@ -184,7 +192,7 @@ func WithGCRemoverSleepInterval(dur time.Duration) Option {
|
||||||
|
|
||||||
// WithExpiredTombstonesCallback returns option to specify callback
|
// WithExpiredTombstonesCallback returns option to specify callback
|
||||||
// of the expired tombstones handler.
|
// of the expired tombstones handler.
|
||||||
func WithExpiredTombstonesCallback(cb ExpiredObjectsCallback) Option {
|
func WithExpiredTombstonesCallback(cb ExpiredTombstonesCallback) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.expiredTombstonesCallback = cb
|
c.expiredTombstonesCallback = cb
|
||||||
}
|
}
|
||||||
|
@ -214,6 +222,13 @@ func WithMode(v Mode) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithTombstoneSource returns option to set TombstoneSource.
|
||||||
|
func WithTombstoneSource(v TombstoneSource) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.tsSource = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Shard) fillInfo() {
|
func (s *Shard) fillInfo() {
|
||||||
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
|
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
|
||||||
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()
|
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()
|
||||||
|
|
Loading…
Reference in a new issue