package writecache

import (
	"bytes"
	"context"
	"errors"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"github.com/mr-tron/base58"
	"github.com/nspcc-dev/neo-go/pkg/util/slice"
	"go.etcd.io/bbolt"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
)

const (
	// flushBatchSize is the number of keys read from the cache per batch to be
	// flushed to the main storage. Batching is used to reduce contention
	// between cache put and cache persist.
	flushBatchSize = 512
	// defaultFlushWorkersCount is the number of workers that put objects into
	// the main storage.
	defaultFlushWorkersCount = 20
	// defaultFlushInterval is the default time interval between successive flushes.
	defaultFlushInterval = time.Second
)

// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop() {
	for i := 0; i < c.workersCount; i++ {
		c.wg.Add(1)
		go c.workerFlushSmall()
	}

	c.wg.Add(1)
	go func() {
		c.workerFlushBig(context.TODO())
		c.wg.Done()
	}()

	c.wg.Add(1)
	go func() {
		defer c.wg.Done()

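		// A timer is used instead of a ticker so that the next interval starts
		// only after the previous flush pass has completed.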
		tt := time.NewTimer(defaultFlushInterval)
		defer tt.Stop()

		for {
			select {
			case <-tt.C:
				c.flushSmallObjects()
				tt.Reset(defaultFlushInterval)
			case <-c.closeCh:
				return
			}
		}
	}()
}

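// flushSmallObjects reads small objects from the database in batches of
// flushBatchSize and sends them to the flush workers, stopping when a pass
// yields no objects or the cache is closed.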
func (c *cache) flushSmallObjects() {
	var lastKey []byte
	var m []objectInfo
	for {
		select {
		case <-c.closeCh:
			return
		default:
		}

		m = m[:0]

		c.modeMtx.RLock()
		if c.readOnly() {
			c.modeMtx.RUnlock()
			time.Sleep(time.Second)
			continue
		}

		// Objects are read in fixed-size batches so that the flush does not
		// interfere with the main put cycle for long.
		_ = c.db.View(func(tx *bbolt.Tx) error {
			b := tx.Bucket(defaultBucket)
			cs := b.Cursor()

			var k, v []byte

			if len(lastKey) == 0 {
				k, v = cs.First()
			} else {
				k, v = cs.Seek(lastKey)
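				// Seek positions the cursor at the first key >= lastKey, so skip
				// lastKey itself: it was already processed in the previous batch.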
				if bytes.Equal(k, lastKey) {
					k, v = cs.Next()
				}
			}

			for ; k != nil && len(m) < flushBatchSize; k, v = cs.Next() {
				if len(lastKey) == len(k) {
					copy(lastKey, k)
				} else {
					lastKey = slice.Copy(k)
				}

				m = append(m, objectInfo{
					addr: string(k),
					data: slice.Copy(v),
				})
			}
			return nil
		})

		var count int
		for i := range m {
			obj := object.New()
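			// Skip entries that cannot be unmarshaled; they are left in the cache.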
			if err := obj.Unmarshal(m[i].data); err != nil {
				continue
			}

			count++
			select {
			case c.flushCh <- obj:
			case <-c.closeCh:
				c.modeMtx.RUnlock()
				return
			}
		}

		if count == 0 {
			c.modeMtx.RUnlock()
			break
		}

		c.modeMtx.RUnlock()

		c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
			zap.Int("count", count),
			zap.String("start", base58.Encode(lastKey)))
	}
}

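// workerFlushBig periodically flushes objects stored in the FSTree to the
// main storage.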
func (c *cache) workerFlushBig(ctx context.Context) {
	tick := time.NewTicker(defaultFlushInterval * 10)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			c.modeMtx.RLock()
			if c.readOnly() {
				c.modeMtx.RUnlock()
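				// This break leaves the select only; the worker waits for the next tick.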
				break
			}

			_ = c.flushFSTree(ctx, true)

			c.modeMtx.RUnlock()
		case <-c.closeCh:
			return
		}
	}
}

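// reportFlushError passes a flush error to the configured reportError
// callback if one is set; otherwise it logs the error together with the
// object address.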
func (c *cache) reportFlushError(msg string, addr string, err error) {
	if c.reportError != nil {
		c.reportError(msg, err)
	} else {
		c.log.Error(msg,
			zap.String("address", addr),
			zap.Error(err))
	}
}

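// flushFSTree iterates over objects stored in the FSTree, flushes them to the
// main storage and removes successfully flushed objects from disk. If
// ignoreErrors is set, per-object errors are reported but do not interrupt
// the iteration.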
func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
	var prm common.IteratePrm
	prm.IgnoreErrors = ignoreErrors
	prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
		sAddr := addr.EncodeToString()

		data, err := f()
		if err != nil {
			c.reportFlushError("can't read a file", sAddr, err)
			if ignoreErrors {
				return nil
			}
			return err
		}

		var obj object.Object
		err = obj.Unmarshal(data)
		if err != nil {
			c.reportFlushError("can't unmarshal an object", sAddr, err)
			if ignoreErrors {
				return nil
			}
			return err
		}

		err = c.flushObject(ctx, &obj, data, StorageTypeFSTree)
		if err != nil {
			if ignoreErrors {
				return nil
			}
			return err
		}

		c.deleteFromDisk(ctx, []string{sAddr})
		return nil
	}

	_, err := c.fsTree.Iterate(prm)
	return err
}

// workerFlushSmall writes small objects to the main storage.
func (c *cache) workerFlushSmall() {
	defer c.wg.Done()

	var obj *object.Object
	for {
		// Wait for the next object to flush or for the cache to close.
		select {
		case obj = <-c.flushCh:
		case <-c.closeCh:
			return
		}

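		// Raw data is nil here: blobstor marshals the object itself before writing.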
		err := c.flushObject(context.TODO(), obj, nil, StorageTypeDB)
		if err != nil {
			// The error is reported and counted inside flushObject; keep the
			// object in the cache and move on.
			continue
		}

		c.deleteFromDB([]string{objectCore.AddressOf(obj).EncodeToString()})
	}
}

// flushObject writes an object directly to the main storage and updates its
// storage ID in the metabase.
func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte, st StorageType) error {
	var err error

	defer func() {
		c.metrics.Flush(err == nil, st)
	}()

	addr := objectCore.AddressOf(obj)

	var prm common.PutPrm
	prm.Object = obj
	prm.RawData = data

	res, err := c.blobstor.Put(ctx, prm)
	if err != nil {
		if !errors.Is(err, common.ErrNoSpace) && !errors.Is(err, common.ErrReadOnly) &&
			!errors.Is(err, blobstor.ErrNoPlaceFound) {
			c.reportFlushError("can't flush an object to blobstor",
				addr.EncodeToString(), err)
		}
		return err
	}

	var updPrm meta.UpdateStorageIDPrm
	updPrm.SetAddress(addr)
	updPrm.SetStorageID(res.StorageID)

	_, err = c.metabase.UpdateStorageID(updPrm)
	if err != nil {
		c.reportFlushError("can't update object storage ID",
			addr.EncodeToString(), err)
	}
	return err
}

// Flush flushes all objects from the write-cache to the main storage.
// The write-cache must be in read-only mode to ensure the correctness of the
// operation and to prevent interference with background flush workers.
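//
// A minimal usage sketch (assuming the cache was switched to read-only mode
// beforehand, e.g. via the SetMode method of this package's interface):
//
//	if err := c.Flush(ctx, true); err != nil {
//		// Handle the flush failure.
//	}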
func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush",
		trace.WithAttributes(
			attribute.Bool("ignore_errors", ignoreErrors),
		))
	defer span.End()

	c.modeMtx.RLock()
	defer c.modeMtx.RUnlock()

	return c.flush(ctx, ignoreErrors)
}

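// flush first flushes objects stored in the FSTree, then all objects
// remaining in the database.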
func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
	if err := c.flushFSTree(ctx, ignoreErrors); err != nil {
		return err
	}

	return c.db.View(func(tx *bbolt.Tx) error {
		var addr oid.Address

		b := tx.Bucket(defaultBucket)
		cs := b.Cursor()
		for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
			sa := string(k)
			if err := addr.DecodeString(sa); err != nil {
				c.reportFlushError("can't decode object address from the DB", sa, err)
				if ignoreErrors {
					continue
				}
				return err
			}

			var obj object.Object
			if err := obj.Unmarshal(data); err != nil {
				c.reportFlushError("can't unmarshal an object from the DB", sa, err)
				if ignoreErrors {
					continue
				}
				return err
			}

			if err := c.flushObject(ctx, &obj, data, StorageTypeDB); err != nil {
				return err
			}
		}
		return nil
	})
}