Compare commits

...
Sign in to create a new pull request.

9 commits

Author SHA1 Message Date
796975e859
[#1694] metabase: Do not ignore errors on delete
Change-Id: I7aa296456b9a594c6adcb65421157a51934f2994
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-02 15:10:28 +03:00
5869be6c11
[#1694] metabase: Use bucket cache in IterateExpired
Change-Id: I067cd1fdb2ef0007bc5605b4d80412b053c6a46e
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-02 15:10:28 +03:00
fef6aa21c5
[#1694] metabase: Use Batch with bucket cache for Lock
Change-Id: Ied27886c90575a1b573a06fefb5bbf10241cf731
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-02 15:10:27 +03:00
fb8d37f5ff
[#1694] metabase: Use Batch with bucket cache for UpdateStorageID
Change-Id: I40ddbf974eee48798f0a6573c4caba91be0a7c8e
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-02 15:10:27 +03:00
4ba9d34188
[#1694] metabase: Use Batch with bucket cache for Put
Change-Id: Id18640f2aed288591a806382569149f8eac61f38
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-02 15:10:26 +03:00
4e63772632
[#1694] metabase: Refactor Inhume
`inhumeTxSingle` method is always called from `Batch` with bucket cache,
so no need to pass garbage and graveyard buckets explicitly.

Change-Id: Ic58b33929668408fe106a300c3c9a0086602615a
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-02 15:10:26 +03:00
84640ca9cf
[#1694] metabase: Use Batch with bucket cache for Inhume
Change-Id: I3cdf86c86806f44817ffa294d91477dde964e4ae
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-02 15:10:25 +03:00
ce2b08e5a4
[#1694] metabase: Use Batch with bucket cache for Delete
Change-Id: I4b7c81ed5235a87cef0952ca7d63f16547e52166
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-02 15:10:25 +03:00
afed6e05b1
[#1694] metabase: Add metabase.Batch method
Custom Batch differs from `bbolt.Batch` by bucket cache support.

Change-Id: Ibfc98377a3c1a3749904bb4d80eca9ff4991865d
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-02 15:10:24 +03:00
17 changed files with 589 additions and 278 deletions

View file

@ -0,0 +1,148 @@
// NOTE: code is partially taken from https://github.com/etcd-io/bbolt/blob/v1.3.10/db.go
/*
The MIT License (MIT)
Copyright (c) 2013 Ben Johnson
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package meta
import (
"errors"
"fmt"
"sync"
"time"
"go.etcd.io/bbolt"
)
type fn func(*bbolt.Tx, *bucketCache) error
type call struct {
fn fn
err chan<- error
}
type batch struct {
db *DB
timer *time.Timer
start sync.Once
calls []call
}
func (b *batch) trigger() {
b.start.Do(b.run)
}
func (b *batch) run() {
b.db.batchMtx.Lock()
b.timer.Stop()
// Make sure no new work is added to this batch, but don't break
// other batches.
if b.db.batch == b {
b.db.batch = nil
}
b.db.batchMtx.Unlock()
bc := newBucketCache()
retry:
for len(b.calls) > 0 {
failIdx := -1
err := b.db.boltDB.Update(func(tx *bbolt.Tx) error {
for i, c := range b.calls {
if err := safelyCall(c.fn, tx, bc); err != nil {
failIdx = i
return err
}
}
return nil
})
if failIdx >= 0 {
// take the failing transaction out of the batch. it's
// safe to shorten b.calls here because db.batch no longer
// points to us, and we hold the mutex anyway.
c := b.calls[failIdx]
b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
// tell the submitter re-run it solo, continue with the rest of the batch
c.err <- errTrySolo
continue retry
}
// pass success, or bolt internal errors, to all callers
for _, c := range b.calls {
c.err <- err
}
break retry
}
}
func safelyCall(fn func(*bbolt.Tx, *bucketCache) error, tx *bbolt.Tx, bc *bucketCache) (err error) {
defer func() {
if p := recover(); p != nil {
err = panicked{p}
}
}()
return fn(tx, bc)
}
func (db *DB) Batch(fn fn) error {
errCh := make(chan error, 1)
db.batchMtx.Lock()
if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.boltBatchSize) {
// There is no existing batch, or the existing batch is full; start a new one.
db.batch = &batch{
db: db,
}
db.batch.timer = time.AfterFunc(db.boltBatchDelay, db.batch.trigger)
}
db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
if len(db.batch.calls) >= db.boltBatchSize {
// wake up batch, it's ready to run
go db.batch.trigger()
}
db.batchMtx.Unlock()
err := <-errCh
if err == errTrySolo {
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
return fn(tx, nil)
})
}
return err
}
// errTrySolo is a special sentinel error value used for signaling that a
// transaction function should be re-run. It should never be seen by
// callers.
var errTrySolo = errors.New("batch function returned an error and should be re-run solo")
type panicked struct {
reason interface{}
}
func (p panicked) Error() string {
if err, ok := p.reason.(error); ok {
return err.Error()
}
return fmt.Sprintf("panic: %v", p.reason)
}

View file

@ -9,8 +9,20 @@ type bucketCache struct {
locked *bbolt.Bucket
graveyard *bbolt.Bucket
garbage *bbolt.Bucket
expEpoch *bbolt.Bucket
contVol *bbolt.Bucket
contCount *bbolt.Bucket
shardInfo *bbolt.Bucket
expired map[cid.ID]*bbolt.Bucket
primary map[cid.ID]*bbolt.Bucket
parent map[cid.ID]*bbolt.Bucket
tombstone map[cid.ID]*bbolt.Bucket
lockers map[cid.ID]*bbolt.Bucket
small map[cid.ID]*bbolt.Bucket
root map[cid.ID]*bbolt.Bucket
expObj map[cid.ID]*bbolt.Bucket
split map[cid.ID]*bbolt.Bucket
ecInfo map[cid.ID]*bbolt.Bucket
}
func newBucketCache() *bucketCache {
@ -38,6 +50,34 @@ func getGarbageBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
return getBucket(&bc.garbage, tx, garbageBucketName)
}
func getExpEpochToObjectBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
if bc == nil {
return tx.Bucket(expEpochToObjectBucketName)
}
return getBucket(&bc.expEpoch, tx, expEpochToObjectBucketName)
}
func getContainerVolumeBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
if bc == nil {
return tx.Bucket(containerVolumeBucketName)
}
return getBucket(&bc.contVol, tx, containerVolumeBucketName)
}
func getShardInfoBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
if bc == nil {
return tx.Bucket(shardInfoBucket)
}
return getBucket(&bc.shardInfo, tx, shardInfoBucket)
}
func getContainerCounterBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
if bc == nil {
return tx.Bucket(containerCounterBucketName)
}
return getBucket(&bc.contCount, tx, containerCounterBucketName)
}
func getBucket(cache **bbolt.Bucket, tx *bbolt.Tx, name []byte) *bbolt.Bucket {
if *cache != nil {
return *cache
@ -65,6 +105,78 @@ func getPrimaryBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
return getMappedBucket(&bc.primary, tx, primaryBucketName, cnr)
}
func getParentBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
if bc == nil {
bucketName := make([]byte, bucketKeySize)
bucketName = parentBucketName(cnr, bucketName)
return tx.Bucket(bucketName)
}
return getMappedBucket(&bc.parent, tx, parentBucketName, cnr)
}
func getTombstoneBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
if bc == nil {
bucketName := make([]byte, bucketKeySize)
bucketName = tombstoneBucketName(cnr, bucketName)
return tx.Bucket(bucketName)
}
return getMappedBucket(&bc.tombstone, tx, tombstoneBucketName, cnr)
}
func getLockersBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
if bc == nil {
bucketName := make([]byte, bucketKeySize)
bucketName = bucketNameLockers(cnr, bucketName)
return tx.Bucket(bucketName)
}
return getMappedBucket(&bc.lockers, tx, bucketNameLockers, cnr)
}
func getSmallBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
if bc == nil {
bucketName := make([]byte, bucketKeySize)
bucketName = smallBucketName(cnr, bucketName)
return tx.Bucket(bucketName)
}
return getMappedBucket(&bc.small, tx, smallBucketName, cnr)
}
func getRootBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
if bc == nil {
bucketName := make([]byte, bucketKeySize)
bucketName = rootBucketName(cnr, bucketName)
return tx.Bucket(bucketName)
}
return getMappedBucket(&bc.root, tx, rootBucketName, cnr)
}
func getObjToExpEpochBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
if bc == nil {
bucketName := make([]byte, bucketKeySize)
bucketName = objectToExpirationEpochBucketName(cnr, bucketName)
return tx.Bucket(bucketName)
}
return getMappedBucket(&bc.expObj, tx, objectToExpirationEpochBucketName, cnr)
}
func getSplitBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
if bc == nil {
bucketName := make([]byte, bucketKeySize)
bucketName = splitBucketName(cnr, bucketName)
return tx.Bucket(bucketName)
}
return getMappedBucket(&bc.split, tx, splitBucketName, cnr)
}
func getECInfoBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
if bc == nil {
bucketName := make([]byte, bucketKeySize)
bucketName = ecInfoBucketName(cnr, bucketName)
return tx.Bucket(bucketName)
}
return getMappedBucket(&bc.ecInfo, tx, ecInfoBucketName, cnr)
}
func getMappedBucket(m *map[cid.ID]*bbolt.Bucket, tx *bbolt.Tx, nameFunc func(cid.ID, []byte) []byte, cnr cid.ID) *bbolt.Bucket {
value, ok := (*m)[cnr]
if ok {

View file

@ -100,8 +100,8 @@ func parseContainerSize(v []byte) uint64 {
return binary.LittleEndian.Uint64(v)
}
func changeContainerSize(tx *bbolt.Tx, id cid.ID, delta uint64, increase bool) error {
containerVolume := tx.Bucket(containerVolumeBucketName)
func changeContainerSize(tx *bbolt.Tx, bc *bucketCache, id cid.ID, delta uint64, increase bool) error {
containerVolume := getContainerVolumeBucket(bc, tx)
key := make([]byte, cidSize)
id.Encode(key)

View file

@ -231,10 +231,10 @@ func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, er
return result, metaerr.Wrap(err)
}
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
b := tx.Bucket(shardInfoBucket)
func (db *DB) incCounters(tx *bbolt.Tx, bc *bucketCache, cnrID cid.ID, isUserObject bool) error {
b := getShardInfoBucket(bc, tx)
if b == nil {
return db.incContainerObjectCounter(tx, cnrID, isUserObject)
return db.incContainerObjectCounter(tx, bc, cnrID, isUserObject)
}
if err := db.updateShardObjectCounterBucket(b, phy, 1, true); err != nil {
@ -248,7 +248,7 @@ func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
return fmt.Errorf("increase user object counter: %w", err)
}
}
return db.incContainerObjectCounter(tx, cnrID, isUserObject)
return db.incContainerObjectCounter(tx, bc, cnrID, isUserObject)
}
func (db *DB) decShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64) error {
@ -338,8 +338,8 @@ func nextValue(existed, delta uint64, inc bool) uint64 {
return existed
}
func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
b := tx.Bucket(containerCounterBucketName)
func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, bc *bucketCache, cnrID cid.ID, isUserObject bool) error {
b := getContainerCounterBucket(bc, tx)
if b == nil {
return nil
}

View file

@ -44,6 +44,9 @@ type DB struct {
boltDB *bbolt.DB
initialized bool
batchMtx sync.Mutex
batch *batch
}
// Option is an option of DB constructor.

View file

@ -110,8 +110,8 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
var err error
var res DeleteRes
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
res, err = db.deleteGroup(tx, prm.addrs)
err = db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
res, err = db.deleteGroup(tx, bc, prm.addrs)
return err
})
if err == nil {
@ -127,7 +127,7 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
// deleteGroup deletes object from the metabase. Handles removal of the
// references of the split objects.
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) {
func (db *DB) deleteGroup(tx *bbolt.Tx, bc *bucketCache, addrs []oid.Address) (DeleteRes, error) {
res := DeleteRes{
removedByCnrID: make(map[cid.ID]ObjectCounters),
}
@ -135,7 +135,7 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
currEpoch := db.epochState.CurrentEpoch()
for i := range addrs {
r, err := db.delete(tx, addrs[i], refCounter, currEpoch)
r, err := db.delete(tx, bc, addrs[i], refCounter, currEpoch)
if err != nil {
return DeleteRes{}, err
}
@ -149,7 +149,7 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
for _, refNum := range refCounter {
if refNum.cur == refNum.all {
err := db.deleteObject(tx, refNum.obj, true)
err := db.deleteObject(tx, bc, refNum.obj, true)
if err != nil {
return DeleteRes{}, err
}
@ -243,16 +243,16 @@ type deleteSingleResult struct {
// non-exist object is error-free). The second return value indicates if an
// object was available before the removal (for calculating the logical object
// counter). The third return value The fourth return value is removed object payload size.
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (deleteSingleResult, error) {
func (db *DB) delete(tx *bbolt.Tx, bc *bucketCache, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (deleteSingleResult, error) {
key := make([]byte, addressKeySize)
addrKey := addressKey(addr, key)
garbageBKT := tx.Bucket(garbageBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName)
garbageBKT := getGarbageBucket(bc, tx)
graveyardBKT := getGraveyardBucket(bc, tx)
removeAvailableObject := inGraveyardWithKey(addrKey, graveyardBKT, garbageBKT) == 0
// unmarshal object, work only with physically stored (raw == true) objects
obj, err := db.get(tx, addr, key, false, true, currEpoch)
obj, err := db.getWithCache(bc, tx, addr, key, false, true, currEpoch)
if err != nil {
if client.IsErrObjectNotFound(err) {
addrKey = addressKey(addr, key)
@ -293,7 +293,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
nRef, ok := refCounter[k]
if !ok {
nRef = &referenceNumber{
all: parentLength(tx, parAddr),
all: parentLength(tx, bc, parAddr),
obj: parent,
}
@ -306,12 +306,12 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
isUserObject := IsUserObject(obj)
// remove object
err = db.deleteObject(tx, obj, false)
err = db.deleteObject(tx, bc, obj, false)
if err != nil {
return deleteSingleResult{}, fmt.Errorf("remove object: %w", err)
}
if err := deleteECRelatedInfo(tx, garbageBKT, obj, addr.Container(), refCounter); err != nil {
if err := deleteECRelatedInfo(tx, bc, obj, addr.Container(), refCounter); err != nil {
return deleteSingleResult{}, err
}
@ -325,15 +325,16 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
func (db *DB) deleteObject(
tx *bbolt.Tx,
bc *bucketCache,
obj *objectSDK.Object,
isParent bool,
) error {
err := delUniqueIndexes(tx, obj, isParent)
err := delUniqueIndexes(tx, bc, obj, isParent)
if err != nil {
return errFailedToRemoveUniqueIndexes
}
err = updateListIndexes(tx, obj, delListIndexItem)
err = updateListIndexes(tx, bc, obj, delListIndexItem, false)
if err != nil {
return fmt.Errorf("remove list indexes: %w", err)
}
@ -345,7 +346,7 @@ func (db *DB) deleteObject(
if isParent {
// remove record from the garbage bucket, because regular object deletion does nothing for virtual object
garbageBKT := tx.Bucket(garbageBucketName)
garbageBKT := getGarbageBucket(bc, tx)
if garbageBKT != nil {
key := make([]byte, addressKeySize)
addrKey := addressKey(object.AddressOf(obj), key)
@ -360,10 +361,10 @@ func (db *DB) deleteObject(
}
// parentLength returns amount of available children from parentid index.
func parentLength(tx *bbolt.Tx, addr oid.Address) int {
func parentLength(tx *bbolt.Tx, bc *bucketCache, addr oid.Address) int {
bucketName := make([]byte, bucketKeySize)
bkt := tx.Bucket(parentBucketName(addr.Container(), bucketName[:]))
bkt := getParentBucket(bc, tx, addr.Container())
if bkt == nil {
return 0
}
@ -376,15 +377,16 @@ func parentLength(tx *bbolt.Tx, addr oid.Address) int {
return len(lst)
}
func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) {
bkt := tx.Bucket(item.name)
func delUniqueIndexItem(item bucketItem) error {
bkt := item.bucket
if bkt != nil {
_ = bkt.Delete(item.key) // ignore error, best effort there
return bkt.Delete(item.key)
}
return nil
}
func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
bkt := tx.Bucket(item.name)
func delListIndexItem(item bucketItem) error {
bkt := item.bucket
if bkt == nil {
return nil
}
@ -405,19 +407,16 @@ func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
// if list empty, remove the key from <list> bucket
if len(lst) == 0 {
_ = bkt.Delete(item.key) // ignore error, best effort there
return nil
return bkt.Delete(item.key)
}
// if list is not empty, then update it
encodedLst, err := encodeList(lst)
if err != nil {
return nil // ignore error, best effort there
return err
}
_ = bkt.Put(item.key, encodedLst) // ignore error, best effort there
return nil
return bkt.Put(item.key, encodedLst)
}
func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
@ -460,67 +459,79 @@ func hasAnyItem(b *bbolt.Bucket) bool {
return hasAnyItem
}
func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error {
func delUniqueIndexes(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, isParent bool) error {
addr := object.AddressOf(obj)
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
cnr := addr.Container()
bucketName := make([]byte, bucketKeySize)
var bkt *bbolt.Bucket
// add value to primary unique bucket
if !isParent {
switch obj.Type() {
case objectSDK.TypeRegular:
bucketName = primaryBucketName(cnr, bucketName)
bkt = getPrimaryBucket(bc, tx, cnr)
case objectSDK.TypeTombstone:
bucketName = tombstoneBucketName(cnr, bucketName)
bkt = getTombstoneBucket(bc, tx, cnr)
case objectSDK.TypeLock:
bucketName = bucketNameLockers(cnr, bucketName)
bkt = getLockersBucket(bc, tx, cnr)
default:
return ErrUnknownObjectType
}
delUniqueIndexItem(tx, namedBucketItem{
name: bucketName,
key: objKey,
})
if err := delUniqueIndexItem(bucketItem{
bucket: bkt,
key: objKey,
}); err != nil {
return err
}
} else {
delUniqueIndexItem(tx, namedBucketItem{
name: parentBucketName(cnr, bucketName),
key: objKey,
})
if err := delUniqueIndexItem(bucketItem{
bucket: getParentBucket(bc, tx, cnr),
key: objKey,
}); err != nil {
return err
}
}
delUniqueIndexItem(tx, namedBucketItem{ // remove from storage id index
name: smallBucketName(cnr, bucketName),
key: objKey,
})
delUniqueIndexItem(tx, namedBucketItem{ // remove from root index
name: rootBucketName(cnr, bucketName),
key: objKey,
})
if err := delUniqueIndexItem(bucketItem{ // remove from storage id index
bucket: getSmallBucket(bc, tx, cnr),
key: objKey,
}); err != nil {
return err
}
if err := delUniqueIndexItem(bucketItem{ // remove from root index
bucket: getRootBucket(bc, tx, cnr),
key: objKey,
}); err != nil {
return err
}
if expEpoch, ok := hasExpirationEpoch(obj); ok {
delUniqueIndexItem(tx, namedBucketItem{
name: expEpochToObjectBucketName,
key: expirationEpochKey(expEpoch, cnr, addr.Object()),
})
delUniqueIndexItem(tx, namedBucketItem{
name: objectToExpirationEpochBucketName(cnr, make([]byte, bucketKeySize)),
key: objKey,
})
if err := delUniqueIndexItem(bucketItem{
bucket: getExpEpochToObjectBucket(bc, tx),
key: expirationEpochKey(expEpoch, cnr, addr.Object()),
}); err != nil {
return err
}
if err := delUniqueIndexItem(bucketItem{
bucket: getObjToExpEpochBucket(bc, tx, cnr),
key: objKey,
}); err != nil {
return err
}
}
return nil
}
func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.Object, cnr cid.ID, refCounter referenceCounter) error {
func deleteECRelatedInfo(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, cnr cid.ID, refCounter referenceCounter) error {
ech := obj.ECHeader()
if ech == nil {
return nil
}
hasAnyChunks := hasAnyECChunks(tx, ech, cnr)
garbageBKT := getGarbageBucket(bc, tx)
hasAnyChunks := hasAnyECChunks(tx, bc, ech, cnr)
// drop EC parent GC mark if current EC chunk is the last one
if !hasAnyChunks && garbageBKT != nil {
var ecParentAddress oid.Address
@ -535,10 +546,12 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
// also drop EC parent root info if current EC chunk is the last one
if !hasAnyChunks {
delUniqueIndexItem(tx, namedBucketItem{
name: rootBucketName(cnr, make([]byte, bucketKeySize)),
key: objectKey(ech.Parent(), make([]byte, objectKeySize)),
})
if err := delUniqueIndexItem(bucketItem{
bucket: getRootBucket(bc, tx, cnr),
key: objectKey(ech.Parent(), make([]byte, objectKeySize)),
}); err != nil {
return err
}
}
if ech.ParentSplitParentID() == nil {
@ -557,7 +570,7 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
return nil
}
if parentLength(tx, splitParentAddress) > 0 {
if parentLength(tx, bc, splitParentAddress) > 0 {
// linking object still exists, so leave split info and gc mark deletion for linking object processing
return nil
}
@ -572,15 +585,14 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
}
// drop split info
delUniqueIndexItem(tx, namedBucketItem{
name: rootBucketName(cnr, make([]byte, bucketKeySize)),
key: objectKey(*ech.ParentSplitParentID(), make([]byte, objectKeySize)),
return delUniqueIndexItem(bucketItem{
bucket: getRootBucket(bc, tx, cnr),
key: objectKey(*ech.ParentSplitParentID(), make([]byte, objectKeySize)),
})
return nil
}
func hasAnyECChunks(tx *bbolt.Tx, ech *objectSDK.ECHeader, cnr cid.ID) bool {
data := getFromBucket(tx, ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
func hasAnyECChunks(tx *bbolt.Tx, bc *bucketCache, ech *objectSDK.ECHeader, cnr cid.ID) bool {
data := getFromBucket(getECInfoBucket(bc, tx, cnr),
objectKey(ech.Parent(), make([]byte, objectKeySize)))
return len(data) > 0
}

View file

@ -81,7 +81,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
currEpoch := db.epochState.CurrentEpoch()
err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.ecParentAddr, currEpoch)
res.exists, res.locked, err = db.exists(tx, newBucketCache(), prm.addr, prm.ecParentAddr, currEpoch)
return err
})
@ -89,10 +89,10 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
return res, metaerr.Wrap(err)
}
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) {
func (db *DB) exists(tx *bbolt.Tx, bc *bucketCache, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) {
var locked bool
if !ecParent.Equals(oid.Address{}) {
st, err := objectStatus(tx, ecParent, currEpoch)
st, err := objectStatusWithCache(bc, tx, ecParent, currEpoch)
if err != nil {
return false, false, err
}
@ -103,10 +103,10 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currE
return false, locked, ErrObjectIsExpired
}
locked = objectLocked(tx, ecParent.Container(), ecParent.Object())
locked = objectLockedWithCache(bc, tx, ecParent.Container(), ecParent.Object())
}
// check graveyard and object expiration first
st, err := objectStatus(tx, addr, currEpoch)
st, err := objectStatusWithCache(bc, tx, addr, currEpoch)
if err != nil {
return false, false, err
}
@ -122,16 +122,15 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currE
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
cnr := addr.Container()
key := make([]byte, bucketKeySize)
// if graveyard is empty, then check if object exists in primary bucket
if inBucket(tx, primaryBucketName(cnr, key), objKey) {
if inBucket(getPrimaryBucket(bc, tx, cnr), objKey) {
return true, locked, nil
}
// if primary bucket is empty, then check if object exists in parent bucket
if inBucket(tx, parentBucketName(cnr, key), objKey) {
splitInfo, err := getSplitInfo(tx, cnr, objKey)
if inBucket(getParentBucket(bc, tx, cnr), objKey) {
splitInfo, err := getSplitInfo(tx, bc, cnr, objKey)
if err != nil {
return false, locked, err
}
@ -139,12 +138,12 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currE
return false, locked, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
}
// if parent bucket is empty, then check if object exists in ec bucket
if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
return false, locked, getECInfoError(tx, cnr, data)
if data := getFromBucket(getECInfoBucket(bc, tx, cnr), objKey); len(data) != 0 {
return false, locked, getECInfoError(tx, bc, cnr, data)
}
// if parent bucket is empty, then check if object exists in typed buckets
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, locked, nil
return firstIrregularObjectType(tx, bc, cnr, objKey) != objectSDK.TypeRegular, locked, nil
}
// objectStatus returns:
@ -152,10 +151,6 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currE
// - 1 if object with GC mark;
// - 2 if object is covered with tombstone;
// - 3 if object is expired.
func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (uint8, error) {
return objectStatusWithCache(nil, tx, addr, currEpoch)
}
func objectStatusWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (uint8, error) {
// locked object could not be removed/marked with GC/expired
if objectLockedWithCache(bc, tx, addr.Container(), addr.Object()) {
@ -207,8 +202,7 @@ func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uin
}
// inBucket checks if key <key> is present in bucket <name>.
func inBucket(tx *bbolt.Tx, name, key []byte) bool {
bkt := tx.Bucket(name)
func inBucket(bkt *bbolt.Bucket, key []byte) bool {
if bkt == nil {
return false
}
@ -221,9 +215,8 @@ func inBucket(tx *bbolt.Tx, name, key []byte) bool {
// getSplitInfo returns SplitInfo structure from root index. Returns error
// if there is no `key` record in root index.
func getSplitInfo(tx *bbolt.Tx, cnr cid.ID, key []byte) (*objectSDK.SplitInfo, error) {
bucketName := rootBucketName(cnr, make([]byte, bucketKeySize))
rawSplitInfo := getFromBucket(tx, bucketName, key)
func getSplitInfo(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, key []byte) (*objectSDK.SplitInfo, error) {
rawSplitInfo := getFromBucket(getRootBucket(bc, tx, cnr), key)
if len(rawSplitInfo) == 0 {
return nil, ErrLackSplitInfo
}

View file

@ -110,7 +110,6 @@ func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key
key = objectKey(addr.Object(), key)
cnr := addr.Container()
obj := objectSDK.New()
bucketName := make([]byte, bucketKeySize)
// check in primary index
if b := getPrimaryBucket(bc, tx, cnr); b != nil {
@ -119,43 +118,40 @@ func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key
}
}
data := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key)
data := getFromBucket(getECInfoBucket(bc, tx, cnr), key)
if len(data) != 0 {
return nil, getECInfoError(tx, cnr, data)
return nil, getECInfoError(tx, bc, cnr, data)
}
// if not found then check in tombstone index
data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
data = getFromBucket(getTombstoneBucket(bc, tx, cnr), key)
if len(data) != 0 {
return obj, obj.Unmarshal(data)
}
// if not found then check in locker index
data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key)
data = getFromBucket(getLockersBucket(bc, tx, cnr), key)
if len(data) != 0 {
return obj, obj.Unmarshal(data)
}
// if not found then check if object is a virtual
return getVirtualObject(tx, cnr, key, raw)
return getVirtualObject(tx, bc, cnr, key, raw)
}
func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
bkt := tx.Bucket(name)
func getFromBucket(bkt *bbolt.Bucket, key []byte) []byte {
if bkt == nil {
return nil
}
return bkt.Get(key)
}
func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSDK.Object, error) {
func getVirtualObject(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, key []byte, raw bool) (*objectSDK.Object, error) {
if raw {
return nil, getSplitInfoError(tx, cnr, key)
return nil, getSplitInfoError(tx, bc, cnr, key)
}
bucketName := make([]byte, bucketKeySize)
parentBucket := tx.Bucket(parentBucketName(cnr, bucketName))
parentBucket := getParentBucket(bc, tx, cnr)
if parentBucket == nil {
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
@ -172,17 +168,17 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
var data []byte
for i := 0; i < len(relativeLst) && len(data) == 0; i++ {
virtualOID := relativeLst[len(relativeLst)-i-1]
data = getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
data = getFromBucket(getPrimaryBucket(bc, tx, cnr), virtualOID)
}
if len(data) == 0 {
// check if any of the relatives is an EC object
for _, relative := range relativeLst {
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), relative)
data = getFromBucket(getECInfoBucket(bc, tx, cnr), relative)
if len(data) > 0 {
// we can't return object headers, but can return error,
// so assembler can try to assemble complex object
return nil, getSplitInfoError(tx, cnr, key)
return nil, getSplitInfoError(tx, bc, cnr, key)
}
}
}
@ -203,8 +199,8 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
return par, nil
}
func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
splitInfo, err := getSplitInfo(tx, cnr, key)
func getSplitInfoError(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, key []byte) error {
splitInfo, err := getSplitInfo(tx, bc, cnr, key)
if err == nil {
return logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
}
@ -212,7 +208,7 @@ func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
}
func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
func getECInfoError(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, data []byte) error {
keys, err := decodeList(data)
if err != nil {
return err
@ -220,7 +216,7 @@ func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
ecInfo := objectSDK.NewECInfo()
for _, key := range keys {
// check in primary index
objData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key)
objData := getFromBucket(getPrimaryBucket(bc, tx, cnr), key)
if len(objData) != 0 {
obj := objectSDK.New()
if err := obj.Unmarshal(objData); err != nil {

View file

@ -287,19 +287,19 @@ func (db *DB) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (Inh
var res InhumeRes
err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
err := db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
res = InhumeRes{inhumedByCnrID: make(map[cid.ID]ObjectCounters)}
garbageBKT := tx.Bucket(garbageBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName)
garbageBKT := getGarbageBucket(bc, tx)
graveyardBKT := getGraveyardBucket(bc, tx)
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
targetBucket, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
if err != nil {
return err
}
for i := range tss {
if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, tss[i].Tombstone(), buf, currEpoch, prm, &res); err != nil {
if err := db.inhumeTxSingle(tx, bc, targetBucket, value, tss[i].Tombstone(), buf, currEpoch, prm, &res); err != nil {
return err
}
if err := graveyardBKT.Delete(addressKey(tss[i].Address(), buf)); err != nil {

View file

@ -199,8 +199,8 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
inhumedByCnrID: make(map[cid.ID]ObjectCounters),
}
currEpoch := db.epochState.CurrentEpoch()
err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
return db.inhumeTx(tx, currEpoch, prm, &res)
err := db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
return db.inhumeTx(tx, bc, currEpoch, prm, &res)
})
success = err == nil
if success {
@ -213,18 +213,15 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
return res, metaerr.Wrap(err)
}
func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes) error {
garbageBKT := tx.Bucket(garbageBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName)
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
func (db *DB) inhumeTx(tx *bbolt.Tx, bc *bucketCache, epoch uint64, prm InhumePrm, res *InhumeRes) error {
bkt, value, err := db.getInhumeTargetBucketAndValue(getGarbageBucket(bc, tx), getGraveyardBucket(bc, tx), prm)
if err != nil {
return err
}
buf := make([]byte, addressKeySize)
for i := range prm.target {
if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, prm.target[i], buf, epoch, prm, res); err != nil {
if err := db.inhumeTxSingle(tx, bc, bkt, value, prm.target[i], buf, epoch, prm, res); err != nil {
return err
}
}
@ -232,13 +229,12 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
return db.applyInhumeResToCounters(tx, res)
}
func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garbageBKT *bbolt.Bucket, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error {
func (db *DB) inhumeTxSingle(tx *bbolt.Tx, bc *bucketCache, targetBucket *bbolt.Bucket, value []byte, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error {
id := addr.Object()
cnr := addr.Container()
tx := bkt.Tx()
// prevent locked objects to be inhumed
if !prm.forceRemoval && objectLocked(tx, cnr, id) {
if !prm.forceRemoval && objectLockedWithCache(bc, tx, cnr, id) {
return new(apistatus.ObjectLocked)
}
@ -248,23 +244,23 @@ func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garb
// if `Inhume` was called not with the
// `WithForceGCMark` option
if !prm.forceRemoval {
if isLockObject(tx, cnr, id) {
if isLockObject(tx, bc, cnr, id) {
return ErrLockObjectRemoval
}
lockWasChecked = true
}
obj, err := db.get(tx, addr, buf, false, true, epoch)
obj, err := db.getWithCache(bc, tx, addr, buf, false, true, epoch)
targetKey := addressKey(addr, buf)
var ecErr *objectSDK.ECInfoError
if err == nil {
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
err = db.updateDeleteInfo(tx, bc, targetKey, cnr, obj, res)
if err != nil {
return err
}
} else if errors.As(err, &ecErr) {
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value)
err = db.inhumeECInfo(tx, bc, epoch, prm.tomb, res, ecErr.ECInfo(), cnr, targetBucket, value)
if err != nil {
return err
}
@ -272,7 +268,7 @@ func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garb
if prm.tomb != nil {
var isTomb bool
isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
isTomb, err = db.markAsGC(getGraveyardBucket(bc, tx), getGarbageBucket(bc, tx), targetKey)
if err != nil {
return err
}
@ -283,7 +279,7 @@ func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garb
}
// consider checking if target is already in graveyard?
err = bkt.Put(targetKey, value)
err = targetBucket.Put(targetKey, value)
if err != nil {
return err
}
@ -297,15 +293,14 @@ func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garb
return nil
}
if isLockObject(tx, cnr, id) {
if isLockObject(tx, bc, cnr, id) {
res.deletedLockObj = append(res.deletedLockObj, addr)
}
}
return nil
}
func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
func (db *DB) inhumeECInfo(tx *bbolt.Tx, bc *bucketCache, epoch uint64, tomb *oid.Address, res *InhumeRes,
ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte,
) error {
for _, chunk := range ecInfo.Chunks {
@ -318,17 +313,17 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I
return err
}
chunkAddr.SetObject(chunkID)
chunkObj, err := db.get(tx, chunkAddr, chunkBuf, false, true, epoch)
chunkObj, err := db.getWithCache(bc, tx, chunkAddr, chunkBuf, false, true, epoch)
if err != nil {
return err
}
chunkKey := addressKey(chunkAddr, chunkBuf)
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, chunkKey, cnr, chunkObj, res)
err = db.updateDeleteInfo(tx, bc, chunkKey, cnr, chunkObj, res)
if err != nil {
return err
}
if tomb != nil {
_, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey)
_, err = db.markAsGC(getGraveyardBucket(bc, tx), getGarbageBucket(bc, tx), chunkKey)
if err != nil {
return err
}
@ -398,16 +393,16 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, addressKey []byte
return false, garbageBKT.Put(addressKey, zeroValue)
}
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, bc *bucketCache, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
containerID, _ := obj.ContainerID()
if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
if inGraveyardWithKey(targetKey, getGraveyardBucket(bc, tx), getGarbageBucket(bc, tx)) == 0 {
res.storeDeletionInfo(containerID, obj.PayloadSize(), IsUserObject(obj))
}
// if object is stored, and it is regular object then update bucket
// with container size estimations
if obj.Type() == objectSDK.TypeRegular {
err := changeContainerSize(tx, cnr, obj.PayloadSize(), false)
err := changeContainerSize(tx, bc, cnr, obj.PayloadSize(), false)
if err != nil {
return err
}

View file

@ -76,7 +76,8 @@ func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectH
}
func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error {
b := tx.Bucket(expEpochToObjectBucketName)
bc := newBucketCache()
b := getExpEpochToObjectBucket(bc, tx)
c := b.Cursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() {
expiresAfter, cnr, obj, err := parseExpirationEpochKey(k)
@ -87,7 +88,7 @@ func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler)
if expiresAfter >= epoch {
return nil
}
if objectLocked(tx, cnr, obj) {
if objectLockedWithCache(bc, tx, cnr, obj) {
continue
}
var addr oid.Address
@ -95,7 +96,7 @@ func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler)
addr.SetObject(obj)
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
err = h(&ExpiredObject{
typ: firstIrregularObjectType(tx, cnr, objKey),
typ: firstIrregularObjectType(tx, bc, cnr, objKey),
addr: addr,
})
if err == nil {

View file

@ -79,12 +79,12 @@ func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error {
}
key := make([]byte, cidSize)
return metaerr.Wrap(db.boltDB.Batch(func(tx *bbolt.Tx) error {
if firstIrregularObjectType(tx, cnr, bucketKeysLocked...) != objectSDK.TypeRegular {
return metaerr.Wrap(db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
if firstIrregularObjectType(tx, bc, cnr, bucketKeysLocked...) != objectSDK.TypeRegular {
return logicerr.Wrap(new(apistatus.LockNonRegularObject))
}
bucketLocked := tx.Bucket(bucketNameLocked)
bucketLocked := getLockedBucket(bc, tx)
cnr.Encode(key)
bucketLockedContainer, err := bucketLocked.CreateBucketIfNotExists(key)
@ -144,9 +144,9 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
var unlockedObjects []oid.Address
if err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
if err := db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
for i := range lockers {
unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object())
unlocked, err := freePotentialLocks(tx, bc, lockers[i].Container(), lockers[i].Object())
if err != nil {
return err
}
@ -211,9 +211,9 @@ func getLocks(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) {
// Operation is very resource-intensive, which is caused by the admissibility
// of multiple locks. Also, if we knew what objects are locked, it would be
// possible to speed up the execution.
func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) {
func freePotentialLocks(tx *bbolt.Tx, bc *bucketCache, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) {
var unlockedObjects []oid.Address
bucketLocked := tx.Bucket(bucketNameLocked)
bucketLocked := getLockedBucket(bc, tx)
if bucketLocked == nil {
return unlockedObjects, nil
}

View file

@ -9,6 +9,7 @@ import (
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
@ -28,6 +29,11 @@ type (
namedBucketItem struct {
name, key, val []byte
}
bucketItem struct {
bucket *bbolt.Bucket
key, val []byte
}
)
// PutPrm groups the parameters of Put operation.
@ -63,6 +69,8 @@ var (
ErrIncorrectRootObject = errors.New("invalid root object")
)
const bucketNilAsserMsg = "bucket to put data is nil"
// Put saves object header in metabase. Object payload expected to be cut.
//
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard.
@ -93,9 +101,9 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
currEpoch := db.epochState.CurrentEpoch()
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
err = db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
var e error
res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch, prm.indexAttributes)
res, e = db.put(tx, bc, prm.obj, prm.id, nil, currEpoch, prm.indexAttributes)
return e
})
if err == nil {
@ -109,6 +117,7 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
}
func (db *DB) put(tx *bbolt.Tx,
bc *bucketCache,
obj *objectSDK.Object,
id []byte,
si *objectSDK.SplitInfo,
@ -128,7 +137,7 @@ func (db *DB) put(tx *bbolt.Tx,
isParent := si != nil
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
exists, _, err := db.exists(tx, bc, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
var splitInfoError *objectSDK.SplitInfoError
if errors.As(err, &splitInfoError) {
@ -138,51 +147,51 @@ func (db *DB) put(tx *bbolt.Tx,
}
if exists {
return PutRes{}, db.updateObj(tx, obj, id, si, isParent)
return PutRes{}, db.updateObj(tx, bc, obj, id, si, isParent)
}
return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch, indexAttributes)
return PutRes{Inserted: true}, db.insertObject(tx, bc, obj, id, si, isParent, cnr, currEpoch, indexAttributes)
}
func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
func (db *DB) updateObj(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
// most right child and split header overlap parent so we have to
// check if object exists to not overwrite it twice
// When storage engine moves objects between different sub-storages,
// it calls metabase.Put method with new storage ID, thus triggering this code.
if !isParent && id != nil {
return setStorageID(tx, objectCore.AddressOf(obj), id, true)
return setStorageID(tx, bc, objectCore.AddressOf(obj), id, true)
}
// when storage already has last object in split hierarchy and there is
// a linking object to put (or vice versa), we should update split info
// with object ids of these objects
if isParent {
return updateSplitInfo(tx, objectCore.AddressOf(obj), si)
return updateSplitInfo(tx, bc, objectCore.AddressOf(obj), si)
}
return nil
}
func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64, indexAttributes bool) error {
func (db *DB) insertObject(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64, indexAttributes bool) error {
if par := obj.Parent(); par != nil && !isParent { // limit depth by two
parentSI, err := splitInfoFromObject(obj)
if err != nil {
return err
}
_, err = db.put(tx, par, id, parentSI, currEpoch, indexAttributes)
_, err = db.put(tx, bc, par, id, parentSI, currEpoch, indexAttributes)
if err != nil {
return err
}
}
err := putUniqueIndexes(tx, obj, si, id)
err := putUniqueIndexes(tx, bc, obj, si, id)
if err != nil {
return fmt.Errorf("put unique indexes: %w", err)
}
err = updateListIndexes(tx, obj, putListIndexItem)
err = updateListIndexes(tx, bc, obj, putListIndexItem, true)
if err != nil {
return fmt.Errorf("put list indexes: %w", err)
}
@ -196,14 +205,14 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
// update container volume size estimation
if obj.Type() == objectSDK.TypeRegular && !isParent {
err = changeContainerSize(tx, cnr, obj.PayloadSize(), true)
err = changeContainerSize(tx, bc, cnr, obj.PayloadSize(), true)
if err != nil {
return err
}
}
if !isParent {
if err = db.incCounters(tx, cnr, IsUserObject(obj)); err != nil {
if err = db.incCounters(tx, bc, cnr, IsUserObject(obj)); err != nil {
return err
}
}
@ -211,69 +220,78 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
return nil
}
func putUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, si *objectSDK.SplitInfo, id []byte) error {
func putUniqueIndexes(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, si *objectSDK.SplitInfo, id []byte) error {
isParent := si != nil
addr := objectCore.AddressOf(obj)
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
bucketName := make([]byte, bucketKeySize)
if !isParent {
err := putRawObjectData(tx, obj, bucketName, addr, objKey)
err := putRawObjectData(tx, bc, obj, bucketName, addr, objKey)
if err != nil {
return err
}
if id != nil {
if err = setStorageID(tx, objectCore.AddressOf(obj), id, false); err != nil {
if err = setStorageID(tx, bc, objectCore.AddressOf(obj), id, false); err != nil {
return err
}
}
}
if err := putExpirationEpoch(tx, obj, addr, objKey); err != nil {
if err := putExpirationEpoch(tx, bc, obj, addr, objKey); err != nil {
return err
}
return putSplitInfo(tx, obj, bucketName, addr, si, objKey)
return putSplitInfo(tx, bc, obj, bucketName, addr, si, objKey)
}
func putRawObjectData(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr oid.Address, objKey []byte) error {
func putRawObjectData(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, bucketName []byte, addr oid.Address, objKey []byte) error {
var bkt *bbolt.Bucket
var err error
switch obj.Type() {
case objectSDK.TypeRegular:
bucketName = primaryBucketName(addr.Container(), bucketName)
bkt, err = getOrCreateBucket(tx, bc, addr.Container(), getPrimaryBucket, primaryBucketName(addr.Container(), bucketName), true)
case objectSDK.TypeTombstone:
bucketName = tombstoneBucketName(addr.Container(), bucketName)
bkt, err = getOrCreateBucket(tx, bc, addr.Container(), getTombstoneBucket, tombstoneBucketName(addr.Container(), bucketName), true)
case objectSDK.TypeLock:
bucketName = bucketNameLockers(addr.Container(), bucketName)
bkt, err = getOrCreateBucket(tx, bc, addr.Container(), getLockersBucket, bucketNameLockers(addr.Container(), bucketName), true)
default:
return ErrUnknownObjectType
}
if err != nil {
return err
}
rawObject, err := obj.CutPayload().Marshal()
if err != nil {
return fmt.Errorf("marshal object header: %w", err)
}
return putUniqueIndexItem(tx, namedBucketItem{
name: bucketName,
key: objKey,
val: rawObject,
return putUniqueIndexItem(bucketItem{
bucket: bkt,
key: objKey,
val: rawObject,
})
}
func putExpirationEpoch(tx *bbolt.Tx, obj *objectSDK.Object, addr oid.Address, objKey []byte) error {
func putExpirationEpoch(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, addr oid.Address, objKey []byte) error {
if expEpoch, ok := hasExpirationEpoch(obj); ok {
err := putUniqueIndexItem(tx, namedBucketItem{
name: expEpochToObjectBucketName,
key: expirationEpochKey(expEpoch, addr.Container(), addr.Object()),
val: zeroValue,
err := putUniqueIndexItem(bucketItem{
bucket: getExpEpochToObjectBucket(bc, tx),
key: expirationEpochKey(expEpoch, addr.Container(), addr.Object()),
val: zeroValue,
})
if err != nil {
return err
}
bkt, err := getOrCreateBucket(tx, bc, addr.Container(), getObjToExpEpochBucket, objectToExpirationEpochBucketName(addr.Container(), make([]byte, bucketKeySize)), true)
if err != nil {
return err
}
val := make([]byte, epochSize)
binary.LittleEndian.PutUint64(val, expEpoch)
err = putUniqueIndexItem(tx, namedBucketItem{
name: objectToExpirationEpochBucketName(addr.Container(), make([]byte, bucketKeySize)),
key: objKey,
val: val,
err = putUniqueIndexItem(bucketItem{
bucket: bkt,
key: objKey,
val: val,
})
if err != nil {
return err
@ -282,7 +300,7 @@ func putExpirationEpoch(tx *bbolt.Tx, obj *objectSDK.Object, addr oid.Address, o
return nil
}
func putSplitInfo(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr oid.Address, si *objectSDK.SplitInfo, objKey []byte) error {
func putSplitInfo(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, bucketName []byte, addr oid.Address, si *objectSDK.SplitInfo, objKey []byte) error {
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
if ecHead := obj.ECHeader(); ecHead != nil {
parentID := ecHead.Parent()
@ -300,15 +318,19 @@ func putSplitInfo(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr o
}
objKey = objectKey(parentID, objKey)
}
return updateSplitInfoIndex(tx, objKey, addr.Container(), bucketName, si)
return updateSplitInfoIndex(tx, bc, objKey, addr.Container(), bucketName, si)
}
return nil
}
func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName []byte, si *objectSDK.SplitInfo) error {
return updateUniqueIndexItem(tx, namedBucketItem{
name: rootBucketName(cnr, bucketName),
key: objKey,
func updateSplitInfoIndex(tx *bbolt.Tx, bc *bucketCache, objKey []byte, cnr cid.ID, bucketName []byte, si *objectSDK.SplitInfo) error {
bkt, err := getOrCreateBucket(tx, bc, cnr, getRootBucket, rootBucketName(cnr, bucketName), true)
if err != nil {
return err
}
return updateUniqueIndexItem(bucketItem{
bucket: bkt,
key: objKey,
}, func(old, _ []byte) ([]byte, error) {
switch {
case si == nil && old == nil:
@ -328,78 +350,100 @@ func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName []
})
}
type updateIndexItemFunc = func(tx *bbolt.Tx, item namedBucketItem) error
type updateIndexItemFunc = func(item bucketItem) error
func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
func updateListIndexes(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, f updateIndexItemFunc, createBuckets bool) error {
idObj, _ := obj.ID()
cnr, _ := obj.ContainerID()
objKey := objectKey(idObj, make([]byte, objectKeySize))
bucketName := make([]byte, bucketKeySize)
idParent, ok := obj.ParentID()
// index parent ids
if ok {
err := f(tx, namedBucketItem{
name: parentBucketName(cnr, bucketName),
key: objectKey(idParent, make([]byte, objectKeySize)),
val: objKey,
})
bkt, err := getOrCreateBucket(tx, bc, cnr, getParentBucket, parentBucketName(cnr, bucketName), createBuckets)
if err != nil {
return err
}
if err := f(bucketItem{
bucket: bkt,
key: objectKey(idParent, make([]byte, objectKeySize)),
val: objKey,
}); err != nil {
return err
}
}
// index split ids
if obj.SplitID() != nil {
err := f(tx, namedBucketItem{
name: splitBucketName(cnr, bucketName),
key: obj.SplitID().ToV2(),
val: objKey,
})
bkt, err := getOrCreateBucket(tx, bc, cnr, getSplitBucket, splitBucketName(cnr, bucketName), createBuckets)
if err != nil {
return err
}
if err := f(bucketItem{
bucket: bkt,
key: obj.SplitID().ToV2(),
val: objKey,
}); err != nil {
return err
}
}
if ech := obj.ECHeader(); ech != nil {
err := f(tx, namedBucketItem{
name: ecInfoBucketName(cnr, bucketName),
key: objectKey(ech.Parent(), make([]byte, objectKeySize)),
val: objKey,
})
bkt, err := getOrCreateBucket(tx, bc, cnr, getECInfoBucket, ecInfoBucketName(cnr, bucketName), createBuckets)
if err != nil {
return err
}
if err := f(bucketItem{
bucket: bkt,
key: objectKey(ech.Parent(), make([]byte, objectKeySize)),
val: objKey,
}); err != nil {
return err
}
if ech.ParentSplitID() != nil {
objKey := objectKey(ech.Parent(), make([]byte, objectKeySize))
err := f(tx, namedBucketItem{
name: splitBucketName(cnr, bucketName),
key: ech.ParentSplitID().ToV2(),
val: objKey,
})
bkt, err := getOrCreateBucket(tx, bc, cnr, getSplitBucket, splitBucketName(cnr, bucketName), createBuckets)
if err != nil {
return err
}
if err := f(bucketItem{
bucket: bkt,
key: ech.ParentSplitID().ToV2(),
val: objectKey(ech.Parent(), make([]byte, objectKeySize)),
}); err != nil {
return err
}
}
if parentSplitParentID := ech.ParentSplitParentID(); parentSplitParentID != nil {
objKey := objectKey(ech.Parent(), make([]byte, objectKeySize))
err := f(tx, namedBucketItem{
name: parentBucketName(cnr, bucketName),
key: objectKey(*parentSplitParentID, make([]byte, objectKeySize)),
val: objKey,
})
bkt, err := getOrCreateBucket(tx, bc, cnr, getParentBucket, parentBucketName(cnr, bucketName), createBuckets)
if err != nil {
return err
}
if err := f(bucketItem{
bucket: bkt,
key: objectKey(*parentSplitParentID, make([]byte, objectKeySize)),
val: objectKey(ech.Parent(), make([]byte, objectKeySize)),
}); err != nil {
return err
}
}
}
return nil
}
func getOrCreateBucket(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, getter func(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket,
bucketName []byte, create bool,
) (*bbolt.Bucket, error) {
bkt := getter(bc, tx, cnr)
if bkt == nil && create {
return createBucketLikelyExists(tx, bucketName)
}
return bkt, nil
}
var indexedAttributes = map[string]struct{}{
"S3-Access-Box-CRDT-Name": {},
objectSDK.AttributeFilePath: {},
@ -411,7 +455,9 @@ func IsAtrributeIndexed(attr string) bool {
return found
}
func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
type updateFKBTIndexItemFunc = func(tx *bbolt.Tx, item namedBucketItem) error
func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateFKBTIndexItemFunc) error {
id, _ := obj.ID()
cnr, _ := obj.ContainerID()
objKey := objectKey(id, make([]byte, objectKeySize))
@ -471,11 +517,9 @@ func createBucketLikelyExists[T bucketContainer](tx T, name []byte) (*bbolt.Buck
return tx.CreateBucket(name)
}
func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldData, newData []byte) ([]byte, error)) error {
bkt, err := createBucketLikelyExists(tx, item.name)
if err != nil {
return fmt.Errorf("create index %v: %w", item.name, err)
}
func updateUniqueIndexItem(item bucketItem, update func(oldData, newData []byte) ([]byte, error)) error {
bkt := item.bucket
assert.True(bkt != nil, bucketNilAsserMsg)
data, err := update(bkt.Get(item.key), item.val)
if err != nil {
@ -484,8 +528,8 @@ func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldDa
return bkt.Put(item.key, data)
}
func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil })
func putUniqueIndexItem(item bucketItem) error {
return updateUniqueIndexItem(item, func(_, val []byte) ([]byte, error) { return val, nil })
}
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
@ -502,11 +546,9 @@ func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
return fkbtRoot.Put(item.val, zeroValue)
}
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
bkt, err := createBucketLikelyExists(tx, item.name)
if err != nil {
return fmt.Errorf("create index %v: %w", item.name, err)
}
func putListIndexItem(item bucketItem) error {
bkt := item.bucket
assert.True(bkt != nil, bucketNilAsserMsg)
lst, err := decodeList(bkt.Get(item.key))
if err != nil {
@ -595,9 +637,9 @@ func getVarUint(data []byte) (uint64, int, error) {
// setStorageID for existing objects if they were moved from one
// storage location to another.
func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) error {
func setStorageID(tx *bbolt.Tx, bc *bucketCache, addr oid.Address, id []byte, override bool) error {
key := make([]byte, bucketKeySize)
bkt, err := createBucketLikelyExists(tx, smallBucketName(addr.Container(), key))
bkt, err := getOrCreateBucket(tx, bc, addr.Container(), getSmallBucket, smallBucketName(addr.Container(), key), true)
if err != nil {
return err
}
@ -610,9 +652,9 @@ func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) erro
// updateSpliInfo for existing objects if storage filled with extra information
// about last object in split hierarchy or linking object.
func updateSplitInfo(tx *bbolt.Tx, addr oid.Address, from *objectSDK.SplitInfo) error {
func updateSplitInfo(tx *bbolt.Tx, bc *bucketCache, addr oid.Address, from *objectSDK.SplitInfo) error {
objKey := objectKey(addr.Object(), make([]byte, bucketKeySize))
return updateSplitInfoIndex(tx, objKey, addr.Container(), make([]byte, bucketKeySize), from)
return updateSplitInfoIndex(tx, bc, objKey, addr.Container(), make([]byte, bucketKeySize), from)
}
// splitInfoFromObject returns split info based on last or linkin object.

View file

@ -413,7 +413,7 @@ func (db *DB) selectObjectID(
addr.SetObject(id)
var splitInfoError *objectSDK.SplitInfoError
ok, _, err := db.exists(tx, addr, oid.Address{}, currEpoch)
ok, _, err := db.exists(tx, nil, addr, oid.Address{}, currEpoch)
if (err == nil && ok) || errors.As(err, &splitInfoError) {
raw := make([]byte, objectKeySize)
id.Encode(raw)

View file

@ -126,8 +126,8 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res
return res, ErrReadOnlyMode
}
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
return setStorageID(tx, prm.addr, prm.id, true)
err = db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
return setStorageID(tx, bc, prm.addr, prm.id, true)
})
success = err == nil
return res, metaerr.Wrap(err)

View file

@ -168,20 +168,7 @@ func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB, log func(a
return nil
}
if err := db.Batch(func(tx *bbolt.Tx) error {
if err := putUniqueIndexItem(tx, namedBucketItem{
name: expEpochToObjectBucketName,
key: expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID),
val: zeroValue,
}); err != nil {
return err
}
val := make([]byte, epochSize)
binary.LittleEndian.PutUint64(val, obj.expirationEpoch)
return putUniqueIndexItem(tx, namedBucketItem{
name: objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)),
key: objectKey(obj.objectID, make([]byte, objectKeySize)),
val: val,
})
return saveObjectExpirationEpoch(tx, obj)
}); err != nil {
return err
}
@ -201,6 +188,31 @@ func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB, log func(a
return nil
}
func saveObjectExpirationEpoch(tx *bbolt.Tx, obj objectIDToExpEpoch) error {
bkt, err := createBucketLikelyExists(tx, expEpochToObjectBucketName)
if err != nil {
return err
}
if err := putUniqueIndexItem(bucketItem{
bucket: bkt,
key: expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID),
val: zeroValue,
}); err != nil {
return err
}
bkt, err = createBucketLikelyExists(tx, objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)))
if err != nil {
return err
}
val := make([]byte, epochSize)
binary.LittleEndian.PutUint64(val, obj.expirationEpoch)
return putUniqueIndexItem(bucketItem{
bucket: bkt,
key: objectKey(obj.objectID, make([]byte, objectKeySize)),
val: val,
})
}
func selectObjectsWithExpirationEpoch(ctx context.Context, db *bbolt.DB, objects chan objectIDToExpEpoch) error {
defer close(objects)

View file

@ -277,24 +277,22 @@ func objectKey(obj oid.ID, key []byte) []byte {
// if meets irregular object container in objs - returns its type, otherwise returns object.TypeRegular.
//
// firstIrregularObjectType(tx, cnr, obj) usage allows getting object type.
func firstIrregularObjectType(tx *bbolt.Tx, idCnr cid.ID, objs ...[]byte) objectSDK.Type {
func firstIrregularObjectType(tx *bbolt.Tx, bc *bucketCache, idCnr cid.ID, objs ...[]byte) objectSDK.Type {
if len(objs) == 0 {
panic("empty object list in firstIrregularObjectType")
}
var keys [2][1 + cidSize]byte
irregularTypeBuckets := [...]struct {
typ objectSDK.Type
name []byte
typ objectSDK.Type
bkt *bbolt.Bucket
}{
{objectSDK.TypeTombstone, tombstoneBucketName(idCnr, keys[0][:])},
{objectSDK.TypeLock, bucketNameLockers(idCnr, keys[1][:])},
{objectSDK.TypeTombstone, getTombstoneBucket(bc, tx, idCnr)},
{objectSDK.TypeLock, getLockersBucket(bc, tx, idCnr)},
}
for i := range objs {
for j := range irregularTypeBuckets {
if inBucket(tx, irregularTypeBuckets[j].name, objs[i]) {
if inBucket(irregularTypeBuckets[j].bkt, objs[i]) {
return irregularTypeBuckets[j].typ
}
}
@ -304,8 +302,7 @@ func firstIrregularObjectType(tx *bbolt.Tx, idCnr cid.ID, objs ...[]byte) object
}
// return true if provided object is of LOCK type.
func isLockObject(tx *bbolt.Tx, idCnr cid.ID, obj oid.ID) bool {
return inBucket(tx,
bucketNameLockers(idCnr, make([]byte, bucketKeySize)),
func isLockObject(tx *bbolt.Tx, bc *bucketCache, idCnr cid.ID, obj oid.ID) bool {
return inBucket(getLockersBucket(bc, tx, idCnr),
objectKey(obj, make([]byte, objectKeySize)))
}