diff --git a/pkg/services/object_manager/tombstone/checker.go b/pkg/services/object_manager/tombstone/checker.go new file mode 100644 index 00000000..7ee3edaf --- /dev/null +++ b/pkg/services/object_manager/tombstone/checker.go @@ -0,0 +1,93 @@ +package tombstone + +import ( + "context" + "strconv" + + lru "github.com/hashicorp/golang-lru" + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-sdk-go/object" + addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" + "go.uber.org/zap" +) + +// Source is a tombstone source interface. +type Source interface { + // Tombstone must return tombstone from the source it was + // configured to fetch from and any error that appeared during + // fetching process. + // + // Tombstone MUST return (nil, nil) if requested tombstone is + // missing in the storage for the provided epoch. + Tombstone(ctx context.Context, a *addressSDK.Address, epoch uint64) (*object.Object, error) +} + +// ExpirationChecker is a tombstone source wrapper. +// It checks tombstones presence via tombstone +// source, caches it checks its expiration. +// +// Must be created via NewChecker function. `var` and +// `ExpirationChecker{}` declarations leads to undefined behaviour +// and may lead to panics. +type ExpirationChecker struct { + cache *lru.Cache + + log *zap.Logger + + tsSource Source +} + +// IsTombstoneAvailable checks the tombstone presence in the system in the +// following order: +// * 1. Local LRU cache; +// * 2. Tombstone source. +// +// If a tombstone was successfully fetched (regardless of its expiration) +// it is cached in the LRU cache. +func (g *ExpirationChecker) IsTombstoneAvailable(ctx context.Context, a *addressSDK.Address, epoch uint64) bool { + addrStr := a.String() + log := g.log.With(zap.String("address", addrStr)) + + expEpoch, ok := g.cache.Get(addrStr) + if ok { + return expEpoch.(uint64) > epoch + } + + ts, err := g.tsSource.Tombstone(ctx, a, epoch) + if err != nil { + log.Warn( + "tombstone getter: could not get the tombstone the source", + zap.Error(err), + ) + } else { + if ts != nil { + return g.handleTS(addrStr, ts, epoch) + } + } + + // requested tombstone not + // found in the NeoFS network + return false +} + +func (g *ExpirationChecker) handleTS(addr string, ts *object.Object, reqEpoch uint64) bool { + for _, atr := range ts.Attributes() { + if atr.Key() == objectV2.SysAttributeExpEpoch { + epoch, err := strconv.ParseUint(atr.Value(), 10, 64) + if err != nil { + g.log.Warn( + "tombstone getter: could not parse tombstone expiration epoch", + zap.Error(err), + ) + + return false + } + + g.cache.Add(addr, epoch) + return epoch > reqEpoch + } + } + + // unexpected tombstone without expiration epoch + return false +} diff --git a/pkg/services/object_manager/tombstone/constructor.go b/pkg/services/object_manager/tombstone/constructor.go new file mode 100644 index 00000000..36f006c8 --- /dev/null +++ b/pkg/services/object_manager/tombstone/constructor.go @@ -0,0 +1,84 @@ +package tombstone + +import ( + "fmt" + + lru "github.com/hashicorp/golang-lru" + "go.uber.org/zap" +) + +const defaultLRUCacheSize = 100 + +type cfg struct { + log *zap.Logger + + cacheSize int + + tsSource Source +} + +// Option is an option of ExpirationChecker's constructor. +type Option func(*cfg) + +func defaultCfg() *cfg { + return &cfg{ + log: zap.NewNop(), + cacheSize: defaultLRUCacheSize, + } +} + +// NewChecker creates, initializes and returns tombstone ExpirationChecker. +// The returned structure is ready to use. +// +// Panics if any of the provided options does not allow +// constructing a valid tombstone ExpirationChecker. +func NewChecker(oo ...Option) *ExpirationChecker { + cfg := defaultCfg() + + for _, o := range oo { + o(cfg) + } + + panicOnNil := func(v interface{}, name string) { + if v == nil { + panic(fmt.Sprintf("tombstone getter constructor: %s is nil", name)) + } + } + + panicOnNil(cfg.tsSource, "Tombstone source") + + cache, err := lru.New(cfg.cacheSize) + if err != nil { + panic(fmt.Errorf("could not create LRU cache with %d size: %w", cfg.cacheSize, err)) + } + + return &ExpirationChecker{ + cache: cache, + log: cfg.log, + tsSource: cfg.tsSource, + } +} + +// WithLogger returns an option to specify +// logger. +func WithLogger(v *zap.Logger) Option { + return func(c *cfg) { + c.log = v + } +} + +// WithCacheSize returns an option to specify +// LRU cache size. +func WithCacheSize(v int) Option { + return func(c *cfg) { + c.cacheSize = v + } +} + +// WithTombstoneSource returns an option +// to specify tombstone source. +func WithTombstoneSource(v Source) Option { + return func(c *cfg) { + c.tsSource = v + } +} diff --git a/pkg/services/object_manager/tombstone/source/source.go b/pkg/services/object_manager/tombstone/source/source.go new file mode 100644 index 00000000..bb19782a --- /dev/null +++ b/pkg/services/object_manager/tombstone/source/source.go @@ -0,0 +1,84 @@ +package tsourse + +import ( + "context" + "errors" + "fmt" + + getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + "github.com/nspcc-dev/neofs-sdk-go/object" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" +) + +// Source represents wrapper over the object service that +// allows checking if a tombstone is available in NeoFS +// network. +// +// Must be created via NewSource function. `var` and `Source{}` +// declarations leads to undefined behaviour and may lead +// to panics. +type Source struct { + s *getsvc.Service +} + +// TombstoneSourcePrm groups required parameters for Source creation. +type TombstoneSourcePrm struct { + s *getsvc.Service +} + +// SetGetService sets object service. +func (s *TombstoneSourcePrm) SetGetService(v *getsvc.Service) { + s.s = v +} + +// NewSource creates, initialize and returns local tombstone Source. +// The returned structure is ready to use. +// +// Panics if any of the provided options does not allow +// constructing a valid tombstone local Source. +func NewSource(p TombstoneSourcePrm) Source { + if p.s == nil { + panic("Tombstone source: nil object service") + } + + return Source{s: p.s} +} + +type headerWriter struct { + o *objectSDK.Object +} + +func (h *headerWriter) WriteHeader(o *objectSDK.Object) error { + h.o = o + return nil +} + +// Tombstone checks if the engine stores tombstone. +// Returns nil, nil if the tombstone has been removed +// or marked for removal. +func (s Source) Tombstone(ctx context.Context, a *addressSDK.Address, _ uint64) (*object.Object, error) { + var hr headerWriter + + var headPrm getsvc.HeadPrm + headPrm.WithAddress(a) + headPrm.SetHeaderWriter(&hr) + headPrm.SetCommonParameters(&util.CommonPrm{}) //default values are ok for that operation + + err := s.s.Head(ctx, headPrm) + switch { + case errors.As(err, new(apistatus.ObjectNotFound)) || errors.As(err, new(apistatus.ObjectAlreadyRemoved)): + return nil, nil + case err != nil: + return nil, fmt.Errorf("could not get tombstone from the source: %w", err) + default: + } + + if hr.o.Type() != objectSDK.TypeTombstone { + return nil, fmt.Errorf("returned %s object is not a tombstone", a) + } + + return hr.o, nil +}