Compare commits
9 commits
master
...
feat/metab
Author | SHA1 | Date | |
---|---|---|---|
796975e859 | |||
5869be6c11 | |||
fef6aa21c5 | |||
fb8d37f5ff | |||
4ba9d34188 | |||
4e63772632 | |||
84640ca9cf | |||
ce2b08e5a4 | |||
afed6e05b1 |
17 changed files with 589 additions and 278 deletions
148
pkg/local_object_storage/metabase/batch.go
Normal file
148
pkg/local_object_storage/metabase/batch.go
Normal file
|
@ -0,0 +1,148 @@
|
||||||
|
// NOTE: code is partially taken from https://github.com/etcd-io/bbolt/blob/v1.3.10/db.go
|
||||||
|
|
||||||
|
/*
|
||||||
|
The MIT License (MIT)
|
||||||
|
|
||||||
|
Copyright (c) 2013 Ben Johnson
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||||
|
this software and associated documentation files (the "Software"), to deal in
|
||||||
|
the Software without restriction, including without limitation the rights to
|
||||||
|
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||||
|
the Software, and to permit persons to whom the Software is furnished to do so,
|
||||||
|
subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in all
|
||||||
|
copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||||
|
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||||
|
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||||
|
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||||
|
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package meta
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// fn is the signature of a function executed inside a batched bbolt
// update transaction. The *bucketCache is shared by every call of one
// batch attempt.
type fn func(*bbolt.Tx, *bucketCache) error

// call is a single queued batch participant: the function to run and
// the channel its result is delivered on.
type call struct {
	fn  fn
	err chan<- error
}

// batch accumulates calls and runs them together in one bbolt update
// transaction (see run). It is flushed either by the timer or by
// trigger when it fills up.
type batch struct {
	db    *DB
	timer *time.Timer
	start sync.Once // guarantees run executes at most once
	calls []call
}
|
||||||
|
|
||||||
|
// trigger starts the batch exactly once; it is safe to call both from
// the expiry timer and from Batch when the batch fills up.
func (b *batch) trigger() {
	b.start.Do(b.run)
}
|
||||||
|
|
||||||
|
func (b *batch) run() {
|
||||||
|
b.db.batchMtx.Lock()
|
||||||
|
b.timer.Stop()
|
||||||
|
// Make sure no new work is added to this batch, but don't break
|
||||||
|
// other batches.
|
||||||
|
if b.db.batch == b {
|
||||||
|
b.db.batch = nil
|
||||||
|
}
|
||||||
|
b.db.batchMtx.Unlock()
|
||||||
|
|
||||||
|
bc := newBucketCache()
|
||||||
|
retry:
|
||||||
|
for len(b.calls) > 0 {
|
||||||
|
failIdx := -1
|
||||||
|
err := b.db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
|
for i, c := range b.calls {
|
||||||
|
if err := safelyCall(c.fn, tx, bc); err != nil {
|
||||||
|
failIdx = i
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if failIdx >= 0 {
|
||||||
|
// take the failing transaction out of the batch. it's
|
||||||
|
// safe to shorten b.calls here because db.batch no longer
|
||||||
|
// points to us, and we hold the mutex anyway.
|
||||||
|
c := b.calls[failIdx]
|
||||||
|
b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
|
||||||
|
// tell the submitter re-run it solo, continue with the rest of the batch
|
||||||
|
c.err <- errTrySolo
|
||||||
|
continue retry
|
||||||
|
}
|
||||||
|
|
||||||
|
// pass success, or bolt internal errors, to all callers
|
||||||
|
for _, c := range b.calls {
|
||||||
|
c.err <- err
|
||||||
|
}
|
||||||
|
break retry
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func safelyCall(fn func(*bbolt.Tx, *bucketCache) error, tx *bbolt.Tx, bc *bucketCache) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if p := recover(); p != nil {
|
||||||
|
err = panicked{p}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return fn(tx, bc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) Batch(fn fn) error {
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
|
||||||
|
db.batchMtx.Lock()
|
||||||
|
if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.boltBatchSize) {
|
||||||
|
// There is no existing batch, or the existing batch is full; start a new one.
|
||||||
|
db.batch = &batch{
|
||||||
|
db: db,
|
||||||
|
}
|
||||||
|
db.batch.timer = time.AfterFunc(db.boltBatchDelay, db.batch.trigger)
|
||||||
|
}
|
||||||
|
db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
|
||||||
|
if len(db.batch.calls) >= db.boltBatchSize {
|
||||||
|
// wake up batch, it's ready to run
|
||||||
|
go db.batch.trigger()
|
||||||
|
}
|
||||||
|
db.batchMtx.Unlock()
|
||||||
|
|
||||||
|
err := <-errCh
|
||||||
|
if err == errTrySolo {
|
||||||
|
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
|
return fn(tx, nil)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// errTrySolo is a special sentinel error value used for signaling that a
// transaction function should be re-run. It should never be seen by
// callers.
var errTrySolo = errors.New("batch function returned an error and should be re-run solo")

// panicked wraps a value recovered from a panic so it can travel
// through error-returning call chains.
type panicked struct {
	reason interface{}
}

// Error renders the recovered value: the error message itself when the
// panic carried an error, a "panic: ..." description otherwise.
func (p panicked) Error() string {
	switch v := p.reason.(type) {
	case error:
		return v.Error()
	default:
		return fmt.Sprintf("panic: %v", v)
	}
}
|
|
@ -9,8 +9,20 @@ type bucketCache struct {
|
||||||
locked *bbolt.Bucket
|
locked *bbolt.Bucket
|
||||||
graveyard *bbolt.Bucket
|
graveyard *bbolt.Bucket
|
||||||
garbage *bbolt.Bucket
|
garbage *bbolt.Bucket
|
||||||
|
expEpoch *bbolt.Bucket
|
||||||
|
contVol *bbolt.Bucket
|
||||||
|
contCount *bbolt.Bucket
|
||||||
|
shardInfo *bbolt.Bucket
|
||||||
expired map[cid.ID]*bbolt.Bucket
|
expired map[cid.ID]*bbolt.Bucket
|
||||||
primary map[cid.ID]*bbolt.Bucket
|
primary map[cid.ID]*bbolt.Bucket
|
||||||
|
parent map[cid.ID]*bbolt.Bucket
|
||||||
|
tombstone map[cid.ID]*bbolt.Bucket
|
||||||
|
lockers map[cid.ID]*bbolt.Bucket
|
||||||
|
small map[cid.ID]*bbolt.Bucket
|
||||||
|
root map[cid.ID]*bbolt.Bucket
|
||||||
|
expObj map[cid.ID]*bbolt.Bucket
|
||||||
|
split map[cid.ID]*bbolt.Bucket
|
||||||
|
ecInfo map[cid.ID]*bbolt.Bucket
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBucketCache() *bucketCache {
|
func newBucketCache() *bucketCache {
|
||||||
|
@ -38,6 +50,34 @@ func getGarbageBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
|
||||||
return getBucket(&bc.garbage, tx, garbageBucketName)
|
return getBucket(&bc.garbage, tx, garbageBucketName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getExpEpochToObjectBucket returns the (expiration epoch -> object)
// index bucket, using the cache when one is provided.
func getExpEpochToObjectBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
	if bc == nil {
		return tx.Bucket(expEpochToObjectBucketName)
	}
	return getBucket(&bc.expEpoch, tx, expEpochToObjectBucketName)
}

// getContainerVolumeBucket returns the container volume bucket, using
// the cache when one is provided.
func getContainerVolumeBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
	if bc == nil {
		return tx.Bucket(containerVolumeBucketName)
	}
	return getBucket(&bc.contVol, tx, containerVolumeBucketName)
}

// getShardInfoBucket returns the shard info bucket, using the cache
// when one is provided.
func getShardInfoBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
	if bc == nil {
		return tx.Bucket(shardInfoBucket)
	}
	return getBucket(&bc.shardInfo, tx, shardInfoBucket)
}

// getContainerCounterBucket returns the per-container object counter
// bucket, using the cache when one is provided.
func getContainerCounterBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
	if bc == nil {
		return tx.Bucket(containerCounterBucketName)
	}
	return getBucket(&bc.contCount, tx, containerCounterBucketName)
}
|
||||||
|
|
||||||
func getBucket(cache **bbolt.Bucket, tx *bbolt.Tx, name []byte) *bbolt.Bucket {
|
func getBucket(cache **bbolt.Bucket, tx *bbolt.Tx, name []byte) *bbolt.Bucket {
|
||||||
if *cache != nil {
|
if *cache != nil {
|
||||||
return *cache
|
return *cache
|
||||||
|
@ -65,6 +105,78 @@ func getPrimaryBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
|
||||||
return getMappedBucket(&bc.primary, tx, primaryBucketName, cnr)
|
return getMappedBucket(&bc.primary, tx, primaryBucketName, cnr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getParentBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
|
||||||
|
if bc == nil {
|
||||||
|
bucketName := make([]byte, bucketKeySize)
|
||||||
|
bucketName = parentBucketName(cnr, bucketName)
|
||||||
|
return tx.Bucket(bucketName)
|
||||||
|
}
|
||||||
|
return getMappedBucket(&bc.parent, tx, parentBucketName, cnr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getTombstoneBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
|
||||||
|
if bc == nil {
|
||||||
|
bucketName := make([]byte, bucketKeySize)
|
||||||
|
bucketName = tombstoneBucketName(cnr, bucketName)
|
||||||
|
return tx.Bucket(bucketName)
|
||||||
|
}
|
||||||
|
return getMappedBucket(&bc.tombstone, tx, tombstoneBucketName, cnr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getLockersBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
|
||||||
|
if bc == nil {
|
||||||
|
bucketName := make([]byte, bucketKeySize)
|
||||||
|
bucketName = bucketNameLockers(cnr, bucketName)
|
||||||
|
return tx.Bucket(bucketName)
|
||||||
|
}
|
||||||
|
return getMappedBucket(&bc.lockers, tx, bucketNameLockers, cnr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getSmallBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
|
||||||
|
if bc == nil {
|
||||||
|
bucketName := make([]byte, bucketKeySize)
|
||||||
|
bucketName = smallBucketName(cnr, bucketName)
|
||||||
|
return tx.Bucket(bucketName)
|
||||||
|
}
|
||||||
|
return getMappedBucket(&bc.small, tx, smallBucketName, cnr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getRootBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
|
||||||
|
if bc == nil {
|
||||||
|
bucketName := make([]byte, bucketKeySize)
|
||||||
|
bucketName = rootBucketName(cnr, bucketName)
|
||||||
|
return tx.Bucket(bucketName)
|
||||||
|
}
|
||||||
|
return getMappedBucket(&bc.root, tx, rootBucketName, cnr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getObjToExpEpochBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
|
||||||
|
if bc == nil {
|
||||||
|
bucketName := make([]byte, bucketKeySize)
|
||||||
|
bucketName = objectToExpirationEpochBucketName(cnr, bucketName)
|
||||||
|
return tx.Bucket(bucketName)
|
||||||
|
}
|
||||||
|
return getMappedBucket(&bc.expObj, tx, objectToExpirationEpochBucketName, cnr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getSplitBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
|
||||||
|
if bc == nil {
|
||||||
|
bucketName := make([]byte, bucketKeySize)
|
||||||
|
bucketName = splitBucketName(cnr, bucketName)
|
||||||
|
return tx.Bucket(bucketName)
|
||||||
|
}
|
||||||
|
return getMappedBucket(&bc.split, tx, splitBucketName, cnr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getECInfoBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
|
||||||
|
if bc == nil {
|
||||||
|
bucketName := make([]byte, bucketKeySize)
|
||||||
|
bucketName = ecInfoBucketName(cnr, bucketName)
|
||||||
|
return tx.Bucket(bucketName)
|
||||||
|
}
|
||||||
|
return getMappedBucket(&bc.ecInfo, tx, ecInfoBucketName, cnr)
|
||||||
|
}
|
||||||
|
|
||||||
func getMappedBucket(m *map[cid.ID]*bbolt.Bucket, tx *bbolt.Tx, nameFunc func(cid.ID, []byte) []byte, cnr cid.ID) *bbolt.Bucket {
|
func getMappedBucket(m *map[cid.ID]*bbolt.Bucket, tx *bbolt.Tx, nameFunc func(cid.ID, []byte) []byte, cnr cid.ID) *bbolt.Bucket {
|
||||||
value, ok := (*m)[cnr]
|
value, ok := (*m)[cnr]
|
||||||
if ok {
|
if ok {
|
||||||
|
|
|
@ -100,8 +100,8 @@ func parseContainerSize(v []byte) uint64 {
|
||||||
return binary.LittleEndian.Uint64(v)
|
return binary.LittleEndian.Uint64(v)
|
||||||
}
|
}
|
||||||
|
|
||||||
func changeContainerSize(tx *bbolt.Tx, id cid.ID, delta uint64, increase bool) error {
|
func changeContainerSize(tx *bbolt.Tx, bc *bucketCache, id cid.ID, delta uint64, increase bool) error {
|
||||||
containerVolume := tx.Bucket(containerVolumeBucketName)
|
containerVolume := getContainerVolumeBucket(bc, tx)
|
||||||
key := make([]byte, cidSize)
|
key := make([]byte, cidSize)
|
||||||
id.Encode(key)
|
id.Encode(key)
|
||||||
|
|
||||||
|
|
|
@ -231,10 +231,10 @@ func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, er
|
||||||
return result, metaerr.Wrap(err)
|
return result, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
|
func (db *DB) incCounters(tx *bbolt.Tx, bc *bucketCache, cnrID cid.ID, isUserObject bool) error {
|
||||||
b := tx.Bucket(shardInfoBucket)
|
b := getShardInfoBucket(bc, tx)
|
||||||
if b == nil {
|
if b == nil {
|
||||||
return db.incContainerObjectCounter(tx, cnrID, isUserObject)
|
return db.incContainerObjectCounter(tx, bc, cnrID, isUserObject)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := db.updateShardObjectCounterBucket(b, phy, 1, true); err != nil {
|
if err := db.updateShardObjectCounterBucket(b, phy, 1, true); err != nil {
|
||||||
|
@ -248,7 +248,7 @@ func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
|
||||||
return fmt.Errorf("increase user object counter: %w", err)
|
return fmt.Errorf("increase user object counter: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return db.incContainerObjectCounter(tx, cnrID, isUserObject)
|
return db.incContainerObjectCounter(tx, bc, cnrID, isUserObject)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) decShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64) error {
|
func (db *DB) decShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint64) error {
|
||||||
|
@ -338,8 +338,8 @@ func nextValue(existed, delta uint64, inc bool) uint64 {
|
||||||
return existed
|
return existed
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
|
func (db *DB) incContainerObjectCounter(tx *bbolt.Tx, bc *bucketCache, cnrID cid.ID, isUserObject bool) error {
|
||||||
b := tx.Bucket(containerCounterBucketName)
|
b := getContainerCounterBucket(bc, tx)
|
||||||
if b == nil {
|
if b == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,6 +44,9 @@ type DB struct {
|
||||||
boltDB *bbolt.DB
|
boltDB *bbolt.DB
|
||||||
|
|
||||||
initialized bool
|
initialized bool
|
||||||
|
|
||||||
|
batchMtx sync.Mutex
|
||||||
|
batch *batch
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option is an option of DB constructor.
|
// Option is an option of DB constructor.
|
||||||
|
|
|
@ -110,8 +110,8 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
||||||
var err error
|
var err error
|
||||||
var res DeleteRes
|
var res DeleteRes
|
||||||
|
|
||||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
err = db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
|
||||||
res, err = db.deleteGroup(tx, prm.addrs)
|
res, err = db.deleteGroup(tx, bc, prm.addrs)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -127,7 +127,7 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
||||||
|
|
||||||
// deleteGroup deletes object from the metabase. Handles removal of the
|
// deleteGroup deletes object from the metabase. Handles removal of the
|
||||||
// references of the split objects.
|
// references of the split objects.
|
||||||
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) {
|
func (db *DB) deleteGroup(tx *bbolt.Tx, bc *bucketCache, addrs []oid.Address) (DeleteRes, error) {
|
||||||
res := DeleteRes{
|
res := DeleteRes{
|
||||||
removedByCnrID: make(map[cid.ID]ObjectCounters),
|
removedByCnrID: make(map[cid.ID]ObjectCounters),
|
||||||
}
|
}
|
||||||
|
@ -135,7 +135,7 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
|
||||||
currEpoch := db.epochState.CurrentEpoch()
|
currEpoch := db.epochState.CurrentEpoch()
|
||||||
|
|
||||||
for i := range addrs {
|
for i := range addrs {
|
||||||
r, err := db.delete(tx, addrs[i], refCounter, currEpoch)
|
r, err := db.delete(tx, bc, addrs[i], refCounter, currEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return DeleteRes{}, err
|
return DeleteRes{}, err
|
||||||
}
|
}
|
||||||
|
@ -149,7 +149,7 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error)
|
||||||
|
|
||||||
for _, refNum := range refCounter {
|
for _, refNum := range refCounter {
|
||||||
if refNum.cur == refNum.all {
|
if refNum.cur == refNum.all {
|
||||||
err := db.deleteObject(tx, refNum.obj, true)
|
err := db.deleteObject(tx, bc, refNum.obj, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return DeleteRes{}, err
|
return DeleteRes{}, err
|
||||||
}
|
}
|
||||||
|
@ -243,16 +243,16 @@ type deleteSingleResult struct {
|
||||||
// non-exist object is error-free). The second return value indicates if an
|
// non-exist object is error-free). The second return value indicates if an
|
||||||
// object was available before the removal (for calculating the logical object
|
// object was available before the removal (for calculating the logical object
|
||||||
// counter). The third return value The fourth return value is removed object payload size.
|
// counter). The third return value The fourth return value is removed object payload size.
|
||||||
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (deleteSingleResult, error) {
|
func (db *DB) delete(tx *bbolt.Tx, bc *bucketCache, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (deleteSingleResult, error) {
|
||||||
key := make([]byte, addressKeySize)
|
key := make([]byte, addressKeySize)
|
||||||
addrKey := addressKey(addr, key)
|
addrKey := addressKey(addr, key)
|
||||||
garbageBKT := tx.Bucket(garbageBucketName)
|
garbageBKT := getGarbageBucket(bc, tx)
|
||||||
graveyardBKT := tx.Bucket(graveyardBucketName)
|
graveyardBKT := getGraveyardBucket(bc, tx)
|
||||||
|
|
||||||
removeAvailableObject := inGraveyardWithKey(addrKey, graveyardBKT, garbageBKT) == 0
|
removeAvailableObject := inGraveyardWithKey(addrKey, graveyardBKT, garbageBKT) == 0
|
||||||
|
|
||||||
// unmarshal object, work only with physically stored (raw == true) objects
|
// unmarshal object, work only with physically stored (raw == true) objects
|
||||||
obj, err := db.get(tx, addr, key, false, true, currEpoch)
|
obj, err := db.getWithCache(bc, tx, addr, key, false, true, currEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if client.IsErrObjectNotFound(err) {
|
if client.IsErrObjectNotFound(err) {
|
||||||
addrKey = addressKey(addr, key)
|
addrKey = addressKey(addr, key)
|
||||||
|
@ -293,7 +293,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
||||||
nRef, ok := refCounter[k]
|
nRef, ok := refCounter[k]
|
||||||
if !ok {
|
if !ok {
|
||||||
nRef = &referenceNumber{
|
nRef = &referenceNumber{
|
||||||
all: parentLength(tx, parAddr),
|
all: parentLength(tx, bc, parAddr),
|
||||||
obj: parent,
|
obj: parent,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -306,12 +306,12 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
||||||
isUserObject := IsUserObject(obj)
|
isUserObject := IsUserObject(obj)
|
||||||
|
|
||||||
// remove object
|
// remove object
|
||||||
err = db.deleteObject(tx, obj, false)
|
err = db.deleteObject(tx, bc, obj, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return deleteSingleResult{}, fmt.Errorf("remove object: %w", err)
|
return deleteSingleResult{}, fmt.Errorf("remove object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := deleteECRelatedInfo(tx, garbageBKT, obj, addr.Container(), refCounter); err != nil {
|
if err := deleteECRelatedInfo(tx, bc, obj, addr.Container(), refCounter); err != nil {
|
||||||
return deleteSingleResult{}, err
|
return deleteSingleResult{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -325,15 +325,16 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
||||||
|
|
||||||
func (db *DB) deleteObject(
|
func (db *DB) deleteObject(
|
||||||
tx *bbolt.Tx,
|
tx *bbolt.Tx,
|
||||||
|
bc *bucketCache,
|
||||||
obj *objectSDK.Object,
|
obj *objectSDK.Object,
|
||||||
isParent bool,
|
isParent bool,
|
||||||
) error {
|
) error {
|
||||||
err := delUniqueIndexes(tx, obj, isParent)
|
err := delUniqueIndexes(tx, bc, obj, isParent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errFailedToRemoveUniqueIndexes
|
return errFailedToRemoveUniqueIndexes
|
||||||
}
|
}
|
||||||
|
|
||||||
err = updateListIndexes(tx, obj, delListIndexItem)
|
err = updateListIndexes(tx, bc, obj, delListIndexItem, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("remove list indexes: %w", err)
|
return fmt.Errorf("remove list indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -345,7 +346,7 @@ func (db *DB) deleteObject(
|
||||||
|
|
||||||
if isParent {
|
if isParent {
|
||||||
// remove record from the garbage bucket, because regular object deletion does nothing for virtual object
|
// remove record from the garbage bucket, because regular object deletion does nothing for virtual object
|
||||||
garbageBKT := tx.Bucket(garbageBucketName)
|
garbageBKT := getGarbageBucket(bc, tx)
|
||||||
if garbageBKT != nil {
|
if garbageBKT != nil {
|
||||||
key := make([]byte, addressKeySize)
|
key := make([]byte, addressKeySize)
|
||||||
addrKey := addressKey(object.AddressOf(obj), key)
|
addrKey := addressKey(object.AddressOf(obj), key)
|
||||||
|
@ -360,10 +361,10 @@ func (db *DB) deleteObject(
|
||||||
}
|
}
|
||||||
|
|
||||||
// parentLength returns amount of available children from parentid index.
|
// parentLength returns amount of available children from parentid index.
|
||||||
func parentLength(tx *bbolt.Tx, addr oid.Address) int {
|
func parentLength(tx *bbolt.Tx, bc *bucketCache, addr oid.Address) int {
|
||||||
bucketName := make([]byte, bucketKeySize)
|
bucketName := make([]byte, bucketKeySize)
|
||||||
|
|
||||||
bkt := tx.Bucket(parentBucketName(addr.Container(), bucketName[:]))
|
bkt := getParentBucket(bc, tx, addr.Container())
|
||||||
if bkt == nil {
|
if bkt == nil {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
@ -376,15 +377,16 @@ func parentLength(tx *bbolt.Tx, addr oid.Address) int {
|
||||||
return len(lst)
|
return len(lst)
|
||||||
}
|
}
|
||||||
|
|
||||||
func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) {
|
func delUniqueIndexItem(item bucketItem) error {
|
||||||
bkt := tx.Bucket(item.name)
|
bkt := item.bucket
|
||||||
if bkt != nil {
|
if bkt != nil {
|
||||||
_ = bkt.Delete(item.key) // ignore error, best effort there
|
return bkt.Delete(item.key)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
func delListIndexItem(item bucketItem) error {
|
||||||
bkt := tx.Bucket(item.name)
|
bkt := item.bucket
|
||||||
if bkt == nil {
|
if bkt == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -405,19 +407,16 @@ func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
|
|
||||||
// if list empty, remove the key from <list> bucket
|
// if list empty, remove the key from <list> bucket
|
||||||
if len(lst) == 0 {
|
if len(lst) == 0 {
|
||||||
_ = bkt.Delete(item.key) // ignore error, best effort there
|
return bkt.Delete(item.key)
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// if list is not empty, then update it
|
// if list is not empty, then update it
|
||||||
encodedLst, err := encodeList(lst)
|
encodedLst, err := encodeList(lst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil // ignore error, best effort there
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = bkt.Put(item.key, encodedLst) // ignore error, best effort there
|
return bkt.Put(item.key, encodedLst)
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
|
@ -460,67 +459,79 @@ func hasAnyItem(b *bbolt.Bucket) bool {
|
||||||
return hasAnyItem
|
return hasAnyItem
|
||||||
}
|
}
|
||||||
|
|
||||||
func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error {
|
func delUniqueIndexes(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, isParent bool) error {
|
||||||
addr := object.AddressOf(obj)
|
addr := object.AddressOf(obj)
|
||||||
|
|
||||||
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
||||||
cnr := addr.Container()
|
cnr := addr.Container()
|
||||||
bucketName := make([]byte, bucketKeySize)
|
var bkt *bbolt.Bucket
|
||||||
|
|
||||||
// add value to primary unique bucket
|
// add value to primary unique bucket
|
||||||
if !isParent {
|
if !isParent {
|
||||||
switch obj.Type() {
|
switch obj.Type() {
|
||||||
case objectSDK.TypeRegular:
|
case objectSDK.TypeRegular:
|
||||||
bucketName = primaryBucketName(cnr, bucketName)
|
bkt = getPrimaryBucket(bc, tx, cnr)
|
||||||
case objectSDK.TypeTombstone:
|
case objectSDK.TypeTombstone:
|
||||||
bucketName = tombstoneBucketName(cnr, bucketName)
|
bkt = getTombstoneBucket(bc, tx, cnr)
|
||||||
case objectSDK.TypeLock:
|
case objectSDK.TypeLock:
|
||||||
bucketName = bucketNameLockers(cnr, bucketName)
|
bkt = getLockersBucket(bc, tx, cnr)
|
||||||
default:
|
default:
|
||||||
return ErrUnknownObjectType
|
return ErrUnknownObjectType
|
||||||
}
|
}
|
||||||
|
|
||||||
delUniqueIndexItem(tx, namedBucketItem{
|
if err := delUniqueIndexItem(bucketItem{
|
||||||
name: bucketName,
|
bucket: bkt,
|
||||||
key: objKey,
|
key: objKey,
|
||||||
})
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
delUniqueIndexItem(tx, namedBucketItem{
|
if err := delUniqueIndexItem(bucketItem{
|
||||||
name: parentBucketName(cnr, bucketName),
|
bucket: getParentBucket(bc, tx, cnr),
|
||||||
key: objKey,
|
key: objKey,
|
||||||
})
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
delUniqueIndexItem(tx, namedBucketItem{ // remove from storage id index
|
if err := delUniqueIndexItem(bucketItem{ // remove from storage id index
|
||||||
name: smallBucketName(cnr, bucketName),
|
bucket: getSmallBucket(bc, tx, cnr),
|
||||||
key: objKey,
|
key: objKey,
|
||||||
})
|
}); err != nil {
|
||||||
delUniqueIndexItem(tx, namedBucketItem{ // remove from root index
|
return err
|
||||||
name: rootBucketName(cnr, bucketName),
|
}
|
||||||
|
if err := delUniqueIndexItem(bucketItem{ // remove from root index
|
||||||
|
bucket: getRootBucket(bc, tx, cnr),
|
||||||
key: objKey,
|
key: objKey,
|
||||||
})
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if expEpoch, ok := hasExpirationEpoch(obj); ok {
|
if expEpoch, ok := hasExpirationEpoch(obj); ok {
|
||||||
delUniqueIndexItem(tx, namedBucketItem{
|
if err := delUniqueIndexItem(bucketItem{
|
||||||
name: expEpochToObjectBucketName,
|
bucket: getExpEpochToObjectBucket(bc, tx),
|
||||||
key: expirationEpochKey(expEpoch, cnr, addr.Object()),
|
key: expirationEpochKey(expEpoch, cnr, addr.Object()),
|
||||||
})
|
}); err != nil {
|
||||||
delUniqueIndexItem(tx, namedBucketItem{
|
return err
|
||||||
name: objectToExpirationEpochBucketName(cnr, make([]byte, bucketKeySize)),
|
}
|
||||||
|
if err := delUniqueIndexItem(bucketItem{
|
||||||
|
bucket: getObjToExpEpochBucket(bc, tx, cnr),
|
||||||
key: objKey,
|
key: objKey,
|
||||||
})
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.Object, cnr cid.ID, refCounter referenceCounter) error {
|
func deleteECRelatedInfo(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, cnr cid.ID, refCounter referenceCounter) error {
|
||||||
ech := obj.ECHeader()
|
ech := obj.ECHeader()
|
||||||
if ech == nil {
|
if ech == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
garbageBKT := getGarbageBucket(bc, tx)
|
||||||
hasAnyChunks := hasAnyECChunks(tx, ech, cnr)
|
hasAnyChunks := hasAnyECChunks(tx, bc, ech, cnr)
|
||||||
// drop EC parent GC mark if current EC chunk is the last one
|
// drop EC parent GC mark if current EC chunk is the last one
|
||||||
if !hasAnyChunks && garbageBKT != nil {
|
if !hasAnyChunks && garbageBKT != nil {
|
||||||
var ecParentAddress oid.Address
|
var ecParentAddress oid.Address
|
||||||
|
@ -535,10 +546,12 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
|
||||||
|
|
||||||
// also drop EC parent root info if current EC chunk is the last one
|
// also drop EC parent root info if current EC chunk is the last one
|
||||||
if !hasAnyChunks {
|
if !hasAnyChunks {
|
||||||
delUniqueIndexItem(tx, namedBucketItem{
|
if err := delUniqueIndexItem(bucketItem{
|
||||||
name: rootBucketName(cnr, make([]byte, bucketKeySize)),
|
bucket: getRootBucket(bc, tx, cnr),
|
||||||
key: objectKey(ech.Parent(), make([]byte, objectKeySize)),
|
key: objectKey(ech.Parent(), make([]byte, objectKeySize)),
|
||||||
})
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ech.ParentSplitParentID() == nil {
|
if ech.ParentSplitParentID() == nil {
|
||||||
|
@ -557,7 +570,7 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if parentLength(tx, splitParentAddress) > 0 {
|
if parentLength(tx, bc, splitParentAddress) > 0 {
|
||||||
// linking object still exists, so leave split info and gc mark deletion for linking object processing
|
// linking object still exists, so leave split info and gc mark deletion for linking object processing
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -572,15 +585,14 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
|
||||||
}
|
}
|
||||||
|
|
||||||
// drop split info
|
// drop split info
|
||||||
delUniqueIndexItem(tx, namedBucketItem{
|
return delUniqueIndexItem(bucketItem{
|
||||||
name: rootBucketName(cnr, make([]byte, bucketKeySize)),
|
bucket: getRootBucket(bc, tx, cnr),
|
||||||
key: objectKey(*ech.ParentSplitParentID(), make([]byte, objectKeySize)),
|
key: objectKey(*ech.ParentSplitParentID(), make([]byte, objectKeySize)),
|
||||||
})
|
})
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func hasAnyECChunks(tx *bbolt.Tx, ech *objectSDK.ECHeader, cnr cid.ID) bool {
|
func hasAnyECChunks(tx *bbolt.Tx, bc *bucketCache, ech *objectSDK.ECHeader, cnr cid.ID) bool {
|
||||||
data := getFromBucket(tx, ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
|
data := getFromBucket(getECInfoBucket(bc, tx, cnr),
|
||||||
objectKey(ech.Parent(), make([]byte, objectKeySize)))
|
objectKey(ech.Parent(), make([]byte, objectKeySize)))
|
||||||
return len(data) > 0
|
return len(data) > 0
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,7 +81,7 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
||||||
currEpoch := db.epochState.CurrentEpoch()
|
currEpoch := db.epochState.CurrentEpoch()
|
||||||
|
|
||||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.ecParentAddr, currEpoch)
|
res.exists, res.locked, err = db.exists(tx, newBucketCache(), prm.addr, prm.ecParentAddr, currEpoch)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
@ -89,10 +89,10 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
||||||
return res, metaerr.Wrap(err)
|
return res, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) {
|
func (db *DB) exists(tx *bbolt.Tx, bc *bucketCache, addr oid.Address, ecParent oid.Address, currEpoch uint64) (bool, bool, error) {
|
||||||
var locked bool
|
var locked bool
|
||||||
if !ecParent.Equals(oid.Address{}) {
|
if !ecParent.Equals(oid.Address{}) {
|
||||||
st, err := objectStatus(tx, ecParent, currEpoch)
|
st, err := objectStatusWithCache(bc, tx, ecParent, currEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, false, err
|
return false, false, err
|
||||||
}
|
}
|
||||||
|
@ -103,10 +103,10 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currE
|
||||||
return false, locked, ErrObjectIsExpired
|
return false, locked, ErrObjectIsExpired
|
||||||
}
|
}
|
||||||
|
|
||||||
locked = objectLocked(tx, ecParent.Container(), ecParent.Object())
|
locked = objectLockedWithCache(bc, tx, ecParent.Container(), ecParent.Object())
|
||||||
}
|
}
|
||||||
// check graveyard and object expiration first
|
// check graveyard and object expiration first
|
||||||
st, err := objectStatus(tx, addr, currEpoch)
|
st, err := objectStatusWithCache(bc, tx, addr, currEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, false, err
|
return false, false, err
|
||||||
}
|
}
|
||||||
|
@ -122,16 +122,15 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currE
|
||||||
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
||||||
|
|
||||||
cnr := addr.Container()
|
cnr := addr.Container()
|
||||||
key := make([]byte, bucketKeySize)
|
|
||||||
|
|
||||||
// if graveyard is empty, then check if object exists in primary bucket
|
// if graveyard is empty, then check if object exists in primary bucket
|
||||||
if inBucket(tx, primaryBucketName(cnr, key), objKey) {
|
if inBucket(getPrimaryBucket(bc, tx, cnr), objKey) {
|
||||||
return true, locked, nil
|
return true, locked, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// if primary bucket is empty, then check if object exists in parent bucket
|
// if primary bucket is empty, then check if object exists in parent bucket
|
||||||
if inBucket(tx, parentBucketName(cnr, key), objKey) {
|
if inBucket(getParentBucket(bc, tx, cnr), objKey) {
|
||||||
splitInfo, err := getSplitInfo(tx, cnr, objKey)
|
splitInfo, err := getSplitInfo(tx, bc, cnr, objKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, locked, err
|
return false, locked, err
|
||||||
}
|
}
|
||||||
|
@ -139,12 +138,12 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currE
|
||||||
return false, locked, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
|
return false, locked, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
|
||||||
}
|
}
|
||||||
// if parent bucket is empty, then check if object exists in ec bucket
|
// if parent bucket is empty, then check if object exists in ec bucket
|
||||||
if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
|
if data := getFromBucket(getECInfoBucket(bc, tx, cnr), objKey); len(data) != 0 {
|
||||||
return false, locked, getECInfoError(tx, cnr, data)
|
return false, locked, getECInfoError(tx, bc, cnr, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
// if parent bucket is empty, then check if object exists in typed buckets
|
// if parent bucket is empty, then check if object exists in typed buckets
|
||||||
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, locked, nil
|
return firstIrregularObjectType(tx, bc, cnr, objKey) != objectSDK.TypeRegular, locked, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// objectStatus returns:
|
// objectStatus returns:
|
||||||
|
@ -152,10 +151,6 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, ecParent oid.Address, currE
|
||||||
// - 1 if object with GC mark;
|
// - 1 if object with GC mark;
|
||||||
// - 2 if object is covered with tombstone;
|
// - 2 if object is covered with tombstone;
|
||||||
// - 3 if object is expired.
|
// - 3 if object is expired.
|
||||||
func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (uint8, error) {
|
|
||||||
return objectStatusWithCache(nil, tx, addr, currEpoch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func objectStatusWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (uint8, error) {
|
func objectStatusWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (uint8, error) {
|
||||||
// locked object could not be removed/marked with GC/expired
|
// locked object could not be removed/marked with GC/expired
|
||||||
if objectLockedWithCache(bc, tx, addr.Container(), addr.Object()) {
|
if objectLockedWithCache(bc, tx, addr.Container(), addr.Object()) {
|
||||||
|
@ -207,8 +202,7 @@ func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uin
|
||||||
}
|
}
|
||||||
|
|
||||||
// inBucket checks if key <key> is present in bucket <name>.
|
// inBucket checks if key <key> is present in bucket <name>.
|
||||||
func inBucket(tx *bbolt.Tx, name, key []byte) bool {
|
func inBucket(bkt *bbolt.Bucket, key []byte) bool {
|
||||||
bkt := tx.Bucket(name)
|
|
||||||
if bkt == nil {
|
if bkt == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -221,9 +215,8 @@ func inBucket(tx *bbolt.Tx, name, key []byte) bool {
|
||||||
|
|
||||||
// getSplitInfo returns SplitInfo structure from root index. Returns error
|
// getSplitInfo returns SplitInfo structure from root index. Returns error
|
||||||
// if there is no `key` record in root index.
|
// if there is no `key` record in root index.
|
||||||
func getSplitInfo(tx *bbolt.Tx, cnr cid.ID, key []byte) (*objectSDK.SplitInfo, error) {
|
func getSplitInfo(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, key []byte) (*objectSDK.SplitInfo, error) {
|
||||||
bucketName := rootBucketName(cnr, make([]byte, bucketKeySize))
|
rawSplitInfo := getFromBucket(getRootBucket(bc, tx, cnr), key)
|
||||||
rawSplitInfo := getFromBucket(tx, bucketName, key)
|
|
||||||
if len(rawSplitInfo) == 0 {
|
if len(rawSplitInfo) == 0 {
|
||||||
return nil, ErrLackSplitInfo
|
return nil, ErrLackSplitInfo
|
||||||
}
|
}
|
||||||
|
|
|
@ -110,7 +110,6 @@ func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key
|
||||||
key = objectKey(addr.Object(), key)
|
key = objectKey(addr.Object(), key)
|
||||||
cnr := addr.Container()
|
cnr := addr.Container()
|
||||||
obj := objectSDK.New()
|
obj := objectSDK.New()
|
||||||
bucketName := make([]byte, bucketKeySize)
|
|
||||||
|
|
||||||
// check in primary index
|
// check in primary index
|
||||||
if b := getPrimaryBucket(bc, tx, cnr); b != nil {
|
if b := getPrimaryBucket(bc, tx, cnr); b != nil {
|
||||||
|
@ -119,43 +118,40 @@ func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
data := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key)
|
data := getFromBucket(getECInfoBucket(bc, tx, cnr), key)
|
||||||
if len(data) != 0 {
|
if len(data) != 0 {
|
||||||
return nil, getECInfoError(tx, cnr, data)
|
return nil, getECInfoError(tx, bc, cnr, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
// if not found then check in tombstone index
|
// if not found then check in tombstone index
|
||||||
data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
|
data = getFromBucket(getTombstoneBucket(bc, tx, cnr), key)
|
||||||
if len(data) != 0 {
|
if len(data) != 0 {
|
||||||
return obj, obj.Unmarshal(data)
|
return obj, obj.Unmarshal(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
// if not found then check in locker index
|
// if not found then check in locker index
|
||||||
data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key)
|
data = getFromBucket(getLockersBucket(bc, tx, cnr), key)
|
||||||
if len(data) != 0 {
|
if len(data) != 0 {
|
||||||
return obj, obj.Unmarshal(data)
|
return obj, obj.Unmarshal(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
// if not found then check if object is a virtual
|
// if not found then check if object is a virtual
|
||||||
return getVirtualObject(tx, cnr, key, raw)
|
return getVirtualObject(tx, bc, cnr, key, raw)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
|
func getFromBucket(bkt *bbolt.Bucket, key []byte) []byte {
|
||||||
bkt := tx.Bucket(name)
|
|
||||||
if bkt == nil {
|
if bkt == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return bkt.Get(key)
|
return bkt.Get(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSDK.Object, error) {
|
func getVirtualObject(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, key []byte, raw bool) (*objectSDK.Object, error) {
|
||||||
if raw {
|
if raw {
|
||||||
return nil, getSplitInfoError(tx, cnr, key)
|
return nil, getSplitInfoError(tx, bc, cnr, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
bucketName := make([]byte, bucketKeySize)
|
parentBucket := getParentBucket(bc, tx, cnr)
|
||||||
parentBucket := tx.Bucket(parentBucketName(cnr, bucketName))
|
|
||||||
if parentBucket == nil {
|
if parentBucket == nil {
|
||||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
}
|
}
|
||||||
|
@ -172,17 +168,17 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
|
||||||
var data []byte
|
var data []byte
|
||||||
for i := 0; i < len(relativeLst) && len(data) == 0; i++ {
|
for i := 0; i < len(relativeLst) && len(data) == 0; i++ {
|
||||||
virtualOID := relativeLst[len(relativeLst)-i-1]
|
virtualOID := relativeLst[len(relativeLst)-i-1]
|
||||||
data = getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
|
data = getFromBucket(getPrimaryBucket(bc, tx, cnr), virtualOID)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(data) == 0 {
|
if len(data) == 0 {
|
||||||
// check if any of the relatives is an EC object
|
// check if any of the relatives is an EC object
|
||||||
for _, relative := range relativeLst {
|
for _, relative := range relativeLst {
|
||||||
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), relative)
|
data = getFromBucket(getECInfoBucket(bc, tx, cnr), relative)
|
||||||
if len(data) > 0 {
|
if len(data) > 0 {
|
||||||
// we can't return object headers, but can return error,
|
// we can't return object headers, but can return error,
|
||||||
// so assembler can try to assemble complex object
|
// so assembler can try to assemble complex object
|
||||||
return nil, getSplitInfoError(tx, cnr, key)
|
return nil, getSplitInfoError(tx, bc, cnr, key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -203,8 +199,8 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
|
||||||
return par, nil
|
return par, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
|
func getSplitInfoError(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, key []byte) error {
|
||||||
splitInfo, err := getSplitInfo(tx, cnr, key)
|
splitInfo, err := getSplitInfo(tx, bc, cnr, key)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
|
return logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
|
||||||
}
|
}
|
||||||
|
@ -212,7 +208,7 @@ func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
|
||||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
}
|
}
|
||||||
|
|
||||||
func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
|
func getECInfoError(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, data []byte) error {
|
||||||
keys, err := decodeList(data)
|
keys, err := decodeList(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -220,7 +216,7 @@ func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
|
||||||
ecInfo := objectSDK.NewECInfo()
|
ecInfo := objectSDK.NewECInfo()
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
// check in primary index
|
// check in primary index
|
||||||
objData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key)
|
objData := getFromBucket(getPrimaryBucket(bc, tx, cnr), key)
|
||||||
if len(objData) != 0 {
|
if len(objData) != 0 {
|
||||||
obj := objectSDK.New()
|
obj := objectSDK.New()
|
||||||
if err := obj.Unmarshal(objData); err != nil {
|
if err := obj.Unmarshal(objData); err != nil {
|
||||||
|
|
|
@ -287,19 +287,19 @@ func (db *DB) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (Inh
|
||||||
|
|
||||||
var res InhumeRes
|
var res InhumeRes
|
||||||
|
|
||||||
err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
err := db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
|
||||||
res = InhumeRes{inhumedByCnrID: make(map[cid.ID]ObjectCounters)}
|
res = InhumeRes{inhumedByCnrID: make(map[cid.ID]ObjectCounters)}
|
||||||
|
|
||||||
garbageBKT := tx.Bucket(garbageBucketName)
|
garbageBKT := getGarbageBucket(bc, tx)
|
||||||
graveyardBKT := tx.Bucket(graveyardBucketName)
|
graveyardBKT := getGraveyardBucket(bc, tx)
|
||||||
|
|
||||||
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
|
targetBucket, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range tss {
|
for i := range tss {
|
||||||
if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, tss[i].Tombstone(), buf, currEpoch, prm, &res); err != nil {
|
if err := db.inhumeTxSingle(tx, bc, targetBucket, value, tss[i].Tombstone(), buf, currEpoch, prm, &res); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := graveyardBKT.Delete(addressKey(tss[i].Address(), buf)); err != nil {
|
if err := graveyardBKT.Delete(addressKey(tss[i].Address(), buf)); err != nil {
|
||||||
|
|
|
@ -199,8 +199,8 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
|
||||||
inhumedByCnrID: make(map[cid.ID]ObjectCounters),
|
inhumedByCnrID: make(map[cid.ID]ObjectCounters),
|
||||||
}
|
}
|
||||||
currEpoch := db.epochState.CurrentEpoch()
|
currEpoch := db.epochState.CurrentEpoch()
|
||||||
err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
err := db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
|
||||||
return db.inhumeTx(tx, currEpoch, prm, &res)
|
return db.inhumeTx(tx, bc, currEpoch, prm, &res)
|
||||||
})
|
})
|
||||||
success = err == nil
|
success = err == nil
|
||||||
if success {
|
if success {
|
||||||
|
@ -213,18 +213,15 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
|
||||||
return res, metaerr.Wrap(err)
|
return res, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes) error {
|
func (db *DB) inhumeTx(tx *bbolt.Tx, bc *bucketCache, epoch uint64, prm InhumePrm, res *InhumeRes) error {
|
||||||
garbageBKT := tx.Bucket(garbageBucketName)
|
bkt, value, err := db.getInhumeTargetBucketAndValue(getGarbageBucket(bc, tx), getGraveyardBucket(bc, tx), prm)
|
||||||
graveyardBKT := tx.Bucket(graveyardBucketName)
|
|
||||||
|
|
||||||
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := make([]byte, addressKeySize)
|
buf := make([]byte, addressKeySize)
|
||||||
for i := range prm.target {
|
for i := range prm.target {
|
||||||
if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, prm.target[i], buf, epoch, prm, res); err != nil {
|
if err := db.inhumeTxSingle(tx, bc, bkt, value, prm.target[i], buf, epoch, prm, res); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -232,13 +229,12 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
|
||||||
return db.applyInhumeResToCounters(tx, res)
|
return db.applyInhumeResToCounters(tx, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garbageBKT *bbolt.Bucket, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error {
|
func (db *DB) inhumeTxSingle(tx *bbolt.Tx, bc *bucketCache, targetBucket *bbolt.Bucket, value []byte, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error {
|
||||||
id := addr.Object()
|
id := addr.Object()
|
||||||
cnr := addr.Container()
|
cnr := addr.Container()
|
||||||
tx := bkt.Tx()
|
|
||||||
|
|
||||||
// prevent locked objects to be inhumed
|
// prevent locked objects to be inhumed
|
||||||
if !prm.forceRemoval && objectLocked(tx, cnr, id) {
|
if !prm.forceRemoval && objectLockedWithCache(bc, tx, cnr, id) {
|
||||||
return new(apistatus.ObjectLocked)
|
return new(apistatus.ObjectLocked)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -248,23 +244,23 @@ func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garb
|
||||||
// if `Inhume` was called not with the
|
// if `Inhume` was called not with the
|
||||||
// `WithForceGCMark` option
|
// `WithForceGCMark` option
|
||||||
if !prm.forceRemoval {
|
if !prm.forceRemoval {
|
||||||
if isLockObject(tx, cnr, id) {
|
if isLockObject(tx, bc, cnr, id) {
|
||||||
return ErrLockObjectRemoval
|
return ErrLockObjectRemoval
|
||||||
}
|
}
|
||||||
|
|
||||||
lockWasChecked = true
|
lockWasChecked = true
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := db.get(tx, addr, buf, false, true, epoch)
|
obj, err := db.getWithCache(bc, tx, addr, buf, false, true, epoch)
|
||||||
targetKey := addressKey(addr, buf)
|
targetKey := addressKey(addr, buf)
|
||||||
var ecErr *objectSDK.ECInfoError
|
var ecErr *objectSDK.ECInfoError
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
|
err = db.updateDeleteInfo(tx, bc, targetKey, cnr, obj, res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else if errors.As(err, &ecErr) {
|
} else if errors.As(err, &ecErr) {
|
||||||
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value)
|
err = db.inhumeECInfo(tx, bc, epoch, prm.tomb, res, ecErr.ECInfo(), cnr, targetBucket, value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -272,7 +268,7 @@ func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garb
|
||||||
|
|
||||||
if prm.tomb != nil {
|
if prm.tomb != nil {
|
||||||
var isTomb bool
|
var isTomb bool
|
||||||
isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
|
isTomb, err = db.markAsGC(getGraveyardBucket(bc, tx), getGarbageBucket(bc, tx), targetKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -283,7 +279,7 @@ func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garb
|
||||||
}
|
}
|
||||||
|
|
||||||
// consider checking if target is already in graveyard?
|
// consider checking if target is already in graveyard?
|
||||||
err = bkt.Put(targetKey, value)
|
err = targetBucket.Put(targetKey, value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -297,15 +293,14 @@ func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garb
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if isLockObject(tx, cnr, id) {
|
if isLockObject(tx, bc, cnr, id) {
|
||||||
res.deletedLockObj = append(res.deletedLockObj, addr)
|
res.deletedLockObj = append(res.deletedLockObj, addr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
|
func (db *DB) inhumeECInfo(tx *bbolt.Tx, bc *bucketCache, epoch uint64, tomb *oid.Address, res *InhumeRes,
|
||||||
garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
|
|
||||||
ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte,
|
ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte,
|
||||||
) error {
|
) error {
|
||||||
for _, chunk := range ecInfo.Chunks {
|
for _, chunk := range ecInfo.Chunks {
|
||||||
|
@ -318,17 +313,17 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
chunkAddr.SetObject(chunkID)
|
chunkAddr.SetObject(chunkID)
|
||||||
chunkObj, err := db.get(tx, chunkAddr, chunkBuf, false, true, epoch)
|
chunkObj, err := db.getWithCache(bc, tx, chunkAddr, chunkBuf, false, true, epoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
chunkKey := addressKey(chunkAddr, chunkBuf)
|
chunkKey := addressKey(chunkAddr, chunkBuf)
|
||||||
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, chunkKey, cnr, chunkObj, res)
|
err = db.updateDeleteInfo(tx, bc, chunkKey, cnr, chunkObj, res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if tomb != nil {
|
if tomb != nil {
|
||||||
_, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey)
|
_, err = db.markAsGC(getGraveyardBucket(bc, tx), getGarbageBucket(bc, tx), chunkKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -398,16 +393,16 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, addressKey []byte
|
||||||
return false, garbageBKT.Put(addressKey, zeroValue)
|
return false, garbageBKT.Put(addressKey, zeroValue)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
|
func (db *DB) updateDeleteInfo(tx *bbolt.Tx, bc *bucketCache, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
|
||||||
containerID, _ := obj.ContainerID()
|
containerID, _ := obj.ContainerID()
|
||||||
if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
|
if inGraveyardWithKey(targetKey, getGraveyardBucket(bc, tx), getGarbageBucket(bc, tx)) == 0 {
|
||||||
res.storeDeletionInfo(containerID, obj.PayloadSize(), IsUserObject(obj))
|
res.storeDeletionInfo(containerID, obj.PayloadSize(), IsUserObject(obj))
|
||||||
}
|
}
|
||||||
|
|
||||||
// if object is stored, and it is regular object then update bucket
|
// if object is stored, and it is regular object then update bucket
|
||||||
// with container size estimations
|
// with container size estimations
|
||||||
if obj.Type() == objectSDK.TypeRegular {
|
if obj.Type() == objectSDK.TypeRegular {
|
||||||
err := changeContainerSize(tx, cnr, obj.PayloadSize(), false)
|
err := changeContainerSize(tx, bc, cnr, obj.PayloadSize(), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,7 +76,8 @@ func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectH
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error {
|
func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error {
|
||||||
b := tx.Bucket(expEpochToObjectBucketName)
|
bc := newBucketCache()
|
||||||
|
b := getExpEpochToObjectBucket(bc, tx)
|
||||||
c := b.Cursor()
|
c := b.Cursor()
|
||||||
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
||||||
expiresAfter, cnr, obj, err := parseExpirationEpochKey(k)
|
expiresAfter, cnr, obj, err := parseExpirationEpochKey(k)
|
||||||
|
@ -87,7 +88,7 @@ func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler)
|
||||||
if expiresAfter >= epoch {
|
if expiresAfter >= epoch {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if objectLocked(tx, cnr, obj) {
|
if objectLockedWithCache(bc, tx, cnr, obj) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
|
@ -95,7 +96,7 @@ func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler)
|
||||||
addr.SetObject(obj)
|
addr.SetObject(obj)
|
||||||
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
||||||
err = h(&ExpiredObject{
|
err = h(&ExpiredObject{
|
||||||
typ: firstIrregularObjectType(tx, cnr, objKey),
|
typ: firstIrregularObjectType(tx, bc, cnr, objKey),
|
||||||
addr: addr,
|
addr: addr,
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|
|
@ -79,12 +79,12 @@ func (db *DB) lockInternal(locked []oid.ID, cnr cid.ID, locker oid.ID) error {
|
||||||
}
|
}
|
||||||
key := make([]byte, cidSize)
|
key := make([]byte, cidSize)
|
||||||
|
|
||||||
return metaerr.Wrap(db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
return metaerr.Wrap(db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
|
||||||
if firstIrregularObjectType(tx, cnr, bucketKeysLocked...) != objectSDK.TypeRegular {
|
if firstIrregularObjectType(tx, bc, cnr, bucketKeysLocked...) != objectSDK.TypeRegular {
|
||||||
return logicerr.Wrap(new(apistatus.LockNonRegularObject))
|
return logicerr.Wrap(new(apistatus.LockNonRegularObject))
|
||||||
}
|
}
|
||||||
|
|
||||||
bucketLocked := tx.Bucket(bucketNameLocked)
|
bucketLocked := getLockedBucket(bc, tx)
|
||||||
|
|
||||||
cnr.Encode(key)
|
cnr.Encode(key)
|
||||||
bucketLockedContainer, err := bucketLocked.CreateBucketIfNotExists(key)
|
bucketLockedContainer, err := bucketLocked.CreateBucketIfNotExists(key)
|
||||||
|
@ -144,9 +144,9 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
|
||||||
|
|
||||||
var unlockedObjects []oid.Address
|
var unlockedObjects []oid.Address
|
||||||
|
|
||||||
if err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
if err := db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
|
||||||
for i := range lockers {
|
for i := range lockers {
|
||||||
unlocked, err := freePotentialLocks(tx, lockers[i].Container(), lockers[i].Object())
|
unlocked, err := freePotentialLocks(tx, bc, lockers[i].Container(), lockers[i].Object())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -211,9 +211,9 @@ func getLocks(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) ([]oid.ID, error) {
|
||||||
// Operation is very resource-intensive, which is caused by the admissibility
|
// Operation is very resource-intensive, which is caused by the admissibility
|
||||||
// of multiple locks. Also, if we knew what objects are locked, it would be
|
// of multiple locks. Also, if we knew what objects are locked, it would be
|
||||||
// possible to speed up the execution.
|
// possible to speed up the execution.
|
||||||
func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) {
|
func freePotentialLocks(tx *bbolt.Tx, bc *bucketCache, idCnr cid.ID, locker oid.ID) ([]oid.Address, error) {
|
||||||
var unlockedObjects []oid.Address
|
var unlockedObjects []oid.Address
|
||||||
bucketLocked := tx.Bucket(bucketNameLocked)
|
bucketLocked := getLockedBucket(bc, tx)
|
||||||
if bucketLocked == nil {
|
if bucketLocked == nil {
|
||||||
return unlockedObjects, nil
|
return unlockedObjects, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
|
||||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
|
@ -28,6 +29,11 @@ type (
|
||||||
namedBucketItem struct {
|
namedBucketItem struct {
|
||||||
name, key, val []byte
|
name, key, val []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bucketItem struct {
|
||||||
|
bucket *bbolt.Bucket
|
||||||
|
key, val []byte
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// PutPrm groups the parameters of Put operation.
|
// PutPrm groups the parameters of Put operation.
|
||||||
|
@ -63,6 +69,8 @@ var (
|
||||||
ErrIncorrectRootObject = errors.New("invalid root object")
|
ErrIncorrectRootObject = errors.New("invalid root object")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const bucketNilAsserMsg = "bucket to put data is nil"
|
||||||
|
|
||||||
// Put saves object header in metabase. Object payload expected to be cut.
|
// Put saves object header in metabase. Object payload expected to be cut.
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard.
|
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard.
|
||||||
|
@ -93,9 +101,9 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
|
||||||
|
|
||||||
currEpoch := db.epochState.CurrentEpoch()
|
currEpoch := db.epochState.CurrentEpoch()
|
||||||
|
|
||||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
err = db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
|
||||||
var e error
|
var e error
|
||||||
res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch, prm.indexAttributes)
|
res, e = db.put(tx, bc, prm.obj, prm.id, nil, currEpoch, prm.indexAttributes)
|
||||||
return e
|
return e
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -109,6 +117,7 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) put(tx *bbolt.Tx,
|
func (db *DB) put(tx *bbolt.Tx,
|
||||||
|
bc *bucketCache,
|
||||||
obj *objectSDK.Object,
|
obj *objectSDK.Object,
|
||||||
id []byte,
|
id []byte,
|
||||||
si *objectSDK.SplitInfo,
|
si *objectSDK.SplitInfo,
|
||||||
|
@ -128,7 +137,7 @@ func (db *DB) put(tx *bbolt.Tx,
|
||||||
|
|
||||||
isParent := si != nil
|
isParent := si != nil
|
||||||
|
|
||||||
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
|
exists, _, err := db.exists(tx, bc, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
|
||||||
|
|
||||||
var splitInfoError *objectSDK.SplitInfoError
|
var splitInfoError *objectSDK.SplitInfoError
|
||||||
if errors.As(err, &splitInfoError) {
|
if errors.As(err, &splitInfoError) {
|
||||||
|
@ -138,51 +147,51 @@ func (db *DB) put(tx *bbolt.Tx,
|
||||||
}
|
}
|
||||||
|
|
||||||
if exists {
|
if exists {
|
||||||
return PutRes{}, db.updateObj(tx, obj, id, si, isParent)
|
return PutRes{}, db.updateObj(tx, bc, obj, id, si, isParent)
|
||||||
}
|
}
|
||||||
|
|
||||||
return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch, indexAttributes)
|
return PutRes{Inserted: true}, db.insertObject(tx, bc, obj, id, si, isParent, cnr, currEpoch, indexAttributes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
|
func (db *DB) updateObj(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
|
||||||
// most right child and split header overlap parent so we have to
|
// most right child and split header overlap parent so we have to
|
||||||
// check if object exists to not overwrite it twice
|
// check if object exists to not overwrite it twice
|
||||||
|
|
||||||
// When storage engine moves objects between different sub-storages,
|
// When storage engine moves objects between different sub-storages,
|
||||||
// it calls metabase.Put method with new storage ID, thus triggering this code.
|
// it calls metabase.Put method with new storage ID, thus triggering this code.
|
||||||
if !isParent && id != nil {
|
if !isParent && id != nil {
|
||||||
return setStorageID(tx, objectCore.AddressOf(obj), id, true)
|
return setStorageID(tx, bc, objectCore.AddressOf(obj), id, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// when storage already has last object in split hierarchy and there is
|
// when storage already has last object in split hierarchy and there is
|
||||||
// a linking object to put (or vice versa), we should update split info
|
// a linking object to put (or vice versa), we should update split info
|
||||||
// with object ids of these objects
|
// with object ids of these objects
|
||||||
if isParent {
|
if isParent {
|
||||||
return updateSplitInfo(tx, objectCore.AddressOf(obj), si)
|
return updateSplitInfo(tx, bc, objectCore.AddressOf(obj), si)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64, indexAttributes bool) error {
|
func (db *DB) insertObject(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64, indexAttributes bool) error {
|
||||||
if par := obj.Parent(); par != nil && !isParent { // limit depth by two
|
if par := obj.Parent(); par != nil && !isParent { // limit depth by two
|
||||||
parentSI, err := splitInfoFromObject(obj)
|
parentSI, err := splitInfoFromObject(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = db.put(tx, par, id, parentSI, currEpoch, indexAttributes)
|
_, err = db.put(tx, bc, par, id, parentSI, currEpoch, indexAttributes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := putUniqueIndexes(tx, obj, si, id)
|
err := putUniqueIndexes(tx, bc, obj, si, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("put unique indexes: %w", err)
|
return fmt.Errorf("put unique indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = updateListIndexes(tx, obj, putListIndexItem)
|
err = updateListIndexes(tx, bc, obj, putListIndexItem, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("put list indexes: %w", err)
|
return fmt.Errorf("put list indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -196,14 +205,14 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
|
||||||
|
|
||||||
// update container volume size estimation
|
// update container volume size estimation
|
||||||
if obj.Type() == objectSDK.TypeRegular && !isParent {
|
if obj.Type() == objectSDK.TypeRegular && !isParent {
|
||||||
err = changeContainerSize(tx, cnr, obj.PayloadSize(), true)
|
err = changeContainerSize(tx, bc, cnr, obj.PayloadSize(), true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !isParent {
|
if !isParent {
|
||||||
if err = db.incCounters(tx, cnr, IsUserObject(obj)); err != nil {
|
if err = db.incCounters(tx, bc, cnr, IsUserObject(obj)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -211,67 +220,76 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func putUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, si *objectSDK.SplitInfo, id []byte) error {
|
func putUniqueIndexes(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, si *objectSDK.SplitInfo, id []byte) error {
|
||||||
isParent := si != nil
|
isParent := si != nil
|
||||||
addr := objectCore.AddressOf(obj)
|
addr := objectCore.AddressOf(obj)
|
||||||
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
||||||
|
|
||||||
bucketName := make([]byte, bucketKeySize)
|
bucketName := make([]byte, bucketKeySize)
|
||||||
if !isParent {
|
if !isParent {
|
||||||
err := putRawObjectData(tx, obj, bucketName, addr, objKey)
|
err := putRawObjectData(tx, bc, obj, bucketName, addr, objKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if id != nil {
|
if id != nil {
|
||||||
if err = setStorageID(tx, objectCore.AddressOf(obj), id, false); err != nil {
|
if err = setStorageID(tx, bc, objectCore.AddressOf(obj), id, false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := putExpirationEpoch(tx, obj, addr, objKey); err != nil {
|
if err := putExpirationEpoch(tx, bc, obj, addr, objKey); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return putSplitInfo(tx, obj, bucketName, addr, si, objKey)
|
return putSplitInfo(tx, bc, obj, bucketName, addr, si, objKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
func putRawObjectData(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr oid.Address, objKey []byte) error {
|
func putRawObjectData(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, bucketName []byte, addr oid.Address, objKey []byte) error {
|
||||||
|
var bkt *bbolt.Bucket
|
||||||
|
var err error
|
||||||
switch obj.Type() {
|
switch obj.Type() {
|
||||||
case objectSDK.TypeRegular:
|
case objectSDK.TypeRegular:
|
||||||
bucketName = primaryBucketName(addr.Container(), bucketName)
|
bkt, err = getOrCreateBucket(tx, bc, addr.Container(), getPrimaryBucket, primaryBucketName(addr.Container(), bucketName), true)
|
||||||
case objectSDK.TypeTombstone:
|
case objectSDK.TypeTombstone:
|
||||||
bucketName = tombstoneBucketName(addr.Container(), bucketName)
|
bkt, err = getOrCreateBucket(tx, bc, addr.Container(), getTombstoneBucket, tombstoneBucketName(addr.Container(), bucketName), true)
|
||||||
case objectSDK.TypeLock:
|
case objectSDK.TypeLock:
|
||||||
bucketName = bucketNameLockers(addr.Container(), bucketName)
|
bkt, err = getOrCreateBucket(tx, bc, addr.Container(), getLockersBucket, bucketNameLockers(addr.Container(), bucketName), true)
|
||||||
default:
|
default:
|
||||||
return ErrUnknownObjectType
|
return ErrUnknownObjectType
|
||||||
}
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
rawObject, err := obj.CutPayload().Marshal()
|
rawObject, err := obj.CutPayload().Marshal()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("marshal object header: %w", err)
|
return fmt.Errorf("marshal object header: %w", err)
|
||||||
}
|
}
|
||||||
return putUniqueIndexItem(tx, namedBucketItem{
|
return putUniqueIndexItem(bucketItem{
|
||||||
name: bucketName,
|
bucket: bkt,
|
||||||
key: objKey,
|
key: objKey,
|
||||||
val: rawObject,
|
val: rawObject,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func putExpirationEpoch(tx *bbolt.Tx, obj *objectSDK.Object, addr oid.Address, objKey []byte) error {
|
func putExpirationEpoch(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, addr oid.Address, objKey []byte) error {
|
||||||
if expEpoch, ok := hasExpirationEpoch(obj); ok {
|
if expEpoch, ok := hasExpirationEpoch(obj); ok {
|
||||||
err := putUniqueIndexItem(tx, namedBucketItem{
|
err := putUniqueIndexItem(bucketItem{
|
||||||
name: expEpochToObjectBucketName,
|
bucket: getExpEpochToObjectBucket(bc, tx),
|
||||||
key: expirationEpochKey(expEpoch, addr.Container(), addr.Object()),
|
key: expirationEpochKey(expEpoch, addr.Container(), addr.Object()),
|
||||||
val: zeroValue,
|
val: zeroValue,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
bkt, err := getOrCreateBucket(tx, bc, addr.Container(), getObjToExpEpochBucket, objectToExpirationEpochBucketName(addr.Container(), make([]byte, bucketKeySize)), true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
val := make([]byte, epochSize)
|
val := make([]byte, epochSize)
|
||||||
binary.LittleEndian.PutUint64(val, expEpoch)
|
binary.LittleEndian.PutUint64(val, expEpoch)
|
||||||
err = putUniqueIndexItem(tx, namedBucketItem{
|
err = putUniqueIndexItem(bucketItem{
|
||||||
name: objectToExpirationEpochBucketName(addr.Container(), make([]byte, bucketKeySize)),
|
bucket: bkt,
|
||||||
key: objKey,
|
key: objKey,
|
||||||
val: val,
|
val: val,
|
||||||
})
|
})
|
||||||
|
@ -282,7 +300,7 @@ func putExpirationEpoch(tx *bbolt.Tx, obj *objectSDK.Object, addr oid.Address, o
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func putSplitInfo(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr oid.Address, si *objectSDK.SplitInfo, objKey []byte) error {
|
func putSplitInfo(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, bucketName []byte, addr oid.Address, si *objectSDK.SplitInfo, objKey []byte) error {
|
||||||
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
|
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
|
||||||
if ecHead := obj.ECHeader(); ecHead != nil {
|
if ecHead := obj.ECHeader(); ecHead != nil {
|
||||||
parentID := ecHead.Parent()
|
parentID := ecHead.Parent()
|
||||||
|
@ -300,14 +318,18 @@ func putSplitInfo(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr o
|
||||||
}
|
}
|
||||||
objKey = objectKey(parentID, objKey)
|
objKey = objectKey(parentID, objKey)
|
||||||
}
|
}
|
||||||
return updateSplitInfoIndex(tx, objKey, addr.Container(), bucketName, si)
|
return updateSplitInfoIndex(tx, bc, objKey, addr.Container(), bucketName, si)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName []byte, si *objectSDK.SplitInfo) error {
|
func updateSplitInfoIndex(tx *bbolt.Tx, bc *bucketCache, objKey []byte, cnr cid.ID, bucketName []byte, si *objectSDK.SplitInfo) error {
|
||||||
return updateUniqueIndexItem(tx, namedBucketItem{
|
bkt, err := getOrCreateBucket(tx, bc, cnr, getRootBucket, rootBucketName(cnr, bucketName), true)
|
||||||
name: rootBucketName(cnr, bucketName),
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return updateUniqueIndexItem(bucketItem{
|
||||||
|
bucket: bkt,
|
||||||
key: objKey,
|
key: objKey,
|
||||||
}, func(old, _ []byte) ([]byte, error) {
|
}, func(old, _ []byte) ([]byte, error) {
|
||||||
switch {
|
switch {
|
||||||
|
@ -328,78 +350,100 @@ func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName []
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type updateIndexItemFunc = func(tx *bbolt.Tx, item namedBucketItem) error
|
type updateIndexItemFunc = func(item bucketItem) error
|
||||||
|
|
||||||
func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
|
func updateListIndexes(tx *bbolt.Tx, bc *bucketCache, obj *objectSDK.Object, f updateIndexItemFunc, createBuckets bool) error {
|
||||||
idObj, _ := obj.ID()
|
idObj, _ := obj.ID()
|
||||||
cnr, _ := obj.ContainerID()
|
cnr, _ := obj.ContainerID()
|
||||||
objKey := objectKey(idObj, make([]byte, objectKeySize))
|
objKey := objectKey(idObj, make([]byte, objectKeySize))
|
||||||
bucketName := make([]byte, bucketKeySize)
|
bucketName := make([]byte, bucketKeySize)
|
||||||
|
|
||||||
idParent, ok := obj.ParentID()
|
idParent, ok := obj.ParentID()
|
||||||
|
|
||||||
// index parent ids
|
// index parent ids
|
||||||
if ok {
|
if ok {
|
||||||
err := f(tx, namedBucketItem{
|
bkt, err := getOrCreateBucket(tx, bc, cnr, getParentBucket, parentBucketName(cnr, bucketName), createBuckets)
|
||||||
name: parentBucketName(cnr, bucketName),
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := f(bucketItem{
|
||||||
|
bucket: bkt,
|
||||||
key: objectKey(idParent, make([]byte, objectKeySize)),
|
key: objectKey(idParent, make([]byte, objectKeySize)),
|
||||||
val: objKey,
|
val: objKey,
|
||||||
})
|
}); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// index split ids
|
// index split ids
|
||||||
if obj.SplitID() != nil {
|
if obj.SplitID() != nil {
|
||||||
err := f(tx, namedBucketItem{
|
bkt, err := getOrCreateBucket(tx, bc, cnr, getSplitBucket, splitBucketName(cnr, bucketName), createBuckets)
|
||||||
name: splitBucketName(cnr, bucketName),
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := f(bucketItem{
|
||||||
|
bucket: bkt,
|
||||||
key: obj.SplitID().ToV2(),
|
key: obj.SplitID().ToV2(),
|
||||||
val: objKey,
|
val: objKey,
|
||||||
})
|
}); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ech := obj.ECHeader(); ech != nil {
|
if ech := obj.ECHeader(); ech != nil {
|
||||||
err := f(tx, namedBucketItem{
|
bkt, err := getOrCreateBucket(tx, bc, cnr, getECInfoBucket, ecInfoBucketName(cnr, bucketName), createBuckets)
|
||||||
name: ecInfoBucketName(cnr, bucketName),
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := f(bucketItem{
|
||||||
|
bucket: bkt,
|
||||||
key: objectKey(ech.Parent(), make([]byte, objectKeySize)),
|
key: objectKey(ech.Parent(), make([]byte, objectKeySize)),
|
||||||
val: objKey,
|
val: objKey,
|
||||||
})
|
}); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if ech.ParentSplitID() != nil {
|
if ech.ParentSplitID() != nil {
|
||||||
objKey := objectKey(ech.Parent(), make([]byte, objectKeySize))
|
bkt, err := getOrCreateBucket(tx, bc, cnr, getSplitBucket, splitBucketName(cnr, bucketName), createBuckets)
|
||||||
err := f(tx, namedBucketItem{
|
|
||||||
name: splitBucketName(cnr, bucketName),
|
|
||||||
key: ech.ParentSplitID().ToV2(),
|
|
||||||
val: objKey,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if err := f(bucketItem{
|
||||||
|
bucket: bkt,
|
||||||
|
key: ech.ParentSplitID().ToV2(),
|
||||||
|
val: objectKey(ech.Parent(), make([]byte, objectKeySize)),
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if parentSplitParentID := ech.ParentSplitParentID(); parentSplitParentID != nil {
|
if parentSplitParentID := ech.ParentSplitParentID(); parentSplitParentID != nil {
|
||||||
objKey := objectKey(ech.Parent(), make([]byte, objectKeySize))
|
bkt, err := getOrCreateBucket(tx, bc, cnr, getParentBucket, parentBucketName(cnr, bucketName), createBuckets)
|
||||||
err := f(tx, namedBucketItem{
|
|
||||||
name: parentBucketName(cnr, bucketName),
|
|
||||||
key: objectKey(*parentSplitParentID, make([]byte, objectKeySize)),
|
|
||||||
val: objKey,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if err := f(bucketItem{
|
||||||
|
bucket: bkt,
|
||||||
|
key: objectKey(*parentSplitParentID, make([]byte, objectKeySize)),
|
||||||
|
val: objectKey(ech.Parent(), make([]byte, objectKeySize)),
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getOrCreateBucket(tx *bbolt.Tx, bc *bucketCache, cnr cid.ID, getter func(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket,
|
||||||
|
bucketName []byte, create bool,
|
||||||
|
) (*bbolt.Bucket, error) {
|
||||||
|
bkt := getter(bc, tx, cnr)
|
||||||
|
if bkt == nil && create {
|
||||||
|
return createBucketLikelyExists(tx, bucketName)
|
||||||
|
}
|
||||||
|
return bkt, nil
|
||||||
|
}
|
||||||
|
|
||||||
var indexedAttributes = map[string]struct{}{
|
var indexedAttributes = map[string]struct{}{
|
||||||
"S3-Access-Box-CRDT-Name": {},
|
"S3-Access-Box-CRDT-Name": {},
|
||||||
objectSDK.AttributeFilePath: {},
|
objectSDK.AttributeFilePath: {},
|
||||||
|
@ -411,7 +455,9 @@ func IsAtrributeIndexed(attr string) bool {
|
||||||
return found
|
return found
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
|
type updateFKBTIndexItemFunc = func(tx *bbolt.Tx, item namedBucketItem) error
|
||||||
|
|
||||||
|
func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateFKBTIndexItemFunc) error {
|
||||||
id, _ := obj.ID()
|
id, _ := obj.ID()
|
||||||
cnr, _ := obj.ContainerID()
|
cnr, _ := obj.ContainerID()
|
||||||
objKey := objectKey(id, make([]byte, objectKeySize))
|
objKey := objectKey(id, make([]byte, objectKeySize))
|
||||||
|
@ -471,11 +517,9 @@ func createBucketLikelyExists[T bucketContainer](tx T, name []byte) (*bbolt.Buck
|
||||||
return tx.CreateBucket(name)
|
return tx.CreateBucket(name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldData, newData []byte) ([]byte, error)) error {
|
func updateUniqueIndexItem(item bucketItem, update func(oldData, newData []byte) ([]byte, error)) error {
|
||||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
bkt := item.bucket
|
||||||
if err != nil {
|
assert.True(bkt != nil, bucketNilAsserMsg)
|
||||||
return fmt.Errorf("create index %v: %w", item.name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := update(bkt.Get(item.key), item.val)
|
data, err := update(bkt.Get(item.key), item.val)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -484,8 +528,8 @@ func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldDa
|
||||||
return bkt.Put(item.key, data)
|
return bkt.Put(item.key, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
func putUniqueIndexItem(item bucketItem) error {
|
||||||
return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil })
|
return updateUniqueIndexItem(item, func(_, val []byte) ([]byte, error) { return val, nil })
|
||||||
}
|
}
|
||||||
|
|
||||||
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
|
@ -502,11 +546,9 @@ func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
return fkbtRoot.Put(item.val, zeroValue)
|
return fkbtRoot.Put(item.val, zeroValue)
|
||||||
}
|
}
|
||||||
|
|
||||||
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
func putListIndexItem(item bucketItem) error {
|
||||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
bkt := item.bucket
|
||||||
if err != nil {
|
assert.True(bkt != nil, bucketNilAsserMsg)
|
||||||
return fmt.Errorf("create index %v: %w", item.name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
lst, err := decodeList(bkt.Get(item.key))
|
lst, err := decodeList(bkt.Get(item.key))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -595,9 +637,9 @@ func getVarUint(data []byte) (uint64, int, error) {
|
||||||
|
|
||||||
// setStorageID for existing objects if they were moved from one
|
// setStorageID for existing objects if they were moved from one
|
||||||
// storage location to another.
|
// storage location to another.
|
||||||
func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) error {
|
func setStorageID(tx *bbolt.Tx, bc *bucketCache, addr oid.Address, id []byte, override bool) error {
|
||||||
key := make([]byte, bucketKeySize)
|
key := make([]byte, bucketKeySize)
|
||||||
bkt, err := createBucketLikelyExists(tx, smallBucketName(addr.Container(), key))
|
bkt, err := getOrCreateBucket(tx, bc, addr.Container(), getSmallBucket, smallBucketName(addr.Container(), key), true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -610,9 +652,9 @@ func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) erro
|
||||||
|
|
||||||
// updateSpliInfo for existing objects if storage filled with extra information
|
// updateSpliInfo for existing objects if storage filled with extra information
|
||||||
// about last object in split hierarchy or linking object.
|
// about last object in split hierarchy or linking object.
|
||||||
func updateSplitInfo(tx *bbolt.Tx, addr oid.Address, from *objectSDK.SplitInfo) error {
|
func updateSplitInfo(tx *bbolt.Tx, bc *bucketCache, addr oid.Address, from *objectSDK.SplitInfo) error {
|
||||||
objKey := objectKey(addr.Object(), make([]byte, bucketKeySize))
|
objKey := objectKey(addr.Object(), make([]byte, bucketKeySize))
|
||||||
return updateSplitInfoIndex(tx, objKey, addr.Container(), make([]byte, bucketKeySize), from)
|
return updateSplitInfoIndex(tx, bc, objKey, addr.Container(), make([]byte, bucketKeySize), from)
|
||||||
}
|
}
|
||||||
|
|
||||||
// splitInfoFromObject returns split info based on last or linkin object.
|
// splitInfoFromObject returns split info based on last or linkin object.
|
||||||
|
|
|
@ -413,7 +413,7 @@ func (db *DB) selectObjectID(
|
||||||
addr.SetObject(id)
|
addr.SetObject(id)
|
||||||
|
|
||||||
var splitInfoError *objectSDK.SplitInfoError
|
var splitInfoError *objectSDK.SplitInfoError
|
||||||
ok, _, err := db.exists(tx, addr, oid.Address{}, currEpoch)
|
ok, _, err := db.exists(tx, nil, addr, oid.Address{}, currEpoch)
|
||||||
if (err == nil && ok) || errors.As(err, &splitInfoError) {
|
if (err == nil && ok) || errors.As(err, &splitInfoError) {
|
||||||
raw := make([]byte, objectKeySize)
|
raw := make([]byte, objectKeySize)
|
||||||
id.Encode(raw)
|
id.Encode(raw)
|
||||||
|
|
|
@ -126,8 +126,8 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res
|
||||||
return res, ErrReadOnlyMode
|
return res, ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
|
|
||||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
err = db.Batch(func(tx *bbolt.Tx, bc *bucketCache) error {
|
||||||
return setStorageID(tx, prm.addr, prm.id, true)
|
return setStorageID(tx, bc, prm.addr, prm.id, true)
|
||||||
})
|
})
|
||||||
success = err == nil
|
success = err == nil
|
||||||
return res, metaerr.Wrap(err)
|
return res, metaerr.Wrap(err)
|
||||||
|
|
|
@ -168,20 +168,7 @@ func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB, log func(a
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := db.Batch(func(tx *bbolt.Tx) error {
|
if err := db.Batch(func(tx *bbolt.Tx) error {
|
||||||
if err := putUniqueIndexItem(tx, namedBucketItem{
|
return saveObjectExpirationEpoch(tx, obj)
|
||||||
name: expEpochToObjectBucketName,
|
|
||||||
key: expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID),
|
|
||||||
val: zeroValue,
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
val := make([]byte, epochSize)
|
|
||||||
binary.LittleEndian.PutUint64(val, obj.expirationEpoch)
|
|
||||||
return putUniqueIndexItem(tx, namedBucketItem{
|
|
||||||
name: objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)),
|
|
||||||
key: objectKey(obj.objectID, make([]byte, objectKeySize)),
|
|
||||||
val: val,
|
|
||||||
})
|
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -201,6 +188,31 @@ func createExpirationEpochBuckets(ctx context.Context, db *bbolt.DB, log func(a
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func saveObjectExpirationEpoch(tx *bbolt.Tx, obj objectIDToExpEpoch) error {
|
||||||
|
bkt, err := createBucketLikelyExists(tx, expEpochToObjectBucketName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := putUniqueIndexItem(bucketItem{
|
||||||
|
bucket: bkt,
|
||||||
|
key: expirationEpochKey(obj.expirationEpoch, obj.containerID, obj.objectID),
|
||||||
|
val: zeroValue,
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
bkt, err = createBucketLikelyExists(tx, objectToExpirationEpochBucketName(obj.containerID, make([]byte, bucketKeySize)))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
val := make([]byte, epochSize)
|
||||||
|
binary.LittleEndian.PutUint64(val, obj.expirationEpoch)
|
||||||
|
return putUniqueIndexItem(bucketItem{
|
||||||
|
bucket: bkt,
|
||||||
|
key: objectKey(obj.objectID, make([]byte, objectKeySize)),
|
||||||
|
val: val,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func selectObjectsWithExpirationEpoch(ctx context.Context, db *bbolt.DB, objects chan objectIDToExpEpoch) error {
|
func selectObjectsWithExpirationEpoch(ctx context.Context, db *bbolt.DB, objects chan objectIDToExpEpoch) error {
|
||||||
defer close(objects)
|
defer close(objects)
|
||||||
|
|
||||||
|
|
|
@ -277,24 +277,22 @@ func objectKey(obj oid.ID, key []byte) []byte {
|
||||||
// if meets irregular object container in objs - returns its type, otherwise returns object.TypeRegular.
|
// if meets irregular object container in objs - returns its type, otherwise returns object.TypeRegular.
|
||||||
//
|
//
|
||||||
// firstIrregularObjectType(tx, cnr, obj) usage allows getting object type.
|
// firstIrregularObjectType(tx, cnr, obj) usage allows getting object type.
|
||||||
func firstIrregularObjectType(tx *bbolt.Tx, idCnr cid.ID, objs ...[]byte) objectSDK.Type {
|
func firstIrregularObjectType(tx *bbolt.Tx, bc *bucketCache, idCnr cid.ID, objs ...[]byte) objectSDK.Type {
|
||||||
if len(objs) == 0 {
|
if len(objs) == 0 {
|
||||||
panic("empty object list in firstIrregularObjectType")
|
panic("empty object list in firstIrregularObjectType")
|
||||||
}
|
}
|
||||||
|
|
||||||
var keys [2][1 + cidSize]byte
|
|
||||||
|
|
||||||
irregularTypeBuckets := [...]struct {
|
irregularTypeBuckets := [...]struct {
|
||||||
typ objectSDK.Type
|
typ objectSDK.Type
|
||||||
name []byte
|
bkt *bbolt.Bucket
|
||||||
}{
|
}{
|
||||||
{objectSDK.TypeTombstone, tombstoneBucketName(idCnr, keys[0][:])},
|
{objectSDK.TypeTombstone, getTombstoneBucket(bc, tx, idCnr)},
|
||||||
{objectSDK.TypeLock, bucketNameLockers(idCnr, keys[1][:])},
|
{objectSDK.TypeLock, getLockersBucket(bc, tx, idCnr)},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range objs {
|
for i := range objs {
|
||||||
for j := range irregularTypeBuckets {
|
for j := range irregularTypeBuckets {
|
||||||
if inBucket(tx, irregularTypeBuckets[j].name, objs[i]) {
|
if inBucket(irregularTypeBuckets[j].bkt, objs[i]) {
|
||||||
return irregularTypeBuckets[j].typ
|
return irregularTypeBuckets[j].typ
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -304,8 +302,7 @@ func firstIrregularObjectType(tx *bbolt.Tx, idCnr cid.ID, objs ...[]byte) object
|
||||||
}
|
}
|
||||||
|
|
||||||
// return true if provided object is of LOCK type.
|
// return true if provided object is of LOCK type.
|
||||||
func isLockObject(tx *bbolt.Tx, idCnr cid.ID, obj oid.ID) bool {
|
func isLockObject(tx *bbolt.Tx, bc *bucketCache, idCnr cid.ID, obj oid.ID) bool {
|
||||||
return inBucket(tx,
|
return inBucket(getLockersBucket(bc, tx, idCnr),
|
||||||
bucketNameLockers(idCnr, make([]byte, bucketKeySize)),
|
|
||||||
objectKey(obj, make([]byte, objectKeySize)))
|
objectKey(obj, make([]byte, objectKeySize)))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue