package meta

import (

	storagelog ""
	apistatus ""
	cid ""
	objectSDK ""
	oid ""

// InhumePrm encapsulates parameters for Inhume operation.
type InhumePrm struct {
	tomb *oid.Address

	target []oid.Address

	lockObjectHandling bool

	forceRemoval bool

// DeletionInfo contains details on deleted object.
type DeletionInfo struct {
	Size   uint64
	CID    cid.ID
	IsUser bool

// InhumeRes encapsulates results of Inhume operation.
type InhumeRes struct {
	deletedLockObj  []oid.Address
	logicInhumed    uint64
	userInhumed     uint64
	inhumedByCnrID  map[cid.ID]ObjectCounters
	deletionDetails []DeletionInfo

// LogicInhumed return number of logic object
// that have been inhumed.
func (i InhumeRes) LogicInhumed() uint64 {
	return i.logicInhumed

func (i InhumeRes) UserInhumed() uint64 {
	return i.userInhumed

// InhumedByCnrID return number of object
// that have been inhumed by container ID.
func (i InhumeRes) InhumedByCnrID() map[cid.ID]ObjectCounters {
	return i.inhumedByCnrID

// DeletedLockObjects returns deleted object of LOCK
// type. Returns always nil if WithoutLockObjectHandling
// was provided to the InhumePrm.
func (i InhumeRes) DeletedLockObjects() []oid.Address {
	return i.deletedLockObj

// GetDeletionInfoLength returns amount of stored elements
// in deleted sizes array.
func (i InhumeRes) GetDeletionInfoLength() int {
	return len(i.deletionDetails)

// GetDeletionInfoByIndex returns both deleted object sizes and
// associated container ID by index.
func (i InhumeRes) GetDeletionInfoByIndex(target int) DeletionInfo {
	return i.deletionDetails[target]

// StoreDeletionInfo stores size of deleted object and associated container ID
// in corresponding arrays.
func (i *InhumeRes) storeDeletionInfo(containerID cid.ID, deletedSize uint64, isUser bool) {
	i.deletionDetails = append(i.deletionDetails, DeletionInfo{
		Size:   deletedSize,
		CID:    containerID,
		IsUser: isUser,
	if isUser {

	if v, ok := i.inhumedByCnrID[containerID]; ok {
		if isUser {
		i.inhumedByCnrID[containerID] = v
	} else {
		v = ObjectCounters{
			Logic: 1,
		if isUser {
			v.User = 1
		i.inhumedByCnrID[containerID] = v

// SetAddresses sets a list of object addresses that should be inhumed.
func (p *InhumePrm) SetAddresses(addrs ...oid.Address) { = addrs

// SetTombstoneAddress sets tombstone address as the reason for inhume operation.
// addr should not be nil.
// Should not be called along with SetGCMark.
func (p *InhumePrm) SetTombstoneAddress(addr oid.Address) {
	p.tomb = &addr

// SetGCMark marks the object to be physically removed.
// Should not be called along with SetTombstoneAddress.
func (p *InhumePrm) SetGCMark() {
	p.tomb = nil

// SetLockObjectHandling checks if there were
// any LOCK object among the targets set via WithAddresses.
func (p *InhumePrm) SetLockObjectHandling() {
	p.lockObjectHandling = true

// SetForceGCMark allows removal any object. Expected to be
// called only in control service.
func (p *InhumePrm) SetForceGCMark() {
	p.tomb = nil
	p.forceRemoval = true

func (p *InhumePrm) validate() error {
	if p == nil {
		return nil
	if p.tomb != nil {
		for _, addr := range {
			if addr.Container() != p.tomb.Container() {
				return fmt.Errorf("object %s and tombstone %s have different container ID", addr, p.tomb)
	return nil

var (
	// errBreakBucketForEach is an internal sentinel error; judging by its
	// name it is meant to stop a bucket ForEach iteration early.
	errBreakBucketForEach = errors.New("bucket ForEach break")

	// ErrLockObjectRemoval is returned when inhume operation is being
	// performed on lock object, and it is not a forced object removal.
	ErrLockObjectRemoval = logicerr.New("lock object removal")
)

// Inhume marks objects as removed but not removes it from metabase.
// Allows inhuming non-locked objects only. Returns apistatus.ObjectLocked
// if at least one object is locked. Returns ErrLockObjectRemoval if inhuming
// is being performed on lock (not locked) object.
// NOTE: Marks any object with GC mark (despite any prohibitions on operations
// with that object) if WithForceGCMark option has been provided.
func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
	var (
		startedAt = time.Now()
		success   = false
	defer func() {
		db.metrics.AddMethodDuration("Inhume", time.Since(startedAt), success)
	_, span := tracing.StartSpanFromContext(ctx, "metabase.Inhume")
	defer span.End()

	defer db.modeMtx.RUnlock()

	if err := prm.validate(); err != nil {
		return InhumeRes{}, err

	if db.mode.NoMetabase() {
		return InhumeRes{}, ErrDegradedMode
	} else if db.mode.ReadOnly() {
		return InhumeRes{}, ErrReadOnlyMode

	res := InhumeRes{
		inhumedByCnrID: make(map[cid.ID]ObjectCounters),
	currEpoch := db.epochState.CurrentEpoch()
	err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
		return db.inhumeTx(tx, currEpoch, prm, &res)
	success = err == nil
	if success {
		for _, addr := range {
			storagelog.Write(ctx, db.log,
				storagelog.OpField("metabase INHUME"))
	return res, metaerr.Wrap(err)

func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes) error {
	garbageBKT := tx.Bucket(garbageBucketName)
	graveyardBKT := tx.Bucket(graveyardBucketName)

	bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
	if err != nil {
		return err

	buf := make([]byte, addressKeySize)
	for i := range {
		if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT,[i], buf, epoch, prm, res); err != nil {
			return err

	return db.applyInhumeResToCounters(tx, res)

func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garbageBKT *bbolt.Bucket, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error {
	id := addr.Object()
	cnr := addr.Container()
	tx := bkt.Tx()

	// prevent locked objects to be inhumed
	if !prm.forceRemoval && objectLocked(tx, cnr, id) {
		return new(apistatus.ObjectLocked)

	var lockWasChecked bool

	// prevent lock objects to be inhumed
	// if `Inhume` was called not with the
	// `WithForceGCMark` option
	if !prm.forceRemoval {
		if isLockObject(tx, cnr, id) {
			return ErrLockObjectRemoval

		lockWasChecked = true

	obj, err := db.get(tx, addr, buf, false, true, epoch)
	targetKey := addressKey(addr, buf)
	var ecErr *objectSDK.ECInfoError
	if err == nil {
		err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
		if err != nil {
			return err
	} else if errors.As(err, &ecErr) {
		err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value)
		if err != nil {
			return err

	if prm.tomb != nil {
		var isTomb bool
		isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
		if err != nil {
			return err

		if isTomb {
			return nil

	// consider checking if target is already in graveyard?
	err = bkt.Put(targetKey, value)
	if err != nil {
		return err

	if prm.lockObjectHandling {
		// do not perform lock check if
		// it was already called
		if lockWasChecked {
			// inhumed object is not of
			// the LOCK type
			return nil

		if isLockObject(tx, cnr, id) {
			res.deletedLockObj = append(res.deletedLockObj, addr)
	return nil

func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
	garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
	ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte,
) error {
	for _, chunk := range ecInfo.Chunks {
		chunkBuf := make([]byte, addressKeySize)
		var chunkAddr oid.Address
		var chunkID oid.ID
		err := chunkID.ReadFromV2(chunk.ID)
		if err != nil {
			return err
		chunkObj, err := db.get(tx, chunkAddr, chunkBuf, false, true, epoch)
		if err != nil {
			return err
		chunkKey := addressKey(chunkAddr, chunkBuf)
		err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, chunkKey, cnr, chunkObj, res)
		if err != nil {
			return err
		if tomb != nil {
			_, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey)
			if err != nil {
				return err
		err = targetBucket.Put(chunkKey, value)
		if err != nil {
			return err
	return nil

func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
	if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
		return err
	if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil {
		return err

	return db.updateContainerCounter(tx, res.inhumedByCnrID, false)

// getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket.
// target bucket of the operation, one of the:
//  1. Graveyard if Inhume was called with a Tombstone
//  2. Garbage if Inhume was called with a GC mark
// value that will be put in the bucket, one of the:
//  1. tombstone address if Inhume was called with
//     a Tombstone
//  2. zeroValue if Inhume was called with a GC mark
func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
	if prm.tomb != nil {
		targetBucket = graveyardBKT
		tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))

		// it is forbidden to have a tomb-on-tomb in FrostFS,
		// so graveyard keys must not be addresses of tombstones
		data := targetBucket.Get(tombKey)
		if data != nil {
			err := targetBucket.Delete(tombKey)
			if err != nil {
				return nil, nil, fmt.Errorf("could not remove grave with tombstone key: %w", err)

		value = tombKey
	} else {
		targetBucket = garbageBKT
		value = zeroValue
	return targetBucket, value, nil

func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, addressKey []byte) (bool, error) {
	targetIsTomb := isTomb(graveyardBKT, addressKey)

	// do not add grave if target is a tombstone
	if targetIsTomb {
		return true, nil

	// if tombstone appears object must be
	// additionally marked with GC
	return false, garbageBKT.Put(addressKey, zeroValue)

func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
	containerID, _ := obj.ContainerID()
	if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
		res.storeDeletionInfo(containerID, obj.PayloadSize(), IsUserObject(obj))

	// if object is stored, and it is regular object then update bucket
	// with container size estimations
	if obj.Type() == objectSDK.TypeRegular {
		err := changeContainerSize(tx, cnr, obj.PayloadSize(), false)
		if err != nil {
			return err
	return nil

func isTomb(graveyardBucket *bbolt.Bucket, addressKey []byte) bool {
	targetIsTomb := false

	// iterate over graveyard and check if target address
	// is the address of tombstone in graveyard.
	// tombstone must have the same container ID as key.
	c := graveyardBucket.Cursor()
	containerPrefix := addressKey[:cidSize]
	for k, v := c.Seek(containerPrefix); k != nil && bytes.HasPrefix(k, containerPrefix); k, v = c.Next() {
		// check if graveyard has record with key corresponding
		// to tombstone address (at least one)
		targetIsTomb = bytes.Equal(v, addressKey)
		if targetIsTomb {
	return targetIsTomb