package meta import ( "bytes" "context" "errors" "fmt" "time" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" ) // InhumePrm encapsulates parameters for Inhume operation. type InhumePrm struct { tomb *oid.Address target []oid.Address lockObjectHandling bool forceRemoval bool } // DeletionInfo contains details on deleted object. type DeletionInfo struct { Size uint64 CID cid.ID IsUser bool } // InhumeRes encapsulates results of Inhume operation. type InhumeRes struct { deletedLockObj []oid.Address logicInhumed uint64 userInhumed uint64 inhumedByCnrID map[cid.ID]ObjectCounters deletionDetails []DeletionInfo } // LogicInhumed return number of logic object // that have been inhumed. func (i InhumeRes) LogicInhumed() uint64 { return i.logicInhumed } func (i InhumeRes) UserInhumed() uint64 { return i.userInhumed } // InhumedByCnrID return number of object // that have been inhumed by container ID. func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters { return i.inhumedByCnrID } // DeletedLockObjects returns deleted object of LOCK // type. Returns always nil if WithoutLockObjectHandling // was provided to the InhumePrm. func (i InhumeRes) DeletedLockObjects() []oid.Address { return i.deletedLockObj } // GetDeletionInfoLength returns amount of stored elements // in deleted sizes array. 
func (i InhumeRes) GetDeletionInfoLength() int {
	return len(i.deletionDetails)
}

// GetDeletionInfoByIndex returns the deletion record (object size, container
// ID and user flag) stored at the given index.
//
// The index is not range-checked: target must be in
// [0, GetDeletionInfoLength()).
func (i InhumeRes) GetDeletionInfoByIndex(target int) DeletionInfo {
	return i.deletionDetails[target]
}

// storeDeletionInfo records a single inhumed object: it appends a deletion
// record and increments the total and per-container counters. The user
// counters are incremented only if isUser is set.
func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64, isUser bool) {
	i.deletionDetails = append(i.deletionDetails, DeletionInfo{
		Size:   deletedSize,
		CID:    containerID,
		IsUser: isUser,
	})

	i.logicInhumed++
	if isUser {
		i.userInhumed++
	}

	// A zero-value InhumeRes has a nil map; writing to it would panic.
	if i.inhumedByCnrID == nil {
		i.inhumedByCnrID = make(map[cid.ID]ObjectCounters)
	}

	// A missing key yields zero counters, so no presence check is needed.
	v := i.inhumedByCnrID[containerID]
	v.Logic++
	if isUser {
		v.User++
	}
	i.inhumedByCnrID[containerID] = v
}

// SetAddresses sets a list of object addresses that should be inhumed.
//
// The passed slice is stored as is, without copying.
func (p *InhumePrm) SetAddresses(addrs ...oid.Address) {
	p.target = addrs
}

// SetTombstoneAddress sets tombstone address as the reason for inhume operation.
//
// Should not be called along with SetGCMark.
func (p *InhumePrm) SetTombstoneAddress(addr oid.Address) {
	p.tomb = &addr
}

// SetGCMark marks the object to be physically removed.
//
// Should not be called along with SetTombstoneAddress.
func (p *InhumePrm) SetGCMark() {
	p.tomb = nil
}

// SetLockObjectHandling enables checking whether there were
// any LOCK objects among the targets set via SetAddresses.
func (p *InhumePrm) SetLockObjectHandling() {
	p.lockObjectHandling = true
}

// SetForceGCMark allows removal of any object: it selects the GC mark
// and disables the lock-related checks. Expected to be
// called only in control service.
func (p *InhumePrm) SetForceGCMark() {
	p.tomb = nil
	p.forceRemoval = true
}

// errBreakBucketForEach is a sentinel used to stop a bucket ForEach early.
var errBreakBucketForEach = errors.New("bucket ForEach break")

