forked from TrueCloudLab/frostfs-node
[#1412] metabase: Index attributes for indexed containers
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
1b520f7973
commit
be744ae3e6
8 changed files with 231 additions and 15 deletions
|
@ -535,6 +535,6 @@ func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address
|
||||||
return e.engine.Lock(ctx, locker.Container(), locker.Object(), toLock)
|
return e.engine.Lock(ctx, locker.Container(), locker.Object(), toLock)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object, indexableContainer bool) error {
|
func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object, indexedContainer bool) error {
|
||||||
return engine.Put(ctx, e.engine, o, indexableContainer)
|
return engine.Put(ctx, e.engine, o, indexedContainer)
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -471,7 +472,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
egObject.Go(func() error {
|
egObject.Go(func() error {
|
||||||
err := e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate)
|
err := e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate, c.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel(err)
|
cancel(err)
|
||||||
}
|
}
|
||||||
|
@ -744,7 +745,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
|
func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||||
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, cnr containerSDK.Container,
|
||||||
) error {
|
) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects")
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
@ -773,7 +774,7 @@ func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objI
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res)
|
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res, cnr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -817,7 +818,7 @@ func (e *StorageEngine) isNotRepOne(c *container.Container) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
|
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
|
||||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes,
|
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, cnr containerSDK.Container,
|
||||||
) (bool, error) {
|
) (bool, error) {
|
||||||
hrw.SortHasherSliceByValue(shards, hrw.StringHash(addr.EncodeToString()))
|
hrw.SortHasherSliceByValue(shards, hrw.StringHash(addr.EncodeToString()))
|
||||||
for j := range shards {
|
for j := range shards {
|
||||||
|
@ -830,7 +831,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
|
||||||
if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
|
if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
switch e.putToShard(ctx, shards[j].hashedShard, shards[j].pool, addr, object).status {
|
switch e.putToShard(ctx, shards[j].hashedShard, shards[j].pool, addr, object, container.IsIndexedContainer(cnr)).status {
|
||||||
case putToShardSuccess:
|
case putToShardSuccess:
|
||||||
res.objEvacuated.Add(1)
|
res.objEvacuated.Add(1)
|
||||||
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
|
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
|
||||||
|
|
|
@ -107,7 +107,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
||||||
// Shard was concurrently removed, skip.
|
// Shard was concurrently removed, skip.
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
shRes = e.putToShard(ctx, sh, pool, addr, prm.Object)
|
shRes = e.putToShard(ctx, sh, pool, addr, prm.Object, prm.IsIndexedContainer)
|
||||||
return shRes.status != putToShardUnknown
|
return shRes.status != putToShardUnknown
|
||||||
})
|
})
|
||||||
switch shRes.status {
|
switch shRes.status {
|
||||||
|
@ -125,7 +125,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
||||||
// putToShard puts object to sh.
|
// putToShard puts object to sh.
|
||||||
// Return putToShardStatus and error if it is necessary to propagate an error upper.
|
// Return putToShardStatus and error if it is necessary to propagate an error upper.
|
||||||
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool util.WorkerPool,
|
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool util.WorkerPool,
|
||||||
addr oid.Address, obj *objectSDK.Object,
|
addr oid.Address, obj *objectSDK.Object, isIndexedContainer bool,
|
||||||
) (res putToShardRes) {
|
) (res putToShardRes) {
|
||||||
exitCh := make(chan struct{})
|
exitCh := make(chan struct{})
|
||||||
|
|
||||||
|
@ -158,6 +158,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
|
||||||
|
|
||||||
var putPrm shard.PutPrm
|
var putPrm shard.PutPrm
|
||||||
putPrm.SetObject(obj)
|
putPrm.SetObject(obj)
|
||||||
|
putPrm.SetIndexAttributes(isIndexedContainer)
|
||||||
|
|
||||||
_, err = sh.Put(ctx, putPrm)
|
_, err = sh.Put(ctx, putPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -338,6 +338,11 @@ func (db *DB) deleteObject(
|
||||||
return fmt.Errorf("can't remove list indexes: %w", err)
|
return fmt.Errorf("can't remove list indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = updateFKBTIndexes(tx, obj, delFKBTIndexItem)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't remove fake bucket tree indexes: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
if isParent {
|
if isParent {
|
||||||
// remove record from the garbage bucket, because regular object deletion does nothing for virtual object
|
// remove record from the garbage bucket, because regular object deletion does nothing for virtual object
|
||||||
garbageBKT := tx.Bucket(garbageBucketName)
|
garbageBKT := tx.Bucket(garbageBucketName)
|
||||||
|
@ -415,6 +420,46 @@ func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
|
bkt := tx.Bucket(item.name)
|
||||||
|
if bkt == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
fkbtRoot := bkt.Bucket(item.key)
|
||||||
|
if fkbtRoot == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := fkbtRoot.Delete(item.val); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if hasAnyItem(fkbtRoot) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bkt.DeleteBucket(item.key); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if hasAnyItem(bkt) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return tx.DeleteBucket(item.name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func hasAnyItem(b *bbolt.Bucket) bool {
|
||||||
|
var hasAnyItem bool
|
||||||
|
c := b.Cursor()
|
||||||
|
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
||||||
|
hasAnyItem = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return hasAnyItem
|
||||||
|
}
|
||||||
|
|
||||||
func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error {
|
func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error {
|
||||||
addr := object.AddressOf(obj)
|
addr := object.AddressOf(obj)
|
||||||
|
|
||||||
|
|
85
pkg/local_object_storage/metabase/delete_meta_test.go
Normal file
85
pkg/local_object_storage/metabase/delete_meta_test.go
Normal file
|
@ -0,0 +1,85 @@
|
||||||
|
package meta
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPutDeleteIndexAttributes(t *testing.T) {
|
||||||
|
db := New([]Option{
|
||||||
|
WithPath(filepath.Join(t.TempDir(), "metabase")),
|
||||||
|
WithPermissions(0o600),
|
||||||
|
WithEpochState(epochState{}),
|
||||||
|
}...)
|
||||||
|
|
||||||
|
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||||
|
require.NoError(t, db.Init())
|
||||||
|
defer func() { require.NoError(t, db.Close()) }()
|
||||||
|
|
||||||
|
cnr := cidtest.ID()
|
||||||
|
obj1 := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
testutil.AddAttribute(obj1, "S3-Access-Box-CRDT-Name", "CRDT-Name")
|
||||||
|
testutil.AddAttribute(obj1, objectSDK.AttributeFilePath, "/path/to/object")
|
||||||
|
|
||||||
|
var putPrm PutPrm
|
||||||
|
putPrm.SetObject(obj1)
|
||||||
|
|
||||||
|
_, err := db.Put(context.Background(), putPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
b := tx.Bucket(attributeBucketName(cnr, "S3-Access-Box-CRDT-Name", make([]byte, bucketKeySize)))
|
||||||
|
require.Nil(t, b)
|
||||||
|
b = tx.Bucket(attributeBucketName(cnr, objectSDK.AttributeFilePath, make([]byte, bucketKeySize)))
|
||||||
|
require.Nil(t, b)
|
||||||
|
return nil
|
||||||
|
}))
|
||||||
|
|
||||||
|
obj2 := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
testutil.AddAttribute(obj2, "S3-Access-Box-CRDT-Name", "CRDT-Name")
|
||||||
|
testutil.AddAttribute(obj2, objectSDK.AttributeFilePath, "/path/to/object")
|
||||||
|
|
||||||
|
putPrm.SetObject(obj2)
|
||||||
|
putPrm.SetIndexAttributes(true)
|
||||||
|
|
||||||
|
_, err = db.Put(context.Background(), putPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
objKey := objectKey(objectCore.AddressOf(obj2).Object(), make([]byte, objectKeySize))
|
||||||
|
require.NoError(t, db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
b := tx.Bucket(attributeBucketName(cnr, "S3-Access-Box-CRDT-Name", make([]byte, bucketKeySize)))
|
||||||
|
require.NotNil(t, b)
|
||||||
|
b = b.Bucket([]byte("CRDT-Name"))
|
||||||
|
require.NotNil(t, b)
|
||||||
|
require.True(t, bytes.Equal(zeroValue, b.Get(objKey)))
|
||||||
|
b = tx.Bucket(attributeBucketName(cnr, objectSDK.AttributeFilePath, make([]byte, bucketKeySize)))
|
||||||
|
require.NotNil(t, b)
|
||||||
|
b = b.Bucket([]byte("/path/to/object"))
|
||||||
|
require.NotNil(t, b)
|
||||||
|
require.True(t, bytes.Equal(zeroValue, b.Get(objKey)))
|
||||||
|
return nil
|
||||||
|
}))
|
||||||
|
|
||||||
|
var dPrm DeletePrm
|
||||||
|
dPrm.SetAddresses(objectCore.AddressOf(obj1), objectCore.AddressOf(obj2))
|
||||||
|
_, err = db.Delete(context.Background(), dPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
b := tx.Bucket(attributeBucketName(cnr, "S3-Access-Box-CRDT-Name", make([]byte, bucketKeySize)))
|
||||||
|
require.Nil(t, b)
|
||||||
|
b = tx.Bucket(attributeBucketName(cnr, objectSDK.AttributeFilePath, make([]byte, bucketKeySize)))
|
||||||
|
require.Nil(t, b)
|
||||||
|
return nil
|
||||||
|
}))
|
||||||
|
}
|
|
@ -35,6 +35,8 @@ type PutPrm struct {
|
||||||
obj *objectSDK.Object
|
obj *objectSDK.Object
|
||||||
|
|
||||||
id []byte
|
id []byte
|
||||||
|
|
||||||
|
indexAttributes bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutRes groups the resulting values of Put operation.
|
// PutRes groups the resulting values of Put operation.
|
||||||
|
@ -52,6 +54,10 @@ func (p *PutPrm) SetStorageID(id []byte) {
|
||||||
p.id = id
|
p.id = id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *PutPrm) SetIndexAttributes(v bool) {
|
||||||
|
p.indexAttributes = v
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrUnknownObjectType = errors.New("unknown object type")
|
ErrUnknownObjectType = errors.New("unknown object type")
|
||||||
ErrIncorrectSplitInfoUpdate = errors.New("updating split info on object without it")
|
ErrIncorrectSplitInfoUpdate = errors.New("updating split info on object without it")
|
||||||
|
@ -90,7 +96,7 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
|
||||||
|
|
||||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||||
var e error
|
var e error
|
||||||
res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch)
|
res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch, prm.indexAttributes)
|
||||||
return e
|
return e
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -108,6 +114,7 @@ func (db *DB) put(tx *bbolt.Tx,
|
||||||
id []byte,
|
id []byte,
|
||||||
si *objectSDK.SplitInfo,
|
si *objectSDK.SplitInfo,
|
||||||
currEpoch uint64,
|
currEpoch uint64,
|
||||||
|
indexAttributes bool,
|
||||||
) (PutRes, error) {
|
) (PutRes, error) {
|
||||||
cnr, ok := obj.ContainerID()
|
cnr, ok := obj.ContainerID()
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -129,7 +136,7 @@ func (db *DB) put(tx *bbolt.Tx,
|
||||||
return PutRes{}, db.updateObj(tx, obj, id, si, isParent)
|
return PutRes{}, db.updateObj(tx, obj, id, si, isParent)
|
||||||
}
|
}
|
||||||
|
|
||||||
return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch)
|
return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch, indexAttributes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
|
func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
|
||||||
|
@ -152,14 +159,14 @@ func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *obje
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64) error {
|
func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64, indexAttributes bool) error {
|
||||||
if par := obj.Parent(); par != nil && !isParent { // limit depth by two
|
if par := obj.Parent(); par != nil && !isParent { // limit depth by two
|
||||||
parentSI, err := splitInfoFromObject(obj)
|
parentSI, err := splitInfoFromObject(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = db.put(tx, par, id, parentSI, currEpoch)
|
_, err = db.put(tx, par, id, parentSI, currEpoch, indexAttributes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -175,6 +182,13 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
|
||||||
return fmt.Errorf("can't put list indexes: %w", err)
|
return fmt.Errorf("can't put list indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if indexAttributes {
|
||||||
|
err = updateFKBTIndexes(tx, obj, putFKBTIndexItem)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't put fake bucket tree indexes: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// update container volume size estimation
|
// update container volume size estimation
|
||||||
if obj.Type() == objectSDK.TypeRegular && !isParent {
|
if obj.Type() == objectSDK.TypeRegular && !isParent {
|
||||||
err = changeContainerSize(tx, cnr, obj.PayloadSize(), true)
|
err = changeContainerSize(tx, cnr, obj.PayloadSize(), true)
|
||||||
|
@ -381,6 +395,50 @@ func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFun
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var indexedAttributes = map[string]struct{}{
|
||||||
|
"S3-Access-Box-CRDT-Name": {},
|
||||||
|
objectSDK.AttributeFilePath: {},
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsAtrributeIndexed returns True if attribute is indexed by metabase.
|
||||||
|
func IsAtrributeIndexed(attr string) bool {
|
||||||
|
_, found := indexedAttributes[attr]
|
||||||
|
return found
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
|
||||||
|
id, _ := obj.ID()
|
||||||
|
cnr, _ := obj.ContainerID()
|
||||||
|
objKey := objectKey(id, make([]byte, objectKeySize))
|
||||||
|
|
||||||
|
key := make([]byte, bucketKeySize)
|
||||||
|
var attrs []objectSDK.Attribute
|
||||||
|
if obj.ECHeader() != nil {
|
||||||
|
attrs = obj.ECHeader().ParentAttributes()
|
||||||
|
objKey = objectKey(obj.ECHeader().Parent(), make([]byte, objectKeySize))
|
||||||
|
} else {
|
||||||
|
attrs = obj.Attributes()
|
||||||
|
}
|
||||||
|
|
||||||
|
// user specified attributes
|
||||||
|
for i := range attrs {
|
||||||
|
if !IsAtrributeIndexed(attrs[i].Key()) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
key = attributeBucketName(cnr, attrs[i].Key(), key)
|
||||||
|
err := f(tx, namedBucketItem{
|
||||||
|
name: key,
|
||||||
|
key: []byte(attrs[i].Value()),
|
||||||
|
val: objKey,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func hasExpirationEpoch(obj *objectSDK.Object) (uint64, bool) {
|
func hasExpirationEpoch(obj *objectSDK.Object) (uint64, bool) {
|
||||||
attributes := obj.Attributes()
|
attributes := obj.Attributes()
|
||||||
if ech := obj.ECHeader(); ech != nil {
|
if ech := obj.ECHeader(); ech != nil {
|
||||||
|
@ -425,6 +483,20 @@ func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil })
|
return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
|
bkt, err := createBucketLikelyExists(tx, item.name)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fkbtRoot, err := createBucketLikelyExists(bkt, item.key)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fkbtRoot.Put(item.val, zeroValue)
|
||||||
|
}
|
||||||
|
|
||||||
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
bkt, err := createBucketLikelyExists(tx, item.name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -99,7 +99,6 @@ const (
|
||||||
// userAttributePrefix was used for prefixing FKBT index buckets containing objects.
|
// userAttributePrefix was used for prefixing FKBT index buckets containing objects.
|
||||||
// Key: attribute value
|
// Key: attribute value
|
||||||
// Value: bucket containing object IDs as keys
|
// Value: bucket containing object IDs as keys
|
||||||
// removed in version 3
|
|
||||||
userAttributePrefix
|
userAttributePrefix
|
||||||
|
|
||||||
// ====================
|
// ====================
|
||||||
|
@ -170,6 +169,13 @@ func smallBucketName(cnr cid.ID, key []byte) []byte {
|
||||||
return bucketName(cnr, smallPrefix, key)
|
return bucketName(cnr, smallPrefix, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// attributeBucketName returns <CID>_<attributeKey>.
|
||||||
|
func attributeBucketName(cnr cid.ID, attributeKey string, key []byte) []byte {
|
||||||
|
key[0] = userAttributePrefix
|
||||||
|
cnr.Encode(key[1:])
|
||||||
|
return append(key[:bucketKeySize], attributeKey...)
|
||||||
|
}
|
||||||
|
|
||||||
// rootBucketName returns <CID>_root.
|
// rootBucketName returns <CID>_root.
|
||||||
func rootBucketName(cnr cid.ID, key []byte) []byte {
|
func rootBucketName(cnr cid.ID, key []byte) []byte {
|
||||||
return bucketName(cnr, rootPrefix, key)
|
return bucketName(cnr, rootPrefix, key)
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
// PutPrm groups the parameters of Put operation.
|
// PutPrm groups the parameters of Put operation.
|
||||||
type PutPrm struct {
|
type PutPrm struct {
|
||||||
obj *objectSDK.Object
|
obj *objectSDK.Object
|
||||||
|
indexAttributes bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutRes groups the resulting values of Put operation.
|
// PutRes groups the resulting values of Put operation.
|
||||||
|
@ -28,6 +29,10 @@ func (p *PutPrm) SetObject(obj *objectSDK.Object) {
|
||||||
p.obj = obj
|
p.obj = obj
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *PutPrm) SetIndexAttributes(v bool) {
|
||||||
|
p.indexAttributes = v
|
||||||
|
}
|
||||||
|
|
||||||
// Put saves the object in shard.
|
// Put saves the object in shard.
|
||||||
//
|
//
|
||||||
// Returns any error encountered that
|
// Returns any error encountered that
|
||||||
|
@ -84,6 +89,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
||||||
var pPrm meta.PutPrm
|
var pPrm meta.PutPrm
|
||||||
pPrm.SetObject(prm.obj)
|
pPrm.SetObject(prm.obj)
|
||||||
pPrm.SetStorageID(res.StorageID)
|
pPrm.SetStorageID(res.StorageID)
|
||||||
|
pPrm.SetIndexAttributes(prm.indexAttributes)
|
||||||
res, err := s.metaBase.Put(ctx, pPrm)
|
res, err := s.metaBase.Put(ctx, pPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// may we need to handle this case in a special way
|
// may we need to handle this case in a special way
|
||||||
|
|
Loading…
Reference in a new issue