package meta

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.etcd.io/bbolt"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// errFailedToRemoveUniqueIndexes is reported by deleteObject when removal of
// the object's records from the unique indexes fails.
var errFailedToRemoveUniqueIndexes = errors.New("can't remove unique indexes")

// DeletePrm groups the parameters of Delete operation.
type DeletePrm struct {
	// addrs holds the addresses of the objects to delete; set via SetAddresses.
	addrs []oid.Address
}

// DeleteRes groups the resulting values of Delete operation.
//
// The counters are accumulated over all addresses processed by one Delete call:
//   - phyCount and phySize: number and total payload size of the removed
//     physical objects;
//   - logicCount and logicSize: number and total payload size of the removed
//     logical objects;
//   - userCount: number of the removed user objects;
//   - removedByCnrID: the same phy/logic/user counters, grouped by the
//     container of the removed object.
type DeleteRes struct {
	phyCount       uint64
	logicCount     uint64
	userCount      uint64
	phySize        uint64
	logicSize      uint64
	removedByCnrID map[cid.ID]ObjectCounters
}

// LogicCount returns the number of removed logical objects.
func (d DeleteRes) LogicCount() uint64 {
	return d.logicCount
}

// UserCount returns the number of removed user objects.
func (d DeleteRes) UserCount() uint64 {
	return d.userCount
}

// RemovedByCnrID returns the counters of removed objects grouped by container ID.
//
// The returned map is the one stored in the result, not a copy.
func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters {
	return d.removedByCnrID
}

// PhyCount returns the number of removed physical objects.
func (d DeleteRes) PhyCount() uint64 {
	return d.phyCount
}

// PhySize returns the total payload size of removed physical objects.
func (d DeleteRes) PhySize() uint64 {
	return d.phySize
}

// LogicSize returns the total payload size of removed logical objects.
func (d DeleteRes) LogicSize() uint64 {
	return d.logicSize
}

// SetAddresses is a Delete option to set the addresses of the objects to delete.
//
// Option is required. The passed slice is stored as is (not copied), and each
// call replaces the previously set addresses.
func (p *DeletePrm) SetAddresses(addrs ...oid.Address) {
	p.addrs = addrs
}

// referenceNumber tracks how many references to a parent object are removed
// within a single delete batch.
type referenceNumber struct {
	// all is the number of children recorded for the parent in the parent
	// index when the parent was first met in the batch; cur is the number of
	// references removed so far in the batch. The parent is removed once
	// cur equals all.
	all, cur int

	// obj is the parent object the references point to.
	obj *objectSDK.Object
}

// referenceCounter maps the string form of a parent's address key to the
// reference bookkeeping of that parent.
type referenceCounter map[string]*referenceNumber

// Delete removes the records of the given objects from the metabase indexes.
//
// All addresses from prm are processed in a single bbolt batch. The call
// fails with ErrDegradedMode if the metabase is unavailable and with
// ErrReadOnlyMode if it is opened read-only. On success every removed
// address is written to the storage log.
func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
	begin := time.Now()
	success := false
	defer func() {
		db.metrics.AddMethodDuration("Delete", time.Since(begin), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "metabase.Delete",
		trace.WithAttributes(attribute.Int("addr_count", len(prm.addrs))))
	defer span.End()

	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	switch {
	case db.mode.NoMetabase():
		return DeleteRes{}, ErrDegradedMode
	case db.mode.ReadOnly():
		return DeleteRes{}, ErrReadOnlyMode
	}

	var res DeleteRes
	err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
		var txErr error
		res, txErr = db.deleteGroup(tx, prm.addrs)
		return txErr
	})
	if err != nil {
		return res, metaerr.Wrap(err)
	}

	success = true
	for _, addr := range prm.addrs {
		storagelog.Write(ctx, db.log,
			storagelog.AddressField(addr),
			storagelog.OpField("metabase DELETE"))
	}
	// err is nil here; it still goes through metaerr.Wrap like any other result.
	return res, metaerr.Wrap(err)
}

