package pilorama

import (
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"math/rand"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"github.com/nspcc-dev/neo-go/pkg/io"
	"go.etcd.io/bbolt"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

type boltForest struct {
	db *bbolt.DB

	modeMtx sync.RWMutex
	mode    mode.Mode

	// mtx protects batches field.
	mtx     sync.Mutex
	batches []*batch

	cfg
}

const (
	childrenKeySize = 17
	maxKeySize      = childrenKeySize
)

var (
	dataBucket = []byte{0}
	logBucket  = []byte{1}
)

// ErrDegradedMode is returned when pilorama is in a degraded mode.
var ErrDegradedMode = logicerr.New("pilorama is in a degraded mode")

// ErrReadOnlyMode is returned when pilorama is in a read-only mode.
var ErrReadOnlyMode = logicerr.New("pilorama is in a read-only mode")

// NewBoltForest returns storage wrapper for storing operations on CRDT trees.
//
// Each tree is stored in a separate bucket by `CID + treeID` key.
// All integers are stored in little-endian unless explicitly specified otherwise.
//
// DB schema (for a single tree):
// timestamp is 8-byte, id is 4-byte.
//
// log storage (logBucket):
// timestamp in big-endian -> log operation
//
// tree storage (dataBucket):
// - 't' + node (id) -> timestamp when the node first appeared,
// - 'p' + node (id) -> parent (id),
// - 'm' + node (id) -> serialized meta,
// - 'c' + parent (id) + child (id) -> 0/1,
// - 'i' + 0 + attrKey + 0 + attrValue + 0 + parent (id) + node (id) -> 0/1 (1 for automatically created nodes).
func NewBoltForest(opts ...Option) ForestStorage {
	b := boltForest{
		cfg: cfg{
			perm:          os.ModePerm,
			maxBatchDelay: bbolt.DefaultMaxBatchDelay,
			maxBatchSize:  bbolt.DefaultMaxBatchSize,
			openFile:      os.OpenFile,
			metrics:       &noopMetrics{},
		},
	}

	for i := range opts {
		opts[i](&b.cfg)
	}

	return &b
}

func (t *boltForest) SetMode(m mode.Mode) error {
	t.modeMtx.Lock()
	defer t.modeMtx.Unlock()

	if t.mode == m {
		return nil
	}

	err := t.Close()
	if err == nil && !m.NoMetabase() {
		if err = t.Open(context.TODO(), m.ReadOnly()); err == nil {
			err = t.Init()
		}
	}
	if err != nil {
		return fmt.Errorf("can't set pilorama mode (old=%s, new=%s): %w", t.mode, m, err)
	}

	t.mode = m
	t.metrics.SetMode(m)
	return nil
}
func (t *boltForest) Open(_ context.Context, readOnly bool) error {
	err := util.MkdirAllX(filepath.Dir(t.path), t.perm)
	if err != nil {
		return metaerr.Wrap(fmt.Errorf("can't create dir %s for the pilorama: %w", t.path, err))
	}

	opts := *bbolt.DefaultOptions
	opts.ReadOnly = readOnly
	opts.NoSync = t.noSync
	opts.Timeout = 100 * time.Millisecond
	opts.OpenFile = t.openFile

	t.db, err = bbolt.Open(t.path, t.perm, &opts)
	if err != nil {
		return metaerr.Wrap(fmt.Errorf("can't open the pilorama DB: %w", err))
	}

	t.db.MaxBatchSize = t.maxBatchSize
	t.db.MaxBatchDelay = t.maxBatchDelay
	m := mode.ReadWrite
	if readOnly {
		m = mode.ReadOnly
	}
	t.metrics.SetMode(m)
	return nil
}
func (t *boltForest) Init() error {
	if t.mode.NoMetabase() || t.db.IsReadOnly() {
		return nil
	}
	return t.db.Update(func(tx *bbolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists(dataBucket)
		if err != nil {
			return err
		}
		_, err = tx.CreateBucketIfNotExists(logBucket)
		return err
	})
}
func (t *boltForest) Close() error {
	var err error
	if t.db != nil {
		err = t.db.Close()
	}
	if err == nil {
		t.metrics.Close()
	}
	return err
}

func (t *boltForest) SetParentID(id string) {
	t.metrics.SetParentID(id)
}

// TreeMove implements the Forest interface.
func (t *boltForest) TreeMove(ctx context.Context, d CIDDescriptor, treeID string, m *Move) (*Move, error) {
	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeMove",
		trace.WithAttributes(
			attribute.String("container_id", d.CID.EncodeToString()),
			attribute.Int("position", d.Position),
			attribute.Int("size", d.Size),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	if !d.checkValid() {
		return nil, ErrInvalidCIDDescriptor
	}

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return nil, ErrDegradedMode
	} else if t.mode.ReadOnly() {
		return nil, ErrReadOnlyMode
	}

	lm := *m
	fullID := bucketName(d.CID, treeID)
	return &lm, metaerr.Wrap(t.db.Batch(func(tx *bbolt.Tx) error {
		bLog, bTree, err := t.getTreeBuckets(tx, fullID)
		if err != nil {
			return err
		}

		lm.Time = t.getLatestTimestamp(bLog, d.Position, d.Size)
		if lm.Child == RootID {
			lm.Child = t.findSpareID(bTree)
		}
		return t.do(bLog, bTree, make([]byte, maxKeySize), &lm)
	}))
}

func (t *boltForest) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) {
	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeHeight",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return 0, ErrDegradedMode
	}

	var height uint64
	var retErr error
	err := t.db.View(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(bucketName(cid, treeID))
		if treeRoot != nil {
			k, _ := treeRoot.Bucket(logBucket).Cursor().Last()
			height = binary.BigEndian.Uint64(k)
		} else {
			retErr = ErrTreeNotFound
		}
		return nil
	})
	if err == nil {
		err = retErr
	}
	return height, metaerr.Wrap(err)
}

