[#230] metabase: Index split info structures
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
eaae5a5dd7
commit
e2de95e3f6
3 changed files with 122 additions and 8 deletions
2
go.mod
2
go.mod
|
@ -17,7 +17,7 @@ require (
|
||||||
github.com/multiformats/go-multihash v0.0.13 // indirect
|
github.com/multiformats/go-multihash v0.0.13 // indirect
|
||||||
github.com/nspcc-dev/hrw v1.0.9
|
github.com/nspcc-dev/hrw v1.0.9
|
||||||
github.com/nspcc-dev/neo-go v0.91.1-pre.0.20201030072836-71216865717b
|
github.com/nspcc-dev/neo-go v0.91.1-pre.0.20201030072836-71216865717b
|
||||||
github.com/nspcc-dev/neofs-api-go v1.20.3-0.20201201103311-576841e0e091
|
github.com/nspcc-dev/neofs-api-go v1.20.3-0.20201203150742-6db6b569e098
|
||||||
github.com/nspcc-dev/neofs-crypto v0.3.0
|
github.com/nspcc-dev/neofs-crypto v0.3.0
|
||||||
github.com/nspcc-dev/tzhash v1.4.0
|
github.com/nspcc-dev/tzhash v1.4.0
|
||||||
github.com/panjf2000/ants/v2 v2.3.0
|
github.com/panjf2000/ants/v2 v2.3.0
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -21,17 +21,21 @@ type (
|
||||||
var (
|
var (
|
||||||
ErrUnknownObjectType = errors.New("unknown object type")
|
ErrUnknownObjectType = errors.New("unknown object type")
|
||||||
ErrIncorrectBlobovniczaUpdate = errors.New("updating blobovnicza id on object without it")
|
ErrIncorrectBlobovniczaUpdate = errors.New("updating blobovnicza id on object without it")
|
||||||
|
ErrIncorrectSplitInfoUpdate = errors.New("updating split info on object without it")
|
||||||
|
ErrIncorrectRootObject = errors.New("invalid root object")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Put saves object header in metabase. Object payload expected to be cut.
|
// Put saves object header in metabase. Object payload expected to be cut.
|
||||||
// Big objects have nil blobovniczaID.
|
// Big objects have nil blobovniczaID.
|
||||||
func (db *DB) Put(obj *object.Object, id *blobovnicza.ID) error {
|
func (db *DB) Put(obj *object.Object, id *blobovnicza.ID) error {
|
||||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
return db.put(tx, obj, id, false)
|
return db.put(tx, obj, id, nil)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, isParent bool) error {
|
func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, si *objectSDK.SplitInfo) error {
|
||||||
|
isParent := si != nil
|
||||||
|
|
||||||
exists, err := db.exists(tx, obj.Address())
|
exists, err := db.exists(tx, obj.Address())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -42,22 +46,35 @@ func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, isParent
|
||||||
if exists {
|
if exists {
|
||||||
// when storage engine moves small objects from one blobovniczaID
|
// when storage engine moves small objects from one blobovniczaID
|
||||||
// to another, then it calls metabase.Put method with new blobovniczaID
|
// to another, then it calls metabase.Put method with new blobovniczaID
|
||||||
// and this code should be triggered.
|
// and this code should be triggered
|
||||||
if !isParent && id != nil {
|
if !isParent && id != nil {
|
||||||
return updateBlobovniczaID(tx, obj.Address(), id)
|
return updateBlobovniczaID(tx, obj.Address(), id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// when storage already has last object in split hierarchy and there is
|
||||||
|
// a linking object to put (or vice versa), we should update split info
|
||||||
|
// with object ids of these objects
|
||||||
|
if isParent {
|
||||||
|
return updateSplitInfo(tx, obj.Address(), si)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if obj.GetParent() != nil && !isParent { // limit depth by two
|
if obj.GetParent() != nil && !isParent { // limit depth by two
|
||||||
err = db.put(tx, obj.GetParent(), id, true)
|
parentSI, err := splitInfoFromObject(obj)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.put(tx, obj.GetParent(), id, parentSI)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uniqueIndexes, err := uniqueIndexes(obj, isParent, id)
|
// build unique indexes
|
||||||
|
uniqueIndexes, err := uniqueIndexes(obj, si, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can' build unique indexes: %w", err)
|
return fmt.Errorf("can' build unique indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -102,7 +119,8 @@ func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, isParent
|
||||||
}
|
}
|
||||||
|
|
||||||
// builds list of <unique> indexes from the object.
|
// builds list of <unique> indexes from the object.
|
||||||
func uniqueIndexes(obj *object.Object, isParent bool, id *blobovnicza.ID) ([]namedBucketItem, error) {
|
func uniqueIndexes(obj *object.Object, si *objectSDK.SplitInfo, id *blobovnicza.ID) ([]namedBucketItem, error) {
|
||||||
|
isParent := si != nil
|
||||||
addr := obj.Address()
|
addr := obj.Address()
|
||||||
objKey := objectKey(addr.ObjectID())
|
objKey := objectKey(addr.ObjectID())
|
||||||
result := make([]namedBucketItem, 0, 3)
|
result := make([]namedBucketItem, 0, 3)
|
||||||
|
@ -145,10 +163,22 @@ func uniqueIndexes(obj *object.Object, isParent bool, id *blobovnicza.ID) ([]nam
|
||||||
|
|
||||||
// index root object
|
// index root object
|
||||||
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
|
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
splitInfo []byte
|
||||||
|
)
|
||||||
|
|
||||||
|
if isParent {
|
||||||
|
splitInfo, err = si.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("can't marshal split info: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
result = append(result, namedBucketItem{
|
result = append(result, namedBucketItem{
|
||||||
name: rootBucketName(addr.ContainerID()),
|
name: rootBucketName(addr.ContainerID()),
|
||||||
key: objKey,
|
key: objKey,
|
||||||
val: zeroValue, // todo: store split.Info when it will be ready
|
val: splitInfo,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -305,3 +335,87 @@ func updateBlobovniczaID(tx *bbolt.Tx, addr *objectSDK.Address, id *blobovnicza.
|
||||||
|
|
||||||
return bkt.Put(objectKey, *id)
|
return bkt.Put(objectKey, *id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// updateSpliInfo for existing objects if storage filled with extra information
|
||||||
|
// about last object in split hierarchy or linking object.
|
||||||
|
func updateSplitInfo(tx *bbolt.Tx, addr *objectSDK.Address, from *objectSDK.SplitInfo) error {
|
||||||
|
bkt := tx.Bucket(rootBucketName(addr.ContainerID()))
|
||||||
|
if bkt == nil {
|
||||||
|
// if object doesn't exists and we want to update split info on it
|
||||||
|
// then ignore, this should never happen
|
||||||
|
return ErrIncorrectSplitInfoUpdate
|
||||||
|
}
|
||||||
|
|
||||||
|
objectKey := objectKey(addr.ObjectID())
|
||||||
|
|
||||||
|
rawSplitInfo := bkt.Get(objectKey)
|
||||||
|
if len(rawSplitInfo) == 0 {
|
||||||
|
return ErrIncorrectSplitInfoUpdate
|
||||||
|
}
|
||||||
|
|
||||||
|
to := objectSDK.NewSplitInfo()
|
||||||
|
|
||||||
|
err := to.Unmarshal(rawSplitInfo)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't unmarshal split info from root index: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
result := mergeSplitInfo(from, to)
|
||||||
|
|
||||||
|
rawSplitInfo, err = result.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't marhsal merged split info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return bkt.Put(objectKey, rawSplitInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
// splitInfoFromObject returns split info based on last or linkin object.
|
||||||
|
// Otherwise returns nil, nil.
|
||||||
|
func splitInfoFromObject(obj *object.Object) (*objectSDK.SplitInfo, error) {
|
||||||
|
if obj.Parent() == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
info := objectSDK.NewSplitInfo()
|
||||||
|
info.SetSplitID(obj.SplitID())
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case isLinkObject(obj):
|
||||||
|
info.SetLink(obj.ID())
|
||||||
|
case isLastObject(obj):
|
||||||
|
info.SetLastPart(obj.ID())
|
||||||
|
default:
|
||||||
|
return nil, ErrIncorrectRootObject // should never happen
|
||||||
|
}
|
||||||
|
|
||||||
|
return info, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// mergeSplitInfo ignores conflicts and rewrites `to` with non empty values
|
||||||
|
// from `from`.
|
||||||
|
func mergeSplitInfo(from, to *objectSDK.SplitInfo) *objectSDK.SplitInfo {
|
||||||
|
to.SetSplitID(from.SplitID()) // overwrite SplitID and ignore conflicts
|
||||||
|
|
||||||
|
if lp := from.LastPart(); lp != nil {
|
||||||
|
to.SetLastPart(lp)
|
||||||
|
}
|
||||||
|
|
||||||
|
if link := from.Link(); link != nil {
|
||||||
|
to.SetLink(link)
|
||||||
|
}
|
||||||
|
|
||||||
|
return to
|
||||||
|
}
|
||||||
|
|
||||||
|
// isLinkObject returns true if object contains parent header and list
|
||||||
|
// of children.
|
||||||
|
func isLinkObject(obj *object.Object) bool {
|
||||||
|
return len(obj.Children()) > 0 && obj.Parent() != nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// isLastObject returns true if object contains only parent header without list
|
||||||
|
// of children.
|
||||||
|
func isLastObject(obj *object.Object) bool {
|
||||||
|
return len(obj.Children()) == 0 && obj.Parent() != nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue