package meta

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"strings"
	"time"

	v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.etcd.io/bbolt"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
)

// filterGroup holds search filters split by the way they are evaluated.
// Fast filters are answered from index buckets and do not unmarshal objects;
// slow filters are applied afterwards, to the smaller set of objects that the
// fast filters left.
type filterGroup struct {
	// withCnrFilter is set when the query contained a container ID filter.
	withCnrFilter bool

	// cnr is the container ID decoded from that filter.
	cnr cid.ID

	// fastFilters are resolved through indexes.
	fastFilters objectSDK.SearchFilters

	// slowFilters are checked against object headers.
	slowFilters objectSDK.SearchFilters
}

// SelectPrm groups the parameters of Select operation.
// Fields are filled through SetContainerID and SetFilters.
type SelectPrm struct {
	// container to search in
	cnr     cid.ID
	// object search filters
	filters objectSDK.SearchFilters
}

// SelectRes groups the resulting values of Select operation.
// The selected addresses are exposed through AddressList.
type SelectRes struct {
	// addresses of the objects that matched the query
	addrList []oid.Address
}

// SetContainerID is a Select option to set the container id to search in.
// The value is stored in the parameter set as is.
func (p *SelectPrm) SetContainerID(cnr cid.ID) {
	p.cnr = cnr
}

// SetFilters is a Select option to set the object filters.
// The filters are stored as passed, without copying.
func (p *SelectPrm) SetFilters(fs objectSDK.SearchFilters) {
	p.filters = fs
}

// AddressList returns list of addresses of the selected objects.
// The stored slice is returned directly, without copying.
func (r SelectRes) AddressList() []oid.Address {
	return r.addrList
}

// Select returns list of addresses of objects that match search filters.
//
// It returns ErrDegradedMode when the current mode has no metabase. When
// checkNonEmpty reports true for the filters, an empty result is returned
// without opening a transaction. Otherwise the selection runs inside a bolt
// View transaction and any error is passed through metaerr.Wrap.
//
// The call duration and outcome are reported to the metrics under the
// "Select" method name.
func (db *DB) Select(ctx context.Context, prm SelectPrm) (res SelectRes, err error) {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		db.metrics.AddMethodDuration("Select", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "metabase.Select",
		trace.WithAttributes(
			attribute.String("container_id", prm.cnr.EncodeToString()),
		))
	defer span.End()

	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if db.mode.NoMetabase() {
		return res, ErrDegradedMode
	}

	if checkNonEmpty(prm.filters) {
		success = true
		return res, nil
	}

	currEpoch := db.epochState.CurrentEpoch()

	// The transaction is completed before res is read: the Go specification
	// does not order a plain variable read against a function call within one
	// return statement, so `return res, f()` with f mutating res is avoided.
	err = metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
		var selErr error
		res.addrList, selErr = db.selectObjects(tx, prm.cnr, prm.filters, currEpoch)
		return selErr
	}))

	// The metric reflects the error that is actually returned, including a
	// failure of the transaction itself.
	success = err == nil

	return res, err
}

// selectObjects returns addresses of objects from container cnr that pass all
// filters in fs. Filters are split by groupFilters: fast filters are resolved
// through indexes first, then removed objects are dropped and the slow filters
// are checked for every remaining candidate.
//
// A nil result with nil error is returned when fs carries a container filter
// that differs from cnr. An error is returned when the filters cannot be
// grouped or a collected key cannot be decoded as an object ID.
func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs objectSDK.SearchFilters, currEpoch uint64) ([]oid.Address, error) {
	grouped, err := groupFilters(fs)
	if err != nil {
		return nil, err
	}

	// a container filter naming another container cannot match anything here
	if grouped.withCnrFilter && !cnr.Equals(grouped.cnr) {
		return nil, nil
	}

	// matched maps a raw object ID to the number (index+1) of the latest fast
	// filter the object has passed
	matched := make(map[string]int)

	// required is the counter value of an object that passed every fast filter
	required := len(grouped.fastFilters)
	if required == 0 {
		// without fast filters every stored object is a candidate and is
		// counted as a single match
		required = 1

		db.selectAll(tx, cnr, matched)
	}

	for i := range grouped.fastFilters {
		db.selectFastFilter(tx, cnr, grouped.fastFilters[i], matched, i)
	}

	addrs := make([]oid.Address, 0, len(matched))

	for rawID, passed := range matched {
		if passed != required {
			continue // some fast filter did not match
		}

		var objID oid.ID
		if err = objID.Decode([]byte(rawID)); err != nil {
			return nil, err
		}

		var addr oid.Address
		addr.SetContainer(cnr)
		addr.SetObject(objID)

		// skip removed objects first, then those failing a slow filter
		if objectStatus(tx, addr, currEpoch) > 0 ||
			!db.matchSlowFilters(tx, addr, grouped.slowFilters, currEpoch) {
			continue
		}

		addrs = append(addrs, addr)
	}

	return addrs, nil
}

