diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go
index bab5452dd..967910f3c 100644
--- a/pkg/local_object_storage/metabase/put.go
+++ b/pkg/local_object_storage/metabase/put.go
@@ -1,11 +1,10 @@
 package meta
 
 import (
+	"bytes"
 	"context"
-	"encoding/binary"
 	"errors"
 	"fmt"
-	gio "io"
 	"strconv"
 	"time"
 
@@ -19,18 +18,10 @@ import (
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"github.com/cockroachdb/pebble"
-	"github.com/nspcc-dev/neo-go/pkg/io"
-	"go.etcd.io/bbolt"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
 )
 
-type (
-	namedBucketItem struct {
-		name, key, val []byte
-	}
-)
-
 // PutPrm groups the parameters of Put operation.
 type PutPrm struct {
 	obj *objectSDK.Object
@@ -90,10 +81,16 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
 	}
 
 	currEpoch := db.epochState.CurrentEpoch()
+	cnr, ok := prm.obj.ContainerID()
+	if !ok {
+		return PutRes{}, errors.New("missing container in object")
+	}
+
+	defer db.guard.LockContainerID(cnr)()
 
 	err = db.batch(func(b *pebble.Batch) error {
 		var e error
-		res, e = db.put(b, prm.obj, prm.id, nil, currEpoch)
+		res, e = db.put(ctx, b, prm.obj, prm.id, nil, currEpoch)
 		return e
 	})
 	if err == nil {
@@ -106,7 +103,9 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
 	return res, metaerr.Wrap(err)
 }
 
-func (db *DB) put(batch *pebble.Batch,
+func (db *DB) put(
+	ctx context.Context,
+	b *pebble.Batch,
 	obj *objectSDK.Object,
 	id []byte,
 	si *objectSDK.SplitInfo,
@@ -119,7 +118,7 @@ func (db *DB) put(batch *pebble.Batch,
 
 	isParent := si != nil
 
-	exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
+	exists, _, err := db.exists(ctx, b, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
 
 	var splitInfoError *objectSDK.SplitInfoError
 	if errors.As(err, &splitInfoError) {
@@ -129,70 +128,71 @@ func (db *DB) put(batch *pebble.Batch,
 	}
 
 	if exists {
-		return PutRes{}, db.updateObj(tx, obj, id, si, isParent)
+		return PutRes{}, db.updateObj(b, obj, id, si, isParent)
 	}
 
-	return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch)
+	return PutRes{Inserted: true}, db.insertObject(ctx, b, obj, id, si, isParent, cnr, currEpoch)
 }
 
-func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
+func (db *DB) updateObj(b *pebble.Batch, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
+	addr := objectCore.AddressOf(obj)
 	// most right child and split header overlap parent so we have to
 	// check if object exists to not overwrite it twice
 	// When storage engine moves objects between different sub-storages,
 	// it calls metabase.Put method with new storage ID, thus triggering this code.
 	if !isParent && id != nil {
-		return setStorageID(tx, objectCore.AddressOf(obj), id, true)
+		return setStorageID(b, addr, id, true)
 	}
 
 	// when storage already has last object in split hierarchy and there is
 	// a linking object to put (or vice versa), we should update split info
 	// with object ids of these objects
 	if isParent {
-		return updateSplitInfo(tx, objectCore.AddressOf(obj), si)
+		return updateSplitInfo(b, addr.Container(), addr.Object(), si)
 	}
 
 	return nil
 }
 
-func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64) error {
+func (db *DB) insertObject(ctx context.Context, b *pebble.Batch, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64) error {
 	if par := obj.Parent(); par != nil && !isParent { // limit depth by two
 		parentSI, err := splitInfoFromObject(obj)
 		if err != nil {
 			return err
 		}
 
-		_, err = db.put(tx, par, id, parentSI, currEpoch)
+		_, err = db.put(ctx, b, par, id, parentSI, currEpoch)
 		if err != nil {
 			return err
 		}
 	}
 
-	err := putUniqueIndexes(tx, obj, si, id)
+	err := putUniqueIndexes(b, obj, si, id)
 	if err != nil {
 		return fmt.Errorf("can't put unique indexes: %w", err)
 	}
 
-	err = updateListIndexes(tx, obj, putListIndexItem)
+	err = updateListIndexes(b, obj, putListIndexItem)
 	if err != nil {
 		return fmt.Errorf("can't put list indexes: %w", err)
 	}
 
-	err = updateFKBTIndexes(tx, obj, putFKBTIndexItem)
+	err = updateFKBTIndexes(b, obj, putListIndexItem)
 	if err != nil {
 		return fmt.Errorf("can't put fake bucket tree indexes: %w", err)
 	}
 
 	// update container volume size estimation
 	if obj.Type() == objectSDK.TypeRegular && !isParent {
-		err = changeContainerSize(tx, cnr, obj.PayloadSize(), true)
+		err = changeContainerSize(b, cnr, int64(obj.PayloadSize()))
 		if err != nil {
 			return err
 		}
 	}
 
 	if !isParent {
-		if err = db.incCounters(tx, cnr, IsUserObject(obj)); err != nil {
+		if err = incCounters(b, cnr, IsUserObject(obj)); err != nil {
 			return err
 		}
 	}
@@ -201,26 +201,24 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64) error {
 }
 
 func putUniqueIndexes(
-	tx *bbolt.Tx,
+	b *pebble.Batch,
 	obj *objectSDK.Object,
 	si *objectSDK.SplitInfo,
 	id []byte,
 ) error {
 	isParent := si != nil
 	addr := objectCore.AddressOf(obj)
-	cnr := addr.Container()
-	objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
-	bucketName := make([]byte, bucketKeySize)
 
 	// add value to primary unique bucket
 	if !isParent {
+		var key []byte
 		switch obj.Type() {
 		case objectSDK.TypeRegular:
-			bucketName = primaryBucketName(cnr, bucketName)
+			key = primaryKey(addr.Container(), addr.Object())
 		case objectSDK.TypeTombstone:
-			bucketName = tombstoneBucketName(cnr, bucketName)
+			key = tombstoneKey(addr.Container(), addr.Object())
 		case objectSDK.TypeLock:
-			bucketName = bucketNameLockers(cnr, bucketName)
+			key = lockersKey(addr.Container(), addr.Object())
 		default:
 			return ErrUnknownObjectType
 		}
@@ -230,18 +228,14 @@ func putUniqueIndexes(
 			return fmt.Errorf("can't marshal object header: %w", err)
 		}
 
-		err = putUniqueIndexItem(tx, namedBucketItem{
-			name: bucketName,
-			key:  objKey,
-			val:  rawObject,
-		})
+		err = b.Set(key, rawObject, pebble.Sync)
 		if err != nil {
 			return err
 		}
 
 		// index storageID if it is present
 		if id != nil {
-			if err = setStorageID(tx, objectCore.AddressOf(obj), id, false); err != nil {
+			if err = setStorageID(b, objectCore.AddressOf(obj), id, false); err != nil {
 				return err
 			}
 		}
@@ -249,6 +243,7 @@ func putUniqueIndexes(
 
 	// index root object
 	if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
+		objID := addr.Object()
 		if ecHead := obj.ECHeader(); ecHead != nil {
 			parentID := ecHead.Parent()
 			if ecHead.ParentSplitID() != nil {
@@ -263,37 +258,14 @@ func putUniqueIndexes(
 				parentID = *parentSplitParentID
 			}
-			objKey = objectKey(parentID, objKey)
+			objID = parentID
 		}
-		return updateSplitInfoIndex(tx, objKey, cnr, bucketName, si)
+		return updateSplitInfo(b, addr.Container(), objID, si)
 	}
 
 	return nil
 }
 
-func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName []byte, si *objectSDK.SplitInfo) error {
-	return updateUniqueIndexItem(tx, namedBucketItem{
-		name: rootBucketName(cnr, bucketName),
-		key:  objKey,
-	}, func(old, _ []byte) ([]byte, error) {
-		switch {
-		case si == nil && old == nil:
-			return []byte{}, nil
-		case si == nil:
-			return old, nil
-		case old == nil:
-			return si.Marshal()
-		default:
-			oldSI := objectSDK.NewSplitInfo()
-			if err := oldSI.Unmarshal(old); err != nil {
-				return nil, err
-			}
-			si = util.MergeSplitInfo(si, oldSI)
-			return si.Marshal()
-		}
-	})
-}
-
 type updateIndexItemFunc = func(b *pebble.Batch, key []byte) error
 
 func updateListIndexes(b *pebble.Batch, obj *objectSDK.Object, f updateIndexItemFunc) error {
@@ -354,12 +326,7 @@ func updateListIndexes(b *pebble.Batch, obj *objectSDK.Object, f updateIndexItemFunc) error {
 	}
 
 	if parentSplitParentID := ech.ParentSplitParentID(); parentSplitParentID != nil {
-		objKey := objectKey(ech.Parent(), make([]byte, objectKeySize))
-		err := f(tx, namedBucketItem{
-			name: parentBucketName(cnr, bucketName),
-			key:  objectKey(*parentSplitParentID, make([]byte, objectKeySize)),
-			val:  objKey,
-		})
+		err := f(b, parentKey(cnr, *parentSplitParentID, ech.Parent()))
 		if err != nil {
 			return err
 		}
@@ -396,161 +363,42 @@ func updateFKBTIndexes(b *pebble.Batch, obj *objectSDK.Object, f updateIndexItemFunc) error {
 	}
 
 	return nil
 }
 
-type bucketContainer interface {
-	Bucket([]byte) *bbolt.Bucket
-	CreateBucket([]byte) (*bbolt.Bucket, error)
-	CreateBucketIfNotExists([]byte) (*bbolt.Bucket, error)
-}
-
-func createBucketLikelyExists[T bucketContainer](tx T, name []byte) (*bbolt.Bucket, error) {
-	if bkt := tx.Bucket(name); bkt != nil {
-		return bkt, nil
-	}
-	return tx.CreateBucket(name)
-}
-
-func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldData, newData []byte) ([]byte, error)) error {
-	bkt, err := createBucketLikelyExists(tx, item.name)
-	if err != nil {
-		return fmt.Errorf("can't create index %v: %w", item.name, err)
-	}
-
-	data, err := update(bkt.Get(item.key), item.val)
-	if err != nil {
-		return err
-	}
-	return bkt.Put(item.key, data)
-}
-
-func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
-	return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil })
-}
-
-func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
-	bkt, err := createBucketLikelyExists(tx, item.name)
-	if err != nil {
-		return fmt.Errorf("can't create index %v: %w", item.name, err)
-	}
-
-	fkbtRoot, err := createBucketLikelyExists(bkt, item.key)
-	if err != nil {
-		return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err)
-	}
-
-	return fkbtRoot.Put(item.val, zeroValue)
-}
-
-func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
-	bkt, err := createBucketLikelyExists(tx, item.name)
-	if err != nil {
-		return fmt.Errorf("can't create index %v: %w", item.name, err)
-	}
-
-	lst, err := decodeList(bkt.Get(item.key))
-	if err != nil {
-		return fmt.Errorf("can't decode leaf list %v: %w", item.key, err)
-	}
-
-	lst = append(lst, item.val)
-
-	encodedLst, err := encodeList(lst)
-	if err != nil {
-		return fmt.Errorf("can't encode leaf list %v: %w", item.key, err)
-	}
-
-	return bkt.Put(item.key, encodedLst)
-}
-
-// encodeList decodes list of bytes into a single blog for list bucket indexes.
-func encodeList(lst [][]byte) ([]byte, error) {
-	w := io.NewBufBinWriter()
-	w.WriteVarUint(uint64(len(lst)))
-	for i := range lst {
-		w.WriteVarBytes(lst[i])
-	}
-	if w.Err != nil {
-		return nil, w.Err
-	}
-	return w.Bytes(), nil
-}
-
-// decodeList decodes blob into the list of bytes from list bucket index.
-func decodeList(data []byte) (lst [][]byte, err error) {
-	if len(data) == 0 {
-		return nil, nil
-	}
-
-	var offset uint64
-	size, n, err := getVarUint(data)
-	if err != nil {
-		return nil, err
-	}
-
-	offset += uint64(n)
-	lst = make([][]byte, size, size+1)
-	for i := range lst {
-		sz, n, err := getVarUint(data[offset:])
-		if err != nil {
-			return nil, err
-		}
-		offset += uint64(n)
-
-		next := offset + sz
-		if uint64(len(data)) < next {
-			return nil, gio.ErrUnexpectedEOF
-		}
-		lst[i] = data[offset:next]
-		offset = next
-	}
-	return lst, nil
-}
-
-func getVarUint(data []byte) (uint64, int, error) {
-	if len(data) == 0 {
-		return 0, 0, gio.ErrUnexpectedEOF
-	}
-
-	switch b := data[0]; b {
-	case 0xfd:
-		if len(data) < 3 {
-			return 0, 1, gio.ErrUnexpectedEOF
-		}
-		return uint64(binary.LittleEndian.Uint16(data[1:])), 3, nil
-	case 0xfe:
-		if len(data) < 5 {
-			return 0, 1, gio.ErrUnexpectedEOF
-		}
-		return uint64(binary.LittleEndian.Uint32(data[1:])), 5, nil
-	case 0xff:
-		if len(data) < 9 {
-			return 0, 1, gio.ErrUnexpectedEOF
-		}
-		return binary.LittleEndian.Uint64(data[1:]), 9, nil
-	default:
-		return uint64(b), 1, nil
-	}
-}
-
-// setStorageID for existing objects if they were moved from one
-// storage location to another.
-func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) error {
-	key := make([]byte, bucketKeySize)
-	bkt, err := createBucketLikelyExists(tx, smallBucketName(addr.Container(), key))
-	if err != nil {
-		return err
-	}
-	key = objectKey(addr.Object(), key)
-	if override || bkt.Get(key) == nil {
-		return bkt.Put(key, id)
-	}
-	return nil
-}
+func putListIndexItem(b *pebble.Batch, key []byte) error {
+	return b.Set(key, zeroValue, pebble.Sync)
+}
 
-// updateSpliInfo for existing objects if storage filled with extra information
-// about last object in split hierarchy or linking object.
-func updateSplitInfo(tx *bbolt.Tx, addr oid.Address, from *objectSDK.SplitInfo) error {
-	objKey := objectKey(addr.Object(), make([]byte, bucketKeySize))
-	return updateSplitInfoIndex(tx, objKey, addr.Container(), make([]byte, bucketKeySize), from)
+// updateSplitInfo updates stored split info for an existing object when the
+// metabase learns about the last object in the split hierarchy or the linking object.
+func updateSplitInfo(b *pebble.Batch, cnr cid.ID, obj oid.ID, si *objectSDK.SplitInfo) error {
+	key := rootKey(cnr, obj)
+	existed, err := valueSafe(b, key)
+	if err != nil {
+		return err
+	}
+
+	switch {
+	case si == nil && existed == nil:
+		// Mark the root object as existing without split info yet.
+		return b.Set(key, zeroValue, pebble.Sync)
+	case si == nil:
+		// Nothing new to record, keep the stored split info.
+		return nil
+	case existed == nil || bytes.Equal(existed, zeroValue):
+		// No split info stored yet, write the new one as is.
+		siBytes, err := si.Marshal()
+		if err != nil {
+			return err
+		}
+		return b.Set(key, siBytes, pebble.Sync)
+	default:
+		// Merge the new split info with the stored one.
+		existedSI := objectSDK.NewSplitInfo()
+		if err := existedSI.Unmarshal(existed); err != nil {
+			return err
+		}
+		si = util.MergeSplitInfo(si, existedSI)
+		siBytes, err := si.Marshal()
+		if err != nil {
+			return err
+		}
+		return b.Set(key, siBytes, pebble.Sync)
+	}
 }
 
-// splitInfoFromObject returns split info based on last or linkin object.
+// splitInfoFromObject returns split info based on the last or linking object.
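Note: this patch calls flat-key helpers (primaryKey, tombstoneKey, lockersKey, rootKey, parentKey) and a batch-aware getter valueSafe that are defined elsewhere in the change set. As a rough sketch of the intent, here is one plausible shape for them; the prefix values, key layout, and valueSafe semantics below are assumptions for illustration, not the actual definitions from the patch, and the read path assumes an indexed batch (pebble.DB.NewIndexedBatch).

package meta

import (
	"errors"

	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"github.com/cockroachdb/pebble"
)

// Hypothetical index prefixes; the real values live in the rest of the migration.
const (
	prefixPrimary   byte = 1
	prefixTombstone byte = 2
	prefixLockers   byte = 3
	prefixRoot      byte = 4
)

// addressKey builds prefix || container ID || object ID (1 + 32 + 32 bytes),
// so all entries of one index for one container are contiguous in key order.
func addressKey(prefix byte, cnr cid.ID, obj oid.ID) []byte {
	key := make([]byte, 65)
	key[0] = prefix
	cnr.Encode(key[1:33])
	obj.Encode(key[33:])
	return key
}

func primaryKey(cnr cid.ID, obj oid.ID) []byte   { return addressKey(prefixPrimary, cnr, obj) }
func tombstoneKey(cnr cid.ID, obj oid.ID) []byte { return addressKey(prefixTombstone, cnr, obj) }
func lockersKey(cnr cid.ID, obj oid.ID) []byte   { return addressKey(prefixLockers, cnr, obj) }
func rootKey(cnr cid.ID, obj oid.ID) []byte      { return addressKey(prefixRoot, cnr, obj) }

// valueSafe reads a key through an indexed batch, treating pebble.ErrNotFound
// as a nil value rather than an error. The result is copied because the slice
// returned by Get is only valid until the closer is closed.
func valueSafe(b *pebble.Batch, key []byte) ([]byte, error) {
	val, closer, err := b.Get(key)
	if err != nil {
		if errors.Is(err, pebble.ErrNotFound) {
			return nil, nil
		}
		return nil, err
	}
	result := make([]byte, len(val))
	copy(result, val)
	return result, closer.Close()
}

With keys shaped like this, putListIndexItem's b.Set(key, zeroValue, pebble.Sync) collapses every bbolt "bucket + key" pair into a single ordered pebble key, so list and FKBT lookups become prefix scans instead of nested bucket traversals, which is why the old encodeList/decodeList machinery could be deleted.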