package writecache

import (
	"context"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
	storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	"go.etcd.io/bbolt"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// Put stores an object in the write-cache.
//
// Objects whose raw payload does not exceed the small-object limit are
// written to the write-cache database; larger ones go to the FSTree.
// Duration, outcome and chosen storage type are reported to metrics on
// every call, including failed ones.
//
// Returns ErrNotInitialized if the mode lock cannot be acquired immediately
// (e.g. write-cache has not been initialized yet).
// Returns ErrReadOnly if write-cache is in R/O mode.
// Returns ErrDegraded if noMetabase reports true for the current mode.
// Returns ErrOutOfSpace if saving an object leads to WC's size overflow.
// Returns ErrBigObject if an object exceeds the maximum object size.
func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
	// The encoded address is used both as a span attribute and as the storage key.
	addr := prm.Address.EncodeToString()

	ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Put",
		trace.WithAttributes(
			attribute.String("address", addr),
			attribute.Bool("dont_compress", prm.DontCompress),
		))
	defer span.End()

	var (
		begin   = time.Now()
		stored  bool
		storage = StorageTypeUndefined
	)
	// Registered before the lock is taken so that early exits are measured too.
	defer func() {
		c.metrics.Put(time.Since(begin), stored, storage)
	}()

	// Non-blocking acquisition: a held write lock makes Put fail fast instead of waiting.
	if !c.modeMtx.TryRLock() {
		return common.PutRes{}, ErrNotInitialized
	}
	defer c.modeMtx.RUnlock()

	switch {
	case c.readOnly():
		return common.PutRes{}, ErrReadOnly
	case c.noMetabase():
		return common.PutRes{}, ErrDegraded
	}

	size := uint64(len(prm.RawData))
	if size > c.maxObjectSize {
		return common.PutRes{}, ErrBigObject
	}

	if size > c.smallObjectSize {
		storage = StorageTypeFSTree
		err := c.putBig(ctx, addr, prm)
		stored = err == nil
		return common.PutRes{}, metaerr.Wrap(err)
	}

	storage = StorageTypeDB
	err := c.putSmall(objectInfo{
		addr: addr,
		obj:  prm.Object,
		data: prm.RawData,
	})
	stored = err == nil
	return common.PutRes{}, err
}

// putSmall persists a small object to the write-cache database.
//
// It returns ErrOutOfSpace when the estimated cache size, as adjusted by
// incSizeDB, exceeds the configured maximum. If the address is already
// present in the default bucket, the stored value is left untouched.
// On success the operation is logged; the DB object counter is incremented
// and the size estimate refreshed only when a new record was written.
func (c *cache) putSmall(obj objectInfo) error {
	if c.incSizeDB(c.estimateCacheSize()) > c.maxCacheSize {
		return ErrOutOfSpace
	}

	inserted := false
	err := c.db.Batch(func(tx *bbolt.Tx) error {
		bkt := tx.Bucket(defaultBucket)
		k := []byte(obj.addr)
		// The flag is assigned on every invocation because Batch may
		// call this function more than once.
		if bkt.Get(k) != nil {
			inserted = false
			return nil
		}
		inserted = true
		return bkt.Put(k, obj.data)
	})
	if err != nil {
		return err
	}

	storagelog.Write(c.log,
		storagelog.AddressField(obj.addr),
		storagelog.StorageTypeField(wcStorageType),
		storagelog.OpField("db PUT"),
	)
	if inserted {
		c.objCounters.cDB.Add(1)
		c.estimateCacheSize()
	}
	return nil
}

// putBig writes an object to the FSTree.
//
// It returns ErrOutOfSpace when the estimated cache size, as adjusted by
// incSizeFS, exceeds the configured maximum, and passes through any error
// returned by the FSTree. After a successful write the address is recorded
// in compressFlags if the blobstor compressor reports that the object needs
// compression; the operation is then logged and the size estimate refreshed.
func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error {
	if c.incSizeFS(c.estimateCacheSize()) > c.maxCacheSize {
		return ErrOutOfSpace
	}

	if _, err := c.fsTree.Put(ctx, prm); err != nil {
		return err
	}

	comp := c.blobstor.Compressor()
	if comp != nil && comp.NeedsCompression(prm.Object) {
		c.mtx.Lock()
		c.compressFlags[addr] = struct{}{}
		c.mtx.Unlock()
	}

	storagelog.Write(c.log,
		storagelog.AddressField(addr),
		storagelog.StorageTypeField(wcStorageType),
		storagelog.OpField("fstree PUT"),
	)

	// The object counter is changed by fstree; only the size estimate is refreshed here.
	c.estimateCacheSize()

	return nil
}