// deleteGroup removes the given objects from the metabase within tx.
//
// Each address is deleted individually while references to parents of split
// objects are counted. After the shard and container counters are updated,
// every parent whose counted references equal the number of its recorded
// children is removed as well. On any error an empty DeleteRes is returned.
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) {
	total := DeleteRes{
		removedByCnrID: make(map[cid.ID]ObjectCounters),
	}
	parents := make(referenceCounter, len(addrs))
	epoch := db.epochState.CurrentEpoch()

	for idx := range addrs {
		single, err := db.delete(tx, addrs[idx], parents, epoch)
		if err != nil {
			return DeleteRes{}, err
		}

		applyDeleteSingleResult(single, &total, addrs, idx)
	}

	if err := db.updateCountersDelete(tx, total); err != nil {
		return DeleteRes{}, err
	}

	for _, ref := range parents {
		if ref.cur != ref.all {
			// some children are still left, the parent stays
			continue
		}
		if err := db.deleteObject(tx, ref.obj, true); err != nil {
			return DeleteRes{}, err
		}
	}

	return total, nil
}

// updateCountersDelete decreases the shard-wide phy, logical and user object
// counters and the per-container counters by the amounts accumulated in res.
// Shard counters with a zero delta are not touched.
func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
	if res.phyCount > 0 {
		if err := db.updateShardObjectCounter(tx, phy, res.phyCount, false); err != nil {
			return fmt.Errorf("could not decrease phy object counter: %w", err)
		}
	}

	if res.logicCount > 0 {
		if err := db.updateShardObjectCounter(tx, logical, res.logicCount, false); err != nil {
			return fmt.Errorf("could not decrease logical object counter: %w", err)
		}
	}

	if res.userCount > 0 {
		if err := db.updateShardObjectCounter(tx, user, res.userCount, false); err != nil {
			return fmt.Errorf("could not decrease user object counter: %w", err)
		}
	}

	err := db.updateContainerCounter(tx, res.removedByCnrID, false)
	if err != nil {
		return fmt.Errorf("could not decrease container object counter: %w", err)
	}
	return nil
}

// applyDeleteSingleResult folds the outcome r of deleting addrs[i] into res:
// the total counters and sizes as well as the counters of the object's
// container. A result with no flag set leaves res untouched.
func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.Address, i int) {
	if !r.Phy && !r.Logic && !r.User {
		return
	}

	cnr := addrs[i].Container()
	// a missing entry yields zero counters, so a single lookup and a single
	// store cover both the "new container" and the "known container" cases
	perCnr := res.removedByCnrID[cnr]

	if r.Phy {
		perCnr.Phy++
		res.phyCount++
		res.phySize += r.Size
	}

	if r.Logic {
		perCnr.Logic++
		res.logicCount++
		res.logicSize += r.Size
	}

	if r.User {
		perCnr.User++
		res.userCount++
	}

	res.removedByCnrID[cnr] = perCnr
}

// deleteSingleResult describes the outcome of removing a single object:
//   - Phy: the object was found and its records were removed;
//   - Logic: the removed object also counts as a logical object;
//   - User: the removed object also counts as a user object;
//   - Size: payload size of the removed object.
type deleteSingleResult struct {
	Phy   bool
	Logic bool
	User  bool
	Size  uint64
}

