package writecachebadger

import (
	"bytes"
	"context"
	"encoding/hex"
	"errors"
	"fmt"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	"github.com/dgraph-io/badger/v4"
	"github.com/dgraph-io/ristretto/z"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
)

const (
	// flushBatchSize is the maximum number of keys read from the cache per
	// stream run to be flushed to the main storage. Batching reduces
	// contention between cache puts and cache persists.
	flushBatchSize = 512
	// defaultFlushWorkersCount is the number of workers that put objects into the main storage.
	defaultFlushWorkersCount = 20
	// defaultFlushInterval is the default time interval between successive flushes.
	defaultFlushInterval = time.Second
)

// collector reads key-value pairs from a badger stream and schedules the
// decoded objects for flushing, at most flushBatchSize objects per run.
type collector struct {
	cache     *cache
	scheduled int
	processed int
	cancel    func()
}

// Send is used as the badger stream's Send callback: it decodes each
// key-value pair into an object and schedules it for flushing.
func (c *collector) Send(buf *z.Buffer) error {
	list, err := badger.BufferToKVList(buf)
	if err != nil {
		return err
	}
	for _, kv := range list.Kv {
		select {
		case <-c.cache.closeCh:
			c.cancel()
			return nil
		default:
		}
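		// A kv with StreamDone set marks the end of a badger stream and
		// carries no data.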
		if kv.StreamDone {
			return nil
		}
		if c.scheduled >= flushBatchSize {
			c.cancel()
			return nil
		}
		if got, want := len(kv.Key), len(internalKey{}); got != want {
			c.cache.log.Debug("unexpected db key length",
				zap.Int("got", got), zap.Int("want", want))
			continue
		}
		c.processed++
		obj := objectSDK.New()
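		// kv.Value aliases memory owned by the stream buffer, so clone it
		// before unmarshaling; values that fail to decode are silently skipped.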
		val := bytes.Clone(kv.Value)
		if err = obj.Unmarshal(val); err != nil {
			continue
		}
		addr := objectCore.AddressOf(obj)
		c.cache.scheduled4FlushMtx.RLock()
		_, ok := c.cache.scheduled4Flush[addr]
		c.cache.scheduled4FlushMtx.RUnlock()
		if ok {
			c.cache.log.Debug(logs.WritecacheBadgerObjAlreadyScheduled, zap.Stringer("obj", addr))
			continue
		}
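		// Mark the address as scheduled before the handoff; workerFlushSmall
		// removes the mark once the object has been processed.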
		c.cache.scheduled4FlushMtx.Lock()
		c.cache.scheduled4Flush[addr] = struct{}{}
		c.cache.scheduled4FlushMtx.Unlock()
		c.scheduled++
		select {
		case c.cache.flushCh <- obj:
		case <-c.cache.closeCh:
			c.cancel()
			return nil
		}
	}
	return nil
}

// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop() {
	for i := 0; i < c.workersCount; i++ {
		c.wg.Add(1)
		go c.workerFlushSmall()
	}

	c.wg.Add(1)
	go func() {
		defer c.wg.Done()

		// A timer re-armed only after a pass completes (rather than a ticker)
		// keeps flush passes from overlapping.
		tt := time.NewTimer(defaultFlushInterval)
		defer tt.Stop()

		for {
			select {
			case <-tt.C:
				c.flushSmallObjects()
				tt.Reset(defaultFlushInterval)
			case <-c.closeCh:
				return
			}
		}
	}()
}

// flushSmallObjects streams objects from the cache in batches and hands them
// to the flush workers until a pass schedules nothing.
func (c *cache) flushSmallObjects() {
	for {
		select {
		case <-c.closeCh:
			return
		default:
		}
		c.modeMtx.RLock()
		if c.readOnly() {
			c.modeMtx.RUnlock()
			time.Sleep(time.Second)
			continue
		}

		// Using the db after Close will panic, and badger does not wait for
		// outstanding transactions, so we have to check for closure manually.
		if c.db.IsClosed() {
			c.modeMtx.RUnlock()
			return
		}
		ctx, cancel := context.WithCancel(context.TODO())
		coll := collector{
			cache:  c,
			cancel: cancel,
		}
		stream := c.db.NewStream()
		// All calls to Send are done by a single goroutine
		stream.Send = coll.Send
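		// Orchestrate pushes the whole keyspace through coll.Send; the
		// collector cancels ctx once flushBatchSize objects are scheduled, so
		// early cancellation is part of normal operation here.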
		if err := stream.Orchestrate(ctx); err != nil {
			c.log.Debug("error during flushing objects from the write-cache",
				zap.Error(err))
		}
		c.modeMtx.RUnlock()
		if coll.scheduled == 0 {
			break
		}
		c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
			zap.Int("scheduled", coll.scheduled), zap.Int("processed", coll.processed))
	}
}

// reportFlushError passes the error to the configured callback if one is set
// and logs it otherwise.
func (c *cache) reportFlushError(msg string, addr string, err error) {
	if c.reportError != nil {
		c.reportError(msg, err)
	} else {
		c.log.Error(msg,
			zap.String("address", addr),
			zap.Error(err))
	}
}

// workerFlushSmall writes small objects to the main storage.
func (c *cache) workerFlushSmall() {
	defer c.wg.Done()

	var obj *objectSDK.Object
	for {
		// Wait for the next object scheduled for flushing.
		select {
		case obj = <-c.flushCh:
		case <-c.closeCh:
			return
		}

		addr := objectCore.AddressOf(obj)
		err := c.flushObject(context.TODO(), obj, nil, writecache.StorageTypeDB)
		if err == nil {
			c.deleteFromDB([]internalKey{addr2key(addr)})
		}
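		// Unschedule the address even if the flush failed, so that the next
		// collector pass can pick it up again.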
		c.scheduled4FlushMtx.Lock()
		delete(c.scheduled4Flush, addr)
		c.scheduled4FlushMtx.Unlock()
	}
}

// flushObject writes an object directly to the main storage.
func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st writecache.StorageType) error {
	var err error

	defer func() {
		c.metrics.Flush(err == nil, st)
	}()

	addr := objectCore.AddressOf(obj)

	var prm common.PutPrm
	prm.Object = obj
	// RawData, when non-nil, carries the already-marshaled object so that
	// blobstor does not have to marshal it again.
	prm.RawData = data

	res, err := c.blobstor.Put(ctx, prm)
	if err != nil {
		if !errors.Is(err, common.ErrNoSpace) && !errors.Is(err, common.ErrReadOnly) &&
			!errors.Is(err, blobstor.ErrNoPlaceFound) {
			c.reportFlushError(logs.FrostFSNodeCantFlushObjectToBlobstor,
				addr.EncodeToString(), err)
		}
		return err
	}

	// Record the object's new storage ID in the metabase so that subsequent
	// reads are served from the main storage.
	var updPrm meta.UpdateStorageIDPrm
	updPrm.SetAddress(addr)
	updPrm.SetStorageID(res.StorageID)

	_, err = c.metabase.UpdateStorageID(updPrm)
	if err != nil {
		c.reportFlushError(logs.FrostFSNodeCantUpdateObjectStorageID,
			addr.EncodeToString(), err)
	}
	return err
}

// Flush flushes all objects from the write-cache to the main storage.
// The write-cache must be in read-only mode to ensure correctness of the
// operation and to prevent interference with background flush workers.
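//
// A minimal call sketch (assuming the SetMode method and mode package used by
// the write-cache elsewhere in frostfs-node):
//
//	_ = c.SetMode(mode.ReadOnly)
//	if err := c.Flush(ctx, false); err != nil {
//		// handle the flush error
//	}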
func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush",
		trace.WithAttributes(
			attribute.Bool("ignore_errors", ignoreErrors),
		))
	defer span.End()

	c.modeMtx.RLock()
	defer c.modeMtx.RUnlock()

	return c.flush(ctx, ignoreErrors)
}

// flush writes every object stored in the cache to the main storage. It does
// not remove flushed entries from the cache.
func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
	return c.db.View(func(tx *badger.Txn) error {
		it := tx.NewIterator(badger.DefaultIteratorOptions)
		defer it.Close()
		var key internalKey
		for it.Rewind(); it.Valid(); it.Next() {
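			// Every key must decode to an object address; report keys of any
			// other size as corrupted records.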
			if got, want := int(it.Item().KeySize()), len(key); got != want {
				err := fmt.Errorf("invalid db key len: got %d, want %d", got, want)
				c.reportFlushError(logs.FrostFSNodeCantDecodeObjectAddressFromDB, hex.EncodeToString(it.Item().Key()), metaerr.Wrap(err))
				if ignoreErrors {
					continue
				}
				return err
			}
			if err := it.Item().Value(func(data []byte) error {
				var obj objectSDK.Object
				if err := obj.Unmarshal(data); err != nil {
					copy(key[:], it.Item().Key())
					c.reportFlushError(logs.FrostFSNodeCantUnmarshalObjectFromDB, key.address().EncodeToString(), metaerr.Wrap(err))
					if ignoreErrors {
						return nil
					}
					return err
				}

				return c.flushObject(ctx, &obj, data, writecache.StorageTypeDB)
			}); err != nil {
				return err
			}
		}
		return nil
	})
}