diff --git a/pkg/local_object_storage/metabase/batch.go b/pkg/local_object_storage/metabase/batch.go new file mode 100644 index 000000000..459545e2d --- /dev/null +++ b/pkg/local_object_storage/metabase/batch.go @@ -0,0 +1,148 @@ +// NOTE: code is partially taken from https://github.com/etcd-io/bbolt/blob/v1.3.10/db.go + +/* +The MIT License (MIT) + +Copyright (c) 2013 Ben Johnson + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +package meta + +import ( + "errors" + "fmt" + "sync" + "time" + + "go.etcd.io/bbolt" +) + +type fn func(*bbolt.Tx, *bucketCache) error + +type call struct { + fn fn + err chan<- error +} + +type batch struct { + db *DB + timer *time.Timer + start sync.Once + calls []call +} + +func (b *batch) trigger() { + b.start.Do(b.run) +} + +func (b *batch) run() { + b.db.batchMtx.Lock() + b.timer.Stop() + // Make sure no new work is added to this batch, but don't break + // other batches. + if b.db.batch == b { + b.db.batch = nil + } + b.db.batchMtx.Unlock() + + bc := newBucketCache() +retry: + for len(b.calls) > 0 { + failIdx := -1 + err := b.db.boltDB.Update(func(tx *bbolt.Tx) error { + for i, c := range b.calls { + if err := safelyCall(c.fn, tx, bc); err != nil { + failIdx = i + return err + } + } + return nil + }) + + if failIdx >= 0 { + // take the failing transaction out of the batch. it's + // safe to shorten b.calls here because db.batch no longer + // points to us, so no new calls can be appended to it. + c := b.calls[failIdx] + b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1] + // tell the submitter to re-run it solo, and continue with the rest of the batch + c.err <- errTrySolo + continue retry + } + + // pass success, or bolt internal errors, to all callers + for _, c := range b.calls { + c.err <- err + } + break retry + } +} + +func safelyCall(fn func(*bbolt.Tx, *bucketCache) error, tx *bbolt.Tx, bc *bucketCache) (err error) { + defer func() { + if p := recover(); p != nil { + err = panicked{p} + } + }() + return fn(tx, bc) +} + +// Batch queues fn to be run in a shared read-write transaction: concurrent +// calls are coalesced into a single bbolt Update to amortize commit cost. +// If fn fails, it is re-run alone in its own transaction. +func (db *DB) Batch(fn fn) error { + errCh := make(chan error, 1) + + db.batchMtx.Lock() + if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.boltBatchSize) { + // There is no existing batch, or the existing batch is full; start a new one.
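+ // The batch created below is armed with a timer: after boltBatchDelay it + // fires trigger(), so a batch that never fills up still runs. A batch that + // reaches boltBatchSize first is triggered immediately by the size check + // after the append; trigger() is guarded by sync.Once, so the two paths + // cannot run the same batch twice.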
+ db.batch = &batch{ + db: db, + } + db.batch.timer = time.AfterFunc(db.boltBatchDelay, db.batch.trigger) + } + db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh}) + if len(db.batch.calls) >= db.boltBatchSize { + // wake up batch, it's ready to run + go db.batch.trigger() + } + db.batchMtx.Unlock() + + err := <-errCh + if err == errTrySolo { + err = db.boltDB.Update(func(tx *bbolt.Tx) error { + return fn(tx, nil) + }) + } + return err +} + +// errTrySolo is a special sentinel error value used for signaling that a +// transaction function should be re-run. It should never be seen by +// callers. +var errTrySolo = errors.New("batch function returned an error and should be re-run solo") + +type panicked struct { + reason interface{} +} + +func (p panicked) Error() string { + if err, ok := p.reason.(error); ok { + return err.Error() + } + return fmt.Sprintf("panic: %v", p.reason) +} diff --git a/pkg/local_object_storage/metabase/bucket_cache.go b/pkg/local_object_storage/metabase/bucket_cache.go index de1479e6f..2dc85df89 100644 --- a/pkg/local_object_storage/metabase/bucket_cache.go +++ b/pkg/local_object_storage/metabase/bucket_cache.go @@ -9,8 +9,20 @@ type bucketCache struct { locked *bbolt.Bucket graveyard *bbolt.Bucket garbage *bbolt.Bucket + expEpoch *bbolt.Bucket + contVol *bbolt.Bucket + contCount *bbolt.Bucket + shardInfo *bbolt.Bucket expired map[cid.ID]*bbolt.Bucket primary map[cid.ID]*bbolt.Bucket + parent map[cid.ID]*bbolt.Bucket + tombstone map[cid.ID]*bbolt.Bucket + lockers map[cid.ID]*bbolt.Bucket + small map[cid.ID]*bbolt.Bucket + root map[cid.ID]*bbolt.Bucket + expObj map[cid.ID]*bbolt.Bucket + split map[cid.ID]*bbolt.Bucket + ecInfo map[cid.ID]*bbolt.Bucket } func newBucketCache() *bucketCache { @@ -38,6 +50,34 @@ func getGarbageBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket { return getBucket(&bc.garbage, tx, garbageBucketName) } +func getExpEpochToObjectBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket { + if bc == nil { + return tx.Bucket(expEpochToObjectBucketName) + } + return getBucket(&bc.expEpoch, tx, expEpochToObjectBucketName) +} + +func getContainerVolumeBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket { + if bc == nil { + return tx.Bucket(containerVolumeBucketName) + } + return getBucket(&bc.contVol, tx, containerVolumeBucketName) +} + +func getShardInfoBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket { + if bc == nil { + return tx.Bucket(shardInfoBucket) + } + return getBucket(&bc.shardInfo, tx, shardInfoBucket) +} + +func getContainerCounterBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket { + if bc == nil { + return tx.Bucket(containerCounterBucketName) + } + return getBucket(&bc.contCount, tx, containerCounterBucketName) +} + func getBucket(cache **bbolt.Bucket, tx *bbolt.Tx, name []byte) *bbolt.Bucket { if *cache != nil { return *cache @@ -65,6 +105,78 @@ func getPrimaryBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket { return getMappedBucket(&bc.primary, tx, primaryBucketName, cnr) } +func getParentBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket { + if bc == nil { + bucketName := make([]byte, bucketKeySize) + bucketName = parentBucketName(cnr, bucketName) + return tx.Bucket(bucketName) + } + return getMappedBucket(&bc.parent, tx, parentBucketName, cnr) +} + +func getTombstoneBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket { + if bc == nil { + bucketName := make([]byte, bucketKeySize) + bucketName = tombstoneBucketName(cnr, bucketName) + return tx.Bucket(bucketName) + } + return 
getMappedBucket(&bc.tombstone, tx, tombstoneBucketName, cnr) +} + +func getLockersBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket { + if bc == nil { + bucketName := make([]byte, bucketKeySize) + bucketName = bucketNameLockers(cnr, bucketName) + return tx.Bucket(bucketName) + } + return getMappedBucket(&bc.lockers, tx, bucketNameLockers, cnr) +} + +func getSmallBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket { + if bc == nil { + bucketName := make([]byte, bucketKeySize) + bucketName = smallBucketName(cnr, bucketName) + return tx.Bucket(bucketName) + } + return getMappedBucket(&bc.small, tx, smallBucketName, cnr) +} + +func getRootBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket { + if bc == nil { + bucketName := make([]byte, bucketKeySize) + bucketName = rootBucketName(cnr, bucketName) + return tx.Bucket(bucketName) + } + return getMappedBucket(&bc.root, tx, rootBucketName, cnr) +} + +func getObjToExpEpochBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket { + if bc == nil { + bucketName := make([]byte, bucketKeySize) + bucketName = objectToExpirationEpochBucketName(cnr, bucketName) + return tx.Bucket(bucketName) + } + return getMappedBucket(&bc.expObj, tx, objectToExpirationEpochBucketName, cnr) +} + +func getSplitBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket { + if bc == nil { + bucketName := make([]byte, bucketKeySize) + bucketName = splitBucketName(cnr, bucketName) + return tx.Bucket(bucketName) + } + return getMappedBucket(&bc.split, tx, splitBucketName, cnr) +} + +func getECInfoBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket { + if bc == nil { + bucketName := make([]byte, bucketKeySize) + bucketName = ecInfoBucketName(cnr, bucketName) + return tx.Bucket(bucketName) + } + return getMappedBucket(&bc.ecInfo, tx, ecInfoBucketName, cnr) +} + func getMappedBucket(m *map[cid.ID]*bbolt.Bucket, tx *bbolt.Tx, nameFunc func(cid.ID, []byte) []byte, cnr cid.ID) *bbolt.Bucket { value, ok := (*m)[cnr] if ok { diff --git a/pkg/local_object_storage/metabase/containers.go b/pkg/local_object_storage/metabase/containers.go index da27e6085..9a5c1c839 100644 --- a/pkg/local_object_storage/metabase/containers.go +++ b/pkg/local_object_storage/metabase/containers.go @@ -100,8 +100,8 @@ func parseContainerSize(v []byte) uint64 { return binary.LittleEndian.Uint64(v) } -func changeContainerSize(tx *bbolt.Tx, id cid.ID, delta uint64, increase bool) error { - containerVolume := tx.Bucket(containerVolumeBucketName) +func changeContainerSize(tx *bbolt.Tx, bc *bucketCache, id cid.ID, delta uint64, increase bool) error { + containerVolume := getContainerVolumeBucket(bc, tx) key := make([]byte, cidSize) id.Encode(key) diff --git a/pkg/local_object_storage/metabase/counter.go b/pkg/local_object_storage/metabase/counter.go index 732f99519..90806ef72 100644 --- a/pkg/local_object_storage/metabase/counter.go +++ b/pkg/local_object_storage/metabase/counter.go @@ -231,10 +231,10 @@ func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, er return result, metaerr.Wrap(err) } -func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error { - b := tx.Bucket(shardInfoBucket) +func (db *DB) incCounters(tx *bbolt.Tx, bc *bucketCache, cnrID cid.ID, isUserObject bool) error { + b := getShardInfoBucket(bc, tx) if b == nil { - return db.incContainerObjectCounter(tx, cnrID, isUserObject) + return db.incContainerObjectCounter(tx, bc, cnrID, isUserObject) } if err := db.updateShardObjectCounterBucket(b, 
phy, 1, true); err != nil { @@ -248,7 +248,7 @@ func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error { return fmt.Errorf("increase user object counter: %w", err) } } - return db.incContainerObjectCounter(tx, cnrID, isUserObject) + return db.incContainerObjectCounter(tx, bc, cnrID, isUserObject) } func (db *DB) decShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64) error { @@ -338,8 +338,8 @@ func nextValue(existed, delta uint64, inc bool) uint64 { return existed } -func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error { - b := tx.Bucket(containerCounterBucketName) +func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, bc *bucketCache, cnrID cid.ID, isUserObject bool) error { + b := getContainerCounterBucket(bc, tx) if b == nil { return nil } diff --git a/pkg/local_object_storage/metabase/db.go b/pkg/local_object_storage/metabase/db.go index 4474aa229..417ea8b43 100644 --- a/pkg/local_object_storage/metabase/db.go +++ b/pkg/local_object_storage/metabase/db.go @@ -44,6 +44,9 @@ type DB struct { boltDB *bbolt.DB initialized bool + + batchMtx sync.Mutex + batch *batch } // Option is an option of DB constructor. diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index d338e228f..8ffc45cea 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -110,8 +110,8 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) { var err error var res DeleteRes - err = db.boltDB.Batch(func(tx *bbolt.Tx) error { - res, err = db.deleteGroup(tx, prm.addrs) + err = db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error { - res, err = db.deleteGroup(tx, bc, prm.addrs) + res, err = db.deleteGroup(tx, bc, prm.addrs) return err }) if err == nil { @@ -127,7 +127,7 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) { // deleteGroup deletes objects from the metabase. Handles removal of the // references of the split objects. -func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) { +func (db *DB) deleteGroup(tx *bbolt.Tx, bc *bucketCache, addrs []oid.Address) (DeleteRes, error) { res := DeleteRes{ removedByCnrID: make(map[cid.ID]ObjectCounters), } @@ -135,7 +135,7 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) currEpoch := db.epochState.CurrentEpoch() for i := range addrs { - r, err := db.delete(tx, addrs[i], refCounter, currEpoch) + r, err := db.delete(tx, bc, addrs[i], refCounter, currEpoch) if err != nil { return DeleteRes{}, err } @@ -149,7 +149,7 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) for _, refNum := range refCounter { if refNum.cur == refNum.all { - err := db.deleteObject(tx, refNum.obj, true) + err := db.deleteObject(tx, bc, refNum.obj, true) if err != nil { return DeleteRes{}, err } @@ -243,16 +243,16 @@ type deleteSingleResult struct { // non-exist object is error-free). The second return value indicates if an // object was available before the removal (for calculating the logical object // counter). The third return value is the removed object's payload size.
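// For example, when the last child of a split object is deleted, the parent's // reference counter reaches refNum.cur == refNum.all and deleteGroup then // removes the virtual parent record as well.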
-func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (deleteSingleResult, error) { +func (db *DB) delete(tx *bbolt.Tx, bc *bucketCache, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (deleteSingleResult, error) { key := make([]byte, addressKeySize) addrKey := addressKey(addr, key) - garbageBKT := tx.Bucket(garbageBucketName) - graveyardBKT := tx.Bucket(graveyardBucketName) + garbageBKT := getGarbageBucket(bc, tx) + graveyardBKT := getGraveyardBucket(bc, tx) removeAvailableObject := inGraveyardWithKey(addrKey, graveyardBKT, garbageBKT) == 0 // unmarshal object, work only with physically stored (raw == true) objects - obj, err := db.get(tx, addr, key, false, true, currEpoch) + obj, err := db.getWithCache(bc, tx, addr, key, false, true, currEpoch) if err != nil { if client.IsErrObjectNotFound(err) { addrKey = addressKey(addr, key) @@ -293,7 +293,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter nRef, ok := refCounter[k] if !ok { nRef = &referenceNumber{ - all: parentLength(tx, parAddr), + all: parentLength(tx, bc, parAddr), obj: parent, } @@ -306,12 +306,12 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter isUserObject := IsUserObject(obj) // remove object - err = db.deleteObject(tx, obj, false) + err = db.deleteObject(tx, bc, obj, false) if err != nil { return deleteSingleResult{}, fmt.Errorf("remove object: %w", err) } - if err := deleteECRelatedInfo(tx, garbageBKT, obj, addr.Container(), refCounter); err != nil { + if err := deleteECRelatedInfo(tx, bc, obj, addr.Container(), refCounter); err != nil { return deleteSingleResult{}, err } @@ -325,15 +325,16 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter func (db *DB) deleteObject( tx *bbolt.Tx, + bc *bucketCache, obj *objectSDK.Object, isParent bool, ) error { - err := delUniqueIndexes(tx, obj, isParent) + err := delUniqueIndexes(tx, bc, obj, isParent) if err != nil { return errFailedToRemoveUniqueIndexes } - err = updateListIndexes(tx, obj, delListIndexItem) + err = updateListIndexes(tx, bc, obj, delListIndexItem, false) if err != nil { return fmt.Errorf("remove list indexes: %w", err) } @@ -345,7 +346,7 @@ func (db *DB) deleteObject( if isParent { // remove record from the garbage bucket, because regular object deletion does nothing for a virtual object - garbageBKT := tx.Bucket(garbageBucketName) + garbageBKT := getGarbageBucket(bc, tx) if garbageBKT != nil { key := make([]byte, addressKeySize) addrKey := addressKey(object.AddressOf(obj), key) @@ -360,10 +361,10 @@ func (db *DB) deleteObject( } // parentLength returns the number of available children recorded in the parent ID index.
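// (The parent ID index maps a parent object ID to the encoded list of its // children's IDs; parentLength decodes that list and returns its length.)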
-func parentLength(tx *bbolt.Tx, addr oid.Address) int { +func parentLength(tx *bbolt.Tx, bc *bucketCache, addr oid.Address) int { bucketName := make([]byte, bucketKeySize) - bkt := tx.Bucket(parentBucketName(addr.Container(), bucketName[:])) + bkt := getParentBucket(bc, tx, addr.Container()) if bkt == nil { return 0 } @@ -376,15 +377,16 @@ func parentLength(tx *bbolt.Tx, addr oid.Address) int { return len(lst) } -func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) { - bkt := tx.Bucket(item.name) +func delUniqueIndexItem(item bucketItem) error { + bkt := item.bucket if bkt != nil { - _ = bkt.Delete(item.key) // ignore error, best effort there + return bkt.Delete(item.key) } + return nil } -func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error { - bkt := tx.Bucket(item.name) +func delListIndexItem(item bucketItem) error { + bkt := item.bucket if bkt == nil { return nil } @@ -405,19 +407,16 @@ func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error { // if list empty, remove the key from bucket if len(lst) == 0 { - _ = bkt.Delete(item.key) // ignore error, best effort there - - return nil + return bkt.Delete(item.key) } // if list is not empty, then update it encodedLst, err := encodeList(lst) if err != nil { - return nil // ignore error, best effort there + return err } - _ = bkt.Put(item.key, encodedLst) // ignore error, best effort there - return nil + return bkt.Put(item.key, encodedLst) } func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error { @@ -460,67 +459,79 @@ func hasAnyItem(b *bbolt.Bucket) bool { return hasAnyItem } -func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error { +func delUniqueIndexes(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, isParent bool) error { addr := object.AddressOf(obj) objKey := objectKey(addr.Object(), make([]byte, objectKeySize)) cnr := addr.Container() - bucketName := make([]byte, bucketKeySize) + var bkt *bbolt.Bucket // add value to primary unique bucket if !isParent { switch obj.Type() { case objectSDK.TypeRegular: - bucketName = primaryBucketName(cnr, bucketName) + bkt = getPrimaryBucket(bc, tx, cnr) case objectSDK.TypeTombstone: - bucketName = tombstoneBucketName(cnr, bucketName) + bkt = getTombstoneBucket(bc, tx, cnr) case objectSDK.TypeLock: - bucketName = bucketNameLockers(cnr, bucketName) + bkt = getLockersBucket(bc, tx, cnr) default: return ErrUnknownObjectType } - delUniqueIndexItem(tx, namedBucketItem{ - name: bucketName, - key: objKey, - }) + if err := delUniqueIndexItem(bucketItem{ + bucket: bkt, + key: objKey, + }); err != nil { + return err + } } else { - delUniqueIndexItem(tx, namedBucketItem{ - name: parentBucketName(cnr, bucketName), - key: objKey, - }) + if err := delUniqueIndexItem(bucketItem{ + bucket: getParentBucket(bc, tx, cnr), + key: objKey, + }); err != nil { + return err + } } - delUniqueIndexItem(tx, namedBucketItem{ // remove from storage id index - name: smallBucketName(cnr, bucketName), - key: objKey, - }) - delUniqueIndexItem(tx, namedBucketItem{ // remove from root index - name: rootBucketName(cnr, bucketName), - key: objKey, - }) + if err := delUniqueIndexItem(bucketItem{ // remove from storage id index + bucket: getSmallBucket(bc, tx, cnr), + key: objKey, + }); err != nil { + return err + } + if err := delUniqueIndexItem(bucketItem{ // remove from root index + bucket: getRootBucket(bc, tx, cnr), + key: objKey, + }); err != nil { + return err + } if expEpoch, ok := hasExpirationEpoch(obj); ok { - delUniqueIndexItem(tx, namedBucketItem{ - name: 
expEpochToObjectBucketName, - key: expirationEpochKey(expEpoch, cnr, addr.Object()), - }) - delUniqueIndexItem(tx, namedBucketItem{ - name: objectToExpirationEpochBucketName(cnr, make([]byte, bucketKeySize)), - key: objKey, - }) + if err := delUniqueIndexItem(bucketItem{ + bucket: getExpEpochToObjectBucket(bc, tx), + key: expirationEpochKey(expEpoch, cnr, addr.Object()), + }); err != nil { + return err + } + if err := delUniqueIndexItem(bucketItem{ + bucket: getObjToExpEpochBucket(bc, tx, cnr), + key: objKey, + }); err != nil { + return err + } } return nil } -func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.Object, cnr cid.ID, refCounter referenceCounter) error { +func deleteECRelatedInfo(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, cnr cid.ID, refCounter referenceCounter) error { ech := obj.ECHeader() if ech == nil { return nil } - - hasAnyChunks := hasAnyECChunks(tx, ech, cnr) + garbageBKT := getGarbageBucket(bc, tx) + hasAnyChunks := hasAnyECChunks(tx, bc, ech, cnr) // drop EC parent GC mark if current EC chunk is the last one if !hasAnyChunks && garbageBKT != nil { var ecParentAddress oid.Address @@ -535,10 +546,12 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK. // also drop EC parent root info if current EC chunk is the last one if !hasAnyChunks { - delUniqueIndexItem(tx, namedBucketItem{ - name: rootBucketName(cnr, make([]byte, bucketKeySize)), - key: objectKey(ech.Parent(), make([]byte, objectKeySize)), - }) + if err := delUniqueIndexItem(bucketItem{ + bucket: getRootBucket(bc, tx, cnr), + key: objectKey(ech.Parent(), make([]byte, objectKeySize)), + }); err != nil { + return err + } } if ech.ParentSplitParentID() == nil { @@ -557,7 +570,7 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK. return nil } - if parentLength(tx, splitParentAddress) > 0 { + if parentLength(tx, bc, splitParentAddress) > 0 { // linking object still exists, so leave split info and gc mark deletion for linking object processing return nil } @@ -572,15 +585,14 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK. 
} // drop split info - delUniqueIndexItem(tx, namedBucketItem{ - name: rootBucketName(cnr, make([]byte, bucketKeySize)), - key: objectKey(*ech.ParentSplitParentID(), make([]byte, objectKeySize)), + return delUniqueIndexItem(bucketItem{ + bucket: getRootBucket(bc, tx, cnr), + key: objectKey(*ech.ParentSplitParentID(), make([]byte, objectKeySize)), }) - return nil } -func hasAnyECChunks(tx *bbolt.Tx, ech *objectSDK.ECHeader, cnr cid.ID) bool { - data := getFromBucket(tx, ecInfoBucketName(cnr, make([]byte, bucketKeySize)), +func hasAnyECChunks(tx *bbolt.Tx, bc *bucketCache, ech *objectSDK.ECHeader, cnr cid.ID) bool { + data := getFromBucket(getECInfoBucket(bc, tx, cnr), objectKey(ech.Parent(), make([]byte, objectKeySize))) return len(data) > 0 } diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go index 7bd6f90a6..9be2c35da 100644 --- a/pkg/local_object_storage/metabase/exists.go +++ b/pkg/local_object_storage/metabase/exists.go @@ -81,7 +81,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err currEpoch := db.epochState.CurrentEpoch() err = db.boltDB.View(func(tx *bbolt.Tx) error { - res.exists, res.locked, err = db.exists(tx, prm.addr, prm.ecParentAddr, currEpoch) + res.exists, res.locked, err = db.exists(tx, newBucketCache(), prm.addr, prm.ecParentAddr, currEpoch) return err }) @@ -89,10 +89,10 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err return res, metaerr.Wrap(err) } -func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) { +func (db *DB) exists(tx *bbolt.Tx, bc *bucketCache, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) { var locked bool if !ecParent.Equals(oid.Address{}) { - st, err := objectStatus(tx, ecParent, currEpoch) + st, err := objectStatusWithCache(bc, tx, ecParent, currEpoch) if err != nil { return false, false, err } @@ -103,10 +103,10 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currE return false, locked, ErrObjectIsExpired } - locked = objectLocked(tx, ecParent.Container(), ecParent.Object()) + locked = objectLockedWithCache(bc, tx, ecParent.Container(), ecParent.Object()) } // check graveyard and object expiration first - st, err := objectStatus(tx, addr, currEpoch) + st, err := objectStatusWithCache(bc, tx, addr, currEpoch) if err != nil { return false, false, err } @@ -122,16 +122,15 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currE objKey := objectKey(addr.Object(), make([]byte, objectKeySize)) cnr := addr.Container() - key := make([]byte, bucketKeySize) // if graveyard is empty, then check if object exists in primary bucket - if inBucket(tx, primaryBucketName(cnr, key), objKey) { + if inBucket(getPrimaryBucket(bc, tx, cnr), objKey) { return true, locked, nil } // if primary bucket is empty, then check if object exists in parent bucket - if inBucket(tx, parentBucketName(cnr, key), objKey) { - splitInfo, err := getSplitInfo(tx, cnr, objKey) + if inBucket(getParentBucket(bc, tx, cnr), objKey) { + splitInfo, err := getSplitInfo(tx, bc, cnr, objKey) if err != nil { return false, locked, err } @@ -139,12 +138,12 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currE return false, locked, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) } // if parent bucket is empty, then check if object exists in ec bucket - if data := getFromBucket(tx, ecInfoBucketName(cnr, key), 
objKey); len(data) != 0 { - return false, locked, getECInfoError(tx, cnr, data) + if data := getFromBucket(getECInfoBucket(bc, tx, cnr), objKey); len(data) != 0 { + return false, locked, getECInfoError(tx, bc, cnr, data) } // if parent bucket is empty, then check if object exists in typed buckets - return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, locked, nil + return firstIrregularObjectType(tx, bc, cnr, objKey) != objectSDK.TypeRegular, locked, nil } // objectStatus returns: @@ -152,10 +151,6 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currE // - 1 if object with GC mark; // - 2 if object is covered with tombstone; // - 3 if object is expired. -func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (uint8, error) { - return objectStatusWithCache(nil, tx, addr, currEpoch) -} - func objectStatusWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (uint8, error) { // locked object could not be removed/marked with GC/expired if objectLockedWithCache(bc, tx, addr.Container(), addr.Object()) { @@ -207,8 +202,7 @@ func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uin } // inBucket checks if key is present in bucket . -func inBucket(tx *bbolt.Tx, name, key []byte) bool { - bkt := tx.Bucket(name) +func inBucket(bkt *bbolt.Bucket, key []byte) bool { if bkt == nil { return false } @@ -221,9 +215,8 @@ func inBucket(tx *bbolt.Tx, name, key []byte) bool { // getSplitInfo returns SplitInfo structure from root index. Returns error // if there is no `key` record in root index. -func getSplitInfo(tx *bbolt.Tx, cnr cid.ID, key []byte) (*objectSDK.SplitInfo, error) { - bucketName := rootBucketName(cnr, make([]byte, bucketKeySize)) - rawSplitInfo := getFromBucket(tx, bucketName, key) +func getSplitInfo(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, key []byte) (*objectSDK.SplitInfo, error) { + rawSplitInfo := getFromBucket(getRootBucket(bc, tx, cnr), key) if len(rawSplitInfo) == 0 { return nil, ErrLackSplitInfo } diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index 821810c09..352803bf2 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -110,7 +110,6 @@ func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key key = objectKey(addr.Object(), key) cnr := addr.Container() obj := objectSDK.New() - bucketName := make([]byte, bucketKeySize) // check in primary index if b := getPrimaryBucket(bc, tx, cnr); b != nil { @@ -119,43 +118,40 @@ func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key } } - data := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key) + data := getFromBucket(getECInfoBucket(bc, tx, cnr), key) if len(data) != 0 { - return nil, getECInfoError(tx, cnr, data) + return nil, getECInfoError(tx, bc, cnr, data) } // if not found then check in tombstone index - data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key) + data = getFromBucket(getTombstoneBucket(bc, tx, cnr), key) if len(data) != 0 { return obj, obj.Unmarshal(data) } // if not found then check in locker index - data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key) + data = getFromBucket(getLockersBucket(bc, tx, cnr), key) if len(data) != 0 { return obj, obj.Unmarshal(data) } // if not found then check if object is a virtual - return getVirtualObject(tx, cnr, key, raw) + return getVirtualObject(tx, bc, cnr, key, raw) } -func getFromBucket(tx 
*bbolt.Tx, name, key []byte) []byte { - bkt := tx.Bucket(name) +func getFromBucket(bkt *bbolt.Bucket, key []byte) []byte { if bkt == nil { return nil } - return bkt.Get(key) } -func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSDK.Object, error) { +func getVirtualObject(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, key []byte, raw bool) (*objectSDK.Object, error) { if raw { - return nil, getSplitInfoError(tx, cnr, key) + return nil, getSplitInfoError(tx, bc, cnr, key) } - bucketName := make([]byte, bucketKeySize) - parentBucket := tx.Bucket(parentBucketName(cnr, bucketName)) + parentBucket := getParentBucket(bc, tx, cnr) if parentBucket == nil { return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) } @@ -172,17 +168,17 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD var data []byte for i := 0; i < len(relativeLst) && len(data) == 0; i++ { virtualOID := relativeLst[len(relativeLst)-i-1] - data = getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID) + data = getFromBucket(getPrimaryBucket(bc, tx, cnr), virtualOID) } if len(data) == 0 { // check if any of the relatives is an EC object for _, relative := range relativeLst { - data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), relative) + data = getFromBucket(getECInfoBucket(bc, tx, cnr), relative) if len(data) > 0 { // we can't return object headers, but can return error, // so assembler can try to assemble complex object - return nil, getSplitInfoError(tx, cnr, key) + return nil, getSplitInfoError(tx, bc, cnr, key) } } } @@ -203,8 +199,8 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD return par, nil } -func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error { - splitInfo, err := getSplitInfo(tx, cnr, key) +func getSplitInfoError(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, key []byte) error { + splitInfo, err := getSplitInfo(tx, bc, cnr, key) if err == nil { return logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) } @@ -212,7 +208,7 @@ func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error { return logicerr.Wrap(new(apistatus.ObjectNotFound)) } -func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error { +func getECInfoError(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, data []byte) error { keys, err := decodeList(data) if err != nil { return err @@ -220,7 +216,7 @@ func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error { ecInfo := objectSDK.NewECInfo() for _, key := range keys { // check in primary index - objData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key) + objData := getFromBucket(getPrimaryBucket(bc, tx, cnr), key) if len(objData) != 0 { obj := objectSDK.New() if err := obj.Unmarshal(objData); err != nil { diff --git a/pkg/local_object_storage/metabase/graveyard.go b/pkg/local_object_storage/metabase/graveyard.go index 2f23d424c..21830999d 100644 --- a/pkg/local_object_storage/metabase/graveyard.go +++ b/pkg/local_object_storage/metabase/graveyard.go @@ -287,19 +287,19 @@ func (db *DB) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (Inh var res InhumeRes - err := db.boltDB.Batch(func(tx *bbolt.Tx) error { + err := db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error { res = InhumeRes{inhumedByCnrID: make(map[cid.ID]ObjectCounters)} - garbageBKT := tx.Bucket(garbageBucketName) - graveyardBKT := tx.Bucket(graveyardBucketName) + garbageBKT := getGarbageBucket(bc, tx) + graveyardBKT := getGraveyardBucket(bc, tx) - bkt, value, err := 
db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm) + targetBucket, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm) if err != nil { return err } for i := range tss { - if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, tss[i].Tombstone(), buf, currEpoch, prm, &res); err != nil { + if err := db.inhumeTxSingle(tx, bc, targetBucket, value, tss[i].Tombstone(), buf, currEpoch, prm, &res); err != nil { return err } if err := graveyardBKT.Delete(addressKey(tss[i].Address(), buf)); err != nil { diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index 76018fb61..434bf2129 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -199,8 +199,8 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { inhumedByCnrID: make(map[cid.ID]ObjectCounters), } currEpoch := db.epochState.CurrentEpoch() - err := db.boltDB.Batch(func(tx *bbolt.Tx) error { - return db.inhumeTx(tx, currEpoch, prm, &res) + err := db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error { + return db.inhumeTx(tx, bc, currEpoch, prm, &res) }) success = err == nil if success { @@ -213,18 +213,15 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { return res, metaerr.Wrap(err) } -func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes) error { - garbageBKT := tx.Bucket(garbageBucketName) - graveyardBKT := tx.Bucket(graveyardBucketName) - - bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm) +func (db *DB) inhumeTx(tx *bbolt.Tx, bc *bucketCache, epoch uint64, prm InhumePrm, res *InhumeRes) error { + bkt, value, err := db.getInhumeTargetBucketAndValue(getGarbageBucket(bc, tx), getGraveyardBucket(bc, tx), prm) if err != nil { return err } buf := make([]byte, addressKeySize) for i := range prm.target { - if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, prm.target[i], buf, epoch, prm, res); err != nil { + if err := db.inhumeTxSingle(tx, bc, bkt, value, prm.target[i], buf, epoch, prm, res); err != nil { return err } } @@ -232,13 +229,12 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes return db.applyInhumeResToCounters(tx, res) } -func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garbageBKT *bbolt.Bucket, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error { +func (db *DB) inhumeTxSingle(tx *bbolt.Tx, bc *bucketCache, targetBucket *bbolt.Bucket, value []byte, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error { id := addr.Object() cnr := addr.Container() - tx := bkt.Tx() // prevent locked objects from being inhumed - if !prm.forceRemoval && objectLocked(tx, cnr, id) { + if !prm.forceRemoval && objectLockedWithCache(bc, tx, cnr, id) { return new(apistatus.ObjectLocked) } @@ -248,23 +244,23 @@ // if `Inhume` was not called with the // `WithForceGCMark` option if !prm.forceRemoval { - if isLockObject(tx, cnr, id) { + if isLockObject(tx, bc, cnr, id) { return ErrLockObjectRemoval } lockWasChecked = true } - obj, err := db.get(tx, addr, buf, false, true, epoch) + obj, err := db.getWithCache(bc, tx, addr, buf, false, true, epoch) targetKey := addressKey(addr, buf) var ecErr *objectSDK.ECInfoError if err == nil { - err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey,
cnr, obj, res) + err = db.updateDeleteInfo(tx, bc, targetKey, cnr, obj, res) if err != nil { return err } } else if errors.As(err, &ecErr) { - err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value) + err = db.inhumeECInfo(tx, bc, epoch, prm.tomb, res, ecErr.ECInfo(), cnr, targetBucket, value) if err != nil { return err } @@ -272,7 +268,7 @@ func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garb if prm.tomb != nil { var isTomb bool - isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey) + isTomb, err = db.markAsGC(getGraveyardBucket(bc, tx), getGarbageBucket(bc, tx), targetKey) if err != nil { return err } @@ -283,7 +279,7 @@ func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garb } // consider checking if target is already in graveyard? - err = bkt.Put(targetKey, value) + err = targetBucket.Put(targetKey, value) if err != nil { return err } @@ -297,15 +293,14 @@ func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garb return nil } - if isLockObject(tx, cnr, id) { + if isLockObject(tx, bc, cnr, id) { res.deletedLockObj = append(res.deletedLockObj, addr) } } return nil } -func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes, - garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket, +func (db *DB) inhumeECInfo(tx *bbolt.Tx, bc *bucketCache, epoch uint64, tomb *oid.Address, res *InhumeRes, ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte, ) error { for _, chunk := range ecInfo.Chunks { @@ -318,17 +313,17 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I return err } chunkAddr.SetObject(chunkID) - chunkObj, err := db.get(tx, chunkAddr, chunkBuf, false, true, epoch) + chunkObj, err := db.getWithCache(bc, tx, chunkAddr, chunkBuf, false, true, epoch) if err != nil { return err } chunkKey := addressKey(chunkAddr, chunkBuf) - err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, chunkKey, cnr, chunkObj, res) + err = db.updateDeleteInfo(tx, bc, chunkKey, cnr, chunkObj, res) if err != nil { return err } if tomb != nil { - _, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey) + _, err = db.markAsGC(getGraveyardBucket(bc, tx), getGarbageBucket(bc, tx), chunkKey) if err != nil { return err } @@ -398,16 +393,16 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, addressKey []byte return false, garbageBKT.Put(addressKey, zeroValue) } -func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error { +func (db *DB) updateDeleteInfo(tx *bbolt.Tx, bc *bucketCache, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error { containerID, _ := obj.ContainerID() - if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 { + if inGraveyardWithKey(targetKey, getGraveyardBucket(bc, tx), getGarbageBucket(bc, tx)) == 0 { res.storeDeletionInfo(containerID, obj.PayloadSize(), IsUserObject(obj)) } // if object is stored, and it is regular object then update bucket // with container size estimations if obj.Type() == objectSDK.TypeRegular { - err := changeContainerSize(tx, cnr, obj.PayloadSize(), false) + err := changeContainerSize(tx, bc, cnr, obj.PayloadSize(), false) if err != nil { return err } diff --git a/pkg/local_object_storage/metabase/iterators.go b/pkg/local_object_storage/metabase/iterators.go index 9cccd7dad..99a98b9f5 100644 --- 
a/pkg/local_object_storage/metabase/iterators.go +++ b/pkg/local_object_storage/metabase/iterators.go @@ -76,7 +76,8 @@ func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectH } func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error { - b := tx.Bucket(expEpochToObjectBucketName) + bc := newBucketCache() + b := getExpEpochToObjectBucket(bc, tx) c := b.Cursor() for k, _ := c.First(); k != nil; k, _ = c.Next() { expiresAfter, cnr, obj, err := parseExpirationEpochKey(k) @@ -87,7 +88,7 @@ func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) if expiresAfter >= epoch { return nil } - if objectLocked(tx, cnr, obj) { + if objectLockedWithCache(bc, tx, cnr, obj) { continue } var addr oid.Address @@ -95,7 +96,7 @@ func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) addr.SetObject(obj) objKey := objectKey(addr.Object(), make([]byte, objectKeySize)) err = h(&ExpiredObject{ - typ: firstIrregularObjectType(tx, cnr, objKey), + typ: firstIrregularObjectType(tx, bc, cnr, objKey), addr: addr, }) if err == nil { diff --git a/pkg/local_object_storage/metabase/lock.go b/pkg/local_object_storage/metabase/lock.go index f73c2b4f6..1a6fd6d0d 100644 --- a/pkg/local_object_storage/metabase/lock.go +++ b/pkg/local_object_storage/metabase/lock.go @@ -79,12 +79,12 @@ func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error { } key := make([]byte, cidSize) - return metaerr.Wrap(db.boltDB.Batch(func(tx *bbolt.Tx) error { - if firstIrregularObjectType(tx, cnr, bucketKeysLocked...) != objectSDK.TypeRegular { + return metaerr.Wrap(db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error { + if firstIrregularObjectType(tx, bc, cnr, bucketKeysLocked...) != objectSDK.TypeRegular { return logicerr.Wrap(new(apistatus.LockNonRegularObject)) } - bucketLocked := tx.Bucket(bucketNameLocked) + bucketLocked := getLockedBucket(bc, tx) cnr.Encode(key) bucketLockedContainer, err := bucketLocked.CreateBucketIfNotExists(key) @@ -144,9 +144,9 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) { var unlockedObjects []oid.Address - if err := db.boltDB.Batch(func(tx *bbolt.Tx) error { + if err := db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error { for i := range lockers { - unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object()) + unlocked, err := freePotentialLocks(tx, bc, lockers[i].Container(), lockers[i].Object()) if err != nil { return err } @@ -211,9 +211,9 @@ func getLocks(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) { // Operation is very resource-intensive, which is caused by the admissibility // of multiple locks. Also, if we knew what objects are locked, it would be // possible to speed up the execution. 
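// As it is, the implementation has to scan the container's locked bucket // record by record, looking for entries that reference the given locker.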
-func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) { +func freePotentialLocks(tx *bbolt.Tx, bc *bucketCache, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) { var unlockedObjects []oid.Address - bucketLocked := tx.Bucket(bucketNameLocked) + bucketLocked := getLockedBucket(bc, tx) if bucketLocked == nil { return unlockedObjects, nil } diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 5e1bbfe9e..1f7a8cfb3 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -9,6 +9,7 @@ import ( "strconv" "time" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" @@ -28,6 +29,11 @@ type ( namedBucketItem struct { name, key, val []byte } + + bucketItem struct { + bucket *bbolt.Bucket + key, val []byte + } ) // PutPrm groups the parameters of Put operation. @@ -63,6 +69,8 @@ var ( ErrIncorrectRootObject = errors.New("invalid root object") ) +const bucketNilAssertMsg = "bucket to put data is nil" + // Put saves object header in metabase. Object payload expected to be cut. + // + // Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard. @@ -93,9 +101,9 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) { currEpoch := db.epochState.CurrentEpoch() - err = db.boltDB.Batch(func(tx *bbolt.Tx) error { + err = db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error { var e error - res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch, prm.indexAttributes) + res, e = db.put(tx, bc, prm.obj, prm.id, nil, currEpoch, prm.indexAttributes) return e }) if err == nil { @@ -109,6 +117,7 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) { } func (db *DB) put(tx *bbolt.Tx, + bc *bucketCache, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, @@ -128,7 +137,7 @@ func (db *DB) put(tx *bbolt.Tx, isParent := si != nil - exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch) + exists, _, err := db.exists(tx, bc, objectCore.AddressOf(obj), ecParentAddress, currEpoch) var splitInfoError *objectSDK.SplitInfoError if errors.As(err, &splitInfoError) { @@ -138,51 +147,51 @@ func (db *DB) put(tx *bbolt.Tx, if exists { - return PutRes{}, db.updateObj(tx, obj, id, si, isParent) + return PutRes{}, db.updateObj(tx, bc, obj, id, si, isParent) } - return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch, indexAttributes) + return PutRes{Inserted: true}, db.insertObject(tx, bc, obj, id, si, isParent, cnr, currEpoch, indexAttributes) } -func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error { +func (db *DB) updateObj(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error { // the right-most child and the split header overlap the parent, so we have to // check if the object exists to avoid overwriting it twice // When storage engine moves objects between different sub-storages, // it calls metabase.Put method with a new storage ID, thus triggering this code.
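// A non-nil id here means the storage ID has changed; the override flag // passed to setStorageID below makes the update unconditional.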
if !isParent && id != nil { - return setStorageID(tx, objectCore.AddressOf(obj), id, true) + return setStorageID(tx, bc, objectCore.AddressOf(obj), id, true) } // when storage already has last object in split hierarchy and there is // a linking object to put (or vice versa), we should update split info // with object ids of these objects if isParent { - return updateSplitInfo(tx, objectCore.AddressOf(obj), si) + return updateSplitInfo(tx, bc, objectCore.AddressOf(obj), si) } return nil } -func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64, indexAttributes bool) error { +func (db *DB) insertObject(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64, indexAttributes bool) error { if par := obj.Parent(); par != nil && !isParent { // limit depth by two parentSI, err := splitInfoFromObject(obj) if err != nil { return err } - _, err = db.put(tx, par, id, parentSI, currEpoch, indexAttributes) + _, err = db.put(tx, bc, par, id, parentSI, currEpoch, indexAttributes) if err != nil { return err } } - err := putUniqueIndexes(tx, obj, si, id) + err := putUniqueIndexes(tx, bc, obj, si, id) if err != nil { return fmt.Errorf("put unique indexes: %w", err) } - err = updateListIndexes(tx, obj, putListIndexItem) + err = updateListIndexes(tx, bc, obj, putListIndexItem, true) if err != nil { return fmt.Errorf("put list indexes: %w", err) } @@ -196,14 +205,14 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o // update container volume size estimation if obj.Type() == objectSDK.TypeRegular && !isParent { - err = changeContainerSize(tx, cnr, obj.PayloadSize(), true) + err = changeContainerSize(tx, bc, cnr, obj.PayloadSize(), true) if err != nil { return err } } if !isParent { - if err = db.incCounters(tx, cnr, IsUserObject(obj)); err != nil { + if err = db.incCounters(tx, bc, cnr, IsUserObject(obj)); err != nil { return err } } @@ -211,69 +220,78 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o return nil } -func putUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, si *objectSDK.SplitInfo, id []byte) error { +func putUniqueIndexes(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, si *objectSDK.SplitInfo, id []byte) error { isParent := si != nil addr := objectCore.AddressOf(obj) objKey := objectKey(addr.Object(), make([]byte, objectKeySize)) bucketName := make([]byte, bucketKeySize) if !isParent { - err := putRawObjectData(tx, obj, bucketName, addr, objKey) + err := putRawObjectData(tx, bc, obj, bucketName, addr, objKey) if err != nil { return err } if id != nil { - if err = setStorageID(tx, objectCore.AddressOf(obj), id, false); err != nil { + if err = setStorageID(tx, bc, objectCore.AddressOf(obj), id, false); err != nil { return err } } } - if err := putExpirationEpoch(tx, obj, addr, objKey); err != nil { + if err := putExpirationEpoch(tx, bc, obj, addr, objKey); err != nil { return err } - return putSplitInfo(tx, obj, bucketName, addr, si, objKey) + return putSplitInfo(tx, bc, obj, bucketName, addr, si, objKey) } -func putRawObjectData(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr oid.Address, objKey []byte) error { +func putRawObjectData(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, bucketName []byte, addr oid.Address, objKey []byte) error { + var bkt *bbolt.Bucket + var err error switch obj.Type() { case objectSDK.TypeRegular: - bucketName = 
primaryBucketName(addr.Container(), bucketName) + bkt, err = getOrCreateBucket(tx, bc, addr.Container(), getPrimaryBucket, primaryBucketName(addr.Container(), bucketName), true) case objectSDK.TypeTombstone: - bucketName = tombstoneBucketName(addr.Container(), bucketName) + bkt, err = getOrCreateBucket(tx, bc, addr.Container(), getTombstoneBucket, tombstoneBucketName(addr.Container(), bucketName), true) case objectSDK.TypeLock: - bucketName = bucketNameLockers(addr.Container(), bucketName) + bkt, err = getOrCreateBucket(tx, bc, addr.Container(), getLockersBucket, bucketNameLockers(addr.Container(), bucketName), true) default: return ErrUnknownObjectType } + if err != nil { + return err + } rawObject, err := obj.CutPayload().Marshal() if err != nil { return fmt.Errorf("marshal object header: %w", err) } - return putUniqueIndexItem(tx, namedBucketItem{ - name: bucketName, - key: objKey, - val: rawObject, + return putUniqueIndexItem(bucketItem{ + bucket: bkt, + key: objKey, + val: rawObject, }) } -func putExpirationEpoch(tx *bbolt.Tx, obj *objectSDK.Object, addr oid.Address, objKey []byte) error { +func putExpirationEpoch(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, addr oid.Address, objKey []byte) error { if expEpoch, ok := hasExpirationEpoch(obj); ok { - err := putUniqueIndexItem(tx, namedBucketItem{ - name: expEpochToObjectBucketName, - key: expirationEpochKey(expEpoch, addr.Container(), addr.Object()), - val: zeroValue, + err := putUniqueIndexItem(bucketItem{ + bucket: getExpEpochToObjectBucket(bc, tx), + key: expirationEpochKey(expEpoch, addr.Container(), addr.Object()), + val: zeroValue, }) if err != nil { return err } + bkt, err := getOrCreateBucket(tx, bc, addr.Container(), getObjToExpEpochBucket, objectToExpirationEpochBucketName(addr.Container(), make([]byte, bucketKeySize)), true) + if err != nil { + return err + } val := make([]byte, epochSize) binary.LittleEndian.PutUint64(val, expEpoch) - err = putUniqueIndexItem(tx, namedBucketItem{ - name: objectToExpirationEpochBucketName(addr.Container(), make([]byte, bucketKeySize)), - key: objKey, - val: val, + err = putUniqueIndexItem(bucketItem{ + bucket: bkt, + key: objKey, + val: val, }) if err != nil { return err @@ -282,7 +300,7 @@ func putExpirationEpoch(tx *bbolt.Tx, obj *objectSDK.Object, addr oid.Address, o return nil } -func putSplitInfo(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr oid.Address, si *objectSDK.SplitInfo, objKey []byte) error { +func putSplitInfo(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, bucketName []byte, addr oid.Address, si *objectSDK.SplitInfo, objKey []byte) error { if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() { if ecHead := obj.ECHeader(); ecHead != nil { parentID := ecHead.Parent() @@ -300,15 +318,19 @@ func putSplitInfo(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr o } objKey = objectKey(parentID, objKey) } - return updateSplitInfoIndex(tx, objKey, addr.Container(), bucketName, si) + return updateSplitInfoIndex(tx, bc, objKey, addr.Container(), bucketName, si) } return nil } -func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName []byte, si *objectSDK.SplitInfo) error { - return updateUniqueIndexItem(tx, namedBucketItem{ - name: rootBucketName(cnr, bucketName), - key: objKey, +func updateSplitInfoIndex(tx *bbolt.Tx, bc *bucketCache, objKey []byte, cnr cid.ID, bucketName []byte, si *objectSDK.SplitInfo) error { + bkt, err := getOrCreateBucket(tx, bc, cnr, getRootBucket, rootBucketName(cnr, bucketName), true) + if err != 
nil { + return err + } + return updateUniqueIndexItem(bucketItem{ + bucket: bkt, + key: objKey, }, func(old, _ []byte) ([]byte, error) { switch { case si == nil && old == nil: @@ -328,78 +350,100 @@ func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName [] }) } -type updateIndexItemFunc = func(tx *bbolt.Tx, item namedBucketItem) error +type updateIndexItemFunc = func(item bucketItem) error -func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error { +func updateListIndexes(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, f updateIndexItemFunc, createBuckets bool) error { idObj, _ := obj.ID() cnr, _ := obj.ContainerID() objKey := objectKey(idObj, make([]byte, objectKeySize)) bucketName := make([]byte, bucketKeySize) idParent, ok := obj.ParentID() - // index parent ids if ok { - err := f(tx, namedBucketItem{ - name: parentBucketName(cnr, bucketName), - key: objectKey(idParent, make([]byte, objectKeySize)), - val: objKey, - }) + bkt, err := getOrCreateBucket(tx, bc, cnr, getParentBucket, parentBucketName(cnr, bucketName), createBuckets) if err != nil { return err } + if err := f(bucketItem{ + bucket: bkt, + key: objectKey(idParent, make([]byte, objectKeySize)), + val: objKey, + }); err != nil { + return err + } } // index split ids if obj.SplitID() != nil { - err := f(tx, namedBucketItem{ - name: splitBucketName(cnr, bucketName), - key: obj.SplitID().ToV2(), - val: objKey, - }) + bkt, err := getOrCreateBucket(tx, bc, cnr, getSplitBucket, splitBucketName(cnr, bucketName), createBuckets) if err != nil { return err } + if err := f(bucketItem{ + bucket: bkt, + key: obj.SplitID().ToV2(), + val: objKey, + }); err != nil { + return err + } } if ech := obj.ECHeader(); ech != nil { - err := f(tx, namedBucketItem{ - name: ecInfoBucketName(cnr, bucketName), - key: objectKey(ech.Parent(), make([]byte, objectKeySize)), - val: objKey, - }) + bkt, err := getOrCreateBucket(tx, bc, cnr, getECInfoBucket, ecInfoBucketName(cnr, bucketName), createBuckets) if err != nil { return err } + if err := f(bucketItem{ + bucket: bkt, + key: objectKey(ech.Parent(), make([]byte, objectKeySize)), + val: objKey, + }); err != nil { + return err + } if ech.ParentSplitID() != nil { - objKey := objectKey(ech.Parent(), make([]byte, objectKeySize)) - err := f(tx, namedBucketItem{ - name: splitBucketName(cnr, bucketName), - key: ech.ParentSplitID().ToV2(), - val: objKey, - }) + bkt, err := getOrCreateBucket(tx, bc, cnr, getSplitBucket, splitBucketName(cnr, bucketName), createBuckets) if err != nil { return err } + if err := f(bucketItem{ + bucket: bkt, + key: ech.ParentSplitID().ToV2(), + val: objectKey(ech.Parent(), make([]byte, objectKeySize)), + }); err != nil { + return err + } } if parentSplitParentID := ech.ParentSplitParentID(); parentSplitParentID != nil { - objKey := objectKey(ech.Parent(), make([]byte, objectKeySize)) - err := f(tx, namedBucketItem{ - name: parentBucketName(cnr, bucketName), - key: objectKey(*parentSplitParentID, make([]byte, objectKeySize)), - val: objKey, - }) + bkt, err := getOrCreateBucket(tx, bc, cnr, getParentBucket, parentBucketName(cnr, bucketName), createBuckets) if err != nil { return err } + if err := f(bucketItem{ + bucket: bkt, + key: objectKey(*parentSplitParentID, make([]byte, objectKeySize)), + val: objectKey(ech.Parent(), make([]byte, objectKeySize)), + }); err != nil { + return err + } } } return nil } +func getOrCreateBucket(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, getter func(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) 
*bbolt.Bucket, + bucketName []byte, create bool, +) (*bbolt.Bucket, error) { + bkt := getter(bc, tx, cnr) + if bkt == nil && create { + return createBucketLikelyExists(tx, bucketName) + } + return bkt, nil +} + var indexedAttributes = map[string]struct{}{ "S3-Access-Box-CRDT-Name": {}, objectSDK.AttributeFilePath: {}, @@ -411,7 +455,9 @@ func IsAtrributeIndexed(attr string) bool { return found } -func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error { +type updateFKBTIndexItemFunc = func(tx *bbolt.Tx, item namedBucketItem) error + +func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateFKBTIndexItemFunc) error { id, _ := obj.ID() cnr, _ := obj.ContainerID() objKey := objectKey(id, make([]byte, objectKeySize)) @@ -471,11 +517,9 @@ func createBucketLikelyExists[T bucketContainer](tx T, name []byte) (*bbolt.Buck return tx.CreateBucket(name) } -func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldData, newData []byte) ([]byte, error)) error { - bkt, err := createBucketLikelyExists(tx, item.name) - if err != nil { - return fmt.Errorf("create index %v: %w", item.name, err) - } +func updateUniqueIndexItem(item bucketItem, update func(oldData, newData []byte) ([]byte, error)) error { + bkt := item.bucket + assert.True(bkt != nil, bucketNilAssertMsg) data, err := update(bkt.Get(item.key), item.val) if err != nil { @@ -484,8 +528,8 @@ func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldDa return bkt.Put(item.key, data) } -func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error { - return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil }) +func putUniqueIndexItem(item bucketItem) error { + return updateUniqueIndexItem(item, func(_, val []byte) ([]byte, error) { return val, nil }) } func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error { @@ -502,11 +546,9 @@ func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error { return fkbtRoot.Put(item.val, zeroValue) } -func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error { - bkt, err := createBucketLikelyExists(tx, item.name) - if err != nil { - return fmt.Errorf("create index %v: %w", item.name, err) - } +func putListIndexItem(item bucketItem) error { + bkt := item.bucket + assert.True(bkt != nil, bucketNilAssertMsg) lst, err := decodeList(bkt.Get(item.key)) if err != nil { @@ -595,9 +637,9 @@ func getVarUint(data []byte) (uint64, int, error) { // setStorageID for existing objects if they were moved from one // storage location to another. -func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) error { +func setStorageID(tx *bbolt.Tx, bc *bucketCache, addr oid.Address, id []byte, override bool) error { key := make([]byte, bucketKeySize) - bkt, err := createBucketLikelyExists(tx, smallBucketName(addr.Container(), key)) + bkt, err := getOrCreateBucket(tx, bc, addr.Container(), getSmallBucket, smallBucketName(addr.Container(), key), true) if err != nil { return err } @@ -610,9 +652,9 @@ func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) erro // updateSplitInfo for existing objects if storage is filled with extra information // about last object in split hierarchy or linking object.
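// (Merging of the existing and the new SplitInfo values is handled by the // update callback inside updateSplitInfoIndex.)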
-func updateSplitInfo(tx *bbolt.Tx, addr oid.Address, from *objectSDK.SplitInfo) error {
+func updateSplitInfo(tx *bbolt.Tx, bc *bucketCache, addr oid.Address, from *objectSDK.SplitInfo) error {
 	objKey := objectKey(addr.Object(), make([]byte, bucketKeySize))
-	return updateSplitInfoIndex(tx, objKey, addr.Container(), make([]byte, bucketKeySize), from)
+	return updateSplitInfoIndex(tx, bc, objKey, addr.Container(), make([]byte, bucketKeySize), from)
 }
 
 // splitInfoFromObject returns split info based on last or linkin object.
diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go
index 60da50671..268a7751c 100644
--- a/pkg/local_object_storage/metabase/select.go
+++ b/pkg/local_object_storage/metabase/select.go
@@ -413,7 +413,7 @@ func (db *DB) selectObjectID(
 			addr.SetObject(id)
 
 			var splitInfoError *objectSDK.SplitInfoError
-			ok, _, err := db.exists(tx, addr, oid.Address{}, currEpoch)
+			ok, _, err := db.exists(tx, nil, addr, oid.Address{}, currEpoch)
 			if (err == nil && ok) || errors.As(err, &splitInfoError) {
 				raw := make([]byte, objectKeySize)
 				id.Encode(raw)
diff --git a/pkg/local_object_storage/metabase/storage_id.go b/pkg/local_object_storage/metabase/storage_id.go
index 8f2376503..e73672e0c 100644
--- a/pkg/local_object_storage/metabase/storage_id.go
+++ b/pkg/local_object_storage/metabase/storage_id.go
@@ -126,8 +126,8 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res
 		return res, ErrReadOnlyMode
 	}
 
-	err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
-		return setStorageID(tx, prm.addr, prm.id, true)
+	err = db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
+		return setStorageID(tx, bc, prm.addr, prm.id, true)
 	})
 	success = err == nil
 	return res, metaerr.Wrap(err)
diff --git a/pkg/local_object_storage/metabase/upgrade.go b/pkg/local_object_storage/metabase/upgrade.go
index 4948f3424..d959d34d1 100644
--- a/pkg/local_object_storage/metabase/upgrade.go
+++ b/pkg/local_object_storage/metabase/upgrade.go
@@ -168,20 +168,7 @@ func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB, log func(a
 			return nil
 		}
 		if err := db.Batch(func(tx *bbolt.Tx) error {
-			if err := putUniqueIndexItem(tx, namedBucketItem{
-				name: expEpochToObjectBucketName,
-				key:  expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID),
-				val:  zeroValue,
-			}); err != nil {
-				return err
-			}
-			val := make([]byte, epochSize)
-			binary.LittleEndian.PutUint64(val, obj.expirationEpoch)
-			return putUniqueIndexItem(tx, namedBucketItem{
-				name: objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)),
-				key:  objectKey(obj.objectID, make([]byte, objectKeySize)),
-				val:  val,
-			})
+			return saveObjectExpirationEpoch(tx, obj)
 		}); err != nil {
 			return err
 		}
@@ -201,6 +188,31 @@ func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB, log func(a
 	return nil
 }
 
+func saveObjectExpirationEpoch(tx *bbolt.Tx, obj objectIDToExpEpoch) error {
+	bkt, err := createBucketLikelyExists(tx, expEpochToObjectBucketName)
+	if err != nil {
+		return err
+	}
+	if err := putUniqueIndexItem(bucketItem{
+		bucket: bkt,
+		key:    expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID),
+		val:    zeroValue,
+	}); err != nil {
+		return err
+	}
+	bkt, err = createBucketLikelyExists(tx, objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)))
+	if err != nil {
+		return err
+	}
+	val := make([]byte, epochSize)
+	binary.LittleEndian.PutUint64(val, obj.expirationEpoch)
+	return putUniqueIndexItem(bucketItem{
+		bucket: bkt,
+		key:    objectKey(obj.objectID, make([]byte, objectKeySize)),
+		val:    val,
+	})
+}
+
 func selectObjectsWithExpirationEpoch(ctx context.Context, db *bbolt.DB, objects chan objectIDToExpEpoch) error {
 	defer close(objects)
 
diff --git a/pkg/local_object_storage/metabase/util.go b/pkg/local_object_storage/metabase/util.go
index 80851f1c4..6c94624be 100644
--- a/pkg/local_object_storage/metabase/util.go
+++ b/pkg/local_object_storage/metabase/util.go
@@ -277,24 +277,22 @@ func objectKey(obj oid.ID, key []byte) []byte {
 // if meets irregular object container in objs - returns its type, otherwise returns object.TypeRegular.
 //
 // firstIrregularObjectType(tx, cnr, obj) usage allows getting object type.
-func firstIrregularObjectType(tx *bbolt.Tx, idCnr cid.ID, objs ...[]byte) objectSDK.Type {
+func firstIrregularObjectType(tx *bbolt.Tx, bc *bucketCache, idCnr cid.ID, objs ...[]byte) objectSDK.Type {
 	if len(objs) == 0 {
 		panic("empty object list in firstIrregularObjectType")
 	}
 
-	var keys [2][1 + cidSize]byte
-
 	irregularTypeBuckets := [...]struct {
-		typ  objectSDK.Type
-		name []byte
+		typ objectSDK.Type
+		bkt *bbolt.Bucket
 	}{
-		{objectSDK.TypeTombstone, tombstoneBucketName(idCnr, keys[0][:])},
-		{objectSDK.TypeLock, bucketNameLockers(idCnr, keys[1][:])},
+		{objectSDK.TypeTombstone, getTombstoneBucket(bc, tx, idCnr)},
+		{objectSDK.TypeLock, getLockersBucket(bc, tx, idCnr)},
 	}
 
 	for i := range objs {
 		for j := range irregularTypeBuckets {
-			if inBucket(tx, irregularTypeBuckets[j].name, objs[i]) {
+			if inBucket(irregularTypeBuckets[j].bkt, objs[i]) {
 				return irregularTypeBuckets[j].typ
 			}
 		}
@@ -304,8 +302,7 @@ func firstIrregularObjectType(tx *bbolt.Tx, idCnr cid.ID, objs ...[]byte) object
 }
 
 // return true if provided object is of LOCK type.
-func isLockObject(tx *bbolt.Tx, idCnr cid.ID, obj oid.ID) bool {
-	return inBucket(tx,
-		bucketNameLockers(idCnr, make([]byte, bucketKeySize)),
+func isLockObject(tx *bbolt.Tx, bc *bucketCache, idCnr cid.ID, obj oid.ID) bool {
+	return inBucket(getLockersBucket(bc, tx, idCnr),
 		objectKey(obj, make([]byte, objectKeySize)))
 }
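
A note on the pattern running through the hunks above: the index helpers now receive a bc *bucketCache and degrade to direct tx.Bucket lookups when bc is nil (see the db.exists(tx, nil, addr, oid.Address{}, currEpoch) call in select.go), while getOrCreateBucket layers optional bucket creation on top of the cached getters. What follows is a minimal, self-contained sketch of that per-transaction bucket-cache idea, written against the public bbolt API only; txCache and cachedBucket are hypothetical stand-ins for illustration, not identifiers from this package.

// Sketch: memoize *bbolt.Bucket handles for the lifetime of a single
// read-write transaction, so repeated index updates resolve each bucket
// name only once instead of on every call.
package main

import (
	"fmt"
	"log"

	"go.etcd.io/bbolt"
)

// txCache caches bucket handles for one transaction. Handles become
// invalid when their transaction ends, so a cache must never be reused
// across transactions.
type txCache struct {
	buckets map[string]*bbolt.Bucket
}

func newTxCache() *txCache {
	return &txCache{buckets: make(map[string]*bbolt.Bucket)}
}

// cachedBucket returns a memoized handle, falling back to tx.Bucket on a
// miss. A nil cache degrades to a plain lookup, mirroring the
// nil-tolerant getters in this patch.
func cachedBucket(bc *txCache, tx *bbolt.Tx, name []byte) *bbolt.Bucket {
	if bc == nil {
		return tx.Bucket(name)
	}
	if b, ok := bc.buckets[string(name)]; ok {
		return b
	}
	b := tx.Bucket(name)
	if b != nil {
		bc.buckets[string(name)] = b
	}
	return b
}

func main() {
	db, err := bbolt.Open("cache_demo.db", 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	name := []byte("index")
	if err := db.Update(func(tx *bbolt.Tx) error {
		if _, err := tx.CreateBucketIfNotExists(name); err != nil {
			return err
		}
		bc := newTxCache() // one cache per transaction
		for i := 0; i < 3; i++ {
			bkt := cachedBucket(bc, tx, name) // only the first call hits tx.Bucket
			if err := bkt.Put([]byte(fmt.Sprintf("k%d", i)), []byte("v")); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		log.Fatal(err)
	}
}

The invariant worth checking while reviewing is scope: a bucketCache must live no longer than the transaction that filled it, because bbolt invalidates bucket references once their transaction commits or rolls back.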