[#1444] pilorama: Fix TreeMove in bbolt backend

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-05-28 17:11:50 +03:00
parent 578fbdca57
commit 6b02df7b8c
2 changed files with 21 additions and 20 deletions

View file

@ -79,8 +79,8 @@ func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove
return nil, ErrInvalidCIDDescriptor return nil, ErrInvalidCIDDescriptor
} }
var lm *LogMove var lm LogMove
return lm, t.db.Batch(func(tx *bbolt.Tx) error { return &lm, t.db.Batch(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID) bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID)
if err != nil { if err != nil {
return err return err
@ -90,8 +90,8 @@ func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove
if m.Child == RootID { if m.Child == RootID {
m.Child = t.findSpareID(bTree) m.Child = t.findSpareID(bTree)
} }
lm, err = t.applyOperation(bLog, bTree, m) lm.Move = *m
return err return t.applyOperation(bLog, bTree, &lm)
}) })
} }
@ -193,8 +193,9 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move) error {
if err != nil { if err != nil {
return err return err
} }
_, err = t.applyOperation(bLog, bTree, m)
return err lm := &LogMove{Move: *m}
return t.applyOperation(bLog, bTree, lm)
}) })
} }
@ -222,8 +223,7 @@ func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string)
return bLog, bData, nil return bLog, bData, nil
} }
func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, m *Move) (*LogMove, error) { func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *LogMove) error {
var lm LogMove
var tmp LogMove var tmp LogMove
var cKey [17]byte var cKey [17]byte
@ -235,22 +235,21 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, m *Move
r := io.NewBinReaderFromIO(b) r := io.NewBinReaderFromIO(b)
// 1. Undo up until the desired timestamp is here. // 1. Undo up until the desired timestamp is here.
for len(key) == 8 && binary.BigEndian.Uint64(key) > m.Time { for len(key) == 8 && binary.BigEndian.Uint64(key) > lm.Time {
b.Reset(value) b.Reset(value)
if err := t.logFromBytes(&tmp, r); err != nil { if err := t.logFromBytes(&tmp, r); err != nil {
return nil, err return err
} }
if err := t.undo(&tmp.Move, &tmp, treeBucket, cKey[:]); err != nil { if err := t.undo(&tmp.Move, &tmp, treeBucket, cKey[:]); err != nil {
return nil, err return err
} }
key, value = c.Prev() key, value = c.Prev()
} }
// 2. Insert the operation. // 2. Insert the operation.
if len(key) != 8 || binary.BigEndian.Uint64(key) != m.Time { if len(key) != 8 || binary.BigEndian.Uint64(key) != lm.Time {
lm.Move = *m if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil {
if err := t.do(logBucket, treeBucket, cKey[:], &lm); err != nil { return err
return nil, err
} }
} }
key, value = c.Next() key, value = c.Next()
@ -259,15 +258,15 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, m *Move
for len(key) == 8 { for len(key) == 8 {
b.Reset(value) b.Reset(value)
if err := t.logFromBytes(&tmp, r); err != nil { if err := t.logFromBytes(&tmp, r); err != nil {
return nil, err return err
} }
if err := t.do(logBucket, treeBucket, cKey[:], &tmp); err != nil { if err := t.do(logBucket, treeBucket, cKey[:], &tmp); err != nil {
return nil, err return err
} }
key, value = c.Next() key, value = c.Next()
} }
return &lm, nil return nil
} }
func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMove) error { func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMove) error {

View file

@ -79,24 +79,26 @@ func testForestTreeMove(t *testing.T, s Forest) {
require.ErrorIs(t, err, ErrInvalidCIDDescriptor) require.ErrorIs(t, err, ErrInvalidCIDDescriptor)
}) })
t.Run("same parent, update meta", func(t *testing.T) { t.Run("same parent, update meta", func(t *testing.T) {
_, err = s.TreeMove(d, treeID, &Move{ res, err := s.TreeMove(d, treeID, &Move{
Parent: lm[1].Child, Parent: lm[1].Child,
Meta: Meta{Items: append(meta, KeyValue{Key: "NewKey", Value: []byte("NewValue")})}, Meta: Meta{Items: append(meta, KeyValue{Key: "NewKey", Value: []byte("NewValue")})},
Child: nodeID, Child: nodeID,
}) })
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, res.Child, nodeID)
nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"path", "to", "file.txt"}, false) nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"path", "to", "file.txt"}, false)
require.NoError(t, err) require.NoError(t, err)
require.ElementsMatch(t, []Node{nodeID}, nodes) require.ElementsMatch(t, []Node{nodeID}, nodes)
}) })
t.Run("different parent", func(t *testing.T) { t.Run("different parent", func(t *testing.T) {
_, err = s.TreeMove(d, treeID, &Move{ res, err := s.TreeMove(d, treeID, &Move{
Parent: RootID, Parent: RootID,
Meta: Meta{Items: append(meta, KeyValue{Key: "NewKey", Value: []byte("NewValue")})}, Meta: Meta{Items: append(meta, KeyValue{Key: "NewKey", Value: []byte("NewValue")})},
Child: nodeID, Child: nodeID,
}) })
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, res.Child, nodeID)
nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"path", "to", "file.txt"}, false) nodes, err := s.TreeGetByPath(cid, treeID, AttributeFilename, []string{"path", "to", "file.txt"}, false)
require.NoError(t, err) require.NoError(t, err)