package shard

import (
	"context"
	"errors"
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.uber.org/zap"
)

func (s *Shard) handleMetabaseFailure(stage string, err error) error {
	s.log.Error(logs.ShardMetabaseFailureSwitchingMode,
		zap.String("stage", stage),
		zap.Stringer("mode", mode.ReadOnly),
		zap.Error(err))

	err = s.SetMode(mode.ReadOnly)
	if err == nil {
		return nil
	}

	s.log.Error(logs.ShardCantMoveShardToReadonlySwitchMode,
		zap.String("stage", stage),
		zap.Stringer("mode", mode.DegradedReadOnly),
		zap.Error(err))

	err = s.SetMode(mode.DegradedReadOnly)
	if err != nil {
		return fmt.Errorf("could not switch to mode %s", mode.DegradedReadOnly)
	}
	return nil
}

// Open opens all Shard's components.
func (s *Shard) Open() error {
	components := []interface{ Open(bool) error }{
		s.blobStor, s.metaBase,
	}

	if s.hasWriteCache() {
		components = append(components, s.writeCache)
	}

	if s.pilorama != nil {
		components = append(components, s.pilorama)
	}

	for i, component := range components {
		if err := component.Open(false); err != nil {
			if component == s.metaBase {
				// We must first open all other components to avoid
				// opening non-existent DB in read-only mode.
				for j := i + 1; j < len(components); j++ {
					if err := components[j].Open(false); err != nil {
						// Other components must be opened, fail.
						return fmt.Errorf("could not open %T: %w", components[j], err)
					}
				}
				err = s.handleMetabaseFailure("open", err)
				if err != nil {
					return err
				}

				break
			}

			return fmt.Errorf("could not open %T: %w", component, err)
		}
	}
	return nil
}

type metabaseSynchronizer Shard

func (x *metabaseSynchronizer) Init() error {
	ctx, span := tracing.StartSpanFromContext(context.TODO(), "metabaseSynchronizer.Init")
	defer span.End()

	return (*Shard)(x).refillMetabase(ctx)
}

// Init initializes all Shard's components.
func (s *Shard) Init(ctx context.Context) error {
	type initializer interface {
		Init() error
	}

	var components []initializer

	if !s.GetMode().NoMetabase() {
		var initMetabase initializer

		if s.needRefillMetabase() {
			initMetabase = (*metabaseSynchronizer)(s)
		} else {
			initMetabase = s.metaBase
		}

		components = []initializer{
			s.blobStor, initMetabase,
		}
	} else {
		components = []initializer{s.blobStor}
	}

	if s.hasWriteCache() {
		components = append(components, s.writeCache)
	}

	if s.pilorama != nil {
		components = append(components, s.pilorama)
	}

	for _, component := range components {
		if err := component.Init(); err != nil {
			if component == s.metaBase {
				if errors.Is(err, meta.ErrOutdatedVersion) {
					return fmt.Errorf("metabase initialization: %w", err)
				}

				err = s.handleMetabaseFailure("init", err)
				if err != nil {
					return err
				}

				break
			}

			return fmt.Errorf("could not initialize %T: %w", component, err)
		}
	}

	s.updateMetrics(ctx)

	s.gc = &gc{
		gcCfg:       &s.gcCfg,
		remover:     s.removeGarbage,
		stopChannel: make(chan struct{}),
		eventChan:   make(chan Event),
		mEventHandler: map[eventType]*eventHandlers{
			eventNewEpoch: {
				cancelFunc: func() {},
				handlers: []eventHandler{
					s.collectExpiredLocks,
					s.collectExpiredObjects,
					s.collectExpiredTombstones,
				},
			},
		},
	}

	s.gc.init(ctx)

	return nil
}

func (s *Shard) refillMetabase(ctx context.Context) error {
	err := s.metaBase.Reset()
	if err != nil {
		return fmt.Errorf("could not reset metabase: %w", err)
	}

	obj := objectSDK.New()

	err = blobstor.IterateBinaryObjects(ctx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
		if err := obj.Unmarshal(data); err != nil {
			s.log.Warn(logs.ShardCouldNotUnmarshalObject,
				zap.Stringer("address", addr),
				zap.String("err", err.Error()))
			return nil
		}

		var err error
		switch obj.Type() {
		case objectSDK.TypeTombstone:
			err = s.refillTombstoneObject(ctx, obj)
		case objectSDK.TypeLock:
			err = s.refillLockObject(ctx, obj)
		default:
		}
		if err != nil {
			return err
		}

		var mPrm meta.PutPrm
		mPrm.SetObject(obj)
		mPrm.SetStorageID(descriptor)

		_, err = s.metaBase.Put(ctx, mPrm)
		if err != nil && !meta.IsErrRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
			return err
		}

		return nil
	})
	if err != nil {
		return fmt.Errorf("could not put objects to the meta: %w", err)
	}

	err = s.metaBase.SyncCounters()
	if err != nil {
		return fmt.Errorf("could not sync object counters: %w", err)
	}

	return nil
}