// selectAll puts into the resulting cache the keys of the primary, tombstone,
// parent and lockers buckets of container cnr, marking them as matched by
// filter number 0.
func (db *DB) selectAll(tx *bbolt.Tx, cnr cid.ID, to map[string]int) {
	// one buffer is reused: each name is consumed before the next is built
	nameBuf := make([]byte, bucketKeySize)

	for _, nameOf := range []func(cid.ID, []byte) []byte{
		primaryBucketName,
		tombstoneBucketName,
		parentBucketName,
		bucketNameLockers,
	} {
		selectAllFromBucket(tx, nameOf(cnr, nameBuf), to, 0)
	}
}

// selectAllFromBucket marks every key of the bucket with the given name in
// the resulting cache for filter number fNum. A missing bucket is a no-op.
// Keys should be stringed object ids.
func selectAllFromBucket(tx *bbolt.Tx, name []byte, to map[string]int, fNum int) {
	if b := tx.Bucket(name); b != nil {
		// the callback never fails, so the iteration result is dropped
		_ = b.ForEach(func(key, _ []byte) error {
			markAddressInCache(to, fNum, string(key))

			return nil
		})
	}
}

// selectFastFilter resolves a single fast filter through the index buckets of
// container cnr and marks matching objects in the resulting cache. Well-known
// headers are dispatched to their dedicated buckets; any other header is
// treated as a user attribute.
func (db *DB) selectFastFilter(
	tx *bbolt.Tx,
	cnr cid.ID, // container we search on
	f objectSDK.SearchFilter, // fast filter
	to map[string]int, // resulting cache
	fNum int, // index of filter
) {
	epoch := db.epochState.CurrentEpoch()

	// scratch buffer for bucket names; every name is used before the next one
	// is written into it
	nameBuf := make([]byte, bucketKeySize)

	switch f.Header() {
	case v2object.FilterHeaderObjectID:
		db.selectObjectID(tx, f, cnr, to, fNum, epoch)
	case v2object.FilterHeaderOwnerID:
		db.selectFromFKBT(tx, ownerBucketName(cnr, nameBuf), f, to, fNum)
	case v2object.FilterHeaderPayloadHash:
		db.selectFromList(tx, payloadHashBucketName(cnr, nameBuf), f, to, fNum)
	case v2object.FilterHeaderObjectType:
		typeBuckets := bucketNamesForType(cnr, f.Operation(), f.Value())
		for i := range typeBuckets {
			selectAllFromBucket(tx, typeBuckets[i], to, fNum)
		}
	case v2object.FilterHeaderParent:
		db.selectFromList(tx, parentBucketName(cnr, nameBuf), f, to, fNum)
	case v2object.FilterHeaderSplitID:
		db.selectFromList(tx, splitBucketName(cnr, nameBuf), f, to, fNum)
	case v2object.FilterPropertyRoot:
		selectAllFromBucket(tx, rootBucketName(cnr, nameBuf), to, fNum)
	case v2object.FilterPropertyPhy:
		selectAllFromBucket(tx, primaryBucketName(cnr, nameBuf), to, fNum)
		selectAllFromBucket(tx, tombstoneBucketName(cnr, nameBuf), to, fNum)
		selectAllFromBucket(tx, bucketNameLockers(cnr, nameBuf), to, fNum)
	default: // user attribute
		attrBucket := attributeBucketName(cnr, f.Header(), nameBuf)

		if f.Operation() != objectSDK.MatchNotPresent {
			db.selectFromFKBT(tx, attrBucket, f, to, fNum)
			return
		}

		// attribute must be absent: take everything outside the attribute index
		selectOutsideFKBT(tx, allBucketNames(cnr), attrBucket, to, fNum)
	}
}