// TreeExists implements the Forest interface.
func (t *boltForest) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (bool, error) {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		t.metrics.AddMethodDuration("TreeExists", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeExists",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return false, ErrDegradedMode
	}

	var exists bool

	err := t.db.View(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(bucketName(cid, treeID))
		exists = treeRoot != nil
		return nil
	})
	success = err == nil
	return exists, metaerr.Wrap(err)
}

var syncHeightKey = []byte{'h'}

// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
func (t *boltForest) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) error {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		t.metrics.AddMethodDuration("TreeUpdateLastSyncHeight", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeUpdateLastSyncHeight",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.String("height", strconv.FormatUint(height, 10)),
		),
	)
	defer span.End()

	rawHeight := make([]byte, 8)
	binary.LittleEndian.PutUint64(rawHeight, height)

	buck := bucketName(cid, treeID)
	err := metaerr.Wrap(t.db.Batch(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(buck)
		if treeRoot == nil {
			return ErrTreeNotFound
		}

		b := treeRoot.Bucket(dataBucket)
		return b.Put(syncHeightKey, rawHeight)
	}))
	success = err == nil
	return err
}

// TreeLastSyncHeight implements the pilorama.Forest interface.
func (t *boltForest) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		t.metrics.AddMethodDuration("TreeLastSyncHeight", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeLastSyncHeight",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	var height uint64

	buck := bucketName(cid, treeID)
	err := t.db.View(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(buck)
		if treeRoot == nil {
			return ErrTreeNotFound
		}

		b := treeRoot.Bucket(dataBucket)
		data := b.Get(syncHeightKey)
		if len(data) == 8 {
			height = binary.LittleEndian.Uint64(data)
		}
		return nil
	})
	success = err == nil
	return height, metaerr.Wrap(err)
}

// TreeAddByPath implements the Forest interface.
func (t *boltForest) TreeAddByPath(ctx context.Context, d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error) {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		t.metrics.AddMethodDuration("TreeAddByPath", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeAddByPath",
		trace.WithAttributes(
			attribute.String("container_id", d.CID.EncodeToString()),
			attribute.Int("position", d.Position),
			attribute.Int("size", d.Size),
			attribute.String("tree_id", treeID),
			attribute.String("attr", attr),
			attribute.Int("path_count", len(path)),
			attribute.Int("meta_count", len(meta)),
		),
	)
	defer span.End()

	res, err := t.addByPathInternal(d, attr, treeID, path, meta)
	success = err == nil
	return res, err
}

