package meta

import (
	"bytes"
	"context"
	"fmt"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.etcd.io/bbolt"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

var bucketNameLocked = []byte{lockedPrefix}

type keyValue struct {
	Key   []byte
	Value []byte
}

// returns name of the bucket with objects of type LOCK for specified container.
func bucketNameLockers(idCnr cid.ID, key []byte) []byte {
	return bucketName(idCnr, lockersPrefix, key)
}

// Lock marks objects as locked with another object. All objects are from the
// specified container.
//
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
//
// Locked list should be unique. Panics if it is empty.
func (db *DB) Lock(ctx context.Context, cnr cid.ID, locker oid.ID, locked []oid.ID) error {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		db.metrics.AddMethodDuration("Lock", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "metabase.Lock",
		trace.WithAttributes(
			attribute.String("container_id", cnr.EncodeToString()),
			attribute.String("locker", locker.EncodeToString()),
			attribute.Int("locked_count", len(locked)),
		))
	defer span.End()

	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if db.mode.NoMetabase() {
		return ErrDegradedMode
	} else if db.mode.ReadOnly() {
		return ErrReadOnlyMode
	}

	if len(locked) == 0 {
		panic("empty locked list")
	}

	err := db.lockInternal(locked, cnr, locker)
	success = err == nil
	return err
}

func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error {
	bucketKeysLocked := make([][]byte, len(locked))
	for i := range locked {
		bucketKeysLocked[i] = objectKey(locked[i], make([]byte, objectKeySize))
	}
	key := make([]byte, cidSize)

	return metaerr.Wrap(db.boltDB.Update(func(tx *bbolt.Tx) error {
		if firstIrregularObjectType(tx, cnr, bucketKeysLocked...) != objectSDK.TypeRegular {
			return logicerr.Wrap(new(apistatus.LockNonRegularObject))
		}

		bucketLocked := tx.Bucket(bucketNameLocked)

		cnr.Encode(key)
		bucketLockedContainer, err := bucketLocked.CreateBucketIfNotExists(key)
		if err != nil {
			return fmt.Errorf("create container bucket for locked objects %v: %w", cnr, err)
		}

		keyLocker := objectKey(locker, key)
		var exLockers [][]byte
		var updLockers []byte

	loop:
		for i := range bucketKeysLocked {
			exLockers, err = decodeList(bucketLockedContainer.Get(bucketKeysLocked[i]))
			if err != nil {
				return fmt.Errorf("decode list of object lockers: %w", err)
			}

			for i := range exLockers {
				if bytes.Equal(exLockers[i], keyLocker) {
					continue loop
				}
			}

			updLockers, err = encodeList(append(exLockers, keyLocker))
			if err != nil {
				return fmt.Errorf("encode list of object lockers: %w", err)
			}

			err = bucketLockedContainer.Put(bucketKeysLocked[i], updLockers)
			if err != nil {
				return fmt.Errorf("update list of object lockers: %w", err)
			}
		}

		return nil
	}))
}

// FreeLockedBy unlocks all objects in DB which are locked by lockers.
// Returns slice of unlocked object ID's or an error.
func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		db.metrics.AddMethodDuration("FreeLockedBy", time.Since(startedAt), success)
	}()

	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if db.mode.NoMetabase() {
		return nil, ErrDegradedMode
	}

	var unlockedObjects []oid.Address

	if err := db.boltDB.Update(func(tx *bbolt.Tx) error {
		for i := range lockers {
			unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object())
			if err != nil {
				return err
			}
			unlockedObjects = append(unlockedObjects, unlocked...)
		}

		return nil
	}); err != nil {
		return nil, metaerr.Wrap(err)
	}
	success = true
	return unlockedObjects, nil
}

