[#1502] shard: Process locks on metabase refill

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
experimental
Evgenii Stratonikov 2022-06-20 15:28:20 +03:00 committed by fyrchik
parent 972ca83e23
commit 78ea450c25
2 changed files with 63 additions and 2 deletions

View File

@ -81,7 +81,9 @@ func (s *Shard) refillMetabase() error {
}
return blobstor.IterateObjects(s.blobStor, func(obj *objectSDK.Object, blzID *blobovnicza.ID) error {
if obj.Type() == objectSDK.TypeTombstone {
//nolint: exhaustive
switch obj.Type() {
case objectSDK.TypeTombstone:
tombstone := objectSDK.NewTombstone()
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
@ -108,6 +110,21 @@ func (s *Shard) refillMetabase() error {
if err != nil {
return fmt.Errorf("could not inhume objects: %w", err)
}
case objectSDK.TypeLock:
var lock objectSDK.Lock
if err := lock.Unmarshal(obj.Payload()); err != nil {
return fmt.Errorf("could not unmarshal lock content: %w", err)
}
locked := make([]oid.ID, lock.NumberOfMembers())
lock.ReadMembers(locked)
cnr, _ := obj.ContainerID()
id, _ := obj.ID()
err = s.metaBase.Lock(cnr, id, locked)
if err != nil {
return fmt.Errorf("could not lock objects: %w", err)
}
}
err := meta.Put(s.metaBase, obj, blzID)

View File

@ -9,8 +9,11 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test"
"github.com/stretchr/testify/require"
)
@ -47,11 +50,19 @@ func TestRefillMetabase(t *testing.T) {
}
mObjs := make(map[string]objAddr)
locked := make([]oid.ID, 1, 2)
locked[0] = oidtest.ID()
cnrLocked := cidtest.ID()
for i := uint64(0); i < objNum; i++ {
obj := objecttest.Object()
obj.SetType(objectSDK.TypeRegular)
if len(locked) < 2 {
obj.SetContainerID(cnrLocked)
id, _ := obj.ID()
locked = append(locked, id)
}
addr := object.AddressOf(obj)
mObjs[addr.EncodeToString()] = objAddr{
@ -96,6 +107,21 @@ func TestRefillMetabase(t *testing.T) {
_, err = sh.Put(putPrm)
require.NoError(t, err)
// LOCK object handling
var lock objectSDK.Lock
lock.WriteMembers(locked)
lockObj := objecttest.Object()
lockObj.SetContainerID(cnrLocked)
objectSDK.WriteLock(lockObj, lock)
putPrm.WithObject(lockObj)
_, err = sh.Put(putPrm)
require.NoError(t, err)
lockID, _ := lockObj.ID()
require.NoError(t, sh.Lock(cnrLocked, lockID, locked))
var inhumePrm InhumePrm
inhumePrm.WithTarget(object.AddressOf(tombObj), tombMembers...)
@ -142,9 +168,26 @@ func TestRefillMetabase(t *testing.T) {
}
}
checkLocked := func(t *testing.T, cnr cid.ID, locked []oid.ID) {
var addr oid.Address
addr.SetContainer(cnr)
for i := range locked {
addr.SetObject(locked[i])
var prm InhumePrm
prm.MarkAsGarbage(addr)
_, err := sh.Inhume(prm)
require.ErrorAs(t, err, new(apistatus.ObjectLocked),
"object %s should be locked", locked[i])
}
}
checkAllObjs(true)
checkObj(object.AddressOf(tombObj), tombObj)
checkTombMembers(true)
checkLocked(t, cnrLocked, locked)
err = sh.Close()
require.NoError(t, err)
@ -174,4 +217,5 @@ func TestRefillMetabase(t *testing.T) {
checkAllObjs(true)
checkObj(object.AddressOf(tombObj), tombObj)
checkTombMembers(true)
checkLocked(t, cnrLocked, locked)
}