// delete removes the index records of a single object from the metabase and
// counts the references to the parent of the removed object in refCounter.
//
// Removing a non-existent object is error-free: only its garbage mark is
// dropped and a zero result is returned. If the lookup reports a split info
// or EC info error, nothing is removed and a zero result is returned as well.
//
// For an actually removed object the result has:
//   - Phy set to true;
//   - Logic set if the object was available before the removal, i.e.
//     inGraveyardWithKey returned 0 for it;
//   - User set if the object was available and IsUserObject reports true;
//   - Size equal to the payload size of the removed object.
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (deleteSingleResult, error) {
	key := make([]byte, addressKeySize)
	addrKey := addressKey(addr, key)
	garbageBKT := tx.Bucket(garbageBucketName)
	graveyardBKT := tx.Bucket(graveyardBucketName)

	// must be checked before the garbage mark is dropped below
	removeAvailableObject := inGraveyardWithKey(addrKey, graveyardBKT, garbageBKT) == 0

	// unmarshal object, work only with physically stored (raw == true) objects
	obj, err := db.get(tx, addr, key, false, true, currEpoch)
	if err != nil {
		if client.IsErrObjectNotFound(err) {
			// key was handed to db.get, so the address key is rebuilt
			// (presumably db.get uses key as a scratch buffer)
			addrKey = addressKey(addr, key)
			if garbageBKT != nil {
				err := garbageBKT.Delete(addrKey)
				if err != nil {
					return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
				}
			}
			return deleteSingleResult{}, nil
		}
		var siErr *objectSDK.SplitInfoError
		var ecErr *objectSDK.ECInfoError
		if errors.As(err, &siErr) || errors.As(err, &ecErr) {
			// if object is virtual (parent) then do nothing, it will be deleted with last child
			// if object is erasure-coded it will be deleted with the last chunk presented on the shard
			return deleteSingleResult{}, nil
		}

		return deleteSingleResult{}, err
	}

	// rebuild the address key after db.get for the same reason as above
	addrKey = addressKey(addr, key)
	// remove record from the garbage bucket
	if garbageBKT != nil {
		err := garbageBKT.Delete(addrKey)
		if err != nil {
			return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
		}
	}

	// if object is an only link to a parent, then remove parent
	if parent := obj.Parent(); parent != nil {
		parAddr := object.AddressOf(parent)
		sParAddr := addressKey(parAddr, key)
		// string conversion copies the bytes, so the map key does not alias the key buffer
		k := string(sParAddr)

		nRef, ok := refCounter[k]
		if !ok {
			// first child of this parent in the batch: remember how many
			// children the parent index currently lists
			nRef = &referenceNumber{
				all: parentLength(tx, parAddr),
				obj: parent,
			}

			refCounter[k] = nRef
		}

		nRef.cur++
	}

	isUserObject := IsUserObject(obj)

	// remove object
	err = db.deleteObject(tx, obj, false)
	if err != nil {
		return deleteSingleResult{}, fmt.Errorf("could not remove object: %w", err)
	}

	// clean up records of the EC parent (and its split parent), if obj is an EC chunk
	if err := deleteECRelatedInfo(tx, garbageBKT, obj, addr.Container(), refCounter); err != nil {
		return deleteSingleResult{}, err
	}

	return deleteSingleResult{
		Phy:   true,
		Logic: removeAvailableObject,
		User:  isUserObject && removeAvailableObject,
		Size:  obj.PayloadSize(),
	}, nil
}

// deleteObject removes the records of obj from the unique, list and fake
// bucket tree indexes within tx.
//
// isParent tells that obj is a virtual (parent) object: its unique record is
// taken from the parent index, and its garbage mark is dropped here, because
// the regular deletion path does nothing for virtual objects.
//
// If the unique indexes cannot be cleaned, the returned error matches
// errFailedToRemoveUniqueIndexes via errors.Is and carries the underlying
// cause in its message.
func (db *DB) deleteObject(
	tx *bbolt.Tx,
	obj *objectSDK.Object,
	isParent bool,
) error {
	if err := delUniqueIndexes(tx, obj, isParent); err != nil {
		// keep the sentinel matchable, but do not lose the actual reason
		return fmt.Errorf("%w: %v", errFailedToRemoveUniqueIndexes, err)
	}

	if err := updateListIndexes(tx, obj, delListIndexItem); err != nil {
		return fmt.Errorf("can't remove list indexes: %w", err)
	}

	if err := updateFKBTIndexes(tx, obj, delFKBTIndexItem); err != nil {
		return fmt.Errorf("can't remove fake bucket tree indexes: %w", err)
	}

	if !isParent {
		return nil
	}

	// remove record from the garbage bucket, because regular object deletion does nothing for virtual object
	garbageBKT := tx.Bucket(garbageBucketName)
	if garbageBKT == nil {
		return nil
	}

	addrKey := addressKey(object.AddressOf(obj), make([]byte, addressKeySize))
	if err := garbageBKT.Delete(addrKey); err != nil {
		return fmt.Errorf("could not remove from garbage bucket: %w", err)
	}

	return nil
}

