[#1502] node: Store lock object on every container node

Includes extending listing methods in the Storage Engine with object types.
It allows tuning replication/policer algorithms: container nodes do
not remove `LOCK` objects as redundant and try to fulfill `LOCK` placement
on the ohter container nodes.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2022-11-12 16:48:44 +03:00 committed by fyrchik
parent fe09cd9c70
commit 634792077e
10 changed files with 87 additions and 64 deletions

View file

@ -13,6 +13,7 @@ Changelog for NeoFS Node
### Changed
- `object lock` command reads CID and OID the same way other commands do (#1971)
- `LOCK` object are stored on every container node (#1502)
### Fixed
- Open FSTree in sync mode by default (#1992)
@ -30,6 +31,7 @@ Changelog for NeoFS Node
- Session token's IAT and NBF checks in ACL service (#2028)
- Losing meta information on request forwarding (#2040)
- Assembly process triggered by a request with a bearer token (#2040)
- Losing locking context after metabase resync (#1502)
### Removed
### Updated

View file

@ -136,8 +136,10 @@ mainLoop:
loop:
for i := range lst {
addr := lst[i].Address
var getPrm shard.GetPrm
getPrm.SetAddress(lst[i])
getPrm.SetAddress(addr)
getRes, err := sh.Get(getPrm)
if err != nil {
@ -147,18 +149,18 @@ mainLoop:
return res, err
}
hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(lst[i].EncodeToString())))
hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
for j := range shards {
if _, ok := shardMap[shards[j].ID().String()]; ok {
continue
}
putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, lst[i], getRes.Object())
putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, getRes.Object())
if putDone || exists {
if putDone {
e.log.Debug("object is moved to another shard",
zap.String("from", sidList[n]),
zap.Stringer("to", shards[j].ID()),
zap.Stringer("addr", lst[i]))
zap.Stringer("addr", addr))
res.count++
}
@ -172,7 +174,7 @@ mainLoop:
return res, fmt.Errorf("%w: %s", errPutShard, lst[i])
}
err = prm.handler(lst[i], getRes.Object())
err = prm.handler(addr, getRes.Object())
if err != nil {
return res, err
}

View file

@ -3,8 +3,8 @@ package engine
import (
"sort"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)
// ErrEndOfListing is returned from an object listing with cursor
@ -38,12 +38,12 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
// ListWithCursorRes contains values returned from ListWithCursor operation.
type ListWithCursorRes struct {
addrList []oid.Address
addrList []objectcore.AddressWithType
cursor *Cursor
}
// AddressList returns addresses selected by ListWithCursor operation.
func (l ListWithCursorRes) AddressList() []oid.Address {
func (l ListWithCursorRes) AddressList() []objectcore.AddressWithType {
return l.addrList
}
@ -60,7 +60,7 @@ func (l ListWithCursorRes) Cursor() *Cursor {
// Returns ErrEndOfListing if there are no more objects to return or count
// parameter set to zero.
func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) {
result := make([]oid.Address, 0, prm.count)
result := make([]objectcore.AddressWithType, 0, prm.count)
// 1. Get available shards and sort them.
e.mtx.RLock()

View file

@ -8,7 +8,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/core/object"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
"github.com/stretchr/testify/require"
)
@ -24,8 +24,8 @@ func TestListWithCursor(t *testing.T) {
const total = 20
expected := make([]oid.Address, 0, total)
got := make([]oid.Address, 0, total)
expected := make([]object.AddressWithType, 0, total)
got := make([]object.AddressWithType, 0, total)
for i := 0; i < total; i++ {
containerID := cidtest.ID()
@ -36,7 +36,7 @@ func TestListWithCursor(t *testing.T) {
_, err := e.Put(prm)
require.NoError(t, err)
expected = append(expected, object.AddressOf(obj))
expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
}
expected = sortAddresses(expected)
@ -68,9 +68,9 @@ func TestListWithCursor(t *testing.T) {
require.Equal(t, expected, got)
}
func sortAddresses(addr []oid.Address) []oid.Address {
sort.Slice(addr, func(i, j int) bool {
return addr[i].EncodeToString() < addr[j].EncodeToString()
func sortAddresses(addrWithType []object.AddressWithType) []object.AddressWithType {
sort.Slice(addrWithType, func(i, j int) bool {
return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString()
})
return addr
return addrWithType
}

View file

@ -1,8 +1,10 @@
package meta
import (
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.etcd.io/bbolt"
)
@ -38,12 +40,12 @@ func (l *ListPrm) SetCursor(cursor *Cursor) {
// ListRes contains values returned from ListWithCursor operation.
type ListRes struct {
addrList []oid.Address
addrList []objectcore.AddressWithType
cursor *Cursor
}
// AddressList returns addresses selected by ListWithCursor operation.
func (l ListRes) AddressList() []oid.Address {
func (l ListRes) AddressList() []objectcore.AddressWithType {
return l.addrList
}
@ -62,7 +64,7 @@ func (db *DB) ListWithCursor(prm ListPrm) (res ListRes, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
result := make([]oid.Address, 0, prm.count)
result := make([]objectcore.AddressWithType, 0, prm.count)
err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor)
@ -72,7 +74,7 @@ func (db *DB) ListWithCursor(prm ListPrm) (res ListRes, err error) {
return res, err
}
func (db *DB) listWithCursor(tx *bbolt.Tx, result []oid.Address, count int, cursor *Cursor) ([]oid.Address, *Cursor, error) {
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) {
threshold := cursor == nil // threshold is a flag to ignore cursor
var bucketName []byte
@ -97,12 +99,17 @@ loop:
continue
}
var objType object.Type
switch prefix {
case
primaryPrefix,
storageGroupPrefix,
lockersPrefix,
tombstonePrefix:
case primaryPrefix:
objType = object.TypeRegular
case storageGroupPrefix:
objType = object.TypeStorageGroup
case lockersPrefix:
objType = object.TypeLock
case tombstonePrefix:
objType = object.TypeTombstone
default:
continue
}
@ -110,7 +117,7 @@ loop:
bkt := tx.Bucket(name)
if bkt != nil {
copy(rawAddr, cidRaw)
result, offset, cursor = selectNFromBucket(bkt, graveyardBkt, garbageBkt, rawAddr, containerID,
result, offset, cursor = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
result, count, cursor, threshold)
}
bucketName = name
@ -145,14 +152,15 @@ loop:
// selectNFromBucket similar to selectAllFromBucket but uses cursor to find
// object to start selecting from. Ignores inhumed objects.
func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
objType object.Type, // type of the objects stored in the main bucket
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
cidRaw []byte, // container ID prefix, optimization
cnt cid.ID, // container ID
to []oid.Address, // listing result
to []objectcore.AddressWithType, // listing result
limit int, // stop listing at `limit` items in result
cursor *Cursor, // start from cursor object
threshold bool, // ignore cursor and start immediately
) ([]oid.Address, []byte, *Cursor) {
) ([]objectcore.AddressWithType, []byte, *Cursor) {
if cursor == nil {
cursor = new(Cursor)
}
@ -186,7 +194,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
var a oid.Address
a.SetContainer(cnt)
a.SetObject(obj)
to = append(to, a)
to = append(to, objectcore.AddressWithType{Address: a, Type: objType})
count++
}

View file

@ -9,7 +9,6 @@ import (
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
@ -73,7 +72,7 @@ func TestLisObjectsWithCursor(t *testing.T) {
total = containers * 5 // regular + ts + sg + child + lock
)
expected := make([]oid.Address, 0, total)
expected := make([]object.AddressWithType, 0, total)
// fill metabase with objects
for i := 0; i < containers; i++ {
@ -84,28 +83,28 @@ func TestLisObjectsWithCursor(t *testing.T) {
obj.SetType(objectSDK.TypeRegular)
err := putBig(db, obj)
require.NoError(t, err)
expected = append(expected, object.AddressOf(obj))
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
// add one tombstone
obj = generateObjectWithCID(t, containerID)
obj.SetType(objectSDK.TypeTombstone)
err = putBig(db, obj)
require.NoError(t, err)
expected = append(expected, object.AddressOf(obj))
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone})
// add one storage group
obj = generateObjectWithCID(t, containerID)
obj.SetType(objectSDK.TypeStorageGroup)
err = putBig(db, obj)
require.NoError(t, err)
expected = append(expected, object.AddressOf(obj))
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeStorageGroup})
// add one lock
obj = generateObjectWithCID(t, containerID)
obj.SetType(objectSDK.TypeLock)
err = putBig(db, obj)
require.NoError(t, err)
expected = append(expected, object.AddressOf(obj))
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeLock})
// add one inhumed (do not include into expected)
obj = generateObjectWithCID(t, containerID)
@ -127,14 +126,14 @@ func TestLisObjectsWithCursor(t *testing.T) {
child.SetSplitID(splitID)
err = putBig(db, child)
require.NoError(t, err)
expected = append(expected, object.AddressOf(child))
expected = append(expected, object.AddressWithType{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
}
expected = sortAddresses(expected)
t.Run("success with various count", func(t *testing.T) {
for countPerReq := 1; countPerReq <= total; countPerReq++ {
got := make([]oid.Address, 0, total)
got := make([]object.AddressWithType, 0, total)
res, cursor, err := metaListWithCursor(db, uint32(countPerReq), nil)
require.NoError(t, err, "count:%d", countPerReq)
@ -184,8 +183,8 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) {
got, cursor, err := metaListWithCursor(db, total/2, nil)
require.NoError(t, err)
for _, obj := range got {
if _, ok := expected[obj.EncodeToString()]; ok {
expected[obj.EncodeToString()]++
if _, ok := expected[obj.Address.EncodeToString()]; ok {
expected[obj.Address.EncodeToString()]++
}
}
@ -203,8 +202,8 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) {
break
}
for _, obj := range got {
if _, ok := expected[obj.EncodeToString()]; ok {
expected[obj.EncodeToString()]++
if _, ok := expected[obj.Address.EncodeToString()]; ok {
expected[obj.Address.EncodeToString()]++
}
}
}
@ -216,14 +215,14 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) {
}
func sortAddresses(addr []oid.Address) []oid.Address {
sort.Slice(addr, func(i, j int) bool {
return addr[i].EncodeToString() < addr[j].EncodeToString()
func sortAddresses(addrWithType []object.AddressWithType) []object.AddressWithType {
sort.Slice(addrWithType, func(i, j int) bool {
return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString()
})
return addr
return addrWithType
}
func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]oid.Address, *meta.Cursor, error) {
func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.AddressWithType, *meta.Cursor, error) {
var listPrm meta.ListPrm
listPrm.SetCount(count)
listPrm.SetCursor(cursor)

View file

@ -3,10 +3,10 @@ package shard
import (
"fmt"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)
@ -36,7 +36,7 @@ type ListWithCursorPrm struct {
// ListWithCursorRes contains values returned from ListWithCursor operation.
type ListWithCursorRes struct {
addrList []oid.Address
addrList []objectcore.AddressWithType
cursor *Cursor
}
@ -53,7 +53,7 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
}
// AddressList returns addresses selected by ListWithCursor operation.
func (r ListWithCursorRes) AddressList() []oid.Address {
func (r ListWithCursorRes) AddressList() []objectcore.AddressWithType {
return r.addrList
}

View file

@ -5,13 +5,14 @@ import (
"errors"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
"github.com/nspcc-dev/neofs-sdk-go/client"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
"go.uber.org/zap"
)
@ -64,8 +65,10 @@ func (n *nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) {
n.submitReplicaHolder(node)
}
func (p *Policer) processObject(ctx context.Context, addr oid.Address) {
func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) {
addr := addrWithType.Address
idCnr := addr.Container()
idObj := addr.Object()
cnr, err := p.cnrSrc.Get(idCnr)
if err != nil {
@ -75,14 +78,14 @@ func (p *Policer) processObject(ctx context.Context, addr oid.Address) {
)
if container.IsErrNotFound(err) {
var prm engine.InhumePrm
prm.MarkAsGarbage(addr)
prm.MarkAsGarbage(addrWithType.Address)
prm.WithForceRemoval()
_, err := p.jobQueue.localStorage.Inhume(prm)
if err != nil {
p.log.Error("could not inhume object with missing container",
zap.Stringer("cid", idCnr),
zap.Stringer("oid", addr.Object()),
zap.Stringer("oid", idObj),
zap.String("error", err.Error()))
}
}
@ -91,9 +94,8 @@ func (p *Policer) processObject(ctx context.Context, addr oid.Address) {
}
policy := cnr.Value.PlacementPolicy()
obj := addr.Object()
nn, err := p.placementBuilder.BuildPlacement(idCnr, &obj, policy)
nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
if err != nil {
p.log.Error("could not build placement vector for object",
zap.Stringer("cid", idCnr),
@ -122,7 +124,7 @@ func (p *Policer) processObject(ctx context.Context, addr oid.Address) {
default:
}
p.processNodes(c, addr, nn[i], policy.ReplicaNumberByIndex(i), checkedNodes)
p.processNodes(c, addrWithType, nn[i], policy.ReplicaNumberByIndex(i), checkedNodes)
}
if !c.needLocalCopy {
@ -140,8 +142,10 @@ type processPlacementContext struct {
needLocalCopy bool
}
func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address,
func (p *Policer) processNodes(ctx *processPlacementContext, addrWithType objectcore.AddressWithType,
nodes []netmap.NodeInfo, shortage uint32, checkedNodes *nodeCache) {
addr := addrWithType.Address
typ := addrWithType.Type
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr)
// Number of copies that are stored on maintenance nodes.
@ -162,6 +166,14 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address,
)
}
if typ == object.TypeLock {
// all nodes of a container must store the `LOCK` objects
// for correct object removal protection:
// - `LOCK` objects are broadcast on their PUT requests;
// - `LOCK` object removal is a prohibited action in the GC.
shortage = uint32(len(nodes))
}
for i := 0; shortage > 0 && i < len(nodes); i++ {
select {
case <-ctx.Done():

View file

@ -5,8 +5,8 @@ import (
"errors"
"time"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)
@ -21,7 +21,7 @@ func (p *Policer) Run(ctx context.Context) {
func (p *Policer) shardPolicyWorker(ctx context.Context) {
var (
addrs []oid.Address
addrs []objectcore.AddressWithType
cursor *engine.Cursor
err error
)
@ -47,7 +47,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
case <-ctx.Done():
return
default:
addr := addrs[i]
addr := addrs[i].Address
if p.objsInWork.inWork(addr) {
// do not process an object
// that is in work
@ -62,7 +62,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
p.objsInWork.add(addr)
p.processObject(ctx, addr)
p.processObject(ctx, addrs[i])
p.cache.Add(addr, time.Now())
p.objsInWork.remove(addr)

View file

@ -3,15 +3,15 @@ package policer
import (
"fmt"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)
type jobQueue struct {
localStorage *engine.StorageEngine
}
func (q *jobQueue) Select(cursor *engine.Cursor, count uint32) ([]oid.Address, *engine.Cursor, error) {
func (q *jobQueue) Select(cursor *engine.Cursor, count uint32) ([]objectcore.AddressWithType, *engine.Cursor, error) {
var prm engine.ListWithCursorPrm
prm.WithCursor(cursor)
prm.WithCount(count)