[#1630] pilorama: Support dropping trees

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
Evgenii Stratonikov 2022-09-07 11:46:13 +03:00 committed by fyrchik
parent 699b534416
commit a2bb3a2a96
6 changed files with 89 additions and 0 deletions

View file

@ -159,3 +159,24 @@ func (e *StorageEngine) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64
}
return lm, err
}
// TreeDrop implements the pilorama.Forest interface.
func (e *StorageEngine) TreeDrop(cid cidSDK.ID, treeID string) error {
var err error
for _, sh := range e.sortShardsByWeight(cid) {
err = sh.TreeDrop(cid, treeID)
if err != nil {
if err == shard.ErrPiloramaDisabled {
break
}
if !errors.Is(err, pilorama.ErrTreeNotFound) && !errors.Is(err, shard.ErrReadOnlyMode) {
e.reportShardError(sh, "can't perform `TreeDrop`", err,
zap.Stringer("cid", cid),
zap.String("tree", treeID))
}
continue
}
return nil
}
return err
}

View file

@ -3,6 +3,7 @@ package pilorama
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"os"
@ -579,6 +580,17 @@ func (t *boltForest) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64) (
return lm, err
}
// TreeDrop implements the pilorama.Forest interface.
func (t *boltForest) TreeDrop(cid cidSDK.ID, treeID string) error {
return t.db.Batch(func(tx *bbolt.Tx) error {
err := tx.DeleteBucket(bucketName(cid, treeID))
if errors.Is(err, bbolt.ErrBucketNotFound) {
return ErrTreeNotFound
}
return err
})
}
func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) {
c := bTree.Cursor()

View file

@ -179,3 +179,15 @@ func (f *memoryForest) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64)
}
return s.operations[n].Move, nil
}
// TreeDrop implements the pilorama.Forest interface.
func (f *memoryForest) TreeDrop(cid cidSDK.ID, treeID string) error {
fullID := cid.String() + "/" + treeID
_, ok := f.treeMap[fullID]
if !ok {
return ErrTreeNotFound
}
delete(f.treeMap, fullID)
return nil
}

View file

@ -167,6 +167,39 @@ func testForestTreeGetChildren(t *testing.T, s Forest) {
})
}
func TestForest_TreeDrop(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {
testForestTreeDrop(t, providers[i].construct(t))
})
}
}
func testForestTreeDrop(t *testing.T, s Forest) {
cid := cidtest.ID()
t.Run("return nil if not found", func(t *testing.T) {
require.ErrorIs(t, s.TreeDrop(cid, "123"), ErrTreeNotFound)
})
trees := []string{"tree1", "tree2"}
d := CIDDescriptor{cid, 0, 1}
for i := range trees {
_, err := s.TreeAddByPath(d, trees[i], AttributeFilename, []string{"path"},
[]KeyValue{{Key: "TreeName", Value: []byte(trees[i])}})
require.NoError(t, err)
}
err := s.TreeDrop(cid, trees[0])
require.NoError(t, err)
_, err = s.TreeGetByPath(cid, trees[0], AttributeFilename, []string{"path"}, true)
require.ErrorIs(t, err, ErrTreeNotFound)
_, err = s.TreeGetByPath(cid, trees[1], AttributeFilename, []string{"path"}, true)
require.NoError(t, err)
}
func TestForest_TreeAdd(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {

View file

@ -34,6 +34,9 @@ type Forest interface {
// TreeGetOpLog returns first log operation stored at or above the height.
// In case no such operation is found, empty Move and nil error should be returned.
TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64) (Move, error)
// TreeDrop drops a tree from the database.
// If the tree is not found, ErrTreeNotFound should be returned.
TreeDrop(cid cidSDK.ID, treeID string) error
}
type ForestStorage interface {

View file

@ -76,3 +76,11 @@ func (s *Shard) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64) (pilor
}
return s.pilorama.TreeGetOpLog(cid, treeID, height)
}
// TreeDrop implements the pilorama.Forest interface.
func (s *Shard) TreeDrop(cid cidSDK.ID, treeID string) error {
if s.pilorama == nil {
return ErrPiloramaDisabled
}
return s.pilorama.TreeDrop(cid, treeID)
}