[#9999] metabase: Fix db engine to pebble in exists.go
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
1aaf3346ae
commit
b8599d9826
9 changed files with 795 additions and 237 deletions
|
@ -26,7 +26,10 @@ var (
|
|||
|
||||
var (
|
||||
errInvalidKeyLenght = errors.New("invalid key length")
|
||||
errInvalidKeyPrefix = errors.New("invalid key prefix")
|
||||
errInvalidValueLenght = errors.New("invalid value length")
|
||||
errInvalidContainerIDValue = errors.New("invalid container ID value")
|
||||
errInvalidAttributeKey = errors.New("invalid userr attribute key")
|
||||
)
|
||||
|
||||
type objectType uint8
|
||||
|
|
|
@ -188,7 +188,7 @@ func stringEqualMatcherBucket(r pebble.Reader, fKey string, fValue string, f fun
|
|||
if !ok {
|
||||
return nil
|
||||
}
|
||||
data, err := getSafe(r, val)
|
||||
data, err := valueSafe(r, val)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
|
@ -13,6 +13,7 @@ import (
|
|||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/cockroachdb/pebble"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
@ -81,8 +82,8 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
|||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.database.View(func(tx *bbolt.Tx) error {
|
||||
res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch)
|
||||
err = db.snapshot(func(s *pebble.Snapshot) error {
|
||||
res.exists, res.locked, err = db.exists(ctx, s, prm.addr, prm.paddr, currEpoch)
|
||||
|
||||
return err
|
||||
})
|
||||
|
@ -90,13 +91,21 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
|
|||
return res, metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
|
||||
func (db *DB) exists(ctx context.Context, r pebble.Reader, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
|
||||
var locked bool
|
||||
var err error
|
||||
if !parent.Equals(oid.Address{}) {
|
||||
locked = objectLocked(tx, parent.Container(), parent.Object())
|
||||
locked, err = objectLocked(ctx, r, parent.Container(), parent.Object())
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
}
|
||||
// check graveyard and object expiration first
|
||||
switch objectStatus(tx, addr, currEpoch) {
|
||||
st, err := objectStatus(ctx, r, addr, currEpoch)
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
switch st {
|
||||
case 1:
|
||||
return false, locked, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
case 2:
|
||||
|
@ -105,32 +114,41 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpo
|
|||
return false, locked, ErrObjectIsExpired
|
||||
}
|
||||
|
||||
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
||||
|
||||
cnr := addr.Container()
|
||||
key := make([]byte, bucketKeySize)
|
||||
|
||||
// if graveyard is empty, then check if object exists in primary bucket
|
||||
if inBucket(tx, primaryBucketName(cnr, key), objKey) {
|
||||
v, err := valueSafe(r, primaryKey(addr.Container(), addr.Object()))
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
if v != nil {
|
||||
return true, locked, nil
|
||||
}
|
||||
|
||||
// if primary bucket is empty, then check if object exists in parent bucket
|
||||
if inBucket(tx, parentBucketName(cnr, key), objKey) {
|
||||
splitInfo, err := getSplitInfo(tx, cnr, objKey)
|
||||
children, err := selectByPrefixBatch(ctx, r, parentKeyLongPrefix(addr.Container(), addr.Object()), 1) // try to found any child
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
if len(children) > 0 {
|
||||
splitInfo, err := getSplitInfo(r, addr)
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
return false, locked, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
|
||||
}
|
||||
|
||||
// if parent bucket is empty, then check if object exists with ec prefix
|
||||
children, err = selectByPrefixBatch(ctx, r, ecInfoLongKeyPrefix(addr.Container(), addr.Object()), 1) // try to found any child
|
||||
if err != nil {
|
||||
return false, locked, err
|
||||
}
|
||||
|
||||
return false, locked, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
|
||||
}
|
||||
// if parent bucket is empty, then check if object exists in ec bucket
|
||||
if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
|
||||
return false, locked, getECInfoError(tx, cnr, data)
|
||||
if len(children) > 0 {
|
||||
return false, locked, getECInfoError(ctx, r, addr)
|
||||
}
|
||||
|
||||
// if parent bucket is empty, then check if object exists in typed buckets
|
||||
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, locked, nil
|
||||
t, err := firstIrregularObjectType(r, addr.Container(), addr.Object())
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
|
||||
return t != objectSDK.TypeRegular, locked, nil
|
||||
}
|
||||
|
||||
// objectStatus returns:
|
||||
|
@ -138,59 +156,52 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpo
|
|||
// - 1 if object with GC mark;
|
||||
// - 2 if object is covered with tombstone;
|
||||
// - 3 if object is expired.
|
||||
func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 {
|
||||
func objectStatus(ctx context.Context, r pebble.Reader, addr oid.Address, currEpoch uint64) (uint8, error) {
|
||||
// locked object could not be removed/marked with GC/expired
|
||||
if objectLocked(tx, addr.Container(), addr.Object()) {
|
||||
return 0
|
||||
locked, err := objectLocked(ctx, r, addr.Container(), addr.Object())
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if locked {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// we check only if the object is expired in the current
|
||||
// epoch since it is considered the only corner case: the
|
||||
// GC is expected to collect all the objects that have
|
||||
// expired previously for less than the one epoch duration
|
||||
st, err := inGraveyardWithKey(r, addr)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if st > 0 {
|
||||
return st, nil
|
||||
}
|
||||
|
||||
expired := isExpiredWithAttribute(tx, objectV2.SysAttributeExpEpoch, addr, currEpoch)
|
||||
if !expired {
|
||||
expired = isExpiredWithAttribute(tx, objectV2.SysAttributeExpEpochNeoFS, addr, currEpoch)
|
||||
expired, err := isExpired(ctx, r, addr, currEpoch)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if expired {
|
||||
return 3
|
||||
return 3, nil
|
||||
}
|
||||
|
||||
graveyardBkt := tx.Bucket(graveyardBucketName)
|
||||
garbageBkt := tx.Bucket(garbageBucketName)
|
||||
addrKey := addressKey(addr, make([]byte, addressKeySize))
|
||||
return inGraveyardWithKey(addrKey, graveyardBkt, garbageBkt)
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uint8 {
|
||||
if graveyard == nil {
|
||||
// incorrect metabase state, does not make
|
||||
// sense to check garbage bucket
|
||||
return 0
|
||||
func inGraveyardWithKey(r pebble.Reader, addr oid.Address) (uint8, error) {
|
||||
v, err := valueSafe(r, graveyardKey(addr.Container(), addr.Object()))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
val := graveyard.Get(addrKey)
|
||||
if val == nil {
|
||||
if garbageBCK == nil {
|
||||
// incorrect node state
|
||||
return 0
|
||||
if v != nil {
|
||||
return 2, nil
|
||||
}
|
||||
|
||||
val = garbageBCK.Get(addrKey)
|
||||
if val != nil {
|
||||
// object has been marked with GC
|
||||
return 1
|
||||
v, err = valueSafe(r, garbageKey(addr.Container(), addr.Object()))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// neither in the graveyard
|
||||
// nor was marked with GC mark
|
||||
return 0
|
||||
if v != nil {
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
// object in the graveyard
|
||||
return 2
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// inBucket checks if key <key> is present in bucket <name>.
|
||||
|
@ -208,19 +219,45 @@ func inBucket(tx *bbolt.Tx, name, key []byte) bool {
|
|||
|
||||
// getSplitInfo returns SplitInfo structure from root index. Returns error
|
||||
// if there is no `key` record in root index.
|
||||
func getSplitInfo(tx *bbolt.Tx, cnr cid.ID, key []byte) (*objectSDK.SplitInfo, error) {
|
||||
bucketName := rootBucketName(cnr, make([]byte, bucketKeySize))
|
||||
rawSplitInfo := getFromBucket(tx, bucketName, key)
|
||||
if len(rawSplitInfo) == 0 {
|
||||
func getSplitInfo(r pebble.Reader, addr oid.Address) (*objectSDK.SplitInfo, error) {
|
||||
rawSplitInfo, err := valueSafe(r, rootKey(addr.Container(), addr.Object()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(rawSplitInfo) == 0 || bytes.Equal(zeroValue, rawSplitInfo) {
|
||||
return nil, ErrLackSplitInfo
|
||||
}
|
||||
|
||||
splitInfo := objectSDK.NewSplitInfo()
|
||||
|
||||
err := splitInfo.Unmarshal(rawSplitInfo)
|
||||
err = splitInfo.Unmarshal(rawSplitInfo)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't unmarshal split info from root index: %w", err)
|
||||
}
|
||||
|
||||
return splitInfo, nil
|
||||
}
|
||||
|
||||
// if meets irregular object container in objs - returns its type, otherwise returns object.TypeRegular.
|
||||
//
|
||||
// firstIrregularObjectType(tx, cnr, obj) usage allows getting object type.
|
||||
func firstIrregularObjectType(r pebble.Reader, idCnr cid.ID, objs ...oid.ID) (objectSDK.Type, error) {
|
||||
for _, objID := range objs {
|
||||
key := tombstoneKey(idCnr, objID)
|
||||
v, err := valueSafe(r, key)
|
||||
if err != nil {
|
||||
return objectSDK.TypeRegular, err
|
||||
}
|
||||
if v != nil {
|
||||
return objectSDK.TypeTombstone, nil
|
||||
}
|
||||
key = lockersKey(idCnr, objID)
|
||||
v, err = valueSafe(r, key)
|
||||
if err != nil {
|
||||
return objectSDK.TypeRegular, err
|
||||
}
|
||||
if v != nil {
|
||||
return objectSDK.TypeLock, nil
|
||||
}
|
||||
}
|
||||
|
||||
return objectSDK.TypeRegular, nil
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -12,6 +13,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/cockroachdb/pebble"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
@ -163,3 +165,42 @@ func selectExpiredObjectIDs(tx *bbolt.Tx, attr string, epoch uint64, containerID
|
|||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func isExpired(ctx context.Context, r pebble.Reader, addr oid.Address, currEpoch uint64) (bool, error) {
|
||||
prefix := []byte{expiredPrefix}
|
||||
it, err := r.NewIter(&pebble.IterOptions{
|
||||
LowerBound: []byte{expiredPrefix},
|
||||
OnlyReadGuaranteedDurable: true,
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// iteration does in ascending order by expiration epoch.
|
||||
// gc does expired objects collect every epoch, so here should be not so much items.
|
||||
for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
expEpoch, err := expirationEpochFromExpiredKey(it.Key())
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if expEpoch >= currEpoch {
|
||||
return false, nil // keys are ordered by epoch, so next items will be discarded anyway.
|
||||
}
|
||||
|
||||
curAddr, err := addressFromExpiredKey(it.Key())
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if curAddr == addr {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/cockroachdb/pebble"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
@ -191,18 +192,42 @@ func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
|
|||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
|
||||
keys, err := decodeList(data)
|
||||
func getECInfoError(ctx context.Context, r pebble.Reader, addr oid.Address) error {
|
||||
var chunkAddresses []oid.Address
|
||||
for {
|
||||
keys, err := selectByPrefixBatch(ctx, r, ecInfoLongKeyPrefix(addr.Container(), addr.Object()), batchSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ecInfo := objectSDK.NewECInfo()
|
||||
for _, key := range keys {
|
||||
// check in primary index
|
||||
ojbData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key)
|
||||
if len(ojbData) != 0 {
|
||||
chunkAddress, err := addressOfChunkFromECInfoKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
chunkAddresses = append(chunkAddresses, chunkAddress)
|
||||
}
|
||||
if len(keys) < batchSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
ecInfo := objectSDK.NewECInfo()
|
||||
for _, chunkAddress := range chunkAddresses {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
objData, err := valueSafe(r, primaryKey(chunkAddress.Container(), chunkAddress.Object()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(objData) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(ojbData); err != nil {
|
||||
if err := obj.Unmarshal(objData); err != nil {
|
||||
return err
|
||||
}
|
||||
chunk := objectSDK.ECChunk{}
|
||||
|
@ -212,6 +237,5 @@ func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
|
|||
chunk.Total = obj.ECHeader().Total()
|
||||
ecInfo.AddChunk(chunk)
|
||||
}
|
||||
}
|
||||
return logicerr.Wrap(objectSDK.NewECInfoError(ecInfo))
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/cockroachdb/pebble"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
@ -161,18 +162,13 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
|
|||
}
|
||||
|
||||
// checks if specified object is locked in the specified container.
|
||||
func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
|
||||
bucketLocked := tx.Bucket(bucketNameLocked)
|
||||
if bucketLocked != nil {
|
||||
key := make([]byte, cidSize)
|
||||
idCnr.Encode(key)
|
||||
bucketLockedContainer := bucketLocked.Bucket(key)
|
||||
if bucketLockedContainer != nil {
|
||||
return bucketLockedContainer.Get(objectKey(idObj, key)) != nil
|
||||
func objectLocked(ctx context.Context, r pebble.Reader, idCnr cid.ID, idObj oid.ID) (bool, error) {
|
||||
prefix := lockedKeyLongPrefix(idCnr, idObj)
|
||||
items, err := selectByPrefixBatch(ctx, r, prefix, 1)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
return len(items) > 0, nil
|
||||
}
|
||||
|
||||
// return `LOCK` id's if specified object is locked in the specified container.
|
||||
|
@ -383,3 +379,13 @@ func (db *DB) GetLocked(ctx context.Context, addr oid.Address) (res []oid.ID, er
|
|||
success = err == nil
|
||||
return res, err
|
||||
}
|
||||
|
||||
// return true if provided object is of LOCK type.
|
||||
func isLockObject(r pebble.Reader, idCnr cid.ID, obj oid.ID) (bool, error) {
|
||||
key := lockersKey(idCnr, obj)
|
||||
v, err := valueSafe(r, key)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return v != nil, nil
|
||||
}
|
||||
|
|
|
@ -2,12 +2,15 @@ package meta
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/cockroachdb/pebble"
|
||||
)
|
||||
|
||||
func getSafe(r pebble.Reader, key []byte) ([]byte, error) {
|
||||
const batchSize = 1000
|
||||
|
||||
func valueSafe(r pebble.Reader, key []byte) ([]byte, error) {
|
||||
data, closer, err := r.Get(key)
|
||||
if err != nil {
|
||||
if errors.Is(err, pebble.ErrNotFound) {
|
||||
|
@ -21,3 +24,46 @@ func getSafe(r pebble.Reader, key []byte) ([]byte, error) {
|
|||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (db *DB) batch(f func(batch *pebble.Batch) error) error {
|
||||
b := db.database.NewIndexedBatch()
|
||||
err := f(b)
|
||||
if err != nil {
|
||||
return errors.Join(err, b.Close())
|
||||
}
|
||||
return errors.Join(b.Commit(pebble.Sync), b.Close())
|
||||
}
|
||||
|
||||
func (db *DB) snapshot(f func(*pebble.Snapshot) error) error {
|
||||
s := db.database.NewSnapshot()
|
||||
err := f(s)
|
||||
if err != nil {
|
||||
return errors.Join(err, s.Close())
|
||||
}
|
||||
return s.Close()
|
||||
}
|
||||
|
||||
func selectByPrefixBatch(ctx context.Context, r pebble.Reader, prefix []byte, batchSize int) ([][]byte, error) {
|
||||
it, err := r.NewIter(&pebble.IterOptions{
|
||||
LowerBound: prefix,
|
||||
OnlyReadGuaranteedDurable: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result [][]byte
|
||||
for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, errors.Join(ctx.Err(), it.Close())
|
||||
default:
|
||||
}
|
||||
|
||||
result = append(result, bytes.Clone(it.Key()))
|
||||
if len(result) == batchSize {
|
||||
return result, it.Close()
|
||||
}
|
||||
}
|
||||
return result, it.Close()
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/cockroachdb/pebble"
|
||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
|
@ -86,9 +87,9 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
|
|||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.database.Batch(func(tx *bbolt.Tx) error {
|
||||
err = db.batch(func(b *pebble.Batch) error {
|
||||
var e error
|
||||
res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch)
|
||||
res, e = db.put(b, prm.obj, prm.id, nil, currEpoch)
|
||||
return e
|
||||
})
|
||||
if err == nil {
|
||||
|
@ -101,7 +102,7 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
|
|||
return res, metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
func (db *DB) put(tx *bbolt.Tx,
|
||||
func (db *DB) put(batch *pebble.Batch,
|
||||
obj *objectSDK.Object,
|
||||
id []byte,
|
||||
si *objectSDK.SplitInfo,
|
||||
|
|
|
@ -3,31 +3,14 @@ package meta
|
|||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
var (
|
||||
// graveyardBucketName stores rows with the objects that have been
|
||||
// covered with Tombstone objects. That objects should not be returned
|
||||
// from the node and should not be accepted by the node from other
|
||||
// nodes.
|
||||
graveyardBucketName = []byte{graveyardPrefix}
|
||||
// garbageBucketName stores rows with the objects that should be physically
|
||||
// deleted by the node (Garbage Collector routine).
|
||||
garbageBucketName = []byte{garbagePrefix}
|
||||
toMoveItBucketName = []byte{toMoveItPrefix}
|
||||
containerVolumeBucketName = []byte{containerVolumePrefix}
|
||||
containerCounterBucketName = []byte{containerCountersPrefix}
|
||||
|
||||
zeroValue = []byte{0xFF}
|
||||
|
||||
errInvalidLength = errors.New("invalid length")
|
||||
)
|
||||
var zeroValue = []byte{0xFF}
|
||||
|
||||
// Prefix bytes for database keys. All ids and addresses are encoded in binary
|
||||
// unless specified otherwise.
|
||||
|
@ -42,13 +25,13 @@ const (
|
|||
// Key: object address
|
||||
// Value: dummy value
|
||||
garbagePrefix
|
||||
// toMoveItPrefix is used for bucket containing IDs of objects that are candidates for moving
|
||||
// _ Previous usage was for for bucket containing IDs of objects that are candidates for moving
|
||||
// to another shard.
|
||||
toMoveItPrefix
|
||||
// containerVolumePrefix is used for storing container size estimations.
|
||||
_
|
||||
// containerSizePrefix is used for storing container size estimations.
|
||||
// Key: container ID
|
||||
// Value: container size in bytes as little-endian uint64
|
||||
containerVolumePrefix
|
||||
containerSizePrefix
|
||||
// lockedPrefix is used for storing locked objects information.
|
||||
// Key: container ID
|
||||
// Value: bucket mapping objects locked to the list of corresponding LOCK objects.
|
||||
|
@ -124,6 +107,9 @@ const (
|
|||
// Key: container ID + type
|
||||
// Value: Object id
|
||||
ecInfoPrefix
|
||||
|
||||
// expiredPrefix used to store expiration info.
|
||||
expiredPrefix
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -133,139 +119,553 @@ const (
|
|||
addressKeySize = cidSize + objectKeySize
|
||||
)
|
||||
|
||||
func bucketName(cnr cid.ID, prefix byte, key []byte) []byte {
|
||||
key[0] = prefix
|
||||
cnr.Encode(key[1:])
|
||||
return key[:bucketKeySize]
|
||||
func keyPrefix(cnr cid.ID, prefix byte) []byte {
|
||||
result := make([]byte, 1+cidSize)
|
||||
result[0] = prefix
|
||||
cnr.Encode(result[1:])
|
||||
return result
|
||||
}
|
||||
|
||||
// primaryBucketName returns <CID>.
|
||||
func primaryBucketName(cnr cid.ID, key []byte) []byte {
|
||||
return bucketName(cnr, primaryPrefix, key)
|
||||
func keyObject(prefix byte, cnr cid.ID, objID oid.ID) []byte {
|
||||
result := make([]byte, 1+cidSize+objectKeySize)
|
||||
result[0] = prefix
|
||||
cnr.Encode(result[1:])
|
||||
objID.Encode(result[1+cidSize:])
|
||||
return result
|
||||
}
|
||||
|
||||
// tombstoneBucketName returns <CID>_TS.
|
||||
func tombstoneBucketName(cnr cid.ID, key []byte) []byte {
|
||||
return bucketName(cnr, tombstonePrefix, key)
|
||||
func addressFromKey(prefix byte, key []byte) (oid.Address, error) {
|
||||
if len(key) != 1+cidSize+objectKeySize {
|
||||
return oid.Address{}, errInvalidKeyLenght
|
||||
}
|
||||
if key[0] != prefix {
|
||||
return oid.Address{}, errInvalidKeyPrefix
|
||||
}
|
||||
var cont cid.ID
|
||||
if err := cont.Decode(key[1 : 1+cidSize]); err != nil {
|
||||
return oid.Address{}, fmt.Errorf("failed to decode container ID: %w", err)
|
||||
}
|
||||
var obj oid.ID
|
||||
if err := obj.Decode(key[1+cidSize:]); err != nil {
|
||||
return oid.Address{}, fmt.Errorf("failed to decode object ID: %w", err)
|
||||
}
|
||||
var result oid.Address
|
||||
result.SetContainer(cont)
|
||||
result.SetObject(obj)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// smallBucketName returns <CID>_small.
|
||||
func smallBucketName(cnr cid.ID, key []byte) []byte {
|
||||
return bucketName(cnr, smallPrefix, key)
|
||||
// primaryKeyPrefix returns primaryPrefix_<CID>.
|
||||
func primaryKeyPrefix(cnr cid.ID) []byte {
|
||||
return keyPrefix(cnr, primaryPrefix)
|
||||
}
|
||||
|
||||
// attributeBucketName returns <CID>_attr_<attributeKey>.
|
||||
func attributeBucketName(cnr cid.ID, attributeKey string, key []byte) []byte {
|
||||
key[0] = userAttributePrefix
|
||||
cnr.Encode(key[1:])
|
||||
return append(key[:bucketKeySize], attributeKey...)
|
||||
func primaryKey(cnr cid.ID, objID oid.ID) []byte {
|
||||
return keyObject(primaryPrefix, cnr, objID)
|
||||
}
|
||||
|
||||
// returns <CID> from attributeBucketName result, nil otherwise.
|
||||
func cidFromAttributeBucket(val []byte, attributeKey string) []byte {
|
||||
if len(val) < bucketKeySize || val[0] != userAttributePrefix || !bytes.Equal(val[bucketKeySize:], []byte(attributeKey)) {
|
||||
return nil
|
||||
func addressFromPrimaryKey(v []byte) (oid.Address, error) {
|
||||
return addressFromKey(primaryPrefix, v)
|
||||
}
|
||||
|
||||
return val[1:bucketKeySize]
|
||||
// tombstoneKeyPrefix returns tombstonePrefix_<CID>.
|
||||
func tombstoneKeyPrefix(cnr cid.ID) []byte {
|
||||
return keyPrefix(cnr, tombstonePrefix)
|
||||
}
|
||||
|
||||
// payloadHashBucketName returns <CID>_payloadhash.
|
||||
func payloadHashBucketName(cnr cid.ID, key []byte) []byte {
|
||||
return bucketName(cnr, payloadHashPrefix, key)
|
||||
func tombstoneKey(cnr cid.ID, objID oid.ID) []byte {
|
||||
return keyObject(tombstonePrefix, cnr, objID)
|
||||
}
|
||||
|
||||
// rootBucketName returns <CID>_root.
|
||||
func rootBucketName(cnr cid.ID, key []byte) []byte {
|
||||
return bucketName(cnr, rootPrefix, key)
|
||||
func addressFromTombstoneKey(v []byte) (oid.Address, error) {
|
||||
return addressFromKey(tombstonePrefix, v)
|
||||
}
|
||||
|
||||
// ownerBucketName returns <CID>_ownerid.
|
||||
func ownerBucketName(cnr cid.ID, key []byte) []byte {
|
||||
return bucketName(cnr, ownerPrefix, key)
|
||||
func garbageKey(cnr cid.ID, objID oid.ID) []byte {
|
||||
return keyObject(garbagePrefix, cnr, objID)
|
||||
}
|
||||
|
||||
// parentBucketName returns <CID>_parent.
|
||||
func parentBucketName(cnr cid.ID, key []byte) []byte {
|
||||
return bucketName(cnr, parentPrefix, key)
|
||||
func addressFromGarbageKey(v []byte) (oid.Address, error) {
|
||||
return addressFromKey(garbagePrefix, v)
|
||||
}
|
||||
|
||||
// splitBucketName returns <CID>_splitid.
|
||||
func splitBucketName(cnr cid.ID, key []byte) []byte {
|
||||
return bucketName(cnr, splitPrefix, key)
|
||||
func graveyardKey(cnr cid.ID, objID oid.ID) []byte {
|
||||
return keyObject(graveyardPrefix, cnr, objID)
|
||||
}
|
||||
|
||||
// ecInfoBucketName returns <CID>_ecinfo.
|
||||
func ecInfoBucketName(cnr cid.ID, key []byte) []byte {
|
||||
return bucketName(cnr, ecInfoPrefix, key)
|
||||
func addressFromGraveyardKey(v []byte) (oid.Address, error) {
|
||||
return addressFromKey(graveyardPrefix, v)
|
||||
}
|
||||
|
||||
// addressKey returns key for K-V tables when key is a whole address.
|
||||
func addressKey(addr oid.Address, key []byte) []byte {
|
||||
addr.Container().Encode(key)
|
||||
addr.Object().Encode(key[cidSize:])
|
||||
return key[:addressKeySize]
|
||||
func smallKey(cnr cid.ID, obj oid.ID) []byte {
|
||||
return keyObject(smallPrefix, cnr, obj)
|
||||
}
|
||||
|
||||
// parses object address formed by addressKey.
|
||||
func decodeAddressFromKey(dst *oid.Address, k []byte) error {
|
||||
if len(k) != addressKeySize {
|
||||
return errInvalidLength
|
||||
// attributeKeyPrefix returns userAttributePrefix_<attributeKey>_<CID>_<attributeValue>.
|
||||
func attributeKeyPrefix(cnr cid.ID, attributeKey, attributeValue string) []byte {
|
||||
result := make([]byte, 1+len(attributeKey)+cidSize+len(attributeValue))
|
||||
result[0] = userAttributePrefix
|
||||
copy(result[1:], []byte(attributeKey))
|
||||
cnr.Encode(result[1+len(attributeKey):])
|
||||
copy(result[1+len(attributeKey)+cidSize:], []byte(attributeValue))
|
||||
return result
|
||||
}
|
||||
|
||||
// userAttributePrefix+attributeKey+<CID>+attributeValue+<OID>.
|
||||
func attributeKey(cnr cid.ID, objID oid.ID, attributeKey, attributeValue string) []byte {
|
||||
result := make([]byte, 1+len(attributeKey)+cidSize+len(attributeValue)+objectKeySize)
|
||||
result[0] = userAttributePrefix
|
||||
copy(result[1:], []byte(attributeKey))
|
||||
cnr.Encode(result[1+len(attributeKey):])
|
||||
copy(result[1+len(attributeKey)+cidSize:], []byte(attributeValue))
|
||||
objID.Encode(result[1+cidSize+len(attributeKey)+len(attributeValue):])
|
||||
return result
|
||||
}
|
||||
|
||||
// returns attributeValue from attributeKey result, nil otherwise.
|
||||
func attributeValueFromAttributeKey(key []byte, attributeKey string) ([]byte, error) {
|
||||
if len(key) < 1+len(attributeKey)+cidSize+objectKeySize {
|
||||
return nil, errInvalidKeyLenght
|
||||
}
|
||||
if key[0] != userAttributePrefix {
|
||||
return nil, errInvalidKeyPrefix
|
||||
}
|
||||
if !bytes.Equal(key[1:1+len(attributeKey)], []byte(attributeKey)) {
|
||||
return nil, errInvalidAttributeKey
|
||||
}
|
||||
|
||||
return key[1+len(attributeKey)+cidSize : len(key)-objectKeySize], nil
|
||||
}
|
||||
|
||||
func addressFromAttributeKey(key []byte, attributeKey string) (oid.Address, error) {
|
||||
if len(key) < 1+len(attributeKey)+cidSize+objectKeySize {
|
||||
return oid.Address{}, errInvalidKeyLenght
|
||||
}
|
||||
if key[0] != userAttributePrefix {
|
||||
return oid.Address{}, errInvalidKeyPrefix
|
||||
}
|
||||
if !bytes.Equal(key[1:1+len(attributeKey)], []byte(attributeKey)) {
|
||||
return oid.Address{}, errInvalidAttributeKey
|
||||
}
|
||||
var cnrID cid.ID
|
||||
if err := cnrID.Decode(key[1+len(attributeKey) : 1+len(attributeKey)+cidSize]); err != nil {
|
||||
return oid.Address{}, fmt.Errorf("failed to decode container ID: %w", err)
|
||||
}
|
||||
var objID oid.ID
|
||||
if err := objID.Decode(key[len(key)-objectKeySize:]); err != nil {
|
||||
return oid.Address{}, fmt.Errorf("failed to decode object ID: %w", err)
|
||||
}
|
||||
var result oid.Address
|
||||
result.SetContainer(cnrID)
|
||||
result.SetObject(objID)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// payloadHashKeyLongPrefix returns payloadHashPrefix_<CID>_hash.
|
||||
func payloadHashKeyLongPrefix(cnr cid.ID, hash []byte) []byte {
|
||||
result := make([]byte, 1+cidSize+len(hash))
|
||||
result[0] = payloadHashPrefix
|
||||
cnr.Encode(result[1:])
|
||||
copy(result[1+cidSize:], hash)
|
||||
return result
|
||||
}
|
||||
|
||||
// payloadHashKeyShortPrefix returns payloadHashPrefix_<CID>.
|
||||
func payloadHashKeyShortPrefix(cnr cid.ID) []byte {
|
||||
result := make([]byte, 1+cidSize)
|
||||
result[0] = payloadHashPrefix
|
||||
cnr.Encode(result[1:])
|
||||
return result
|
||||
}
|
||||
|
||||
// payloadHashKey returns payloadHashPrefix_<CID>_hash_<OID>.
|
||||
func payloadHashKey(cnr cid.ID, obj oid.ID, hash []byte) []byte {
|
||||
result := make([]byte, 1+cidSize+len(hash)+objectKeySize)
|
||||
result[0] = payloadHashPrefix
|
||||
cnr.Encode(result[1:])
|
||||
copy(result[1+cidSize:], hash)
|
||||
obj.Encode(result[1+cidSize+len(hash):])
|
||||
return result
|
||||
}
|
||||
|
||||
func addressFromPayloadHashKey(k []byte) (oid.Address, error) {
|
||||
if len(k) < 1+cidSize+objectKeySize {
|
||||
return oid.Address{}, errInvalidKeyLenght
|
||||
}
|
||||
if k[0] != payloadHashPrefix {
|
||||
return oid.Address{}, errInvalidKeyPrefix
|
||||
}
|
||||
var cnr cid.ID
|
||||
if err := cnr.Decode(k[:cidSize]); err != nil {
|
||||
return err
|
||||
if err := cnr.Decode(k[1 : 1+cidSize]); err != nil {
|
||||
return oid.Address{}, err
|
||||
}
|
||||
|
||||
var obj oid.ID
|
||||
if err := obj.Decode(k[cidSize:]); err != nil {
|
||||
return err
|
||||
if err := obj.Decode(k[len(k)-objectKeySize:]); err != nil {
|
||||
return oid.Address{}, err
|
||||
}
|
||||
|
||||
dst.SetObject(obj)
|
||||
dst.SetContainer(cnr)
|
||||
return nil
|
||||
var result oid.Address
|
||||
result.SetObject(obj)
|
||||
result.SetContainer(cnr)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// objectKey returns key for K-V tables when key is an object id.
|
||||
func objectKey(obj oid.ID, key []byte) []byte {
|
||||
obj.Encode(key)
|
||||
return key[:objectKeySize]
|
||||
func payloadHashFromPayloadHashKey(k []byte) ([]byte, error) {
|
||||
if len(k) < 1+cidSize+objectKeySize {
|
||||
return nil, errInvalidKeyLenght
|
||||
}
|
||||
if k[0] != payloadHashPrefix {
|
||||
return nil, errInvalidKeyPrefix
|
||||
}
|
||||
|
||||
// if meets irregular object container in objs - returns its type, otherwise returns object.TypeRegular.
|
||||
//
|
||||
// firstIrregularObjectType(tx, cnr, obj) usage allows getting object type.
|
||||
func firstIrregularObjectType(tx *bbolt.Tx, idCnr cid.ID, objs ...[]byte) objectSDK.Type {
|
||||
if len(objs) == 0 {
|
||||
panic("empty object list in firstIrregularObjectType")
|
||||
return bytes.Clone(k[1+cidSize : len(k)-objectKeySize]), nil
|
||||
}
|
||||
|
||||
var keys [2][1 + cidSize]byte
|
||||
|
||||
irregularTypeBuckets := [...]struct {
|
||||
typ objectSDK.Type
|
||||
name []byte
|
||||
}{
|
||||
{objectSDK.TypeTombstone, tombstoneBucketName(idCnr, keys[0][:])},
|
||||
{objectSDK.TypeLock, bucketNameLockers(idCnr, keys[1][:])},
|
||||
// rootBucketName returns rootPrefix_<CID>.
|
||||
func rootKeyPrefix(cnr cid.ID) []byte {
|
||||
return keyPrefix(cnr, rootPrefix)
|
||||
}
|
||||
|
||||
for i := range objs {
|
||||
for j := range irregularTypeBuckets {
|
||||
if inBucket(tx, irregularTypeBuckets[j].name, objs[i]) {
|
||||
return irregularTypeBuckets[j].typ
|
||||
}
|
||||
}
|
||||
func rootKey(cnr cid.ID, objID oid.ID) []byte {
|
||||
return keyObject(rootPrefix, cnr, objID)
|
||||
}
|
||||
|
||||
return objectSDK.TypeRegular
|
||||
func addressFromRootKey(key []byte) (oid.Address, error) {
|
||||
return addressFromKey(rootPrefix, key)
|
||||
}
|
||||
|
||||
// return true if provided object is of LOCK type.
|
||||
func isLockObject(tx *bbolt.Tx, idCnr cid.ID, obj oid.ID) bool {
|
||||
return inBucket(tx,
|
||||
bucketNameLockers(idCnr, make([]byte, bucketKeySize)),
|
||||
objectKey(obj, make([]byte, objectKeySize)))
|
||||
// ownerKey returns ownerPrefix_<CID>_owner_<OID>.
|
||||
func ownerKey(cnr cid.ID, obj oid.ID, owner []byte) []byte {
|
||||
result := make([]byte, 1+cidSize+len(owner)+objectKeySize)
|
||||
result[0] = ownerPrefix
|
||||
cnr.Encode(result[1:])
|
||||
copy(result[1+cidSize:], owner)
|
||||
obj.Encode(result[1+cidSize+len(owner):])
|
||||
return result
|
||||
}
|
||||
|
||||
// ownerKeyShortPrefix returns ownerPrefix_<CID>.
|
||||
func ownerKeyShortPrefix(cnr cid.ID) []byte {
|
||||
result := make([]byte, 1+cidSize)
|
||||
result[0] = ownerPrefix
|
||||
cnr.Encode(result[1:])
|
||||
return result
|
||||
}
|
||||
|
||||
// ownerKeyLongPrefix returns ownerPrefix_<CID>_owner.
|
||||
func ownerKeyLongPrefix(cnr cid.ID, owner []byte) []byte {
|
||||
result := make([]byte, 1+cidSize+len(owner))
|
||||
result[0] = ownerPrefix
|
||||
cnr.Encode(result[1:])
|
||||
copy(result[1+cidSize:], owner)
|
||||
return result
|
||||
}
|
||||
|
||||
func addressFromOwnerKey(k []byte) (oid.Address, error) {
|
||||
if len(k) < 1+cidSize+objectKeySize {
|
||||
return oid.Address{}, errInvalidKeyLenght
|
||||
}
|
||||
if k[0] != ownerPrefix {
|
||||
return oid.Address{}, errInvalidKeyPrefix
|
||||
}
|
||||
var cnr cid.ID
|
||||
if err := cnr.Decode(k[1 : 1+cidSize]); err != nil {
|
||||
return oid.Address{}, err
|
||||
}
|
||||
|
||||
var obj oid.ID
|
||||
if err := obj.Decode(k[len(k)-objectKeySize:]); err != nil {
|
||||
return oid.Address{}, err
|
||||
}
|
||||
|
||||
var result oid.Address
|
||||
result.SetObject(obj)
|
||||
result.SetContainer(cnr)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func ownerFromOwnerKey(k []byte) ([]byte, error) {
|
||||
if len(k) < 1+cidSize+objectKeySize {
|
||||
return nil, errInvalidKeyLenght
|
||||
}
|
||||
if k[0] != ownerPrefix {
|
||||
return nil, errInvalidKeyPrefix
|
||||
}
|
||||
return bytes.Clone(k[1+cidSize : len(k)-objectKeySize]), nil
|
||||
}
|
||||
|
||||
// ecInfoLongKeyPrefix returns ecInfoPrefix_<CID>_<parent_OID>.
|
||||
func ecInfoLongKeyPrefix(cnr cid.ID, parent oid.ID) []byte {
|
||||
result := make([]byte, 1+cidSize+objectKeySize)
|
||||
result[0] = ecInfoPrefix
|
||||
cnr.Encode(result[1:])
|
||||
parent.Encode(result[1+cidSize:])
|
||||
return result
|
||||
}
|
||||
|
||||
// ecInfoShortKeyPrefix returns ecInfoPrefix_<CID>.
|
||||
func ecInfoShortKeyPrefix(cnr cid.ID) []byte {
|
||||
result := make([]byte, 1+cidSize)
|
||||
result[0] = ecInfoPrefix
|
||||
cnr.Encode(result[1:])
|
||||
return result
|
||||
}
|
||||
|
||||
func ecInfoKey(cnr cid.ID, parent, chunk oid.ID) []byte {
|
||||
result := make([]byte, 1+cidSize+objectKeySize+objectKeySize)
|
||||
result[0] = ecInfoPrefix
|
||||
cnr.Encode(result[1:])
|
||||
parent.Encode(result[1+cidSize:])
|
||||
chunk.Encode(result[1+cidSize+objectKeySize:])
|
||||
return result
|
||||
}
|
||||
|
||||
func addressOfParentFromECInfoKey(key []byte) (oid.Address, error) {
|
||||
return addressFromKey(ecInfoPrefix, key[:1+cidSize+objectKeySize])
|
||||
}
|
||||
|
||||
func addressOfChunkFromECInfoKey(key []byte) (oid.Address, error) {
|
||||
if len(key) != 1+cidSize+objectKeySize+objectKeySize {
|
||||
return oid.Address{}, errInvalidKeyLenght
|
||||
}
|
||||
if key[0] != ecInfoPrefix {
|
||||
return oid.Address{}, errInvalidKeyPrefix
|
||||
}
|
||||
var cont cid.ID
|
||||
if err := cont.Decode(key[1 : 1+cidSize]); err != nil {
|
||||
return oid.Address{}, fmt.Errorf("failed to decode container ID: %w", err)
|
||||
}
|
||||
var obj oid.ID
|
||||
if err := obj.Decode(key[1+cidSize+objectKeySize:]); err != nil {
|
||||
return oid.Address{}, fmt.Errorf("failed to decode object ID: %w", err)
|
||||
}
|
||||
var result oid.Address
|
||||
result.SetContainer(cont)
|
||||
result.SetObject(obj)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// parentKeyShortPrefix returns parentPrefix_<CID>.
|
||||
func parentKeyShortPrefix(cnr cid.ID) []byte {
|
||||
result := make([]byte, 1+cidSize)
|
||||
result[0] = parentPrefix
|
||||
cnr.Encode(result[1:])
|
||||
return result
|
||||
}
|
||||
|
||||
func addressOfParentFromParentKey(key []byte) (oid.Address, error) {
|
||||
return addressFromKey(parentPrefix, key[:1+cidSize+objectKeySize])
|
||||
}
|
||||
|
||||
func addressOfTargetFromParentKey(key []byte) (oid.Address, error) {
|
||||
if len(key) != 1+cidSize+objectKeySize+objectKeySize {
|
||||
return oid.Address{}, errInvalidKeyLenght
|
||||
}
|
||||
if key[0] != parentPrefix {
|
||||
return oid.Address{}, errInvalidKeyPrefix
|
||||
}
|
||||
var cont cid.ID
|
||||
if err := cont.Decode(key[1 : 1+cidSize]); err != nil {
|
||||
return oid.Address{}, fmt.Errorf("failed to decode container ID: %w", err)
|
||||
}
|
||||
var obj oid.ID
|
||||
if err := obj.Decode(key[1+cidSize+objectKeySize:]); err != nil {
|
||||
return oid.Address{}, fmt.Errorf("failed to decode object ID: %w", err)
|
||||
}
|
||||
var result oid.Address
|
||||
result.SetContainer(cont)
|
||||
result.SetObject(obj)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// parentKeyLongPrefix returns parentPrefix_<CID>_<parent_OID>.
|
||||
func parentKeyLongPrefix(cnr cid.ID, parentObj oid.ID) []byte {
|
||||
result := make([]byte, 1+cidSize+objectKeySize)
|
||||
result[0] = parentPrefix
|
||||
cnr.Encode(result[1:])
|
||||
parentObj.Encode(result[bucketKeySize:])
|
||||
return result
|
||||
}
|
||||
|
||||
func parentKey(cnr cid.ID, parentObj, obj oid.ID) []byte {
|
||||
result := make([]byte, 1+cidSize+objectKeySize+objectKeySize)
|
||||
result[0] = parentPrefix
|
||||
cnr.Encode(result[1:])
|
||||
parentObj.Encode(result[1+cidSize:])
|
||||
obj.Encode(result[1+cidSize+objectKeySize:])
|
||||
return result
|
||||
}
|
||||
|
||||
// splitKeyLongPrefix returns splitPrefix_<CID>_splitID.
|
||||
func splitKeyLongPrefix(cnr cid.ID, splitID []byte) []byte {
|
||||
result := make([]byte, 1+cidSize+len(splitID))
|
||||
result[0] = splitPrefix
|
||||
cnr.Encode(result[1:])
|
||||
copy(result[1+cidSize:], splitID)
|
||||
return result
|
||||
}
|
||||
|
||||
// splitKeyShortPrefix returns splitPrefix_<CID>.
|
||||
func splitKeyShortPrefix(cnr cid.ID) []byte {
|
||||
result := make([]byte, 1+cidSize)
|
||||
result[0] = splitPrefix
|
||||
cnr.Encode(result[1:])
|
||||
return result
|
||||
}
|
||||
|
||||
// splitKey returns splitPrefix_<CID>_splitID_<OID>.
|
||||
func splitKey(cnr cid.ID, obj oid.ID, splitID []byte) []byte {
|
||||
result := make([]byte, 1+cidSize+len(splitID)+objectKeySize)
|
||||
result[0] = splitPrefix
|
||||
cnr.Encode(result[1:])
|
||||
copy(result[1+cidSize:], splitID)
|
||||
obj.Encode(result[1+cidSize+len(splitID):])
|
||||
return result
|
||||
}
|
||||
|
||||
func addressFromSplitKey(key []byte) (oid.Address, error) {
|
||||
if len(key) < 1+cidSize+objectKeySize {
|
||||
return oid.Address{}, errInvalidKeyLenght
|
||||
}
|
||||
if key[0] != splitPrefix {
|
||||
return oid.Address{}, errInvalidKeyPrefix
|
||||
}
|
||||
var cnr cid.ID
|
||||
if err := cnr.Decode(key[1 : 1+cidSize]); err != nil {
|
||||
return oid.Address{}, err
|
||||
}
|
||||
|
||||
var obj oid.ID
|
||||
if err := obj.Decode(key[len(key)-objectKeySize:]); err != nil {
|
||||
return oid.Address{}, err
|
||||
}
|
||||
|
||||
var result oid.Address
|
||||
result.SetObject(obj)
|
||||
result.SetContainer(cnr)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func splitIDFromSplitKey(key []byte) ([]byte, error) {
|
||||
if len(key) < 1+cidSize+objectKeySize {
|
||||
return nil, errInvalidKeyLenght
|
||||
}
|
||||
if key[0] != splitPrefix {
|
||||
return nil, errInvalidKeyPrefix
|
||||
}
|
||||
|
||||
return bytes.Clone(key[1+cidSize : len(key)-objectKeySize]), nil
|
||||
}
|
||||
|
||||
// returns prefix of the keys with objects of type LOCK for specified container.
|
||||
func lockersKeyPrefix(idCnr cid.ID) []byte {
|
||||
return keyPrefix(idCnr, lockersPrefix)
|
||||
}
|
||||
|
||||
func lockersKey(cnrID cid.ID, objID oid.ID) []byte {
|
||||
return keyObject(lockersPrefix, cnrID, objID)
|
||||
}
|
||||
|
||||
func addressFromLockersKey(v []byte) (oid.Address, error) {
|
||||
return addressFromKey(lockersPrefix, v)
|
||||
}
|
||||
|
||||
// returns lockedPrefix_<CID>_<OID>.
|
||||
func lockedKeyLongPrefix(cnrID cid.ID, objID oid.ID) []byte {
|
||||
prefix := make([]byte, 1+cidSize+objectKeySize)
|
||||
prefix[0] = lockedPrefix
|
||||
cnrID.Encode(prefix[1:])
|
||||
objID.Encode(prefix[1+cidSize:])
|
||||
return prefix
|
||||
}
|
||||
|
||||
// returns lockedPrefix_<CID>.
|
||||
func lockedKeyShortPrefix(cnrID cid.ID) []byte {
|
||||
prefix := make([]byte, 1+cidSize)
|
||||
prefix[0] = lockedPrefix
|
||||
cnrID.Encode(prefix[1:])
|
||||
return prefix
|
||||
}
|
||||
|
||||
// returns lockedPrefix_<CID>_<OID>_<LOCKER_OID>.
|
||||
func lockedKey(cnrID cid.ID, objID, lockerObjID oid.ID) []byte {
|
||||
result := make([]byte, 1+cidSize+objectKeySize+objectKeySize)
|
||||
result[0] = lockedPrefix
|
||||
cnrID.Encode(result[1:])
|
||||
objID.Encode(result[1+cidSize:])
|
||||
lockerObjID.Encode(result[1+cidSize+objectKeySize:])
|
||||
return result
|
||||
}
|
||||
|
||||
func lockerObjectIDFromLockedKey(k []byte) (oid.ID, error) {
|
||||
if len(k) != 1+cidSize+objectKeySize+objectKeySize {
|
||||
return oid.ID{}, errInvalidKeyLenght
|
||||
}
|
||||
if k[0] != lockedPrefix {
|
||||
return oid.ID{}, errInvalidKeyPrefix
|
||||
}
|
||||
var result oid.ID
|
||||
if err := result.Decode(k[1+cidSize+objectKeySize:]); err != nil {
|
||||
return oid.ID{}, fmt.Errorf("failed to decode lockers object ID: %w", err)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func objectIDFromLockedKey(k []byte) (oid.ID, error) {
|
||||
if len(k) != 1+cidSize+objectKeySize+objectKeySize {
|
||||
return oid.ID{}, errInvalidKeyLenght
|
||||
}
|
||||
if k[0] != lockedPrefix {
|
||||
return oid.ID{}, errInvalidKeyPrefix
|
||||
}
|
||||
var result oid.ID
|
||||
if err := result.Decode(k[1+cidSize : 1+cidSize+objectKeySize]); err != nil {
|
||||
return oid.ID{}, fmt.Errorf("failed to decode locked object ID: %w", err)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func expiredKey(cnr cid.ID, obj oid.ID, epoch uint64) []byte {
|
||||
result := make([]byte, 1+8+cidSize+objectKeySize)
|
||||
result[0] = expiredPrefix
|
||||
// BigEndian is important for iteration order
|
||||
binary.BigEndian.PutUint64(result[1:1+8], epoch)
|
||||
cnr.Encode(result[1+8 : 1+8+cidSize])
|
||||
obj.Encode(result[1+8+cidSize:])
|
||||
return result
|
||||
}
|
||||
|
||||
func expirationEpochFromExpiredKey(key []byte) (uint64, error) {
|
||||
if len(key) != 1+8+cidSize+objectKeySize {
|
||||
return 0, errInvalidKeyLenght
|
||||
}
|
||||
if key[0] != expiredPrefix {
|
||||
return 0, errInvalidKeyPrefix
|
||||
}
|
||||
// BigEndian is important for iteration order
|
||||
return binary.BigEndian.Uint64(key[1 : 1+8]), nil
|
||||
}
|
||||
|
||||
func addressFromExpiredKey(key []byte) (oid.Address, error) {
|
||||
if len(key) != 1+8+cidSize+objectKeySize {
|
||||
return oid.Address{}, errInvalidKeyLenght
|
||||
}
|
||||
if key[0] != expiredPrefix {
|
||||
return oid.Address{}, errInvalidKeyPrefix
|
||||
}
|
||||
var cnr cid.ID
|
||||
if err := cnr.Decode(key[1+8 : 1+8+cidSize]); err != nil {
|
||||
return oid.Address{}, err
|
||||
}
|
||||
|
||||
var obj oid.ID
|
||||
if err := obj.Decode(key[len(key)-objectKeySize:]); err != nil {
|
||||
return oid.Address{}, err
|
||||
}
|
||||
|
||||
var result oid.Address
|
||||
result.SetObject(obj)
|
||||
result.SetContainer(cnr)
|
||||
return result, nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue