Compare commits: master...feat/metab
9 commits

796975e859
5869be6c11
fef6aa21c5
fb8d37f5ff
4ba9d34188
4e63772632
84640ca9cf
ce2b08e5a4
afed6e05b1

17 changed files with 589 additions and 278 deletions

pkg/local_object_storage/metabase/batch.go (new file, 148 lines)

// NOTE: code is partially taken from https://github.com/etcd-io/bbolt/blob/v1.3.10/db.go

/*
The MIT License (MIT)

Copyright (c) 2013 Ben Johnson

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package meta

import (
    "errors"
    "fmt"
    "sync"
    "time"

    "go.etcd.io/bbolt"
)

type fn func(*bbolt.Tx, *bucketCache) error

type call struct {
    fn  fn
    err chan<- error
}

type batch struct {
    db    *DB
    timer *time.Timer
    start sync.Once
    calls []call
}

func (b *batch) trigger() {
    b.start.Do(b.run)
}

func (b *batch) run() {
    b.db.batchMtx.Lock()
    b.timer.Stop()
    // Make sure no new work is added to this batch, but don't break
    // other batches.
    if b.db.batch == b {
        b.db.batch = nil
    }
    b.db.batchMtx.Unlock()

    bc := newBucketCache()
retry:
    for len(b.calls) > 0 {
        failIdx := -1
        err := b.db.boltDB.Update(func(tx *bbolt.Tx) error {
            for i, c := range b.calls {
                if err := safelyCall(c.fn, tx, bc); err != nil {
                    failIdx = i
                    return err
                }
            }
            return nil
        })

        if failIdx >= 0 {
            // take the failing transaction out of the batch. it's
            // safe to shorten b.calls here because db.batch no longer
            // points to us, and we hold the mutex anyway.
            c := b.calls[failIdx]
            b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
            // tell the submitter re-run it solo, continue with the rest of the batch
            c.err <- errTrySolo
            continue retry
        }

        // pass success, or bolt internal errors, to all callers
        for _, c := range b.calls {
            c.err <- err
        }
        break retry
    }
}

func safelyCall(fn func(*bbolt.Tx, *bucketCache) error, tx *bbolt.Tx, bc *bucketCache) (err error) {
    defer func() {
        if p := recover(); p != nil {
            err = panicked{p}
        }
    }()
    return fn(tx, bc)
}

func (db *DB) Batch(fn fn) error {
    errCh := make(chan error, 1)

    db.batchMtx.Lock()
    if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.boltBatchSize) {
        // There is no existing batch, or the existing batch is full; start a new one.
        db.batch = &batch{
            db: db,
        }
        db.batch.timer = time.AfterFunc(db.boltBatchDelay, db.batch.trigger)
    }
    db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
    if len(db.batch.calls) >= db.boltBatchSize {
        // wake up batch, it's ready to run
        go db.batch.trigger()
    }
    db.batchMtx.Unlock()

    err := <-errCh
    if err == errTrySolo {
        err = db.boltDB.Update(func(tx *bbolt.Tx) error {
            return fn(tx, nil)
        })
    }
    return err
}

// errTrySolo is a special sentinel error value used for signaling that a
// transaction function should be re-run. It should never be seen by
// callers.
var errTrySolo = errors.New("batch function returned an error and should be re-run solo")

type panicked struct {
    reason interface{}
}

func (p panicked) Error() string {
    if err, ok := p.reason.(error); ok {
        return err.Error()
    }
    return fmt.Sprintf("panic: %v", p.reason)
}
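
For orientation, a minimal usage sketch of this batching entry point: many concurrent callers hand transaction functions to db.Batch, the first caller arms the timer, the batch commits once it is full or the delay fires, and every caller receives the shared commit result (errTrySolo is handled inside Batch by retrying alone). The writer count, key names, and the helper function itself are illustrative, not part of the change; only db.Batch, getGarbageBucket, and zeroValue come from this diff.

// Illustrative only; would live inside package meta, since the callback
// takes the unexported *bucketCache. Assumes db is an opened *DB.
func batchUsageSketch(db *DB) error {
    var wg sync.WaitGroup
    errs := make(chan error, 64)
    for i := 0; i < 64; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            // Each call blocks until its batch commits; concurrent calls
            // are coalesced into a single bbolt.Update transaction that
            // shares one bucketCache.
            errs <- db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
                b := getGarbageBucket(bc, tx) // cached lookup, shared across the batch
                if b == nil {
                    return errors.New("garbage bucket is missing")
                }
                return b.Put([]byte(fmt.Sprintf("key-%d", i)), zeroValue) // illustrative key
            })
        }(i)
    }
    wg.Wait()
    close(errs)
    for err := range errs {
        if err != nil {
            return err
        }
    }
    return nil
}
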
@@ -9,8 +9,20 @@ type bucketCache struct {
 	locked    *bbolt.Bucket
 	graveyard *bbolt.Bucket
 	garbage   *bbolt.Bucket
+	expEpoch  *bbolt.Bucket
+	contVol   *bbolt.Bucket
+	contCount *bbolt.Bucket
+	shardInfo *bbolt.Bucket
 	expired   map[cid.ID]*bbolt.Bucket
 	primary   map[cid.ID]*bbolt.Bucket
+	parent    map[cid.ID]*bbolt.Bucket
+	tombstone map[cid.ID]*bbolt.Bucket
+	lockers   map[cid.ID]*bbolt.Bucket
+	small     map[cid.ID]*bbolt.Bucket
+	root      map[cid.ID]*bbolt.Bucket
+	expObj    map[cid.ID]*bbolt.Bucket
+	split     map[cid.ID]*bbolt.Bucket
+	ecInfo    map[cid.ID]*bbolt.Bucket
 }

 func newBucketCache() *bucketCache {
@@ -38,6 +50,34 @@ func getGarbageBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
 	return getBucket(&bc.garbage, tx, garbageBucketName)
 }

+func getExpEpochToObjectBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
+	if bc == nil {
+		return tx.Bucket(expEpochToObjectBucketName)
+	}
+	return getBucket(&bc.expEpoch, tx, expEpochToObjectBucketName)
+}
+
+func getContainerVolumeBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
+	if bc == nil {
+		return tx.Bucket(containerVolumeBucketName)
+	}
+	return getBucket(&bc.contVol, tx, containerVolumeBucketName)
+}
+
+func getShardInfoBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
+	if bc == nil {
+		return tx.Bucket(shardInfoBucket)
+	}
+	return getBucket(&bc.shardInfo, tx, shardInfoBucket)
+}
+
+func getContainerCounterBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
+	if bc == nil {
+		return tx.Bucket(containerCounterBucketName)
+	}
+	return getBucket(&bc.contCount, tx, containerCounterBucketName)
+}
+
 func getBucket(cache **bbolt.Bucket, tx *bbolt.Tx, name []byte) *bbolt.Bucket {
 	if *cache != nil {
 		return *cache
@@ -65,6 +105,78 @@ func getPrimaryBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
 	return getMappedBucket(&bc.primary, tx, primaryBucketName, cnr)
 }

+func getParentBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
+	if bc == nil {
+		bucketName := make([]byte, bucketKeySize)
+		bucketName = parentBucketName(cnr, bucketName)
+		return tx.Bucket(bucketName)
+	}
+	return getMappedBucket(&bc.parent, tx, parentBucketName, cnr)
+}
+
+func getTombstoneBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
+	if bc == nil {
+		bucketName := make([]byte, bucketKeySize)
+		bucketName = tombstoneBucketName(cnr, bucketName)
+		return tx.Bucket(bucketName)
+	}
+	return getMappedBucket(&bc.tombstone, tx, tombstoneBucketName, cnr)
+}
+
+func getLockersBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
+	if bc == nil {
+		bucketName := make([]byte, bucketKeySize)
+		bucketName = bucketNameLockers(cnr, bucketName)
+		return tx.Bucket(bucketName)
+	}
+	return getMappedBucket(&bc.lockers, tx, bucketNameLockers, cnr)
+}
+
+func getSmallBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
+	if bc == nil {
+		bucketName := make([]byte, bucketKeySize)
+		bucketName = smallBucketName(cnr, bucketName)
+		return tx.Bucket(bucketName)
+	}
+	return getMappedBucket(&bc.small, tx, smallBucketName, cnr)
+}
+
+func getRootBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
+	if bc == nil {
+		bucketName := make([]byte, bucketKeySize)
+		bucketName = rootBucketName(cnr, bucketName)
+		return tx.Bucket(bucketName)
+	}
+	return getMappedBucket(&bc.root, tx, rootBucketName, cnr)
+}
+
+func getObjToExpEpochBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
+	if bc == nil {
+		bucketName := make([]byte, bucketKeySize)
+		bucketName = objectToExpirationEpochBucketName(cnr, bucketName)
+		return tx.Bucket(bucketName)
+	}
+	return getMappedBucket(&bc.expObj, tx, objectToExpirationEpochBucketName, cnr)
+}
+
+func getSplitBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
+	if bc == nil {
+		bucketName := make([]byte, bucketKeySize)
+		bucketName = splitBucketName(cnr, bucketName)
+		return tx.Bucket(bucketName)
+	}
+	return getMappedBucket(&bc.split, tx, splitBucketName, cnr)
+}
+
+func getECInfoBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
+	if bc == nil {
+		bucketName := make([]byte, bucketKeySize)
+		bucketName = ecInfoBucketName(cnr, bucketName)
+		return tx.Bucket(bucketName)
+	}
+	return getMappedBucket(&bc.ecInfo, tx, ecInfoBucketName, cnr)
+}
+
 func getMappedBucket(m *map[cid.ID]*bbolt.Bucket, tx *bbolt.Tx, nameFunc func(cid.ID, []byte) []byte, cnr cid.ID) *bbolt.Bucket {
 	value, ok := (*m)[cnr]
 	if ok {
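
The per-container getters all delegate to getMappedBucket, whose body is cut off in this capture. The caching idea, though, is pinned down by getBucket above: look the bucket up once per transaction and memoize the result. A sketch of that pattern follows; it is an assumption about the real body (which may differ in details such as map allocation), not verbatim code from the change.

// Sketch of the memoization behind the per-container getters. Assumes the
// map was allocated by newBucketCache.
func getMappedBucketSketch(m *map[cid.ID]*bbolt.Bucket, tx *bbolt.Tx, nameFunc func(cid.ID, []byte) []byte, cnr cid.ID) *bbolt.Bucket {
    value, ok := (*m)[cnr]
    if ok {
        return value
    }
    bucketName := nameFunc(cnr, make([]byte, bucketKeySize))
    bkt := tx.Bucket(bucketName)
    (*m)[cnr] = bkt // a nil bucket is cached too, so repeated misses stay cheap
    return bkt
}
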
@@ -100,8 +100,8 @@ func parseContainerSize(v []byte) uint64 {
 	return binary.LittleEndian.Uint64(v)
 }

-func changeContainerSize(tx *bbolt.Tx, id cid.ID, delta uint64, increase bool) error {
-	containerVolume := tx.Bucket(containerVolumeBucketName)
+func changeContainerSize(tx *bbolt.Tx, bc *bucketCache, id cid.ID, delta uint64, increase bool) error {
+	containerVolume := getContainerVolumeBucket(bc, tx)
 	key := make([]byte, cidSize)
 	id.Encode(key)

@@ -231,10 +231,10 @@ func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, er
 	return result, metaerr.Wrap(err)
 }

-func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
-	b := tx.Bucket(shardInfoBucket)
+func (db *DB) incCounters(tx *bbolt.Tx, bc *bucketCache, cnrID cid.ID, isUserObject bool) error {
+	b := getShardInfoBucket(bc, tx)
 	if b == nil {
-		return db.incContainerObjectCounter(tx, cnrID, isUserObject)
+		return db.incContainerObjectCounter(tx, bc, cnrID, isUserObject)
 	}

 	if err := db.updateShardObjectCounterBucket(b, phy, 1, true); err != nil {
@@ -248,7 +248,7 @@ func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
 			return fmt.Errorf("increase user object counter: %w", err)
 		}
 	}
-	return db.incContainerObjectCounter(tx, cnrID, isUserObject)
+	return db.incContainerObjectCounter(tx, bc, cnrID, isUserObject)
 }

 func (db *DB) decShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64) error {
@@ -338,8 +338,8 @@ func nextValue(existed, delta uint64, inc bool) uint64 {
 	return existed
 }

-func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
-	b := tx.Bucket(containerCounterBucketName)
+func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, bc *bucketCache, cnrID cid.ID, isUserObject bool) error {
+	b := getContainerCounterBucket(bc, tx)
 	if b == nil {
 		return nil
 	}

@@ -44,6 +44,9 @@ type DB struct {
 	boltDB *bbolt.DB

 	initialized bool
+
+	batchMtx sync.Mutex
+	batch    *batch
 }

 // Option is an option of DB constructor.

@@ -110,8 +110,8 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
 	var err error
 	var res DeleteRes

-	err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
-		res, err = db.deleteGroup(tx, prm.addrs)
+	err = db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
+		res, err = db.deleteGroup(tx, bc, prm.addrs)
 		return err
 	})
 	if err == nil {
@@ -127,7 +127,7 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {

 // deleteGroup deletes object from the metabase. Handles removal of the
 // references of the split objects.
-func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) {
+func (db *DB) deleteGroup(tx *bbolt.Tx, bc *bucketCache, addrs []oid.Address) (DeleteRes, error) {
 	res := DeleteRes{
 		removedByCnrID: make(map[cid.ID]ObjectCounters),
 	}
@@ -135,7 +135,7 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
 	currEpoch := db.epochState.CurrentEpoch()

 	for i := range addrs {
-		r, err := db.delete(tx, addrs[i], refCounter, currEpoch)
+		r, err := db.delete(tx, bc, addrs[i], refCounter, currEpoch)
 		if err != nil {
 			return DeleteRes{}, err
 		}
@@ -149,7 +149,7 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)

 	for _, refNum := range refCounter {
 		if refNum.cur == refNum.all {
-			err := db.deleteObject(tx, refNum.obj, true)
+			err := db.deleteObject(tx, bc, refNum.obj, true)
 			if err != nil {
 				return DeleteRes{}, err
 			}
@@ -243,16 +243,16 @@ type deleteSingleResult struct {
 // non-exist object is error-free). The second return value indicates if an
 // object was available before the removal (for calculating the logical object
 // counter). The third return value The fourth return value is removed object payload size.
-func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (deleteSingleResult, error) {
+func (db *DB) delete(tx *bbolt.Tx, bc *bucketCache, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (deleteSingleResult, error) {
 	key := make([]byte, addressKeySize)
 	addrKey := addressKey(addr, key)
-	garbageBKT := tx.Bucket(garbageBucketName)
-	graveyardBKT := tx.Bucket(graveyardBucketName)
+	garbageBKT := getGarbageBucket(bc, tx)
+	graveyardBKT := getGraveyardBucket(bc, tx)

 	removeAvailableObject := inGraveyardWithKey(addrKey, graveyardBKT, garbageBKT) == 0

 	// unmarshal object, work only with physically stored (raw == true) objects
-	obj, err := db.get(tx, addr, key, false, true, currEpoch)
+	obj, err := db.getWithCache(bc, tx, addr, key, false, true, currEpoch)
 	if err != nil {
 		if client.IsErrObjectNotFound(err) {
 			addrKey = addressKey(addr, key)
@@ -293,7 +293,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
 		nRef, ok := refCounter[k]
 		if !ok {
 			nRef = &referenceNumber{
-				all: parentLength(tx, parAddr),
+				all: parentLength(tx, bc, parAddr),
 				obj: parent,
 			}

@@ -306,12 +306,12 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
 	isUserObject := IsUserObject(obj)

 	// remove object
-	err = db.deleteObject(tx, obj, false)
+	err = db.deleteObject(tx, bc, obj, false)
 	if err != nil {
 		return deleteSingleResult{}, fmt.Errorf("remove object: %w", err)
 	}

-	if err := deleteECRelatedInfo(tx, garbageBKT, obj, addr.Container(), refCounter); err != nil {
+	if err := deleteECRelatedInfo(tx, bc, obj, addr.Container(), refCounter); err != nil {
 		return deleteSingleResult{}, err
 	}

@@ -325,15 +325,16 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter

 func (db *DB) deleteObject(
 	tx *bbolt.Tx,
+	bc *bucketCache,
 	obj *objectSDK.Object,
 	isParent bool,
 ) error {
-	err := delUniqueIndexes(tx, obj, isParent)
+	err := delUniqueIndexes(tx, bc, obj, isParent)
 	if err != nil {
 		return errFailedToRemoveUniqueIndexes
 	}

-	err = updateListIndexes(tx, obj, delListIndexItem)
+	err = updateListIndexes(tx, bc, obj, delListIndexItem, false)
 	if err != nil {
 		return fmt.Errorf("remove list indexes: %w", err)
 	}
@@ -345,7 +346,7 @@ func (db *DB) deleteObject(

 	if isParent {
 		// remove record from the garbage bucket, because regular object deletion does nothing for virtual object
-		garbageBKT := tx.Bucket(garbageBucketName)
+		garbageBKT := getGarbageBucket(bc, tx)
 		if garbageBKT != nil {
 			key := make([]byte, addressKeySize)
 			addrKey := addressKey(object.AddressOf(obj), key)
@@ -360,10 +361,10 @@ func (db *DB) deleteObject(
 }

 // parentLength returns amount of available children from parentid index.
-func parentLength(tx *bbolt.Tx, addr oid.Address) int {
+func parentLength(tx *bbolt.Tx, bc *bucketCache, addr oid.Address) int {
 	bucketName := make([]byte, bucketKeySize)

-	bkt := tx.Bucket(parentBucketName(addr.Container(), bucketName[:]))
+	bkt := getParentBucket(bc, tx, addr.Container())
 	if bkt == nil {
 		return 0
 	}
@@ -376,15 +377,16 @@ func parentLength(tx *bbolt.Tx, addr oid.Address) int {
 	return len(lst)
 }

-func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) {
-	bkt := tx.Bucket(item.name)
+func delUniqueIndexItem(item bucketItem) error {
+	bkt := item.bucket
 	if bkt != nil {
-		_ = bkt.Delete(item.key) // ignore error, best effort there
+		return bkt.Delete(item.key)
 	}
+	return nil
 }

-func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
-	bkt := tx.Bucket(item.name)
+func delListIndexItem(item bucketItem) error {
+	bkt := item.bucket
 	if bkt == nil {
 		return nil
 	}
@@ -405,19 +407,16 @@ func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {

 	// if list empty, remove the key from <list> bucket
 	if len(lst) == 0 {
-		_ = bkt.Delete(item.key) // ignore error, best effort there
-
-		return nil
+		return bkt.Delete(item.key)
 	}

 	// if list is not empty, then update it
 	encodedLst, err := encodeList(lst)
 	if err != nil {
-		return nil // ignore error, best effort there
+		return err
 	}

-	_ = bkt.Put(item.key, encodedLst) // ignore error, best effort there
-	return nil
+	return bkt.Put(item.key, encodedLst)
 }

 func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
@@ -460,67 +459,79 @@ func hasAnyItem(b *bbolt.Bucket) bool {
 	return hasAnyItem
 }

-func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error {
+func delUniqueIndexes(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, isParent bool) error {
 	addr := object.AddressOf(obj)

 	objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
 	cnr := addr.Container()
-	bucketName := make([]byte, bucketKeySize)
+	var bkt *bbolt.Bucket

 	// add value to primary unique bucket
 	if !isParent {
 		switch obj.Type() {
 		case objectSDK.TypeRegular:
-			bucketName = primaryBucketName(cnr, bucketName)
+			bkt = getPrimaryBucket(bc, tx, cnr)
 		case objectSDK.TypeTombstone:
-			bucketName = tombstoneBucketName(cnr, bucketName)
+			bkt = getTombstoneBucket(bc, tx, cnr)
 		case objectSDK.TypeLock:
-			bucketName = bucketNameLockers(cnr, bucketName)
+			bkt = getLockersBucket(bc, tx, cnr)
 		default:
 			return ErrUnknownObjectType
 		}

-		delUniqueIndexItem(tx, namedBucketItem{
-			name: bucketName,
-			key:  objKey,
-		})
+		if err := delUniqueIndexItem(bucketItem{
+			bucket: bkt,
+			key:    objKey,
+		}); err != nil {
+			return err
+		}
 	} else {
-		delUniqueIndexItem(tx, namedBucketItem{
-			name: parentBucketName(cnr, bucketName),
-			key:  objKey,
-		})
+		if err := delUniqueIndexItem(bucketItem{
+			bucket: getParentBucket(bc, tx, cnr),
+			key:    objKey,
+		}); err != nil {
+			return err
+		}
 	}

-	delUniqueIndexItem(tx, namedBucketItem{ // remove from storage id index
-		name: smallBucketName(cnr, bucketName),
-		key:  objKey,
-	})
-	delUniqueIndexItem(tx, namedBucketItem{ // remove from root index
-		name: rootBucketName(cnr, bucketName),
-		key:  objKey,
-	})
+	if err := delUniqueIndexItem(bucketItem{ // remove from storage id index
+		bucket: getSmallBucket(bc, tx, cnr),
+		key:    objKey,
+	}); err != nil {
+		return err
+	}
+	if err := delUniqueIndexItem(bucketItem{ // remove from root index
+		bucket: getRootBucket(bc, tx, cnr),
+		key:    objKey,
+	}); err != nil {
+		return err
+	}

 	if expEpoch, ok := hasExpirationEpoch(obj); ok {
-		delUniqueIndexItem(tx, namedBucketItem{
-			name: expEpochToObjectBucketName,
-			key:  expirationEpochKey(expEpoch, cnr, addr.Object()),
-		})
-		delUniqueIndexItem(tx, namedBucketItem{
-			name: objectToExpirationEpochBucketName(cnr, make([]byte, bucketKeySize)),
-			key:  objKey,
-		})
+		if err := delUniqueIndexItem(bucketItem{
+			bucket: getExpEpochToObjectBucket(bc, tx),
+			key:    expirationEpochKey(expEpoch, cnr, addr.Object()),
+		}); err != nil {
+			return err
+		}
+		if err := delUniqueIndexItem(bucketItem{
+			bucket: getObjToExpEpochBucket(bc, tx, cnr),
+			key:    objKey,
+		}); err != nil {
+			return err
+		}
 	}

 	return nil
 }

-func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.Object, cnr cid.ID, refCounter referenceCounter) error {
+func deleteECRelatedInfo(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, cnr cid.ID, refCounter referenceCounter) error {
 	ech := obj.ECHeader()
 	if ech == nil {
 		return nil
 	}

-	hasAnyChunks := hasAnyECChunks(tx, ech, cnr)
+	garbageBKT := getGarbageBucket(bc, tx)
+	hasAnyChunks := hasAnyECChunks(tx, bc, ech, cnr)
 	// drop EC parent GC mark if current EC chunk is the last one
 	if !hasAnyChunks && garbageBKT != nil {
 		var ecParentAddress oid.Address
@@ -535,10 +546,12 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.

 	// also drop EC parent root info if current EC chunk is the last one
 	if !hasAnyChunks {
-		delUniqueIndexItem(tx, namedBucketItem{
-			name: rootBucketName(cnr, make([]byte, bucketKeySize)),
-			key:  objectKey(ech.Parent(), make([]byte, objectKeySize)),
-		})
+		if err := delUniqueIndexItem(bucketItem{
+			bucket: getRootBucket(bc, tx, cnr),
+			key:    objectKey(ech.Parent(), make([]byte, objectKeySize)),
+		}); err != nil {
+			return err
+		}
 	}

 	if ech.ParentSplitParentID() == nil {
@@ -557,7 +570,7 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
 		return nil
 	}

-	if parentLength(tx, splitParentAddress) > 0 {
+	if parentLength(tx, bc, splitParentAddress) > 0 {
 		// linking object still exists, so leave split info and gc mark deletion for linking object processing
 		return nil
 	}
@@ -572,15 +585,14 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
 	}

 	// drop split info
-	delUniqueIndexItem(tx, namedBucketItem{
-		name: rootBucketName(cnr, make([]byte, bucketKeySize)),
-		key:  objectKey(*ech.ParentSplitParentID(), make([]byte, objectKeySize)),
+	return delUniqueIndexItem(bucketItem{
+		bucket: getRootBucket(bc, tx, cnr),
+		key:    objectKey(*ech.ParentSplitParentID(), make([]byte, objectKeySize)),
 	})
-	return nil
 }

-func hasAnyECChunks(tx *bbolt.Tx, ech *objectSDK.ECHeader, cnr cid.ID) bool {
-	data := getFromBucket(tx, ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
+func hasAnyECChunks(tx *bbolt.Tx, bc *bucketCache, ech *objectSDK.ECHeader, cnr cid.ID) bool {
+	data := getFromBucket(getECInfoBucket(bc, tx, cnr),
 		objectKey(ech.Parent(), make([]byte, objectKeySize)))
 	return len(data) > 0
 }
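
Two themes run through the deletion hunks above. First, namedBucketItem (a bucket name plus key) gives way to bucketItem (a resolved *bbolt.Bucket plus key), so each bucket is looked up once through the bucketCache. Second, the old best-effort deletes, whose errors were silently dropped, now propagate: every delUniqueIndexItem and delListIndexItem result is checked and returned, turning previously ignored bbolt failures into transaction aborts. Condensed, the new call shape looks like this (identifiers exactly as in the diff):

// Condensed from delUniqueIndexes above: resolve the bucket through the
// cache, then delete and surface any bbolt error instead of dropping it.
if err := delUniqueIndexItem(bucketItem{
    bucket: getRootBucket(bc, tx, cnr), // one lookup per container per tx
    key:    objKey,
}); err != nil {
    return err // previously: _ = bkt.Delete(item.key) // ignore error, best effort there
}
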
@@ -81,7 +81,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
 	currEpoch := db.epochState.CurrentEpoch()

 	err = db.boltDB.View(func(tx *bbolt.Tx) error {
-		res.exists, res.locked, err = db.exists(tx, prm.addr, prm.ecParentAddr, currEpoch)
+		res.exists, res.locked, err = db.exists(tx, newBucketCache(), prm.addr, prm.ecParentAddr, currEpoch)

 		return err
 	})
@@ -89,10 +89,10 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
 	return res, metaerr.Wrap(err)
 }

-func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) {
+func (db *DB) exists(tx *bbolt.Tx, bc *bucketCache, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) {
 	var locked bool
 	if !ecParent.Equals(oid.Address{}) {
-		st, err := objectStatus(tx, ecParent, currEpoch)
+		st, err := objectStatusWithCache(bc, tx, ecParent, currEpoch)
 		if err != nil {
 			return false, false, err
 		}
@@ -103,10 +103,10 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currE
 			return false, locked, ErrObjectIsExpired
 		}

-		locked = objectLocked(tx, ecParent.Container(), ecParent.Object())
+		locked = objectLockedWithCache(bc, tx, ecParent.Container(), ecParent.Object())
 	}
 	// check graveyard and object expiration first
-	st, err := objectStatus(tx, addr, currEpoch)
+	st, err := objectStatusWithCache(bc, tx, addr, currEpoch)
 	if err != nil {
 		return false, false, err
 	}
@@ -122,16 +122,15 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currE
 	objKey := objectKey(addr.Object(), make([]byte, objectKeySize))

 	cnr := addr.Container()
-	key := make([]byte, bucketKeySize)

 	// if graveyard is empty, then check if object exists in primary bucket
-	if inBucket(tx, primaryBucketName(cnr, key), objKey) {
+	if inBucket(getPrimaryBucket(bc, tx, cnr), objKey) {
 		return true, locked, nil
 	}

 	// if primary bucket is empty, then check if object exists in parent bucket
-	if inBucket(tx, parentBucketName(cnr, key), objKey) {
-		splitInfo, err := getSplitInfo(tx, cnr, objKey)
+	if inBucket(getParentBucket(bc, tx, cnr), objKey) {
+		splitInfo, err := getSplitInfo(tx, bc, cnr, objKey)
 		if err != nil {
 			return false, locked, err
 		}
@@ -139,12 +138,12 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currE
 		return false, locked, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
 	}
 	// if parent bucket is empty, then check if object exists in ec bucket
-	if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
-		return false, locked, getECInfoError(tx, cnr, data)
+	if data := getFromBucket(getECInfoBucket(bc, tx, cnr), objKey); len(data) != 0 {
+		return false, locked, getECInfoError(tx, bc, cnr, data)
 	}

 	// if parent bucket is empty, then check if object exists in typed buckets
-	return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, locked, nil
+	return firstIrregularObjectType(tx, bc, cnr, objKey) != objectSDK.TypeRegular, locked, nil
 }

 // objectStatus returns:
@@ -152,10 +151,6 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currE
 // - 1 if object with GC mark;
 // - 2 if object is covered with tombstone;
 // - 3 if object is expired.
-func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (uint8, error) {
-	return objectStatusWithCache(nil, tx, addr, currEpoch)
-}
-
 func objectStatusWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (uint8, error) {
 	// locked object could not be removed/marked with GC/expired
 	if objectLockedWithCache(bc, tx, addr.Container(), addr.Object()) {
@@ -207,8 +202,7 @@ func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uin
 }

 // inBucket checks if key <key> is present in bucket <name>.
-func inBucket(tx *bbolt.Tx, name, key []byte) bool {
-	bkt := tx.Bucket(name)
+func inBucket(bkt *bbolt.Bucket, key []byte) bool {
 	if bkt == nil {
 		return false
 	}
@@ -221,9 +215,8 @@ func inBucket(tx *bbolt.Tx, name, key []byte) bool {

 // getSplitInfo returns SplitInfo structure from root index. Returns error
 // if there is no `key` record in root index.
-func getSplitInfo(tx *bbolt.Tx, cnr cid.ID, key []byte) (*objectSDK.SplitInfo, error) {
-	bucketName := rootBucketName(cnr, make([]byte, bucketKeySize))
-	rawSplitInfo := getFromBucket(tx, bucketName, key)
+func getSplitInfo(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, key []byte) (*objectSDK.SplitInfo, error) {
+	rawSplitInfo := getFromBucket(getRootBucket(bc, tx, cnr), key)
 	if len(rawSplitInfo) == 0 {
 		return nil, ErrLackSplitInfo
 	}

@@ -110,7 +110,6 @@ func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key
 	key = objectKey(addr.Object(), key)
 	cnr := addr.Container()
 	obj := objectSDK.New()
-	bucketName := make([]byte, bucketKeySize)

 	// check in primary index
 	if b := getPrimaryBucket(bc, tx, cnr); b != nil {
@@ -119,43 +118,40 @@ func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key
 		}
 	}

-	data := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key)
+	data := getFromBucket(getECInfoBucket(bc, tx, cnr), key)
 	if len(data) != 0 {
-		return nil, getECInfoError(tx, cnr, data)
+		return nil, getECInfoError(tx, bc, cnr, data)
 	}

 	// if not found then check in tombstone index
-	data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
+	data = getFromBucket(getTombstoneBucket(bc, tx, cnr), key)
 	if len(data) != 0 {
 		return obj, obj.Unmarshal(data)
 	}

 	// if not found then check in locker index
-	data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key)
+	data = getFromBucket(getLockersBucket(bc, tx, cnr), key)
 	if len(data) != 0 {
 		return obj, obj.Unmarshal(data)
 	}

 	// if not found then check if object is a virtual
-	return getVirtualObject(tx, cnr, key, raw)
+	return getVirtualObject(tx, bc, cnr, key, raw)
 }

-func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
-	bkt := tx.Bucket(name)
+func getFromBucket(bkt *bbolt.Bucket, key []byte) []byte {
 	if bkt == nil {
 		return nil
 	}

 	return bkt.Get(key)
 }

-func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSDK.Object, error) {
+func getVirtualObject(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, key []byte, raw bool) (*objectSDK.Object, error) {
 	if raw {
-		return nil, getSplitInfoError(tx, cnr, key)
+		return nil, getSplitInfoError(tx, bc, cnr, key)
 	}

-	bucketName := make([]byte, bucketKeySize)
-	parentBucket := tx.Bucket(parentBucketName(cnr, bucketName))
+	parentBucket := getParentBucket(bc, tx, cnr)
 	if parentBucket == nil {
 		return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
 	}
@@ -172,17 +168,17 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
 	var data []byte
 	for i := 0; i < len(relativeLst) && len(data) == 0; i++ {
 		virtualOID := relativeLst[len(relativeLst)-i-1]
-		data = getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
+		data = getFromBucket(getPrimaryBucket(bc, tx, cnr), virtualOID)
 	}

 	if len(data) == 0 {
 		// check if any of the relatives is an EC object
 		for _, relative := range relativeLst {
-			data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), relative)
+			data = getFromBucket(getECInfoBucket(bc, tx, cnr), relative)
 			if len(data) > 0 {
 				// we can't return object headers, but can return error,
 				// so assembler can try to assemble complex object
-				return nil, getSplitInfoError(tx, cnr, key)
+				return nil, getSplitInfoError(tx, bc, cnr, key)
 			}
 		}
 	}
@@ -203,8 +199,8 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
 	return par, nil
 }

-func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
-	splitInfo, err := getSplitInfo(tx, cnr, key)
+func getSplitInfoError(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, key []byte) error {
+	splitInfo, err := getSplitInfo(tx, bc, cnr, key)
 	if err == nil {
 		return logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
 	}
@@ -212,7 +208,7 @@ func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
 	return logicerr.Wrap(new(apistatus.ObjectNotFound))
 }

-func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
+func getECInfoError(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, data []byte) error {
 	keys, err := decodeList(data)
 	if err != nil {
 		return err
@@ -220,7 +216,7 @@ func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
 	ecInfo := objectSDK.NewECInfo()
 	for _, key := range keys {
 		// check in primary index
-		objData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key)
+		objData := getFromBucket(getPrimaryBucket(bc, tx, cnr), key)
 		if len(objData) != 0 {
 			obj := objectSDK.New()
 			if err := obj.Unmarshal(objData); err != nil {

@@ -287,19 +287,19 @@ func (db *DB) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (Inh

 	var res InhumeRes

-	err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
+	err := db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
 		res = InhumeRes{inhumedByCnrID: make(map[cid.ID]ObjectCounters)}

-		garbageBKT := tx.Bucket(garbageBucketName)
-		graveyardBKT := tx.Bucket(graveyardBucketName)
+		garbageBKT := getGarbageBucket(bc, tx)
+		graveyardBKT := getGraveyardBucket(bc, tx)

-		bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
+		targetBucket, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
 		if err != nil {
 			return err
 		}

 		for i := range tss {
-			if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, tss[i].Tombstone(), buf, currEpoch, prm, &res); err != nil {
+			if err := db.inhumeTxSingle(tx, bc, targetBucket, value, tss[i].Tombstone(), buf, currEpoch, prm, &res); err != nil {
 				return err
 			}
 			if err := graveyardBKT.Delete(addressKey(tss[i].Address(), buf)); err != nil {

@@ -199,8 +199,8 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
 		inhumedByCnrID: make(map[cid.ID]ObjectCounters),
 	}
 	currEpoch := db.epochState.CurrentEpoch()
-	err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
-		return db.inhumeTx(tx, currEpoch, prm, &res)
+	err := db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
+		return db.inhumeTx(tx, bc, currEpoch, prm, &res)
 	})
 	success = err == nil
 	if success {
@@ -213,18 +213,15 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
 	return res, metaerr.Wrap(err)
 }

-func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes) error {
-	garbageBKT := tx.Bucket(garbageBucketName)
-	graveyardBKT := tx.Bucket(graveyardBucketName)
-
-	bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
+func (db *DB) inhumeTx(tx *bbolt.Tx, bc *bucketCache, epoch uint64, prm InhumePrm, res *InhumeRes) error {
+	bkt, value, err := db.getInhumeTargetBucketAndValue(getGarbageBucket(bc, tx), getGraveyardBucket(bc, tx), prm)
 	if err != nil {
 		return err
 	}

 	buf := make([]byte, addressKeySize)
 	for i := range prm.target {
-		if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, prm.target[i], buf, epoch, prm, res); err != nil {
+		if err := db.inhumeTxSingle(tx, bc, bkt, value, prm.target[i], buf, epoch, prm, res); err != nil {
 			return err
 		}
 	}
@@ -232,13 +229,12 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
 	return db.applyInhumeResToCounters(tx, res)
 }

-func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garbageBKT *bbolt.Bucket, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error {
+func (db *DB) inhumeTxSingle(tx *bbolt.Tx, bc *bucketCache, targetBucket *bbolt.Bucket, value []byte, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error {
 	id := addr.Object()
 	cnr := addr.Container()
-	tx := bkt.Tx()

 	// prevent locked objects to be inhumed
-	if !prm.forceRemoval && objectLocked(tx, cnr, id) {
+	if !prm.forceRemoval && objectLockedWithCache(bc, tx, cnr, id) {
 		return new(apistatus.ObjectLocked)
 	}

@@ -248,23 +244,23 @@ func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garb
 	// if `Inhume` was called not with the
 	// `WithForceGCMark` option
 	if !prm.forceRemoval {
-		if isLockObject(tx, cnr, id) {
+		if isLockObject(tx, bc, cnr, id) {
 			return ErrLockObjectRemoval
 		}

 		lockWasChecked = true
 	}

-	obj, err := db.get(tx, addr, buf, false, true, epoch)
+	obj, err := db.getWithCache(bc, tx, addr, buf, false, true, epoch)
 	targetKey := addressKey(addr, buf)
 	var ecErr *objectSDK.ECInfoError
 	if err == nil {
-		err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
+		err = db.updateDeleteInfo(tx, bc, targetKey, cnr, obj, res)
 		if err != nil {
 			return err
 		}
 	} else if errors.As(err, &ecErr) {
-		err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value)
+		err = db.inhumeECInfo(tx, bc, epoch, prm.tomb, res, ecErr.ECInfo(), cnr, targetBucket, value)
 		if err != nil {
 			return err
 		}
@@ -272,7 +268,7 @@ func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garb

 	if prm.tomb != nil {
 		var isTomb bool
-		isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
+		isTomb, err = db.markAsGC(getGraveyardBucket(bc, tx), getGarbageBucket(bc, tx), targetKey)
 		if err != nil {
 			return err
 		}
@@ -283,7 +279,7 @@ func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garb
 	}

 	// consider checking if target is already in graveyard?
-	err = bkt.Put(targetKey, value)
+	err = targetBucket.Put(targetKey, value)
 	if err != nil {
 		return err
 	}
@@ -297,15 +293,14 @@ func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garb
 			return nil
 		}

-		if isLockObject(tx, cnr, id) {
+		if isLockObject(tx, bc, cnr, id) {
 			res.deletedLockObj = append(res.deletedLockObj, addr)
 		}
 	}
 	return nil
 }

-func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
-	garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
+func (db *DB) inhumeECInfo(tx *bbolt.Tx, bc *bucketCache, epoch uint64, tomb *oid.Address, res *InhumeRes,
 	ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte,
 ) error {
 	for _, chunk := range ecInfo.Chunks {
@@ -318,17 +313,17 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I
 			return err
 		}
 		chunkAddr.SetObject(chunkID)
-		chunkObj, err := db.get(tx, chunkAddr, chunkBuf, false, true, epoch)
+		chunkObj, err := db.getWithCache(bc, tx, chunkAddr, chunkBuf, false, true, epoch)
 		if err != nil {
 			return err
 		}
 		chunkKey := addressKey(chunkAddr, chunkBuf)
-		err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, chunkKey, cnr, chunkObj, res)
+		err = db.updateDeleteInfo(tx, bc, chunkKey, cnr, chunkObj, res)
 		if err != nil {
 			return err
 		}
 		if tomb != nil {
-			_, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey)
+			_, err = db.markAsGC(getGraveyardBucket(bc, tx), getGarbageBucket(bc, tx), chunkKey)
 			if err != nil {
 				return err
 			}
@@ -398,16 +393,16 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, addressKey []byte
 	return false, garbageBKT.Put(addressKey, zeroValue)
 }

-func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
+func (db *DB) updateDeleteInfo(tx *bbolt.Tx, bc *bucketCache, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
 	containerID, _ := obj.ContainerID()
-	if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
+	if inGraveyardWithKey(targetKey, getGraveyardBucket(bc, tx), getGarbageBucket(bc, tx)) == 0 {
 		res.storeDeletionInfo(containerID, obj.PayloadSize(), IsUserObject(obj))
 	}

 	// if object is stored, and it is regular object then update bucket
 	// with container size estimations
 	if obj.Type() == objectSDK.TypeRegular {
-		err := changeContainerSize(tx, cnr, obj.PayloadSize(), false)
+		err := changeContainerSize(tx, bc, cnr, obj.PayloadSize(), false)
 		if err != nil {
 			return err
 		}

@@ -76,7 +76,8 @@ func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectH
 }

 func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error {
-	b := tx.Bucket(expEpochToObjectBucketName)
+	bc := newBucketCache()
+	b := getExpEpochToObjectBucket(bc, tx)
 	c := b.Cursor()
 	for k, _ := c.First(); k != nil; k, _ = c.Next() {
 		expiresAfter, cnr, obj, err := parseExpirationEpochKey(k)
@@ -87,7 +88,7 @@ func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler)
 		if expiresAfter >= epoch {
 			return nil
 		}
-		if objectLocked(tx, cnr, obj) {
+		if objectLockedWithCache(bc, tx, cnr, obj) {
 			continue
 		}
 		var addr oid.Address
@@ -95,7 +96,7 @@ func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler)
 		addr.SetObject(obj)
 		objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
 		err = h(&ExpiredObject{
-			typ:  firstIrregularObjectType(tx, cnr, objKey),
+			typ:  firstIrregularObjectType(tx, bc, cnr, objKey),
 			addr: addr,
 		})
 		if err == nil {

@@ -79,12 +79,12 @@ func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error {
 	}
 	key := make([]byte, cidSize)

-	return metaerr.Wrap(db.boltDB.Batch(func(tx *bbolt.Tx) error {
-		if firstIrregularObjectType(tx, cnr, bucketKeysLocked...) != objectSDK.TypeRegular {
+	return metaerr.Wrap(db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
+		if firstIrregularObjectType(tx, bc, cnr, bucketKeysLocked...) != objectSDK.TypeRegular {
 			return logicerr.Wrap(new(apistatus.LockNonRegularObject))
 		}

-		bucketLocked := tx.Bucket(bucketNameLocked)
+		bucketLocked := getLockedBucket(bc, tx)

 		cnr.Encode(key)
 		bucketLockedContainer, err := bucketLocked.CreateBucketIfNotExists(key)
@@ -144,9 +144,9 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {

 	var unlockedObjects []oid.Address

-	if err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
+	if err := db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
 		for i := range lockers {
-			unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object())
+			unlocked, err := freePotentialLocks(tx, bc, lockers[i].Container(), lockers[i].Object())
 			if err != nil {
 				return err
 			}
@@ -211,9 +211,9 @@ func getLocks(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) {
 // Operation is very resource-intensive, which is caused by the admissibility
 // of multiple locks. Also, if we knew what objects are locked, it would be
 // possible to speed up the execution.
-func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) {
+func freePotentialLocks(tx *bbolt.Tx, bc *bucketCache, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) {
 	var unlockedObjects []oid.Address
-	bucketLocked := tx.Bucket(bucketNameLocked)
+	bucketLocked := getLockedBucket(bc, tx)
 	if bucketLocked == nil {
 		return unlockedObjects, nil
 	}

@ -9,6 +9,7 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
|
||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
|
@ -28,6 +29,11 @@ type (
|
|||
namedBucketItem struct {
|
||||
name, key, val []byte
|
||||
}
|
||||
|
||||
bucketItem struct {
|
||||
bucket *bbolt.Bucket
|
||||
key, val []byte
|
||||
}
|
||||
)
|
||||
|
||||
// PutPrm groups the parameters of Put operation.
|
||||
|
@ -63,6 +69,8 @@ var (
|
|||
ErrIncorrectRootObject = errors.New("invalid root object")
|
||||
)
|
||||
|
||||
const bucketNilAsserMsg = "bucket to put data is nil"
|
||||
|
||||
// Put saves object header in metabase. Object payload expected to be cut.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard.
|
||||
|
@ -93,9 +101,9 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
|
|||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||
err = db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
|
||||
var e error
|
||||
res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch, prm.indexAttributes)
|
||||
res, e = db.put(tx, bc, prm.obj, prm.id, nil, currEpoch, prm.indexAttributes)
|
||||
return e
|
||||
})
|
||||
if err == nil {
|
||||
|
@ -109,6 +117,7 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
|
|||
}
|
||||
|
||||
func (db *DB) put(tx *bbolt.Tx,
|
||||
bc *bucketCache,
|
||||
obj *objectSDK.Object,
|
||||
id []byte,
|
||||
si *objectSDK.SplitInfo,
|
||||
|
@ -128,7 +137,7 @@ func (db *DB) put(tx *bbolt.Tx,
|
|||
|
||||
isParent := si != nil
|
||||
|
||||
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
|
||||
exists, _, err := db.exists(tx, bc, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
|
||||
|
||||
var splitInfoError *objectSDK.SplitInfoError
|
||||
if errors.As(err, &splitInfoError) {
|
||||
|
@ -138,51 +147,51 @@ func (db *DB) put(tx *bbolt.Tx,
|
|||
}
|
||||
|
||||
if exists {
|
||||
return PutRes{}, db.updateObj(tx, obj, id, si, isParent)
|
||||
return PutRes{}, db.updateObj(tx, bc, obj, id, si, isParent)
|
||||
}
|
||||
|
||||
return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch, indexAttributes)
|
||||
return PutRes{Inserted: true}, db.insertObject(tx, bc, obj, id, si, isParent, cnr, currEpoch, indexAttributes)
|
||||
}
|
||||
|
||||
func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
|
||||
func (db *DB) updateObj(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
|
||||
// most right child and split header overlap parent so we have to
|
||||
// check if object exists to not overwrite it twice
|
||||
|
||||
// When storage engine moves objects between different sub-storages,
|
||||
// it calls metabase.Put method with new storage ID, thus triggering this code.
|
||||
if !isParent && id != nil {
|
||||
return setStorageID(tx, objectCore.AddressOf(obj), id, true)
|
||||
return setStorageID(tx, bc, objectCore.AddressOf(obj), id, true)
|
||||
}
|
||||
|
||||
// when storage already has last object in split hierarchy and there is
|
||||
// a linking object to put (or vice versa), we should update split info
|
||||
// with object ids of these objects
|
||||
if isParent {
|
||||
return updateSplitInfo(tx, objectCore.AddressOf(obj), si)
|
||||
return updateSplitInfo(tx, bc, objectCore.AddressOf(obj), si)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64, indexAttributes bool) error {
|
||||
func (db *DB) insertObject(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64, indexAttributes bool) error {
|
||||
if par := obj.Parent(); par != nil && !isParent { // limit depth by two
|
||||
parentSI, err := splitInfoFromObject(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = db.put(tx, par, id, parentSI, currEpoch, indexAttributes)
|
||||
_, err = db.put(tx, bc, par, id, parentSI, currEpoch, indexAttributes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err := putUniqueIndexes(tx, obj, si, id)
|
||||
err := putUniqueIndexes(tx, bc, obj, si, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("put unique indexes: %w", err)
|
||||
}
|
||||
|
||||
err = updateListIndexes(tx, obj, putListIndexItem)
|
||||
err = updateListIndexes(tx, bc, obj, putListIndexItem, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("put list indexes: %w", err)
|
||||
}
|
||||
|
@ -196,14 +205,14 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
|
|||
|
||||
// update container volume size estimation
|
||||
if obj.Type() == objectSDK.TypeRegular && !isParent {
|
||||
err = changeContainerSize(tx, cnr, obj.PayloadSize(), true)
|
||||
err = changeContainerSize(tx, bc, cnr, obj.PayloadSize(), true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if !isParent {
|
||||
if err = db.incCounters(tx, cnr, IsUserObject(obj)); err != nil {
|
||||
if err = db.incCounters(tx, bc, cnr, IsUserObject(obj)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -211,69 +220,78 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
|
|||
return nil
|
||||
}
|
||||
|
||||
func putUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, si *objectSDK.SplitInfo, id []byte) error {
|
||||
func putUniqueIndexes(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, si *objectSDK.SplitInfo, id []byte) error {
|
||||
isParent := si != nil
|
||||
addr := objectCore.AddressOf(obj)
|
||||
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
||||
|
||||
bucketName := make([]byte, bucketKeySize)
|
||||
if !isParent {
|
||||
err := putRawObjectData(tx, obj, bucketName, addr, objKey)
|
||||
err := putRawObjectData(tx, bc, obj, bucketName, addr, objKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if id != nil {
|
||||
if err = setStorageID(tx, objectCore.AddressOf(obj), id, false); err != nil {
|
||||
if err = setStorageID(tx, bc, objectCore.AddressOf(obj), id, false); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := putExpirationEpoch(tx, obj, addr, objKey); err != nil {
|
||||
if err := putExpirationEpoch(tx, bc, obj, addr, objKey); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return putSplitInfo(tx, obj, bucketName, addr, si, objKey)
|
||||
return putSplitInfo(tx, bc, obj, bucketName, addr, si, objKey)
|
||||
}
|
||||
|
||||
func putRawObjectData(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr oid.Address, objKey []byte) error {
|
||||
func putRawObjectData(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, bucketName []byte, addr oid.Address, objKey []byte) error {
|
||||
var bkt *bbolt.Bucket
|
||||
var err error
|
||||
switch obj.Type() {
|
||||
case objectSDK.TypeRegular:
|
||||
bucketName = primaryBucketName(addr.Container(), bucketName)
|
||||
bkt, err = getOrCreateBucket(tx, bc, addr.Container(), getPrimaryBucket, primaryBucketName(addr.Container(), bucketName), true)
|
||||
case objectSDK.TypeTombstone:
|
||||
bucketName = tombstoneBucketName(addr.Container(), bucketName)
|
||||
bkt, err = getOrCreateBucket(tx, bc, addr.Container(), getTombstoneBucket, tombstoneBucketName(addr.Container(), bucketName), true)
|
||||
case objectSDK.TypeLock:
|
||||
bucketName = bucketNameLockers(addr.Container(), bucketName)
|
||||
bkt, err = getOrCreateBucket(tx, bc, addr.Container(), getLockersBucket, bucketNameLockers(addr.Container(), bucketName), true)
|
||||
default:
|
||||
return ErrUnknownObjectType
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rawObject, err := obj.CutPayload().Marshal()
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal object header: %w", err)
|
||||
}
|
||||
return putUniqueIndexItem(tx, namedBucketItem{
|
||||
name: bucketName,
|
||||
key: objKey,
|
||||
val: rawObject,
|
||||
return putUniqueIndexItem(bucketItem{
|
||||
bucket: bkt,
|
||||
key: objKey,
|
||||
val: rawObject,
|
||||
})
|
||||
}
|
||||
|
||||
-func putExpirationEpoch(tx *bbolt.Tx, obj *objectSDK.Object, addr oid.Address, objKey []byte) error {
+func putExpirationEpoch(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, addr oid.Address, objKey []byte) error {
	if expEpoch, ok := hasExpirationEpoch(obj); ok {
-		err := putUniqueIndexItem(tx, namedBucketItem{
-			name: expEpochToObjectBucketName,
-			key:  expirationEpochKey(expEpoch, addr.Container(), addr.Object()),
-			val:  zeroValue,
+		err := putUniqueIndexItem(bucketItem{
+			bucket: getExpEpochToObjectBucket(bc, tx),
+			key:    expirationEpochKey(expEpoch, addr.Container(), addr.Object()),
+			val:    zeroValue,
		})
		if err != nil {
			return err
		}
+		bkt, err := getOrCreateBucket(tx, bc, addr.Container(), getObjToExpEpochBucket, objectToExpirationEpochBucketName(addr.Container(), make([]byte, bucketKeySize)), true)
+		if err != nil {
+			return err
+		}
		val := make([]byte, epochSize)
		binary.LittleEndian.PutUint64(val, expEpoch)
-		err = putUniqueIndexItem(tx, namedBucketItem{
-			name: objectToExpirationEpochBucketName(addr.Container(), make([]byte, bucketKeySize)),
-			key:  objKey,
-			val:  val,
+		err = putUniqueIndexItem(bucketItem{
+			bucket: bkt,
+			key:    objKey,
+			val:    val,
		})
		if err != nil {
			return err

@@ -282,7 +300,7 @@ func putExpirationEpoch(tx *bbolt.Tx, obj *objectSDK.Object, addr oid.Address, o
	return nil
}

-func putSplitInfo(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr oid.Address, si *objectSDK.SplitInfo, objKey []byte) error {
+func putSplitInfo(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, bucketName []byte, addr oid.Address, si *objectSDK.SplitInfo, objKey []byte) error {
	if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
		if ecHead := obj.ECHeader(); ecHead != nil {
			parentID := ecHead.Parent()

@@ -300,15 +318,19 @@ func putSplitInfo(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr o
			}
			objKey = objectKey(parentID, objKey)
		}
-		return updateSplitInfoIndex(tx, objKey, addr.Container(), bucketName, si)
+		return updateSplitInfoIndex(tx, bc, objKey, addr.Container(), bucketName, si)
	}
	return nil
}

-func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName []byte, si *objectSDK.SplitInfo) error {
-	return updateUniqueIndexItem(tx, namedBucketItem{
-		name: rootBucketName(cnr, bucketName),
-		key:  objKey,
+func updateSplitInfoIndex(tx *bbolt.Tx, bc *bucketCache, objKey []byte, cnr cid.ID, bucketName []byte, si *objectSDK.SplitInfo) error {
+	bkt, err := getOrCreateBucket(tx, bc, cnr, getRootBucket, rootBucketName(cnr, bucketName), true)
+	if err != nil {
+		return err
+	}
+	return updateUniqueIndexItem(bucketItem{
+		bucket: bkt,
+		key:    objKey,
	}, func(old, _ []byte) ([]byte, error) {
		switch {
		case si == nil && old == nil:

@@ -328,78 +350,100 @@ func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName []
	})
}

-type updateIndexItemFunc = func(tx *bbolt.Tx, item namedBucketItem) error
+type updateIndexItemFunc = func(item bucketItem) error

-func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
+func updateListIndexes(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, f updateIndexItemFunc, createBuckets bool) error {
	idObj, _ := obj.ID()
	cnr, _ := obj.ContainerID()
	objKey := objectKey(idObj, make([]byte, objectKeySize))
	bucketName := make([]byte, bucketKeySize)

	idParent, ok := obj.ParentID()

	// index parent ids
	if ok {
-		err := f(tx, namedBucketItem{
-			name: parentBucketName(cnr, bucketName),
-			key:  objectKey(idParent, make([]byte, objectKeySize)),
-			val:  objKey,
-		})
+		bkt, err := getOrCreateBucket(tx, bc, cnr, getParentBucket, parentBucketName(cnr, bucketName), createBuckets)
		if err != nil {
			return err
		}
+		if err := f(bucketItem{
+			bucket: bkt,
+			key:    objectKey(idParent, make([]byte, objectKeySize)),
+			val:    objKey,
+		}); err != nil {
+			return err
+		}
	}

	// index split ids
	if obj.SplitID() != nil {
-		err := f(tx, namedBucketItem{
-			name: splitBucketName(cnr, bucketName),
-			key:  obj.SplitID().ToV2(),
-			val:  objKey,
-		})
+		bkt, err := getOrCreateBucket(tx, bc, cnr, getSplitBucket, splitBucketName(cnr, bucketName), createBuckets)
		if err != nil {
			return err
		}
+		if err := f(bucketItem{
+			bucket: bkt,
+			key:    obj.SplitID().ToV2(),
+			val:    objKey,
+		}); err != nil {
+			return err
+		}
	}

	if ech := obj.ECHeader(); ech != nil {
-		err := f(tx, namedBucketItem{
-			name: ecInfoBucketName(cnr, bucketName),
-			key:  objectKey(ech.Parent(), make([]byte, objectKeySize)),
-			val:  objKey,
-		})
+		bkt, err := getOrCreateBucket(tx, bc, cnr, getECInfoBucket, ecInfoBucketName(cnr, bucketName), createBuckets)
		if err != nil {
			return err
		}
+		if err := f(bucketItem{
+			bucket: bkt,
+			key:    objectKey(ech.Parent(), make([]byte, objectKeySize)),
+			val:    objKey,
+		}); err != nil {
+			return err
+		}

		if ech.ParentSplitID() != nil {
-			objKey := objectKey(ech.Parent(), make([]byte, objectKeySize))
-			err := f(tx, namedBucketItem{
-				name: splitBucketName(cnr, bucketName),
-				key:  ech.ParentSplitID().ToV2(),
-				val:  objKey,
-			})
+			bkt, err := getOrCreateBucket(tx, bc, cnr, getSplitBucket, splitBucketName(cnr, bucketName), createBuckets)
			if err != nil {
				return err
			}
+			if err := f(bucketItem{
+				bucket: bkt,
+				key:    ech.ParentSplitID().ToV2(),
+				val:    objectKey(ech.Parent(), make([]byte, objectKeySize)),
+			}); err != nil {
+				return err
+			}
		}

		if parentSplitParentID := ech.ParentSplitParentID(); parentSplitParentID != nil {
-			objKey := objectKey(ech.Parent(), make([]byte, objectKeySize))
-			err := f(tx, namedBucketItem{
-				name: parentBucketName(cnr, bucketName),
-				key:  objectKey(*parentSplitParentID, make([]byte, objectKeySize)),
-				val:  objKey,
-			})
+			bkt, err := getOrCreateBucket(tx, bc, cnr, getParentBucket, parentBucketName(cnr, bucketName), createBuckets)
			if err != nil {
				return err
			}
+			if err := f(bucketItem{
+				bucket: bkt,
+				key:    objectKey(*parentSplitParentID, make([]byte, objectKeySize)),
+				val:    objectKey(ech.Parent(), make([]byte, objectKeySize)),
+			}); err != nil {
+				return err
+			}
		}
	}

	return nil
}

+func getOrCreateBucket(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, getter func(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket,
+	bucketName []byte, create bool,
+) (*bbolt.Bucket, error) {
+	bkt := getter(bc, tx, cnr)
+	if bkt == nil && create {
+		return createBucketLikelyExists(tx, bucketName)
+	}
+	return bkt, nil
+}

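Note (editor): getOrCreateBucket consults a cache getter before falling back to bucket creation. The getters themselves (getPrimaryBucket, getRootBucket, getSmallBucket, and the rest) are not part of this excerpt, so the following stand-alone sketch only illustrates the memoization idea they presumably implement; the bucketLookup type and file names here are invented for the example, not the repository's API.

package main

import (
	"fmt"
	"log"
	"os"

	"go.etcd.io/bbolt"
)

// bucketLookup memoizes tx.Bucket lookups for a single transaction so
// that a batch touching the same bucket repeatedly resolves it once.
type bucketLookup struct {
	cached map[string]*bbolt.Bucket
}

// getOrCreate mirrors the order used by getOrCreateBucket above:
// cached handle first, then a direct lookup, then creation on demand.
func (c *bucketLookup) getOrCreate(tx *bbolt.Tx, name []byte, create bool) (*bbolt.Bucket, error) {
	if b, ok := c.cached[string(name)]; ok {
		return b, nil
	}
	b := tx.Bucket(name)
	if b == nil && create {
		var err error
		if b, err = tx.CreateBucket(name); err != nil {
			return nil, err
		}
	}
	if b != nil {
		c.cached[string(name)] = b
	}
	return b, nil
}

func main() {
	db, err := bbolt.Open("cache-demo.db", 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove("cache-demo.db")
	defer db.Close()

	err = db.Update(func(tx *bbolt.Tx) error {
		c := &bucketLookup{cached: make(map[string]*bbolt.Bucket)}
		for i := 0; i < 3; i++ { // only the first iteration walks the bucket tree
			b, err := c.getOrCreate(tx, []byte("primary"), true)
			if err != nil {
				return err
			}
			if err := b.Put([]byte{byte(i)}, []byte("v")); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("done")
}
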
var indexedAttributes = map[string]struct{}{
	"S3-Access-Box-CRDT-Name":   {},
	objectSDK.AttributeFilePath: {},

@@ -411,7 +455,9 @@ func IsAtrributeIndexed(attr string) bool {
	return found
}

-func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
+type updateFKBTIndexItemFunc = func(tx *bbolt.Tx, item namedBucketItem) error
+
+func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateFKBTIndexItemFunc) error {
	id, _ := obj.ID()
	cnr, _ := obj.ContainerID()
	objKey := objectKey(id, make([]byte, objectKeySize))

@@ -471,11 +517,9 @@ func createBucketLikelyExists[T bucketContainer](tx T, name []byte) (*bbolt.Buck
	return tx.CreateBucket(name)
}

-func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldData, newData []byte) ([]byte, error)) error {
-	bkt, err := createBucketLikelyExists(tx, item.name)
-	if err != nil {
-		return fmt.Errorf("create index %v: %w", item.name, err)
-	}
+func updateUniqueIndexItem(item bucketItem, update func(oldData, newData []byte) ([]byte, error)) error {
+	bkt := item.bucket
+	assert.True(bkt != nil, bucketNilAsserMsg)

	data, err := update(bkt.Get(item.key), item.val)
	if err != nil {

@@ -484,8 +528,8 @@ func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldDa
	return bkt.Put(item.key, data)
}

-func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
-	return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil })
+func putUniqueIndexItem(item bucketItem) error {
+	return updateUniqueIndexItem(item, func(_, val []byte) ([]byte, error) { return val, nil })
}

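Note (editor): updateUniqueIndexItem and putUniqueIndexItem now operate on a pre-resolved *bbolt.Bucket handle (bucketItem) instead of a bucket name, moving all lookup-or-create work to the call sites and turning a missing bucket into an assertion failure rather than an implicit create. A minimal self-contained sketch of that contract; the item type and putItem are stand-ins, not the code above:

package main

import (
	"errors"
	"log"
	"os"

	"go.etcd.io/bbolt"
)

// item mirrors the shape of bucketItem: a resolved handle plus key/value.
type item struct {
	bucket *bbolt.Bucket
	key    []byte
	val    []byte
}

// putItem never resolves names itself; the caller must hand it a live
// bucket (the code above asserts this instead of returning an error).
func putItem(it item) error {
	if it.bucket == nil {
		return errors.New("bucket must be resolved by the caller")
	}
	return it.bucket.Put(it.key, it.val)
}

func main() {
	db, err := bbolt.Open("item-demo.db", 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove("item-demo.db")
	defer db.Close()

	err = db.Update(func(tx *bbolt.Tx) error {
		bkt, err := tx.CreateBucketIfNotExists([]byte("idx"))
		if err != nil {
			return err
		}
		return putItem(item{bucket: bkt, key: []byte("k"), val: []byte("v")})
	})
	if err != nil {
		log.Fatal(err)
	}
}
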
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {

@@ -502,11 +546,9 @@ func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
	return fkbtRoot.Put(item.val, zeroValue)
}

-func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
-	bkt, err := createBucketLikelyExists(tx, item.name)
-	if err != nil {
-		return fmt.Errorf("create index %v: %w", item.name, err)
-	}
+func putListIndexItem(item bucketItem) error {
+	bkt := item.bucket
+	assert.True(bkt != nil, bucketNilAsserMsg)

	lst, err := decodeList(bkt.Get(item.key))
	if err != nil {
@@ -595,9 +637,9 @@ func getVarUint(data []byte) (uint64, int, error) {

// setStorageID for existing objects if they were moved from one
// storage location to another.
-func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) error {
+func setStorageID(tx *bbolt.Tx, bc *bucketCache, addr oid.Address, id []byte, override bool) error {
	key := make([]byte, bucketKeySize)
-	bkt, err := createBucketLikelyExists(tx, smallBucketName(addr.Container(), key))
+	bkt, err := getOrCreateBucket(tx, bc, addr.Container(), getSmallBucket, smallBucketName(addr.Container(), key), true)
	if err != nil {
		return err
	}

@@ -610,9 +652,9 @@ func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) erro

// updateSpliInfo for existing objects if storage filled with extra information
// about last object in split hierarchy or linking object.
-func updateSplitInfo(tx *bbolt.Tx, addr oid.Address, from *objectSDK.SplitInfo) error {
+func updateSplitInfo(tx *bbolt.Tx, bc *bucketCache, addr oid.Address, from *objectSDK.SplitInfo) error {
	objKey := objectKey(addr.Object(), make([]byte, bucketKeySize))
-	return updateSplitInfoIndex(tx, objKey, addr.Container(), make([]byte, bucketKeySize), from)
+	return updateSplitInfoIndex(tx, bc, objKey, addr.Container(), make([]byte, bucketKeySize), from)
}

// splitInfoFromObject returns split info based on last or linkin object.

@@ -413,7 +413,7 @@ func (db *DB) selectObjectID(
	addr.SetObject(id)

	var splitInfoError *objectSDK.SplitInfoError
-	ok, _, err := db.exists(tx, addr, oid.Address{}, currEpoch)
+	ok, _, err := db.exists(tx, nil, addr, oid.Address{}, currEpoch)
	if (err == nil && ok) || errors.As(err, &splitInfoError) {
		raw := make([]byte, objectKeySize)
		id.Encode(raw)

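Note (editor): this read path passes nil for the new cache argument (db.exists(tx, nil, ...)), which implies the bucket getters must tolerate a nil *bucketCache and fall back to a plain lookup. That fallback is not shown in this excerpt; a plausible shape, with the cache reduced to a bare map purely for illustration:

package metasketch

import "go.etcd.io/bbolt"

// getBucketCached degrades gracefully when no cache was supplied.
// The real bucketCache getters are assumed, not shown, to do the same.
func getBucketCached(cache map[string]*bbolt.Bucket, tx *bbolt.Tx, name []byte) *bbolt.Bucket {
	if cache == nil {
		return tx.Bucket(name) // uncached read path
	}
	if b, ok := cache[string(name)]; ok {
		return b
	}
	b := tx.Bucket(name)
	if b != nil {
		cache[string(name)] = b
	}
	return b
}
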
@@ -126,8 +126,8 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res
		return res, ErrReadOnlyMode
	}

-	err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
-		return setStorageID(tx, prm.addr, prm.id, true)
+	err = db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
+		return setStorageID(tx, bc, prm.addr, prm.id, true)
	})
	success = err == nil
	return res, metaerr.Wrap(err)

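Note (editor): write paths switch from bbolt's raw db.boltDB.Batch(func(tx)) to the metabase's own db.Batch(func(tx, bc)), so every batched update receives a bucket cache alongside the transaction. The adapter below shows only the signature change under that assumption; it is a sketch, not the DB.Batch this change actually adds, whose internals are outside this excerpt.

package metasketch

import "go.etcd.io/bbolt"

// cacheSketch stands in for the metabase's bucketCache.
type cacheSketch struct {
	buckets map[string]*bbolt.Bucket
}

// batchWithCache adapts bbolt's Batch callback to the two-argument
// form used above, handing each callback a fresh cache.
func batchWithCache(db *bbolt.DB, fn func(*bbolt.Tx, *cacheSketch) error) error {
	return db.Batch(func(tx *bbolt.Tx) error {
		return fn(tx, &cacheSketch{buckets: make(map[string]*bbolt.Bucket)})
	})
}
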
@@ -168,20 +168,7 @@ func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB, log func(a
			return nil
		}
		if err := db.Batch(func(tx *bbolt.Tx) error {
-			if err := putUniqueIndexItem(tx, namedBucketItem{
-				name: expEpochToObjectBucketName,
-				key:  expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID),
-				val:  zeroValue,
-			}); err != nil {
-				return err
-			}
-			val := make([]byte, epochSize)
-			binary.LittleEndian.PutUint64(val, obj.expirationEpoch)
-			return putUniqueIndexItem(tx, namedBucketItem{
-				name: objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)),
-				key:  objectKey(obj.objectID, make([]byte, objectKeySize)),
-				val:  val,
-			})
+			return saveObjectExpirationEpoch(tx, obj)
		}); err != nil {
			return err
		}

@@ -201,6 +188,31 @@ func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB, log func(a
	return nil
}

+func saveObjectExpirationEpoch(tx *bbolt.Tx, obj objectIDToExpEpoch) error {
+	bkt, err := createBucketLikelyExists(tx, expEpochToObjectBucketName)
+	if err != nil {
+		return err
+	}
+	if err := putUniqueIndexItem(bucketItem{
+		bucket: bkt,
+		key:    expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID),
+		val:    zeroValue,
+	}); err != nil {
+		return err
+	}
+	bkt, err = createBucketLikelyExists(tx, objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)))
+	if err != nil {
+		return err
+	}
+	val := make([]byte, epochSize)
+	binary.LittleEndian.PutUint64(val, obj.expirationEpoch)
+	return putUniqueIndexItem(bucketItem{
+		bucket: bkt,
+		key:    objectKey(obj.objectID, make([]byte, objectKeySize)),
+		val:    val,
+	})
+}
+
func selectObjectsWithExpirationEpoch(ctx context.Context, db *bbolt.DB, objects chan objectIDToExpEpoch) error {
	defer close(objects)

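Note (editor): saveObjectExpirationEpoch factors the migration's double write into one helper: a global epoch-to-object index (keyed so entries can be scanned by epoch) and a per-container object-to-epoch index whose value is the epoch as 8 little-endian bytes. The key layout of expirationEpochKey is not visible here; the sketch below guesses a big-endian-epoch-first layout, labeled as such, while the value encoding matches the diff.

package main

import (
	"encoding/binary"
	"fmt"
)

const epochSize = 8 // width of the epoch value written above

// encodeEpoch matches the value side of the object->epoch index:
// binary.LittleEndian.PutUint64 into an 8-byte slice.
func encodeEpoch(epoch uint64) []byte {
	val := make([]byte, epochSize)
	binary.LittleEndian.PutUint64(val, epoch)
	return val
}

// expirationKeySketch is an assumed layout for the epoch->object key:
// big-endian epoch first so that byte order equals epoch order, then
// container and object IDs. The real expirationEpochKey may differ.
func expirationKeySketch(epoch uint64, cnr, obj []byte) []byte {
	k := make([]byte, epochSize, epochSize+len(cnr)+len(obj))
	binary.BigEndian.PutUint64(k, epoch)
	k = append(k, cnr...)
	return append(k, obj...)
}

func main() {
	fmt.Printf("val=%x key=%x\n", encodeEpoch(100), expirationKeySketch(100, []byte{0xAA}, []byte{0xBB}))
}
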
@@ -277,24 +277,22 @@ func objectKey(obj oid.ID, key []byte) []byte {
// if meets irregular object container in objs - returns its type, otherwise returns object.TypeRegular.
//
// firstIrregularObjectType(tx, cnr, obj) usage allows getting object type.
-func firstIrregularObjectType(tx *bbolt.Tx, idCnr cid.ID, objs ...[]byte) objectSDK.Type {
+func firstIrregularObjectType(tx *bbolt.Tx, bc *bucketCache, idCnr cid.ID, objs ...[]byte) objectSDK.Type {
	if len(objs) == 0 {
		panic("empty object list in firstIrregularObjectType")
	}

-	var keys [2][1 + cidSize]byte
-
	irregularTypeBuckets := [...]struct {
-		typ  objectSDK.Type
-		name []byte
+		typ objectSDK.Type
+		bkt *bbolt.Bucket
	}{
-		{objectSDK.TypeTombstone, tombstoneBucketName(idCnr, keys[0][:])},
-		{objectSDK.TypeLock, bucketNameLockers(idCnr, keys[1][:])},
+		{objectSDK.TypeTombstone, getTombstoneBucket(bc, tx, idCnr)},
+		{objectSDK.TypeLock, getLockersBucket(bc, tx, idCnr)},
	}

	for i := range objs {
		for j := range irregularTypeBuckets {
-			if inBucket(tx, irregularTypeBuckets[j].name, objs[i]) {
+			if inBucket(irregularTypeBuckets[j].bkt, objs[i]) {
				return irregularTypeBuckets[j].typ
			}
		}

@@ -304,8 +302,7 @@ func firstIrregularObjectType(tx *bbolt.Tx, idCnr cid.ID, objs ...[]byte) object
}

// return true if provided object is of LOCK type.
-func isLockObject(tx *bbolt.Tx, idCnr cid.ID, obj oid.ID) bool {
-	return inBucket(tx,
-		bucketNameLockers(idCnr, make([]byte, bucketKeySize)),
+func isLockObject(tx *bbolt.Tx, bc *bucketCache, idCnr cid.ID, obj oid.ID) bool {
+	return inBucket(getLockersBucket(bc, tx, idCnr),
		objectKey(obj, make([]byte, objectKeySize)))
}
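Note (editor): inBucket's new form takes the resolved bucket directly, so both call sites above stay correct when a container simply has no tombstone or locker bucket: the getter returns nil and the membership test must report false. Its updated body is outside this excerpt; the one-liner below is the obvious shape, offered as an assumption.

package metasketch

import "go.etcd.io/bbolt"

// inBucketSketch treats a nil bucket as "key not present", which is
// what firstIrregularObjectType and isLockObject rely on above.
func inBucketSketch(bkt *bbolt.Bucket, key []byte) bool {
	return bkt != nil && bkt.Get(key) != nil
}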