[#9999] metabase: Fix db engine to pebble in exists.go
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
Parent: cebf9da40a
Commit: c7955c63de
9 changed files with 795 additions and 237 deletions
@@ -26,7 +26,10 @@ var (

 var (
 	errInvalidKeyLenght = errors.New("invalid key length")
+	errInvalidKeyPrefix = errors.New("invalid key prefix")
 	errInvalidValueLenght = errors.New("invalid value length")
+	errInvalidContainerIDValue = errors.New("invalid container ID value")
+	errInvalidAttributeKey = errors.New("invalid userr attribute key")
 )

 type objectType uint8
@@ -188,7 +188,7 @@ func stringEqualMatcherBucket(r pebble.Reader, fKey string, fValue string, f fun
 	if !ok {
 		return nil
 	}
-	data, err := getSafe(r, val)
+	data, err := valueSafe(r, val)
 	if err != nil {
 		return err
 	}
@@ -1,11 +1,11 @@
 package meta

 import (
+	"bytes"
 	"context"
 	"fmt"
 	"time"

-	objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
@@ -13,6 +13,7 @@ import (
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"github.com/cockroachdb/pebble"
 	"go.etcd.io/bbolt"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
@@ -81,8 +82,8 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err

 	currEpoch := db.epochState.CurrentEpoch()

-	err = db.database.View(func(tx *bbolt.Tx) error {
-		res.exists, res.locked, err = db.exists(tx, prm.addr, prm.paddr, currEpoch)
+	err = db.snapshot(func(s *pebble.Snapshot) error {
+		res.exists, res.locked, err = db.exists(ctx, s, prm.addr, prm.paddr, currEpoch)

 		return err
 	})
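Note on the read path: Exists now runs inside a pebble snapshot (the db.snapshot wrapper added later in this commit) instead of a bbolt read transaction. A minimal sketch of the underlying pebble pattern, for illustration only (the function, db and key names are assumptions, not part of the commit; imports: errors, github.com/cockroachdb/pebble):

	func readUnderSnapshot(db *pebble.DB, key []byte) ([]byte, error) {
		s := db.NewSnapshot() // consistent point-in-time view, roughly what bbolt's View gave before
		defer s.Close()
		val, closer, err := s.Get(key)
		if err != nil {
			if errors.Is(err, pebble.ErrNotFound) {
				return nil, nil // an absent key is not an error for callers such as exists()
			}
			return nil, err
		}
		defer closer.Close()
		return append([]byte(nil), val...), nil // copy: val becomes invalid once closer is closed
	}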
@@ -90,13 +91,21 @@ func (db *DB) Exists(ctx context.Context, prm ExistsPrm) (res ExistsRes, err err
 	return res, metaerr.Wrap(err)
 }

-func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
+func (db *DB) exists(ctx context.Context, r pebble.Reader, addr oid.Address, parent oid.Address, currEpoch uint64) (bool, bool, error) {
 	var locked bool
+	var err error
 	if !parent.Equals(oid.Address{}) {
-		locked = objectLocked(tx, parent.Container(), parent.Object())
+		locked, err = objectLocked(ctx, r, parent.Container(), parent.Object())
+		if err != nil {
+			return false, false, err
+		}
 	}
 	// check graveyard and object expiration first
-	switch objectStatus(tx, addr, currEpoch) {
+	st, err := objectStatus(ctx, r, addr, currEpoch)
+	if err != nil {
+		return false, false, err
+	}
+	switch st {
 	case 1:
 		return false, locked, logicerr.Wrap(new(apistatus.ObjectNotFound))
 	case 2:
@@ -105,32 +114,41 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpo
 		return false, locked, ErrObjectIsExpired
 	}

-	objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
-
-	cnr := addr.Container()
-	key := make([]byte, bucketKeySize)
-
-	// if graveyard is empty, then check if object exists in primary bucket
-	if inBucket(tx, primaryBucketName(cnr, key), objKey) {
+	v, err := valueSafe(r, primaryKey(addr.Container(), addr.Object()))
+	if err != nil {
+		return false, false, err
+	}
+	if v != nil {
 		return true, locked, nil
 	}

-	// if primary bucket is empty, then check if object exists in parent bucket
-	if inBucket(tx, parentBucketName(cnr, key), objKey) {
-		splitInfo, err := getSplitInfo(tx, cnr, objKey)
-		if err != nil {
-			return false, locked, err
-		}
-
-		return false, locked, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
-	}
-
-	// if parent bucket is empty, then check if object exists in ec bucket
-	if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
-		return false, locked, getECInfoError(tx, cnr, data)
-	}
-
-	// if parent bucket is empty, then check if object exists in typed buckets
-	return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, locked, nil
+	children, err := selectByPrefixBatch(ctx, r, parentKeyLongPrefix(addr.Container(), addr.Object()), 1) // try to found any child
+	if err != nil {
+		return false, false, err
+	}
+	if len(children) > 0 {
+		splitInfo, err := getSplitInfo(r, addr)
+		if err != nil {
+			return false, false, err
+		}
+		return false, locked, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
+	}
+
+	// if parent bucket is empty, then check if object exists with ec prefix
+	children, err = selectByPrefixBatch(ctx, r, ecInfoLongKeyPrefix(addr.Container(), addr.Object()), 1) // try to found any child
+	if err != nil {
+		return false, locked, err
+	}
+	if len(children) > 0 {
+		return false, locked, getECInfoError(ctx, r, addr)
+	}
+
+	t, err := firstIrregularObjectType(r, addr.Container(), addr.Object())
+	if err != nil {
+		return false, false, err
+	}
+
+	return t != objectSDK.TypeRegular, locked, nil
 }

 // objectStatus returns:
@@ -138,59 +156,52 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, parent oid.Address, currEpo
 // - 1 if object with GC mark;
 // - 2 if object is covered with tombstone;
 // - 3 if object is expired.
-func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 {
-	// locked object could not be removed/marked with GC/expired
-	if objectLocked(tx, addr.Container(), addr.Object()) {
-		return 0
-	}
-
-	// we check only if the object is expired in the current
-	// epoch since it is considered the only corner case: the
-	// GC is expected to collect all the objects that have
-	// expired previously for less than the one epoch duration
-
-	expired := isExpiredWithAttribute(tx, objectV2.SysAttributeExpEpoch, addr, currEpoch)
-	if !expired {
-		expired = isExpiredWithAttribute(tx, objectV2.SysAttributeExpEpochNeoFS, addr, currEpoch)
-	}
-
-	if expired {
-		return 3
-	}
-
-	graveyardBkt := tx.Bucket(graveyardBucketName)
-	garbageBkt := tx.Bucket(garbageBucketName)
-	addrKey := addressKey(addr, make([]byte, addressKeySize))
-	return inGraveyardWithKey(addrKey, graveyardBkt, garbageBkt)
-}
-
-func inGraveyardWithKey(addrKey []byte, graveyard, garbageBCK *bbolt.Bucket) uint8 {
-	if graveyard == nil {
-		// incorrect metabase state, does not make
-		// sense to check garbage bucket
-		return 0
-	}
-
-	val := graveyard.Get(addrKey)
-	if val == nil {
-		if garbageBCK == nil {
-			// incorrect node state
-			return 0
-		}
-
-		val = garbageBCK.Get(addrKey)
-		if val != nil {
-			// object has been marked with GC
-			return 1
-		}
-
-		// neither in the graveyard
-		// nor was marked with GC mark
-		return 0
-	}
-
-	// object in the graveyard
-	return 2
-}
+func objectStatus(ctx context.Context, r pebble.Reader, addr oid.Address, currEpoch uint64) (uint8, error) {
+	// locked object could not be removed/marked with GC/expired
+	locked, err := objectLocked(ctx, r, addr.Container(), addr.Object())
+	if err != nil {
+		return 0, err
+	}
+	if locked {
+		return 0, nil
+	}
+
+	st, err := inGraveyardWithKey(r, addr)
+	if err != nil {
+		return 0, err
+	}
+	if st > 0 {
+		return st, nil
+	}
+
+	expired, err := isExpired(ctx, r, addr, currEpoch)
+	if err != nil {
+		return 0, err
+	}
+
+	if expired {
+		return 3, nil
+	}
+
+	return 0, nil
+}
+
+func inGraveyardWithKey(r pebble.Reader, addr oid.Address) (uint8, error) {
+	v, err := valueSafe(r, graveyardKey(addr.Container(), addr.Object()))
+	if err != nil {
+		return 0, err
+	}
+	if v != nil {
+		return 2, nil
+	}
+
+	v, err = valueSafe(r, garbageKey(addr.Container(), addr.Object()))
+	if err != nil {
+		return 0, err
+	}
+	if v != nil {
+		return 1, nil
+	}
+	return 0, nil
+}

 // inBucket checks if key <key> is present in bucket <name>.
@@ -208,19 +219,45 @@ func inBucket(tx *bbolt.Tx, name, key []byte) bool {

 // getSplitInfo returns SplitInfo structure from root index. Returns error
 // if there is no `key` record in root index.
-func getSplitInfo(tx *bbolt.Tx, cnr cid.ID, key []byte) (*objectSDK.SplitInfo, error) {
-	bucketName := rootBucketName(cnr, make([]byte, bucketKeySize))
-	rawSplitInfo := getFromBucket(tx, bucketName, key)
-	if len(rawSplitInfo) == 0 {
+func getSplitInfo(r pebble.Reader, addr oid.Address) (*objectSDK.SplitInfo, error) {
+	rawSplitInfo, err := valueSafe(r, rootKey(addr.Container(), addr.Object()))
+	if err != nil {
+		return nil, err
+	}
+	if len(rawSplitInfo) == 0 || bytes.Equal(zeroValue, rawSplitInfo) {
 		return nil, ErrLackSplitInfo
 	}

 	splitInfo := objectSDK.NewSplitInfo()
-	err := splitInfo.Unmarshal(rawSplitInfo)
+	err = splitInfo.Unmarshal(rawSplitInfo)
 	if err != nil {
 		return nil, fmt.Errorf("can't unmarshal split info from root index: %w", err)
 	}

 	return splitInfo, nil
 }
+
+// if meets irregular object container in objs - returns its type, otherwise returns object.TypeRegular.
+//
+// firstIrregularObjectType(tx, cnr, obj) usage allows getting object type.
+func firstIrregularObjectType(r pebble.Reader, idCnr cid.ID, objs ...oid.ID) (objectSDK.Type, error) {
+	for _, objID := range objs {
+		key := tombstoneKey(idCnr, objID)
+		v, err := valueSafe(r, key)
+		if err != nil {
+			return objectSDK.TypeRegular, err
+		}
+		if v != nil {
+			return objectSDK.TypeTombstone, nil
+		}
+
+		key = lockersKey(idCnr, objID)
+		v, err = valueSafe(r, key)
+		if err != nil {
+			return objectSDK.TypeRegular, err
+		}
+		if v != nil {
+			return objectSDK.TypeLock, nil
+		}
+	}
+
+	return objectSDK.TypeRegular, nil
+}
@@ -1,6 +1,7 @@
 package meta

 import (
+	"bytes"
 	"context"
 	"errors"
 	"fmt"
@@ -12,6 +13,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"github.com/cockroachdb/pebble"
 	"go.etcd.io/bbolt"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
@@ -163,3 +165,42 @@ func selectExpiredObjectIDs(tx *bbolt.Tx, attr string, epoch uint64, containerID

 	return result, nil
 }
+
+func isExpired(ctx context.Context, r pebble.Reader, addr oid.Address, currEpoch uint64) (bool, error) {
+	prefix := []byte{expiredPrefix}
+	it, err := r.NewIter(&pebble.IterOptions{
+		LowerBound:                []byte{expiredPrefix},
+		OnlyReadGuaranteedDurable: true,
+	})
+	if err != nil {
+		return false, err
+	}
+
+	// iteration does in ascending order by expiration epoch.
+	// gc does expired objects collect every epoch, so here should be not so much items.
+	for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
+		select {
+		case <-ctx.Done():
+			return false, ctx.Err()
+		default:
+		}
+
+		expEpoch, err := expirationEpochFromExpiredKey(it.Key())
+		if err != nil {
+			return false, err
+		}
+
+		if expEpoch >= currEpoch {
+			return false, nil // keys are ordered by epoch, so next items will be discarded anyway.
+		}
+
+		curAddr, err := addressFromExpiredKey(it.Key())
+		if err != nil {
+			return false, err
+		}
+		if curAddr == addr {
+			return true, nil
+		}
+	}
+	return false, nil
+}
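isExpired can return early at the first not-yet-expired entry because keys in the expired index are expiredPrefix | epoch (uint64, big-endian) | CID | OID (see expiredKey later in this commit), so byte order equals epoch order. A small standalone illustration of that property, using only the standard library (illustrative, not part of the commit):

	a := binary.BigEndian.AppendUint64(nil, 5)  // epoch 5
	b := binary.BigEndian.AppendUint64(nil, 17) // epoch 17
	fmt.Println(bytes.Compare(a, b) < 0)        // true: epoch 5 sorts before epoch 17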
@@ -12,6 +12,7 @@ import (
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"github.com/cockroachdb/pebble"
 	"go.etcd.io/bbolt"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
@@ -191,18 +192,42 @@ func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
 	return logicerr.Wrap(new(apistatus.ObjectNotFound))
 }

-func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
-	keys, err := decodeList(data)
-	if err != nil {
-		return err
-	}
-	ecInfo := objectSDK.NewECInfo()
-	for _, key := range keys {
-		// check in primary index
-		ojbData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key)
-		if len(ojbData) != 0 {
+func getECInfoError(ctx context.Context, r pebble.Reader, addr oid.Address) error {
+	var chunkAddresses []oid.Address
+	for {
+		keys, err := selectByPrefixBatch(ctx, r, ecInfoLongKeyPrefix(addr.Container(), addr.Object()), batchSize)
+		if err != nil {
+			return err
+		}
+		for _, key := range keys {
+			chunkAddress, err := addressOfChunkFromECInfoKey(key)
+			if err != nil {
+				return err
+			}
+			chunkAddresses = append(chunkAddresses, chunkAddress)
+		}
+		if len(keys) < batchSize {
+			break
+		}
+	}
+	ecInfo := objectSDK.NewECInfo()
+	for _, chunkAddress := range chunkAddresses {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+
+		objData, err := valueSafe(r, primaryKey(chunkAddress.Container(), chunkAddress.Object()))
+		if err != nil {
+			return err
+		}
+		if len(objData) == 0 {
+			continue
+		}
+
 		obj := objectSDK.New()
-		if err := obj.Unmarshal(ojbData); err != nil {
+		if err := obj.Unmarshal(objData); err != nil {
 			return err
 		}
 		chunk := objectSDK.ECChunk{}
@@ -212,6 +237,5 @@ func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
 		chunk.Total = obj.ECHeader().Total()
 		ecInfo.AddChunk(chunk)
 	}
-	}
 	return logicerr.Wrap(objectSDK.NewECInfoError(ecInfo))
 }
@@ -13,6 +13,7 @@ import (
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"github.com/cockroachdb/pebble"
 	"go.etcd.io/bbolt"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
@@ -161,18 +162,13 @@ func (db *DB) FreeLockedBy(lockers []oid.Address) ([]oid.Address, error) {
 }

 // checks if specified object is locked in the specified container.
-func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
-	bucketLocked := tx.Bucket(bucketNameLocked)
-	if bucketLocked != nil {
-		key := make([]byte, cidSize)
-		idCnr.Encode(key)
-		bucketLockedContainer := bucketLocked.Bucket(key)
-		if bucketLockedContainer != nil {
-			return bucketLockedContainer.Get(objectKey(idObj, key)) != nil
-		}
+func objectLocked(ctx context.Context, r pebble.Reader, idCnr cid.ID, idObj oid.ID) (bool, error) {
+	prefix := lockedKeyLongPrefix(idCnr, idObj)
+	items, err := selectByPrefixBatch(ctx, r, prefix, 1)
+	if err != nil {
+		return false, err
 	}
-
-	return false
+	return len(items) > 0, nil
 }

 // return `LOCK` id's if specified object is locked in the specified container.
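The lock check is now a bounded prefix scan over lockedPrefix_<CID>_<OID> keys: asking selectByPrefixBatch for at most one item answers the yes/no question without walking nested buckets. A hypothetical caller sketch, assuming it already runs inside the db.snapshot callback added in this commit (names outside the diff are assumptions):

	// s is the *pebble.Snapshot passed to the db.snapshot callback; it satisfies pebble.Reader.
	locked, err := objectLocked(ctx, s, addr.Container(), addr.Object())
	if err != nil {
		return err
	}
	if locked {
		// the object is protected by a LOCK object and must not be removed or GC-marked
	}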
@@ -383,3 +379,13 @@ func (db *DB) GetLocked(ctx context.Context, addr oid.Address) (res []oid.ID, er
 	success = err == nil
 	return res, err
 }
+
+// return true if provided object is of LOCK type.
+func isLockObject(r pebble.Reader, idCnr cid.ID, obj oid.ID) (bool, error) {
+	key := lockersKey(idCnr, obj)
+	v, err := valueSafe(r, key)
+	if err != nil {
+		return false, err
+	}
+	return v != nil, nil
+}
@@ -2,12 +2,15 @@ package meta

 import (
 	"bytes"
+	"context"
 	"errors"

 	"github.com/cockroachdb/pebble"
 )

-func getSafe(r pebble.Reader, key []byte) ([]byte, error) {
+const batchSize = 1000
+
+func valueSafe(r pebble.Reader, key []byte) ([]byte, error) {
 	data, closer, err := r.Get(key)
 	if err != nil {
 		if errors.Is(err, pebble.ErrNotFound) {
@@ -21,3 +24,46 @@ func getSafe(r pebble.Reader, key []byte) ([]byte, error) {
 	}
 	return result, nil
 }
+
+func (db *DB) batch(f func(batch *pebble.Batch) error) error {
+	b := db.database.NewIndexedBatch()
+	err := f(b)
+	if err != nil {
+		return errors.Join(err, b.Close())
+	}
+	return errors.Join(b.Commit(pebble.Sync), b.Close())
+}
+
+func (db *DB) snapshot(f func(*pebble.Snapshot) error) error {
+	s := db.database.NewSnapshot()
+	err := f(s)
+	if err != nil {
+		return errors.Join(err, s.Close())
+	}
+	return s.Close()
+}
+
+func selectByPrefixBatch(ctx context.Context, r pebble.Reader, prefix []byte, batchSize int) ([][]byte, error) {
+	it, err := r.NewIter(&pebble.IterOptions{
+		LowerBound:                prefix,
+		OnlyReadGuaranteedDurable: true,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	var result [][]byte
+	for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
+		select {
+		case <-ctx.Done():
+			return nil, errors.Join(ctx.Err(), it.Close())
+		default:
+		}
+
+		result = append(result, bytes.Clone(it.Key()))
+		if len(result) == batchSize {
+			return result, it.Close()
+		}
+	}
+	return result, it.Close()
+}
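These helpers are the pebble counterparts of bbolt's Batch/View closures: db.batch stages writes in an indexed batch and commits them with pebble.Sync, db.snapshot runs read-only work against a stable snapshot, and both release their resources via errors.Join. Put and Exists in this commit follow exactly this shape; a condensed usage sketch (someKey/someValue are placeholders, not identifiers from the diff):

	// write path
	err := db.batch(func(b *pebble.Batch) error {
		return b.Set(someKey, someValue, nil) // staged only; db.batch commits on success
	})

	// read path
	err = db.snapshot(func(s *pebble.Snapshot) error {
		v, err := valueSafe(s, someKey) // nil result means the key is absent
		_ = v
		return err
	})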
@@ -16,6 +16,7 @@ import (
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"github.com/cockroachdb/pebble"
 	"github.com/nspcc-dev/neo-go/pkg/io"
 	"go.etcd.io/bbolt"
 	"go.opentelemetry.io/otel/attribute"
@@ -86,9 +87,9 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {

 	currEpoch := db.epochState.CurrentEpoch()

-	err = db.database.Batch(func(tx *bbolt.Tx) error {
+	err = db.batch(func(b *pebble.Batch) error {
 		var e error
-		res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch)
+		res, e = db.put(b, prm.obj, prm.id, nil, currEpoch)
 		return e
 	})
 	if err == nil {
@@ -101,7 +102,7 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
 	return res, metaerr.Wrap(err)
 }

-func (db *DB) put(tx *bbolt.Tx,
+func (db *DB) put(batch *pebble.Batch,
 	obj *objectSDK.Object,
 	id []byte,
 	si *objectSDK.SplitInfo,
|
@ -3,31 +3,14 @@ package meta
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"errors"
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var zeroValue = []byte{0xFF}
|
||||||
// graveyardBucketName stores rows with the objects that have been
|
|
||||||
// covered with Tombstone objects. That objects should not be returned
|
|
||||||
// from the node and should not be accepted by the node from other
|
|
||||||
// nodes.
|
|
||||||
graveyardBucketName = []byte{graveyardPrefix}
|
|
||||||
// garbageBucketName stores rows with the objects that should be physically
|
|
||||||
// deleted by the node (Garbage Collector routine).
|
|
||||||
garbageBucketName = []byte{garbagePrefix}
|
|
||||||
toMoveItBucketName = []byte{toMoveItPrefix}
|
|
||||||
containerVolumeBucketName = []byte{containerVolumePrefix}
|
|
||||||
containerCounterBucketName = []byte{containerCountersPrefix}
|
|
||||||
|
|
||||||
zeroValue = []byte{0xFF}
|
|
||||||
|
|
||||||
errInvalidLength = errors.New("invalid length")
|
|
||||||
)
|
|
||||||
|
|
||||||
// Prefix bytes for database keys. All ids and addresses are encoded in binary
|
// Prefix bytes for database keys. All ids and addresses are encoded in binary
|
||||||
// unless specified otherwise.
|
// unless specified otherwise.
|
||||||
|
@@ -42,13 +25,13 @@ const (
 	// Key: object address
 	// Value: dummy value
 	garbagePrefix
-	// toMoveItPrefix is used for bucket containing IDs of objects that are candidates for moving
+	// _ Previous usage was for for bucket containing IDs of objects that are candidates for moving
 	// to another shard.
-	toMoveItPrefix
-	// containerVolumePrefix is used for storing container size estimations.
+	_
+	// containerSizePrefix is used for storing container size estimations.
 	// Key: container ID
 	// Value: container size in bytes as little-endian uint64
-	containerVolumePrefix
+	containerSizePrefix
 	// lockedPrefix is used for storing locked objects information.
 	// Key: container ID
 	// Value: bucket mapping objects locked to the list of corresponding LOCK objects.
@@ -124,6 +107,9 @@ const (
 	// Key: container ID + type
 	// Value: Object id
 	ecInfoPrefix
+
+	// expiredPrefix used to store expiration info.
+	expiredPrefix
 )

 const (
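From this point on the per-container bbolt buckets (<CID>, <CID>_TS, <CID>_root, ...) give way to a single flat keyspace: every record starts with one of the prefix bytes above, followed by the binary container ID and, where applicable, the object ID and further fields. A sketch of the common shape, mirroring the keyObject helper introduced in the next hunk (illustrative only; exampleKey is not part of the commit):

	// prefix | container ID (cidSize bytes) | object ID (objectKeySize bytes)
	func exampleKey(prefix byte, cnr cid.ID, obj oid.ID) []byte {
		k := make([]byte, 1+cidSize+objectKeySize)
		k[0] = prefix
		cnr.Encode(k[1:])
		obj.Encode(k[1+cidSize:])
		return k
	}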
@@ -133,139 +119,553 @@ const (
 	addressKeySize = cidSize + objectKeySize
 )

-func bucketName(cnr cid.ID, prefix byte, key []byte) []byte {
-	key[0] = prefix
-	cnr.Encode(key[1:])
-	return key[:bucketKeySize]
-}
-
-// primaryBucketName returns <CID>.
-func primaryBucketName(cnr cid.ID, key []byte) []byte {
-	return bucketName(cnr, primaryPrefix, key)
-}
-
-// tombstoneBucketName returns <CID>_TS.
-func tombstoneBucketName(cnr cid.ID, key []byte) []byte {
-	return bucketName(cnr, tombstonePrefix, key)
-}
-
-// smallBucketName returns <CID>_small.
-func smallBucketName(cnr cid.ID, key []byte) []byte {
-	return bucketName(cnr, smallPrefix, key)
-}
-
-// attributeBucketName returns <CID>_attr_<attributeKey>.
-func attributeBucketName(cnr cid.ID, attributeKey string, key []byte) []byte {
-	key[0] = userAttributePrefix
-	cnr.Encode(key[1:])
-	return append(key[:bucketKeySize], attributeKey...)
-}
-
-// returns <CID> from attributeBucketName result, nil otherwise.
-func cidFromAttributeBucket(val []byte, attributeKey string) []byte {
-	if len(val) < bucketKeySize || val[0] != userAttributePrefix || !bytes.Equal(val[bucketKeySize:], []byte(attributeKey)) {
-		return nil
-	}
-
-	return val[1:bucketKeySize]
-}
-
-// payloadHashBucketName returns <CID>_payloadhash.
-func payloadHashBucketName(cnr cid.ID, key []byte) []byte {
-	return bucketName(cnr, payloadHashPrefix, key)
-}
-
-// rootBucketName returns <CID>_root.
-func rootBucketName(cnr cid.ID, key []byte) []byte {
-	return bucketName(cnr, rootPrefix, key)
-}
-
-// ownerBucketName returns <CID>_ownerid.
-func ownerBucketName(cnr cid.ID, key []byte) []byte {
-	return bucketName(cnr, ownerPrefix, key)
-}
-
-// parentBucketName returns <CID>_parent.
-func parentBucketName(cnr cid.ID, key []byte) []byte {
-	return bucketName(cnr, parentPrefix, key)
-}
-
-// splitBucketName returns <CID>_splitid.
-func splitBucketName(cnr cid.ID, key []byte) []byte {
-	return bucketName(cnr, splitPrefix, key)
-}
-
-// ecInfoBucketName returns <CID>_ecinfo.
-func ecInfoBucketName(cnr cid.ID, key []byte) []byte {
-	return bucketName(cnr, ecInfoPrefix, key)
-}
-
-// addressKey returns key for K-V tables when key is a whole address.
-func addressKey(addr oid.Address, key []byte) []byte {
-	addr.Container().Encode(key)
-	addr.Object().Encode(key[cidSize:])
-	return key[:addressKeySize]
-}
-
-// parses object address formed by addressKey.
-func decodeAddressFromKey(dst *oid.Address, k []byte) error {
-	if len(k) != addressKeySize {
-		return errInvalidLength
-	}
-
-	var cnr cid.ID
-	if err := cnr.Decode(k[:cidSize]); err != nil {
-		return err
-	}
-
-	var obj oid.ID
-	if err := obj.Decode(k[cidSize:]); err != nil {
-		return err
-	}
-
-	dst.SetObject(obj)
-	dst.SetContainer(cnr)
-	return nil
-}
-
-// objectKey returns key for K-V tables when key is an object id.
-func objectKey(obj oid.ID, key []byte) []byte {
-	obj.Encode(key)
-	return key[:objectKeySize]
-}
-
-// if meets irregular object container in objs - returns its type, otherwise returns object.TypeRegular.
-//
-// firstIrregularObjectType(tx, cnr, obj) usage allows getting object type.
-func firstIrregularObjectType(tx *bbolt.Tx, idCnr cid.ID, objs ...[]byte) objectSDK.Type {
-	if len(objs) == 0 {
-		panic("empty object list in firstIrregularObjectType")
-	}
-
-	var keys [2][1 + cidSize]byte
-
-	irregularTypeBuckets := [...]struct {
-		typ  objectSDK.Type
-		name []byte
-	}{
-		{objectSDK.TypeTombstone, tombstoneBucketName(idCnr, keys[0][:])},
-		{objectSDK.TypeLock, bucketNameLockers(idCnr, keys[1][:])},
-	}
-
-	for i := range objs {
-		for j := range irregularTypeBuckets {
-			if inBucket(tx, irregularTypeBuckets[j].name, objs[i]) {
-				return irregularTypeBuckets[j].typ
-			}
-		}
-	}
-
-	return objectSDK.TypeRegular
-}
-
-// return true if provided object is of LOCK type.
-func isLockObject(tx *bbolt.Tx, idCnr cid.ID, obj oid.ID) bool {
-	return inBucket(tx,
-		bucketNameLockers(idCnr, make([]byte, bucketKeySize)),
-		objectKey(obj, make([]byte, objectKeySize)))
-}
+func keyPrefix(cnr cid.ID, prefix byte) []byte {
+	result := make([]byte, 1+cidSize)
+	result[0] = prefix
+	cnr.Encode(result[1:])
+	return result
+}
+
+func keyObject(prefix byte, cnr cid.ID, objID oid.ID) []byte {
+	result := make([]byte, 1+cidSize+objectKeySize)
+	result[0] = prefix
+	cnr.Encode(result[1:])
+	objID.Encode(result[1+cidSize:])
+	return result
+}
+
+func addressFromKey(prefix byte, key []byte) (oid.Address, error) {
+	if len(key) != 1+cidSize+objectKeySize {
+		return oid.Address{}, errInvalidKeyLenght
+	}
+	if key[0] != prefix {
+		return oid.Address{}, errInvalidKeyPrefix
+	}
+	var cont cid.ID
+	if err := cont.Decode(key[1 : 1+cidSize]); err != nil {
+		return oid.Address{}, fmt.Errorf("failed to decode container ID: %w", err)
+	}
+	var obj oid.ID
+	if err := obj.Decode(key[1+cidSize:]); err != nil {
+		return oid.Address{}, fmt.Errorf("failed to decode object ID: %w", err)
+	}
+	var result oid.Address
+	result.SetContainer(cont)
+	result.SetObject(obj)
+	return result, nil
+}
+
+// primaryKeyPrefix returns primaryPrefix_<CID>.
+func primaryKeyPrefix(cnr cid.ID) []byte {
+	return keyPrefix(cnr, primaryPrefix)
+}
+
+func primaryKey(cnr cid.ID, objID oid.ID) []byte {
+	return keyObject(primaryPrefix, cnr, objID)
+}
+
+func addressFromPrimaryKey(v []byte) (oid.Address, error) {
+	return addressFromKey(primaryPrefix, v)
+}
+
+// tombstoneKeyPrefix returns tombstonePrefix_<CID>.
+func tombstoneKeyPrefix(cnr cid.ID) []byte {
+	return keyPrefix(cnr, tombstonePrefix)
+}
+
+func tombstoneKey(cnr cid.ID, objID oid.ID) []byte {
+	return keyObject(tombstonePrefix, cnr, objID)
+}
+
+func addressFromTombstoneKey(v []byte) (oid.Address, error) {
+	return addressFromKey(tombstonePrefix, v)
+}
+
+func garbageKey(cnr cid.ID, objID oid.ID) []byte {
+	return keyObject(garbagePrefix, cnr, objID)
+}
+
+func addressFromGarbageKey(v []byte) (oid.Address, error) {
+	return addressFromKey(garbagePrefix, v)
+}
+
+func graveyardKey(cnr cid.ID, objID oid.ID) []byte {
+	return keyObject(graveyardPrefix, cnr, objID)
+}
+
+func addressFromGraveyardKey(v []byte) (oid.Address, error) {
+	return addressFromKey(graveyardPrefix, v)
+}
+
+func smallKey(cnr cid.ID, obj oid.ID) []byte {
+	return keyObject(smallPrefix, cnr, obj)
+}
+
+// attributeKeyPrefix returns userAttributePrefix_<attributeKey>_<CID>_<attributeValue>.
+func attributeKeyPrefix(cnr cid.ID, attributeKey, attributeValue string) []byte {
+	result := make([]byte, 1+len(attributeKey)+cidSize+len(attributeValue))
+	result[0] = userAttributePrefix
+	copy(result[1:], []byte(attributeKey))
+	cnr.Encode(result[1+len(attributeKey):])
+	copy(result[1+len(attributeKey)+cidSize:], []byte(attributeValue))
+	return result
+}
+
+// userAttributePrefix+attributeKey+<CID>+attributeValue+<OID>.
+func attributeKey(cnr cid.ID, objID oid.ID, attributeKey, attributeValue string) []byte {
+	result := make([]byte, 1+len(attributeKey)+cidSize+len(attributeValue)+objectKeySize)
+	result[0] = userAttributePrefix
+	copy(result[1:], []byte(attributeKey))
+	cnr.Encode(result[1+len(attributeKey):])
+	copy(result[1+len(attributeKey)+cidSize:], []byte(attributeValue))
+	objID.Encode(result[1+cidSize+len(attributeKey)+len(attributeValue):])
+	return result
+}
+
+// returns attributeValue from attributeKey result, nil otherwise.
+func attributeValueFromAttributeKey(key []byte, attributeKey string) ([]byte, error) {
+	if len(key) < 1+len(attributeKey)+cidSize+objectKeySize {
+		return nil, errInvalidKeyLenght
+	}
+	if key[0] != userAttributePrefix {
+		return nil, errInvalidKeyPrefix
+	}
+	if !bytes.Equal(key[1:1+len(attributeKey)], []byte(attributeKey)) {
+		return nil, errInvalidAttributeKey
+	}
+
+	return key[1+len(attributeKey)+cidSize : len(key)-objectKeySize], nil
+}
+
+func addressFromAttributeKey(key []byte, attributeKey string) (oid.Address, error) {
+	if len(key) < 1+len(attributeKey)+cidSize+objectKeySize {
+		return oid.Address{}, errInvalidKeyLenght
+	}
+	if key[0] != userAttributePrefix {
+		return oid.Address{}, errInvalidKeyPrefix
+	}
+	if !bytes.Equal(key[1:1+len(attributeKey)], []byte(attributeKey)) {
+		return oid.Address{}, errInvalidAttributeKey
+	}
+	var cnrID cid.ID
+	if err := cnrID.Decode(key[1+len(attributeKey) : 1+len(attributeKey)+cidSize]); err != nil {
+		return oid.Address{}, fmt.Errorf("failed to decode container ID: %w", err)
+	}
+	var objID oid.ID
+	if err := objID.Decode(key[len(key)-objectKeySize:]); err != nil {
+		return oid.Address{}, fmt.Errorf("failed to decode object ID: %w", err)
+	}
+	var result oid.Address
+	result.SetContainer(cnrID)
+	result.SetObject(objID)
+	return result, nil
+}
+
+// payloadHashKeyLongPrefix returns payloadHashPrefix_<CID>_hash.
+func payloadHashKeyLongPrefix(cnr cid.ID, hash []byte) []byte {
+	result := make([]byte, 1+cidSize+len(hash))
+	result[0] = payloadHashPrefix
+	cnr.Encode(result[1:])
+	copy(result[1+cidSize:], hash)
+	return result
+}
+
+// payloadHashKeyShortPrefix returns payloadHashPrefix_<CID>.
+func payloadHashKeyShortPrefix(cnr cid.ID) []byte {
+	result := make([]byte, 1+cidSize)
+	result[0] = payloadHashPrefix
+	cnr.Encode(result[1:])
+	return result
+}
+
+// payloadHashKey returns payloadHashPrefix_<CID>_hash_<OID>.
+func payloadHashKey(cnr cid.ID, obj oid.ID, hash []byte) []byte {
+	result := make([]byte, 1+cidSize+len(hash)+objectKeySize)
+	result[0] = payloadHashPrefix
+	cnr.Encode(result[1:])
+	copy(result[1+cidSize:], hash)
+	obj.Encode(result[1+cidSize+len(hash):])
+	return result
+}
+
+func addressFromPayloadHashKey(k []byte) (oid.Address, error) {
+	if len(k) < 1+cidSize+objectKeySize {
+		return oid.Address{}, errInvalidKeyLenght
+	}
+	if k[0] != payloadHashPrefix {
+		return oid.Address{}, errInvalidKeyPrefix
+	}
+	var cnr cid.ID
+	if err := cnr.Decode(k[1 : 1+cidSize]); err != nil {
+		return oid.Address{}, err
+	}
+
+	var obj oid.ID
+	if err := obj.Decode(k[len(k)-objectKeySize:]); err != nil {
+		return oid.Address{}, err
+	}
+
+	var result oid.Address
+	result.SetObject(obj)
+	result.SetContainer(cnr)
+	return result, nil
+}
+
+func payloadHashFromPayloadHashKey(k []byte) ([]byte, error) {
+	if len(k) < 1+cidSize+objectKeySize {
+		return nil, errInvalidKeyLenght
+	}
+	if k[0] != payloadHashPrefix {
+		return nil, errInvalidKeyPrefix
+	}
+
+	return bytes.Clone(k[1+cidSize : len(k)-objectKeySize]), nil
+}
+
+// rootBucketName returns rootPrefix_<CID>.
+func rootKeyPrefix(cnr cid.ID) []byte {
+	return keyPrefix(cnr, rootPrefix)
+}
+
+func rootKey(cnr cid.ID, objID oid.ID) []byte {
+	return keyObject(rootPrefix, cnr, objID)
+}
+
+func addressFromRootKey(key []byte) (oid.Address, error) {
+	return addressFromKey(rootPrefix, key)
+}
+
+// ownerKey returns ownerPrefix_<CID>_owner_<OID>.
+func ownerKey(cnr cid.ID, obj oid.ID, owner []byte) []byte {
+	result := make([]byte, 1+cidSize+len(owner)+objectKeySize)
+	result[0] = ownerPrefix
+	cnr.Encode(result[1:])
+	copy(result[1+cidSize:], owner)
+	obj.Encode(result[1+cidSize+len(owner):])
+	return result
+}
+
+// ownerKeyShortPrefix returns ownerPrefix_<CID>.
+func ownerKeyShortPrefix(cnr cid.ID) []byte {
+	result := make([]byte, 1+cidSize)
+	result[0] = ownerPrefix
+	cnr.Encode(result[1:])
+	return result
+}
+
+// ownerKeyLongPrefix returns ownerPrefix_<CID>_owner.
+func ownerKeyLongPrefix(cnr cid.ID, owner []byte) []byte {
+	result := make([]byte, 1+cidSize+len(owner))
+	result[0] = ownerPrefix
+	cnr.Encode(result[1:])
+	copy(result[1+cidSize:], owner)
+	return result
+}
+
+func addressFromOwnerKey(k []byte) (oid.Address, error) {
+	if len(k) < 1+cidSize+objectKeySize {
+		return oid.Address{}, errInvalidKeyLenght
+	}
+	if k[0] != ownerPrefix {
+		return oid.Address{}, errInvalidKeyPrefix
+	}
+	var cnr cid.ID
+	if err := cnr.Decode(k[1 : 1+cidSize]); err != nil {
+		return oid.Address{}, err
+	}
+
+	var obj oid.ID
+	if err := obj.Decode(k[len(k)-objectKeySize:]); err != nil {
+		return oid.Address{}, err
+	}
+
+	var result oid.Address
+	result.SetObject(obj)
+	result.SetContainer(cnr)
+	return result, nil
+}
+
+func ownerFromOwnerKey(k []byte) ([]byte, error) {
+	if len(k) < 1+cidSize+objectKeySize {
+		return nil, errInvalidKeyLenght
+	}
+	if k[0] != ownerPrefix {
+		return nil, errInvalidKeyPrefix
+	}
+	return bytes.Clone(k[1+cidSize : len(k)-objectKeySize]), nil
+}
+
+// ecInfoLongKeyPrefix returns ecInfoPrefix_<CID>_<parent_OID>.
+func ecInfoLongKeyPrefix(cnr cid.ID, parent oid.ID) []byte {
+	result := make([]byte, 1+cidSize+objectKeySize)
+	result[0] = ecInfoPrefix
+	cnr.Encode(result[1:])
+	parent.Encode(result[1+cidSize:])
+	return result
+}
+
+// ecInfoShortKeyPrefix returns ecInfoPrefix_<CID>.
+func ecInfoShortKeyPrefix(cnr cid.ID) []byte {
+	result := make([]byte, 1+cidSize)
+	result[0] = ecInfoPrefix
+	cnr.Encode(result[1:])
+	return result
+}
+
+func ecInfoKey(cnr cid.ID, parent, chunk oid.ID) []byte {
+	result := make([]byte, 1+cidSize+objectKeySize+objectKeySize)
+	result[0] = ecInfoPrefix
+	cnr.Encode(result[1:])
+	parent.Encode(result[1+cidSize:])
+	chunk.Encode(result[1+cidSize+objectKeySize:])
+	return result
+}
+
+func addressOfParentFromECInfoKey(key []byte) (oid.Address, error) {
+	return addressFromKey(ecInfoPrefix, key[:1+cidSize+objectKeySize])
+}
+
+func addressOfChunkFromECInfoKey(key []byte) (oid.Address, error) {
+	if len(key) != 1+cidSize+objectKeySize+objectKeySize {
+		return oid.Address{}, errInvalidKeyLenght
+	}
+	if key[0] != ecInfoPrefix {
+		return oid.Address{}, errInvalidKeyPrefix
+	}
+	var cont cid.ID
+	if err := cont.Decode(key[1 : 1+cidSize]); err != nil {
+		return oid.Address{}, fmt.Errorf("failed to decode container ID: %w", err)
+	}
+	var obj oid.ID
+	if err := obj.Decode(key[1+cidSize+objectKeySize:]); err != nil {
+		return oid.Address{}, fmt.Errorf("failed to decode object ID: %w", err)
+	}
+	var result oid.Address
+	result.SetContainer(cont)
+	result.SetObject(obj)
+	return result, nil
+}
+
+// parentKeyShortPrefix returns parentPrefix_<CID>.
+func parentKeyShortPrefix(cnr cid.ID) []byte {
+	result := make([]byte, 1+cidSize)
+	result[0] = parentPrefix
+	cnr.Encode(result[1:])
+	return result
+}
+
+func addressOfParentFromParentKey(key []byte) (oid.Address, error) {
+	return addressFromKey(parentPrefix, key[:1+cidSize+objectKeySize])
+}
+
+func addressOfTargetFromParentKey(key []byte) (oid.Address, error) {
+	if len(key) != 1+cidSize+objectKeySize+objectKeySize {
+		return oid.Address{}, errInvalidKeyLenght
+	}
+	if key[0] != parentPrefix {
+		return oid.Address{}, errInvalidKeyPrefix
+	}
+	var cont cid.ID
+	if err := cont.Decode(key[1 : 1+cidSize]); err != nil {
+		return oid.Address{}, fmt.Errorf("failed to decode container ID: %w", err)
+	}
+	var obj oid.ID
+	if err := obj.Decode(key[1+cidSize+objectKeySize:]); err != nil {
+		return oid.Address{}, fmt.Errorf("failed to decode object ID: %w", err)
+	}
+	var result oid.Address
+	result.SetContainer(cont)
+	result.SetObject(obj)
+	return result, nil
+}
+
+// parentKeyLongPrefix returns parentPrefix_<CID>_<parent_OID>.
+func parentKeyLongPrefix(cnr cid.ID, parentObj oid.ID) []byte {
+	result := make([]byte, 1+cidSize+objectKeySize)
+	result[0] = parentPrefix
+	cnr.Encode(result[1:])
+	parentObj.Encode(result[bucketKeySize:])
+	return result
+}
+
+func parentKey(cnr cid.ID, parentObj, obj oid.ID) []byte {
+	result := make([]byte, 1+cidSize+objectKeySize+objectKeySize)
+	result[0] = parentPrefix
+	cnr.Encode(result[1:])
+	parentObj.Encode(result[1+cidSize:])
+	obj.Encode(result[1+cidSize+objectKeySize:])
+	return result
+}
+
+// splitKeyLongPrefix returns splitPrefix_<CID>_splitID.
+func splitKeyLongPrefix(cnr cid.ID, splitID []byte) []byte {
+	result := make([]byte, 1+cidSize+len(splitID))
+	result[0] = splitPrefix
+	cnr.Encode(result[1:])
+	copy(result[1+cidSize:], splitID)
+	return result
+}
+
+// splitKeyShortPrefix returns splitPrefix_<CID>.
+func splitKeyShortPrefix(cnr cid.ID) []byte {
+	result := make([]byte, 1+cidSize)
+	result[0] = splitPrefix
+	cnr.Encode(result[1:])
+	return result
+}
+
+// splitKey returns splitPrefix_<CID>_splitID_<OID>.
+func splitKey(cnr cid.ID, obj oid.ID, splitID []byte) []byte {
+	result := make([]byte, 1+cidSize+len(splitID)+objectKeySize)
+	result[0] = splitPrefix
+	cnr.Encode(result[1:])
+	copy(result[1+cidSize:], splitID)
+	obj.Encode(result[1+cidSize+len(splitID):])
+	return result
+}
+
+func addressFromSplitKey(key []byte) (oid.Address, error) {
+	if len(key) < 1+cidSize+objectKeySize {
+		return oid.Address{}, errInvalidKeyLenght
+	}
+	if key[0] != splitPrefix {
+		return oid.Address{}, errInvalidKeyPrefix
+	}
+	var cnr cid.ID
+	if err := cnr.Decode(key[1 : 1+cidSize]); err != nil {
+		return oid.Address{}, err
+	}
+
+	var obj oid.ID
+	if err := obj.Decode(key[len(key)-objectKeySize:]); err != nil {
+		return oid.Address{}, err
+	}
+
+	var result oid.Address
+	result.SetObject(obj)
+	result.SetContainer(cnr)
+	return result, nil
+}
+
+func splitIDFromSplitKey(key []byte) ([]byte, error) {
+	if len(key) < 1+cidSize+objectKeySize {
+		return nil, errInvalidKeyLenght
+	}
+	if key[0] != splitPrefix {
+		return nil, errInvalidKeyPrefix
+	}
+
+	return bytes.Clone(key[1+cidSize : len(key)-objectKeySize]), nil
+}
+
+// returns prefix of the keys with objects of type LOCK for specified container.
+func lockersKeyPrefix(idCnr cid.ID) []byte {
+	return keyPrefix(idCnr, lockersPrefix)
+}
+
+func lockersKey(cnrID cid.ID, objID oid.ID) []byte {
+	return keyObject(lockersPrefix, cnrID, objID)
+}
+
+func addressFromLockersKey(v []byte) (oid.Address, error) {
+	return addressFromKey(lockersPrefix, v)
+}
+
+// returns lockedPrefix_<CID>_<OID>.
+func lockedKeyLongPrefix(cnrID cid.ID, objID oid.ID) []byte {
+	prefix := make([]byte, 1+cidSize+objectKeySize)
+	prefix[0] = lockedPrefix
+	cnrID.Encode(prefix[1:])
+	objID.Encode(prefix[1+cidSize:])
+	return prefix
+}
+
+// returns lockedPrefix_<CID>.
+func lockedKeyShortPrefix(cnrID cid.ID) []byte {
+	prefix := make([]byte, 1+cidSize)
+	prefix[0] = lockedPrefix
+	cnrID.Encode(prefix[1:])
+	return prefix
+}
+
+// returns lockedPrefix_<CID>_<OID>_<LOCKER_OID>.
+func lockedKey(cnrID cid.ID, objID, lockerObjID oid.ID) []byte {
+	result := make([]byte, 1+cidSize+objectKeySize+objectKeySize)
+	result[0] = lockedPrefix
+	cnrID.Encode(result[1:])
+	objID.Encode(result[1+cidSize:])
+	lockerObjID.Encode(result[1+cidSize+objectKeySize:])
+	return result
+}
+
+func lockerObjectIDFromLockedKey(k []byte) (oid.ID, error) {
+	if len(k) != 1+cidSize+objectKeySize+objectKeySize {
+		return oid.ID{}, errInvalidKeyLenght
+	}
+	if k[0] != lockedPrefix {
+		return oid.ID{}, errInvalidKeyPrefix
+	}
+	var result oid.ID
+	if err := result.Decode(k[1+cidSize+objectKeySize:]); err != nil {
+		return oid.ID{}, fmt.Errorf("failed to decode lockers object ID: %w", err)
+	}
+	return result, nil
+}
+
+func objectIDFromLockedKey(k []byte) (oid.ID, error) {
+	if len(k) != 1+cidSize+objectKeySize+objectKeySize {
+		return oid.ID{}, errInvalidKeyLenght
+	}
+	if k[0] != lockedPrefix {
+		return oid.ID{}, errInvalidKeyPrefix
+	}
+	var result oid.ID
+	if err := result.Decode(k[1+cidSize : 1+cidSize+objectKeySize]); err != nil {
+		return oid.ID{}, fmt.Errorf("failed to decode locked object ID: %w", err)
+	}
+	return result, nil
+}
+
+func expiredKey(cnr cid.ID, obj oid.ID, epoch uint64) []byte {
+	result := make([]byte, 1+8+cidSize+objectKeySize)
+	result[0] = expiredPrefix
+	// BigEndian is important for iteration order
+	binary.BigEndian.PutUint64(result[1:1+8], epoch)
+	cnr.Encode(result[1+8 : 1+8+cidSize])
+	obj.Encode(result[1+8+cidSize:])
+	return result
+}
+
+func expirationEpochFromExpiredKey(key []byte) (uint64, error) {
+	if len(key) != 1+8+cidSize+objectKeySize {
+		return 0, errInvalidKeyLenght
+	}
+	if key[0] != expiredPrefix {
+		return 0, errInvalidKeyPrefix
+	}
+	// BigEndian is important for iteration order
+	return binary.BigEndian.Uint64(key[1 : 1+8]), nil
+}
+
+func addressFromExpiredKey(key []byte) (oid.Address, error) {
+	if len(key) != 1+8+cidSize+objectKeySize {
+		return oid.Address{}, errInvalidKeyLenght
+	}
+	if key[0] != expiredPrefix {
+		return oid.Address{}, errInvalidKeyPrefix
+	}
+	var cnr cid.ID
+	if err := cnr.Decode(key[1+8 : 1+8+cidSize]); err != nil {
+		return oid.Address{}, err
+	}
+
+	var obj oid.ID
+	if err := obj.Decode(key[len(key)-objectKeySize:]); err != nil {
+		return oid.Address{}, err
+	}
+
+	var result oid.Address
+	result.SetObject(obj)
+	result.SetContainer(cnr)
+	return result, nil
+}
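Each index key added above can be decoded back into an object address with its addressFrom*Key counterpart. A quick roundtrip sketch using the new helpers (cnr and obj stand for an arbitrary container and object ID; the comparison relies on oid.Address being comparable, as the isExpired code above already assumes):

	var want oid.Address
	want.SetContainer(cnr)
	want.SetObject(obj)
	k := primaryKey(cnr, obj)              // primaryPrefix | CID | OID
	got, err := addressFromPrimaryKey(k)   // decodes the same address back
	fmt.Println(err == nil && got == want) // true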