package writecache

import (
	"context"
	"errors"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
)

const (
	// defaultFlushWorkersCount is number of workers for putting objects in main storage.
	defaultFlushWorkersCount = 20
	// defaultFlushInterval is default time interval between successive flushes.
	defaultFlushInterval = 10 * time.Second
)

var errIterationCompleted = errors.New("iteration completed")

// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop(ctx context.Context) {
	if c.disableBackgroundFlush {
		return
	}
	fl := newFlushLimiter(c.flushSizeLimit)
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		c.pushToFlushQueue(ctx, fl)
	}()

	for range c.workersCount {
		c.wg.Add(1)
		go c.workerFlush(ctx, fl)
	}
}

func (c *cache) pushToFlushQueue(ctx context.Context, fl *flushLimiter) {
	stopf := context.AfterFunc(ctx, func() {
		fl.close()
	})
	defer stopf()

	tick := time.NewTicker(defaultFlushInterval)
	for {
		select {
		case <-tick.C:
			c.modeMtx.RLock()
			if c.readOnly() || c.noMetabase() {
				c.modeMtx.RUnlock()
				continue
			}

			err := c.fsTree.IterateInfo(ctx, func(oi fstree.ObjectInfo) error {
				if err := fl.acquire(oi.DataSize); err != nil {
					return err
				}
				select {
				case c.flushCh <- objectInfo{
					addr: oi.Address,
					size: oi.DataSize,
				}:
					return nil
				case <-ctx.Done():
					fl.release(oi.DataSize)
					return ctx.Err()
				}
			})
			if err != nil {
				c.log.Warn(ctx, logs.BlobstorErrorOccurredDuringTheIteration, zap.Error(err))
			}

			c.modeMtx.RUnlock()
		case <-ctx.Done():
			return
		}
	}
}

func (c *cache) workerFlush(ctx context.Context, fl *flushLimiter) {
	defer c.wg.Done()

	var objInfo objectInfo
	for {
		select {
		case objInfo = <-c.flushCh:
			c.flushIfAnObjectExistsWorker(ctx, objInfo, fl)
		case <-ctx.Done():
			return
		}
	}
}

func (c *cache) flushIfAnObjectExistsWorker(ctx context.Context, objInfo objectInfo, fl *flushLimiter) {
	defer fl.release(objInfo.size)

	res, err := c.fsTree.Get(ctx, common.GetPrm{
		Address: objInfo.addr,
	})
	if err != nil {
		if !client.IsErrObjectNotFound(err) {
			c.reportFlushError(ctx, logs.WritecacheCantGetObject, objInfo.addr.EncodeToString(), metaerr.Wrap(err))
		}
		return
	}

	err = c.flushObject(ctx, res.Object, res.RawData, StorageTypeFSTree)
	if err != nil {
		// Error is handled in flushObject.
		return
	}

	c.deleteFromDisk(ctx, objInfo.addr, uint64(len(res.RawData)))
}

func (c *cache) reportFlushError(ctx context.Context, msg string, addr string, err error) {
	if c.reportError != nil {
		c.reportError(ctx, msg, err)
	} else {
		c.log.Error(ctx, msg,
			zap.String("address", addr),
			zap.Error(err))
	}
}

func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
	var prm common.IteratePrm
	prm.IgnoreErrors = ignoreErrors
	prm.Handler = func(e common.IterationElement) error {
		sAddr := e.Address.EncodeToString()

		var obj objectSDK.Object
		err := obj.Unmarshal(e.ObjectData)
		if err != nil {
			c.reportFlushError(ctx, logs.FSTreeCantUnmarshalObject, sAddr, metaerr.Wrap(err))
			if ignoreErrors {
				return nil
			}
			return err
		}

		err = c.flushObject(ctx, &obj, e.ObjectData, StorageTypeFSTree)
		if err != nil {
			return err
		}

		c.deleteFromDisk(ctx, e.Address, uint64(len(e.ObjectData)))
		return nil
	}

	_, err := c.fsTree.Iterate(ctx, prm)
	return err
}

// flushObject is used to write object directly to the main storage.
func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []byte, st StorageType) error {
	var err error

	defer func() {
		c.metrics.Flush(err == nil, st)
	}()

	addr := objectCore.AddressOf(obj)

	var prm common.PutPrm
	prm.Object = obj
	prm.RawData = data

	res, err := c.blobstor.Put(ctx, prm)
	if err != nil {
		if !errors.Is(err, common.ErrNoSpace) && !errors.Is(err, common.ErrReadOnly) &&
			!errors.Is(err, blobstor.ErrNoPlaceFound) {
			c.reportFlushError(ctx, logs.FSTreeCantFushObjectBlobstor,
				addr.EncodeToString(), err)
		}
		return err
	}

	var updPrm meta.UpdateStorageIDPrm
	updPrm.SetAddress(addr)
	updPrm.SetStorageID(res.StorageID)

	_, err = c.metabase.UpdateStorageID(ctx, updPrm)
	if err != nil {
		c.reportFlushError(ctx, logs.FSTreeCantUpdateID,
			addr.EncodeToString(), err)
	}
	return err
}

// Flush flushes all objects from the write-cache to the main storage.
func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush",
		trace.WithAttributes(
			attribute.Bool("ignore_errors", ignoreErrors),
			attribute.Bool("seal", seal),
		))
	defer span.End()

	c.modeMtx.Lock() // exclusive lock to not to conflict with background flush
	defer c.modeMtx.Unlock()
	if c.noMetabase() {
		return ErrDegraded
	}

	if err := c.flush(ctx, ignoreErrors); err != nil {
		return err
	}

	if seal {
		m := c.mode | mode.ReadOnly
		if err := c.setMode(ctx, m, setModePrm{ignoreErrors: ignoreErrors}); err != nil {
			return err
		}
		c.metrics.SetMode(mode.ConvertToComponentModeDegraded(m))
	}
	return nil
}

func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
	return c.flushFSTree(ctx, ignoreErrors)
}