// mBucketNaming maps the string form of an object type to the functions that
// build the names of the container buckets looked up for that type. It is the
// source of bucket names for bucketNamesForType and allBucketNames.
var mBucketNaming = map[string][]func(cid.ID, []byte) []byte{
	v2object.TypeRegular.String():   {primaryBucketName, parentBucketName},
	v2object.TypeTombstone.String(): {tombstoneBucketName},
	v2object.TypeLock.String():      {bucketNameLockers},
}

// allBucketNames returns the names of all buckets listed in mBucketNaming for
// container cnr. Every name gets its own buffer. The order of the result
// follows map iteration and is therefore not stable.
func allBucketNames(cnr cid.ID) [][]byte {
	var names [][]byte

	for _, builders := range mBucketNaming {
		for i := range builders {
			names = append(names, builders[i](cnr, make([]byte, bucketKeySize)))
		}
	}

	return names
}

// bucketNamesForType returns the names of buckets of container cnr that hold
// objects whose type string relates to typeVal as requested by mType:
// equal, not equal, or having typeVal as a prefix. Any other match type
// yields no names. Every name gets its own buffer.
func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal string) [][]byte {
	var wanted func(typeName string) bool

	switch mType {
	case objectSDK.MatchStringNotEqual:
		wanted = func(typeName string) bool { return typeName != typeVal }
	case objectSDK.MatchStringEqual:
		wanted = func(typeName string) bool { return typeName == typeVal }
	case objectSDK.MatchCommonPrefix:
		wanted = func(typeName string) bool { return strings.HasPrefix(typeName, typeVal) }
	default:
		return nil
	}

	var names [][]byte

	for typeName, builders := range mBucketNaming {
		if !wanted(typeName) {
			continue
		}

		for _, build := range builders {
			names = append(names, build(cnr, make([]byte, bucketKeySize)))
		}
	}

	return names
}

// selectFromFKBT looks into <fkbt> index to find list of addresses to add in
// resulting cache.
//
// The matcher registered for the filter operation walks the root bucket; for
// every key it reports, the nested bucket with that key is opened and all of
// its keys are marked in the cache. A missing matcher and an iteration error
// are only logged at debug level; a missing root bucket is a no-op.
func (db *DB) selectFromFKBT(
	tx *bbolt.Tx,
	name []byte, // fkbt root bucket name
	f objectSDK.SearchFilter, // filter for operation and value
	to map[string]int, // resulting cache
	fNum int, // index of filter
) {
	matcher, known := db.matchers[f.Operation()]
	if !known {
		db.log.Debug(logs.MetabaseMissingMatcher, zap.Uint32("operation", uint32(f.Operation())))

		return
	}

	root := tx.Bucket(name)
	if root == nil {
		return
	}

	// markObject records one key of a leaf bucket in the cache
	markObject := func(objKey, _ []byte) error {
		markAddressInCache(to, fNum, string(objKey))

		return nil
	}

	// visitLeaf walks the nested bucket stored under a matched key, if any
	visitLeaf := func(leafKey, _ []byte) error {
		if leaf := root.Bucket(leafKey); leaf != nil {
			return leaf.ForEach(markObject)
		}

		return nil
	}

	if err := matcher.matchBucket(root, f.Header(), f.Value(), visitLeaf); err != nil {
		db.log.Debug(logs.MetabaseErrorInFKBTSelection, zap.String("error", err.Error()))
	}
}

// selectOutsideFKBT marks in the resulting cache every key of the incl
// buckets that does not occur in any nested bucket of the <fkbt> root bucket
// with the given name. When the root bucket is missing, all keys of the incl
// buckets are marked.
func selectOutsideFKBT(
	tx *bbolt.Tx,
	incl [][]byte, // buckets
	name []byte, // fkbt root bucket name
	to map[string]int, // resulting cache
	fNum int, // index of filter
) {
	// indexed collects the keys found inside the fkbt index
	indexed := make(map[string]struct{})

	if root := tx.Bucket(name); root != nil {
		remember := func(objKey, _ []byte) error {
			indexed[string(objKey)] = struct{}{}

			return nil
		}

		_ = root.ForEachBucket(func(leafKey []byte) error {
			return root.Bucket(leafKey).ForEach(remember)
		})
	}

	// markIfOutside records a key unless the index already contains it
	markIfOutside := func(objKey, _ []byte) error {
		if _, inside := indexed[string(objKey)]; inside {
			return nil
		}

		markAddressInCache(to, fNum, string(objKey))

		return nil
	}

	for _, bucketName := range incl {
		if bkt := tx.Bucket(bucketName); bkt != nil {
			_ = bkt.ForEach(markIfOutside)
		}
	}
}

