diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index 237a99d81..1b93d0ddc 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -2,7 +2,6 @@ package meta import ( "bytes" - "errors" "fmt" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" @@ -18,8 +17,6 @@ type DeletePrm struct { // DeleteRes groups resulting values of Delete operation. type DeleteRes struct{} -var ErrVirtualObject = errors.New("do not remove virtual object directly") - // WithAddresses is a Delete option to set the addresses of the objects to delete. // // Option is required. @@ -37,7 +34,7 @@ func Delete(db *DB, addrs ...*objectSDK.Address) error { return err } -// DeleteObjects marks list of objects as deleted. +// Delete removed object records from metabase indexes. func (db *DB) Delete(prm *DeletePrm) (*DeleteRes, error) { return new(DeleteRes), db.boltDB.Update(func(tx *bbolt.Tx) error { for i := range prm.addrs { @@ -52,15 +49,8 @@ func (db *DB) Delete(prm *DeletePrm) (*DeleteRes, error) { } func (db *DB) delete(tx *bbolt.Tx, addr *objectSDK.Address, isParent bool) error { - pl := parentLength(tx, addr) // parentLength of address, for virtual objects it is > 0 - - // do not remove virtual objects directly - if !isParent && pl > 0 { - return ErrVirtualObject - } - - // unmarshal object - obj, err := db.get(tx, addr, false) + // unmarshal object, work only with physically stored (raw == true) objects + obj, err := db.get(tx, addr, false, true) if err != nil { return err } diff --git a/pkg/local_object_storage/metabase/delete_test.go b/pkg/local_object_storage/metabase/delete_test.go index fe83050c6..c9c3e1124 100644 --- a/pkg/local_object_storage/metabase/delete_test.go +++ b/pkg/local_object_storage/metabase/delete_test.go @@ -32,6 +32,10 @@ func TestDB_Delete(t *testing.T) { require.NoError(t, err) require.Len(t, l, 1) + // try to remove parent unsuccessfully + err = meta.Delete(db, parent.Object().Address()) + require.Error(t, err) + // inhume parent and child so they will be on graveyard ts := generateRawObjectWithCID(t, cid) diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go index 56259a954..d43545cd6 100644 --- a/pkg/local_object_storage/metabase/exists.go +++ b/pkg/local_object_storage/metabase/exists.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + "github.com/nspcc-dev/neofs-api-go/pkg/container" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/object" "go.etcd.io/bbolt" @@ -74,16 +75,9 @@ func (db *DB) exists(tx *bbolt.Tx, addr *objectSDK.Address) (exists bool, err er // if primary bucket is empty, then check if object exists in parent bucket if inBucket(tx, parentBucketName(addr.ContainerID()), objKey) { - rawSplitInfo := getFromBucket(tx, rootBucketName(addr.ContainerID()), objKey) - if len(rawSplitInfo) == 0 { - return false, ErrLackSplitInfo - } - - splitInfo := objectSDK.NewSplitInfo() - - err := splitInfo.Unmarshal(rawSplitInfo) + splitInfo, err := getSplitInfo(tx, addr.ContainerID(), objKey) if err != nil { - return false, fmt.Errorf("can't unmarshal split info from root index: %w", err) + return false, err } return false, objectSDK.NewSplitInfoError(splitInfo) @@ -122,3 +116,21 @@ func inBucket(tx *bbolt.Tx, name, key []byte) bool { return len(val) != 0 } + +// getSplitInfo returns SplitInfo structure from root index. Returns error +// if there is no `key` record in root index. +func getSplitInfo(tx *bbolt.Tx, cid *container.ID, key []byte) (*objectSDK.SplitInfo, error) { + rawSplitInfo := getFromBucket(tx, rootBucketName(cid), key) + if len(rawSplitInfo) == 0 { + return nil, ErrLackSplitInfo + } + + splitInfo := objectSDK.NewSplitInfo() + + err := splitInfo.Unmarshal(rawSplitInfo) + if err != nil { + return nil, fmt.Errorf("can't unmarshal split info from root index: %w", err) + } + + return splitInfo, nil +} diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index 1a6cd09be..10d5a8942 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -12,6 +12,7 @@ import ( // GetPrm groups the parameters of Get operation. type GetPrm struct { addr *objectSDK.Address + raw bool } // GetRes groups resulting values of Get operation. @@ -30,12 +31,23 @@ func (p *GetPrm) WithAddress(addr *objectSDK.Address) *GetPrm { return p } +// WithRaw is a Get option to set raw flag value. If flag is unset, then Get +// returns header of virtual object, otherwise it returns SplitInfo of virtual +// object. +func (p *GetPrm) WithRaw(raw bool) *GetPrm { + if p != nil { + p.raw = raw + } + + return p +} + // Header returns the requested object header. func (r *GetRes) Header() *object.Object { return r.hdr } -// Get read the object from DB. +// Get reads the object from DB. func Get(db *DB, addr *objectSDK.Address) (*object.Object, error) { r, err := db.Get(new(GetPrm).WithAddress(addr)) if err != nil { @@ -45,12 +57,22 @@ func Get(db *DB, addr *objectSDK.Address) (*object.Object, error) { return r.Header(), nil } +// GetRaw reads physically stored object from DB. +func GetRaw(db *DB, addr *objectSDK.Address, raw bool) (*object.Object, error) { + r, err := db.Get(new(GetPrm).WithAddress(addr).WithRaw(raw)) + if err != nil { + return nil, err + } + + return r.Header(), nil +} + // Get returns object header for specified address. func (db *DB) Get(prm *GetPrm) (res *GetRes, err error) { res = new(GetRes) err = db.boltDB.View(func(tx *bbolt.Tx) error { - res.hdr, err = db.get(tx, prm.addr, true) + res.hdr, err = db.get(tx, prm.addr, true, prm.raw) return err }) @@ -58,7 +80,7 @@ func (db *DB) Get(prm *GetPrm) (res *GetRes, err error) { return } -func (db *DB) get(tx *bbolt.Tx, addr *objectSDK.Address, checkGraveyard bool) (*object.Object, error) { +func (db *DB) get(tx *bbolt.Tx, addr *objectSDK.Address, checkGraveyard, raw bool) (*object.Object, error) { obj := object.New() key := objectKey(addr.ObjectID()) cid := addr.ContainerID() @@ -86,7 +108,7 @@ func (db *DB) get(tx *bbolt.Tx, addr *objectSDK.Address, checkGraveyard bool) (* } // if not found then check if object is a virtual - return getVirtualObject(tx, cid, key) + return getVirtualObject(tx, cid, key, raw) } func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte { @@ -98,7 +120,11 @@ func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte { return bkt.Get(key) } -func getVirtualObject(tx *bbolt.Tx, cid *container.ID, key []byte) (*object.Object, error) { +func getVirtualObject(tx *bbolt.Tx, cid *container.ID, key []byte, raw bool) (*object.Object, error) { + if raw { + return nil, getSplitInfoError(tx, cid, key) + } + parentBucket := tx.Bucket(parentBucketName(cid)) if parentBucket == nil { return nil, object.ErrNotFound @@ -132,3 +158,12 @@ func getVirtualObject(tx *bbolt.Tx, cid *container.ID, key []byte) (*object.Obje return child.GetParent(), nil } + +func getSplitInfoError(tx *bbolt.Tx, cid *container.ID, key []byte) error { + splitInfo, err := getSplitInfo(tx, cid, key) + if err == nil { + return objectSDK.NewSplitInfoError(splitInfo) + } + + return object.ErrNotFound +} diff --git a/pkg/local_object_storage/metabase/get_test.go b/pkg/local_object_storage/metabase/get_test.go index d380d0a85..e45e06f6b 100644 --- a/pkg/local_object_storage/metabase/get_test.go +++ b/pkg/local_object_storage/metabase/get_test.go @@ -60,21 +60,36 @@ func TestDB_Get(t *testing.T) { t.Run("put virtual object", func(t *testing.T) { cid := testCID() + splitID := objectSDK.NewSplitID() + parent := generateRawObjectWithCID(t, cid) addAttribute(parent, "foo", "bar") child := generateRawObjectWithCID(t, cid) child.SetParent(parent.Object().SDK()) child.SetParentID(parent.ID()) + child.SetSplitID(splitID) err := putBig(db, child.Object()) require.NoError(t, err) - newParent, err := meta.Get(db, parent.Object().Address()) + t.Run("raw is true", func(t *testing.T) { + _, err = meta.GetRaw(db, parent.Object().Address(), true) + require.Error(t, err) + + siErr, ok := err.(*objectSDK.SplitInfoError) + require.True(t, ok) + + require.Equal(t, splitID, siErr.SplitInfo().SplitID()) + require.Equal(t, child.ID(), siErr.SplitInfo().LastPart()) + require.Nil(t, siErr.SplitInfo().Link()) + }) + + newParent, err := meta.GetRaw(db, parent.Object().Address(), false) require.NoError(t, err) require.True(t, binaryEqual(parent.Object(), newParent)) - newChild, err := meta.Get(db, child.Object().Address()) + newChild, err := meta.GetRaw(db, child.Object().Address(), true) require.NoError(t, err) require.True(t, binaryEqual(child.Object(), newChild)) }) diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index e5ed052bb..3f2afedce 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -336,7 +336,7 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr *object.Address, f object.Sear return true } - obj, err := db.get(tx, addr, true) + obj, err := db.get(tx, addr, true, false) if err != nil { return false }