package engine

import (
	"context"
	"errors"
	"strconv"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
	tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
)

// RngPrm groups the parameters of GetRange operation.
type RngPrm struct {
	// off is the offset of the requested payload range.
	off uint64

	// ln is the length of the requested payload range.
	ln uint64

	// addr is the address of the requested object.
	addr oid.Address
}

// RngRes groups the resulting values of GetRange operation.
type RngRes struct {
	// obj is the object exposed through Object.
	obj *objectSDK.Object
}

// WithAddress is a GetRange option to set the address of the requested object.
//
// Option is required.
func (p *RngPrm) WithAddress(addr oid.Address) {
	p.addr = addr
}

// WithPayloadRange is a GetRange option to set the range of the requested
// payload data. The offset and the length are copied from rng.
//
// Missing an option or calling with zero length is equivalent
// to getting the full payload range.
func (p *RngPrm) WithPayloadRange(rng *objectSDK.Range) {
	p.off = rng.GetOffset()
	p.ln = rng.GetLength()
}

// Object returns the requested object part.
//
// Instance payload contains the requested range of the original object.
// The stored pointer is returned as is, without copying.
func (r RngRes) Object() *objectSDK.Object {
	return r.obj
}

// GetRange reads part of an object from local storage.
//
// The actual lookup is performed by getRange and is wrapped into
// execIfNotBlocked, so an error is returned if executions are blocked
// (see BlockExecution).
//
// Returns any error encountered that
// did not allow to completely read the object part.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in local storage.
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object is inhumed.
// Returns an out-of-range error (see shard.IsErrOutOfRange) if the requested object range is out of bounds.
func (e *StorageEngine) GetRange(ctx context.Context, prm RngPrm) (res RngRes, err error) {
	err = e.execIfNotBlocked(func() error {
		var rangeErr error
		res, rangeErr = e.getRange(ctx, prm)
		return rangeErr
	})

	return res, err
}

// getRange looks the requested payload range up across the engine shards.
//
// The first pass (tryGetWithMeta) walks the sorted shards. If it gathered
// split or EC info, the corresponding error is returned wrapped as a logical
// error. If it produced no object, a second pass (tryGetFromBlobstor) is made
// only when the object is reported as not found and either some shard has
// meta for it or some shard works without a metabase.
func (e *StorageEngine) getRange(ctx context.Context, prm RngPrm) (RngRes, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getRange",
		trace.WithAttributes(
			attribute.String("address", prm.addr.EncodeToString()),
			attribute.String("offset", strconv.FormatUint(prm.off, 10)),
			attribute.String("length", strconv.FormatUint(prm.ln, 10)),
		))
	defer span.End()

	if e.metrics != nil {
		defer elapsed("GetRange", e.metrics.AddMethodDuration)()
	}

	var shardPrm shard.RngPrm
	shardPrm.SetAddress(prm.addr)
	shardPrm.SetRange(prm.off, prm.ln)

	iter := &getRangeShardIterator{
		Engine:   e,
		Address:  prm.addr,
		ShardPrm: shardPrm,
		// Reported when no shard yields anything more specific.
		OutError: new(apistatus.ObjectNotFound),
	}
	iter.tryGetWithMeta(ctx)

	switch {
	case iter.SplitInfo != nil:
		return RngRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(iter.SplitInfo))
	case iter.ECInfo != nil:
		return RngRes{}, logicerr.Wrap(objectSDK.NewECInfoError(iter.ECInfo))
	case iter.Object != nil:
		return RngRes{obj: iter.Object}, nil
	}

	// If any shard is in a degraded mode, we should assume that metabase could store
	// info about some object, so the fallback is skipped only when no shard has
	// meta and none is degraded, or when the error is not "object not found".
	if (iter.ShardWithMeta.Shard == nil && !iter.HasDegraded) || !client.IsErrObjectNotFound(iter.OutError) {
		return RngRes{}, iter.OutError
	}

	iter.tryGetFromBlobstor(ctx)
	if iter.Object == nil {
		return RngRes{}, iter.OutError
	}

	// The object was read with meta ignored although a shard had meta for it
	// and failed: make the inconsistency visible in the log.
	if iter.ShardWithMeta.Shard != nil && iter.MetaError != nil {
		e.log.Warn(logs.ShardMetaInfoPresentButObjectNotFound,
			zap.Stringer("shard_id", iter.ShardWithMeta.ID()),
			zap.String("error", iter.MetaError.Error()),
			zap.Stringer("address", prm.addr),
			zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
	}

	return RngRes{obj: iter.Object}, nil
}