func (t *boltForest) addByPathInternal(d CIDDescriptor, attr string, treeID string, path []string, meta []KeyValue) ([]Move, error) {
	if !d.checkValid() {
		return nil, ErrInvalidCIDDescriptor
	}
	if !isAttributeInternal(attr) {
		return nil, ErrNotPathAttribute
	}

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return nil, ErrDegradedMode
	} else if t.mode.ReadOnly() {
		return nil, ErrReadOnlyMode
	}

	var lm []Move
	var key [maxKeySize]byte

	fullID := bucketName(d.CID, treeID)
	err := t.db.Batch(func(tx *bbolt.Tx) error {
		bLog, bTree, err := t.getTreeBuckets(tx, fullID)
		if err != nil {
			return err
		}

		i, node, err := t.getPathPrefix(bTree, attr, path)
		if err != nil {
			return err
		}

		ts := t.getLatestTimestamp(bLog, d.Position, d.Size)
		lm = make([]Move, len(path)-i+1)
		for j := i; j < len(path); j++ {
			lm[j-i] = Move{
				Parent: node,
				Meta: Meta{
					Time:  ts,
					Items: []KeyValue{{Key: attr, Value: []byte(path[j])}},
				},
				Child: t.findSpareID(bTree),
			}

			err := t.do(bLog, bTree, key[:], &lm[j-i])
			if err != nil {
				return err
			}

			ts = nextTimestamp(ts, uint64(d.Position), uint64(d.Size))
			node = lm[j-i].Child
		}

		lm[len(lm)-1] = Move{
			Parent: node,
			Meta: Meta{
				Time:  ts,
				Items: meta,
			},
			Child: t.findSpareID(bTree),
		}
		return t.do(bLog, bTree, key[:], &lm[len(lm)-1])
	})
	return lm, metaerr.Wrap(err)
}

// getLatestTimestamp returns timestamp for a new operation which is guaranteed to be bigger than
// all timestamps corresponding to already stored operations.
func (t *boltForest) getLatestTimestamp(bLog *bbolt.Bucket, pos, size int) uint64 {
	var ts uint64

	c := bLog.Cursor()
	key, _ := c.Last()
	if len(key) != 0 {
		ts = binary.BigEndian.Uint64(key)
	}
	return nextTimestamp(ts, uint64(pos), uint64(size))
}

// findSpareID returns random unused ID.
func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
	id := uint64(rand.Int63())
	key := make([]byte, 9)

	for {
		_, _, _, ok := t.getState(bTree, stateKey(key, id))
		if !ok {
			return id
		}
		id = uint64(rand.Int63())
	}
}

// TreeApply implements the Forest interface.
func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		t.metrics.AddMethodDuration("TreeApply", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApply",
		trace.WithAttributes(
			attribute.String("container_id", cnr.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.Bool("background", backgroundSync),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return ErrDegradedMode
	} else if t.mode.ReadOnly() {
		return ErrReadOnlyMode
	}

	if backgroundSync {
		var seen bool
		err := t.db.View(func(tx *bbolt.Tx) error {
			treeRoot := tx.Bucket(bucketName(cnr, treeID))
			if treeRoot == nil {
				success = true
				return nil
			}

			b := treeRoot.Bucket(logBucket)

			var logKey [8]byte
			binary.BigEndian.PutUint64(logKey[:], m.Time)
			seen = b.Get(logKey[:]) != nil
			success = true
			return nil
		})
		if err != nil || seen {
			success = err == nil
			return metaerr.Wrap(err)
		}
	}

	if t.db.MaxBatchSize == 1 {
		fullID := bucketName(cnr, treeID)
		err := metaerr.Wrap(t.db.Update(func(tx *bbolt.Tx) error {
			bLog, bTree, err := t.getTreeBuckets(tx, fullID)
			if err != nil {
				return err
			}

			var lm Move
			return t.applyOperation(bLog, bTree, []*Move{m}, &lm)
		}))
		success = err == nil
		return err
	}

	ch := make(chan error, 1)
	t.addBatch(cnr, treeID, m, ch)
	err := <-ch
	success = err == nil
	return metaerr.Wrap(err)
}

func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) {
	t.mtx.Lock()
	for i := 0; i < len(t.batches); i++ {
		t.batches[i].mtx.Lock()
		if t.batches[i].timer == nil {
			t.batches[i].mtx.Unlock()
			copy(t.batches[i:], t.batches[i+1:])
			t.batches = t.batches[:len(t.batches)-1]
			i--
			continue
		}

		found := t.batches[i].cid.Equals(cnr) && t.batches[i].treeID == treeID
		if found {
			t.batches[i].results = append(t.batches[i].results, ch)
			t.batches[i].operations = append(t.batches[i].operations, m)
			if len(t.batches[i].operations) == t.db.MaxBatchSize {
				t.batches[i].timer.Stop()
				t.batches[i].timer = nil
				t.batches[i].mtx.Unlock()
				b := t.batches[i]
				t.mtx.Unlock()
				b.trigger()
				return
			}
			t.batches[i].mtx.Unlock()
			t.mtx.Unlock()
			return
		}
		t.batches[i].mtx.Unlock()
	}
	b := &batch{
		forest:     t,
		cid:        cnr,
		treeID:     treeID,
		results:    []chan<- error{ch},
		operations: []*Move{m},
	}
	b.mtx.Lock()
	b.timer = time.AfterFunc(t.db.MaxBatchDelay, b.trigger)
	b.mtx.Unlock()
	t.batches = append(t.batches, b)
	t.mtx.Unlock()
}

