[#789] shard: Add method to refill the metabase
There is a need to refill Metabase data with the objects from BlobStor. Implement `refillMetabase` method which iterates over all objects from BlobStor and saves them in Metabase. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
8d016d2529
commit
b9c22e21b1
2 changed files with 219 additions and 0 deletions
|
@ -1,7 +1,14 @@
|
|||
package shard
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
)
|
||||
|
||||
// Open opens all Shard's components.
|
||||
|
@ -59,6 +66,52 @@ func (s *Shard) Init() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Shard) refillMetabase() error {
|
||||
err := s.metaBase.Reset()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not reset metabase: %w", err)
|
||||
}
|
||||
|
||||
return blobstor.IterateObjects(s.blobStor, func(obj *object.Object, blzID *blobovnicza.ID) error {
|
||||
if obj.Type() == objectSDK.TypeTombstone {
|
||||
tombstone := objectSDK.NewTombstone()
|
||||
|
||||
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
|
||||
return fmt.Errorf("could not unmarshal tombstone content: %w", err)
|
||||
}
|
||||
|
||||
tombAddr := obj.Address()
|
||||
cid := tombAddr.ContainerID()
|
||||
memberIDs := tombstone.Members()
|
||||
tombMembers := make([]*objectSDK.Address, 0, len(memberIDs))
|
||||
|
||||
for _, id := range memberIDs {
|
||||
if id == nil {
|
||||
return errors.New("empty member in tombstone")
|
||||
}
|
||||
|
||||
a := objectSDK.NewAddress()
|
||||
a.SetContainerID(cid)
|
||||
a.SetObjectID(id)
|
||||
|
||||
tombMembers = append(tombMembers, a)
|
||||
}
|
||||
|
||||
var inhumePrm meta.InhumePrm
|
||||
|
||||
inhumePrm.WithAddresses(tombAddr)
|
||||
inhumePrm.WithAddresses(tombMembers...)
|
||||
|
||||
_, err = s.metaBase.Inhume(&inhumePrm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not inhume objects: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return meta.Put(s.metaBase, obj, blzID)
|
||||
})
|
||||
}
|
||||
|
||||
// Close releases all Shard's components.
|
||||
func (s *Shard) Close() error {
|
||||
components := []interface{ Close() error }{
|
||||
|
|
166
pkg/local_object_storage/shard/control_test.go
Normal file
166
pkg/local_object_storage/shard/control_test.go
Normal file
|
@ -0,0 +1,166 @@
|
|||
package shard
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
objecttest "github.com/nspcc-dev/neofs-api-go/pkg/object/test"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRefillMetabase(t *testing.T) {
|
||||
p := t.Name()
|
||||
|
||||
defer os.RemoveAll(p)
|
||||
|
||||
blobOpts := []blobstor.Option{
|
||||
blobstor.WithRootPath(path.Join(p, "blob")),
|
||||
blobstor.WithBlobovniczaShallowWidth(1),
|
||||
blobstor.WithBlobovniczaShallowDepth(1),
|
||||
}
|
||||
|
||||
sh := New(
|
||||
WithBlobStorOptions(blobOpts...),
|
||||
WithMetaBaseOptions(
|
||||
meta.WithPath(path.Join(p, "meta")),
|
||||
),
|
||||
)
|
||||
|
||||
// open Blobstor
|
||||
require.NoError(t, sh.Open())
|
||||
|
||||
// initialize Blobstor
|
||||
require.NoError(t, sh.Init())
|
||||
|
||||
const objNum = 5
|
||||
|
||||
type objAddr struct {
|
||||
obj *object.Object
|
||||
addr *objectSDK.Address
|
||||
}
|
||||
|
||||
mObjs := make(map[string]objAddr)
|
||||
|
||||
for i := uint64(0); i < objNum; i++ {
|
||||
rawObj := objecttest.Raw()
|
||||
rawObj.SetType(objectSDK.TypeRegular)
|
||||
|
||||
obj := object.NewFromSDK(rawObj.Object())
|
||||
|
||||
addr := obj.Address()
|
||||
|
||||
mObjs[addr.String()] = objAddr{
|
||||
obj: obj,
|
||||
addr: addr,
|
||||
}
|
||||
}
|
||||
|
||||
tombObjRaw := object.NewRawFrom(objecttest.Raw())
|
||||
tombObjRaw.SetType(objectSDK.TypeTombstone)
|
||||
|
||||
tombstone := objecttest.Tombstone()
|
||||
|
||||
tombData, err := tombstone.Marshal()
|
||||
require.NoError(t, err)
|
||||
|
||||
tombObjRaw.SetPayload(tombData)
|
||||
|
||||
tombObj := tombObjRaw.Object()
|
||||
|
||||
tombMembers := make([]*objectSDK.Address, 0, len(tombstone.Members()))
|
||||
|
||||
for _, member := range tombstone.Members() {
|
||||
a := objectSDK.NewAddress()
|
||||
a.SetObjectID(member)
|
||||
a.SetContainerID(tombObj.ContainerID())
|
||||
|
||||
tombMembers = append(tombMembers, a)
|
||||
}
|
||||
|
||||
var putPrm PutPrm
|
||||
|
||||
for _, v := range mObjs {
|
||||
_, err := sh.Put(putPrm.WithObject(v.obj))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
_, err = sh.Put(putPrm.WithObject(tombObj))
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = sh.Inhume(new(InhumePrm).WithTarget(tombObj.Address(), tombMembers...))
|
||||
require.NoError(t, err)
|
||||
|
||||
var headPrm HeadPrm
|
||||
|
||||
checkObj := func(addr *objectSDK.Address, expObj *object.Object) {
|
||||
res, err := sh.Head(headPrm.WithAddress(addr))
|
||||
|
||||
if expObj == nil {
|
||||
require.ErrorIs(t, err, object.ErrNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, object.NewRawFromObject(expObj).CutPayload().Object(), res.Object())
|
||||
}
|
||||
|
||||
checkAllObjs := func(exists bool) {
|
||||
for _, v := range mObjs {
|
||||
if exists {
|
||||
checkObj(v.addr, v.obj)
|
||||
} else {
|
||||
checkObj(v.addr, nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
checkTombMembers := func(exists bool) {
|
||||
for _, member := range tombMembers {
|
||||
_, err := sh.Head(headPrm.WithAddress(member))
|
||||
|
||||
if exists {
|
||||
require.ErrorIs(t, err, object.ErrAlreadyRemoved)
|
||||
} else {
|
||||
require.ErrorIs(t, err, object.ErrNotFound)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
checkAllObjs(true)
|
||||
checkObj(tombObj.Address(), tombObj)
|
||||
checkTombMembers(true)
|
||||
|
||||
err = sh.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
sh = New(
|
||||
WithBlobStorOptions(blobOpts...),
|
||||
WithMetaBaseOptions(
|
||||
meta.WithPath(path.Join(p, "meta_restored")),
|
||||
),
|
||||
)
|
||||
|
||||
// open Blobstor
|
||||
require.NoError(t, sh.Open())
|
||||
|
||||
// initialize Blobstor
|
||||
require.NoError(t, sh.Init())
|
||||
|
||||
defer sh.Close()
|
||||
|
||||
checkAllObjs(false)
|
||||
checkObj(tombObj.Address(), nil)
|
||||
checkTombMembers(false)
|
||||
|
||||
err = sh.refillMetabase()
|
||||
require.NoError(t, err)
|
||||
|
||||
checkAllObjs(true)
|
||||
checkObj(tombObj.Address(), tombObj)
|
||||
checkTombMembers(true)
|
||||
}
|
Loading…
Reference in a new issue