[#1175] metabase: Work with LOCK objects

After introduction of LOCK objects (of type `TypeLock`) complicated
extended its behavior:
  * create `lockers` container bucket (LCB) during PUT;
  * remove object from LCB during DELETE;
  * look up object in LCB during EXISTS;
  * get object from LCB during GET;
  * list objects from LCB during LIST with cursor;
  * select objects from LCB during SELECT with '*'.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2022-02-15 15:51:56 +03:00 committed by LeL
parent 9f13674a10
commit 9508633a7e
13 changed files with 97 additions and 18 deletions

View file

@ -76,7 +76,7 @@ func TestDB_Containers(t *testing.T) {
func TestDB_ContainersCount(t *testing.T) {
db := newDB(t)
const R, T, SG = 10, 11, 12 // amount of object per type
const R, T, SG, L = 10, 11, 12, 13 // amount of object per type
uploadObjects := [...]struct {
amount int
@ -85,9 +85,10 @@ func TestDB_ContainersCount(t *testing.T) {
{R, objectSDK.TypeRegular},
{T, objectSDK.TypeTombstone},
{SG, objectSDK.TypeStorageGroup},
{L, objectSDK.TypeLock},
}
expected := make([]*cid.ID, 0, R+T+SG)
expected := make([]*cid.ID, 0, R+T+SG+L)
for _, upload := range uploadObjects {
for i := 0; i < upload.amount; i++ {

View file

@ -257,6 +257,8 @@ func delUniqueIndexes(obj *objectSDK.Object, isParent bool) ([]namedBucketItem,
bucketName = tombstoneBucketName(addr.ContainerID())
case objectSDK.TypeStorageGroup:
bucketName = storageGroupBucketName(addr.ContainerID())
case objectSDK.TypeLock:
bucketName = bucketNameLockers(*addr.ContainerID())
default:
return nil, ErrUnknownObjectType
}

View file

@ -88,13 +88,8 @@ func (db *DB) exists(tx *bbolt.Tx, addr *addressSDK.Address) (exists bool, err e
return false, objectSDK.NewSplitInfoError(splitInfo)
}
// if parent bucket is empty, then check if object exists in tombstone bucket
if inBucket(tx, tombstoneBucketName(addr.ContainerID()), objKey) {
return true, nil
}
// if parent bucket is empty, then check if object exists in storage group bucket
return inBucket(tx, storageGroupBucketName(addr.ContainerID()), objKey), nil
// if parent bucket is empty, then check if object exists in typed buckets
return firstIrregularObjectType(tx, *addr.ContainerID(), objKey) != objectSDK.TypeRegular, nil
}
// inGraveyard returns:

View file

@ -55,6 +55,18 @@ func TestDB_Exists(t *testing.T) {
require.True(t, exists)
})
t.Run("lock object", func(t *testing.T) {
lock := generateObject(t)
lock.SetType(objectSDK.TypeLock)
err := putBig(db, lock)
require.NoError(t, err)
exists, err := meta.Exists(db, object.AddressOf(lock))
require.NoError(t, err)
require.True(t, exists)
})
t.Run("virtual object", func(t *testing.T) {
cid := cidtest.ID()
parent := generateObjectWithCID(t, cid)

View file

@ -113,6 +113,12 @@ func (db *DB) get(tx *bbolt.Tx, addr *addressSDK.Address, checkGraveyard, raw bo
return obj, obj.Unmarshal(data)
}
// if not found then check in locker index
data = getFromBucket(tx, bucketNameLockers(*cid), key)
if len(data) != 0 {
return obj, obj.Unmarshal(data)
}
// if not found then check if object is a virtual
return getVirtualObject(tx, cid, key, raw)
}

View file

@ -58,6 +58,18 @@ func TestDB_Get(t *testing.T) {
require.Equal(t, raw.CutPayload(), newObj)
})
t.Run("put lock object", func(t *testing.T) {
raw.SetType(objectSDK.TypeLock)
raw.SetID(testOID())
err := putBig(db, raw.Object())
require.NoError(t, err)
newObj, err := meta.Get(db, object.AddressOf(raw))
require.NoError(t, err)
require.Equal(t, raw.CutPayload().Object(), newObj)
})
t.Run("put virtual object", func(t *testing.T) {
cid := cidtest.ID()
splitID := objectSDK.NewSplitID()

View file

@ -24,6 +24,7 @@ func TestDB_IterateExpired(t *testing.T) {
object.TypeRegular,
object.TypeTombstone,
object.TypeStorageGroup,
object.TypeLock,
} {
mAlive[typ] = putWithExpiration(t, db, typ, epoch)
mExpired[typ] = putWithExpiration(t, db, typ, epoch-1)

View file

@ -57,8 +57,8 @@ func (l ListRes) Cursor() *Cursor {
}
// ListWithCursor lists physical objects available in metabase starting from
// cursor. Includes regular, tombstone and storage group objects. Does not
// include inhumed objects. Use cursor value from response for consecutive requests.
// cursor. Includes objects of all types. Does not include inhumed objects.
// Use cursor value from response for consecutive requests.
//
// Returns ErrEndOfListing if there are no more objects to return or count
// parameter set to zero.
@ -72,8 +72,8 @@ func ListWithCursor(db *DB, count uint32, cursor *Cursor) ([]*addressSDK.Address
}
// ListWithCursor lists physical objects available in metabase starting from
// cursor. Includes regular, tombstone and storage group objects. Does not
// include inhumed objects. Use cursor value from response for consecutive requests.
// cursor. Includes objects of all types. Does not include inhumed objects.
// Use cursor value from response for consecutive requests.
//
// Returns ErrEndOfListing if there are no more objects to return or count
// parameter set to zero.
@ -107,7 +107,11 @@ loop:
}
switch postfix {
case "", storageGroupPostfix, tombstonePostfix:
case
"",
storageGroupPostfix,
bucketNameSuffixLockers,
tombstonePostfix:
default:
continue
}

View file

@ -18,7 +18,7 @@ func TestLisObjectsWithCursor(t *testing.T) {
const (
containers = 5
total = containers * 4 // regular + ts + sg + child
total = containers * 5 // regular + ts + sg + child + lock
)
expected := make([]*addressSDK.Address, 0, total)
@ -48,6 +48,13 @@ func TestLisObjectsWithCursor(t *testing.T) {
require.NoError(t, err)
expected = append(expected, object.AddressOf(obj))
// add one lock
obj = generateObjectWithCID(t, containerID)
obj.SetType(objectSDK.TypeLock)
err = putBig(db, obj)
require.NoError(t, err)
expected = append(expected, object.AddressOf(obj))
// add one inhumed (do not include into expected)
obj = generateObjectWithCID(t, containerID)
obj.SetType(objectSDK.TypeRegular)

View file

@ -3,7 +3,6 @@ package meta_test
import (
"testing"
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
"github.com/nspcc-dev/neofs-sdk-go/object"
@ -34,7 +33,7 @@ func TestDB_Lock(t *testing.T) {
obj.SetContainerID(&cnr)
// save irregular object
err := meta.Put(db, objectCore.NewFromSDK(obj.Object()), nil)
err := meta.Put(db, obj, nil)
require.NoError(t, err, typ)
// try to lock it

View file

@ -199,7 +199,7 @@ func uniqueIndexes(obj *objectSDK.Object, si *objectSDK.SplitInfo, id *blobovnic
case objectSDK.TypeStorageGroup:
bucketName = storageGroupBucketName(addr.ContainerID())
case objectSDK.TypeLock:
bucketName = bucketNameLocked(*addr.ContainerID())
bucketName = bucketNameLockers(*addr.ContainerID())
default:
return nil, ErrUnknownObjectType
}

View file

@ -153,6 +153,7 @@ func (db *DB) selectAll(tx *bbolt.Tx, cid *cid.ID, to map[string]int) {
selectAllFromBucket(tx, tombstoneBucketName(cid), prefix, to, 0)
selectAllFromBucket(tx, storageGroupBucketName(cid), prefix, to, 0)
selectAllFromBucket(tx, parentBucketName(cid), prefix, to, 0)
selectAllFromBucket(tx, bucketNameLockers(*cid), prefix, to, 0)
}
// selectAllFromBucket goes through all keys in bucket and adds them in a
@ -207,6 +208,7 @@ func (db *DB) selectFastFilter(
selectAllFromBucket(tx, primaryBucketName(cid), prefix, to, fNum)
selectAllFromBucket(tx, tombstoneBucketName(cid), prefix, to, fNum)
selectAllFromBucket(tx, storageGroupBucketName(cid), prefix, to, fNum)
selectAllFromBucket(tx, bucketNameLockers(*cid), prefix, to, fNum)
default: // user attribute
bucketName := attributeBucketName(cid, f.Header())
@ -222,6 +224,9 @@ var mBucketNaming = map[string][]func(*cid.ID) []byte{
v2object.TypeRegular.String(): {primaryBucketName, parentBucketName},
v2object.TypeTombstone.String(): {tombstoneBucketName},
v2object.TypeStorageGroup.String(): {storageGroupBucketName},
v2object.TypeLock.String(): {func(id *cid.ID) []byte {
return bucketNameLockers(*id)
}},
}
func allBucketNames(cid *cid.ID) (names [][]byte) {

View file

@ -162,6 +162,11 @@ func TestDB_SelectRootPhyParent(t *testing.T) {
err = putBig(db, leftChild)
require.NoError(t, err)
lock := generateObjectWithCID(t, cid)
lock.SetType(objectSDK.TypeLock)
err = putBig(db, lock)
require.NoError(t, err)
parent := generateObjectWithCID(t, cid)
rightChild := generateObjectWithCID(t, cid)
@ -201,6 +206,7 @@ func TestDB_SelectRootPhyParent(t *testing.T) {
object.AddressOf(leftChild),
object.AddressOf(rightChild),
object.AddressOf(link),
object.AddressOf(lock),
)
fs = objectSDK.SearchFilters{}
@ -224,6 +230,7 @@ func TestDB_SelectRootPhyParent(t *testing.T) {
testSelect(t, db, cid, fs,
object.AddressOf(ts),
object.AddressOf(sg),
object.AddressOf(lock),
)
fs = objectSDK.SearchFilters{}
@ -245,6 +252,7 @@ func TestDB_SelectRootPhyParent(t *testing.T) {
object.AddressOf(link),
object.AddressOf(parent),
object.AddressOf(sg),
object.AddressOf(lock),
)
fs = objectSDK.SearchFilters{}
@ -266,6 +274,7 @@ func TestDB_SelectRootPhyParent(t *testing.T) {
object.AddressOf(link),
object.AddressOf(parent),
object.AddressOf(ts),
object.AddressOf(lock),
)
fs = objectSDK.SearchFilters{}
@ -299,6 +308,7 @@ func TestDB_SelectRootPhyParent(t *testing.T) {
object.AddressOf(rightChild),
object.AddressOf(link),
object.AddressOf(parent),
object.AddressOf(lock),
)
})
}
@ -494,6 +504,11 @@ func TestDB_SelectObjectID(t *testing.T) {
err = putBig(db, sg)
require.NoError(t, err)
lock := generateObjectWithCID(t, cid)
lock.SetType(objectSDK.TypeLock)
err = putBig(db, lock)
require.NoError(t, err)
t.Run("not present", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchNotPresent, nil)
@ -516,6 +531,7 @@ func TestDB_SelectObjectID(t *testing.T) {
object.AddressOf(parent),
object.AddressOf(sg),
object.AddressOf(ts),
object.AddressOf(lock),
)
})
@ -530,6 +546,7 @@ func TestDB_SelectObjectID(t *testing.T) {
object.AddressOf(parent),
object.AddressOf(sg),
object.AddressOf(ts),
object.AddressOf(lock),
)
})
@ -544,6 +561,7 @@ func TestDB_SelectObjectID(t *testing.T) {
object.AddressOf(regular),
object.AddressOf(parent),
object.AddressOf(sg),
object.AddressOf(lock),
)
})
@ -558,6 +576,7 @@ func TestDB_SelectObjectID(t *testing.T) {
object.AddressOf(regular),
object.AddressOf(parent),
object.AddressOf(ts),
object.AddressOf(lock),
)
})
@ -572,6 +591,22 @@ func TestDB_SelectObjectID(t *testing.T) {
object.AddressOf(regular),
object.AddressOf(sg),
object.AddressOf(ts),
object.AddressOf(lock),
)
})
t.Run("lock objects", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringEqual, lock.ID())
testSelect(t, db, cid, fs, object.AddressOf(lock))
fs = objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringNotEqual, lock.ID())
testSelect(t, db, cid, fs,
object.AddressOf(regular),
object.AddressOf(parent),
object.AddressOf(sg),
object.AddressOf(ts),
)
})
}