func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) error {
	var lock objectSDK.Lock
	if err := lock.Unmarshal(obj.Payload()); err != nil {
		return fmt.Errorf("could not unmarshal lock content: %w", err)
	}

	locked := make([]oid.ID, lock.NumberOfMembers())
	lock.ReadMembers(locked)

	cnr, _ := obj.ContainerID()
	id, _ := obj.ID()
	err := s.metaBase.Lock(ctx, cnr, id, locked)
	if err != nil {
		return fmt.Errorf("could not lock objects: %w", err)
	}
	return nil
}

func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object) error {
	tombstone := objectSDK.NewTombstone()

	if err := tombstone.Unmarshal(obj.Payload()); err != nil {
		return fmt.Errorf("could not unmarshal tombstone content: %w", err)
	}

	tombAddr := object.AddressOf(obj)
	memberIDs := tombstone.Members()
	tombMembers := make([]oid.Address, 0, len(memberIDs))

	for i := range memberIDs {
		a := tombAddr
		a.SetObject(memberIDs[i])

		tombMembers = append(tombMembers, a)
	}

	var inhumePrm meta.InhumePrm

	inhumePrm.SetTombstoneAddress(tombAddr)
	inhumePrm.SetAddresses(tombMembers...)

	_, err := s.metaBase.Inhume(ctx, inhumePrm)
	if err != nil {
		return fmt.Errorf("could not inhume objects: %w", err)
	}
	return nil
}

// Close releases all Shard's components.
func (s *Shard) Close() error {
	components := []interface{ Close() error }{}

	if s.pilorama != nil {
		components = append(components, s.pilorama)
	}

	if s.hasWriteCache() {
		components = append(components, s.writeCache)
	}

	components = append(components, s.blobStor, s.metaBase)

	var lastErr error
	for _, component := range components {
		if err := component.Close(); err != nil {
			lastErr = err
			s.log.Error(logs.ShardCouldNotCloseShardComponent, zap.Error(err))
		}
	}

	// If Init/Open was unsuccessful gc can be nil.
	if s.gc != nil {
		s.gc.stop()
	}

	return lastErr
}

// Reload reloads configuration portions that are necessary.
// If a config option is invalid, it logs an error and returns nil.
// If there was a problem with applying new configuration, an error is returned.
func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Reload")
	defer span.End()

	// Do not use defaultCfg here missing options need not be reloaded.
	var c cfg
	for i := range opts {
		opts[i](&c)
	}

	unlock := s.lockExclusive()
	defer unlock()

	ok, err := s.metaBase.Reload(c.metaOpts...)
	if err != nil {
		if errors.Is(err, meta.ErrDegradedMode) {
			s.log.Error(logs.ShardCantOpenMetabaseMoveToADegradedMode, zap.Error(err))
			_ = s.setMode(mode.DegradedReadOnly)
		}
		return err
	}
	if ok {
		var err error
		if c.refillMetabase {
			// Here we refill metabase only if a new instance was opened. This is a feature,
			// we don't want to hang for some time just because we forgot to change
			// config after the node was updated.
			err = s.refillMetabase(ctx)
		} else {
			err = s.metaBase.Init()
		}
		if err != nil {
			s.log.Error(logs.ShardCantInitializeMetabaseMoveToADegradedreadonlyMode, zap.Error(err))
			_ = s.setMode(mode.DegradedReadOnly)
			return err
		}
	}

	s.log.Info(logs.ShardTryingToRestoreReadwriteMode)
	return s.setMode(mode.ReadWrite)
}

func (s *Shard) lockExclusive() func() {
	s.setModeRequested.Store(true)
	val := s.gcCancel.Load()
	if val != nil {
		cancelGC := val.(context.CancelFunc)
		cancelGC()
	}
	s.m.Lock()
	s.setModeRequested.Store(false)
	return s.m.Unlock
}