func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, treeRoot []byte) (*bbolt.Bucket, *bbolt.Bucket, error) {
	child := tx.Bucket(treeRoot)
	if child != nil {
		return child.Bucket(logBucket), child.Bucket(dataBucket), nil
	}

	child, err := tx.CreateBucket(treeRoot)
	if err != nil {
		return nil, nil, err
	}
	bLog, err := child.CreateBucket(logBucket)
	if err != nil {
		return nil, nil, err
	}
	bData, err := child.CreateBucket(dataBucket)
	if err != nil {
		return nil, nil, err
	}
	return bLog, bData, nil
}

// applyOperations applies log operations. Assumes lm are sorted by timestamp.
func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*Move, lm *Move) error {
	var tmp Move
	var cKey [maxKeySize]byte

	c := logBucket.Cursor()

	key, value := c.Last()

	b := bytes.NewReader(nil)
	r := io.NewBinReaderFromIO(b)

	// 1. Undo up until the desired timestamp is here.
	for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) {
		b.Reset(value)

		tmp.Child = r.ReadU64LE()
		tmp.Parent = r.ReadU64LE()
		tmp.Time = r.ReadVarUint()
		if r.Err != nil {
			return r.Err
		}
		if err := t.undo(&tmp, treeBucket, cKey[:]); err != nil {
			return err
		}
		key, value = c.Prev()
	}

	for i := 0; i < len(ms); i++ {
		// Loop invariant: key represents the next stored timestamp after ms[i].Time.

		// 2. Insert the operation.
		*lm = *ms[i]
		if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil {
			return err
		}

		// Cursor can be invalid, seek again.
		binary.BigEndian.PutUint64(cKey[:], lm.Time)
		_, _ = c.Seek(cKey[:8])
		key, value = c.Next()

		// 3. Re-apply all other operations.
		for len(key) == 8 && (i == len(ms)-1 || binary.BigEndian.Uint64(key) < ms[i+1].Time) {
			if err := t.logFromBytes(&tmp, value); err != nil {
				return err
			}
			if err := t.redo(treeBucket, cKey[:], &tmp, value[16:]); err != nil {
				return err
			}
			key, value = c.Next()
		}
	}

	return nil
}

func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *Move) error {
	binary.BigEndian.PutUint64(key, op.Time)
	rawLog := t.logToBytes(op)
	if err := lb.Put(key[:8], rawLog); err != nil {
		return err
	}

	return t.redo(b, key, op, rawLog[16:])
}

func (t *boltForest) redo(b *bbolt.Bucket, key []byte, op *Move, rawMeta []byte) error {
	var err error

	parent, ts, currMeta, inTree := t.getState(b, stateKey(key, op.Child))
	if inTree {
		err = t.putState(b, oldKey(key, op.Time), parent, ts, currMeta)
	} else {
		ts = op.Time
		err = b.Delete(oldKey(key, op.Time))
	}

	if err != nil || op.Child == op.Parent || t.isAncestor(b, op.Child, op.Parent) {
		return err
	}

	if inTree {
		if err := b.Delete(childrenKey(key, op.Child, parent)); err != nil {
			return err
		}

		var meta Meta
		if err := meta.FromBytes(currMeta); err != nil {
			return err
		}
		for i := range meta.Items {
			if isAttributeInternal(meta.Items[i].Key) {
				key = internalKey(key, meta.Items[i].Key, string(meta.Items[i].Value), parent, op.Child)
				err := b.Delete(key)
				if err != nil {
					return err
				}
			}
		}
	}
	return t.addNode(b, key, op.Child, op.Parent, ts, op.Meta, rawMeta)
}

