frostfs-node/pkg/local_object_storage/metabase/inhume.go

417 lines
11 KiB
Go
Raw Permalink Normal View History

package meta
import (
"bytes"
"context"
"errors"
"fmt"
"time"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
)
// InhumePrm encapsulates parameters for Inhume operation.
type InhumePrm struct {
// tomb is the address of the tombstone that causes the inhume. When it is
// nil the operation works in GC-mark mode and targets are written to the
// garbage bucket instead of the graveyard.
tomb *oid.Address
// target lists the addresses of the objects to be inhumed.
target []oid.Address
// lockObjectHandling enables collecting inhumed targets of the LOCK type
// into the operation result.
lockObjectHandling bool
// forceRemoval disables both the "object is locked" and the "object is a
// lock" checks.
forceRemoval bool
}
// DeletionInfo contains details on deleted object.
type DeletionInfo struct {
// Size is the payload size of the deleted object.
Size uint64
// CID is the ID of the container the deleted object is accounted to.
CID cid.ID
// IsUser tells whether the deleted object was classified as a user object.
IsUser bool
}
// InhumeRes encapsulates results of Inhume operation.
type InhumeRes struct {
// deletedLockObj holds addresses of inhumed objects of the LOCK type.
deletedLockObj []oid.Address
// logicInhumed is the total number of inhumed logical objects.
logicInhumed uint64
// userInhumed is the total number of inhumed user objects.
userInhumed uint64
// inhumedByCnrID holds the same counters broken down by container ID.
inhumedByCnrID map[cid.ID]ObjectCounters
// deletionDetails holds one entry per inhumed object that was counted.
deletionDetails []DeletionInfo
}
// LogicInhumed returns the number of logical objects
// that have been inhumed.
func (i InhumeRes) LogicInhumed() uint64 {
return i.logicInhumed
}
// UserInhumed returns the number of user objects
// that have been inhumed.
func (i InhumeRes) UserInhumed() uint64 {
return i.userInhumed
}
// InhumedByCnrID returns the numbers of objects
// that have been inhumed, grouped by container ID.
//
// The returned map is the one stored in the result, not a copy.
func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters {
return i.inhumedByCnrID
}
// DeletedLockObjects returns addresses of the inhumed objects of the LOCK
// type. Always returns nil unless SetLockObjectHandling was called on the
// InhumePrm.
func (i InhumeRes) DeletedLockObjects() []oid.Address {
return i.deletedLockObj
}
// GetDeletionInfoLength returns the number of stored DeletionInfo
// elements, i.e. the exclusive upper bound for GetDeletionInfoByIndex.
func (i InhumeRes) GetDeletionInfoLength() int {
return len(i.deletionDetails)
}
// GetDeletionInfoByIndex returns the DeletionInfo (size, container ID and
// user-object flag of a deleted object) stored at the given index.
//
// Panics if target is out of range; use GetDeletionInfoLength to obtain
// the valid bounds.
func (i InhumeRes) GetDeletionInfoByIndex(target int) DeletionInfo {
return i.deletionDetails[target]
}
// storeDeletionInfo records a single inhumed object in the result.
//
// It appends a DeletionInfo entry built from the arguments and increments
// both the total and the per-container counters: the logical counters always,
// the user counters only when isUser is set.
//
// The per-container map is allocated on first use, so the method is safe to
// call on a zero-value InhumeRes.
func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64, isUser bool) {
	i.deletionDetails = append(i.deletionDetails, DeletionInfo{
		Size:   deletedSize,
		CID:    containerID,
		IsUser: isUser,
	})

	// Writing to a nil map panics, so make sure the map exists.
	if i.inhumedByCnrID == nil {
		i.inhumedByCnrID = make(map[cid.ID]ObjectCounters)
	}

	// A missing key yields the zero ObjectCounters, therefore new and
	// already known containers share the same increment path.
	counters := i.inhumedByCnrID[containerID]

	i.logicInhumed++
	counters.Logic++
	if isUser {
		i.userInhumed++
		counters.User++
	}

	i.inhumedByCnrID[containerID] = counters
}
// SetAddresses sets a list of object addresses that should be inhumed.
//
// The passed slice is stored as is (not copied) and replaces any
// previously set list.
func (p *InhumePrm) SetAddresses(addrs ...oid.Address) {
p.target = addrs
}
// SetTombstoneAddress sets tombstone address as the reason for inhume operation.
//
// A copy of addr is stored, so later changes to the caller's value do not
// affect the parameters.
// Should not be called along with SetGCMark.
func (p *InhumePrm) SetTombstoneAddress(addr oid.Address) {
p.tomb = &addr
}
// SetGCMark marks the objects to be physically removed: it clears any
// previously set tombstone address, which switches the operation to
// GC-mark mode.
//
// Should not be called along with SetTombstoneAddress.
func (p *InhumePrm) SetGCMark() {
p.tomb = nil
}
// SetLockObjectHandling enables reporting of LOCK objects found among
// the targets set via SetAddresses. The setter itself performs no check;
// it only raises the flag consulted during the inhume operation.
func (p *InhumePrm) SetLockObjectHandling() {
p.lockObjectHandling = true
}
// SetForceGCMark allows removal of any object: it switches the operation
// to GC-mark mode (clearing any tombstone address) and disables the lock
// related checks. Expected to be called only in control service.
func (p *InhumePrm) SetForceGCMark() {
p.tomb = nil
p.forceRemoval = true
}
// errBreakBucketForEach is an internal sentinel returned from a bucket
// ForEach callback to stop the iteration early; it is filtered out by the
// code that starts the iteration and is not meant to reach callers.
var errBreakBucketForEach = errors.New("bucket ForEach break")
// ErrLockObjectRemoval is returned when an inhume operation is being
// performed on a lock object, and it is not a forced object removal.
var ErrLockObjectRemoval = logicerr.New("lock object removal")
// Inhume marks the objects listed in prm as removed without deleting their
// records from the metabase.
//
// Only non-locked objects can be inhumed: apistatus.ObjectLocked is returned
// if at least one target is locked, and ErrLockObjectRemoval is returned if a
// target is itself a lock (not locked) object.
//
// NOTE: both restrictions are lifted, and any object gets the GC mark, if
// SetForceGCMark was called on prm.
//
// Returns ErrDegradedMode when the metabase is unavailable in the current
// mode and ErrReadOnlyMode when it is read-only. Errors of the underlying
// transaction are passed through metaerr.Wrap.
func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
	begin := time.Now()
	ok := false
	// Registered first so that it runs last and the measured duration
	// includes everything below.
	defer func() {
		db.metrics.AddMethodDuration("Inhume", time.Since(begin), ok)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "metabase.Inhume")
	defer span.End()

	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	switch {
	case db.mode.NoMetabase():
		return InhumeRes{}, ErrDegradedMode
	case db.mode.ReadOnly():
		return InhumeRes{}, ErrReadOnlyMode
	}

	var res InhumeRes
	res.inhumedByCnrID = make(map[cid.ID]ObjectCounters)

	epoch := db.epochState.CurrentEpoch()
	err := db.boltDB.Update(func(tx *bbolt.Tx) error {
		return db.inhumeTx(tx, epoch, prm, &res)
	})

	if err == nil {
		ok = true
		// Log every target only after the transaction has been committed.
		for _, addr := range prm.target {
			storagelog.Write(db.log,
				storagelog.AddressField(addr),
				storagelog.OpField("metabase INHUME"))
		}
	}

	return res, metaerr.Wrap(err)
}
// inhumeTx performs the inhume operation for every address in prm.target
// inside the write transaction tx and accumulates the outcome in res.
//
// For each target it:
//  1. unless prm.forceRemoval is set, rejects locked objects with
//     apistatus.ObjectLocked and LOCK objects with ErrLockObjectRemoval;
//  2. if the object can be read, records its deletion info; if the read
//     reports an EC info error, inhumes every listed chunk via inhumeECInfo;
//     any other read error is ignored and the target is still marked;
//  3. when a tombstone is set, additionally puts the target into the garbage
//     bucket, or skips the target entirely if it is itself referenced as a
//     tombstone in the graveyard;
//  4. writes the target key into the selected bucket (graveyard or garbage).
//
// After the loop the shard and container counters are updated from res.
// Any returned error is expected to abort the whole transaction.
func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes) error {
garbageBKT := tx.Bucket(garbageBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName)
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, &prm)
if err != nil {
return err
}
// scratch buffer shared by all iterations: it is handed to db.get first and
// then to addressKey, so the order of these two calls must be kept
buf := make([]byte, addressKeySize)
for i := range prm.target {
id := prm.target[i].Object()
cnr := prm.target[i].Container()
// prevent locked objects to be inhumed
if !prm.forceRemoval && objectLocked(tx, cnr, id) {
return new(apistatus.ObjectLocked)
}
var lockWasChecked bool
// prevent lock objects to be inhumed
// if `Inhume` was called without the
// force GC mark (see SetForceGCMark)
if !prm.forceRemoval {
if isLockObject(tx, cnr, id) {
return ErrLockObjectRemoval
}
lockWasChecked = true
}
obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
// presumably addressKey encodes the address into buf and returns a slice
// of it — verify against its definition
targetKey := addressKey(prm.target[i], buf)
var ecErr *objectSDK.ECInfoError
if err == nil {
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
if err != nil {
return err
}
} else if errors.As(err, &ecErr) {
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value, targetKey)
if err != nil {
return err
}
}
// NOTE: any other error of db.get is deliberately not returned here; the
// target is marked regardless of whether it could be read
if prm.tomb != nil {
var isTomb bool
isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
if err != nil {
return err
}
// the target is a tombstone referenced from the graveyard:
// no grave is created for it
if isTomb {
continue
}
}
// consider checking if target is already in graveyard?
err = bkt.Put(targetKey, value)
if err != nil {
return err
}
if prm.lockObjectHandling {
// do not perform lock check if
// it was already called
if lockWasChecked {
// inhumed object is not of
// the LOCK type
continue
}
if isLockObject(tx, cnr, id) {
res.deletedLockObj = append(res.deletedLockObj, prm.target[i])
}
}
}
return db.applyInhumeResToCounters(tx, res)
}
// inhumeECInfo inhumes every chunk listed in ecInfo within container cnr.
//
// For each chunk it reads the chunk object, records its deletion info in res
// (checked against targetKey, the key of the object the EC info was returned
// for), puts the chunk into the garbage bucket when a tombstone is set, and
// finally stores the chunk key with the given value in targetBucket.
//
// The first error encountered is returned as is and stops the processing.
func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
	garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
	ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte, targetKey []byte,
) error {
	for _, chunk := range ecInfo.Chunks {
		var id oid.ID
		if err := id.ReadFromV2(chunk.ID); err != nil {
			return err
		}

		var addr oid.Address
		addr.SetContainer(cnr)
		addr.SetObject(id)

		// The scratch buffer is passed to db.get first and only afterwards
		// reused for the chunk key.
		scratch := make([]byte, addressKeySize)
		obj, err := db.get(tx, addr, scratch, false, true, epoch)
		if err != nil {
			return err
		}

		if err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res); err != nil {
			return err
		}

		key := addressKey(addr, scratch)
		if tomb != nil {
			// Whether the chunk is a tombstone is irrelevant here;
			// only the error matters.
			if _, err = db.markAsGC(graveyardBKT, garbageBKT, key); err != nil {
				return err
			}
		}

		if err = targetBucket.Put(key, value); err != nil {
			return err
		}
	}

	return nil
}
// applyInhumeResToCounters applies the totals accumulated in res to the
// persistent counters: first the shard-wide logical counter, then the
// shard-wide user counter and finally the per-container counters. The final
// false argument of each call is presumably the "increment" flag, i.e. the
// counters are decreased — verify against the helpers' definitions.
//
// Stops at and returns the first error.
func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
	err := db.updateShardObjectCounter(tx, logical, res.logicInhumed, false)
	if err != nil {
		return err
	}

	err = db.updateShardObjectCounter(tx, user, res.userInhumed, false)
	if err != nil {
		return err
	}

	return db.updateContainerCounter(tx, res.inhumedByCnrID, false)
}
// getInhumeTargetBucketAndValue selects the bucket that receives the inhume
// records and the value stored under every target key.
//
// With a GC mark (prm.tomb is nil) the result is the garbage bucket and
// zeroValue. With a tombstone the result is the graveyard bucket and the
// encoded tombstone address.
//
// In the tombstone case an existing grave whose key equals the tombstone
// address is removed first, because a tomb-on-tomb is forbidden in FrostFS:
// graveyard keys must not be addresses of tombstones.
func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm *InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
	if prm.tomb == nil {
		return garbageBKT, zeroValue, nil
	}

	tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))

	if graveyardBKT.Get(tombKey) != nil {
		if delErr := graveyardBKT.Delete(tombKey); delErr != nil {
			return nil, nil, fmt.Errorf("could not remove grave with tombstone key: %w", delErr)
		}
	}

	return graveyardBKT, tombKey, nil
}
// markAsGC puts key into the garbage bucket unless the address it encodes is
// referenced as a tombstone in the graveyard.
//
// The boolean result is true when the target is such a tombstone; in that
// case nothing is written. Otherwise it is false and the error, if any, is
// the one returned by the garbage bucket write.
func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool, error) {
	tombstone, err := isTomb(graveyardBKT, key)
	switch {
	case err != nil:
		return false, err
	case tombstone:
		// do not add grave if target is a tombstone
		return true, nil
	}

	// once a tombstone appears, the object must be
	// additionally marked with GC
	putErr := garbageBKT.Put(key, zeroValue)
	return false, putErr
}
// updateDeleteInfo accounts for the inhuming of obj.
//
// If the record under targetKey is not yet present in the graveyard or
// garbage bucket, the object is added to the deletion info and counters of
// res. Independently of that, the size estimation of container cnr is
// decreased by the payload size when obj is a regular object.
//
// Returns the error of the container size update, if any.
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
	// Prefer the container ID carried by the object itself, but fall back to
	// the container of the address being inhumed when the object has none, so
	// that counters are never attributed to the zero container ID.
	containerID, ok := obj.ContainerID()
	if !ok {
		containerID = cnr
	}

	size := obj.PayloadSize()

	if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
		res.storeDeletionInfo(containerID, size, IsUserObject(obj))
	}

	// only regular objects contribute to the container size estimation
	if obj.Type() != objectSDK.TypeRegular {
		return nil
	}

	return changeContainerSize(tx, cnr, size, false)
}
// isTomb reports whether key occurs as a value in the graveyard bucket, i.e.
// whether at least one grave references the address encoded in key as its
// tombstone.
//
// The whole bucket is scanned until the first match. Iteration errors other
// than the internal break sentinel are returned together with false.
func isTomb(graveyardBucket *bbolt.Bucket, key []byte) (bool, error) {
	found := false

	err := graveyardBucket.ForEach(func(_, tombAddr []byte) error {
		if !bytes.Equal(tombAddr, key) {
			return nil
		}

		// a grave pointing at the target was found: stop the iteration
		found = true
		return errBreakBucketForEach
	})

	switch {
	case err == nil, errors.Is(err, errBreakBucketForEach):
		return found, nil
	default:
		return false, err
	}
}