[#1940] Removing all trees by container ID if tree ID is empty in pilorama.Forest.TreeDrop

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2022-11-08 15:32:38 +03:00 committed by fyrchik
parent 92cac5bbdf
commit b167700b6f
7 changed files with 65 additions and 9 deletions

View file

@ -30,6 +30,7 @@ Changelog for NeoFS Node
- Losing meta information on request forwarding (#2040)
- Assembly process triggered by a request with a bearer token (#2040)
- Losing locking context after metabase resync (#1502)
- Removing all trees by container ID if tree ID is empty in `pilorama.Forest.TreeDrop` (#1940)
### Removed
### Updated

View file

@ -73,6 +73,7 @@ func initTreeService(c *cfg) {
ev := e.(containerEvent.DeleteSuccess)
// This is executed asynchronously, so we don't care about the operation taking some time.
c.log.Debug("removing all trees for container", zap.Stringer("cid", ev.ID))
err := c.treeService.DropTree(context.Background(), ev.ID, "")
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
// Ignore pilorama.ErrTreeNotFound but other errors, including shard.ErrReadOnly, should be logged.

View file

@ -627,6 +627,17 @@ func (t *boltForest) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64) (
// TreeDrop implements the pilorama.Forest interface.
func (t *boltForest) TreeDrop(cid cidSDK.ID, treeID string) error {
return t.db.Batch(func(tx *bbolt.Tx) error {
if treeID == "" {
c := tx.Cursor()
prefix := []byte(cid.EncodeToString())
for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, _ = c.Next() {
err := tx.DeleteBucket(k)
if err != nil {
return err
}
}
return nil
}
err := tx.DeleteBucket(bucketName(cid, treeID))
if errors.Is(err, bbolt.ErrBucketNotFound) {
return ErrTreeNotFound

View file

@ -183,13 +183,21 @@ func (f *memoryForest) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64)
// TreeDrop implements the pilorama.Forest interface.
func (f *memoryForest) TreeDrop(cid cidSDK.ID, treeID string) error {
fullID := cid.String() + "/" + treeID
_, ok := f.treeMap[fullID]
if !ok {
return ErrTreeNotFound
cidStr := cid.String()
if treeID == "" {
for k := range f.treeMap {
if strings.HasPrefix(k, cidStr) {
delete(f.treeMap, k)
}
}
} else {
fullID := cidStr + "/" + treeID
_, ok := f.treeMap[fullID]
if !ok {
return ErrTreeNotFound
}
delete(f.treeMap, fullID)
}
delete(f.treeMap, fullID)
return nil
}

View file

@ -180,14 +180,26 @@ func TestForest_TreeDrop(t *testing.T) {
}
func testForestTreeDrop(t *testing.T, s Forest) {
cid := cidtest.ID()
const cidsSize = 3
var cids [cidsSize]cidSDK.ID
for i := range cids {
cids[i] = cidtest.ID()
}
cid := cids[0]
t.Run("return nil if not found", func(t *testing.T) {
require.ErrorIs(t, s.TreeDrop(cid, "123"), ErrTreeNotFound)
})
require.NoError(t, s.TreeDrop(cid, ""))
trees := []string{"tree1", "tree2"}
d := CIDDescriptor{cid, 0, 1}
var descs [cidsSize]CIDDescriptor
for i := range descs {
descs[i] = CIDDescriptor{cids[i], 0, 1}
}
d := descs[0]
for i := range trees {
_, err := s.TreeAddByPath(d, trees[i], AttributeFilename, []string{"path"},
[]KeyValue{{Key: "TreeName", Value: []byte(trees[i])}})
@ -202,6 +214,28 @@ func testForestTreeDrop(t *testing.T, s Forest) {
_, err = s.TreeGetByPath(cid, trees[1], AttributeFilename, []string{"path"}, true)
require.NoError(t, err)
for j := range descs {
for i := range trees {
_, err := s.TreeAddByPath(descs[j], trees[i], AttributeFilename, []string{"path"},
[]KeyValue{{Key: "TreeName", Value: []byte(trees[i])}})
require.NoError(t, err)
}
}
list, err := s.TreeList(cid)
require.NotEmpty(t, list)
require.NoError(t, s.TreeDrop(cid, ""))
list, err = s.TreeList(cid)
require.NoError(t, err)
require.Empty(t, list)
for j := 1; j < len(cids); j++ {
list, err = s.TreeList(cids[j])
require.NoError(t, err)
require.Equal(t, len(list), len(trees))
}
}
func TestForest_TreeAdd(t *testing.T) {

View file

@ -35,6 +35,7 @@ type Forest interface {
TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64) (Move, error)
// TreeDrop drops a tree from the database.
// If the tree is not found, ErrTreeNotFound should be returned.
// In case of empty treeID drops all trees related to container.
TreeDrop(cid cidSDK.ID, treeID string) error
// TreeList returns all the tree IDs that have been added to the
// passed container ID. Nil slice should be returned if no tree found.

View file

@ -295,7 +295,7 @@ func (s *Service) syncLoop(ctx context.Context) {
for cnr := range s.cnrMap {
s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))
err = s.DropTree(ctx, cnr, "") // TODO: #1940 drop all the trees here
err = s.DropTree(ctx, cnr, "")
if err != nil {
s.log.Error("could not remove redundant tree",
zap.Stringer("cid", cnr),