// selectFromList looks into <list> index to find list of addresses to add in
// resulting cache.
//
// For the string-equal operation the list stored under the key built from the
// filter header and value is decoded directly. For other operations the
// matcher registered for the operation walks the bucket and the lists of all
// reported entries are concatenated. Failures are logged at debug level and
// leave the cache untouched; a missing bucket is a no-op.
func (db *DB) selectFromList(
	tx *bbolt.Tx,
	name []byte, // list root bucket name
	f objectSDK.SearchFilter, // filter for operation and value
	to map[string]int, // resulting cache
	fNum int, // index of filter
) {
	bucket := tx.Bucket(name)
	if bucket == nil {
		return
	}

	var ids [][]byte

	if op := f.Operation(); op == objectSDK.MatchStringEqual {
		decoded, err := decodeList(bucket.Get(bucketKeyHelper(f.Header(), f.Value())))
		if err != nil {
			db.log.Debug(logs.MetabaseCantDecodeListBucketLeaf, zap.String("error", err.Error()))
			return
		}

		ids = decoded
	} else {
		matcher, known := db.matchers[op]
		if !known {
			db.log.Debug(logs.MetabaseUnknownOperation, zap.Uint32("operation", uint32(op)))

			return
		}

		// collect decodes the list of one matched entry and accumulates it
		collect := func(_, val []byte) error {
			part, err := decodeList(val)
			if err != nil {
				db.log.Debug(logs.MetabaseCantDecodeListBucketLeaf,
					zap.String("error", err.Error()),
				)

				return err
			}

			ids = append(ids, part...)

			return nil
		}

		if err := matcher.matchBucket(bucket, f.Header(), f.Value(), collect); err != nil {
			db.log.Debug(logs.MetabaseCantIterateOverTheBucket,
				zap.String("error", err.Error()),
			)

			return
		}
	}

	for _, rawID := range ids {
		markAddressInCache(to, fNum, string(rawID))
	}
}

// selectObjectID processes objectID filter with in-place optimizations.
//
// For the string-equal operation the filter value is parsed as an object ID
// and checked directly; an unparsable value selects nothing. For any other
// operation the matcher registered for it walks the buckets of every object
// type, and each key that decodes as an object ID is checked. An unknown
// operation and iteration errors are logged at debug level.
func (db *DB) selectObjectID(
	tx *bbolt.Tx,
	f objectSDK.SearchFilter,
	cnr cid.ID,
	to map[string]int, // resulting cache
	fNum int, // index of filter
	currEpoch uint64,
) {
	// appendOID marks id in the cache when db.exists confirms the object or
	// reports a split info error.
	appendOID := func(id oid.ID) {
		var addr oid.Address
		addr.SetContainer(cnr)
		addr.SetObject(id)

		var splitInfoError *objectSDK.SplitInfoError
		ok, err := db.exists(tx, addr, currEpoch)
		if (err == nil && ok) || errors.As(err, &splitInfoError) {
			raw := make([]byte, objectKeySize)
			id.Encode(raw)
			markAddressInCache(to, fNum, string(raw))
		}
	}

	switch op := f.Operation(); op {
	case objectSDK.MatchStringEqual:
		var id oid.ID
		if err := id.DecodeString(f.Value()); err == nil {
			appendOID(id)
		}
	default:
		fMatch, ok := db.matchers[op]
		if !ok {
			db.log.Debug(logs.MetabaseUnknownOperation,
				zap.Uint32("operation", uint32(op)),
			)

			return
		}

		// "not equal to the empty string" selects the buckets of all types
		for _, bucketName := range bucketNamesForType(cnr, objectSDK.MatchStringNotEqual, "") {
			bkt := tx.Bucket(bucketName)
			if bkt == nil {
				// A container does not have to own every kind of bucket.
				// Bucket names come in map iteration order, so stopping here
				// would drop the remaining buckets at random; skip instead.
				continue
			}

			err := fMatch.matchBucket(bkt, f.Header(), f.Value(), func(k, _ []byte) error {
				var id oid.ID
				if err := id.Decode(k); err == nil {
					appendOID(id)
				}
				return nil
			})
			if err != nil {
				db.log.Debug(logs.MetabaseCouldNotIterateOverTheBuckets,
					zap.String("error", err.Error()),
				)
			}
		}
	}
}

