From be744ae3e6eadb5b02952cbb110ef59f33f799bc Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 1 Oct 2024 18:19:12 +0300 Subject: [PATCH] [#1412] metabase: Index attributes for indexed containers Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/object.go | 4 +- pkg/local_object_storage/engine/evacuate.go | 11 +-- pkg/local_object_storage/engine/put.go | 5 +- pkg/local_object_storage/metabase/delete.go | 45 ++++++++++ .../metabase/delete_meta_test.go | 85 +++++++++++++++++++ pkg/local_object_storage/metabase/put.go | 80 ++++++++++++++++- pkg/local_object_storage/metabase/util.go | 8 +- pkg/local_object_storage/shard/put.go | 8 +- 8 files changed, 231 insertions(+), 15 deletions(-) create mode 100644 pkg/local_object_storage/metabase/delete_meta_test.go diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 5c322886b..f2c4bff1d 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -535,6 +535,6 @@ func (e engineWithoutNotifications) Lock(ctx context.Context, locker oid.Address return e.engine.Lock(ctx, locker.Container(), locker.Object(), toLock) } -func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object, indexableContainer bool) error { - return engine.Put(ctx, e.engine, o, indexableContainer) +func (e engineWithoutNotifications) Put(ctx context.Context, o *objectSDK.Object, indexedContainer bool) error { + return engine.Put(ctx, e.engine, o, indexedContainer) } diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index a618ff274..1baf237f9 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -18,6 +18,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -471,7 +472,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context default: } egObject.Go(func() error { - err := e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate) + err := e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate, c.Value) if err != nil { cancel(err) } @@ -744,7 +745,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) } func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes, - getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, + getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, cnr containerSDK.Container, ) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects") defer span.End() @@ -773,7 +774,7 @@ func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objI return err } - evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res) + evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res, cnr) if err != nil { return err } @@ -817,7 +818,7 @@ func (e *StorageEngine) isNotRepOne(c *container.Container) bool { } func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, + shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, cnr containerSDK.Container, ) (bool, error) { hrw.SortHasherSliceByValue(shards, hrw.StringHash(addr.EncodeToString())) for j := range shards { @@ -830,7 +831,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok { continue } - switch e.putToShard(ctx, shards[j].hashedShard, shards[j].pool, addr, object).status { + switch e.putToShard(ctx, shards[j].hashedShard, shards[j].pool, addr, object, container.IsIndexedContainer(cnr)).status { case putToShardSuccess: res.objEvacuated.Add(1) e.log.Debug(logs.EngineObjectIsMovedToAnotherShard, diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 41e566560..a50d80b75 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -107,7 +107,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error { // Shard was concurrently removed, skip. return false } - shRes = e.putToShard(ctx, sh, pool, addr, prm.Object) + shRes = e.putToShard(ctx, sh, pool, addr, prm.Object, prm.IsIndexedContainer) return shRes.status != putToShardUnknown }) switch shRes.status { @@ -125,7 +125,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error { // putToShard puts object to sh. // Return putToShardStatus and error if it is necessary to propagate an error upper. func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool util.WorkerPool, - addr oid.Address, obj *objectSDK.Object, + addr oid.Address, obj *objectSDK.Object, isIndexedContainer bool, ) (res putToShardRes) { exitCh := make(chan struct{}) @@ -158,6 +158,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti var putPrm shard.PutPrm putPrm.SetObject(obj) + putPrm.SetIndexAttributes(isIndexedContainer) _, err = sh.Put(ctx, putPrm) if err != nil { diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index 4ad11164f..3add1f268 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -338,6 +338,11 @@ func (db *DB) deleteObject( return fmt.Errorf("can't remove list indexes: %w", err) } + err = updateFKBTIndexes(tx, obj, delFKBTIndexItem) + if err != nil { + return fmt.Errorf("can't remove fake bucket tree indexes: %w", err) + } + if isParent { // remove record from the garbage bucket, because regular object deletion does nothing for virtual object garbageBKT := tx.Bucket(garbageBucketName) @@ -415,6 +420,46 @@ func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error { return nil } +func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error { + bkt := tx.Bucket(item.name) + if bkt == nil { + return nil + } + + fkbtRoot := bkt.Bucket(item.key) + if fkbtRoot == nil { + return nil + } + + if err := fkbtRoot.Delete(item.val); err != nil { + return err + } + + if hasAnyItem(fkbtRoot) { + return nil + } + + if err := bkt.DeleteBucket(item.key); err != nil { + return err + } + + if hasAnyItem(bkt) { + return nil + } + + return tx.DeleteBucket(item.name) +} + +func hasAnyItem(b *bbolt.Bucket) bool { + var hasAnyItem bool + c := b.Cursor() + for k, _ := c.First(); k != nil; k, _ = c.Next() { + hasAnyItem = true + break + } + return hasAnyItem +} + func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error { addr := object.AddressOf(obj) diff --git a/pkg/local_object_storage/metabase/delete_meta_test.go b/pkg/local_object_storage/metabase/delete_meta_test.go new file mode 100644 index 000000000..cdfe2a203 --- /dev/null +++ b/pkg/local_object_storage/metabase/delete_meta_test.go @@ -0,0 +1,85 @@ +package meta + +import ( + "bytes" + "context" + "path/filepath" + "testing" + + objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" +) + +func TestPutDeleteIndexAttributes(t *testing.T) { + db := New([]Option{ + WithPath(filepath.Join(t.TempDir(), "metabase")), + WithPermissions(0o600), + WithEpochState(epochState{}), + }...) + + require.NoError(t, db.Open(context.Background(), mode.ReadWrite)) + require.NoError(t, db.Init()) + defer func() { require.NoError(t, db.Close()) }() + + cnr := cidtest.ID() + obj1 := testutil.GenerateObjectWithCID(cnr) + testutil.AddAttribute(obj1, "S3-Access-Box-CRDT-Name", "CRDT-Name") + testutil.AddAttribute(obj1, objectSDK.AttributeFilePath, "/path/to/object") + + var putPrm PutPrm + putPrm.SetObject(obj1) + + _, err := db.Put(context.Background(), putPrm) + require.NoError(t, err) + + require.NoError(t, db.boltDB.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(attributeBucketName(cnr, "S3-Access-Box-CRDT-Name", make([]byte, bucketKeySize))) + require.Nil(t, b) + b = tx.Bucket(attributeBucketName(cnr, objectSDK.AttributeFilePath, make([]byte, bucketKeySize))) + require.Nil(t, b) + return nil + })) + + obj2 := testutil.GenerateObjectWithCID(cnr) + testutil.AddAttribute(obj2, "S3-Access-Box-CRDT-Name", "CRDT-Name") + testutil.AddAttribute(obj2, objectSDK.AttributeFilePath, "/path/to/object") + + putPrm.SetObject(obj2) + putPrm.SetIndexAttributes(true) + + _, err = db.Put(context.Background(), putPrm) + require.NoError(t, err) + + objKey := objectKey(objectCore.AddressOf(obj2).Object(), make([]byte, objectKeySize)) + require.NoError(t, db.boltDB.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(attributeBucketName(cnr, "S3-Access-Box-CRDT-Name", make([]byte, bucketKeySize))) + require.NotNil(t, b) + b = b.Bucket([]byte("CRDT-Name")) + require.NotNil(t, b) + require.True(t, bytes.Equal(zeroValue, b.Get(objKey))) + b = tx.Bucket(attributeBucketName(cnr, objectSDK.AttributeFilePath, make([]byte, bucketKeySize))) + require.NotNil(t, b) + b = b.Bucket([]byte("/path/to/object")) + require.NotNil(t, b) + require.True(t, bytes.Equal(zeroValue, b.Get(objKey))) + return nil + })) + + var dPrm DeletePrm + dPrm.SetAddresses(objectCore.AddressOf(obj1), objectCore.AddressOf(obj2)) + _, err = db.Delete(context.Background(), dPrm) + require.NoError(t, err) + + require.NoError(t, db.boltDB.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(attributeBucketName(cnr, "S3-Access-Box-CRDT-Name", make([]byte, bucketKeySize))) + require.Nil(t, b) + b = tx.Bucket(attributeBucketName(cnr, objectSDK.AttributeFilePath, make([]byte, bucketKeySize))) + require.Nil(t, b) + return nil + })) +} diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 087529895..0c14196b7 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -35,6 +35,8 @@ type PutPrm struct { obj *objectSDK.Object id []byte + + indexAttributes bool } // PutRes groups the resulting values of Put operation. @@ -52,6 +54,10 @@ func (p *PutPrm) SetStorageID(id []byte) { p.id = id } +func (p *PutPrm) SetIndexAttributes(v bool) { + p.indexAttributes = v +} + var ( ErrUnknownObjectType = errors.New("unknown object type") ErrIncorrectSplitInfoUpdate = errors.New("updating split info on object without it") @@ -90,7 +96,7 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) { err = db.boltDB.Batch(func(tx *bbolt.Tx) error { var e error - res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch) + res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch, prm.indexAttributes) return e }) if err == nil { @@ -108,6 +114,7 @@ func (db *DB) put(tx *bbolt.Tx, id []byte, si *objectSDK.SplitInfo, currEpoch uint64, + indexAttributes bool, ) (PutRes, error) { cnr, ok := obj.ContainerID() if !ok { @@ -129,7 +136,7 @@ func (db *DB) put(tx *bbolt.Tx, return PutRes{}, db.updateObj(tx, obj, id, si, isParent) } - return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch) + return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch, indexAttributes) } func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error { @@ -152,14 +159,14 @@ func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *obje return nil } -func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64) error { +func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64, indexAttributes bool) error { if par := obj.Parent(); par != nil && !isParent { // limit depth by two parentSI, err := splitInfoFromObject(obj) if err != nil { return err } - _, err = db.put(tx, par, id, parentSI, currEpoch) + _, err = db.put(tx, par, id, parentSI, currEpoch, indexAttributes) if err != nil { return err } @@ -175,6 +182,13 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o return fmt.Errorf("can't put list indexes: %w", err) } + if indexAttributes { + err = updateFKBTIndexes(tx, obj, putFKBTIndexItem) + if err != nil { + return fmt.Errorf("can't put fake bucket tree indexes: %w", err) + } + } + // update container volume size estimation if obj.Type() == objectSDK.TypeRegular && !isParent { err = changeContainerSize(tx, cnr, obj.PayloadSize(), true) @@ -381,6 +395,50 @@ func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFun return nil } +var indexedAttributes = map[string]struct{}{ + "S3-Access-Box-CRDT-Name": {}, + objectSDK.AttributeFilePath: {}, +} + +// IsAtrributeIndexed returns True if attribute is indexed by metabase. +func IsAtrributeIndexed(attr string) bool { + _, found := indexedAttributes[attr] + return found +} + +func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error { + id, _ := obj.ID() + cnr, _ := obj.ContainerID() + objKey := objectKey(id, make([]byte, objectKeySize)) + + key := make([]byte, bucketKeySize) + var attrs []objectSDK.Attribute + if obj.ECHeader() != nil { + attrs = obj.ECHeader().ParentAttributes() + objKey = objectKey(obj.ECHeader().Parent(), make([]byte, objectKeySize)) + } else { + attrs = obj.Attributes() + } + + // user specified attributes + for i := range attrs { + if !IsAtrributeIndexed(attrs[i].Key()) { + continue + } + key = attributeBucketName(cnr, attrs[i].Key(), key) + err := f(tx, namedBucketItem{ + name: key, + key: []byte(attrs[i].Value()), + val: objKey, + }) + if err != nil { + return err + } + } + + return nil +} + func hasExpirationEpoch(obj *objectSDK.Object) (uint64, bool) { attributes := obj.Attributes() if ech := obj.ECHeader(); ech != nil { @@ -425,6 +483,20 @@ func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error { return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil }) } +func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error { + bkt, err := createBucketLikelyExists(tx, item.name) + if err != nil { + return fmt.Errorf("can't create index %v: %w", item.name, err) + } + + fkbtRoot, err := createBucketLikelyExists(bkt, item.key) + if err != nil { + return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err) + } + + return fkbtRoot.Put(item.val, zeroValue) +} + func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error { bkt, err := createBucketLikelyExists(tx, item.name) if err != nil { diff --git a/pkg/local_object_storage/metabase/util.go b/pkg/local_object_storage/metabase/util.go index eef7210dc..4679de332 100644 --- a/pkg/local_object_storage/metabase/util.go +++ b/pkg/local_object_storage/metabase/util.go @@ -99,7 +99,6 @@ const ( // userAttributePrefix was used for prefixing FKBT index buckets containing objects. // Key: attribute value // Value: bucket containing object IDs as keys - // removed in version 3 userAttributePrefix // ==================== @@ -170,6 +169,13 @@ func smallBucketName(cnr cid.ID, key []byte) []byte { return bucketName(cnr, smallPrefix, key) } +// attributeBucketName returns _. +func attributeBucketName(cnr cid.ID, attributeKey string, key []byte) []byte { + key[0] = userAttributePrefix + cnr.Encode(key[1:]) + return append(key[:bucketKeySize], attributeKey...) +} + // rootBucketName returns _root. func rootBucketName(cnr cid.ID, key []byte) []byte { return bucketName(cnr, rootPrefix, key) diff --git a/pkg/local_object_storage/shard/put.go b/pkg/local_object_storage/shard/put.go index d7a9e7012..24cc75154 100644 --- a/pkg/local_object_storage/shard/put.go +++ b/pkg/local_object_storage/shard/put.go @@ -17,7 +17,8 @@ import ( // PutPrm groups the parameters of Put operation. type PutPrm struct { - obj *objectSDK.Object + obj *objectSDK.Object + indexAttributes bool } // PutRes groups the resulting values of Put operation. @@ -28,6 +29,10 @@ func (p *PutPrm) SetObject(obj *objectSDK.Object) { p.obj = obj } +func (p *PutPrm) SetIndexAttributes(v bool) { + p.indexAttributes = v +} + // Put saves the object in shard. // // Returns any error encountered that @@ -84,6 +89,7 @@ func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) { var pPrm meta.PutPrm pPrm.SetObject(prm.obj) pPrm.SetStorageID(res.StorageID) + pPrm.SetIndexAttributes(prm.indexAttributes) res, err := s.metaBase.Put(ctx, pPrm) if err != nil { // may we need to handle this case in a special way