frostfs-node/pkg/local_object_storage/pilorama/boltdb.go
Evgenii Stratonikov 226dd25dd0 [#1568] pilorama: Replace "containerID" with "container ID" in the error message
It is "container ID" in every other place.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-18 15:52:26 +00:00

1688 lines
41 KiB
Go

package pilorama
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"os"
"path/filepath"
"slices"
"strconv"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/nspcc-dev/neo-go/pkg/io"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// boltForest is the bbolt-backed forest implementation constructed by NewBoltForest.
type boltForest struct {
// db is the bbolt database handle; it is assigned by openBolt.
db *bbolt.DB
// modeMtx is held exclusively while the mode is changed (Open, SetMode)
// and shared by the operations that read mode.
modeMtx sync.RWMutex
// mode is the current operating mode of the forest.
mode mode.Mode
// mtx protects batches field.
mtx sync.Mutex
// batches holds the pending operation batches; addBatch appends to it and
// drops entries whose timer has already been reset to nil.
batches []*batch
// cfg is the embedded configuration filled in by the options passed to NewBoltForest.
cfg
}
const (
// childrenKeySize is the length of a children key: a 1-byte 'c' prefix
// followed by the 8-byte parent ID and the 8-byte child ID.
childrenKeySize = 17
// maxKeySize is the size of the scratch buffers used to build keys.
maxKeySize = childrenKeySize
)
var (
// dataBucket is the name of the nested bucket that stores the tree state.
dataBucket = []byte{0}
// logBucket is the name of the nested bucket that stores the operation log.
logBucket = []byte{1}
)
// Sentinel errors reported when an operation is rejected because of the
// current pilorama mode.
var (
	// ErrDegradedMode is returned when pilorama is in a degraded mode.
	ErrDegradedMode = logicerr.New("pilorama is in a degraded mode")
	// ErrReadOnlyMode is returned when pilorama is in a read-only mode.
	ErrReadOnlyMode = logicerr.New("pilorama is in a read-only mode")
)
// NewBoltForest returns storage wrapper for storing operations on CRDT trees.
//
// The forest starts in mode.Disabled with default settings (os.ModePerm
// permissions, bbolt default batch delay and size, os.OpenFile, no-op
// metrics); every option in opts is then applied on top of those defaults.
//
// Each tree is stored in a separate bucket by `CID + treeID` key.
// All integers are stored in little-endian unless explicitly specified otherwise.
//
// DB schema (for a single tree):
// timestamp is 8-byte, id is 4-byte.
//
// log storage (logBucket):
// timestamp in big-endian -> log operation
//
// tree storage (dataBucket):
//   - 't' + node (id) -> timestamp when the node first appeared,
//   - 'p' + node (id) -> parent (id),
//   - 'm' + node (id) -> serialized meta,
//   - 'c' + parent (id) + child (id) -> 0/1,
//   - 'i' + 0 + attrKey + 0 + attrValue + 0 + parent (id) + node (id) -> 0/1 (1 for automatically created nodes).
//
// NOTE(review): the code in this file reads node state under an 's'-prefixed
// key (see isAncestor) and encodes ids as 8-byte values, so the schema above
// may be out of date — confirm against stateKey/getState.
func NewBoltForest(opts ...Option) ForestStorage {
	forest := &boltForest{mode: mode.Disabled}
	forest.cfg = cfg{
		perm:          os.ModePerm,
		maxBatchDelay: bbolt.DefaultMaxBatchDelay,
		maxBatchSize:  bbolt.DefaultMaxBatchSize,
		openFile:      os.OpenFile,
		metrics:       &noopMetrics{},
	}

	for _, apply := range opts {
		apply(&forest.cfg)
	}

	return forest
}
// SetMode switches the forest to mode m.
//
// Nothing happens when m equals the current mode. Otherwise the database is
// closed and, unless m disables the metabase, reopened in the new mode and
// initialized. The stored mode and the mode metric are updated only when all
// of these steps succeed; on failure the wrapped error is returned.
func (t *boltForest) SetMode(ctx context.Context, m mode.Mode) error {
	t.modeMtx.Lock()
	defer t.modeMtx.Unlock()

	if t.mode == m {
		return nil
	}

	// reopen performs close -> open -> init, stopping at the first failure.
	reopen := func() error {
		if err := t.Close(ctx); err != nil {
			return err
		}
		if m.NoMetabase() {
			return nil
		}
		if err := t.openBolt(m); err != nil {
			return err
		}
		return t.Init(ctx)
	}

	if err := reopen(); err != nil {
		return fmt.Errorf("set pilorama mode (old=%s, new=%s): %w", t.mode, m, err)
	}

	t.mode = m
	t.metrics.SetMode(mode.ConvertToComponentModeDegraded(m))
	return nil
}
// Open records the requested mode and, unless that mode disables the
// metabase, opens the underlying bbolt database.
func (t *boltForest) Open(_ context.Context, mode mode.Mode) error {
	t.modeMtx.Lock()
	defer t.modeMtx.Unlock()

	t.mode = mode
	if !mode.NoMetabase() {
		return t.openBolt(mode)
	}
	return nil
}
// openBolt creates the parent directory of the database file, opens the
// bbolt database according to m and the forest configuration, applies the
// batch limits and reports the mode to metrics.
func (t *boltForest) openBolt(m mode.Mode) error {
	if err := util.MkdirAllX(filepath.Dir(t.path), t.perm); err != nil {
		return metaerr.Wrap(fmt.Errorf("create dir %s for the pilorama: %w", t.path, err))
	}

	boltOpts := *bbolt.DefaultOptions
	boltOpts.ReadOnly = m.ReadOnly()
	boltOpts.NoSync = t.noSync
	boltOpts.Timeout = 100 * time.Millisecond
	boltOpts.OpenFile = t.openFile

	var err error
	if t.db, err = bbolt.Open(t.path, t.perm, &boltOpts); err != nil {
		return metaerr.Wrap(fmt.Errorf("open the pilorama DB: %w", err))
	}

	t.db.MaxBatchSize = t.maxBatchSize
	t.db.MaxBatchDelay = t.maxBatchDelay
	t.metrics.SetMode(mode.ConvertToComponentModeDegraded(m))
	return nil
}
// Init makes sure the top-level data and log buckets exist.
//
// It is a no-op when the current mode disables the metabase or when the
// database is opened read-only.
func (t *boltForest) Init(context.Context) error {
	if t.mode.NoMetabase() || t.db.IsReadOnly() {
		return nil
	}

	return t.db.Update(func(tx *bbolt.Tx) error {
		for _, name := range [][]byte{dataBucket, logBucket} {
			if _, err := tx.CreateBucketIfNotExists(name); err != nil {
				return err
			}
		}
		return nil
	})
}
// Close closes the database if one has been opened. Metrics are closed only
// when closing the database did not fail.
func (t *boltForest) Close(context.Context) error {
	if t.db != nil {
		if err := t.db.Close(); err != nil {
			return err
		}
	}

	t.metrics.Close()
	return nil
}
// SetParentID forwards id to the configured metrics.
func (t *boltForest) SetParentID(id string) {
t.metrics.SetParentID(id)
}
// TreeMove implements the Forest interface.
//
// A copy of m is stamped with a fresh timestamp and, when its child is
// RootID, with a newly generated node ID, and is then applied to the tree.
// A pointer to that copy is returned together with the result of the batch.
func (t *boltForest) TreeMove(ctx context.Context, d CIDDescriptor, treeID string, m *Move) (*Move, error) {
	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeMove",
		trace.WithAttributes(
			attribute.String("container_id", d.CID.EncodeToString()),
			attribute.Int("position", d.Position),
			attribute.Int("size", d.Size),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	if !d.checkValid() {
		return nil, ErrInvalidCIDDescriptor
	}

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	switch {
	case t.mode.NoMetabase():
		return nil, ErrDegradedMode
	case t.mode.ReadOnly():
		return nil, ErrReadOnlyMode
	}

	applied := *m
	treeName := bucketName(d.CID, treeID)
	err := t.db.Batch(func(tx *bbolt.Tx) error {
		bLog, bTree, err := t.getTreeBuckets(tx, treeName)
		if err != nil {
			return err
		}

		applied.Time = t.getLatestTimestamp(bLog, d.Position, d.Size)
		if applied.Child == RootID {
			applied.Child = t.findSpareID(bTree)
		}
		return t.do(bLog, bTree, make([]byte, maxKeySize), &applied)
	})
	return &applied, metaerr.Wrap(err)
}
// TreeHeight returns the timestamp of the newest operation stored in the log
// of the given tree.
//
// It returns ErrDegradedMode when the metabase is disabled and
// ErrTreeNotFound when the tree bucket does not exist. A tree whose log is
// empty has height 0.
func (t *boltForest) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) {
	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeHeight",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return 0, ErrDegradedMode
	}

	var height uint64
	err := t.db.View(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(bucketName(cid, treeID))
		if treeRoot == nil {
			return ErrTreeNotFound
		}

		// Cursor.Last yields a nil key for an empty bucket; decoding it
		// unconditionally would panic, so only well-formed 8-byte keys are read.
		if k, _ := treeRoot.Bucket(logBucket).Cursor().Last(); len(k) == 8 {
			height = binary.BigEndian.Uint64(k)
		}
		return nil
	})
	return height, metaerr.Wrap(err)
}
// TreeExists implements the Forest interface.
//
// It reports whether a bucket for the (cid, treeID) pair is present in the
// database and returns ErrDegradedMode when the metabase is disabled.
func (t *boltForest) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (bool, error) {
	began := time.Now()
	success := false
	defer func() {
		t.metrics.AddMethodDuration("TreeExists", time.Since(began), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeExists",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return false, ErrDegradedMode
	}

	found := false
	err := t.db.View(func(tx *bbolt.Tx) error {
		found = tx.Bucket(bucketName(cid, treeID)) != nil
		return nil
	})
	success = err == nil
	return found, metaerr.Wrap(err)
}
// syncHeightKey is the data-bucket key under which the last synchronized
// height of a tree is stored.
var syncHeightKey = []byte{'h'}

// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
//
// The height is stored as an 8-byte little-endian value in the data bucket
// of the tree. It returns ErrDegradedMode or ErrReadOnlyMode when the current
// mode does not allow writes, and ErrTreeNotFound when the tree bucket does
// not exist.
func (t *boltForest) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) error {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		t.metrics.AddMethodDuration("TreeUpdateLastSyncHeight", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeUpdateLastSyncHeight",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.String("height", strconv.FormatUint(height, 10)),
		),
	)
	defer span.End()

	// Like every other writer, hold the mode lock so the database cannot be
	// closed or swapped by SetMode mid-operation, and refuse to touch the
	// database when the mode forbids it.
	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return ErrDegradedMode
	}
	if t.mode.ReadOnly() {
		return ErrReadOnlyMode
	}

	rawHeight := make([]byte, 8)
	binary.LittleEndian.PutUint64(rawHeight, height)

	buck := bucketName(cid, treeID)
	err := metaerr.Wrap(t.db.Batch(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(buck)
		if treeRoot == nil {
			return ErrTreeNotFound
		}

		b := treeRoot.Bucket(dataBucket)
		return b.Put(syncHeightKey, rawHeight)
	}))
	success = err == nil
	return err
}
// TreeLastSyncHeight implements the pilorama.Forest interface.
//
// It returns the height previously stored by TreeUpdateLastSyncHeight, or 0
// when no well-formed 8-byte value is stored. It returns ErrDegradedMode when
// the metabase is disabled and ErrTreeNotFound when the tree bucket does not
// exist.
func (t *boltForest) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		t.metrics.AddMethodDuration("TreeLastSyncHeight", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeLastSyncHeight",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	// Like every other reader, hold the mode lock so the database cannot be
	// closed or swapped by SetMode mid-operation, and do not touch the
	// database when the metabase is disabled.
	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return 0, ErrDegradedMode
	}

	var height uint64

	buck := bucketName(cid, treeID)
	err := t.db.View(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(buck)
		if treeRoot == nil {
			return ErrTreeNotFound
		}

		b := treeRoot.Bucket(dataBucket)
		data := b.Get(syncHeightKey)
		if len(data) == 8 {
			height = binary.LittleEndian.Uint64(data)
		}
		return nil
	})
	success = err == nil
	return height, metaerr.Wrap(err)
}
// TreeAddByPath implements the Forest interface.
//
// It records the call duration in metrics, opens a tracing span and
// delegates the actual work to addByPathInternal.
func (t *boltForest) TreeAddByPath(ctx context.Context, d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error) {
	began := time.Now()
	success := false
	defer func() {
		t.metrics.AddMethodDuration("TreeAddByPath", time.Since(began), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeAddByPath",
		trace.WithAttributes(
			attribute.String("container_id", d.CID.EncodeToString()),
			attribute.Int("position", d.Position),
			attribute.Int("size", d.Size),
			attribute.String("tree_id", treeID),
			attribute.String("attr", attr),
			attribute.Int("path_count", len(path)),
			attribute.Int("meta_count", len(meta)),
		),
	)
	defer span.End()

	moves, err := t.addByPathInternal(d, attr, treeID, path, meta)
	success = err == nil
	return moves, err
}
// addByPathInternal creates the missing part of path below the longest
// existing prefix and then adds a final node carrying meta.
//
// One Move is produced for every created intermediate node plus one for the
// final node; all of them are applied in a single batch and returned. The
// descriptor and the attribute are validated first, and the call is rejected
// in degraded and read-only modes.
func (t *boltForest) addByPathInternal(d CIDDescriptor, attr string, treeID string, path []string, meta []KeyValue) ([]Move, error) {
	if !d.checkValid() {
		return nil, ErrInvalidCIDDescriptor
	}
	if !isAttributeInternal(attr) {
		return nil, ErrNotPathAttribute
	}

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	switch {
	case t.mode.NoMetabase():
		return nil, ErrDegradedMode
	case t.mode.ReadOnly():
		return nil, ErrReadOnlyMode
	}

	var (
		moves   []Move
		scratch [maxKeySize]byte
	)
	treeName := bucketName(d.CID, treeID)

	err := t.db.Batch(func(tx *bbolt.Tx) error {
		bLog, bTree, err := t.getTreeBuckets(tx, treeName)
		if err != nil {
			return err
		}

		prefixLen, parent, err := t.getPathPrefix(bTree, attr, path)
		if err != nil {
			return err
		}

		ts := t.getLatestTimestamp(bLog, d.Position, d.Size)
		moves = make([]Move, len(path)-prefixLen+1)

		// Create the intermediate nodes that are missing from the path.
		for j := prefixLen; j < len(path); j++ {
			step := &moves[j-prefixLen]
			*step = Move{
				Parent: parent,
				Meta: Meta{
					Time:  ts,
					Items: []KeyValue{{Key: attr, Value: []byte(path[j])}},
				},
				Child: t.findSpareID(bTree),
			}
			if err := t.do(bLog, bTree, scratch[:], step); err != nil {
				return err
			}

			ts = nextTimestamp(ts, uint64(d.Position), uint64(d.Size))
			parent = step.Child
		}

		// The last move attaches the node carrying the caller's meta.
		final := &moves[len(moves)-1]
		*final = Move{
			Parent: parent,
			Meta: Meta{
				Time:  ts,
				Items: meta,
			},
			Child: t.findSpareID(bTree),
		}
		return t.do(bLog, bTree, scratch[:], final)
	})
	return moves, metaerr.Wrap(err)
}
// getLatestTimestamp returns timestamp for a new operation which is guaranteed to be bigger than
// all timestamps corresponding to already stored operations.
//
// The newest stored timestamp is taken from the last key of bLog (0 when the
// log is empty) and advanced with nextTimestamp for the given pos and size.
func (t *boltForest) getLatestTimestamp(bLog *bbolt.Bucket, pos, size int) uint64 {
	var newest uint64
	if lastKey, _ := bLog.Cursor().Last(); len(lastKey) != 0 {
		newest = binary.BigEndian.Uint64(lastKey)
	}
	return nextTimestamp(newest, uint64(pos), uint64(size))
}
// findSpareID returns random unused ID.
//
// Random candidates are drawn until one is found that has no state stored in
// bTree.
func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
	candidate := uint64(rand.Int63())
	scratch := make([]byte, 9)
	for {
		if _, _, _, taken := t.getState(bTree, stateKey(scratch, candidate)); !taken {
			return candidate
		}
		candidate = uint64(rand.Int63())
	}
}
// TreeApply implements the Forest interface.
//
// With backgroundSync set, the operation is skipped when its timestamp is
// already present in the tree log. Otherwise the operation is applied either
// directly in its own transaction (when the batch size is 1) or through the
// shared batching queue, in which case the call waits for the batch result.
func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error {
	began := time.Now()
	success := false
	defer func() {
		t.metrics.AddMethodDuration("TreeApply", time.Since(began), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApply",
		trace.WithAttributes(
			attribute.String("container_id", cnr.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.Bool("background", backgroundSync),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	switch {
	case t.mode.NoMetabase():
		return ErrDegradedMode
	case t.mode.ReadOnly():
		return ErrReadOnlyMode
	}

	if backgroundSync {
		alreadyLogged := false
		err := t.db.View(func(tx *bbolt.Tx) error {
			if treeRoot := tx.Bucket(bucketName(cnr, treeID)); treeRoot != nil {
				bLog := treeRoot.Bucket(logBucket)

				var logKey [8]byte
				binary.BigEndian.PutUint64(logKey[:], m.Time)
				alreadyLogged = bLog.Get(logKey[:]) != nil
			}
			success = true
			return nil
		})
		if err != nil || alreadyLogged {
			success = err == nil
			return metaerr.Wrap(err)
		}
	}

	// Without batching, apply the operation in a dedicated transaction.
	if t.db.MaxBatchSize == 1 {
		treeName := bucketName(cnr, treeID)
		err := metaerr.Wrap(t.db.Update(func(tx *bbolt.Tx) error {
			bLog, bTree, err := t.getTreeBuckets(tx, treeName)
			if err != nil {
				return err
			}

			var applied Move
			return t.applyOperation(bLog, bTree, []*Move{m}, &applied)
		}))
		success = err == nil
		return err
	}

	// Otherwise enqueue the operation and wait for its batch to finish.
	done := make(chan error, 1)
	t.addBatch(cnr, treeID, m, done)

	err := <-done
	success = err == nil
	return metaerr.Wrap(err)
}
// TreeApplyBatch applies the operations from m that are not yet present in
// the tree log.
//
// Already logged operations are filtered out first; when nothing remains the
// call succeeds immediately. The remaining operations are run as a single
// batch in a separate goroutine and the call waits for its result.
func (t *boltForest) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error {
	began := time.Now()
	success := false
	defer func() {
		t.metrics.AddMethodDuration("TreeApplyBatch", time.Since(began), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApplyBatch",
		trace.WithAttributes(
			attribute.String("container_id", cnr.EncodeToString()),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	m, err := t.filterSeen(cnr, treeID, m)
	if err != nil {
		return err
	}
	if len(m) == 0 {
		success = true
		return nil
	}

	done := make(chan error)
	pending := &batch{
		forest:     t,
		cid:        cnr,
		treeID:     treeID,
		results:    []chan<- error{done},
		operations: m,
	}
	go pending.run()

	err = <-done
	success = err == nil
	return metaerr.Wrap(err)
}
// filterSeen returns the operations from m whose timestamps are not present
// in the log of the given tree. When the tree does not exist, m itself is
// returned. It returns ErrDegradedMode when the metabase is disabled.
func (t *boltForest) filterSeen(cnr cidSDK.ID, treeID string, m []*Move) ([]*Move, error) {
	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return nil, ErrDegradedMode
	}

	unseen := make([]*Move, 0, len(m))
	err := t.db.View(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(bucketName(cnr, treeID))
		if treeRoot == nil {
			unseen = m
			return nil
		}

		bLog := treeRoot.Bucket(logBucket)
		var logKey [8]byte
		for _, op := range m {
			binary.BigEndian.PutUint64(logKey[:], op.Time)
			if bLog.Get(logKey[:]) == nil {
				unseen = append(unseen, op)
			}
		}
		return nil
	})
	if err != nil {
		return nil, metaerr.Wrap(err)
	}
	return unseen, nil
}
// TreeApplyStream should be used with caution: this method locks other write transactions while `source` is not closed.
//
// Operations received from source are applied one by one inside a single
// update transaction. The transaction ends successfully when source is
// closed, and fails when ctx is done or an operation cannot be applied.
func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error {
	began := time.Now()
	success := false
	defer func() {
		t.metrics.AddMethodDuration("TreeApplyStream", time.Since(began), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApplyStream",
		trace.WithAttributes(
			attribute.String("container_id", cnr.EncodeToString()),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	switch {
	case t.mode.NoMetabase():
		return ErrDegradedMode
	case t.mode.ReadOnly():
		return ErrReadOnlyMode
	}

	treeName := bucketName(cnr, treeID)
	err := metaerr.Wrap(t.db.Update(func(tx *bbolt.Tx) error {
		bLog, bTree, err := t.getTreeBuckets(tx, treeName)
		if err != nil {
			return err
		}

		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case op, open := <-source:
				if !open {
					return nil
				}

				var applied Move
				if err := t.applyOperation(bLog, bTree, []*Move{op}, &applied); err != nil {
					return err
				}
			}
		}
	}))
	success = err == nil
	return err
}
// addBatch queues operation m for the (cnr, treeID) tree; ch is registered as
// the channel that receives the result of the batch the operation ends up in.
//
// The operation is appended to an existing pending batch for the same tree
// when there is one; a batch that reaches the database's MaxBatchSize is
// triggered immediately. Otherwise a new batch is created with a timer that
// triggers it after MaxBatchDelay.
func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) {
t.mtx.Lock()
for i := 0; i < len(t.batches); i++ {
t.batches[i].mtx.Lock()
// A nil timer marks a batch that no longer accepts operations:
// remove it from the list and re-examine the same index.
if t.batches[i].timer == nil {
t.batches[i].mtx.Unlock()
copy(t.batches[i:], t.batches[i+1:])
t.batches = t.batches[:len(t.batches)-1]
i--
continue
}
found := t.batches[i].cid.Equals(cnr) && t.batches[i].treeID == treeID
if found {
t.batches[i].results = append(t.batches[i].results, ch)
t.batches[i].operations = append(t.batches[i].operations, m)
// The batch is full: stop its timer, mark it as closed and trigger
// it right away after releasing both locks.
if len(t.batches[i].operations) == t.db.MaxBatchSize {
t.batches[i].timer.Stop()
t.batches[i].timer = nil
t.batches[i].mtx.Unlock()
b := t.batches[i]
t.mtx.Unlock()
b.trigger()
return
}
t.batches[i].mtx.Unlock()
t.mtx.Unlock()
return
}
t.batches[i].mtx.Unlock()
}
// No pending batch for this tree: start a new one.
b := &batch{
forest: t,
cid: cnr,
treeID: treeID,
results: []chan<- error{ch},
operations: []*Move{m},
}
b.mtx.Lock()
// The timer triggers the batch once MaxBatchDelay has elapsed.
b.timer = time.AfterFunc(t.db.MaxBatchDelay, b.trigger)
b.mtx.Unlock()
t.batches = append(t.batches, b)
t.mtx.Unlock()
}
// getTreeBuckets returns the log and data buckets of the tree stored under
// treeRoot, creating the tree bucket and both nested buckets when the tree
// does not exist yet.
func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, treeRoot []byte) (*bbolt.Bucket, *bbolt.Bucket, error) {
	if existing := tx.Bucket(treeRoot); existing != nil {
		return existing.Bucket(logBucket), existing.Bucket(dataBucket), nil
	}

	root, err := tx.CreateBucket(treeRoot)
	if err != nil {
		return nil, nil, err
	}

	bLog, err := root.CreateBucket(logBucket)
	if err != nil {
		return nil, nil, err
	}

	bData, err := root.CreateBucket(dataBucket)
	if err != nil {
		return nil, nil, err
	}

	return bLog, bData, nil
}
// applyOperation applies log operations. Assumes ms are sorted by timestamp.
//
// ms must be non-empty (ms[0] is read unconditionally). lm is a caller-provided
// scratch value: each applied operation is copied into it before being
// executed, so after a successful call it holds a copy of the last one.
//
// Stored operations newer than ms[0] are first undone from the newest to the
// oldest; then every operation from ms is executed and the stored operations
// that follow it are redone.
func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*Move, lm *Move) error {
var tmp Move
var cKey [maxKeySize]byte
c := logBucket.Cursor()
key, value := c.Last()
b := bytes.NewReader(nil)
r := io.NewBinReaderFromIO(b)
// 1. Undo up until the desired timestamp is here.
for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) {
// Decode only the fields undo needs from the stored log record.
b.Reset(value)
tmp.Child = r.ReadU64LE()
tmp.Parent = r.ReadU64LE()
tmp.Time = r.ReadVarUint()
if r.Err != nil {
return r.Err
}
if err := t.undo(&tmp, treeBucket, cKey[:]); err != nil {
return err
}
key, value = c.Prev()
}
for i := range ms {
// Loop invariant: key represents the next stored timestamp after ms[i].Time.
// 2. Insert the operation.
*lm = *ms[i]
if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil {
return err
}
// Cursor can be invalid, seek again.
binary.BigEndian.PutUint64(cKey[:], lm.Time)
_, _ = c.Seek(cKey[:8])
key, value = c.Next()
// 3. Re-apply all other operations.
// For the last element of ms everything that remains in the log is
// redone; otherwise only the records older than ms[i+1].
for len(key) == 8 && (i == len(ms)-1 || binary.BigEndian.Uint64(key) < ms[i+1].Time) {
if err := t.logFromBytes(&tmp, value); err != nil {
return err
}
// value[16:] skips the two 8-byte IDs at the start of the record.
if err := t.redo(treeBucket, cKey[:], &tmp, value[16:]); err != nil {
return err
}
key, value = c.Next()
}
}
return nil
}
// do stores op in the log bucket lb under its big-endian timestamp and then
// applies it to the tree bucket b via redo. key is a scratch buffer used to
// build the keys.
func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *Move) error {
	binary.BigEndian.PutUint64(key, op.Time)

	encoded := t.logToBytes(op)
	err := lb.Put(key[:8], encoded)
	if err != nil {
		return err
	}

	// The first 16 bytes of the record are skipped; the rest is passed as raw meta.
	return t.redo(b, key, op, encoded[16:])
}
// redo applies op to the tree stored in b.
//
// key is a scratch buffer used to build bucket keys, rawMeta is the
// serialized form of op.Meta that is stored with the node state.
func (t *boltForest) redo(b *bbolt.Bucket, key []byte, op *Move, rawMeta []byte) error {
var err error
// Look up the current state of the node being moved.
parent, ts, currMeta, inTree := t.getState(b, stateKey(key, op.Child))
if inTree {
// Save the current state under the key derived from op.Time, where undo
// looks it up when the operation is reverted.
err = t.putState(b, oldKey(key, op.Time), parent, ts, currMeta)
} else {
// The node is not in the tree yet: use op.Time as its timestamp and make
// sure no saved state is left for this operation.
ts = op.Time
err = b.Delete(oldKey(key, op.Time))
}
// Stop on a storage error. The tree is also left untouched when the node
// would become its own parent or would be moved under one of its descendants.
if err != nil || op.Child == op.Parent || t.isAncestor(b, op.Child, op.Parent) {
return err
}
if inTree {
// Detach the node from its current parent.
if err := b.Delete(childrenKey(key, op.Child, parent)); err != nil {
return err
}
var meta Meta
if err := meta.FromBytes(currMeta); err != nil {
return err
}
// Drop the index entries created for the node's current internal attributes.
for i := range meta.Items {
if isAttributeInternal(meta.Items[i].Key) {
key = internalKey(key, meta.Items[i].Key, string(meta.Items[i].Value), parent, op.Child)
err := b.Delete(key)
if err != nil {
return err
}
}
}
}
// Attach the node to its new parent with the new meta.
return t.addNode(b, key, op.Child, op.Parent, ts, op.Meta, rawMeta)
}
// removeNode removes node keys from the tree except the children key or its parent.
//
// The index entries of the node's internal attributes are deleted when its
// stored meta can be decoded; the state key of the node is deleted in any case.
func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node, parent Node) error {
	nodeKey := stateKey(key, node)
	_, _, rawMeta, _ := t.getState(b, nodeKey)

	var meta Meta
	if meta.FromBytes(rawMeta) != nil {
		// Undecodable meta: there is nothing to clean up in the index.
		return b.Delete(nodeKey)
	}

	for i := range meta.Items {
		item := &meta.Items[i]
		if !isAttributeInternal(item.Key) {
			continue
		}
		if err := b.Delete(internalKey(nil, item.Key, string(item.Value), parent, node)); err != nil {
			return err
		}
	}
	return b.Delete(nodeKey)
}
// addNode adds node keys to the tree except the timestamp key.
//
// It stores the node state, the children key linking parent to child and one
// index entry per internal attribute of meta. The index value is 1 when meta
// consists of a single item and 0 otherwise.
func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, time Timestamp, meta Meta, rawMeta []byte) error {
	if err := t.putState(b, stateKey(key, child), parent, time, rawMeta); err != nil {
		return err
	}

	if err := b.Put(childrenKey(key, child, parent), []byte{1}); err != nil {
		return err
	}

	var flag byte
	if len(meta.Items) == 1 {
		flag = 1
	}

	for i := range meta.Items {
		item := &meta.Items[i]
		if !isAttributeInternal(item.Key) {
			continue
		}

		key = internalKey(key, item.Key, string(item.Value), parent, child)
		if err := b.Put(key, []byte{flag}); err != nil {
			return err
		}
	}
	return nil
}
// undo reverts operation m in the tree bucket b.
//
// The children key created by the operation is deleted. If a previous state
// was saved for the operation, the node is re-added with that state;
// otherwise the node is removed.
func (t *boltForest) undo(m *Move, b *bbolt.Bucket, key []byte) error {
	err := b.Delete(childrenKey(key, m.Child, m.Parent))
	if err != nil {
		return err
	}

	prevParent, prevTime, prevRawMeta, saved := t.getState(b, oldKey(key, m.Time))
	if !saved {
		return t.removeNode(b, key, m.Child, m.Parent)
	}

	var prevMeta Meta
	if err = prevMeta.FromBytes(prevRawMeta); err != nil {
		return err
	}
	return t.addNode(b, key, m.Child, prevParent, prevTime, prevMeta, prevRawMeta)
}
// isAncestor reports whether parent is reachable from child by repeatedly
// following the stored parent links. A node counts as its own ancestor; the
// walk stops with false at the first node that has no stored state.
func (t *boltForest) isAncestor(b *bbolt.Bucket, parent, child Node) bool {
	stKey := make([]byte, 9)
	stKey[0] = 's'

	current := child
	for current != parent {
		binary.LittleEndian.PutUint64(stKey[1:], current)
		next, _, _, found := t.getState(b, stKey)
		if !found {
			return false
		}
		current = next
	}
	return true
}
// TreeGetByPath implements the Forest interface.
//
// It resolves all but the last element of path and then collects the
// children of the resulting nodes whose attr equals the last path element.
// With latest set, only the node with the largest timestamp is kept. An
// empty path yields no nodes, as does a path whose prefix cannot be fully
// resolved.
func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]Node, error) {
	began := time.Now()
	success := false
	defer func() {
		t.metrics.AddMethodDuration("TreeGetByPath", time.Since(began), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetByPath",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.String("attr", attr),
			attribute.Int("path_count", len(path)),
			attribute.Bool("latest", latest),
		),
	)
	defer span.End()

	if !isAttributeInternal(attr) {
		return nil, ErrNotPathAttribute
	}
	if len(path) == 0 {
		success = true
		return nil, nil
	}

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return nil, ErrDegradedMode
	}

	var found []Node
	dirs, leaf := path[:len(path)-1], path[len(path)-1]

	err := metaerr.Wrap(t.db.View(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(bucketName(cid, treeID))
		if treeRoot == nil {
			return ErrTreeNotFound
		}
		b := treeRoot.Bucket(dataBucket)

		resolved, parents, err := t.getPathPrefixMultiTraversal(b, attr, dirs)
		if err != nil {
			return err
		}
		if resolved < len(dirs) {
			return nil
		}

		var newest uint64
		cur := b.Cursor()
		for _, parent := range parents {
			// Build the index key for (attr, leaf, parent) and cut off the
			// trailing 8-byte child ID to obtain the seek prefix.
			seekKey := internalKey(nil, attr, leaf, parent, 0)
			seekKey = seekKey[:len(seekKey)-8]

			for k, _ := cur.Seek(seekKey); len(k) == len(seekKey)+8 && bytes.HasPrefix(k, seekKey); k, _ = cur.Next() {
				child := binary.LittleEndian.Uint64(k[len(k)-8:])
				if !latest {
					found = append(found, child)
					continue
				}

				_, ts, _, _ := t.getState(b, stateKey(make([]byte, 9), child))
				if ts >= newest {
					found = append(found[:0], child)
					newest = ts
				}
			}
		}
		return nil
	}))
	success = err == nil
	return found, err
}
// TreeGetMeta implements the forest interface.
//
// It returns the decoded meta and the parent ID stored for nodeID. It returns
// ErrDegradedMode when the metabase is disabled and ErrTreeNotFound when the
// tree bucket does not exist.
func (t *boltForest) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node) (Meta, Node, error) {
	began := time.Now()
	success := false
	defer func() {
		t.metrics.AddMethodDuration("TreeGetMeta", time.Since(began), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetMeta",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.String("node_id", strconv.FormatUint(nodeID, 10)),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return Meta{}, 0, ErrDegradedMode
	}

	var (
		nodeMeta Meta
		parentID uint64
	)
	stKey := stateKey(make([]byte, 9), nodeID)

	err := t.db.View(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(bucketName(cid, treeID))
		if treeRoot == nil {
			return ErrTreeNotFound
		}
		b := treeRoot.Bucket(dataBucket)

		// The stored state starts with the little-endian parent ID.
		if raw := b.Get(stKey); len(raw) != 0 {
			parentID = binary.LittleEndian.Uint64(raw)
		}

		_, _, rawMeta, _ := t.getState(b, stateKey(stKey, nodeID))
		return nodeMeta.FromBytes(rawMeta)
	})
	success = err == nil
	return nodeMeta, parentID, metaerr.Wrap(err)
}
// hasFewChildren reports whether the nodes from nodeIDs have at most
// threshold children in total. Counting stops as soon as the threshold is
// exceeded.
func (t *boltForest) hasFewChildren(b *bbolt.Bucket, nodeIDs MultiNode, threshold int) bool {
	prefix := make([]byte, 9)
	prefix[0] = 'c'

	total := 0
	for _, nodeID := range nodeIDs {
		binary.LittleEndian.PutUint64(prefix[1:], nodeID)

		cur := b.Cursor()
		k, _ := cur.Seek(prefix)
		for len(k) == childrenKeySize && binary.LittleEndian.Uint64(k[1:]) == nodeID {
			total++
			if total > threshold {
				return false
			}
			k, _ = cur.Next()
		}
	}
	return true
}
// TreeSortedByFilename implements the Forest interface.
//
// It returns at most count merged children of nodeIDs together with the
// filename of the last returned entry, which the caller can pass back as
// last on the next call. When nothing is returned, last is returned
// unchanged. An empty nodeIDs list is rejected.
func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeIDs MultiNode, last *string, count int) ([]MultiNodeInfo, *string, error) {
	began := time.Now()
	success := false
	defer func() {
		t.metrics.AddMethodDuration("TreeSortedByFilename", time.Since(began), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeSortedByFilename",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return nil, last, ErrDegradedMode
	}
	if len(nodeIDs) == 0 {
		return nil, last, errors.New("empty node list")
	}

	var (
		infos       []NodeInfo
		fewChildren bool
	)
	sorted := newHeap(last, count)
	scratch := make([]byte, 9)

	err := t.db.View(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(bucketName(cid, treeID))
		if treeRoot == nil {
			return ErrTreeNotFound
		}
		b := treeRoot.Bucket(dataBucket)

		// If the node is a leaf, we could scan all filenames in the tree.
		// To prevent this we first count the number of children: if it is less than
		// the number of nodes we need to return, fallback to TreeGetChildren() implementation.
		fewChildren = t.hasFewChildren(b, nodeIDs, count)
		if fewChildren {
			var err error
			infos, err = t.getChildren(b, nodeIDs)
			return err
		}

		t.fillSortedChildren(b, nodeIDs, sorted)
		for {
			entry, ok := sorted.pop()
			if !ok {
				break
			}
			for _, id := range entry.id {
				info, err := t.getChildInfo(b, scratch, id)
				if err != nil {
					return err
				}
				infos = append(infos, info)
			}
		}
		return nil
	})
	success = err == nil
	if err != nil {
		return nil, last, metaerr.Wrap(err)
	}

	// The fallback path returns unsorted children that still include the
	// entries up to last, so sort and trim them here.
	if fewChildren {
		infos = sortAndCut(infos, last)
	}

	merged := mergeNodeInfos(infos)
	if len(merged) > count {
		merged = merged[:count]
	}
	if n := len(merged); n != 0 {
		name := string(findAttr(merged[n-1].Meta, AttributeFilename))
		last = &name
	}
	return merged, last, metaerr.Wrap(err)
}
// sortByFilename sorts nodes in place by the byte-wise order of their
// filename attribute.
func sortByFilename(nodes []NodeInfo) {
	byFilename := func(a, b NodeInfo) int {
		left := a.Meta.GetAttr(AttributeFilename)
		right := b.Meta.GetAttr(AttributeFilename)
		return bytes.Compare(left, right)
	}
	slices.SortFunc(nodes, byFilename)
}
// sortAndCut sorts result by filename and returns the tail that starts at the
// first entry whose filename is strictly greater than *last. With a nil last
// the whole sorted slice is returned. The result is nil when no entry
// qualifies.
func sortAndCut(result []NodeInfo, last *string) []NodeInfo {
	var lastBytes []byte
	if last != nil {
		lastBytes = []byte(*last)
	}

	sortByFilename(result)

	start := slices.IndexFunc(result, func(info NodeInfo) bool {
		return lastBytes == nil || bytes.Compare(lastBytes, info.Meta.GetAttr(AttributeFilename)) == -1
	})
	if start < 0 {
		return nil
	}
	return result[start:]
}
// getChildInfo builds the NodeInfo of childID from its stored state. When no
// state is stored, only the ID is filled in. key is a scratch buffer used to
// build the state key.
func (t *boltForest) getChildInfo(b *bbolt.Bucket, key []byte, childID Node) (NodeInfo, error) {
	parentID, _, rawMeta, found := t.getState(b, stateKey(key, childID))
	if !found {
		return NodeInfo{ID: childID}, nil
	}

	info := NodeInfo{ID: childID, ParentID: parentID}
	if err := info.Meta.FromBytes(rawMeta); err != nil {
		return NodeInfo{}, err
	}
	return info, nil
}
// fillSortedChildren scans the filename index of bucket b and pushes the
// children of the nodes from nodeIDs into h, grouped by filename.
//
// As parsed below, an index key consists of the filename-attribute prefix, a
// 2-byte little-endian length, the filename, the 8-byte parent ID and the
// 8-byte child ID.
func (t *boltForest) fillSortedChildren(b *bbolt.Bucket, nodeIDs MultiNode, h *fixedHeap) {
c := b.Cursor()
prefix := internalKeyPrefix(nil, AttributeFilename)
// length is the filename length currently being scanned; count is the number
// of accepted groups seen for that length.
length := uint16(0)
count := 0
// nodes accumulates the children sharing the filename *lastFilename.
var nodes []uint64
var lastFilename *string
for k, _ := c.Seek(prefix); len(k) > 0 && k[0] == 'i'; k, _ = c.Next() {
// Skip keys too short to hold the length field and both IDs.
if len(k) < len(prefix)+2+16 {
continue
}
parentID := binary.LittleEndian.Uint64(k[len(k)-16:])
if !slices.Contains(nodeIDs, parentID) {
continue
}
actualLength := binary.LittleEndian.Uint16(k[len(prefix):])
childID := binary.LittleEndian.Uint64(k[len(k)-8:])
filename := string(k[len(prefix)+2 : len(k)-16])
if lastFilename == nil {
lastFilename = &filename
nodes = append(nodes, childID)
} else if *lastFilename == filename {
nodes = append(nodes, childID)
} else {
// The filename changed: hand the finished group over to the heap and
// start a new one.
processed := h.push(nodes, *lastFilename)
nodes = MultiNode{childID}
lastFilename = &filename
if actualLength != length {
length = actualLength
count = 1
} else if processed {
// NOTE(review): presumably once more than h.count groups of this
// length were accepted, the remaining keys of the same length cannot
// improve the result — confirm against fixedHeap.push.
if count++; count > h.count {
lastFilename = nil
nodes = nil
length = actualLength + 1
count = 0
// Jump to the first key with the next filename length.
c.Seek(binary.LittleEndian.AppendUint16(prefix, length))
c.Prev() // c.Next() will be performed by for loop
}
}
}
}
// Flush the group that was still being accumulated when the scan ended.
if len(nodes) != 0 && lastFilename != nil {
h.push(nodes, *lastFilename)
}
}
// TreeGetChildren implements the Forest interface.
//
// It returns the information about all direct children of nodeID. It returns
// ErrDegradedMode when the metabase is disabled and ErrTreeNotFound when the
// tree bucket does not exist.
func (t *boltForest) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node) ([]NodeInfo, error) {
	began := time.Now()
	success := false
	defer func() {
		t.metrics.AddMethodDuration("TreeGetChildren", time.Since(began), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetChildren",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.String("node_id", strconv.FormatUint(nodeID, 10)),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return nil, ErrDegradedMode
	}

	var children []NodeInfo
	err := t.db.View(func(tx *bbolt.Tx) (err error) {
		treeRoot := tx.Bucket(bucketName(cid, treeID))
		if treeRoot == nil {
			return ErrTreeNotFound
		}

		children, err = t.getChildren(treeRoot.Bucket(dataBucket), []Node{nodeID})
		return err
	})
	success = err == nil
	return children, metaerr.Wrap(err)
}
// getChildren returns the information about the direct children of every
// node from nodeIDs, in the order the children keys are stored in b.
func (t *boltForest) getChildren(b *bbolt.Bucket, nodeIDs MultiNode) ([]NodeInfo, error) {
	var infos []NodeInfo

	scratch := make([]byte, 9)
	for _, nodeID := range nodeIDs {
		// The prefix byte is restored on every iteration because the buffer
		// is also handed to getChildInfo as a scratch key.
		scratch[0] = 'c'
		binary.LittleEndian.PutUint64(scratch[1:], nodeID)

		cur := b.Cursor()
		k, _ := cur.Seek(scratch)
		for len(k) == childrenKeySize && binary.LittleEndian.Uint64(k[1:]) == nodeID {
			info, err := t.getChildInfo(b, scratch, binary.LittleEndian.Uint64(k[9:]))
			if err != nil {
				return nil, err
			}
			infos = append(infos, info)

			k, _ = cur.Next()
		}
	}
	return infos, nil
}
// TreeList implements the Forest interface.
//
// It returns the IDs of all trees whose bucket name starts with the encoded
// container ID. It returns ErrDegradedMode when the metabase is disabled.
func (t *boltForest) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, error) {
	began := time.Now()
	success := false
	defer func() {
		t.metrics.AddMethodDuration("TreeList", time.Since(began), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeList",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return nil, ErrDegradedMode
	}

	var treeIDs []string

	prefix := make([]byte, 32)
	cid.Encode(prefix)

	err := t.db.View(func(tx *bbolt.Tx) error {
		cur := tx.Cursor()
		for name, _ := cur.Seek(prefix); name != nil && bytes.HasPrefix(name, prefix); name, _ = cur.Next() {
			// The tree ID is whatever follows the container ID in the bucket name.
			treeIDs = append(treeIDs, string(name[len(prefix):]))
		}
		return nil
	})
	if err != nil {
		return nil, metaerr.Wrap(fmt.Errorf("list trees: %w", err))
	}

	success = true
	return treeIDs, nil
}
// TreeGetOpLog implements the pilorama.Forest interface.
//
// It returns the first logged operation whose timestamp is not less than
// height, or a zero Move when the log holds no such operation. It returns
// ErrDegradedMode when the metabase is disabled and ErrTreeNotFound when the
// tree bucket does not exist.
func (t *boltForest) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) (Move, error) {
	began := time.Now()
	success := false
	defer func() {
		t.metrics.AddMethodDuration("TreeGetOpLog", time.Since(began), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetOpLog",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
			attribute.String("height", strconv.FormatUint(height, 10)),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return Move{}, ErrDegradedMode
	}

	var seekKey [8]byte
	binary.BigEndian.PutUint64(seekKey[:], height)

	var op Move
	err := t.db.View(func(tx *bbolt.Tx) error {
		treeRoot := tx.Bucket(bucketName(cid, treeID))
		if treeRoot == nil {
			return ErrTreeNotFound
		}

		_, record := treeRoot.Bucket(logBucket).Cursor().Seek(seekKey[:])
		if record == nil {
			return nil
		}
		return t.moveFromBytes(&op, record)
	})
	success = err == nil
	return op, metaerr.Wrap(err)
}
// TreeDrop implements the pilorama.Forest interface.
//
// With a non-empty treeID the bucket of that single tree is deleted and
// ErrTreeNotFound is returned if it does not exist. With an empty treeID
// every bucket whose name starts with the encoded container ID is deleted;
// finding no such bucket is not an error.
func (t *boltForest) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string) error {
	startedAt := time.Now()
	var success bool
	defer func() {
		t.metrics.AddMethodDuration("TreeDrop", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeDrop",
		trace.WithAttributes(
			attribute.String("container_id", cid.EncodeToString()),
			attribute.String("tree_id", treeID),
		),
	)
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	switch {
	case t.mode.NoMetabase():
		return ErrDegradedMode
	case t.mode.ReadOnly():
		return ErrReadOnlyMode
	}

	err := metaerr.Wrap(t.db.Batch(func(tx *bbolt.Tx) error {
		if treeID != "" {
			err := tx.DeleteBucket(bucketName(cid, treeID))
			if errors.Is(err, bbolt.ErrBucketNotFound) {
				return ErrTreeNotFound
			}
			return err
		}

		cur := tx.Cursor()
		prefix := make([]byte, 32)
		cid.Encode(prefix)
		for {
			// Seek anew on every iteration: the previous match has just been deleted.
			name, _ := cur.Seek(prefix)
			if !bytes.HasPrefix(name, prefix) {
				// Also covers the exhausted cursor: a nil key never has the prefix.
				return nil
			}
			if err := tx.DeleteBucket(name); err != nil {
				return err
			}
			_, _ = cur.First() // rewind the cursor to the root page
		}
	}))

	success = err == nil
	return err
}
// TreeListTrees implements ForestStorage.
//
// It lists (container ID, tree ID) pairs decoded from the root bucket names,
// at most prm.BatchSize per call (treeListTreesBatchSizeDefault if the batch
// size is not positive). Listing starts at prm.NextPageToken and skips the
// bucket equal to the token itself. When the batch is filled, the name of
// the last returned bucket is stored in the result as the next page token.
func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error) {
	startedAt := time.Now()
	var success bool
	defer func() {
		t.metrics.AddMethodDuration("TreeListTrees", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeListTrees")
	defer span.End()

	t.modeMtx.RLock()
	defer t.modeMtx.RUnlock()

	if t.mode.NoMetabase() {
		return nil, ErrDegradedMode
	}

	limit := prm.BatchSize
	if limit <= 0 {
		limit = treeListTreesBatchSizeDefault
	}

	var res TreeListTreesResult
	err := metaerr.Wrap(t.db.View(func(tx *bbolt.Tx) error {
		cur := tx.Cursor()
		tokenPending := true
		for name, _ := cur.Seek(prm.NextPageToken); name != nil; name, _ = cur.Next() {
			switch {
			case bytes.Equal(name, dataBucket), bytes.Equal(name, logBucket):
				continue
			case tokenPending && bytes.Equal(name, prm.NextPageToken):
				// The token names the last bucket of the previous page.
				tokenPending = false
				continue
			}

			var contID cidSDK.ID
			if err := contID.Decode(name[:32]); err != nil {
				return fmt.Errorf("decode container ID: %w", err)
			}
			res.Items = append(res.Items, ContainerIDTreeID{
				CID:    contID,
				TreeID: string(name[32:]),
			})

			if len(res.Items) == limit {
				// Copy the name so the token does not alias the cursor key.
				res.NextPageToken = append(make([]byte, 0, len(name)), name...)
				break
			}
		}
		return nil
	}))

	success = err == nil
	if err != nil {
		return nil, err
	}
	return &res, nil
}
// getPathPrefixMultiTraversal walks path level by level starting from RootID,
// following every child whose attribute attr equals the current path element.
//
// For each level it scans the 'i' index entries of every node reached so far
// and keeps the children whose entry value is the single byte 1. It returns
// the number of path elements matched and the set of nodes reached:
//   - if some level yields no children, the index of that level and the
//     nodes of the previous level;
//   - otherwise len(path) and the nodes of the last level.
//
// The returned error is always nil.
func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr string, path []string) (int, []Node, error) {
	cur := bTree.Cursor()

	var (
		level  []Node
		prefix []byte
	)
	next := []Node{RootID}

	for depth, name := range path {
		// Swap the two buffers: the nodes found on the previous level become
		// the current level, and the old storage is reused for the next one.
		level, next = next, level[:0]

		for _, parent := range level {
			// Build the full index key and cut the trailing node ID off
			// to get the seek prefix shared by all children of parent.
			prefix = internalKey(prefix, attr, name, parent, 0)
			prefix = prefix[:len(prefix)-8]

			for k, v := cur.Seek(prefix); len(k) == len(prefix)+8 && bytes.HasPrefix(k, prefix); k, v = cur.Next() {
				if len(v) == 1 && v[0] == 1 {
					next = append(next, binary.LittleEndian.Uint64(k[len(k)-8:]))
				}
			}
		}

		if len(next) == 0 {
			return depth, level, nil
		}
	}
	return len(path), next, nil
}
// getPathPrefix descends from the zero node along path, at every step moving
// to the first child (in cursor order) whose attribute attr equals the path
// element and whose 'i' index entry value is the single byte 1.
//
// It returns the number of path elements matched and the last node reached.
// The returned error is always nil.
func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) {
	cur := bTree.Cursor()

	var (
		node   Node
		prefix []byte
	)
	for depth, name := range path {
		// Full index key without the trailing node ID is the seek prefix
		// shared by all children of the current node.
		prefix = internalKey(prefix, attr, name, node, 0)
		prefix = prefix[:len(prefix)-8]

		found := false
		for k, v := cur.Seek(prefix); len(k) == len(prefix)+8 && bytes.HasPrefix(k, prefix); k, v = cur.Next() {
			if len(v) == 1 && v[0] == 1 {
				node = binary.LittleEndian.Uint64(k[len(k)-8:])
				found = true
				break
			}
		}
		if !found {
			return depth, node, nil
		}
	}
	return len(path), node, nil
}
// moveFromBytes decodes a serialized operation from data into m.
// It delegates to logFromBytes and returns its error unchanged.
func (t *boltForest) moveFromBytes(m *Move, data []byte) error {
	return t.logFromBytes(m, data)
}
// logFromBytes decodes a serialized log operation from data into lm.
//
// The expected layout is the child ID (8 bytes, little-endian), the parent ID
// (8 bytes, little-endian) and then the binary-encoded meta.
//
// An error is returned, and lm is left untouched, if data is too short to hold
// both IDs; previously such input caused an out-of-range panic. Errors from
// decoding the meta are returned as is.
func (t *boltForest) logFromBytes(lm *Move, data []byte) error {
	const headerSize = 8 + 8 // child ID + parent ID

	if len(data) < headerSize {
		return fmt.Errorf("decode log operation: unexpected data length %d, want at least %d", len(data), headerSize)
	}

	lm.Child = binary.LittleEndian.Uint64(data)
	lm.Parent = binary.LittleEndian.Uint64(data[8:])
	return lm.Meta.FromBytes(data[headerSize:])
}
// logToBytes serializes lm as the child ID (8 bytes, little-endian), the
// parent ID (8 bytes, little-endian) and the binary-encoded meta.
//
// Serialization of the previous node state is disabled; the buffer is still
// grown by one extra byte, which matches the flag written by the disabled code:
//
//	size += 8 + lm.Old.Meta.Size() // only if lm.HasOld
//	...
//	w.WriteBool(lm.HasOld)
//	if lm.HasOld {
//		w.WriteU64LE(lm.Old.Parent)
//		lm.Old.Meta.EncodeBinary(w.BinWriter)
//	}
func (t *boltForest) logToBytes(lm *Move) []byte {
	const idsSize = 8 + 8 // child ID + parent ID

	w := io.NewBufBinWriter()
	w.Grow(idsSize + lm.Meta.Size() + 1)

	w.WriteU64LE(lm.Child)
	w.WriteU64LE(lm.Parent)
	lm.Meta.EncodeBinary(w.BinWriter)

	return w.Bytes()
}
// bucketName returns the name of the root bucket of a tree:
// the 32-byte encoded container ID followed by the bytes of treeID.
func bucketName(cid cidSDK.ID, treeID string) []byte {
	name := make([]byte, 32, 32+len(treeID))
	cid.Encode(name)
	return append(name, treeID...)
}
// 'o' + time -> old meta.
//
// oldKey writes the key into the beginning of the provided buffer and
// returns its first 9 bytes; the buffer must be at least 9 bytes long.
func oldKey(key []byte, ts Timestamp) []byte {
	const size = 1 + 8 // prefix + timestamp

	key[0] = 'o'
	binary.LittleEndian.PutUint64(key[1:], ts)
	return key[:size]
}
// 's' + child ID -> parent + timestamp of the first appearance + meta.
//
// stateKey writes the key into the beginning of the provided buffer and
// returns its first 9 bytes; the buffer must be at least 9 bytes long.
func stateKey(key []byte, child Node) []byte {
	const size = 1 + 8 // prefix + child ID

	key[0] = 's'
	binary.LittleEndian.PutUint64(key[1:], child)
	return key[:size]
}
// putState stores the node state under key in bucket b.
//
// The value is the parent ID (8 bytes, little-endian), the timestamp
// (8 bytes, little-endian) and the raw meta bytes; this is the layout
// getState reads back.
func (t *boltForest) putState(b *bbolt.Bucket, key []byte, parent Node, timestamp Timestamp, meta []byte) error {
	value := make([]byte, 0, 8+8+len(meta))
	value = binary.LittleEndian.AppendUint64(value, parent)
	value = binary.LittleEndian.AppendUint64(value, timestamp)
	value = append(value, meta...)
	return b.Put(key, value)
}
// getState reads the node state stored under key in bucket b.
//
// It returns the parent ID, the timestamp, the raw meta bytes and true
// if the key is present, or zero values and false otherwise.
// The meta slice is a subslice of the value returned by the bucket and is
// not copied, so per bbolt it is only valid while the transaction is open.
func (t *boltForest) getState(b *bbolt.Bucket, key []byte) (Node, Timestamp, []byte, bool) {
	if raw := b.Get(key); raw != nil {
		parent := binary.LittleEndian.Uint64(raw)
		ts := binary.LittleEndian.Uint64(raw[8:])
		return parent, ts, raw[16:], true
	}
	return 0, 0, nil, false
}
// 'c' + parent (id) + child (id) -> 0/1.
//
// childrenKey writes the key into the beginning of the provided buffer and
// returns its first childrenKeySize bytes. Note that the parameters are
// (child, parent) while the key stores the parent first.
func childrenKey(key []byte, child, parent Node) []byte {
	le := binary.LittleEndian

	key[0] = 'c'
	le.PutUint64(key[1:], parent)
	le.PutUint64(key[9:], child)
	return key[:childrenKeySize]
}
// internalKeyPrefix builds the beginning of an 'i' index key:
// 'i' + length of k (2 bytes, little-endian) + k.
//
// The result is appended to key[:0], so the storage of key is reused when
// its capacity is sufficient. The length is truncated to 16 bits.
func internalKeyPrefix(key []byte, k string) []byte {
	key = append(key[:0], 'i')
	key = binary.LittleEndian.AppendUint16(key, uint16(len(k)))
	return append(key, k...)
}
// 'i' + attribute name (string) + attribute value (string) + parent (id) + node (id) -> 0/1.
//
// Each string is preceded by its length (2 bytes, little-endian); IDs are
// 8 bytes, little-endian. The key is built in the storage of the provided
// buffer if its capacity is sufficient, otherwise a new buffer of the exact
// size is allocated.
func internalKey(key []byte, k, v string, parent, node Node) []byte {
	const fixedSize = 1 /* prefix */ + 2*2 /* len */ + 2*8 /* nodes */

	if need := fixedSize + len(k) + len(v); cap(key) < need {
		key = make([]byte, 0, need)
	}

	le := binary.LittleEndian
	key = internalKeyPrefix(key, k)
	key = le.AppendUint16(key, uint16(len(v)))
	key = append(key, v...)
	key = le.AppendUint64(key, parent)
	return le.AppendUint64(key, node)
}