package meta

import (
	"context"
	"errors"
	"fmt"
	"strconv"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"

	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.etcd.io/bbolt"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// ExpiredObject is a descriptor of expired object from DB.
type ExpiredObject struct {
	typ objectSDK.Type

	addr oid.Address
}

// Type returns type of the expired object.
func (e *ExpiredObject) Type() objectSDK.Type {
	return e.typ
}

// Address returns address of the expired object.
func (e *ExpiredObject) Address() oid.Address {
	return e.addr
}

// ExpiredObjectHandler is an ExpiredObject handling function.
type ExpiredObjectHandler func(*ExpiredObject) error

// ErrInterruptIterator is returned by iteration handlers
// as a "break" keyword.
var ErrInterruptIterator = logicerr.New("iterator is interrupted")

// IterateExpired iterates over all objects in DB which are out of date
// relative to epoch. Locked objects are not included (do not confuse
// with objects of type LOCK).
//
// If h returns ErrInterruptIterator, nil returns immediately.
// Returns other errors of h directly.
func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectHandler) error {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		db.metrics.AddMethodDuration("IterateExpired", time.Since(startedAt), success)
	}()
	_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateExpired",
		trace.WithAttributes(
			attribute.String("epoch", strconv.FormatUint(epoch, 10)),
		))
	defer span.End()

	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if db.mode.NoMetabase() {
		return ErrDegradedMode
	}

	err := metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
		return db.iterateExpired(tx, epoch, h)
	}))
	success = err == nil
	return err
}

func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error {
	b := tx.Bucket(expEpochToObjectBucketName)
	c := b.Cursor()
	for k, _ := c.First(); k != nil; k, _ = c.Next() {
		expiresAfter, cnr, obj, err := parseExpirationEpochKey(k)
		if err != nil {
			return err
		}
		// bucket keys ordered by epoch, no need to continue lookup
		if expiresAfter >= epoch {
			return nil
		}
		if objectLocked(tx, cnr, obj) {
			continue
		}
		var addr oid.Address
		addr.SetContainer(cnr)
		addr.SetObject(obj)
		objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
		err = h(&ExpiredObject{
			typ:  firstIrregularObjectType(tx, cnr, objKey),
			addr: addr,
		})
		if err == nil {
			continue
		}
		if errors.Is(err, ErrInterruptIterator) {
			return nil
		}
		return err
	}
	return nil
}

// IterateCoveredByTombstones iterates over all objects in DB which are covered
// by tombstone with string address from tss. Locked objects are not included
// (do not confuse with objects of type LOCK).
//
// If h returns ErrInterruptIterator, nil returns immediately.
// Returns other errors of h directly.
//
// Does not modify tss.
func (db *DB) IterateCoveredByTombstones(ctx context.Context, tss map[string]oid.Address, h func(oid.Address) error) error {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		db.metrics.AddMethodDuration("IterateCoveredByTombstones", time.Since(startedAt), success)
	}()
	_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateCoveredByTombstones")
	defer span.End()

	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if db.mode.NoMetabase() {
		return ErrDegradedMode
	}

	return db.boltDB.View(func(tx *bbolt.Tx) error {
		return db.iterateCoveredByTombstones(tx, tss, h)
	})
}

func (db *DB) iterateCoveredByTombstones(tx *bbolt.Tx, tss map[string]oid.Address, h func(oid.Address) error) error {
	bktGraveyard := tx.Bucket(graveyardBucketName)

	err := bktGraveyard.ForEach(func(k, v []byte) error {
		var addr oid.Address
		if err := decodeAddressFromKey(&addr, v); err != nil {
			return err
		}
		if _, ok := tss[addr.EncodeToString()]; ok {
			var addr oid.Address

			err := decodeAddressFromKey(&addr, k)
			if err != nil {
				return fmt.Errorf("could not parse address of the object under tombstone: %w", err)
			}

			if objectLocked(tx, addr.Container(), addr.Object()) {
				return nil
			}

			return h(addr)
		}

		return nil
	})

	if errors.Is(err, ErrInterruptIterator) {
		err = nil
	}

	return err
}

func iteratePhyObjects(tx *bbolt.Tx, f func(cid.ID, oid.ID, *objectSDK.Object) error) error {
	var cid cid.ID
	var oid oid.ID
	obj := objectSDK.New()

	return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
		b58CID, postfix := parseContainerIDWithPrefix(&cid, name)
		if len(b58CID) == 0 {
			return nil
		}

		switch postfix {
		case primaryPrefix,
			lockersPrefix,
			tombstonePrefix:
		default:
			return nil
		}

		return b.ForEach(func(k, v []byte) error {
			if oid.Decode(k) == nil && obj.Unmarshal(v) == nil {
				return f(cid, oid, obj)
			}

			return nil
		})
	})
}