// parentLength returns the number of children listed for the parent addr in
// the parent index of its container. A missing index bucket or an
// undecodable list counts as zero children.
func parentLength(tx *bbolt.Tx, addr oid.Address) int {
	// one scratch buffer serves both the bucket name and the object key:
	// the bucket is resolved before the buffer is reused for the key
	buf := make([]byte, bucketKeySize)

	parents := tx.Bucket(parentBucketName(addr.Container(), buf))
	if parents == nil {
		return 0
	}

	children, err := decodeList(parents.Get(objectKey(addr.Object(), buf)))
	if err != nil {
		return 0
	}

	return len(children)
}

// delUniqueIndexItem removes item.key from the bucket item.name, if the
// bucket exists. The removal is best effort: its error is ignored.
func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) {
	target := tx.Bucket(item.name)
	if target == nil {
		return
	}

	_ = target.Delete(item.key) // ignore error, best effort there
}

// delListIndexItem removes item.val from the list stored under item.key in
// the bucket item.name. If the list becomes empty, the key itself is removed.
//
// The operation is best effort: a missing bucket, an undecodable or empty
// list and any write error are ignored, so the result is always nil.
func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
	listBkt := tx.Bucket(item.name)
	if listBkt == nil {
		return nil
	}

	entries, err := decodeList(listBkt.Get(item.key))
	if err != nil || len(entries) == 0 {
		return nil
	}

	// drop the first entry equal to item.val
	for idx, entry := range entries {
		if !bytes.Equal(item.val, entry) {
			continue
		}
		entries = append(entries[:idx], entries[idx+1:]...)
		break
	}

	// nothing left in the list: remove the key from <list> bucket
	if len(entries) == 0 {
		_ = listBkt.Delete(item.key) // ignore error, best effort there

		return nil
	}

	// otherwise store the shortened list back
	if encoded, err := encodeList(entries); err == nil {
		_ = listBkt.Put(item.key, encoded) // ignore error, best effort there
	}
	// encoding error is ignored as well, best effort there
	return nil
}

// delFKBTIndexItem removes item.val from the nested bucket item.key of the
// bucket item.name. Buckets left empty by the removal are deleted too: first
// the nested one, then the top-level one. A missing bucket on either level
// is not an error.
func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
	outer := tx.Bucket(item.name)
	if outer == nil {
		return nil
	}

	inner := outer.Bucket(item.key)
	if inner == nil {
		return nil
	}

	err := inner.Delete(item.val)
	if err != nil {
		return err
	}

	if hasAnyItem(inner) {
		return nil
	}

	// the nested bucket is empty now, drop it
	err = outer.DeleteBucket(item.key)
	if err != nil {
		return err
	}

	if !hasAnyItem(outer) {
		// the top-level bucket is empty as well
		return tx.DeleteBucket(item.name)
	}

	return nil
}

// hasAnyItem reports whether the bucket b contains at least one key.
func hasAnyItem(b *bbolt.Bucket) bool {
	firstKey, _ := b.Cursor().First()
	return firstKey != nil
}

// delUniqueIndexes removes the records of obj from the unique indexes: the
// type-specific primary bucket (or the parent bucket if isParent is set), the
// storage ID index, the root index and, if the object has an expiration
// epoch, both expiration indexes.
//
// Removals are best effort (see delUniqueIndexItem). The only error returned
// is ErrUnknownObjectType, for a non-parent object of an unsupported type; in
// that case nothing is removed.
func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error {
	addr := object.AddressOf(obj)

	objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
	cnr := addr.Container()
	// scratch buffer shared by the bucket name builders below; each name is
	// consumed by delUniqueIndexItem before the buffer is reused
	bucketName := make([]byte, bucketKeySize)

	// remove value from primary unique bucket
	if !isParent {
		switch obj.Type() {
		case objectSDK.TypeRegular:
			bucketName = primaryBucketName(cnr, bucketName)
		case objectSDK.TypeTombstone:
			bucketName = tombstoneBucketName(cnr, bucketName)
		case objectSDK.TypeLock:
			bucketName = bucketNameLockers(cnr, bucketName)
		default:
			return ErrUnknownObjectType
		}

		delUniqueIndexItem(tx, namedBucketItem{
			name: bucketName,
			key:  objKey,
		})
	} else {
		// virtual (parent) object: its record lives in the parent bucket
		delUniqueIndexItem(tx, namedBucketItem{
			name: parentBucketName(cnr, bucketName),
			key:  objKey,
		})
	}

	delUniqueIndexItem(tx, namedBucketItem{ // remove from storage id index
		name: smallBucketName(cnr, bucketName),
		key:  objKey,
	})
	delUniqueIndexItem(tx, namedBucketItem{ // remove from root index
		name: rootBucketName(cnr, bucketName),
		key:  objKey,
	})

	// remove from both expiration indexes: epoch -> object and object -> epoch
	if expEpoch, ok := hasExpirationEpoch(obj); ok {
		delUniqueIndexItem(tx, namedBucketItem{
			name: expEpochToObjectBucketName,
			key:  expirationEpochKey(expEpoch, cnr, addr.Object()),
		})
		delUniqueIndexItem(tx, namedBucketItem{
			name: objectToExpirationEpochBucketName(cnr, make([]byte, bucketKeySize)),
			key:  objKey,
		})
	}

	return nil
}