// removeNode removes node keys from the tree except the children key or its parent.
func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node, parent Node) error {
	k := stateKey(key, node)
	_, _, rawMeta, _ := t.getState(b, k)

	var meta Meta
	if err := meta.FromBytes(rawMeta); err == nil {
		for i := range meta.Items {
			if isAttributeInternal(meta.Items[i].Key) {
				err := b.Delete(internalKey(nil, meta.Items[i].Key, string(meta.Items[i].Value), parent, node))
				if err != nil {
					return err
				}
			}
		}
	}
	return b.Delete(k)
}

// addNode adds node keys to the tree except the timestamp key.
func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, time Timestamp, meta Meta, rawMeta []byte) error {
	if err := t.putState(b, stateKey(key, child), parent, time, rawMeta); err != nil {
		return err
	}

	err := b.Put(childrenKey(key, child, parent), []byte{1})
	if err != nil {
		return err
	}

	for i := range meta.Items {
		if !isAttributeInternal(meta.Items[i].Key) {
			continue
		}

		key = internalKey(key, meta.Items[i].Key, string(meta.Items[i].Value), parent, child)
		if len(meta.Items) == 1 {
			err = b.Put(key, []byte{1})
		} else {
			err = b.Put(key, []byte{0})
		}
		if err != nil {
			return err
		}
	}
	return nil
}

func (t *boltForest) undo(m *Move, b *bbolt.Bucket, key []byte) error {
	if err := b.Delete(childrenKey(key, m.Child, m.Parent)); err != nil {
		return err
	}

	parent, ts, rawMeta, ok := t.getState(b, oldKey(key, m.Time))
	if !ok {
		return t.removeNode(b, key, m.Child, m.Parent)
	}

	var meta Meta
	if err := meta.FromBytes(rawMeta); err != nil {
		return err
	}
	return t.addNode(b, key, m.Child, parent, ts, meta, rawMeta)
}

func (t *boltForest) isAncestor(b *bbolt.Bucket, parent, child Node) bool {
	key := make([]byte, 9)
	key[0] = 's'
	for node := child; node != parent; {
		binary.LittleEndian.PutUint64(key[1:], node)
		parent, _, _, ok := t.getState(b, key)
		if !ok {
			return false
		}
		node = parent
	}
	return true
}

// TreeGetByPath implements the Forest interface.
func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]Node, error) {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		t.metrics.AddMethodDuration("TreeGetByPath", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetByPath",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.String("attr", attr),
			attribute.Int("path_count", len(path)),
			attribute.Bool("latest", latest),
		),
	)
	defer span.End()

	if !isAttributeInternal(attr) {
		return nil, ErrNotPathAttribute
	}

	if len(path) == 0 {
		success = true
		return nil, nil
	}

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return nil, ErrDegradedMode
	}

	var nodes []Node

	err := metaerr.Wrap(t.db.View(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(bucketName(cid, treeID))
		if treeRoot == nil {
			return ErrTreeNotFound
		}

		b := treeRoot.Bucket(dataBucket)

		i, curNode, err := t.getPathPrefix(b, attr, path[:len(path)-1])
		if err != nil {
			return err
		}
		if i < len(path)-1 {
			return nil
		}

		var maxTimestamp uint64

		c := b.Cursor()

		attrKey := internalKey(nil, attr, path[len(path)-1], curNode, 0)
		attrKey = attrKey[:len(attrKey)-8]
		childKey, _ := c.Seek(attrKey)
		for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) {
			child := binary.LittleEndian.Uint64(childKey[len(childKey)-8:])
			if latest {
				_, ts, _, _ := t.getState(b, stateKey(make([]byte, 9), child))
				if ts >= maxTimestamp {
					nodes = append(nodes[:0], child)
					maxTimestamp = ts
				}
			} else {
				nodes = append(nodes, child)
			}
			childKey, _ = c.Next()
		}
		return nil
	}))
	success = err == nil
	return nodes, err
}