// matchSlowFilters reports whether the object at addr is matched by all slow
// filters in f. An empty filter set always matches. The object is read once
// through db.get; a read failure or a filter whose operation has no
// registered matcher makes the result false. Filters on headers other than
// version, homomorphic hash, creation epoch and payload length are skipped.
func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.SearchFilters, currEpoch uint64) bool {
	if len(f) == 0 {
		return true
	}

	obj, err := db.get(tx, addr, make([]byte, addressKeySize), true, false, currEpoch)
	if err != nil {
		return false
	}

	// le64 encodes a numeric header as 8 little-endian bytes
	le64 := func(v uint64) []byte {
		b := make([]byte, 8)
		binary.LittleEndian.PutUint64(b, v)

		return b
	}

	for i := range f {
		matcher, known := db.matchers[f[i].Operation()]
		if !known {
			return false
		}

		var actual []byte

		switch f[i].Header() {
		case v2object.FilterHeaderVersion:
			actual = []byte(obj.Version().String())
		case v2object.FilterHeaderHomomorphicHash:
			hash, _ := obj.PayloadHomomorphicHash()
			actual = hash.Value()
		case v2object.FilterHeaderCreationEpoch:
			actual = le64(obj.CreationEpoch())
		case v2object.FilterHeaderPayloadLength:
			actual = le64(obj.PayloadSize())
		default:
			continue // ignore unknown search attributes
		}

		if !matcher.matchSlow(f[i].Header(), actual, f[i].Value()) {
			return false
		}
	}

	return true
}

// groupFilters splits filters into fast ones, which are processed through
// indexes, and slow ones, which are processed afterwards against object
// headers. A container ID filter is not put in either group: its value is
// decoded into the result and flagged instead. An error is returned when that
// value is not a valid container ID.
func groupFilters(filters objectSDK.SearchFilters) (filterGroup, error) {
	var grouped filterGroup

	grouped.fastFilters = make(objectSDK.SearchFilters, 0, len(filters))
	grouped.slowFilters = make(objectSDK.SearchFilters, 0, len(filters))

	for i := range filters {
		switch filters[i].Header() {
		case v2object.FilterHeaderContainerID: // support deprecated field
			if err := grouped.cnr.DecodeString(filters[i].Value()); err != nil {
				return filterGroup{}, fmt.Errorf("can't parse container id: %w", err)
			}

			grouped.withCnrFilter = true
		case v2object.FilterHeaderVersion,
			v2object.FilterHeaderCreationEpoch,
			v2object.FilterHeaderPayloadLength,
			v2object.FilterHeaderHomomorphicHash: // slow filters
			grouped.slowFilters = append(grouped.slowFilters, filters[i])
		default: // fast filters or user attributes if unknown
			grouped.fastFilters = append(grouped.fastFilters, filters[i])
		}
	}

	return grouped, nil
}

// markAddressInCache records that addr is matched by filter number fNum.
// The counter of addr is advanced to fNum+1 only when it currently equals
// fNum, so an address absent from the cache is added only by filter 0, and an
// address that missed an earlier filter is never advanced again.
func markAddressInCache(cache map[string]int, fNum int, addr string) {
	if cache[addr] != fNum {
		return
	}

	cache[addr] = fNum + 1
}

// checkNonEmpty reports whether fs contains a filter that requires a reserved
// system header to be absent (MatchNotPresent on a system key). Select treats
// a true result as a query with a deliberately empty answer and returns
// without reading the database.
//
// NOTE(review): despite its name, a true result means that nothing can be
// selected, not that the result is non-empty.
func checkNonEmpty(fs objectSDK.SearchFilters) bool {
	for i := range fs {
		if fs[i].Operation() != objectSDK.MatchNotPresent {
			continue
		}

		if isSystemKey(fs[i].Header()) {
			return true
		}
	}

	return false
}

// isSystemKey returns true if string key is a reserved system filter key,
// i.e. if it starts with v2object.ReservedFilterPrefix.
func isSystemKey(key string) bool {
	return strings.HasPrefix(key, v2object.ReservedFilterPrefix)
}