// deleteECRelatedInfo cleans up the records that belong to the parents of the
// EC chunk obj. It does nothing if obj has no EC header.
//
// If no EC chunk info is left for the EC parent, the GC mark and the root
// record of the EC parent are dropped. If the EC parent is itself a part of a
// split object, the split parent is handled as well:
//   - if the split parent is already tracked in refCounter, one more removed
//     reference is recorded and the cleanup is left to the caller;
//   - if the parent index still lists children for it, nothing is done;
//   - otherwise its GC mark and its root record (split info) are dropped.
//
// garbageBKT may be nil; GC marks are then skipped.
func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.Object, cnr cid.ID, refCounter referenceCounter) error {
	ech := obj.ECHeader()
	if ech == nil {
		return nil
	}

	if !hasAnyECChunks(tx, ech, cnr) {
		// drop EC parent GC mark if current EC chunk is the last one
		if garbageBKT != nil {
			var ecParentAddress oid.Address
			ecParentAddress.SetContainer(cnr)
			ecParentAddress.SetObject(ech.Parent())
			addrKey := addressKey(ecParentAddress, make([]byte, addressKeySize))
			if err := garbageBKT.Delete(addrKey); err != nil {
				return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err)
			}
		}

		// also drop EC parent root info if current EC chunk is the last one
		delUniqueIndexItem(tx, namedBucketItem{
			name: rootBucketName(cnr, make([]byte, bucketKeySize)),
			key:  objectKey(ech.Parent(), make([]byte, objectKeySize)),
		})
	}

	splitParentID := ech.ParentSplitParentID()
	if splitParentID == nil {
		return nil
	}

	var splitParentAddress oid.Address
	splitParentAddress.SetContainer(cnr)
	splitParentAddress.SetObject(*splitParentID)
	splitParentKey := addressKey(splitParentAddress, make([]byte, addressKeySize))

	if ref, ok := refCounter[string(splitParentKey)]; ok {
		// linking object is already processing
		// so just inform that one more reference was deleted
		// split info and gc marks will be deleted after linking object delete
		ref.cur++
		return nil
	}

	if parentLength(tx, splitParentAddress) > 0 {
		// linking object still exists, so leave split info and gc mark deletion for linking object processing
		return nil
	}

	// drop split parent gc mark
	if garbageBKT != nil {
		if err := garbageBKT.Delete(splitParentKey); err != nil {
			return fmt.Errorf("could not remove split parent from garbage bucket: %w", err)
		}
	}

	// drop split info
	delUniqueIndexItem(tx, namedBucketItem{
		name: rootBucketName(cnr, make([]byte, bucketKeySize)),
		key:  objectKey(*splitParentID, make([]byte, objectKeySize)),
	})
	return nil
}

// hasAnyECChunks reports whether the EC info bucket of container cnr holds a
// non-empty record for the EC parent referenced by ech.
func hasAnyECChunks(tx *bbolt.Tx, ech *objectSDK.ECHeader, cnr cid.ID) bool {
	bucket := ecInfoBucketName(cnr, make([]byte, bucketKeySize))
	parentKey := objectKey(ech.Parent(), make([]byte, objectKeySize))

	return len(getFromBucket(tx, bucket, parentKey)) != 0
}