forked from TrueCloudLab/frostfs-node
[#1318] service: Add tombstone checker service
The service fetches tombstones from the network via object service, every request is handled in the following order: 1. checks local LRU cache; 2. checks local storage engine; 3. tries to find object in the placement nodes. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
2583f608e8
commit
e4cfeec449
3 changed files with 261 additions and 0 deletions
93
pkg/services/object_manager/tombstone/checker.go
Normal file
93
pkg/services/object_manager/tombstone/checker.go
Normal file
|
@ -0,0 +1,93 @@
|
|||
package tombstone
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Source is a tombstone source interface.
|
||||
type Source interface {
|
||||
// Tombstone must return tombstone from the source it was
|
||||
// configured to fetch from and any error that appeared during
|
||||
// fetching process.
|
||||
//
|
||||
// Tombstone MUST return (nil, nil) if requested tombstone is
|
||||
// missing in the storage for the provided epoch.
|
||||
Tombstone(ctx context.Context, a *addressSDK.Address, epoch uint64) (*object.Object, error)
|
||||
}
|
||||
|
||||
// ExpirationChecker is a tombstone source wrapper.
|
||||
// It checks tombstones presence via tombstone
|
||||
// source, caches it checks its expiration.
|
||||
//
|
||||
// Must be created via NewChecker function. `var` and
|
||||
// `ExpirationChecker{}` declarations leads to undefined behaviour
|
||||
// and may lead to panics.
|
||||
type ExpirationChecker struct {
|
||||
cache *lru.Cache
|
||||
|
||||
log *zap.Logger
|
||||
|
||||
tsSource Source
|
||||
}
|
||||
|
||||
// IsTombstoneAvailable checks the tombstone presence in the system in the
|
||||
// following order:
|
||||
// * 1. Local LRU cache;
|
||||
// * 2. Tombstone source.
|
||||
//
|
||||
// If a tombstone was successfully fetched (regardless of its expiration)
|
||||
// it is cached in the LRU cache.
|
||||
func (g *ExpirationChecker) IsTombstoneAvailable(ctx context.Context, a *addressSDK.Address, epoch uint64) bool {
|
||||
addrStr := a.String()
|
||||
log := g.log.With(zap.String("address", addrStr))
|
||||
|
||||
expEpoch, ok := g.cache.Get(addrStr)
|
||||
if ok {
|
||||
return expEpoch.(uint64) > epoch
|
||||
}
|
||||
|
||||
ts, err := g.tsSource.Tombstone(ctx, a, epoch)
|
||||
if err != nil {
|
||||
log.Warn(
|
||||
"tombstone getter: could not get the tombstone the source",
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
if ts != nil {
|
||||
return g.handleTS(addrStr, ts, epoch)
|
||||
}
|
||||
}
|
||||
|
||||
// requested tombstone not
|
||||
// found in the NeoFS network
|
||||
return false
|
||||
}
|
||||
|
||||
func (g *ExpirationChecker) handleTS(addr string, ts *object.Object, reqEpoch uint64) bool {
|
||||
for _, atr := range ts.Attributes() {
|
||||
if atr.Key() == objectV2.SysAttributeExpEpoch {
|
||||
epoch, err := strconv.ParseUint(atr.Value(), 10, 64)
|
||||
if err != nil {
|
||||
g.log.Warn(
|
||||
"tombstone getter: could not parse tombstone expiration epoch",
|
||||
zap.Error(err),
|
||||
)
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
g.cache.Add(addr, epoch)
|
||||
return epoch > reqEpoch
|
||||
}
|
||||
}
|
||||
|
||||
// unexpected tombstone without expiration epoch
|
||||
return false
|
||||
}
|
84
pkg/services/object_manager/tombstone/constructor.go
Normal file
84
pkg/services/object_manager/tombstone/constructor.go
Normal file
|
@ -0,0 +1,84 @@
|
|||
package tombstone
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const defaultLRUCacheSize = 100
|
||||
|
||||
type cfg struct {
|
||||
log *zap.Logger
|
||||
|
||||
cacheSize int
|
||||
|
||||
tsSource Source
|
||||
}
|
||||
|
||||
// Option is an option of ExpirationChecker's constructor.
|
||||
type Option func(*cfg)
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
log: zap.NewNop(),
|
||||
cacheSize: defaultLRUCacheSize,
|
||||
}
|
||||
}
|
||||
|
||||
// NewChecker creates, initializes and returns tombstone ExpirationChecker.
|
||||
// The returned structure is ready to use.
|
||||
//
|
||||
// Panics if any of the provided options does not allow
|
||||
// constructing a valid tombstone ExpirationChecker.
|
||||
func NewChecker(oo ...Option) *ExpirationChecker {
|
||||
cfg := defaultCfg()
|
||||
|
||||
for _, o := range oo {
|
||||
o(cfg)
|
||||
}
|
||||
|
||||
panicOnNil := func(v interface{}, name string) {
|
||||
if v == nil {
|
||||
panic(fmt.Sprintf("tombstone getter constructor: %s is nil", name))
|
||||
}
|
||||
}
|
||||
|
||||
panicOnNil(cfg.tsSource, "Tombstone source")
|
||||
|
||||
cache, err := lru.New(cfg.cacheSize)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("could not create LRU cache with %d size: %w", cfg.cacheSize, err))
|
||||
}
|
||||
|
||||
return &ExpirationChecker{
|
||||
cache: cache,
|
||||
log: cfg.log,
|
||||
tsSource: cfg.tsSource,
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger returns an option to specify
|
||||
// logger.
|
||||
func WithLogger(v *zap.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.log = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithCacheSize returns an option to specify
|
||||
// LRU cache size.
|
||||
func WithCacheSize(v int) Option {
|
||||
return func(c *cfg) {
|
||||
c.cacheSize = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithTombstoneSource returns an option
|
||||
// to specify tombstone source.
|
||||
func WithTombstoneSource(v Source) Option {
|
||||
return func(c *cfg) {
|
||||
c.tsSource = v
|
||||
}
|
||||
}
|
84
pkg/services/object_manager/tombstone/source/source.go
Normal file
84
pkg/services/object_manager/tombstone/source/source.go
Normal file
|
@ -0,0 +1,84 @@
|
|||
package tsourse
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
|
||||
)
|
||||
|
||||
// Source represents wrapper over the object service that
|
||||
// allows checking if a tombstone is available in NeoFS
|
||||
// network.
|
||||
//
|
||||
// Must be created via NewSource function. `var` and `Source{}`
|
||||
// declarations leads to undefined behaviour and may lead
|
||||
// to panics.
|
||||
type Source struct {
|
||||
s *getsvc.Service
|
||||
}
|
||||
|
||||
// TombstoneSourcePrm groups required parameters for Source creation.
|
||||
type TombstoneSourcePrm struct {
|
||||
s *getsvc.Service
|
||||
}
|
||||
|
||||
// SetGetService sets object service.
|
||||
func (s *TombstoneSourcePrm) SetGetService(v *getsvc.Service) {
|
||||
s.s = v
|
||||
}
|
||||
|
||||
// NewSource creates, initialize and returns local tombstone Source.
|
||||
// The returned structure is ready to use.
|
||||
//
|
||||
// Panics if any of the provided options does not allow
|
||||
// constructing a valid tombstone local Source.
|
||||
func NewSource(p TombstoneSourcePrm) Source {
|
||||
if p.s == nil {
|
||||
panic("Tombstone source: nil object service")
|
||||
}
|
||||
|
||||
return Source{s: p.s}
|
||||
}
|
||||
|
||||
type headerWriter struct {
|
||||
o *objectSDK.Object
|
||||
}
|
||||
|
||||
func (h *headerWriter) WriteHeader(o *objectSDK.Object) error {
|
||||
h.o = o
|
||||
return nil
|
||||
}
|
||||
|
||||
// Tombstone checks if the engine stores tombstone.
|
||||
// Returns nil, nil if the tombstone has been removed
|
||||
// or marked for removal.
|
||||
func (s Source) Tombstone(ctx context.Context, a *addressSDK.Address, _ uint64) (*object.Object, error) {
|
||||
var hr headerWriter
|
||||
|
||||
var headPrm getsvc.HeadPrm
|
||||
headPrm.WithAddress(a)
|
||||
headPrm.SetHeaderWriter(&hr)
|
||||
headPrm.SetCommonParameters(&util.CommonPrm{}) //default values are ok for that operation
|
||||
|
||||
err := s.s.Head(ctx, headPrm)
|
||||
switch {
|
||||
case errors.As(err, new(apistatus.ObjectNotFound)) || errors.As(err, new(apistatus.ObjectAlreadyRemoved)):
|
||||
return nil, nil
|
||||
case err != nil:
|
||||
return nil, fmt.Errorf("could not get tombstone from the source: %w", err)
|
||||
default:
|
||||
}
|
||||
|
||||
if hr.o.Type() != objectSDK.TypeTombstone {
|
||||
return nil, fmt.Errorf("returned %s object is not a tombstone", a)
|
||||
}
|
||||
|
||||
return hr.o, nil
|
||||
}
|
Loading…
Reference in a new issue