[#199] Update blobovniczaID on existed objects in metabase

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2020-11-27 12:23:15 +03:00
parent 54cd91adff
commit 5bfae833fe
2 changed files with 78 additions and 1 deletions

View file

@ -18,7 +18,10 @@ type (
}
)
var ErrUnknownObjectType = errors.New("unknown object type")
var (
ErrUnknownObjectType = errors.New("unknown object type")
ErrIncorrectBlobovniczaUpdate = errors.New("updating blobovnicza id on object without it")
)
// Put saves object header in metabase. Object payload expected to be cut.
// Big objects have nil blobovniczaID.
@ -37,6 +40,13 @@ func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, isParent
// most right child and split header overlap parent so we have to
// check if object exists to not overwrite it twice
if exists {
// when storage engine moves small objects from one blobovniczaID
// to another, then it calls metabase.Put method with new blobovniczaID
// and this code should be triggered.
if !isParent && id != nil {
return updateBlobovniczaID(tx, obj.Address(), id)
}
return nil
}
@ -268,3 +278,22 @@ func decodeList(data []byte) (lst [][]byte, err error) {
return lst, nil
}
// updateBlobovniczaID for existing objects if they were moved from from
// one blobovnicza to another.
func updateBlobovniczaID(tx *bbolt.Tx, addr *objectSDK.Address, id *blobovnicza.ID) error {
bkt := tx.Bucket(smallBucketName(addr.ContainerID()))
if bkt == nil {
// if object exists, don't have blobovniczaID and we want to update it
// then ignore, this should never happen
return ErrIncorrectBlobovniczaUpdate
}
objectKey := objectKey(addr.ObjectID())
if len(bkt.Get(objectKey)) == 0 {
return ErrIncorrectBlobovniczaUpdate
}
return bkt.Put(objectKey, *id)
}

View file

@ -0,0 +1,48 @@
package meta_test
import (
"testing"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
"github.com/stretchr/testify/require"
)
func TestDB_PutBlobovnicaUpdate(t *testing.T) {
db := newDB(t)
defer releaseDB(db)
raw1 := generateRawObject(t)
blobovniczaID := blobovnicza.ID{1, 2, 3, 4}
// put one object with blobovniczaID
err := db.Put(raw1.Object(), &blobovniczaID)
require.NoError(t, err)
fetchedBlobovniczaID, err := db.IsSmall(raw1.Object().Address())
require.NoError(t, err)
require.Equal(t, &blobovniczaID, fetchedBlobovniczaID)
t.Run("update blobovniczaID", func(t *testing.T) {
newID := blobovnicza.ID{5, 6, 7, 8}
err := db.Put(raw1.Object(), &newID)
require.NoError(t, err)
fetchedBlobovniczaID, err := db.IsSmall(raw1.Object().Address())
require.NoError(t, err)
require.Equal(t, &newID, fetchedBlobovniczaID)
})
t.Run("update blobovniczaID on bad object", func(t *testing.T) {
raw2 := generateRawObject(t)
err := db.Put(raw2.Object(), nil)
require.NoError(t, err)
fetchedBlobovniczaID, err := db.IsSmall(raw2.Object().Address())
require.NoError(t, err)
require.Nil(t, fetchedBlobovniczaID)
err = db.Put(raw2.Object(), &blobovniczaID)
require.Error(t, err)
})
}