[#238] metabase: Support raw flag in Get method

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2020-12-08 14:23:23 +03:00
parent c267a08886
commit bf7a390304
6 changed files with 86 additions and 30 deletions

View file

@ -2,7 +2,6 @@ package meta
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
@ -18,8 +17,6 @@ type DeletePrm struct {
// DeleteRes groups resulting values of Delete operation. // DeleteRes groups resulting values of Delete operation.
type DeleteRes struct{} type DeleteRes struct{}
var ErrVirtualObject = errors.New("do not remove virtual object directly")
// WithAddresses is a Delete option to set the addresses of the objects to delete. // WithAddresses is a Delete option to set the addresses of the objects to delete.
// //
// Option is required. // Option is required.
@ -37,7 +34,7 @@ func Delete(db *DB, addrs ...*objectSDK.Address) error {
return err return err
} }
// DeleteObjects marks list of objects as deleted. // Delete removed object records from metabase indexes.
func (db *DB) Delete(prm *DeletePrm) (*DeleteRes, error) { func (db *DB) Delete(prm *DeletePrm) (*DeleteRes, error) {
return new(DeleteRes), db.boltDB.Update(func(tx *bbolt.Tx) error { return new(DeleteRes), db.boltDB.Update(func(tx *bbolt.Tx) error {
for i := range prm.addrs { for i := range prm.addrs {
@ -52,15 +49,8 @@ func (db *DB) Delete(prm *DeletePrm) (*DeleteRes, error) {
} }
func (db *DB) delete(tx *bbolt.Tx, addr *objectSDK.Address, isParent bool) error { func (db *DB) delete(tx *bbolt.Tx, addr *objectSDK.Address, isParent bool) error {
pl := parentLength(tx, addr) // parentLength of address, for virtual objects it is > 0 // unmarshal object, work only with physically stored (raw == true) objects
obj, err := db.get(tx, addr, false, true)
// do not remove virtual objects directly
if !isParent && pl > 0 {
return ErrVirtualObject
}
// unmarshal object
obj, err := db.get(tx, addr, false)
if err != nil { if err != nil {
return err return err
} }

View file

@ -32,6 +32,10 @@ func TestDB_Delete(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Len(t, l, 1) require.Len(t, l, 1)
// try to remove parent unsuccessfully
err = meta.Delete(db, parent.Object().Address())
require.Error(t, err)
// inhume parent and child so they will be on graveyard // inhume parent and child so they will be on graveyard
ts := generateRawObjectWithCID(t, cid) ts := generateRawObjectWithCID(t, cid)

View file

@ -4,6 +4,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/core/object"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
@ -74,16 +75,9 @@ func (db *DB) exists(tx *bbolt.Tx, addr *objectSDK.Address) (exists bool, err er
// if primary bucket is empty, then check if object exists in parent bucket // if primary bucket is empty, then check if object exists in parent bucket
if inBucket(tx, parentBucketName(addr.ContainerID()), objKey) { if inBucket(tx, parentBucketName(addr.ContainerID()), objKey) {
rawSplitInfo := getFromBucket(tx, rootBucketName(addr.ContainerID()), objKey) splitInfo, err := getSplitInfo(tx, addr.ContainerID(), objKey)
if len(rawSplitInfo) == 0 {
return false, ErrLackSplitInfo
}
splitInfo := objectSDK.NewSplitInfo()
err := splitInfo.Unmarshal(rawSplitInfo)
if err != nil { if err != nil {
return false, fmt.Errorf("can't unmarshal split info from root index: %w", err) return false, err
} }
return false, objectSDK.NewSplitInfoError(splitInfo) return false, objectSDK.NewSplitInfoError(splitInfo)
@ -122,3 +116,21 @@ func inBucket(tx *bbolt.Tx, name, key []byte) bool {
return len(val) != 0 return len(val) != 0
} }
// getSplitInfo returns SplitInfo structure from root index. Returns error
// if there is no `key` record in root index.
func getSplitInfo(tx *bbolt.Tx, cid *container.ID, key []byte) (*objectSDK.SplitInfo, error) {
rawSplitInfo := getFromBucket(tx, rootBucketName(cid), key)
if len(rawSplitInfo) == 0 {
return nil, ErrLackSplitInfo
}
splitInfo := objectSDK.NewSplitInfo()
err := splitInfo.Unmarshal(rawSplitInfo)
if err != nil {
return nil, fmt.Errorf("can't unmarshal split info from root index: %w", err)
}
return splitInfo, nil
}

View file

@ -12,6 +12,7 @@ import (
// GetPrm groups the parameters of Get operation. // GetPrm groups the parameters of Get operation.
type GetPrm struct { type GetPrm struct {
addr *objectSDK.Address addr *objectSDK.Address
raw bool
} }
// GetRes groups resulting values of Get operation. // GetRes groups resulting values of Get operation.
@ -30,12 +31,23 @@ func (p *GetPrm) WithAddress(addr *objectSDK.Address) *GetPrm {
return p return p
} }
// WithRaw is a Get option to set raw flag value. If flag is unset, then Get
// returns header of virtual object, otherwise it returns SplitInfo of virtual
// object.
func (p *GetPrm) WithRaw(raw bool) *GetPrm {
if p != nil {
p.raw = raw
}
return p
}
// Header returns the requested object header. // Header returns the requested object header.
func (r *GetRes) Header() *object.Object { func (r *GetRes) Header() *object.Object {
return r.hdr return r.hdr
} }
// Get read the object from DB. // Get reads the object from DB.
func Get(db *DB, addr *objectSDK.Address) (*object.Object, error) { func Get(db *DB, addr *objectSDK.Address) (*object.Object, error) {
r, err := db.Get(new(GetPrm).WithAddress(addr)) r, err := db.Get(new(GetPrm).WithAddress(addr))
if err != nil { if err != nil {
@ -45,12 +57,22 @@ func Get(db *DB, addr *objectSDK.Address) (*object.Object, error) {
return r.Header(), nil return r.Header(), nil
} }
// GetRaw reads physically stored object from DB.
func GetRaw(db *DB, addr *objectSDK.Address, raw bool) (*object.Object, error) {
r, err := db.Get(new(GetPrm).WithAddress(addr).WithRaw(raw))
if err != nil {
return nil, err
}
return r.Header(), nil
}
// Get returns object header for specified address. // Get returns object header for specified address.
func (db *DB) Get(prm *GetPrm) (res *GetRes, err error) { func (db *DB) Get(prm *GetPrm) (res *GetRes, err error) {
res = new(GetRes) res = new(GetRes)
err = db.boltDB.View(func(tx *bbolt.Tx) error { err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.hdr, err = db.get(tx, prm.addr, true) res.hdr, err = db.get(tx, prm.addr, true, prm.raw)
return err return err
}) })
@ -58,7 +80,7 @@ func (db *DB) Get(prm *GetPrm) (res *GetRes, err error) {
return return
} }
func (db *DB) get(tx *bbolt.Tx, addr *objectSDK.Address, checkGraveyard bool) (*object.Object, error) { func (db *DB) get(tx *bbolt.Tx, addr *objectSDK.Address, checkGraveyard, raw bool) (*object.Object, error) {
obj := object.New() obj := object.New()
key := objectKey(addr.ObjectID()) key := objectKey(addr.ObjectID())
cid := addr.ContainerID() cid := addr.ContainerID()
@ -86,7 +108,7 @@ func (db *DB) get(tx *bbolt.Tx, addr *objectSDK.Address, checkGraveyard bool) (*
} }
// if not found then check if object is a virtual // if not found then check if object is a virtual
return getVirtualObject(tx, cid, key) return getVirtualObject(tx, cid, key, raw)
} }
func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte { func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
@ -98,7 +120,11 @@ func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
return bkt.Get(key) return bkt.Get(key)
} }
func getVirtualObject(tx *bbolt.Tx, cid *container.ID, key []byte) (*object.Object, error) { func getVirtualObject(tx *bbolt.Tx, cid *container.ID, key []byte, raw bool) (*object.Object, error) {
if raw {
return nil, getSplitInfoError(tx, cid, key)
}
parentBucket := tx.Bucket(parentBucketName(cid)) parentBucket := tx.Bucket(parentBucketName(cid))
if parentBucket == nil { if parentBucket == nil {
return nil, object.ErrNotFound return nil, object.ErrNotFound
@ -132,3 +158,12 @@ func getVirtualObject(tx *bbolt.Tx, cid *container.ID, key []byte) (*object.Obje
return child.GetParent(), nil return child.GetParent(), nil
} }
func getSplitInfoError(tx *bbolt.Tx, cid *container.ID, key []byte) error {
splitInfo, err := getSplitInfo(tx, cid, key)
if err == nil {
return objectSDK.NewSplitInfoError(splitInfo)
}
return object.ErrNotFound
}

View file

@ -60,21 +60,36 @@ func TestDB_Get(t *testing.T) {
t.Run("put virtual object", func(t *testing.T) { t.Run("put virtual object", func(t *testing.T) {
cid := testCID() cid := testCID()
splitID := objectSDK.NewSplitID()
parent := generateRawObjectWithCID(t, cid) parent := generateRawObjectWithCID(t, cid)
addAttribute(parent, "foo", "bar") addAttribute(parent, "foo", "bar")
child := generateRawObjectWithCID(t, cid) child := generateRawObjectWithCID(t, cid)
child.SetParent(parent.Object().SDK()) child.SetParent(parent.Object().SDK())
child.SetParentID(parent.ID()) child.SetParentID(parent.ID())
child.SetSplitID(splitID)
err := putBig(db, child.Object()) err := putBig(db, child.Object())
require.NoError(t, err) require.NoError(t, err)
newParent, err := meta.Get(db, parent.Object().Address()) t.Run("raw is true", func(t *testing.T) {
_, err = meta.GetRaw(db, parent.Object().Address(), true)
require.Error(t, err)
siErr, ok := err.(*objectSDK.SplitInfoError)
require.True(t, ok)
require.Equal(t, splitID, siErr.SplitInfo().SplitID())
require.Equal(t, child.ID(), siErr.SplitInfo().LastPart())
require.Nil(t, siErr.SplitInfo().Link())
})
newParent, err := meta.GetRaw(db, parent.Object().Address(), false)
require.NoError(t, err) require.NoError(t, err)
require.True(t, binaryEqual(parent.Object(), newParent)) require.True(t, binaryEqual(parent.Object(), newParent))
newChild, err := meta.Get(db, child.Object().Address()) newChild, err := meta.GetRaw(db, child.Object().Address(), true)
require.NoError(t, err) require.NoError(t, err)
require.True(t, binaryEqual(child.Object(), newChild)) require.True(t, binaryEqual(child.Object(), newChild))
}) })

View file

@ -336,7 +336,7 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr *object.Address, f object.Sear
return true return true
} }
obj, err := db.get(tx, addr, true) obj, err := db.get(tx, addr, true, false)
if err != nil { if err != nil {
return false return false
} }