// ErrLockObjectRemoval is returned when inhume operation is being
// performed on lock object, and it is not a forced object removal.
var ErrLockObjectRemoval = logicerr.New("lock object removal") // Inhume marks objects as removed but not removes it from metabase. // // Allows inhuming non-locked objects only. Returns apistatus.ObjectLocked // if at least one object is locked. Returns ErrLockObjectRemoval if inhuming // is being performed on lock (not locked) object. // // NOTE: Marks any object with GC mark (despite any prohibitions on operations // with that object) if WithForceGCMark option has been provided. func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { var ( startedAt = time.Now() success = false ) defer func() { db.metrics.AddMethodDuration("Inhume", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "metabase.Inhume") defer span.End() db.modeMtx.RLock() defer db.modeMtx.RUnlock() if db.mode.NoMetabase() { return InhumeRes{}, ErrDegradedMode } else if db.mode.ReadOnly() { return InhumeRes{}, ErrReadOnlyMode } res := InhumeRes{ inhumedByCnrID: make(map[cid.ID]ObjectCounters), } currEpoch := db.epochState.CurrentEpoch() err := db.boltDB.Batch(func(tx *bbolt.Tx) error { return db.inhumeTx(tx, currEpoch, prm, &res) }) success = err == nil if success { for _, addr := range prm.target { storagelog.Write(db.log, storagelog.AddressField(addr), storagelog.OpField("metabase INHUME")) } } return res, metaerr.Wrap(err) } func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes) error { garbageBKT := tx.Bucket(garbageBucketName) graveyardBKT := tx.Bucket(graveyardBucketName) bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, &prm) if err != nil { return err } buf := make([]byte, addressKeySize) for i := range prm.target { id := prm.target[i].Object() cnr := prm.target[i].Container() // prevent locked objects to be inhumed if !prm.forceRemoval && objectLocked(tx, cnr, id) { return new(apistatus.ObjectLocked) } var lockWasChecked bool // prevent lock objects to be inhumed // if `Inhume` was 
called not with the // `WithForceGCMark` option if !prm.forceRemoval { if isLockObject(tx, cnr, id) { return ErrLockObjectRemoval } lockWasChecked = true } obj, err := db.get(tx, prm.target[i], buf, false, true, epoch) targetKey := addressKey(prm.target[i], buf) var ecErr *objectSDK.ECInfoError if err == nil { err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res) if err != nil { return err } } else if errors.As(err, &ecErr) { err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value) if err != nil { return err } } if prm.tomb != nil { var isTomb bool isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey) if err != nil { return err } if isTomb { continue } } // consider checking if target is already in graveyard? err = bkt.Put(targetKey, value) if err != nil { return err } if prm.lockObjectHandling { // do not perform lock check if // it was already called if lockWasChecked { // inhumed object is not of // the LOCK type continue } if isLockObject(tx, cnr, id) { res.deletedLockObj = append(res.deletedLockObj, prm.target[i]) } } } return db.applyInhumeResToCounters(tx, res) } func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes, garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket, ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte, ) error { for _, chunk := range ecInfo.Chunks { chunkBuf := make([]byte, addressKeySize) var chunkAddr oid.Address chunkAddr.SetContainer(cnr) var chunkID oid.ID err := chunkID.ReadFromV2(chunk.ID) if err != nil { return err } chunkAddr.SetObject(chunkID) chunkObj, err := db.get(tx, chunkAddr, chunkBuf, false, true, epoch) if err != nil { return err } chunkKey := addressKey(chunkAddr, chunkBuf) err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, chunkKey, cnr, chunkObj, res) if err != nil { return err } if tomb != nil { _, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey) if err != nil 
{ return err } } err = targetBucket.Put(chunkKey, value) if err != nil { return err } } return nil } func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error { if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil { return err } if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil { return err } return db.updateContainerCounter(tx, res.inhumedByCnrID, false) } // getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket. // // target bucket of the operation, one of the: // 1. Graveyard if Inhume was called with a Tombstone // 2. Garbage if Inhume was called with a GC mark // // value that will be put in the bucket, one of the: // 1. tombstone address if Inhume was called with // a Tombstone // 2. zeroValue if Inhume was called with a GC mark func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm *InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) { if prm.tomb != nil { targetBucket = graveyardBKT tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize)) // it is forbidden to have a tomb-on-tomb in FrostFS, // so graveyard keys must not be addresses of tombstones data := targetBucket.Get(tombKey) if data != nil { err := targetBucket.Delete(tombKey) if err != nil { return nil, nil, fmt.Errorf("could not remove grave with tombstone key: %w", err) } } value = tombKey } else { targetBucket = garbageBKT value = zeroValue } return targetBucket, value, nil } func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool, error) { targetIsTomb, err := isTomb(graveyardBKT, key) if err != nil { return false, err } // do not add grave if target is a tombstone if targetIsTomb { return true, nil } // if tombstone appears object must be // additionally marked with GC return false, garbageBKT.Put(key, zeroValue) } func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, 
graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error { containerID, _ := obj.ContainerID() if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 { res.storeDeletionInfo(containerID, obj.PayloadSize(), IsUserObject(obj)) } // if object is stored, and it is regular object then update bucket // with container size estimations if obj.Type() == objectSDK.TypeRegular { err := changeContainerSize(tx, cnr, obj.PayloadSize(), false) if err != nil { return err } } return nil } func isTomb(graveyardBucket *bbolt.Bucket, key []byte) (bool, error) { targetIsTomb := false // iterate over graveyard and check if target address // is the address of tombstone in graveyard. err := graveyardBucket.ForEach(func(_, v []byte) error { // check if graveyard has record with key corresponding // to tombstone address (at least one) targetIsTomb = bytes.Equal(v, key) if targetIsTomb { // break bucket iterator return errBreakBucketForEach } return nil }) if err != nil && !errors.Is(err, errBreakBucketForEach) { return false, err } return targetIsTomb, nil }