diff --git a/go.mod b/go.mod index 66c88841ef..430d0aae69 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/multiformats/go-multihash v0.0.13 // indirect github.com/nspcc-dev/hrw v1.0.9 github.com/nspcc-dev/neo-go v0.91.1-pre.0.20201030072836-71216865717b - github.com/nspcc-dev/neofs-api-go v1.20.3-0.20201201103311-576841e0e091 + github.com/nspcc-dev/neofs-api-go v1.20.3-0.20201203150742-6db6b569e098 github.com/nspcc-dev/neofs-crypto v0.3.0 github.com/nspcc-dev/tzhash v1.4.0 github.com/panjf2000/ants/v2 v2.3.0 diff --git a/go.sum b/go.sum index 4ac85ba46f..3827d83123 100644 Binary files a/go.sum and b/go.sum differ diff --git a/pkg/local_object_storage/metabase/v2/put.go b/pkg/local_object_storage/metabase/v2/put.go index 79911b5684..d579e3f20d 100644 --- a/pkg/local_object_storage/metabase/v2/put.go +++ b/pkg/local_object_storage/metabase/v2/put.go @@ -21,17 +21,21 @@ type ( var ( ErrUnknownObjectType = errors.New("unknown object type") ErrIncorrectBlobovniczaUpdate = errors.New("updating blobovnicza id on object without it") + ErrIncorrectSplitInfoUpdate = errors.New("updating split info on object without it") + ErrIncorrectRootObject = errors.New("invalid root object") ) // Put saves object header in metabase. Object payload expected to be cut. // Big objects have nil blobovniczaID. func (db *DB) Put(obj *object.Object, id *blobovnicza.ID) error { return db.boltDB.Update(func(tx *bbolt.Tx) error { - return db.put(tx, obj, id, false) + return db.put(tx, obj, id, nil) }) } -func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, isParent bool) error { +func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, si *objectSDK.SplitInfo) error { + isParent := si != nil + exists, err := db.exists(tx, obj.Address()) if err != nil { return err @@ -42,22 +46,35 @@ func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, isParent if exists { // when storage engine moves small objects from one blobovniczaID // to another, then it calls metabase.Put method with new blobovniczaID - // and this code should be triggered. + // and this code should be triggered if !isParent && id != nil { return updateBlobovniczaID(tx, obj.Address(), id) } + // when storage already has last object in split hierarchy and there is + // a linking object to put (or vice versa), we should update split info + // with object ids of these objects + if isParent { + return updateSplitInfo(tx, obj.Address(), si) + } + return nil } if obj.GetParent() != nil && !isParent { // limit depth by two - err = db.put(tx, obj.GetParent(), id, true) + parentSI, err := splitInfoFromObject(obj) + if err != nil { + return err + } + + err = db.put(tx, obj.GetParent(), id, parentSI) if err != nil { return err } } - uniqueIndexes, err := uniqueIndexes(obj, isParent, id) + // build unique indexes + uniqueIndexes, err := uniqueIndexes(obj, si, id) if err != nil { return fmt.Errorf("can' build unique indexes: %w", err) } @@ -102,7 +119,8 @@ func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, isParent } // builds list of indexes from the object. -func uniqueIndexes(obj *object.Object, isParent bool, id *blobovnicza.ID) ([]namedBucketItem, error) { +func uniqueIndexes(obj *object.Object, si *objectSDK.SplitInfo, id *blobovnicza.ID) ([]namedBucketItem, error) { + isParent := si != nil addr := obj.Address() objKey := objectKey(addr.ObjectID()) result := make([]namedBucketItem, 0, 3) @@ -145,10 +163,22 @@ func uniqueIndexes(obj *object.Object, isParent bool, id *blobovnicza.ID) ([]nam // index root object if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() { + var ( + err error + splitInfo []byte + ) + + if isParent { + splitInfo, err = si.Marshal() + if err != nil { + return nil, fmt.Errorf("can't marshal split info: %w", err) + } + } + result = append(result, namedBucketItem{ name: rootBucketName(addr.ContainerID()), key: objKey, - val: zeroValue, // todo: store split.Info when it will be ready + val: splitInfo, }) } @@ -305,3 +335,87 @@ func updateBlobovniczaID(tx *bbolt.Tx, addr *objectSDK.Address, id *blobovnicza. return bkt.Put(objectKey, *id) } + +// updateSpliInfo for existing objects if storage filled with extra information +// about last object in split hierarchy or linking object. +func updateSplitInfo(tx *bbolt.Tx, addr *objectSDK.Address, from *objectSDK.SplitInfo) error { + bkt := tx.Bucket(rootBucketName(addr.ContainerID())) + if bkt == nil { + // if object doesn't exists and we want to update split info on it + // then ignore, this should never happen + return ErrIncorrectSplitInfoUpdate + } + + objectKey := objectKey(addr.ObjectID()) + + rawSplitInfo := bkt.Get(objectKey) + if len(rawSplitInfo) == 0 { + return ErrIncorrectSplitInfoUpdate + } + + to := objectSDK.NewSplitInfo() + + err := to.Unmarshal(rawSplitInfo) + if err != nil { + return fmt.Errorf("can't unmarshal split info from root index: %w", err) + } + + result := mergeSplitInfo(from, to) + + rawSplitInfo, err = result.Marshal() + if err != nil { + return fmt.Errorf("can't marhsal merged split info: %w", err) + } + + return bkt.Put(objectKey, rawSplitInfo) +} + +// splitInfoFromObject returns split info based on last or linkin object. +// Otherwise returns nil, nil. +func splitInfoFromObject(obj *object.Object) (*objectSDK.SplitInfo, error) { + if obj.Parent() == nil { + return nil, nil + } + + info := objectSDK.NewSplitInfo() + info.SetSplitID(obj.SplitID()) + + switch { + case isLinkObject(obj): + info.SetLink(obj.ID()) + case isLastObject(obj): + info.SetLastPart(obj.ID()) + default: + return nil, ErrIncorrectRootObject // should never happen + } + + return info, nil +} + +// mergeSplitInfo ignores conflicts and rewrites `to` with non empty values +// from `from`. +func mergeSplitInfo(from, to *objectSDK.SplitInfo) *objectSDK.SplitInfo { + to.SetSplitID(from.SplitID()) // overwrite SplitID and ignore conflicts + + if lp := from.LastPart(); lp != nil { + to.SetLastPart(lp) + } + + if link := from.Link(); link != nil { + to.SetLink(link) + } + + return to +} + +// isLinkObject returns true if object contains parent header and list +// of children. +func isLinkObject(obj *object.Object) bool { + return len(obj.Children()) > 0 && obj.Parent() != nil +} + +// isLastObject returns true if object contains only parent header without list +// of children. +func isLastObject(obj *object.Object) bool { + return len(obj.Children()) == 0 && obj.Parent() != nil +}