// TreeGetMeta implements the forest interface.
func (t *boltForest) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node) (Meta, Node, error) {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		t.metrics.AddMethodDuration("TreeGetMeta", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetMeta",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.String("node_id", strconv.FormatUint(nodeID, 10)),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return Meta{}, 0, ErrDegradedMode
	}

	key := stateKey(make([]byte, 9), nodeID)

	var m Meta
	var parentID uint64

	err := t.db.View(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(bucketName(cid, treeID))
		if treeRoot == nil {
			return ErrTreeNotFound
		}

		b := treeRoot.Bucket(dataBucket)
		if data := b.Get(key); len(data) != 0 {
			parentID = binary.LittleEndian.Uint64(data)
		}
		_, _, meta, _ := t.getState(b, stateKey(key, nodeID))
		return m.FromBytes(meta)
	})
	success = err == nil
	return m, parentID, metaerr.Wrap(err)
}

// TreeGetChildren implements the Forest interface.
func (t *boltForest) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node) ([]NodeInfo, error) {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		t.metrics.AddMethodDuration("TreeGetChildren", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetChildren",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.String("node_id", strconv.FormatUint(nodeID, 10)),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return nil, ErrDegradedMode
	}

	key := make([]byte, 9)
	key[0] = 'c'
	binary.LittleEndian.PutUint64(key[1:], nodeID)

	var result []NodeInfo

	err := t.db.View(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(bucketName(cid, treeID))
		if treeRoot == nil {
			return ErrTreeNotFound
		}

		b := treeRoot.Bucket(dataBucket)
		c := b.Cursor()
		for k, _ := c.Seek(key); len(k) == childrenKeySize && binary.LittleEndian.Uint64(k[1:]) == nodeID; k, _ = c.Next() {
			childID := binary.LittleEndian.Uint64(k[9:])
			childInfo := NodeInfo{
				ID: childID,
			}
			parentID, _, metaBytes, found := t.getState(b, stateKey(key, childID))
			if found {
				childInfo.ParentID = parentID
				if err := childInfo.Meta.FromBytes(metaBytes); err != nil {
					return err
				}
			}
			result = append(result, childInfo)
		}
		return nil
	})
	success = err == nil
	return result, metaerr.Wrap(err)
}

// TreeList implements the Forest interface.
func (t *boltForest) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, error) {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		t.metrics.AddMethodDuration("TreeList", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeList",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return nil, ErrDegradedMode
	}

	var ids []string
	cidRaw := make([]byte, 32)
	cid.Encode(cidRaw)

	cidLen := len(cidRaw)

	err := t.db.View(func(tx *bbolt.Tx) error {
		c := tx.Cursor()
		for k, _ := c.Seek(cidRaw); k != nil; k, _ = c.Next() {
			if !bytes.HasPrefix(k, cidRaw) {
				return nil
			}

			ids = append(ids, string(k[cidLen:]))
		}

		return nil
	})
	if err != nil {
		return nil, metaerr.Wrap(fmt.Errorf("could not list trees: %w", err))
	}
	success = true
	return ids, nil
}

// TreeGetOpLog implements the pilorama.Forest interface.
func (t *boltForest) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) (Move, error) {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		t.metrics.AddMethodDuration("TreeGetOpLog", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetOpLog",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.String("height", strconv.FormatUint(height, 10)),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return Move{}, ErrDegradedMode
	}

	key := make([]byte, 8)
	binary.BigEndian.PutUint64(key, height)

	var lm Move

	err := t.db.View(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(bucketName(cid, treeID))
		if treeRoot == nil {
			return ErrTreeNotFound
		}

		c := treeRoot.Bucket(logBucket).Cursor()
		if _, data := c.Seek(key); data != nil {
			return t.moveFromBytes(&lm, data)
		}
		return nil
	})
	success = err == nil
	return lm, metaerr.Wrap(err)
}

// TreeDrop implements the pilorama.Forest interface.
func (t *boltForest) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string) error {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		t.metrics.AddMethodDuration("TreeDrop", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeDrop",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return ErrDegradedMode
	} else if t.mode.ReadOnly() {
		return ErrReadOnlyMode
	}

	err := metaerr.Wrap(t.db.Batch(func(tx *bbolt.Tx) error {
		if treeID == "" {
			c := tx.Cursor()
			prefix := make([]byte, 32)
			cid.Encode(prefix)
			for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, _ = c.Next() {
				err := tx.DeleteBucket(k)
				if err != nil {
					return err
				}
			}
			return nil
		}
		err := tx.DeleteBucket(bucketName(cid, treeID))
		if errors.Is(err, bbolt.ErrBucketNotFound) {
			return ErrTreeNotFound
		}
		return err
	}))
	success = err == nil
	return err
}