// checks if specified object is locked in the specified container.
func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
	bucketLocked := tx.Bucket(bucketNameLocked)
	if bucketLocked != nil {
		key := make([]byte, cidSize)
		idCnr.Encode(key)
		bucketLockedContainer := bucketLocked.Bucket(key)
		if bucketLockedContainer != nil {
			return bucketLockedContainer.Get(objectKey(idObj, key)) != nil
		}
	}

	return false
}

// releases all records about the objects locked by the locker.
// Returns slice of unlocked object ID's or an error.
//
// Operation is very resource-intensive, which is caused by the admissibility
// of multiple locks. Also, if we knew what objects are locked, it would be
// possible to speed up the execution.
func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) {
	var unlockedObjects []oid.Address
	bucketLocked := tx.Bucket(bucketNameLocked)
	if bucketLocked == nil {
		return unlockedObjects, nil
	}

	key := make([]byte, cidSize)
	idCnr.Encode(key)

	bucketLockedContainer := bucketLocked.Bucket(key)
	if bucketLockedContainer == nil {
		return unlockedObjects, nil
	}

	keyLocker := objectKey(locker, key)
	updates := make([]keyValue, 0)
	err := bucketLockedContainer.ForEach(func(k, v []byte) error {
		keyLockers, err := decodeList(v)
		if err != nil {
			return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
		}

		for i := range keyLockers {
			if bytes.Equal(keyLockers[i], keyLocker) {
				if len(keyLockers) == 1 {
					updates = append(updates, keyValue{
						Key:   k,
						Value: nil,
					})

					var id oid.ID
					err = id.Decode(k)
					if err != nil {
						return fmt.Errorf("decode unlocked object id error: %w", err)
					}

					var addr oid.Address
					addr.SetContainer(idCnr)
					addr.SetObject(id)

					unlockedObjects = append(unlockedObjects, addr)
				} else {
					// exclude locker
					keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)

					v, err = encodeList(keyLockers)
					if err != nil {
						return fmt.Errorf("encode updated list of lockers: %w", err)
					}

					updates = append(updates, keyValue{
						Key:   k,
						Value: v,
					})
				}

				return nil
			}
		}

		return nil
	})
	if err != nil {
		return nil, err
	}

	if err = applyBucketUpdates(bucketLockedContainer, updates); err != nil {
		return nil, err
	}

	return unlockedObjects, nil
}

func applyBucketUpdates(bucket *bbolt.Bucket, updates []keyValue) error {
	for _, update := range updates {
		if update.Value == nil {
			err := bucket.Delete(update.Key)
			if err != nil {
				return fmt.Errorf("delete locked object record from locked bucket: %w", err)
			}
		} else {
			err := bucket.Put(update.Key, update.Value)
			if err != nil {
				return fmt.Errorf("update list of lockers: %w", err)
			}
		}
	}
	return nil
}

// IsLockedPrm groups the parameters of IsLocked operation.
type IsLockedPrm struct {
	addr oid.Address
}

// SetAddress sets object address that will be checked for lock relations.
func (i *IsLockedPrm) SetAddress(addr oid.Address) {
	i.addr = addr
}

// IsLockedRes groups the resulting values of IsLocked operation.
type IsLockedRes struct {
	locked bool
}

// Locked describes the requested object status according to the metabase
// current state.
func (i IsLockedRes) Locked() bool {
	return i.locked
}

// IsLocked checks is the provided object is locked by any `LOCK`. Not found
// object is considered as non-locked.
//
// Returns only non-logical errors related to underlying database.
func (db *DB) IsLocked(ctx context.Context, prm IsLockedPrm) (res IsLockedRes, err error) {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		db.metrics.AddMethodDuration("IsLocked", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "metabase.IsLocked",
		trace.WithAttributes(
			attribute.String("address", prm.addr.EncodeToString()),
		))
	defer span.End()

	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if db.mode.NoMetabase() {
		return res, ErrDegradedMode
	}
	err = metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
		res.locked = objectLocked(tx, prm.addr.Container(), prm.addr.Object())
		return nil
	}))
	success = err == nil
	return res, err
}