From b9c22e21b16f3f0a425d28be3fede7ef0d9d5720 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Mon, 13 Sep 2021 16:38:58 +0300 Subject: [PATCH] [#789] shard: Add method to refill the metabase There is a need to refill Metabase data with the objects from BlobStor. Implement `refillMetabase` method which iterates over all objects from BlobStor and saves them in Metabase. Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/shard/control.go | 53 ++++++ .../shard/control_test.go | 166 ++++++++++++++++++ 2 files changed, 219 insertions(+) create mode 100644 pkg/local_object_storage/shard/control_test.go diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 120374440..f5326a153 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -1,7 +1,14 @@ package shard import ( + "errors" "fmt" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" ) // Open opens all Shard's components. @@ -59,6 +66,52 @@ func (s *Shard) Init() error { return nil } +func (s *Shard) refillMetabase() error { + err := s.metaBase.Reset() + if err != nil { + return fmt.Errorf("could not reset metabase: %w", err) + } + + return blobstor.IterateObjects(s.blobStor, func(obj *object.Object, blzID *blobovnicza.ID) error { + if obj.Type() == objectSDK.TypeTombstone { + tombstone := objectSDK.NewTombstone() + + if err := tombstone.Unmarshal(obj.Payload()); err != nil { + return fmt.Errorf("could not unmarshal tombstone content: %w", err) + } + + tombAddr := obj.Address() + cid := tombAddr.ContainerID() + memberIDs := tombstone.Members() + tombMembers := make([]*objectSDK.Address, 0, len(memberIDs)) + + for _, id := range memberIDs { + if id == nil { + return errors.New("empty member in tombstone") + } + + a := objectSDK.NewAddress() + a.SetContainerID(cid) + a.SetObjectID(id) + + tombMembers = append(tombMembers, a) + } + + var inhumePrm meta.InhumePrm + + inhumePrm.WithAddresses(tombAddr) + inhumePrm.WithAddresses(tombMembers...) + + _, err = s.metaBase.Inhume(&inhumePrm) + if err != nil { + return fmt.Errorf("could not inhume objects: %w", err) + } + } + + return meta.Put(s.metaBase, obj, blzID) + }) +} + // Close releases all Shard's components. func (s *Shard) Close() error { components := []interface{ Close() error }{ diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go new file mode 100644 index 000000000..5288caff1 --- /dev/null +++ b/pkg/local_object_storage/shard/control_test.go @@ -0,0 +1,166 @@ +package shard + +import ( + "os" + "path" + "testing" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + objecttest "github.com/nspcc-dev/neofs-api-go/pkg/object/test" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/stretchr/testify/require" +) + +func TestRefillMetabase(t *testing.T) { + p := t.Name() + + defer os.RemoveAll(p) + + blobOpts := []blobstor.Option{ + blobstor.WithRootPath(path.Join(p, "blob")), + blobstor.WithBlobovniczaShallowWidth(1), + blobstor.WithBlobovniczaShallowDepth(1), + } + + sh := New( + WithBlobStorOptions(blobOpts...), + WithMetaBaseOptions( + meta.WithPath(path.Join(p, "meta")), + ), + ) + + // open Blobstor + require.NoError(t, sh.Open()) + + // initialize Blobstor + require.NoError(t, sh.Init()) + + const objNum = 5 + + type objAddr struct { + obj *object.Object + addr *objectSDK.Address + } + + mObjs := make(map[string]objAddr) + + for i := uint64(0); i < objNum; i++ { + rawObj := objecttest.Raw() + rawObj.SetType(objectSDK.TypeRegular) + + obj := object.NewFromSDK(rawObj.Object()) + + addr := obj.Address() + + mObjs[addr.String()] = objAddr{ + obj: obj, + addr: addr, + } + } + + tombObjRaw := object.NewRawFrom(objecttest.Raw()) + tombObjRaw.SetType(objectSDK.TypeTombstone) + + tombstone := objecttest.Tombstone() + + tombData, err := tombstone.Marshal() + require.NoError(t, err) + + tombObjRaw.SetPayload(tombData) + + tombObj := tombObjRaw.Object() + + tombMembers := make([]*objectSDK.Address, 0, len(tombstone.Members())) + + for _, member := range tombstone.Members() { + a := objectSDK.NewAddress() + a.SetObjectID(member) + a.SetContainerID(tombObj.ContainerID()) + + tombMembers = append(tombMembers, a) + } + + var putPrm PutPrm + + for _, v := range mObjs { + _, err := sh.Put(putPrm.WithObject(v.obj)) + require.NoError(t, err) + } + + _, err = sh.Put(putPrm.WithObject(tombObj)) + require.NoError(t, err) + + _, err = sh.Inhume(new(InhumePrm).WithTarget(tombObj.Address(), tombMembers...)) + require.NoError(t, err) + + var headPrm HeadPrm + + checkObj := func(addr *objectSDK.Address, expObj *object.Object) { + res, err := sh.Head(headPrm.WithAddress(addr)) + + if expObj == nil { + require.ErrorIs(t, err, object.ErrNotFound) + return + } + + require.NoError(t, err) + require.Equal(t, object.NewRawFromObject(expObj).CutPayload().Object(), res.Object()) + } + + checkAllObjs := func(exists bool) { + for _, v := range mObjs { + if exists { + checkObj(v.addr, v.obj) + } else { + checkObj(v.addr, nil) + } + } + } + + checkTombMembers := func(exists bool) { + for _, member := range tombMembers { + _, err := sh.Head(headPrm.WithAddress(member)) + + if exists { + require.ErrorIs(t, err, object.ErrAlreadyRemoved) + } else { + require.ErrorIs(t, err, object.ErrNotFound) + } + } + } + + checkAllObjs(true) + checkObj(tombObj.Address(), tombObj) + checkTombMembers(true) + + err = sh.Close() + require.NoError(t, err) + + sh = New( + WithBlobStorOptions(blobOpts...), + WithMetaBaseOptions( + meta.WithPath(path.Join(p, "meta_restored")), + ), + ) + + // open Blobstor + require.NoError(t, sh.Open()) + + // initialize Blobstor + require.NoError(t, sh.Init()) + + defer sh.Close() + + checkAllObjs(false) + checkObj(tombObj.Address(), nil) + checkTombMembers(false) + + err = sh.refillMetabase() + require.NoError(t, err) + + checkAllObjs(true) + checkObj(tombObj.Address(), tombObj) + checkTombMembers(true) +}