func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) {
	c := bTree.Cursor()

	var curNode Node
	var attrKey []byte

loop:
	for i := range path {
		attrKey = internalKey(attrKey, attr, path[i], curNode, 0)
		attrKey = attrKey[:len(attrKey)-8]

		childKey, value := c.Seek(attrKey)
		for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) {
			if len(value) == 1 && value[0] == 1 {
				curNode = binary.LittleEndian.Uint64(childKey[len(childKey)-8:])
				continue loop
			}
			childKey, value = c.Next()
		}

		return i, curNode, nil
	}

	return len(path), curNode, nil
}

func (t *boltForest) moveFromBytes(m *Move, data []byte) error {
	return t.logFromBytes(m, data)
}

func (t *boltForest) logFromBytes(lm *Move, data []byte) error {
	lm.Child = binary.LittleEndian.Uint64(data)
	lm.Parent = binary.LittleEndian.Uint64(data[8:])
	return lm.Meta.FromBytes(data[16:])
}

func (t *boltForest) logToBytes(lm *Move) []byte {
	w := io.NewBufBinWriter()
	size := 8 + 8 + lm.Meta.Size() + 1
	// if lm.HasOld {
	//	size += 8 + lm.Old.Meta.Size()
	// }

	w.Grow(size)
	w.WriteU64LE(lm.Child)
	w.WriteU64LE(lm.Parent)
	lm.Meta.EncodeBinary(w.BinWriter)
	// w.WriteBool(lm.HasOld)
	// if lm.HasOld {
	//	w.WriteU64LE(lm.Old.Parent)
	//	lm.Old.Meta.EncodeBinary(w.BinWriter)
	// }
	return w.Bytes()
}

func bucketName(cid cidSDK.ID, treeID string) []byte {
	treeRoot := make([]byte, 32+len(treeID))
	cid.Encode(treeRoot)
	copy(treeRoot[32:], treeID)
	return treeRoot
}

// 'o' + time -> old meta.
func oldKey(key []byte, ts Timestamp) []byte {
	key[0] = 'o'
	binary.LittleEndian.PutUint64(key[1:], ts)
	return key[:9]
}

// 's' + child ID -> parent + timestamp of the first appearance + meta.
func stateKey(key []byte, child Node) []byte {
	key[0] = 's'
	binary.LittleEndian.PutUint64(key[1:], child)
	return key[:9]
}

func (t *boltForest) putState(b *bbolt.Bucket, key []byte, parent Node, timestamp Timestamp, meta []byte) error {
	data := make([]byte, len(meta)+8+8)
	binary.LittleEndian.PutUint64(data, parent)
	binary.LittleEndian.PutUint64(data[8:], timestamp)
	copy(data[16:], meta)
	return b.Put(key, data)
}

func (t *boltForest) getState(b *bbolt.Bucket, key []byte) (Node, Timestamp, []byte, bool) {
	data := b.Get(key)
	if data == nil {
		return 0, 0, nil, false
	}

	parent := binary.LittleEndian.Uint64(data)
	timestamp := binary.LittleEndian.Uint64(data[8:])
	return parent, timestamp, data[16:], true
}

// 'c' + parent (id) + child (id) -> 0/1.
func childrenKey(key []byte, child, parent Node) []byte {
	key[0] = 'c'
	binary.LittleEndian.PutUint64(key[1:], parent)
	binary.LittleEndian.PutUint64(key[9:], child)
	return key[:childrenKeySize]
}

// 'i' + attribute name (string) + attribute value (string) + parent (id) + node (id) -> 0/1.
func internalKey(key []byte, k, v string, parent, node Node) []byte {
	size := 1 /* prefix */ + 2*2 /* len */ + 2*8 /* nodes */ + len(k) + len(v)
	if cap(key) < size {
		key = make([]byte, 0, size)
	}

	key = key[:0]
	key = append(key, 'i')

	l := len(k)
	key = append(key, byte(l), byte(l>>8))
	key = append(key, k...)

	l = len(v)
	key = append(key, byte(l), byte(l>>8))
	key = append(key, v...)

	var raw [8]byte
	binary.LittleEndian.PutUint64(raw[:], parent)
	key = append(key, raw[:]...)

	binary.LittleEndian.PutUint64(raw[:], node)
	key = append(key, raw[:]...)
	return key
}