metabase: Drop search-only indexes #1323
14 changed files with 373 additions and 429 deletions
|
@ -2,6 +2,8 @@
|
||||||
|
|
||||||
This file describes changes between the metabase versions.
|
This file describes changes between the metabase versions.
|
||||||
|
|
||||||
|
Warning: database schema below is outdated and incomplete, see source code.
|
||||||
|
|
||||||
## Current
|
## Current
|
||||||
|
|
||||||
### Primary buckets
|
### Primary buckets
|
||||||
|
@ -86,6 +88,11 @@ This file describes changes between the metabase versions.
|
||||||
|
|
||||||
# History
|
# History
|
||||||
|
|
||||||
|
## Version 3
|
||||||
|
|
||||||
|
- Payload hash, owner ID and FKBT buckets deleted
|
||||||
|
- Expiration epoch to object ID and object ID to expiration epoch added
|
||||||
|
|
||||||
## Version 2
|
## Version 2
|
||||||
|
|
||||||
- Container ID is encoded as 32-byte slice
|
- Container ID is encoded as 32-byte slice
|
||||||
|
|
|
@ -29,6 +29,7 @@ var (
|
||||||
string(garbageBucketName): {},
|
string(garbageBucketName): {},
|
||||||
string(shardInfoBucket): {},
|
string(shardInfoBucket): {},
|
||||||
string(bucketNameLocked): {},
|
string(bucketNameLocked): {},
|
||||||
|
string(expEpochToObjectBucketName): {},
|
||||||
}
|
}
|
||||||
|
|
||||||
// deprecatedBuckets buckets that are not used anymore.
|
// deprecatedBuckets buckets that are not used anymore.
|
||||||
|
|
|
@ -341,11 +341,6 @@ func (db *DB) deleteObject(
|
||||||
return fmt.Errorf("can't remove list indexes: %w", err)
|
return fmt.Errorf("can't remove list indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = updateFKBTIndexes(tx, obj, delFKBTIndexItem)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't remove fake bucket tree indexes: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if isParent {
|
if isParent {
|
||||||
// remove record from the garbage bucket, because regular object deletion does nothing for virtual object
|
// remove record from the garbage bucket, because regular object deletion does nothing for virtual object
|
||||||
garbageBKT := tx.Bucket(garbageBucketName)
|
garbageBKT := tx.Bucket(garbageBucketName)
|
||||||
|
@ -386,21 +381,6 @@ func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
|
||||||
bkt := tx.Bucket(item.name)
|
|
||||||
if bkt == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
fkbtRoot := bkt.Bucket(item.key)
|
|
||||||
if fkbtRoot == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = fkbtRoot.Delete(item.val) // ignore error, best effort there
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
bkt := tx.Bucket(item.name)
|
bkt := tx.Bucket(item.name)
|
||||||
if bkt == nil {
|
if bkt == nil {
|
||||||
|
@ -478,6 +458,17 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
|
||||||
key: objKey,
|
key: objKey,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if expEpoch, ok := hasExpirationEpoch(obj); ok {
|
||||||
|
delUniqueIndexItem(tx, namedBucketItem{
|
||||||
|
name: expEpochToObjectBucketName,
|
||||||
|
key: expirationEpochKey(expEpoch, cnr, addr.Object()),
|
||||||
|
})
|
||||||
|
delUniqueIndexItem(tx, namedBucketItem{
|
||||||
|
name: objectToExpirationEpochBucketName(cnr, make([]byte, bucketKeySize)),
|
||||||
|
key: objKey,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -418,7 +418,8 @@ func testVerifyNoObjectDataLeft(tx *bbolt.Tx) error {
|
||||||
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
|
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
|
||||||
if bytes.Equal(name, shardInfoBucket) ||
|
if bytes.Equal(name, shardInfoBucket) ||
|
||||||
bytes.Equal(name, containerCounterBucketName) ||
|
bytes.Equal(name, containerCounterBucketName) ||
|
||||||
bytes.Equal(name, containerVolumeBucketName) {
|
bytes.Equal(name, containerVolumeBucketName) ||
|
||||||
|
bytes.Equal(name, expEpochToObjectBucketName) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return testBucketEmpty(name, b)
|
return testBucketEmpty(name, b)
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
@ -96,7 +95,11 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpo
|
||||||
locked = objectLocked(tx, parent.Container(), parent.Object())
|
locked = objectLocked(tx, parent.Container(), parent.Object())
|
||||||
}
|
}
|
||||||
// check graveyard and object expiration first
|
// check graveyard and object expiration first
|
||||||
switch objectStatus(tx, addr, currEpoch) {
|
st, err := objectStatus(tx, addr, currEpoch)
|
||||||
|
if err != nil {
|
||||||
|
return false, false, err
|
||||||
|
}
|
||||||
|
switch st {
|
||||||
case 1:
|
case 1:
|
||||||
return false, locked, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
return false, locked, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
case 2:
|
case 2:
|
||||||
|
@ -138,30 +141,25 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpo
|
||||||
// - 1 if object with GC mark;
|
// - 1 if object with GC mark;
|
||||||
// - 2 if object is covered with tombstone;
|
// - 2 if object is covered with tombstone;
|
||||||
// - 3 if object is expired.
|
// - 3 if object is expired.
|
||||||
func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 {
|
func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (uint8, error) {
|
||||||
// locked object could not be removed/marked with GC/expired
|
// locked object could not be removed/marked with GC/expired
|
||||||
if objectLocked(tx, addr.Container(), addr.Object()) {
|
if objectLocked(tx, addr.Container(), addr.Object()) {
|
||||||
return 0
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// we check only if the object is expired in the current
|
expired, err := isExpired(tx, addr, currEpoch)
|
||||||
// epoch since it is considered the only corner case: the
|
if err != nil {
|
||||||
// GC is expected to collect all the objects that have
|
return 0, err
|
||||||
// expired previously for less than the one epoch duration
|
|
||||||
|
|
||||||
expired := isExpiredWithAttribute(tx, objectV2.SysAttributeExpEpoch, addr, currEpoch)
|
|
||||||
if !expired {
|
|
||||||
expired = isExpiredWithAttribute(tx, objectV2.SysAttributeExpEpochNeoFS, addr, currEpoch)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if expired {
|
if expired {
|
||||||
return 3
|
return 3, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
graveyardBkt := tx.Bucket(graveyardBucketName)
|
graveyardBkt := tx.Bucket(graveyardBucketName)
|
||||||
garbageBkt := tx.Bucket(garbageBucketName)
|
garbageBkt := tx.Bucket(garbageBucketName)
|
||||||
addrKey := addressKey(addr, make([]byte, addressKeySize))
|
addrKey := addressKey(addr, make([]byte, addressKeySize))
|
||||||
return inGraveyardWithKey(addrKey, graveyardBkt, garbageBkt)
|
return inGraveyardWithKey(addrKey, graveyardBkt, garbageBkt), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uint8 {
|
func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uint8 {
|
||||||
|
|
|
@ -2,12 +2,11 @@ package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
@ -17,6 +16,8 @@ import (
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var errInvalidEpochValueLength = errors.New("could not parse expiration epoch: invalid data length")
|
||||||
|
|
||||||
// FilterExpired return expired items from addresses.
|
// FilterExpired return expired items from addresses.
|
||||||
// Address considered expired if metabase does contain information about expiration and
|
// Address considered expired if metabase does contain information about expiration and
|
||||||
// expiration epoch is less than epoch.
|
// expiration epoch is less than epoch.
|
||||||
|
@ -57,29 +58,11 @@ func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.A
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
expiredNeoFS, err := selectExpiredObjectIDs(tx, objectV2.SysAttributeExpEpochNeoFS, epoch, containerID, objectIDs)
|
expired, err := selectExpiredObjects(tx, epoch, containerID, objectIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
result = append(result, expired...)
|
||||||
expiredSys, err := selectExpiredObjectIDs(tx, objectV2.SysAttributeExpEpoch, epoch, containerID, objectIDs)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, o := range expiredNeoFS {
|
|
||||||
var a oid.Address
|
|
||||||
a.SetContainer(containerID)
|
|
||||||
a.SetObject(o)
|
|
||||||
result = append(result, a)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, o := range expiredSys {
|
|
||||||
var a oid.Address
|
|
||||||
a.SetContainer(containerID)
|
|
||||||
a.SetObject(o)
|
|
||||||
result = append(result, a)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -90,76 +73,39 @@ func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.A
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func isExpiredWithAttribute(tx *bbolt.Tx, attr string, addr oid.Address, currEpoch uint64) bool {
|
func isExpired(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (bool, error) {
|
||||||
// bucket with objects that have expiration attr
|
bucketName := make([]byte, bucketKeySize)
|
||||||
attrKey := make([]byte, bucketKeySize+len(attr))
|
bucketName = objectToExpirationEpochBucketName(addr.Container(), bucketName)
|
||||||
expirationBucket := tx.Bucket(attributeBucketName(addr.Container(), attr, attrKey))
|
b := tx.Bucket(bucketName)
|
||||||
if expirationBucket != nil {
|
if b == nil {
|
||||||
// bucket that contains objects that expire in the current epoch
|
return false, nil
|
||||||
prevEpochBkt := expirationBucket.Bucket([]byte(strconv.FormatUint(currEpoch-1, 10)))
|
|
||||||
if prevEpochBkt != nil {
|
|
||||||
rawOID := objectKey(addr.Object(), make([]byte, objectKeySize))
|
|
||||||
if prevEpochBkt.Get(rawOID) != nil {
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
key := make([]byte, objectKeySize)
|
||||||
|
addr.Object().Encode(key)
|
||||||
|
val := b.Get(key)
|
||||||
|
if len(val) == 0 {
|
||||||
|
return false, nil
|
||||||
}
|
}
|
||||||
|
if len(val) != epochSize {
|
||||||
|
return false, errInvalidEpochValueLength
|
||||||
}
|
}
|
||||||
|
expEpoch := binary.LittleEndian.Uint64(val)
|
||||||
return false
|
return expEpoch < currEpoch, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func selectExpiredObjectIDs(tx *bbolt.Tx, attr string, epoch uint64, containerID cid.ID, objectIDs []oid.ID) ([]oid.ID, error) {
|
func selectExpiredObjects(tx *bbolt.Tx, epoch uint64, containerID cid.ID, objectIDs []oid.ID) ([]oid.Address, error) {
|
||||||
result := make([]oid.ID, 0)
|
result := make([]oid.Address, 0)
|
||||||
notResolved := make(map[oid.ID]struct{})
|
var addr oid.Address
|
||||||
for _, oid := range objectIDs {
|
addr.SetContainer(containerID)
|
||||||
notResolved[oid] = struct{}{}
|
for _, objID := range objectIDs {
|
||||||
}
|
addr.SetObject(objID)
|
||||||
|
expired, err := isExpired(tx, addr, epoch)
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
expiredBuffer := make([]oid.ID, 0)
|
|
||||||
objectKeyBuffer := make([]byte, objectKeySize)
|
|
||||||
|
|
||||||
expirationBucketKey := make([]byte, bucketKeySize+len(attr))
|
|
||||||
expirationBucket := tx.Bucket(attributeBucketName(containerID, attr, expirationBucketKey))
|
|
||||||
if expirationBucket == nil {
|
|
||||||
return result, nil // all not expired
|
|
||||||
}
|
|
||||||
|
|
||||||
err := expirationBucket.ForEach(func(epochExpBucketKey, _ []byte) error {
|
|
||||||
bucketExpiresAfter, err := strconv.ParseUint(string(epochExpBucketKey), 10, 64)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not parse expiration epoch: %w", err)
|
|
||||||
} else if bucketExpiresAfter >= epoch {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
epochExpirationBucket := expirationBucket.Bucket(epochExpBucketKey)
|
|
||||||
if epochExpirationBucket == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
expiredBuffer = expiredBuffer[:0]
|
|
||||||
for oid := range notResolved {
|
|
||||||
key := objectKey(oid, objectKeyBuffer)
|
|
||||||
if epochExpirationBucket.Get(key) != nil {
|
|
||||||
expiredBuffer = append(expiredBuffer, oid)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, oid := range expiredBuffer {
|
|
||||||
delete(notResolved, oid)
|
|
||||||
result = append(result, oid)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(notResolved) == 0 {
|
|
||||||
return errBreakBucketForEach
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil && !errors.Is(err, errBreakBucketForEach) {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if expired {
|
||||||
|
result = append(result, addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,7 +89,11 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) {
|
||||||
|
|
||||||
func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
|
func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
|
||||||
if checkStatus {
|
if checkStatus {
|
||||||
switch objectStatus(tx, addr, currEpoch) {
|
st, err := objectStatus(tx, addr, currEpoch)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
switch st {
|
||||||
case 1:
|
case 1:
|
||||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
case 2:
|
case 2:
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
@ -79,63 +78,37 @@ func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectH
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error {
|
func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error {
|
||||||
err := tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
|
b := tx.Bucket(expEpochToObjectBucketName)
|
||||||
cidBytes := cidFromAttributeBucket(name, objectV2.SysAttributeExpEpoch)
|
c := b.Cursor()
|
||||||
if cidBytes == nil {
|
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
||||||
cidBytes = cidFromAttributeBucket(name, objectV2.SysAttributeExpEpochNeoFS)
|
expiresAfter, cnr, obj, err := parseExpirationEpochKey(k)
|
||||||
if cidBytes == nil {
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// bucket keys ordered by epoch, no need to continue lookup
|
||||||
|
if expiresAfter >= epoch {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if objectLocked(tx, cnr, obj) {
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
var cnrID cid.ID
|
|
||||||
err := cnrID.Decode(cidBytes)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not parse container ID of expired bucket: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return b.ForEachBucket(func(expKey []byte) error {
|
|
||||||
bktExpired := b.Bucket(expKey)
|
|
||||||
expiresAfter, err := strconv.ParseUint(string(expKey), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not parse expiration epoch: %w", err)
|
|
||||||
} else if expiresAfter >= epoch {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return bktExpired.ForEach(func(idKey, _ []byte) error {
|
|
||||||
var id oid.ID
|
|
||||||
|
|
||||||
err = id.Decode(idKey)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not parse ID of expired object: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ignore locked objects.
|
|
||||||
//
|
|
||||||
// To slightly optimize performance we can check only REGULAR objects
|
|
||||||
// (only they can be locked), but it's more reliable.
|
|
||||||
if objectLocked(tx, cnrID, id) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(cnrID)
|
addr.SetContainer(cnr)
|
||||||
addr.SetObject(id)
|
addr.SetObject(obj)
|
||||||
|
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
||||||
return h(&ExpiredObject{
|
err = h(&ExpiredObject{
|
||||||
typ: firstIrregularObjectType(tx, cnrID, idKey),
|
typ: firstIrregularObjectType(tx, cnr, objKey),
|
||||||
addr: addr,
|
addr: addr,
|
||||||
})
|
})
|
||||||
})
|
if err == nil {
|
||||||
})
|
continue
|
||||||
})
|
}
|
||||||
|
if errors.Is(err, ErrInterruptIterator) {
|
||||||
if errors.Is(err, ErrInterruptIterator) {
|
return nil
|
||||||
err = nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateCoveredByTombstones iterates over all objects in DB which are covered
|
// IterateCoveredByTombstones iterates over all objects in DB which are covered
|
||||||
|
|
|
@ -6,8 +6,10 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
gio "io"
|
gio "io"
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
|
@ -173,11 +175,6 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
|
||||||
return fmt.Errorf("can't put list indexes: %w", err)
|
return fmt.Errorf("can't put list indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = updateFKBTIndexes(tx, obj, putFKBTIndexItem)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't put fake bucket tree indexes: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// update container volume size estimation
|
// update container volume size estimation
|
||||||
if obj.Type() == objectSDK.TypeRegular && !isParent {
|
if obj.Type() == objectSDK.TypeRegular && !isParent {
|
||||||
err = changeContainerSize(tx, cnr, obj.PayloadSize(), true)
|
err = changeContainerSize(tx, cnr, obj.PayloadSize(), true)
|
||||||
|
@ -195,46 +192,17 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func putUniqueIndexes(
|
func putUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, si *objectSDK.SplitInfo, id []byte) error {
|
||||||
tx *bbolt.Tx,
|
|
||||||
obj *objectSDK.Object,
|
|
||||||
si *objectSDK.SplitInfo,
|
|
||||||
id []byte,
|
|
||||||
) error {
|
|
||||||
isParent := si != nil
|
isParent := si != nil
|
||||||
addr := objectCore.AddressOf(obj)
|
addr := objectCore.AddressOf(obj)
|
||||||
cnr := addr.Container()
|
|
||||||
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
||||||
|
|
||||||
bucketName := make([]byte, bucketKeySize)
|
bucketName := make([]byte, bucketKeySize)
|
||||||
// add value to primary unique bucket
|
|
||||||
if !isParent {
|
if !isParent {
|
||||||
switch obj.Type() {
|
err := putRawObjectData(tx, obj, bucketName, addr, objKey)
|
||||||
case objectSDK.TypeRegular:
|
|
||||||
bucketName = primaryBucketName(cnr, bucketName)
|
|
||||||
case objectSDK.TypeTombstone:
|
|
||||||
bucketName = tombstoneBucketName(cnr, bucketName)
|
|
||||||
case objectSDK.TypeLock:
|
|
||||||
bucketName = bucketNameLockers(cnr, bucketName)
|
|
||||||
default:
|
|
||||||
return ErrUnknownObjectType
|
|
||||||
}
|
|
||||||
|
|
||||||
rawObject, err := obj.CutPayload().Marshal()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't marshal object header: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = putUniqueIndexItem(tx, namedBucketItem{
|
|
||||||
name: bucketName,
|
|
||||||
key: objKey,
|
|
||||||
val: rawObject,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// index storageID if it is present
|
|
||||||
if id != nil {
|
if id != nil {
|
||||||
if err = setStorageID(tx, objectCore.AddressOf(obj), id, false); err != nil {
|
if err = setStorageID(tx, objectCore.AddressOf(obj), id, false); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -242,7 +210,60 @@ func putUniqueIndexes(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// index root object
|
if err := putExpirationEpoch(tx, obj, addr, objKey); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return putSplitInfo(tx, obj, bucketName, addr, si, objKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
func putRawObjectData(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr oid.Address, objKey []byte) error {
|
||||||
|
switch obj.Type() {
|
||||||
|
case objectSDK.TypeRegular:
|
||||||
|
bucketName = primaryBucketName(addr.Container(), bucketName)
|
||||||
|
case objectSDK.TypeTombstone:
|
||||||
|
bucketName = tombstoneBucketName(addr.Container(), bucketName)
|
||||||
|
case objectSDK.TypeLock:
|
||||||
|
bucketName = bucketNameLockers(addr.Container(), bucketName)
|
||||||
|
default:
|
||||||
|
return ErrUnknownObjectType
|
||||||
|
}
|
||||||
|
rawObject, err := obj.CutPayload().Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't marshal object header: %w", err)
|
||||||
|
}
|
||||||
|
return putUniqueIndexItem(tx, namedBucketItem{
|
||||||
|
name: bucketName,
|
||||||
|
key: objKey,
|
||||||
|
val: rawObject,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func putExpirationEpoch(tx *bbolt.Tx, obj *objectSDK.Object, addr oid.Address, objKey []byte) error {
|
||||||
|
if expEpoch, ok := hasExpirationEpoch(obj); ok {
|
||||||
|
err := putUniqueIndexItem(tx, namedBucketItem{
|
||||||
|
name: expEpochToObjectBucketName,
|
||||||
|
key: expirationEpochKey(expEpoch, addr.Container(), addr.Object()),
|
||||||
|
val: zeroValue,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
val := make([]byte, epochSize)
|
||||||
|
binary.LittleEndian.PutUint64(val, expEpoch)
|
||||||
|
err = putUniqueIndexItem(tx, namedBucketItem{
|
||||||
|
name: objectToExpirationEpochBucketName(addr.Container(), make([]byte, bucketKeySize)),
|
||||||
|
key: objKey,
|
||||||
|
val: val,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func putSplitInfo(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr oid.Address, si *objectSDK.SplitInfo, objKey []byte) error {
|
||||||
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
|
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
|
||||||
if ecHead := obj.ECHeader(); ecHead != nil {
|
if ecHead := obj.ECHeader(); ecHead != nil {
|
||||||
parentID := ecHead.Parent()
|
parentID := ecHead.Parent()
|
||||||
|
@ -260,9 +281,8 @@ func putUniqueIndexes(
|
||||||
}
|
}
|
||||||
objKey = objectKey(parentID, objKey)
|
objKey = objectKey(parentID, objKey)
|
||||||
}
|
}
|
||||||
return updateSplitInfoIndex(tx, objKey, cnr, bucketName, si)
|
return updateSplitInfoIndex(tx, objKey, addr.Container(), bucketName, si)
|
||||||
}
|
}
|
||||||
|
|
||||||
fyrchik
commented
:( :(
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -297,18 +317,6 @@ func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFun
|
||||||
objKey := objectKey(idObj, make([]byte, objectKeySize))
|
objKey := objectKey(idObj, make([]byte, objectKeySize))
|
||||||
bucketName := make([]byte, bucketKeySize)
|
bucketName := make([]byte, bucketKeySize)
|
||||||
|
|
||||||
cs, _ := obj.PayloadChecksum()
|
|
||||||
|
|
||||||
// index payload hashes
|
|
||||||
err := f(tx, namedBucketItem{
|
|
||||||
name: payloadHashBucketName(cnr, bucketName),
|
|
||||||
key: cs.Value(),
|
|
||||||
val: objKey,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
idParent, ok := obj.ParentID()
|
idParent, ok := obj.ParentID()
|
||||||
|
|
||||||
// index parent ids
|
// index parent ids
|
||||||
|
@ -373,43 +381,22 @@ func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFun
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
|
func hasExpirationEpoch(obj *objectSDK.Object) (uint64, bool) {
|
||||||
id, _ := obj.ID()
|
attributes := obj.Attributes()
|
||||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
Is it ok if an object has an EC-header? The best way is to check Is it ok if an object has an EC-header? The best way is to check `object search` for EC-objects
dstepanov-yadro
commented
Thx, fixed Thx, fixed
|
|||||||
cnr, _ := obj.ContainerID()
|
if ech := obj.ECHeader(); ech != nil {
|
||||||
objKey := objectKey(id, make([]byte, objectKeySize))
|
attributes = ech.ParentAttributes()
|
||||||
|
|
||||||
key := make([]byte, bucketKeySize)
|
|
||||||
err := f(tx, namedBucketItem{
|
|
||||||
name: ownerBucketName(cnr, key),
|
|
||||||
key: []byte(obj.OwnerID().EncodeToString()),
|
|
||||||
val: objKey,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
for _, attr := range attributes {
|
||||||
var attrs []objectSDK.Attribute
|
if attr.Key() == objectV2.SysAttributeExpEpochNeoFS {
|
||||||
if obj.ECHeader() != nil {
|
expEpoch, err := strconv.ParseUint(attr.Value(), 10, 64)
|
||||||
attrs = obj.ECHeader().ParentAttributes()
|
return expEpoch, err == nil
|
||||||
objKey = objectKey(obj.ECHeader().Parent(), make([]byte, objectKeySize))
|
|
||||||
} else {
|
|
||||||
attrs = obj.Attributes()
|
|
||||||
}
|
}
|
||||||
|
if attr.Key() == objectV2.SysAttributeExpEpoch {
|
||||||
// user specified attributes
|
expEpoch, err := strconv.ParseUint(attr.Value(), 10, 64)
|
||||||
for i := range attrs {
|
return expEpoch, err == nil
|
||||||
key = attributeBucketName(cnr, attrs[i].Key(), key)
|
|
||||||
err := f(tx, namedBucketItem{
|
|
||||||
name: key,
|
|
||||||
key: []byte(attrs[i].Value()),
|
|
||||||
val: objKey,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return 0, false
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type bucketContainer interface {
|
type bucketContainer interface {
|
||||||
|
@ -442,20 +429,6 @@ func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil })
|
return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil })
|
||||||
}
|
}
|
||||||
|
|
||||||
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
|
||||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fkbtRoot, err := createBucketLikelyExists(bkt, item.key)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return fkbtRoot.Put(item.val, zeroValue)
|
|
||||||
}
|
|
||||||
|
|
||||||
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
bkt, err := createBucketLikelyExists(tx, item.name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -142,8 +142,11 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs objectSDK.SearchFilters
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(cnr)
|
addr.SetContainer(cnr)
|
||||||
addr.SetObject(id)
|
addr.SetObject(id)
|
||||||
|
st, err := objectStatus(tx, addr, currEpoch)
|
||||||
if objectStatus(tx, addr, currEpoch) > 0 {
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if st > 0 {
|
||||||
continue // ignore removed objects
|
continue // ignore removed objects
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,12 +198,6 @@ func (db *DB) selectFastFilter(
|
||||||
switch f.Header() {
|
switch f.Header() {
|
||||||
case v2object.FilterHeaderObjectID:
|
case v2object.FilterHeaderObjectID:
|
||||||
db.selectObjectID(tx, f, cnr, to, fNum, currEpoch)
|
db.selectObjectID(tx, f, cnr, to, fNum, currEpoch)
|
||||||
case v2object.FilterHeaderOwnerID:
|
|
||||||
bucketName := ownerBucketName(cnr, bucketName)
|
|
||||||
db.selectFromFKBT(tx, bucketName, f, to, fNum)
|
|
||||||
case v2object.FilterHeaderPayloadHash:
|
|
||||||
bucketName := payloadHashBucketName(cnr, bucketName)
|
|
||||||
db.selectFromList(tx, bucketName, f, to, fNum)
|
|
||||||
case v2object.FilterHeaderObjectType:
|
case v2object.FilterHeaderObjectType:
|
||||||
for _, bucketName := range bucketNamesForType(cnr, f.Operation(), f.Value()) {
|
for _, bucketName := range bucketNamesForType(cnr, f.Operation(), f.Value()) {
|
||||||
selectAllFromBucket(tx, bucketName, to, fNum)
|
selectAllFromBucket(tx, bucketName, to, fNum)
|
||||||
|
@ -220,14 +217,7 @@ func (db *DB) selectFastFilter(
|
||||||
selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, fNum)
|
selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, fNum)
|
||||||
selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, fNum)
|
selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, fNum)
|
||||||
selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, fNum)
|
selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, fNum)
|
||||||
default: // user attribute
|
default:
|
||||||
bucketName := attributeBucketName(cnr, f.Header(), bucketName)
|
|
||||||
|
|
||||||
if f.Operation() == objectSDK.MatchNotPresent {
|
|
||||||
selectOutsideFKBT(tx, allBucketNames(cnr), bucketName, to, fNum)
|
|
||||||
} else {
|
|
||||||
db.selectFromFKBT(tx, bucketName, f, to, fNum)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,16 +227,6 @@ var mBucketNaming = map[string][]func(cid.ID, []byte) []byte{
|
||||||
v2object.TypeLock.String(): {bucketNameLockers},
|
v2object.TypeLock.String(): {bucketNameLockers},
|
||||||
}
|
}
|
||||||
|
|
||||||
func allBucketNames(cnr cid.ID) (names [][]byte) {
|
|
||||||
for _, fns := range mBucketNaming {
|
|
||||||
for _, fn := range fns {
|
|
||||||
names = append(names, fn(cnr, make([]byte, bucketKeySize)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal string) (names [][]byte) {
|
func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal string) (names [][]byte) {
|
||||||
appendNames := func(key string) {
|
appendNames := func(key string) {
|
||||||
fns, ok := mBucketNaming[key]
|
fns, ok := mBucketNaming[key]
|
||||||
|
@ -278,83 +258,6 @@ func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal str
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// selectFromList looks into <fkbt> index to find list of addresses to add in
|
|
||||||
// resulting cache.
|
|
||||||
func (db *DB) selectFromFKBT(
|
|
||||||
tx *bbolt.Tx,
|
|
||||||
name []byte, // fkbt root bucket name
|
|
||||||
f objectSDK.SearchFilter, // filter for operation and value
|
|
||||||
to map[string]int, // resulting cache
|
|
||||||
fNum int, // index of filter
|
|
||||||
) { //
|
|
||||||
matchFunc, ok := db.matchers[f.Operation()]
|
|
||||||
if !ok {
|
|
||||||
db.log.Debug(logs.MetabaseMissingMatcher, zap.Uint32("operation", uint32(f.Operation())))
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
fkbtRoot := tx.Bucket(name)
|
|
||||||
if fkbtRoot == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
err := matchFunc.matchBucket(fkbtRoot, f.Header(), f.Value(), func(k, _ []byte) error {
|
|
||||||
fkbtLeaf := fkbtRoot.Bucket(k)
|
|
||||||
if fkbtLeaf == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return fkbtLeaf.ForEach(func(k, _ []byte) error {
|
|
||||||
markAddressInCache(to, fNum, string(k))
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
db.log.Debug(logs.MetabaseErrorInFKBTSelection, zap.String("error", err.Error()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// selectOutsideFKBT looks into all incl buckets to find list of addresses outside <fkbt> to add in
|
|
||||||
// resulting cache.
|
|
||||||
func selectOutsideFKBT(
|
|
||||||
tx *bbolt.Tx,
|
|
||||||
incl [][]byte, // buckets
|
|
||||||
name []byte, // fkbt root bucket name
|
|
||||||
to map[string]int, // resulting cache
|
|
||||||
fNum int, // index of filter
|
|
||||||
) {
|
|
||||||
mExcl := make(map[string]struct{})
|
|
||||||
|
|
||||||
bktExcl := tx.Bucket(name)
|
|
||||||
if bktExcl != nil {
|
|
||||||
_ = bktExcl.ForEachBucket(func(k []byte) error {
|
|
||||||
exclBktLeaf := bktExcl.Bucket(k)
|
|
||||||
return exclBktLeaf.ForEach(func(k, _ []byte) error {
|
|
||||||
mExcl[string(k)] = struct{}{}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range incl {
|
|
||||||
bktIncl := tx.Bucket(incl[i])
|
|
||||||
if bktIncl == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = bktIncl.ForEach(func(k, _ []byte) error {
|
|
||||||
if _, ok := mExcl[string(k)]; !ok {
|
|
||||||
markAddressInCache(to, fNum, string(k))
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// selectFromList looks into <list> index to find list of addresses to add in
|
// selectFromList looks into <list> index to find list of addresses to add in
|
||||||
// resulting cache.
|
// resulting cache.
|
||||||
func (db *DB) selectFromList(
|
func (db *DB) selectFromList(
|
||||||
|
@ -491,13 +394,7 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range f {
|
for i := range f {
|
||||||
matchFunc, ok := db.matchers[f[i].Operation()]
|
|
||||||
if !ok {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
var data []byte
|
var data []byte
|
||||||
|
|
||||||
switch f[i].Header() {
|
switch f[i].Header() {
|
||||||
case v2object.FilterHeaderVersion:
|
case v2object.FilterHeaderVersion:
|
||||||
data = []byte(obj.Version().String())
|
data = []byte(obj.Version().String())
|
||||||
|
@ -510,8 +407,23 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
|
||||||
case v2object.FilterHeaderPayloadLength:
|
case v2object.FilterHeaderPayloadLength:
|
||||||
data = make([]byte, 8)
|
data = make([]byte, 8)
|
||||||
binary.LittleEndian.PutUint64(data, obj.PayloadSize())
|
binary.LittleEndian.PutUint64(data, obj.PayloadSize())
|
||||||
default:
|
case v2object.FilterHeaderOwnerID:
|
||||||
continue // ignore unknown search attributes
|
data = []byte(obj.OwnerID().EncodeToString())
|
||||||
|
case v2object.FilterHeaderPayloadHash:
|
||||||
|
cs, _ := obj.PayloadChecksum()
|
||||||
|
data = cs.Value()
|
||||||
|
default: // user attribute
|
||||||
|
v, ok := attributeValue(obj, f[i].Header())
|
||||||
|
if ok {
|
||||||
|
data = []byte(v)
|
||||||
|
} else {
|
||||||
|
return f[i].Operation() == objectSDK.MatchNotPresent
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
matchFunc, ok := db.matchers[f[i].Operation()]
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if !matchFunc.matchSlow(f[i].Header(), data, f[i].Value()) {
|
if !matchFunc.matchSlow(f[i].Header(), data, f[i].Value()) {
|
||||||
|
@ -522,6 +434,19 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) {
|
||||||
|
objectAttributes := obj.Attributes()
|
||||||
|
if ech := obj.ECHeader(); ech != nil {
|
||||||
|
objectAttributes = ech.ParentAttributes()
|
||||||
|
}
|
||||||
|
for _, attr := range objectAttributes {
|
||||||
|
if attr.Key() == attribute {
|
||||||
|
return attr.Value(), true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
|
||||||
// groupFilters divides filters in two groups: fast and slow. Fast filters
|
// groupFilters divides filters in two groups: fast and slow. Fast filters
|
||||||
// processed by indexes and slow filters processed after by unmarshaling
|
// processed by indexes and slow filters processed after by unmarshaling
|
||||||
// object headers.
|
// object headers.
|
||||||
|
@ -540,14 +465,17 @@ func groupFilters(filters objectSDK.SearchFilters) (filterGroup, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
res.withCnrFilter = true
|
res.withCnrFilter = true
|
||||||
case // slow filters
|
case // fast filters
|
||||||
v2object.FilterHeaderVersion,
|
v2object.FilterHeaderObjectID,
|
||||||
v2object.FilterHeaderCreationEpoch,
|
v2object.FilterHeaderObjectType,
|
||||||
v2object.FilterHeaderPayloadLength,
|
v2object.FilterHeaderParent,
|
||||||
v2object.FilterHeaderHomomorphicHash:
|
v2object.FilterHeaderSplitID,
|
||||||
res.slowFilters = append(res.slowFilters, filters[i])
|
v2object.FilterHeaderECParent,
|
||||||
default: // fast filters or user attributes if unknown
|
v2object.FilterPropertyRoot,
|
||||||
|
v2object.FilterPropertyPhy:
|
||||||
res.fastFilters = append(res.fastFilters, filters[i])
|
res.fastFilters = append(res.fastFilters, filters[i])
|
||||||
|
default:
|
||||||
|
res.slowFilters = append(res.slowFilters, filters[i])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -633,6 +633,112 @@ func TestDB_SelectObjectID(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDB_SelectOwnerID(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
db := newDB(t)
|
||||||
|
defer func() { require.NoError(t, db.Close()) }()
|
||||||
|
|
||||||
|
cnr := cidtest.ID()
|
||||||
|
|
||||||
|
// prepare
|
||||||
|
|
||||||
|
parent := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
|
||||||
|
regular := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
idParent, _ := parent.ID()
|
||||||
|
regular.SetParentID(idParent)
|
||||||
|
regular.SetParent(parent)
|
||||||
|
|
||||||
|
err := putBig(db, regular)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ts := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
ts.SetType(objectSDK.TypeTombstone)
|
||||||
|
err = putBig(db, ts)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
lock := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
lock.SetType(objectSDK.TypeLock)
|
||||||
|
err = putBig(db, lock)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
t.Run("not found objects", func(t *testing.T) {
|
||||||
|
raw := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
|
||||||
|
fs := objectSDK.SearchFilters{}
|
||||||
|
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringEqual, raw.OwnerID())
|
||||||
|
|
||||||
|
testSelect(t, db, cnr, fs)
|
||||||
|
|
||||||
|
fs = objectSDK.SearchFilters{}
|
||||||
|
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringNotEqual, raw.OwnerID())
|
||||||
|
|
||||||
|
testSelect(t, db, cnr, fs,
|
||||||
|
object.AddressOf(regular),
|
||||||
|
object.AddressOf(parent),
|
||||||
|
object.AddressOf(ts),
|
||||||
|
object.AddressOf(lock),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("regular objects", func(t *testing.T) {
|
||||||
|
fs := objectSDK.SearchFilters{}
|
||||||
|
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringEqual, regular.OwnerID())
|
||||||
|
testSelect(t, db, cnr, fs, object.AddressOf(regular))
|
||||||
|
|
||||||
|
fs = objectSDK.SearchFilters{}
|
||||||
|
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringNotEqual, regular.OwnerID())
|
||||||
|
testSelect(t, db, cnr, fs,
|
||||||
|
object.AddressOf(parent),
|
||||||
|
object.AddressOf(ts),
|
||||||
|
object.AddressOf(lock),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("tombstone objects", func(t *testing.T) {
|
||||||
|
fs := objectSDK.SearchFilters{}
|
||||||
|
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringEqual, ts.OwnerID())
|
||||||
|
testSelect(t, db, cnr, fs, object.AddressOf(ts))
|
||||||
|
|
||||||
|
fs = objectSDK.SearchFilters{}
|
||||||
|
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringNotEqual, ts.OwnerID())
|
||||||
|
testSelect(t, db, cnr, fs,
|
||||||
|
object.AddressOf(regular),
|
||||||
|
object.AddressOf(parent),
|
||||||
|
object.AddressOf(lock),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("parent objects", func(t *testing.T) {
|
||||||
|
fs := objectSDK.SearchFilters{}
|
||||||
|
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringEqual, parent.OwnerID())
|
||||||
|
testSelect(t, db, cnr, fs, object.AddressOf(parent))
|
||||||
|
|
||||||
|
fs = objectSDK.SearchFilters{}
|
||||||
|
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringNotEqual, parent.OwnerID())
|
||||||
|
testSelect(t, db, cnr, fs,
|
||||||
|
object.AddressOf(regular),
|
||||||
|
object.AddressOf(ts),
|
||||||
|
object.AddressOf(lock),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("lock objects", func(t *testing.T) {
|
||||||
|
fs := objectSDK.SearchFilters{}
|
||||||
|
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringEqual, lock.OwnerID())
|
||||||
|
testSelect(t, db, cnr, fs, object.AddressOf(lock))
|
||||||
|
|
||||||
|
fs = objectSDK.SearchFilters{}
|
||||||
|
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringNotEqual, lock.OwnerID())
|
||||||
|
testSelect(t, db, cnr, fs,
|
||||||
|
object.AddressOf(regular),
|
||||||
|
object.AddressOf(parent),
|
||||||
|
object.AddressOf(ts),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
type testTarget struct {
|
type testTarget struct {
|
||||||
objects []*objectSDK.Object
|
objects []*objectSDK.Object
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
@ -23,6 +24,7 @@ var (
|
||||||
toMoveItBucketName = []byte{toMoveItPrefix}
|
toMoveItBucketName = []byte{toMoveItPrefix}
|
||||||
containerVolumeBucketName = []byte{containerVolumePrefix}
|
containerVolumeBucketName = []byte{containerVolumePrefix}
|
||||||
containerCounterBucketName = []byte{containerCountersPrefix}
|
containerCounterBucketName = []byte{containerCountersPrefix}
|
||||||
|
expEpochToObjectBucketName = []byte{expirationEpochToObjectPrefix}
|
||||||
|
|
||||||
zeroValue = []byte{0xFF}
|
zeroValue = []byte{0xFF}
|
||||||
|
|
||||||
|
@ -89,23 +91,23 @@ const (
|
||||||
// FKBT index buckets.
|
// FKBT index buckets.
|
||||||
// ====================
|
// ====================
|
||||||
|
|
||||||
// ownerPrefix is used for prefixing FKBT index buckets mapping owner to object IDs.
|
// ownerPrefix was used for prefixing FKBT index buckets mapping owner to object IDs.
|
||||||
// Key: owner ID
|
// Key: owner ID
|
||||||
// Value: bucket containing object IDs as keys
|
// Value: bucket containing object IDs as keys
|
||||||
ownerPrefix
|
_
|
||||||
// userAttributePrefix is used for prefixing FKBT index buckets containing objects.
|
// userAttributePrefix was used for prefixing FKBT index buckets containing objects.
|
||||||
// Key: attribute value
|
// Key: attribute value
|
||||||
// Value: bucket containing object IDs as keys
|
// Value: bucket containing object IDs as keys
|
||||||
userAttributePrefix
|
_
|
||||||
|
|
||||||
// ====================
|
// ====================
|
||||||
// List index buckets.
|
// List index buckets.
|
||||||
// ====================
|
// ====================
|
||||||
|
|
||||||
// payloadHashPrefix is used for prefixing List index buckets mapping payload hash to a list of object IDs.
|
// payloadHashPrefix was used for prefixing List index buckets mapping payload hash to a list of object IDs.
|
||||||
// Key: payload hash
|
// Key: payload hash
|
||||||
// Value: list of object IDs
|
// Value: list of object IDs
|
||||||
payloadHashPrefix
|
_
|
||||||
// parentPrefix is used for prefixing List index buckets mapping parent ID to a list of children IDs.
|
// parentPrefix is used for prefixing List index buckets mapping parent ID to a list of children IDs.
|
||||||
// Key: parent ID
|
// Key: parent ID
|
||||||
// Value: list of object IDs
|
// Value: list of object IDs
|
||||||
|
@ -124,6 +126,16 @@ const (
|
||||||
// Key: container ID + type
|
// Key: container ID + type
|
||||||
// Value: Object id
|
// Value: Object id
|
||||||
ecInfoPrefix
|
ecInfoPrefix
|
||||||
|
|
||||||
|
// expirationEpochToObjectPrefix is used for storing relation between expiration epoch and object id.
|
||||||
|
// Key: expiration epoch + object address
|
||||||
|
// Value: zero
|
||||||
|
expirationEpochToObjectPrefix
|
||||||
|
|
||||||
|
// objectToExpirationEpochPrefix is used for storing relation between expiration epoch and object id.
|
||||||
|
// Key: object address
|
||||||
|
// Value: expiration epoch
|
||||||
|
objectToExpirationEpochPrefix
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -131,6 +143,7 @@ const (
|
||||||
bucketKeySize = 1 + cidSize
|
bucketKeySize = 1 + cidSize
|
||||||
objectKeySize = sha256.Size
|
objectKeySize = sha256.Size
|
||||||
addressKeySize = cidSize + objectKeySize
|
addressKeySize = cidSize + objectKeySize
|
||||||
|
epochSize = 8
|
||||||
)
|
)
|
||||||
|
|
||||||
func bucketName(cnr cid.ID, prefix byte, key []byte) []byte {
|
func bucketName(cnr cid.ID, prefix byte, key []byte) []byte {
|
||||||
|
@ -154,37 +167,11 @@ func smallBucketName(cnr cid.ID, key []byte) []byte {
|
||||||
return bucketName(cnr, smallPrefix, key)
|
return bucketName(cnr, smallPrefix, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// attributeBucketName returns <CID>_attr_<attributeKey>.
|
|
||||||
func attributeBucketName(cnr cid.ID, attributeKey string, key []byte) []byte {
|
|
||||||
key[0] = userAttributePrefix
|
|
||||||
cnr.Encode(key[1:])
|
|
||||||
return append(key[:bucketKeySize], attributeKey...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns <CID> from attributeBucketName result, nil otherwise.
|
|
||||||
func cidFromAttributeBucket(val []byte, attributeKey string) []byte {
|
|
||||||
if len(val) < bucketKeySize || val[0] != userAttributePrefix || !bytes.Equal(val[bucketKeySize:], []byte(attributeKey)) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return val[1:bucketKeySize]
|
|
||||||
}
|
|
||||||
|
|
||||||
// payloadHashBucketName returns <CID>_payloadhash.
|
|
||||||
func payloadHashBucketName(cnr cid.ID, key []byte) []byte {
|
|
||||||
return bucketName(cnr, payloadHashPrefix, key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// rootBucketName returns <CID>_root.
|
// rootBucketName returns <CID>_root.
|
||||||
func rootBucketName(cnr cid.ID, key []byte) []byte {
|
func rootBucketName(cnr cid.ID, key []byte) []byte {
|
||||||
return bucketName(cnr, rootPrefix, key)
|
return bucketName(cnr, rootPrefix, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ownerBucketName returns <CID>_ownerid.
|
|
||||||
func ownerBucketName(cnr cid.ID, key []byte) []byte {
|
|
||||||
return bucketName(cnr, ownerPrefix, key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// parentBucketName returns <CID>_parent.
|
// parentBucketName returns <CID>_parent.
|
||||||
func parentBucketName(cnr cid.ID, key []byte) []byte {
|
func parentBucketName(cnr cid.ID, key []byte) []byte {
|
||||||
return bucketName(cnr, parentPrefix, key)
|
return bucketName(cnr, parentPrefix, key)
|
||||||
|
@ -200,6 +187,35 @@ func ecInfoBucketName(cnr cid.ID, key []byte) []byte {
|
||||||
return bucketName(cnr, ecInfoPrefix, key)
|
return bucketName(cnr, ecInfoPrefix, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// objectToExpirationEpochBucketName returns objectToExpirationEpochPrefix_<CID>.
|
||||||
|
func objectToExpirationEpochBucketName(cnr cid.ID, key []byte) []byte {
|
||||||
|
return bucketName(cnr, objectToExpirationEpochPrefix, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func expirationEpochKey(epoch uint64, cnr cid.ID, obj oid.ID) []byte {
|
||||||
|
result := make([]byte, epochSize+addressKeySize)
|
||||||
|
binary.BigEndian.PutUint64(result, epoch)
|
||||||
|
cnr.Encode(result[epochSize:])
|
||||||
|
obj.Encode(result[epochSize+cidSize:])
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseExpirationEpochKey(key []byte) (uint64, cid.ID, oid.ID, error) {
|
||||||
|
if len(key) != epochSize+addressKeySize {
|
||||||
|
return 0, cid.ID{}, oid.ID{}, fmt.Errorf("unexpected expiration epoch to object key length: %d", len(key))
|
||||||
|
}
|
||||||
|
epoch := binary.BigEndian.Uint64(key)
|
||||||
|
var cnr cid.ID
|
||||||
|
if err := cnr.Decode(key[epochSize : epochSize+cidSize]); err != nil {
|
||||||
|
return 0, cid.ID{}, oid.ID{}, fmt.Errorf("failed to decode expiration epoch to object key (container ID): %w", err)
|
||||||
|
}
|
||||||
|
var obj oid.ID
|
||||||
|
if err := obj.Decode(key[epochSize+cidSize:]); err != nil {
|
||||||
|
return 0, cid.ID{}, oid.ID{}, fmt.Errorf("failed to decode expiration epoch to object key (object ID): %w", err)
|
||||||
|
}
|
||||||
|
return epoch, cnr, obj, nil
|
||||||
|
}
|
||||||
|
|
||||||
// addressKey returns key for K-V tables when key is a whole address.
|
// addressKey returns key for K-V tables when key is a whole address.
|
||||||
func addressKey(addr oid.Address, key []byte) []byte {
|
func addressKey(addr oid.Address, key []byte) []byte {
|
||||||
addr.Container().Encode(key)
|
addr.Container().Encode(key)
|
||||||
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// version contains current metabase version.
|
// version contains current metabase version.
|
||||||
const version = 2
|
const version = 3
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Could you also change Could you also change `VERSION.md` accordingly?
dstepanov-yadro
commented
Done Done
|
|||||||
|
|
||||||
var versionKey = []byte("version")
|
var versionKey = []byte("version")
|
||||||
|
|
||||||
|
|
|
@ -74,7 +74,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
|
||||||
var getPrm GetPrm
|
var getPrm GetPrm
|
||||||
getPrm.SetAddress(objectCore.AddressOf(obj))
|
getPrm.SetAddress(objectCore.AddressOf(obj))
|
||||||
_, err = sh.Get(context.Background(), getPrm)
|
_, err = sh.Get(context.Background(), getPrm)
|
||||||
require.True(t, client.IsErrObjectNotFound(err), "expired object must be deleted")
|
require.True(t, client.IsErrObjectNotFound(err) || IsErrObjectExpired(err), "expired object must be deleted")
|
||||||
fyrchik
commented
How come this test passed before? How come this test passed before?
|
|||||||
}
|
}
|
||||||
|
|
||||||
func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
||||||
|
@ -168,7 +168,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
||||||
sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
|
sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
|
||||||
|
|
||||||
_, err = sh.Get(context.Background(), getPrm)
|
_, err = sh.Get(context.Background(), getPrm)
|
||||||
require.True(t, client.IsErrObjectNotFound(err), "expired complex object must be deleted on epoch after lock expires")
|
require.True(t, client.IsErrObjectNotFound(err) || IsErrObjectExpired(err), "expired complex object must be deleted on epoch after lock expires")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGCDropsObjectInhumedFromWritecache(t *testing.T) {
|
func TestGCDropsObjectInhumedFromWritecache(t *testing.T) {
|
||||||
|
|
Loading…
Reference in a new issue
On a side-note, we allocate a slice here, only to convert it later from
[]oid.ID
to[]oid.Address
. Maybe return[]oid.Address
from this function?fixed