From 634792077e74190daec8f5167a12df8668c796e5 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Sat, 12 Nov 2022 16:48:44 +0300 Subject: [PATCH] [#1502] node: Store lock object on every container node Includes extending listing methods in the Storage Engine with object types. It allows tuning replication/policer algorithms: container nodes do not remove `LOCK` objects as redundant and try to fulfill `LOCK` placement on the ohter container nodes. Signed-off-by: Pavel Karpy --- CHANGELOG.md | 2 ++ pkg/local_object_storage/engine/evacuate.go | 12 ++++--- pkg/local_object_storage/engine/list.go | 8 ++--- pkg/local_object_storage/engine/list_test.go | 16 ++++----- pkg/local_object_storage/metabase/list.go | 34 ++++++++++++------- .../metabase/list_test.go | 33 +++++++++--------- pkg/local_object_storage/shard/list.go | 6 ++-- pkg/services/policer/check.go | 28 ++++++++++----- pkg/services/policer/process.go | 8 ++--- pkg/services/policer/queue.go | 4 +-- 10 files changed, 87 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eda6fe072..b9c748e9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Changelog for NeoFS Node ### Changed - `object lock` command reads CID and OID the same way other commands do (#1971) +- `LOCK` object are stored on every container node (#1502) ### Fixed - Open FSTree in sync mode by default (#1992) @@ -30,6 +31,7 @@ Changelog for NeoFS Node - Session token's IAT and NBF checks in ACL service (#2028) - Losing meta information on request forwarding (#2040) - Assembly process triggered by a request with a bearer token (#2040) +- Losing locking context after metabase resync (#1502) ### Removed ### Updated diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 0a19e537f..ad989fd3e 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -136,8 +136,10 @@ mainLoop: loop: for i := range lst { + addr := lst[i].Address + var getPrm shard.GetPrm - getPrm.SetAddress(lst[i]) + getPrm.SetAddress(addr) getRes, err := sh.Get(getPrm) if err != nil { @@ -147,18 +149,18 @@ mainLoop: return res, err } - hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(lst[i].EncodeToString()))) + hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString()))) for j := range shards { if _, ok := shardMap[shards[j].ID().String()]; ok { continue } - putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, lst[i], getRes.Object()) + putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, getRes.Object()) if putDone || exists { if putDone { e.log.Debug("object is moved to another shard", zap.String("from", sidList[n]), zap.Stringer("to", shards[j].ID()), - zap.Stringer("addr", lst[i])) + zap.Stringer("addr", addr)) res.count++ } @@ -172,7 +174,7 @@ mainLoop: return res, fmt.Errorf("%w: %s", errPutShard, lst[i]) } - err = prm.handler(lst[i], getRes.Object()) + err = prm.handler(addr, getRes.Object()) if err != nil { return res, err } diff --git a/pkg/local_object_storage/engine/list.go b/pkg/local_object_storage/engine/list.go index 6abb9167c..0845ded92 100644 --- a/pkg/local_object_storage/engine/list.go +++ b/pkg/local_object_storage/engine/list.go @@ -3,8 +3,8 @@ package engine import ( "sort" + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" ) // ErrEndOfListing is returned from an object listing with cursor @@ -38,12 +38,12 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) { // ListWithCursorRes contains values returned from ListWithCursor operation. type ListWithCursorRes struct { - addrList []oid.Address + addrList []objectcore.AddressWithType cursor *Cursor } // AddressList returns addresses selected by ListWithCursor operation. -func (l ListWithCursorRes) AddressList() []oid.Address { +func (l ListWithCursorRes) AddressList() []objectcore.AddressWithType { return l.addrList } @@ -60,7 +60,7 @@ func (l ListWithCursorRes) Cursor() *Cursor { // Returns ErrEndOfListing if there are no more objects to return or count // parameter set to zero. func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) { - result := make([]oid.Address, 0, prm.count) + result := make([]objectcore.AddressWithType, 0, prm.count) // 1. Get available shards and sort them. e.mtx.RLock() diff --git a/pkg/local_object_storage/engine/list_test.go b/pkg/local_object_storage/engine/list_test.go index 31bbb9321..2762aa37a 100644 --- a/pkg/local_object_storage/engine/list_test.go +++ b/pkg/local_object_storage/engine/list_test.go @@ -8,7 +8,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/stretchr/testify/require" ) @@ -24,8 +24,8 @@ func TestListWithCursor(t *testing.T) { const total = 20 - expected := make([]oid.Address, 0, total) - got := make([]oid.Address, 0, total) + expected := make([]object.AddressWithType, 0, total) + got := make([]object.AddressWithType, 0, total) for i := 0; i < total; i++ { containerID := cidtest.ID() @@ -36,7 +36,7 @@ func TestListWithCursor(t *testing.T) { _, err := e.Put(prm) require.NoError(t, err) - expected = append(expected, object.AddressOf(obj)) + expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)}) } expected = sortAddresses(expected) @@ -68,9 +68,9 @@ func TestListWithCursor(t *testing.T) { require.Equal(t, expected, got) } -func sortAddresses(addr []oid.Address) []oid.Address { - sort.Slice(addr, func(i, j int) bool { - return addr[i].EncodeToString() < addr[j].EncodeToString() +func sortAddresses(addrWithType []object.AddressWithType) []object.AddressWithType { + sort.Slice(addrWithType, func(i, j int) bool { + return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString() }) - return addr + return addrWithType } diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index 21701fc64..4bd065be9 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -1,8 +1,10 @@ package meta import ( + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.etcd.io/bbolt" ) @@ -38,12 +40,12 @@ func (l *ListPrm) SetCursor(cursor *Cursor) { // ListRes contains values returned from ListWithCursor operation. type ListRes struct { - addrList []oid.Address + addrList []objectcore.AddressWithType cursor *Cursor } // AddressList returns addresses selected by ListWithCursor operation. -func (l ListRes) AddressList() []oid.Address { +func (l ListRes) AddressList() []objectcore.AddressWithType { return l.addrList } @@ -62,7 +64,7 @@ func (db *DB) ListWithCursor(prm ListPrm) (res ListRes, err error) { db.modeMtx.RLock() defer db.modeMtx.RUnlock() - result := make([]oid.Address, 0, prm.count) + result := make([]objectcore.AddressWithType, 0, prm.count) err = db.boltDB.View(func(tx *bbolt.Tx) error { res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor) @@ -72,7 +74,7 @@ func (db *DB) ListWithCursor(prm ListPrm) (res ListRes, err error) { return res, err } -func (db *DB) listWithCursor(tx *bbolt.Tx, result []oid.Address, count int, cursor *Cursor) ([]oid.Address, *Cursor, error) { +func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) { threshold := cursor == nil // threshold is a flag to ignore cursor var bucketName []byte @@ -97,12 +99,17 @@ loop: continue } + var objType object.Type + switch prefix { - case - primaryPrefix, - storageGroupPrefix, - lockersPrefix, - tombstonePrefix: + case primaryPrefix: + objType = object.TypeRegular + case storageGroupPrefix: + objType = object.TypeStorageGroup + case lockersPrefix: + objType = object.TypeLock + case tombstonePrefix: + objType = object.TypeTombstone default: continue } @@ -110,7 +117,7 @@ loop: bkt := tx.Bucket(name) if bkt != nil { copy(rawAddr, cidRaw) - result, offset, cursor = selectNFromBucket(bkt, graveyardBkt, garbageBkt, rawAddr, containerID, + result, offset, cursor = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID, result, count, cursor, threshold) } bucketName = name @@ -145,14 +152,15 @@ loop: // selectNFromBucket similar to selectAllFromBucket but uses cursor to find // object to start selecting from. Ignores inhumed objects. func selectNFromBucket(bkt *bbolt.Bucket, // main bucket + objType object.Type, // type of the objects stored in the main bucket graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets cidRaw []byte, // container ID prefix, optimization cnt cid.ID, // container ID - to []oid.Address, // listing result + to []objectcore.AddressWithType, // listing result limit int, // stop listing at `limit` items in result cursor *Cursor, // start from cursor object threshold bool, // ignore cursor and start immediately -) ([]oid.Address, []byte, *Cursor) { +) ([]objectcore.AddressWithType, []byte, *Cursor) { if cursor == nil { cursor = new(Cursor) } @@ -186,7 +194,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket var a oid.Address a.SetContainer(cnt) a.SetObject(obj) - to = append(to, a) + to = append(to, objectcore.AddressWithType{Address: a, Type: objType}) count++ } diff --git a/pkg/local_object_storage/metabase/list_test.go b/pkg/local_object_storage/metabase/list_test.go index bef33eb13..3a325afa2 100644 --- a/pkg/local_object_storage/metabase/list_test.go +++ b/pkg/local_object_storage/metabase/list_test.go @@ -9,7 +9,6 @@ import ( meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" "go.etcd.io/bbolt" @@ -73,7 +72,7 @@ func TestLisObjectsWithCursor(t *testing.T) { total = containers * 5 // regular + ts + sg + child + lock ) - expected := make([]oid.Address, 0, total) + expected := make([]object.AddressWithType, 0, total) // fill metabase with objects for i := 0; i < containers; i++ { @@ -84,28 +83,28 @@ func TestLisObjectsWithCursor(t *testing.T) { obj.SetType(objectSDK.TypeRegular) err := putBig(db, obj) require.NoError(t, err) - expected = append(expected, object.AddressOf(obj)) + expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular}) // add one tombstone obj = generateObjectWithCID(t, containerID) obj.SetType(objectSDK.TypeTombstone) err = putBig(db, obj) require.NoError(t, err) - expected = append(expected, object.AddressOf(obj)) + expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone}) // add one storage group obj = generateObjectWithCID(t, containerID) obj.SetType(objectSDK.TypeStorageGroup) err = putBig(db, obj) require.NoError(t, err) - expected = append(expected, object.AddressOf(obj)) + expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeStorageGroup}) // add one lock obj = generateObjectWithCID(t, containerID) obj.SetType(objectSDK.TypeLock) err = putBig(db, obj) require.NoError(t, err) - expected = append(expected, object.AddressOf(obj)) + expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeLock}) // add one inhumed (do not include into expected) obj = generateObjectWithCID(t, containerID) @@ -127,14 +126,14 @@ func TestLisObjectsWithCursor(t *testing.T) { child.SetSplitID(splitID) err = putBig(db, child) require.NoError(t, err) - expected = append(expected, object.AddressOf(child)) + expected = append(expected, object.AddressWithType{Address: object.AddressOf(child), Type: objectSDK.TypeRegular}) } expected = sortAddresses(expected) t.Run("success with various count", func(t *testing.T) { for countPerReq := 1; countPerReq <= total; countPerReq++ { - got := make([]oid.Address, 0, total) + got := make([]object.AddressWithType, 0, total) res, cursor, err := metaListWithCursor(db, uint32(countPerReq), nil) require.NoError(t, err, "count:%d", countPerReq) @@ -184,8 +183,8 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) { got, cursor, err := metaListWithCursor(db, total/2, nil) require.NoError(t, err) for _, obj := range got { - if _, ok := expected[obj.EncodeToString()]; ok { - expected[obj.EncodeToString()]++ + if _, ok := expected[obj.Address.EncodeToString()]; ok { + expected[obj.Address.EncodeToString()]++ } } @@ -203,8 +202,8 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) { break } for _, obj := range got { - if _, ok := expected[obj.EncodeToString()]; ok { - expected[obj.EncodeToString()]++ + if _, ok := expected[obj.Address.EncodeToString()]; ok { + expected[obj.Address.EncodeToString()]++ } } } @@ -216,14 +215,14 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) { } -func sortAddresses(addr []oid.Address) []oid.Address { - sort.Slice(addr, func(i, j int) bool { - return addr[i].EncodeToString() < addr[j].EncodeToString() +func sortAddresses(addrWithType []object.AddressWithType) []object.AddressWithType { + sort.Slice(addrWithType, func(i, j int) bool { + return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString() }) - return addr + return addrWithType } -func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]oid.Address, *meta.Cursor, error) { +func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.AddressWithType, *meta.Cursor, error) { var listPrm meta.ListPrm listPrm.SetCount(count) listPrm.SetCursor(cursor) diff --git a/pkg/local_object_storage/shard/list.go b/pkg/local_object_storage/shard/list.go index 5fc4a7d98..eedd4a019 100644 --- a/pkg/local_object_storage/shard/list.go +++ b/pkg/local_object_storage/shard/list.go @@ -3,10 +3,10 @@ package shard import ( "fmt" + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) @@ -36,7 +36,7 @@ type ListWithCursorPrm struct { // ListWithCursorRes contains values returned from ListWithCursor operation. type ListWithCursorRes struct { - addrList []oid.Address + addrList []objectcore.AddressWithType cursor *Cursor } @@ -53,7 +53,7 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) { } // AddressList returns addresses selected by ListWithCursor operation. -func (r ListWithCursorRes) AddressList() []oid.Address { +func (r ListWithCursorRes) AddressList() []objectcore.AddressWithType { return r.addrList } diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index 459133f8a..fa4a3f106 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -5,13 +5,14 @@ import ( "errors" "github.com/nspcc-dev/neofs-node/pkg/core/container" + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" "github.com/nspcc-dev/neofs-node/pkg/services/replicator" "github.com/nspcc-dev/neofs-sdk-go/client" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/netmap" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/object" "go.uber.org/zap" ) @@ -64,8 +65,10 @@ func (n *nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) { n.submitReplicaHolder(node) } -func (p *Policer) processObject(ctx context.Context, addr oid.Address) { +func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) { + addr := addrWithType.Address idCnr := addr.Container() + idObj := addr.Object() cnr, err := p.cnrSrc.Get(idCnr) if err != nil { @@ -75,14 +78,14 @@ func (p *Policer) processObject(ctx context.Context, addr oid.Address) { ) if container.IsErrNotFound(err) { var prm engine.InhumePrm - prm.MarkAsGarbage(addr) + prm.MarkAsGarbage(addrWithType.Address) prm.WithForceRemoval() _, err := p.jobQueue.localStorage.Inhume(prm) if err != nil { p.log.Error("could not inhume object with missing container", zap.Stringer("cid", idCnr), - zap.Stringer("oid", addr.Object()), + zap.Stringer("oid", idObj), zap.String("error", err.Error())) } } @@ -91,9 +94,8 @@ func (p *Policer) processObject(ctx context.Context, addr oid.Address) { } policy := cnr.Value.PlacementPolicy() - obj := addr.Object() - nn, err := p.placementBuilder.BuildPlacement(idCnr, &obj, policy) + nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy) if err != nil { p.log.Error("could not build placement vector for object", zap.Stringer("cid", idCnr), @@ -122,7 +124,7 @@ func (p *Policer) processObject(ctx context.Context, addr oid.Address) { default: } - p.processNodes(c, addr, nn[i], policy.ReplicaNumberByIndex(i), checkedNodes) + p.processNodes(c, addrWithType, nn[i], policy.ReplicaNumberByIndex(i), checkedNodes) } if !c.needLocalCopy { @@ -140,8 +142,10 @@ type processPlacementContext struct { needLocalCopy bool } -func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, +func (p *Policer) processNodes(ctx *processPlacementContext, addrWithType objectcore.AddressWithType, nodes []netmap.NodeInfo, shortage uint32, checkedNodes *nodeCache) { + addr := addrWithType.Address + typ := addrWithType.Type prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr) // Number of copies that are stored on maintenance nodes. @@ -162,6 +166,14 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address, ) } + if typ == object.TypeLock { + // all nodes of a container must store the `LOCK` objects + // for correct object removal protection: + // - `LOCK` objects are broadcast on their PUT requests; + // - `LOCK` object removal is a prohibited action in the GC. + shortage = uint32(len(nodes)) + } + for i := 0; shortage > 0 && i < len(nodes); i++ { select { case <-ctx.Done(): diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go index ca360968e..bab1b9e18 100644 --- a/pkg/services/policer/process.go +++ b/pkg/services/policer/process.go @@ -5,8 +5,8 @@ import ( "errors" "time" + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) @@ -21,7 +21,7 @@ func (p *Policer) Run(ctx context.Context) { func (p *Policer) shardPolicyWorker(ctx context.Context) { var ( - addrs []oid.Address + addrs []objectcore.AddressWithType cursor *engine.Cursor err error ) @@ -47,7 +47,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { case <-ctx.Done(): return default: - addr := addrs[i] + addr := addrs[i].Address if p.objsInWork.inWork(addr) { // do not process an object // that is in work @@ -62,7 +62,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { p.objsInWork.add(addr) - p.processObject(ctx, addr) + p.processObject(ctx, addrs[i]) p.cache.Add(addr, time.Now()) p.objsInWork.remove(addr) diff --git a/pkg/services/policer/queue.go b/pkg/services/policer/queue.go index 31e2ee0d2..4b2cc4170 100644 --- a/pkg/services/policer/queue.go +++ b/pkg/services/policer/queue.go @@ -3,15 +3,15 @@ package policer import ( "fmt" + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" ) type jobQueue struct { localStorage *engine.StorageEngine } -func (q *jobQueue) Select(cursor *engine.Cursor, count uint32) ([]oid.Address, *engine.Cursor, error) { +func (q *jobQueue) Select(cursor *engine.Cursor, count uint32) ([]objectcore.AddressWithType, *engine.Cursor, error) { var prm engine.ListWithCursorPrm prm.WithCursor(cursor) prm.WithCount(count)