// GetRange reads object payload range from local storage by provided address.
//
// It builds RngPrm from addr and rng, calls StorageEngine.GetRange and returns
// the payload of the resulting object. Any error of the engine call is
// returned unchanged together with a nil slice.
func GetRange(ctx context.Context, storage *StorageEngine, addr oid.Address, rng *objectSDK.Range) ([]byte, error) {
	prm := RngPrm{addr: addr}
	prm.WithPayloadRange(rng)

	res, err := storage.GetRange(ctx, prm)
	if err != nil {
		return nil, err
	}

	payload := res.Object().Payload()
	return payload, nil
}

type getRangeShardIterator struct {
	Object         *objectSDK.Object
	SplitInfoError *objectSDK.SplitInfoError
	SplitInfo      *objectSDK.SplitInfo
	ECInfoError    *objectSDK.ECInfoError
	ECInfo         *objectSDK.ECInfo
	OutError       error
	ShardWithMeta  hashedShard
	MetaError      error
	HasDegraded    bool

	ShardPrm shard.RngPrm
	Address  oid.Address
	Engine   *StorageEngine
}

// tryGetWithMeta walks over the sorted shards and asks each of them for the
// requested range, ignoring meta only for shards whose mode has no metabase.
//
// On success Object is set and the iteration stops. On failure the error is
// classified:
//   - object not found: continue with the next shard;
//   - split info / EC info: merged into SplitInfo / ECInfo, the iteration
//     stops once the merged structure is complete;
//   - already removed / out of range: stored in OutError, the iteration stops;
//   - anything else: reported as a shard error, continue with the next shard.
//
// A shard whose failed result reports HasMeta is remembered in ShardWithMeta
// together with its error in MetaError.
func (i *getRangeShardIterator) tryGetWithMeta(ctx context.Context) {
	i.Engine.iterateOverSortedShards(i.Address, func(_ int, sh hashedShard) (stop bool) {
		noMeta := sh.GetMode().NoMetabase()
		i.HasDegraded = i.HasDegraded || noMeta
		i.ShardPrm.SetIgnoreMeta(noMeta)

		res, err := sh.GetRange(ctx, i.ShardPrm)
		if err == nil {
			i.Object = res.Object()
			return true
		}

		if res.HasMeta() {
			i.ShardWithMeta = sh
			i.MetaError = err
		}
		switch {
		case client.IsErrObjectNotFound(err):
			return false // ignore, go to next shard
		case errors.As(err, &i.SplitInfoError):
			if i.SplitInfo == nil {
				i.SplitInfo = objectSDK.NewSplitInfo()
			}

			util.MergeSplitInfo(i.SplitInfoError.SplitInfo(), i.SplitInfo)

			_, withLink := i.SplitInfo.Link()
			_, withLast := i.SplitInfo.LastPart()

			// stop iterating over shards if SplitInfo structure is complete
			return withLink && withLast
		case errors.As(err, &i.ECInfoError):
			if i.ECInfo == nil {
				i.ECInfo = objectSDK.NewECInfo()
			}

			util.MergeECInfo(i.ECInfoError.ECInfo(), i.ECInfo)

			chunks := i.ECInfo.Chunks
			if len(chunks) == 0 {
				// Nothing has been collected yet: the total is unknown, and
				// indexing the first chunk would panic. Go to the next shard.
				return false
			}
			// stop iterating over shards if ECInfo structure is complete
			return len(chunks) == int(chunks[0].Total)
		case
			client.IsErrObjectAlreadyRemoved(err),
			shard.IsErrOutOfRange(err):
			i.OutError = err

			return true // stop, return it back
		default:
			i.Engine.reportShardError(sh, "could not get object from shard", err)
			return false
		}
	})
}

// tryGetFromBlobstor is the fallback pass: it repeats the range read on every
// shard that has a metabase, this time with meta ignored.
//
// Object is set only when a shard returns without an error. An out-of-range
// error replaces OutError with apistatus.ObjectOutOfRange and stops the
// iteration; any other error moves on to the next shard.
func (i *getRangeShardIterator) tryGetFromBlobstor(ctx context.Context) {
	// If the object is not found but is present in metabase,
	// try to fetch it from blobstor directly. If it is found in any
	// blobstor, the caller logs a warning for the shard which contains the meta.
	i.ShardPrm.SetIgnoreMeta(true)

	i.Engine.iterateOverSortedShards(i.Address, func(_ int, sh hashedShard) (stop bool) {
		if sh.GetMode().NoMetabase() {
			// Already processed it without a metabase.
			return false
		}

		res, err := sh.GetRange(ctx, i.ShardPrm)
		switch {
		case err == nil:
			i.Object = res.Object()
			return true
		case shard.IsErrOutOfRange(err):
			i.OutError = new(apistatus.ObjectOutOfRange)
			return true
		default:
			// The result of a failed call is not meaningful: leave Object
			// untouched and try the next shard.
			return false
		}
	})
}