package pilorama import ( "bytes" "context" "encoding/binary" "errors" "fmt" "math/rand" "os" "path/filepath" "sort" "strconv" "sync" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "github.com/nspcc-dev/neo-go/pkg/io" "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) type boltForest struct { db *bbolt.DB modeMtx sync.RWMutex mode mode.Mode // mtx protects batches field. mtx sync.Mutex batches []*batch cfg } const ( childrenKeySize = 17 maxKeySize = childrenKeySize ) var ( dataBucket = []byte{0} logBucket = []byte{1} ) // ErrDegradedMode is returned when pilorama is in a degraded mode. var ErrDegradedMode = logicerr.New("pilorama is in a degraded mode") // ErrReadOnlyMode is returned when pilorama is in a read-only mode. var ErrReadOnlyMode = logicerr.New("pilorama is in a read-only mode") // NewBoltForest returns storage wrapper for storing operations on CRDT trees. // // Each tree is stored in a separate bucket by `CID + treeID` key. // All integers are stored in little-endian unless explicitly specified otherwise. // // DB schema (for a single tree): // timestamp is 8-byte, id is 4-byte. // // log storage (logBucket): // timestamp in big-endian -> log operation // // tree storage (dataBucket): // - 't' + node (id) -> timestamp when the node first appeared, // - 'p' + node (id) -> parent (id), // - 'm' + node (id) -> serialized meta, // - 'c' + parent (id) + child (id) -> 0/1, // - 'i' + 0 + attrKey + 0 + attrValue + 0 + parent (id) + node (id) -> 0/1 (1 for automatically created nodes). func NewBoltForest(opts ...Option) ForestStorage { b := boltForest{ cfg: cfg{ perm: os.ModePerm, maxBatchDelay: bbolt.DefaultMaxBatchDelay, maxBatchSize: bbolt.DefaultMaxBatchSize, openFile: os.OpenFile, metrics: &noopMetrics{}, }, mode: mode.Disabled, } for i := range opts { opts[i](&b.cfg) } return &b } func (t *boltForest) SetMode(m mode.Mode) error { t.modeMtx.Lock() defer t.modeMtx.Unlock() if t.mode == m { return nil } err := t.Close() if err == nil && !m.NoMetabase() { if err = t.openBolt(m); err == nil { err = t.Init() } } if err != nil { return fmt.Errorf("can't set pilorama mode (old=%s, new=%s): %w", t.mode, m, err) } t.mode = m t.metrics.SetMode(m) return nil } func (t *boltForest) Open(_ context.Context, mode mode.Mode) error { t.modeMtx.Lock() defer t.modeMtx.Unlock() t.mode = mode if mode.NoMetabase() { return nil } return t.openBolt(mode) } func (t *boltForest) openBolt(mode mode.Mode) error { readOnly := mode.ReadOnly() err := util.MkdirAllX(filepath.Dir(t.path), t.perm) if err != nil { return metaerr.Wrap(fmt.Errorf("can't create dir %s for the pilorama: %w", t.path, err)) } opts := *bbolt.DefaultOptions opts.ReadOnly = readOnly opts.NoSync = t.noSync opts.Timeout = 100 * time.Millisecond opts.OpenFile = t.openFile t.db, err = bbolt.Open(t.path, t.perm, &opts) if err != nil { return metaerr.Wrap(fmt.Errorf("can't open the pilorama DB: %w", err)) } t.db.MaxBatchSize = t.maxBatchSize t.db.MaxBatchDelay = t.maxBatchDelay t.metrics.SetMode(mode) return nil } func (t *boltForest) Init() error { if t.mode.NoMetabase() || t.db.IsReadOnly() { return nil } return t.db.Update(func(tx *bbolt.Tx) error { _, err := tx.CreateBucketIfNotExists(dataBucket) if err != nil { return err } _, err = tx.CreateBucketIfNotExists(logBucket) return err }) } func (t *boltForest) Close() error { var err error if t.db != nil { err = t.db.Close() } if err == nil { t.metrics.Close() } return err } func (t *boltForest) SetParentID(id string) { t.metrics.SetParentID(id) } // TreeMove implements the Forest interface. func (t *boltForest) TreeMove(ctx context.Context, d CIDDescriptor, treeID string, m *Move) (*Move, error) { _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeMove", trace.WithAttributes( attribute.String("container_id", d.CID.EncodeToString()), attribute.Int("position", d.Position), attribute.Int("size", d.Size), attribute.String("tree_id", treeID), ), ) defer span.End() if !d.checkValid() { return nil, ErrInvalidCIDDescriptor } t.modeMtx.RLock() defer t.modeMtx.RUnlock() if t.mode.NoMetabase() { return nil, ErrDegradedMode } else if t.mode.ReadOnly() { return nil, ErrReadOnlyMode } lm := *m fullID := bucketName(d.CID, treeID) return &lm, metaerr.Wrap(t.db.Batch(func(tx *bbolt.Tx) error { bLog, bTree, err := t.getTreeBuckets(tx, fullID) if err != nil { return err } lm.Time = t.getLatestTimestamp(bLog, d.Position, d.Size) if lm.Child == RootID { lm.Child = t.findSpareID(bTree) } return t.do(bLog, bTree, make([]byte, maxKeySize), &lm) })) } func (t *boltForest) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) { _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeHeight", trace.WithAttributes( attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), ), ) defer span.End() t.modeMtx.RLock() defer t.modeMtx.RUnlock() if t.mode.NoMetabase() { return 0, ErrDegradedMode } var height uint64 var retErr error err := t.db.View(func(tx *bbolt.Tx) error { treeRoot := tx.Bucket(bucketName(cid, treeID)) if treeRoot != nil { k, _ := treeRoot.Bucket(logBucket).Cursor().Last() height = binary.BigEndian.Uint64(k) } else { retErr = ErrTreeNotFound } return nil }) if err == nil { err = retErr } return height, metaerr.Wrap(err) } // TreeExists implements the Forest interface. func (t *boltForest) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (bool, error) { var ( startedAt = time.Now() success = false ) defer func() { t.metrics.AddMethodDuration("TreeExists", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeExists", trace.WithAttributes( attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), ), ) defer span.End() t.modeMtx.RLock() defer t.modeMtx.RUnlock() if t.mode.NoMetabase() { return false, ErrDegradedMode } var exists bool err := t.db.View(func(tx *bbolt.Tx) error { treeRoot := tx.Bucket(bucketName(cid, treeID)) exists = treeRoot != nil return nil }) success = err == nil return exists, metaerr.Wrap(err) } var syncHeightKey = []byte{'h'} // TreeUpdateLastSyncHeight implements the pilorama.Forest interface. func (t *boltForest) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) error { var ( startedAt = time.Now() success = false ) defer func() { t.metrics.AddMethodDuration("TreeUpdateLastSyncHeight", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeUpdateLastSyncHeight", trace.WithAttributes( attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), attribute.String("height", strconv.FormatUint(height, 10)), ), ) defer span.End() rawHeight := make([]byte, 8) binary.LittleEndian.PutUint64(rawHeight, height) buck := bucketName(cid, treeID) err := metaerr.Wrap(t.db.Batch(func(tx *bbolt.Tx) error { treeRoot := tx.Bucket(buck) if treeRoot == nil { return ErrTreeNotFound } b := treeRoot.Bucket(dataBucket) return b.Put(syncHeightKey, rawHeight) })) success = err == nil return err } // TreeLastSyncHeight implements the pilorama.Forest interface. func (t *boltForest) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) { var ( startedAt = time.Now() success = false ) defer func() { t.metrics.AddMethodDuration("TreeLastSyncHeight", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeLastSyncHeight", trace.WithAttributes( attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), ), ) defer span.End() var height uint64 buck := bucketName(cid, treeID) err := t.db.View(func(tx *bbolt.Tx) error { treeRoot := tx.Bucket(buck) if treeRoot == nil { return ErrTreeNotFound } b := treeRoot.Bucket(dataBucket) data := b.Get(syncHeightKey) if len(data) == 8 { height = binary.LittleEndian.Uint64(data) } return nil }) success = err == nil return height, metaerr.Wrap(err) } // TreeAddByPath implements the Forest interface. func (t *boltForest) TreeAddByPath(ctx context.Context, d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error) { var ( startedAt = time.Now() success = false ) defer func() { t.metrics.AddMethodDuration("TreeAddByPath", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeAddByPath", trace.WithAttributes( attribute.String("container_id", d.CID.EncodeToString()), attribute.Int("position", d.Position), attribute.Int("size", d.Size), attribute.String("tree_id", treeID), attribute.String("attr", attr), attribute.Int("path_count", len(path)), attribute.Int("meta_count", len(meta)), ), ) defer span.End() res, err := t.addByPathInternal(d, attr, treeID, path, meta) success = err == nil return res, err } func (t *boltForest) addByPathInternal(d CIDDescriptor, attr string, treeID string, path []string, meta []KeyValue) ([]Move, error) { if !d.checkValid() { return nil, ErrInvalidCIDDescriptor } if !isAttributeInternal(attr) { return nil, ErrNotPathAttribute } t.modeMtx.RLock() defer t.modeMtx.RUnlock() if t.mode.NoMetabase() { return nil, ErrDegradedMode } else if t.mode.ReadOnly() { return nil, ErrReadOnlyMode } var lm []Move var key [maxKeySize]byte fullID := bucketName(d.CID, treeID) err := t.db.Batch(func(tx *bbolt.Tx) error { bLog, bTree, err := t.getTreeBuckets(tx, fullID) if err != nil { return err } i, node, err := t.getPathPrefix(bTree, attr, path) if err != nil { return err } ts := t.getLatestTimestamp(bLog, d.Position, d.Size) lm = make([]Move, len(path)-i+1) for j := i; j < len(path); j++ { lm[j-i] = Move{ Parent: node, Meta: Meta{ Time: ts, Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}, }, Child: t.findSpareID(bTree), } err := t.do(bLog, bTree, key[:], &lm[j-i]) if err != nil { return err } ts = nextTimestamp(ts, uint64(d.Position), uint64(d.Size)) node = lm[j-i].Child } lm[len(lm)-1] = Move{ Parent: node, Meta: Meta{ Time: ts, Items: meta, }, Child: t.findSpareID(bTree), } return t.do(bLog, bTree, key[:], &lm[len(lm)-1]) }) return lm, metaerr.Wrap(err) } // getLatestTimestamp returns timestamp for a new operation which is guaranteed to be bigger than // all timestamps corresponding to already stored operations. func (t *boltForest) getLatestTimestamp(bLog *bbolt.Bucket, pos, size int) uint64 { var ts uint64 c := bLog.Cursor() key, _ := c.Last() if len(key) != 0 { ts = binary.BigEndian.Uint64(key) } return nextTimestamp(ts, uint64(pos), uint64(size)) } // findSpareID returns random unused ID. func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 { id := uint64(rand.Int63()) key := make([]byte, 9) for { _, _, _, ok := t.getState(bTree, stateKey(key, id)) if !ok { return id } id = uint64(rand.Int63()) } } // TreeApply implements the Forest interface. func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error { var ( startedAt = time.Now() success = false ) defer func() { t.metrics.AddMethodDuration("TreeApply", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApply", trace.WithAttributes( attribute.String("container_id", cnr.EncodeToString()), attribute.String("tree_id", treeID), attribute.Bool("background", backgroundSync), ), ) defer span.End() t.modeMtx.RLock() defer t.modeMtx.RUnlock() if t.mode.NoMetabase() { return ErrDegradedMode } else if t.mode.ReadOnly() { return ErrReadOnlyMode } if backgroundSync { var seen bool err := t.db.View(func(tx *bbolt.Tx) error { treeRoot := tx.Bucket(bucketName(cnr, treeID)) if treeRoot == nil { success = true return nil } b := treeRoot.Bucket(logBucket) var logKey [8]byte binary.BigEndian.PutUint64(logKey[:], m.Time) seen = b.Get(logKey[:]) != nil success = true return nil }) if err != nil || seen { success = err == nil return metaerr.Wrap(err) } } if t.db.MaxBatchSize == 1 { fullID := bucketName(cnr, treeID) err := metaerr.Wrap(t.db.Update(func(tx *bbolt.Tx) error { bLog, bTree, err := t.getTreeBuckets(tx, fullID) if err != nil { return err } var lm Move return t.applyOperation(bLog, bTree, []*Move{m}, &lm) })) success = err == nil return err } ch := make(chan error, 1) t.addBatch(cnr, treeID, m, ch) err := <-ch success = err == nil return metaerr.Wrap(err) } // TreeApplyStream should be used with caution: this method locks other write transactions while `source` is not closed. func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error { var ( startedAt = time.Now() success = false ) defer func() { t.metrics.AddMethodDuration("TreeApplyStream", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApplyStream", trace.WithAttributes( attribute.String("container_id", cnr.EncodeToString()), attribute.String("tree_id", treeID), ), ) defer span.End() t.modeMtx.RLock() defer t.modeMtx.RUnlock() if t.mode.NoMetabase() { return ErrDegradedMode } else if t.mode.ReadOnly() { return ErrReadOnlyMode } fullID := bucketName(cnr, treeID) err := metaerr.Wrap(t.db.Update(func(tx *bbolt.Tx) error { bLog, bTree, err := t.getTreeBuckets(tx, fullID) if err != nil { return err } for { select { case <-ctx.Done(): return ctx.Err() case m, ok := <-source: if !ok { return nil } var lm Move if e := t.applyOperation(bLog, bTree, []*Move{m}, &lm); e != nil { return e } } } })) success = err == nil return err } func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) { t.mtx.Lock() for i := 0; i < len(t.batches); i++ { t.batches[i].mtx.Lock() if t.batches[i].timer == nil { t.batches[i].mtx.Unlock() copy(t.batches[i:], t.batches[i+1:]) t.batches = t.batches[:len(t.batches)-1] i-- continue } found := t.batches[i].cid.Equals(cnr) && t.batches[i].treeID == treeID if found { t.batches[i].results = append(t.batches[i].results, ch) t.batches[i].operations = append(t.batches[i].operations, m) if len(t.batches[i].operations) == t.db.MaxBatchSize { t.batches[i].timer.Stop() t.batches[i].timer = nil t.batches[i].mtx.Unlock() b := t.batches[i] t.mtx.Unlock() b.trigger() return } t.batches[i].mtx.Unlock() t.mtx.Unlock() return } t.batches[i].mtx.Unlock() } b := &batch{ forest: t, cid: cnr, treeID: treeID, results: []chan<- error{ch}, operations: []*Move{m}, } b.mtx.Lock() b.timer = time.AfterFunc(t.db.MaxBatchDelay, b.trigger) b.mtx.Unlock() t.batches = append(t.batches, b) t.mtx.Unlock() } func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, treeRoot []byte) (*bbolt.Bucket, *bbolt.Bucket, error) { child := tx.Bucket(treeRoot) if child != nil { return child.Bucket(logBucket), child.Bucket(dataBucket), nil } child, err := tx.CreateBucket(treeRoot) if err != nil { return nil, nil, err } bLog, err := child.CreateBucket(logBucket) if err != nil { return nil, nil, err } bData, err := child.CreateBucket(dataBucket) if err != nil { return nil, nil, err } return bLog, bData, nil } // applyOperations applies log operations. Assumes lm are sorted by timestamp. func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*Move, lm *Move) error { var tmp Move var cKey [maxKeySize]byte c := logBucket.Cursor() key, value := c.Last() b := bytes.NewReader(nil) r := io.NewBinReaderFromIO(b) // 1. Undo up until the desired timestamp is here. for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) { b.Reset(value) tmp.Child = r.ReadU64LE() tmp.Parent = r.ReadU64LE() tmp.Time = r.ReadVarUint() if r.Err != nil { return r.Err } if err := t.undo(&tmp, treeBucket, cKey[:]); err != nil { return err } key, value = c.Prev() } for i := 0; i < len(ms); i++ { // Loop invariant: key represents the next stored timestamp after ms[i].Time. // 2. Insert the operation. *lm = *ms[i] if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil { return err } // Cursor can be invalid, seek again. binary.BigEndian.PutUint64(cKey[:], lm.Time) _, _ = c.Seek(cKey[:8]) key, value = c.Next() // 3. Re-apply all other operations. for len(key) == 8 && (i == len(ms)-1 || binary.BigEndian.Uint64(key) < ms[i+1].Time) { if err := t.logFromBytes(&tmp, value); err != nil { return err } if err := t.redo(treeBucket, cKey[:], &tmp, value[16:]); err != nil { return err } key, value = c.Next() } } return nil } func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *Move) error { binary.BigEndian.PutUint64(key, op.Time) rawLog := t.logToBytes(op) if err := lb.Put(key[:8], rawLog); err != nil { return err } return t.redo(b, key, op, rawLog[16:]) } func (t *boltForest) redo(b *bbolt.Bucket, key []byte, op *Move, rawMeta []byte) error { var err error parent, ts, currMeta, inTree := t.getState(b, stateKey(key, op.Child)) if inTree { err = t.putState(b, oldKey(key, op.Time), parent, ts, currMeta) } else { ts = op.Time err = b.Delete(oldKey(key, op.Time)) } if err != nil || op.Child == op.Parent || t.isAncestor(b, op.Child, op.Parent) { return err } if inTree { if err := b.Delete(childrenKey(key, op.Child, parent)); err != nil { return err } var meta Meta if err := meta.FromBytes(currMeta); err != nil { return err } for i := range meta.Items { if isAttributeInternal(meta.Items[i].Key) { key = internalKey(key, meta.Items[i].Key, string(meta.Items[i].Value), parent, op.Child) err := b.Delete(key) if err != nil { return err } } } } return t.addNode(b, key, op.Child, op.Parent, ts, op.Meta, rawMeta) } // removeNode removes node keys from the tree except the children key or its parent. func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node, parent Node) error { k := stateKey(key, node) _, _, rawMeta, _ := t.getState(b, k) var meta Meta if err := meta.FromBytes(rawMeta); err == nil { for i := range meta.Items { if isAttributeInternal(meta.Items[i].Key) { err := b.Delete(internalKey(nil, meta.Items[i].Key, string(meta.Items[i].Value), parent, node)) if err != nil { return err } } } } return b.Delete(k) } // addNode adds node keys to the tree except the timestamp key. func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, time Timestamp, meta Meta, rawMeta []byte) error { if err := t.putState(b, stateKey(key, child), parent, time, rawMeta); err != nil { return err } err := b.Put(childrenKey(key, child, parent), []byte{1}) if err != nil { return err } for i := range meta.Items { if !isAttributeInternal(meta.Items[i].Key) { continue } key = internalKey(key, meta.Items[i].Key, string(meta.Items[i].Value), parent, child) if len(meta.Items) == 1 { err = b.Put(key, []byte{1}) } else { err = b.Put(key, []byte{0}) } if err != nil { return err } } return nil } func (t *boltForest) undo(m *Move, b *bbolt.Bucket, key []byte) error { if err := b.Delete(childrenKey(key, m.Child, m.Parent)); err != nil { return err } parent, ts, rawMeta, ok := t.getState(b, oldKey(key, m.Time)) if !ok { return t.removeNode(b, key, m.Child, m.Parent) } var meta Meta if err := meta.FromBytes(rawMeta); err != nil { return err } return t.addNode(b, key, m.Child, parent, ts, meta, rawMeta) } func (t *boltForest) isAncestor(b *bbolt.Bucket, parent, child Node) bool { key := make([]byte, 9) key[0] = 's' for node := child; node != parent; { binary.LittleEndian.PutUint64(key[1:], node) parent, _, _, ok := t.getState(b, key) if !ok { return false } node = parent } return true } // TreeGetByPath implements the Forest interface. func (t *boltForest) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]Node, error) { var ( startedAt = time.Now() success = false ) defer func() { t.metrics.AddMethodDuration("TreeGetByPath", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetByPath", trace.WithAttributes( attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), attribute.String("attr", attr), attribute.Int("path_count", len(path)), attribute.Bool("latest", latest), ), ) defer span.End() if !isAttributeInternal(attr) { return nil, ErrNotPathAttribute } if len(path) == 0 { success = true return nil, nil } t.modeMtx.RLock() defer t.modeMtx.RUnlock() if t.mode.NoMetabase() { return nil, ErrDegradedMode } var nodes []Node err := metaerr.Wrap(t.db.View(func(tx *bbolt.Tx) error { treeRoot := tx.Bucket(bucketName(cid, treeID)) if treeRoot == nil { return ErrTreeNotFound } b := treeRoot.Bucket(dataBucket) i, curNodes, err := t.getPathPrefixMultiTraversal(b, attr, path[:len(path)-1]) if err != nil { return err } if i < len(path)-1 { return nil } var maxTimestamp uint64 c := b.Cursor() for i := range curNodes { attrKey := internalKey(nil, attr, path[len(path)-1], curNodes[i], 0) attrKey = attrKey[:len(attrKey)-8] childKey, _ := c.Seek(attrKey) for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) { child := binary.LittleEndian.Uint64(childKey[len(childKey)-8:]) if latest { _, ts, _, _ := t.getState(b, stateKey(make([]byte, 9), child)) if ts >= maxTimestamp { nodes = append(nodes[:0], child) maxTimestamp = ts } } else { nodes = append(nodes, child) } childKey, _ = c.Next() } } return nil })) success = err == nil return nodes, err } // TreeGetMeta implements the forest interface. func (t *boltForest) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node) (Meta, Node, error) { var ( startedAt = time.Now() success = false ) defer func() { t.metrics.AddMethodDuration("TreeGetMeta", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetMeta", trace.WithAttributes( attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), attribute.String("node_id", strconv.FormatUint(nodeID, 10)), ), ) defer span.End() t.modeMtx.RLock() defer t.modeMtx.RUnlock() if t.mode.NoMetabase() { return Meta{}, 0, ErrDegradedMode } key := stateKey(make([]byte, 9), nodeID) var m Meta var parentID uint64 err := t.db.View(func(tx *bbolt.Tx) error { treeRoot := tx.Bucket(bucketName(cid, treeID)) if treeRoot == nil { return ErrTreeNotFound } b := treeRoot.Bucket(dataBucket) if data := b.Get(key); len(data) != 0 { parentID = binary.LittleEndian.Uint64(data) } _, _, meta, _ := t.getState(b, stateKey(key, nodeID)) return m.FromBytes(meta) }) success = err == nil return m, parentID, metaerr.Wrap(err) } func (t *boltForest) hasFewChildren(b *bbolt.Bucket, nodeIDs MultiNode, threshold int) bool { key := make([]byte, 9) key[0] = 'c' count := 0 for _, nodeID := range nodeIDs { binary.LittleEndian.PutUint64(key[1:], nodeID) c := b.Cursor() for k, _ := c.Seek(key); len(k) == childrenKeySize && binary.LittleEndian.Uint64(k[1:]) == nodeID; k, _ = c.Next() { if count++; count > threshold { return false } } } return true } // TreeSortedByFilename implements the Forest interface. func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeIDs MultiNode, last *string, count int) ([]MultiNodeInfo, *string, error) { var ( startedAt = time.Now() success = false ) defer func() { t.metrics.AddMethodDuration("TreeSortedByFilename", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeSortedByFilename", trace.WithAttributes( attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), ), ) defer span.End() t.modeMtx.RLock() defer t.modeMtx.RUnlock() if t.mode.NoMetabase() { return nil, last, ErrDegradedMode } if len(nodeIDs) == 0 { return nil, last, errors.New("empty node list") } h := newHeap(last, count) key := make([]byte, 9) var result []NodeInfo var fewChildren bool err := t.db.View(func(tx *bbolt.Tx) error { treeRoot := tx.Bucket(bucketName(cid, treeID)) if treeRoot == nil { return ErrTreeNotFound } b := treeRoot.Bucket(dataBucket) // If the node is a leaf, we could scan all filenames in the tree. // To prevent this we first count the number of children: if it is less than // the number of nodes we need to return, fallback to TreeGetChildren() implementation. if fewChildren = t.hasFewChildren(b, nodeIDs, count); fewChildren { var err error result, err = t.getChildren(b, nodeIDs) return err } t.fillSortedChildren(b, nodeIDs, h) for info, ok := h.pop(); ok; info, ok = h.pop() { for _, id := range info.id { childInfo, err := t.getChildInfo(b, key, id) if err != nil { return err } result = append(result, childInfo) } } return nil }) success = err == nil if err != nil { return nil, last, metaerr.Wrap(err) } if fewChildren { result = sortAndCut(result, last) } res := mergeNodeInfos(result) if len(res) > count { res = res[:count] } if len(res) != 0 { s := string(findAttr(res[len(res)-1].Meta, AttributeFilename)) last = &s } return res, last, metaerr.Wrap(err) } func sortAndCut(result []NodeInfo, last *string) []NodeInfo { var lastBytes []byte if last != nil { lastBytes = []byte(*last) } sort.Slice(result, func(i, j int) bool { return bytes.Compare(result[i].Meta.GetAttr(AttributeFilename), result[j].Meta.GetAttr(AttributeFilename)) == -1 }) for i := range result { if lastBytes == nil || bytes.Compare(lastBytes, result[i].Meta.GetAttr(AttributeFilename)) == -1 { return result[i:] } } return nil } func (t *boltForest) getChildInfo(b *bbolt.Bucket, key []byte, childID Node) (NodeInfo, error) { childInfo := NodeInfo{ID: childID} parentID, _, metaBytes, found := t.getState(b, stateKey(key, childID)) if found { childInfo.ParentID = parentID if err := childInfo.Meta.FromBytes(metaBytes); err != nil { return NodeInfo{}, err } } return childInfo, nil } func (t *boltForest) fillSortedChildren(b *bbolt.Bucket, nodeIDs MultiNode, h *fixedHeap) { c := b.Cursor() prefix := internalKeyPrefix(nil, AttributeFilename) length := uint16(0) count := 0 var nodes []uint64 var lastFilename *string for k, _ := c.Seek(prefix); len(k) > 0 && k[0] == 'i'; k, _ = c.Next() { if len(k) < len(prefix)+2+16 { continue } parentID := binary.LittleEndian.Uint64(k[len(k)-16:]) var contains bool for i := range nodeIDs { if parentID == nodeIDs[i] { contains = true break } } if !contains { continue } actualLength := binary.LittleEndian.Uint16(k[len(prefix):]) childID := binary.LittleEndian.Uint64(k[len(k)-8:]) filename := string(k[len(prefix)+2 : len(k)-16]) if lastFilename == nil { lastFilename = &filename nodes = append(nodes, childID) } else if *lastFilename == filename { nodes = append(nodes, childID) } else { processed := h.push(nodes, *lastFilename) nodes = MultiNode{childID} lastFilename = &filename if actualLength != length { length = actualLength count = 1 } else if processed { if count++; count > h.count { lastFilename = nil nodes = nil length = actualLength + 1 c.Seek(append(prefix, byte(length), byte(length>>8))) c.Prev() // c.Next() will be performed by for loop } } } } if len(nodes) != 0 && lastFilename != nil { h.push(nodes, *lastFilename) } } // TreeGetChildren implements the Forest interface. func (t *boltForest) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node) ([]NodeInfo, error) { var ( startedAt = time.Now() success = false ) defer func() { t.metrics.AddMethodDuration("TreeGetChildren", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetChildren", trace.WithAttributes( attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), attribute.String("node_id", strconv.FormatUint(nodeID, 10)), ), ) defer span.End() t.modeMtx.RLock() defer t.modeMtx.RUnlock() if t.mode.NoMetabase() { return nil, ErrDegradedMode } var result []NodeInfo err := t.db.View(func(tx *bbolt.Tx) error { treeRoot := tx.Bucket(bucketName(cid, treeID)) if treeRoot == nil { return ErrTreeNotFound } b := treeRoot.Bucket(dataBucket) var err error result, err = t.getChildren(b, []Node{nodeID}) return err }) success = err == nil return result, metaerr.Wrap(err) } func (t *boltForest) getChildren(b *bbolt.Bucket, nodeIDs MultiNode) ([]NodeInfo, error) { var result []NodeInfo key := make([]byte, 9) for _, nodeID := range nodeIDs { key[0] = 'c' binary.LittleEndian.PutUint64(key[1:], nodeID) c := b.Cursor() for k, _ := c.Seek(key); len(k) == childrenKeySize && binary.LittleEndian.Uint64(k[1:]) == nodeID; k, _ = c.Next() { childID := binary.LittleEndian.Uint64(k[9:]) childInfo, err := t.getChildInfo(b, key, childID) if err != nil { return nil, err } result = append(result, childInfo) } } return result, nil } // TreeList implements the Forest interface. func (t *boltForest) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, error) { var ( startedAt = time.Now() success = false ) defer func() { t.metrics.AddMethodDuration("TreeList", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeList", trace.WithAttributes( attribute.String("container_id", cid.EncodeToString()), ), ) defer span.End() t.modeMtx.RLock() defer t.modeMtx.RUnlock() if t.mode.NoMetabase() { return nil, ErrDegradedMode } var ids []string cidRaw := make([]byte, 32) cid.Encode(cidRaw) cidLen := len(cidRaw) err := t.db.View(func(tx *bbolt.Tx) error { c := tx.Cursor() for k, _ := c.Seek(cidRaw); k != nil; k, _ = c.Next() { if !bytes.HasPrefix(k, cidRaw) { return nil } ids = append(ids, string(k[cidLen:])) } return nil }) if err != nil { return nil, metaerr.Wrap(fmt.Errorf("could not list trees: %w", err)) } success = true return ids, nil } // TreeGetOpLog implements the pilorama.Forest interface. func (t *boltForest) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) (Move, error) { var ( startedAt = time.Now() success = false ) defer func() { t.metrics.AddMethodDuration("TreeGetOpLog", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeGetOpLog", trace.WithAttributes( attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), attribute.String("height", strconv.FormatUint(height, 10)), ), ) defer span.End() t.modeMtx.RLock() defer t.modeMtx.RUnlock() if t.mode.NoMetabase() { return Move{}, ErrDegradedMode } key := make([]byte, 8) binary.BigEndian.PutUint64(key, height) var lm Move err := t.db.View(func(tx *bbolt.Tx) error { treeRoot := tx.Bucket(bucketName(cid, treeID)) if treeRoot == nil { return ErrTreeNotFound } c := treeRoot.Bucket(logBucket).Cursor() if _, data := c.Seek(key); data != nil { return t.moveFromBytes(&lm, data) } return nil }) success = err == nil return lm, metaerr.Wrap(err) } // TreeDrop implements the pilorama.Forest interface. func (t *boltForest) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string) error { var ( startedAt = time.Now() success = false ) defer func() { t.metrics.AddMethodDuration("TreeDrop", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeDrop", trace.WithAttributes( attribute.String("container_id", cid.EncodeToString()), attribute.String("tree_id", treeID), ), ) defer span.End() t.modeMtx.RLock() defer t.modeMtx.RUnlock() if t.mode.NoMetabase() { return ErrDegradedMode } else if t.mode.ReadOnly() { return ErrReadOnlyMode } err := metaerr.Wrap(t.db.Batch(func(tx *bbolt.Tx) error { if treeID == "" { c := tx.Cursor() prefix := make([]byte, 32) cid.Encode(prefix) for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, _ = c.Seek(prefix) { err := tx.DeleteBucket(k) if err != nil { return err } _, _ = c.First() // rewind the cursor to the root page } return nil } err := tx.DeleteBucket(bucketName(cid, treeID)) if errors.Is(err, bbolt.ErrBucketNotFound) { return ErrTreeNotFound } return err })) success = err == nil return err } // TreeListTrees implements ForestStorage. func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error) { var ( startedAt = time.Now() success = false ) defer func() { t.metrics.AddMethodDuration("TreeListTrees", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeListTrees") defer span.End() t.modeMtx.RLock() defer t.modeMtx.RUnlock() if t.mode.NoMetabase() { return nil, ErrDegradedMode } batchSize := prm.BatchSize if batchSize <= 0 { batchSize = treeListTreesBatchSizeDefault } var res TreeListTreesResult err := metaerr.Wrap(t.db.View(func(tx *bbolt.Tx) error { c := tx.Cursor() checkNextPageToken := true for k, _ := c.Seek(prm.NextPageToken); k != nil; k, _ = c.Next() { if bytes.Equal(k, dataBucket) || bytes.Equal(k, logBucket) { continue } if checkNextPageToken && bytes.Equal(k, prm.NextPageToken) { checkNextPageToken = false continue } var contID cidSDK.ID if err := contID.Decode(k[:32]); err != nil { return fmt.Errorf("failed to decode containerID: %w", err) } res.Items = append(res.Items, ContainerIDTreeID{ CID: contID, TreeID: string(k[32:]), }) if len(res.Items) == batchSize { res.NextPageToken = make([]byte, len(k)) copy(res.NextPageToken, k) break } } return nil })) success = err == nil if err != nil { return nil, err } return &res, nil } func (t *boltForest) getPathPrefixMultiTraversal(bTree *bbolt.Bucket, attr string, path []string) (int, []Node, error) { c := bTree.Cursor() var curNodes []Node nextNodes := []Node{RootID} var attrKey []byte for i := range path { curNodes, nextNodes = nextNodes, curNodes[:0] for j := range curNodes { attrKey = internalKey(attrKey, attr, path[i], curNodes[j], 0) attrKey = attrKey[:len(attrKey)-8] childKey, value := c.Seek(attrKey) for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) { if len(value) == 1 && value[0] == 1 { nextNodes = append(nextNodes, binary.LittleEndian.Uint64(childKey[len(childKey)-8:])) } childKey, value = c.Next() } } if len(nextNodes) == 0 { return i, curNodes, nil } } return len(path), nextNodes, nil } func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) { c := bTree.Cursor() var curNode Node var attrKey []byte loop: for i := range path { attrKey = internalKey(attrKey, attr, path[i], curNode, 0) attrKey = attrKey[:len(attrKey)-8] childKey, value := c.Seek(attrKey) for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) { if len(value) == 1 && value[0] == 1 { curNode = binary.LittleEndian.Uint64(childKey[len(childKey)-8:]) continue loop } childKey, value = c.Next() } return i, curNode, nil } return len(path), curNode, nil } func (t *boltForest) moveFromBytes(m *Move, data []byte) error { return t.logFromBytes(m, data) } func (t *boltForest) logFromBytes(lm *Move, data []byte) error { lm.Child = binary.LittleEndian.Uint64(data) lm.Parent = binary.LittleEndian.Uint64(data[8:]) return lm.Meta.FromBytes(data[16:]) } func (t *boltForest) logToBytes(lm *Move) []byte { w := io.NewBufBinWriter() size := 8 + 8 + lm.Meta.Size() + 1 // if lm.HasOld { // size += 8 + lm.Old.Meta.Size() // } w.Grow(size) w.WriteU64LE(lm.Child) w.WriteU64LE(lm.Parent) lm.Meta.EncodeBinary(w.BinWriter) // w.WriteBool(lm.HasOld) // if lm.HasOld { // w.WriteU64LE(lm.Old.Parent) // lm.Old.Meta.EncodeBinary(w.BinWriter) // } return w.Bytes() } func bucketName(cid cidSDK.ID, treeID string) []byte { treeRoot := make([]byte, 32+len(treeID)) cid.Encode(treeRoot) copy(treeRoot[32:], treeID) return treeRoot } // 'o' + time -> old meta. func oldKey(key []byte, ts Timestamp) []byte { key[0] = 'o' binary.LittleEndian.PutUint64(key[1:], ts) return key[:9] } // 's' + child ID -> parent + timestamp of the first appearance + meta. func stateKey(key []byte, child Node) []byte { key[0] = 's' binary.LittleEndian.PutUint64(key[1:], child) return key[:9] } func (t *boltForest) putState(b *bbolt.Bucket, key []byte, parent Node, timestamp Timestamp, meta []byte) error { data := make([]byte, len(meta)+8+8) binary.LittleEndian.PutUint64(data, parent) binary.LittleEndian.PutUint64(data[8:], timestamp) copy(data[16:], meta) return b.Put(key, data) } func (t *boltForest) getState(b *bbolt.Bucket, key []byte) (Node, Timestamp, []byte, bool) { data := b.Get(key) if data == nil { return 0, 0, nil, false } parent := binary.LittleEndian.Uint64(data) timestamp := binary.LittleEndian.Uint64(data[8:]) return parent, timestamp, data[16:], true } // 'c' + parent (id) + child (id) -> 0/1. func childrenKey(key []byte, child, parent Node) []byte { key[0] = 'c' binary.LittleEndian.PutUint64(key[1:], parent) binary.LittleEndian.PutUint64(key[9:], child) return key[:childrenKeySize] } func internalKeyPrefix(key []byte, k string) []byte { key = key[:0] key = append(key, 'i') l := len(k) key = append(key, byte(l), byte(l>>8)) key = append(key, k...) return key } // 'i' + attribute name (string) + attribute value (string) + parent (id) + node (id) -> 0/1. func internalKey(key []byte, k, v string, parent, node Node) []byte { size := 1 /* prefix */ + 2*2 /* len */ + 2*8 /* nodes */ + len(k) + len(v) if cap(key) < size { key = make([]byte, 0, size) } key = internalKeyPrefix(key, k) l := len(v) key = append(key, byte(l), byte(l>>8)) key = append(key, v...) var raw [8]byte binary.LittleEndian.PutUint64(raw[:], parent) key = append(key, raw[:]...) binary.LittleEndian.PutUint64(raw[:], node) key = append(key, raw[:]...) return key }