frostfs-node/pkg/services/object_manager/tombstone/checker.go
Pavel Karpy e4cfeec449 [] service: Add tombstone checker service
The service fetches tombstones from the network via object service, every
request is handled in the following order:
1. checks local LRU cache;
2. checks local storage engine;
3. tries to find object in the placement nodes.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
2022-04-29 16:38:52 +03:00

93 lines
2.4 KiB
Go

package tombstone
import (
"context"
"strconv"
lru "github.com/hashicorp/golang-lru"
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-sdk-go/object"
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
"go.uber.org/zap"
)
// Source is a tombstone source interface.
//
// ExpirationChecker queries it when the expiration epoch of the
// requested tombstone is not present in its local cache.
type Source interface {
// Tombstone must return the tombstone from the source it was
// configured to fetch from and any error that appeared during
// the fetching process.
//
// a is the address of the requested tombstone and epoch is the
// epoch the request is made for.
//
// Tombstone MUST return (nil, nil) if the requested tombstone is
// missing in the storage for the provided epoch.
Tombstone(ctx context.Context, a *addressSDK.Address, epoch uint64) (*object.Object, error)
}
// ExpirationChecker is a tombstone source wrapper.
// It checks tombstone presence via the tombstone
// source, caches the tombstone's expiration epoch,
// and checks its expiration.
//
// Must be created via NewChecker function. `var` and
// `ExpirationChecker{}` declarations lead to undefined behaviour
// and may lead to panics.
type ExpirationChecker struct {
// cache maps an address string to the expiration epoch (uint64)
// parsed from the tombstone stored at that address.
cache *lru.Cache
// log receives warnings about tombstone fetch and parse failures.
log *zap.Logger
// tsSource is queried when cache has no entry for the address.
tsSource Source
}
// IsTombstoneAvailable checks the tombstone presence in the system in the
// following order:
//  1. Local LRU cache;
//  2. Tombstone source.
//
// It returns true only if the tombstone addressed by a was found and is
// not expired at epoch. The expiration epoch is the last epoch of the
// tombstone's lifetime, so a tombstone whose expiration epoch equals
// epoch is still reported as available.
//
// If a tombstone was successfully fetched and its expiration epoch was
// parsed (regardless of whether it has already expired), the expiration
// epoch is cached in the LRU cache under the address string.
//
// Errors returned by the tombstone source are logged and reported as
// "not available"; they are never propagated to the caller.
func (g *ExpirationChecker) IsTombstoneAvailable(ctx context.Context, a *addressSDK.Address, epoch uint64) bool {
	addrStr := a.String()

	if cached, ok := g.cache.Get(addrStr); ok {
		// handleTS stores expiration epochs as uint64 values.
		return cached.(uint64) >= epoch
	}

	ts, err := g.tsSource.Tombstone(ctx, a, epoch)
	if err != nil {
		// The address field is attached here instead of deriving a child
		// logger up front, so the cache-hit path above does not pay for it.
		g.log.Warn(
			"tombstone getter: could not get the tombstone the source",
			zap.String("address", addrStr),
			zap.Error(err),
		)

		return false
	}

	if ts == nil {
		// requested tombstone not
		// found in the NeoFS network
		return false
	}

	return g.handleTS(addrStr, ts, epoch)
}

// handleTS looks up the expiration epoch attribute of the tombstone ts,
// caches the parsed value under addr and reports whether the tombstone is
// still alive at reqEpoch (i.e. its expiration epoch is not less than
// reqEpoch).
//
// It returns false without touching the cache if the attribute is missing
// or its value cannot be parsed as an unsigned decimal integer; the parse
// failure is logged together with the tombstone address.
func (g *ExpirationChecker) handleTS(addr string, ts *object.Object, reqEpoch uint64) bool {
	for _, attr := range ts.Attributes() {
		if attr.Key() != objectV2.SysAttributeExpEpoch {
			continue
		}

		expEpoch, err := strconv.ParseUint(attr.Value(), 10, 64)
		if err != nil {
			g.log.Warn(
				"tombstone getter: could not parse tombstone expiration epoch",
				zap.String("address", addr),
				zap.Error(err),
			)

			return false
		}

		// Cache even an already expired epoch: the stored value lets later
		// calls answer without another fetch.
		g.cache.Add(addr, expEpoch)

		return expEpoch >= reqEpoch
	}

	// unexpected tombstone without expiration epoch
	return false
}