forked from TrueCloudLab/frostfs-node
[#1502] node: Store lock object on every container node
Includes extending listing methods in the Storage Engine with object types. It allows tuning replication/policer algorithms: container nodes do not remove `LOCK` objects as redundant and try to fulfill `LOCK` placement on the ohter container nodes. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
114018a7bd
commit
19a6ca7896
10 changed files with 87 additions and 64 deletions
|
@ -11,6 +11,7 @@ Changelog for NeoFS Node
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
- `object lock` command reads CID and OID the same way other commands do (#1971)
|
- `object lock` command reads CID and OID the same way other commands do (#1971)
|
||||||
|
- `LOCK` object are stored on every container node (#1502)
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
- Open FSTree in sync mode by default (#1992)
|
- Open FSTree in sync mode by default (#1992)
|
||||||
|
@ -28,6 +29,7 @@ Changelog for NeoFS Node
|
||||||
- Session token's IAT and NBF checks in ACL service (#2028)
|
- Session token's IAT and NBF checks in ACL service (#2028)
|
||||||
- Losing meta information on request forwarding (#2040)
|
- Losing meta information on request forwarding (#2040)
|
||||||
- Assembly process triggered by a request with a bearer token (#2040)
|
- Assembly process triggered by a request with a bearer token (#2040)
|
||||||
|
- Losing locking context after metabase resync (#1502)
|
||||||
|
|
||||||
### Removed
|
### Removed
|
||||||
### Updated
|
### Updated
|
||||||
|
|
|
@ -136,8 +136,10 @@ mainLoop:
|
||||||
|
|
||||||
loop:
|
loop:
|
||||||
for i := range lst {
|
for i := range lst {
|
||||||
|
addr := lst[i].Address
|
||||||
|
|
||||||
var getPrm shard.GetPrm
|
var getPrm shard.GetPrm
|
||||||
getPrm.SetAddress(lst[i])
|
getPrm.SetAddress(addr)
|
||||||
|
|
||||||
getRes, err := sh.Get(getPrm)
|
getRes, err := sh.Get(getPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -147,18 +149,18 @@ mainLoop:
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(lst[i].EncodeToString())))
|
hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
|
||||||
for j := range shards {
|
for j := range shards {
|
||||||
if _, ok := shardMap[shards[j].ID().String()]; ok {
|
if _, ok := shardMap[shards[j].ID().String()]; ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, lst[i], getRes.Object())
|
putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, getRes.Object())
|
||||||
if putDone || exists {
|
if putDone || exists {
|
||||||
if putDone {
|
if putDone {
|
||||||
e.log.Debug("object is moved to another shard",
|
e.log.Debug("object is moved to another shard",
|
||||||
zap.String("from", sidList[n]),
|
zap.String("from", sidList[n]),
|
||||||
zap.Stringer("to", shards[j].ID()),
|
zap.Stringer("to", shards[j].ID()),
|
||||||
zap.Stringer("addr", lst[i]))
|
zap.Stringer("addr", addr))
|
||||||
|
|
||||||
res.count++
|
res.count++
|
||||||
}
|
}
|
||||||
|
@ -172,7 +174,7 @@ mainLoop:
|
||||||
return res, fmt.Errorf("%w: %s", errPutShard, lst[i])
|
return res, fmt.Errorf("%w: %s", errPutShard, lst[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
err = prm.handler(lst[i], getRes.Object())
|
err = prm.handler(addr, getRes.Object())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,8 +3,8 @@ package engine
|
||||||
import (
|
import (
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
|
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ErrEndOfListing is returned from an object listing with cursor
|
// ErrEndOfListing is returned from an object listing with cursor
|
||||||
|
@ -38,12 +38,12 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
|
||||||
|
|
||||||
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
||||||
type ListWithCursorRes struct {
|
type ListWithCursorRes struct {
|
||||||
addrList []oid.Address
|
addrList []objectcore.AddressWithType
|
||||||
cursor *Cursor
|
cursor *Cursor
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddressList returns addresses selected by ListWithCursor operation.
|
// AddressList returns addresses selected by ListWithCursor operation.
|
||||||
func (l ListWithCursorRes) AddressList() []oid.Address {
|
func (l ListWithCursorRes) AddressList() []objectcore.AddressWithType {
|
||||||
return l.addrList
|
return l.addrList
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ func (l ListWithCursorRes) Cursor() *Cursor {
|
||||||
// Returns ErrEndOfListing if there are no more objects to return or count
|
// Returns ErrEndOfListing if there are no more objects to return or count
|
||||||
// parameter set to zero.
|
// parameter set to zero.
|
||||||
func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) {
|
func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) {
|
||||||
result := make([]oid.Address, 0, prm.count)
|
result := make([]objectcore.AddressWithType, 0, prm.count)
|
||||||
|
|
||||||
// 1. Get available shards and sort them.
|
// 1. Get available shards and sort them.
|
||||||
e.mtx.RLock()
|
e.mtx.RLock()
|
||||||
|
|
|
@ -8,7 +8,7 @@ import (
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -24,8 +24,8 @@ func TestListWithCursor(t *testing.T) {
|
||||||
|
|
||||||
const total = 20
|
const total = 20
|
||||||
|
|
||||||
expected := make([]oid.Address, 0, total)
|
expected := make([]object.AddressWithType, 0, total)
|
||||||
got := make([]oid.Address, 0, total)
|
got := make([]object.AddressWithType, 0, total)
|
||||||
|
|
||||||
for i := 0; i < total; i++ {
|
for i := 0; i < total; i++ {
|
||||||
containerID := cidtest.ID()
|
containerID := cidtest.ID()
|
||||||
|
@ -36,7 +36,7 @@ func TestListWithCursor(t *testing.T) {
|
||||||
|
|
||||||
_, err := e.Put(prm)
|
_, err := e.Put(prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressOf(obj))
|
expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
|
||||||
}
|
}
|
||||||
|
|
||||||
expected = sortAddresses(expected)
|
expected = sortAddresses(expected)
|
||||||
|
@ -68,9 +68,9 @@ func TestListWithCursor(t *testing.T) {
|
||||||
require.Equal(t, expected, got)
|
require.Equal(t, expected, got)
|
||||||
}
|
}
|
||||||
|
|
||||||
func sortAddresses(addr []oid.Address) []oid.Address {
|
func sortAddresses(addrWithType []object.AddressWithType) []object.AddressWithType {
|
||||||
sort.Slice(addr, func(i, j int) bool {
|
sort.Slice(addrWithType, func(i, j int) bool {
|
||||||
return addr[i].EncodeToString() < addr[j].EncodeToString()
|
return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString()
|
||||||
})
|
})
|
||||||
return addr
|
return addrWithType
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,10 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
|
||||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
@ -38,12 +40,12 @@ func (l *ListPrm) SetCursor(cursor *Cursor) {
|
||||||
|
|
||||||
// ListRes contains values returned from ListWithCursor operation.
|
// ListRes contains values returned from ListWithCursor operation.
|
||||||
type ListRes struct {
|
type ListRes struct {
|
||||||
addrList []oid.Address
|
addrList []objectcore.AddressWithType
|
||||||
cursor *Cursor
|
cursor *Cursor
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddressList returns addresses selected by ListWithCursor operation.
|
// AddressList returns addresses selected by ListWithCursor operation.
|
||||||
func (l ListRes) AddressList() []oid.Address {
|
func (l ListRes) AddressList() []objectcore.AddressWithType {
|
||||||
return l.addrList
|
return l.addrList
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,7 +64,7 @@ func (db *DB) ListWithCursor(prm ListPrm) (res ListRes, err error) {
|
||||||
db.modeMtx.RLock()
|
db.modeMtx.RLock()
|
||||||
defer db.modeMtx.RUnlock()
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
result := make([]oid.Address, 0, prm.count)
|
result := make([]objectcore.AddressWithType, 0, prm.count)
|
||||||
|
|
||||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor)
|
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor)
|
||||||
|
@ -72,7 +74,7 @@ func (db *DB) ListWithCursor(prm ListPrm) (res ListRes, err error) {
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) listWithCursor(tx *bbolt.Tx, result []oid.Address, count int, cursor *Cursor) ([]oid.Address, *Cursor, error) {
|
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) {
|
||||||
threshold := cursor == nil // threshold is a flag to ignore cursor
|
threshold := cursor == nil // threshold is a flag to ignore cursor
|
||||||
var bucketName []byte
|
var bucketName []byte
|
||||||
|
|
||||||
|
@ -97,12 +99,17 @@ loop:
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var objType object.Type
|
||||||
|
|
||||||
switch prefix {
|
switch prefix {
|
||||||
case
|
case primaryPrefix:
|
||||||
primaryPrefix,
|
objType = object.TypeRegular
|
||||||
storageGroupPrefix,
|
case storageGroupPrefix:
|
||||||
lockersPrefix,
|
objType = object.TypeStorageGroup
|
||||||
tombstonePrefix:
|
case lockersPrefix:
|
||||||
|
objType = object.TypeLock
|
||||||
|
case tombstonePrefix:
|
||||||
|
objType = object.TypeTombstone
|
||||||
default:
|
default:
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -110,7 +117,7 @@ loop:
|
||||||
bkt := tx.Bucket(name)
|
bkt := tx.Bucket(name)
|
||||||
if bkt != nil {
|
if bkt != nil {
|
||||||
copy(rawAddr, cidRaw)
|
copy(rawAddr, cidRaw)
|
||||||
result, offset, cursor = selectNFromBucket(bkt, graveyardBkt, garbageBkt, rawAddr, containerID,
|
result, offset, cursor = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
||||||
result, count, cursor, threshold)
|
result, count, cursor, threshold)
|
||||||
}
|
}
|
||||||
bucketName = name
|
bucketName = name
|
||||||
|
@ -145,14 +152,15 @@ loop:
|
||||||
// selectNFromBucket similar to selectAllFromBucket but uses cursor to find
|
// selectNFromBucket similar to selectAllFromBucket but uses cursor to find
|
||||||
// object to start selecting from. Ignores inhumed objects.
|
// object to start selecting from. Ignores inhumed objects.
|
||||||
func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
|
objType object.Type, // type of the objects stored in the main bucket
|
||||||
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
|
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
|
||||||
cidRaw []byte, // container ID prefix, optimization
|
cidRaw []byte, // container ID prefix, optimization
|
||||||
cnt cid.ID, // container ID
|
cnt cid.ID, // container ID
|
||||||
to []oid.Address, // listing result
|
to []objectcore.AddressWithType, // listing result
|
||||||
limit int, // stop listing at `limit` items in result
|
limit int, // stop listing at `limit` items in result
|
||||||
cursor *Cursor, // start from cursor object
|
cursor *Cursor, // start from cursor object
|
||||||
threshold bool, // ignore cursor and start immediately
|
threshold bool, // ignore cursor and start immediately
|
||||||
) ([]oid.Address, []byte, *Cursor) {
|
) ([]objectcore.AddressWithType, []byte, *Cursor) {
|
||||||
if cursor == nil {
|
if cursor == nil {
|
||||||
cursor = new(Cursor)
|
cursor = new(Cursor)
|
||||||
}
|
}
|
||||||
|
@ -186,7 +194,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
var a oid.Address
|
var a oid.Address
|
||||||
a.SetContainer(cnt)
|
a.SetContainer(cnt)
|
||||||
a.SetObject(obj)
|
a.SetObject(obj)
|
||||||
to = append(to, a)
|
to = append(to, objectcore.AddressWithType{Address: a, Type: objType})
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,6 @@ import (
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
|
||||||
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
|
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
|
@ -73,7 +72,7 @@ func TestLisObjectsWithCursor(t *testing.T) {
|
||||||
total = containers * 5 // regular + ts + sg + child + lock
|
total = containers * 5 // regular + ts + sg + child + lock
|
||||||
)
|
)
|
||||||
|
|
||||||
expected := make([]oid.Address, 0, total)
|
expected := make([]object.AddressWithType, 0, total)
|
||||||
|
|
||||||
// fill metabase with objects
|
// fill metabase with objects
|
||||||
for i := 0; i < containers; i++ {
|
for i := 0; i < containers; i++ {
|
||||||
|
@ -84,28 +83,28 @@ func TestLisObjectsWithCursor(t *testing.T) {
|
||||||
obj.SetType(objectSDK.TypeRegular)
|
obj.SetType(objectSDK.TypeRegular)
|
||||||
err := putBig(db, obj)
|
err := putBig(db, obj)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressOf(obj))
|
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeRegular})
|
||||||
|
|
||||||
// add one tombstone
|
// add one tombstone
|
||||||
obj = generateObjectWithCID(t, containerID)
|
obj = generateObjectWithCID(t, containerID)
|
||||||
obj.SetType(objectSDK.TypeTombstone)
|
obj.SetType(objectSDK.TypeTombstone)
|
||||||
err = putBig(db, obj)
|
err = putBig(db, obj)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressOf(obj))
|
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeTombstone})
|
||||||
|
|
||||||
// add one storage group
|
// add one storage group
|
||||||
obj = generateObjectWithCID(t, containerID)
|
obj = generateObjectWithCID(t, containerID)
|
||||||
obj.SetType(objectSDK.TypeStorageGroup)
|
obj.SetType(objectSDK.TypeStorageGroup)
|
||||||
err = putBig(db, obj)
|
err = putBig(db, obj)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressOf(obj))
|
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeStorageGroup})
|
||||||
|
|
||||||
// add one lock
|
// add one lock
|
||||||
obj = generateObjectWithCID(t, containerID)
|
obj = generateObjectWithCID(t, containerID)
|
||||||
obj.SetType(objectSDK.TypeLock)
|
obj.SetType(objectSDK.TypeLock)
|
||||||
err = putBig(db, obj)
|
err = putBig(db, obj)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressOf(obj))
|
expected = append(expected, object.AddressWithType{Address: object.AddressOf(obj), Type: objectSDK.TypeLock})
|
||||||
|
|
||||||
// add one inhumed (do not include into expected)
|
// add one inhumed (do not include into expected)
|
||||||
obj = generateObjectWithCID(t, containerID)
|
obj = generateObjectWithCID(t, containerID)
|
||||||
|
@ -127,14 +126,14 @@ func TestLisObjectsWithCursor(t *testing.T) {
|
||||||
child.SetSplitID(splitID)
|
child.SetSplitID(splitID)
|
||||||
err = putBig(db, child)
|
err = putBig(db, child)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressOf(child))
|
expected = append(expected, object.AddressWithType{Address: object.AddressOf(child), Type: objectSDK.TypeRegular})
|
||||||
}
|
}
|
||||||
|
|
||||||
expected = sortAddresses(expected)
|
expected = sortAddresses(expected)
|
||||||
|
|
||||||
t.Run("success with various count", func(t *testing.T) {
|
t.Run("success with various count", func(t *testing.T) {
|
||||||
for countPerReq := 1; countPerReq <= total; countPerReq++ {
|
for countPerReq := 1; countPerReq <= total; countPerReq++ {
|
||||||
got := make([]oid.Address, 0, total)
|
got := make([]object.AddressWithType, 0, total)
|
||||||
|
|
||||||
res, cursor, err := metaListWithCursor(db, uint32(countPerReq), nil)
|
res, cursor, err := metaListWithCursor(db, uint32(countPerReq), nil)
|
||||||
require.NoError(t, err, "count:%d", countPerReq)
|
require.NoError(t, err, "count:%d", countPerReq)
|
||||||
|
@ -184,8 +183,8 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) {
|
||||||
got, cursor, err := metaListWithCursor(db, total/2, nil)
|
got, cursor, err := metaListWithCursor(db, total/2, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
for _, obj := range got {
|
for _, obj := range got {
|
||||||
if _, ok := expected[obj.EncodeToString()]; ok {
|
if _, ok := expected[obj.Address.EncodeToString()]; ok {
|
||||||
expected[obj.EncodeToString()]++
|
expected[obj.Address.EncodeToString()]++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -203,8 +202,8 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
for _, obj := range got {
|
for _, obj := range got {
|
||||||
if _, ok := expected[obj.EncodeToString()]; ok {
|
if _, ok := expected[obj.Address.EncodeToString()]; ok {
|
||||||
expected[obj.EncodeToString()]++
|
expected[obj.Address.EncodeToString()]++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -216,14 +215,14 @@ func TestAddObjectDuringListingWithCursor(t *testing.T) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func sortAddresses(addr []oid.Address) []oid.Address {
|
func sortAddresses(addrWithType []object.AddressWithType) []object.AddressWithType {
|
||||||
sort.Slice(addr, func(i, j int) bool {
|
sort.Slice(addrWithType, func(i, j int) bool {
|
||||||
return addr[i].EncodeToString() < addr[j].EncodeToString()
|
return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString()
|
||||||
})
|
})
|
||||||
return addr
|
return addrWithType
|
||||||
}
|
}
|
||||||
|
|
||||||
func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]oid.Address, *meta.Cursor, error) {
|
func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]object.AddressWithType, *meta.Cursor, error) {
|
||||||
var listPrm meta.ListPrm
|
var listPrm meta.ListPrm
|
||||||
listPrm.SetCount(count)
|
listPrm.SetCount(count)
|
||||||
listPrm.SetCursor(cursor)
|
listPrm.SetCursor(cursor)
|
||||||
|
|
|
@ -3,10 +3,10 @@ package shard
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -36,7 +36,7 @@ type ListWithCursorPrm struct {
|
||||||
|
|
||||||
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
||||||
type ListWithCursorRes struct {
|
type ListWithCursorRes struct {
|
||||||
addrList []oid.Address
|
addrList []objectcore.AddressWithType
|
||||||
cursor *Cursor
|
cursor *Cursor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,7 +53,7 @@ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddressList returns addresses selected by ListWithCursor operation.
|
// AddressList returns addresses selected by ListWithCursor operation.
|
||||||
func (r ListWithCursorRes) AddressList() []oid.Address {
|
func (r ListWithCursorRes) AddressList() []objectcore.AddressWithType {
|
||||||
return r.addrList
|
return r.addrList
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,13 +5,14 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/container"
|
"github.com/nspcc-dev/neofs-node/pkg/core/container"
|
||||||
|
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||||
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
|
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
|
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/client"
|
"github.com/nspcc-dev/neofs-sdk-go/client"
|
||||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/netmap"
|
"github.com/nspcc-dev/neofs-sdk-go/netmap"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -64,8 +65,10 @@ func (n *nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) {
|
||||||
n.submitReplicaHolder(node)
|
n.submitReplicaHolder(node)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Policer) processObject(ctx context.Context, addr oid.Address) {
|
func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) {
|
||||||
|
addr := addrWithType.Address
|
||||||
idCnr := addr.Container()
|
idCnr := addr.Container()
|
||||||
|
idObj := addr.Object()
|
||||||
|
|
||||||
cnr, err := p.cnrSrc.Get(idCnr)
|
cnr, err := p.cnrSrc.Get(idCnr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -75,14 +78,14 @@ func (p *Policer) processObject(ctx context.Context, addr oid.Address) {
|
||||||
)
|
)
|
||||||
if container.IsErrNotFound(err) {
|
if container.IsErrNotFound(err) {
|
||||||
var prm engine.InhumePrm
|
var prm engine.InhumePrm
|
||||||
prm.MarkAsGarbage(addr)
|
prm.MarkAsGarbage(addrWithType.Address)
|
||||||
prm.WithForceRemoval()
|
prm.WithForceRemoval()
|
||||||
|
|
||||||
_, err := p.jobQueue.localStorage.Inhume(prm)
|
_, err := p.jobQueue.localStorage.Inhume(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.log.Error("could not inhume object with missing container",
|
p.log.Error("could not inhume object with missing container",
|
||||||
zap.Stringer("cid", idCnr),
|
zap.Stringer("cid", idCnr),
|
||||||
zap.Stringer("oid", addr.Object()),
|
zap.Stringer("oid", idObj),
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -91,9 +94,8 @@ func (p *Policer) processObject(ctx context.Context, addr oid.Address) {
|
||||||
}
|
}
|
||||||
|
|
||||||
policy := cnr.Value.PlacementPolicy()
|
policy := cnr.Value.PlacementPolicy()
|
||||||
obj := addr.Object()
|
|
||||||
|
|
||||||
nn, err := p.placementBuilder.BuildPlacement(idCnr, &obj, policy)
|
nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.log.Error("could not build placement vector for object",
|
p.log.Error("could not build placement vector for object",
|
||||||
zap.Stringer("cid", idCnr),
|
zap.Stringer("cid", idCnr),
|
||||||
|
@ -122,7 +124,7 @@ func (p *Policer) processObject(ctx context.Context, addr oid.Address) {
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
p.processNodes(c, addr, nn[i], policy.ReplicaNumberByIndex(i), checkedNodes)
|
p.processNodes(c, addrWithType, nn[i], policy.ReplicaNumberByIndex(i), checkedNodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !c.needLocalCopy {
|
if !c.needLocalCopy {
|
||||||
|
@ -140,8 +142,10 @@ type processPlacementContext struct {
|
||||||
needLocalCopy bool
|
needLocalCopy bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address,
|
func (p *Policer) processNodes(ctx *processPlacementContext, addrWithType objectcore.AddressWithType,
|
||||||
nodes []netmap.NodeInfo, shortage uint32, checkedNodes *nodeCache) {
|
nodes []netmap.NodeInfo, shortage uint32, checkedNodes *nodeCache) {
|
||||||
|
addr := addrWithType.Address
|
||||||
|
typ := addrWithType.Type
|
||||||
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr)
|
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr)
|
||||||
|
|
||||||
// Number of copies that are stored on maintenance nodes.
|
// Number of copies that are stored on maintenance nodes.
|
||||||
|
@ -162,6 +166,14 @@ func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if typ == object.TypeLock {
|
||||||
|
// all nodes of a container must store the `LOCK` objects
|
||||||
|
// for correct object removal protection:
|
||||||
|
// - `LOCK` objects are broadcast on their PUT requests;
|
||||||
|
// - `LOCK` object removal is a prohibited action in the GC.
|
||||||
|
shortage = uint32(len(nodes))
|
||||||
|
}
|
||||||
|
|
||||||
for i := 0; shortage > 0 && i < len(nodes); i++ {
|
for i := 0; shortage > 0 && i < len(nodes); i++ {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
|
@ -5,8 +5,8 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ func (p *Policer) Run(ctx context.Context) {
|
||||||
|
|
||||||
func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
||||||
var (
|
var (
|
||||||
addrs []oid.Address
|
addrs []objectcore.AddressWithType
|
||||||
cursor *engine.Cursor
|
cursor *engine.Cursor
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
@ -47,7 +47,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
addr := addrs[i]
|
addr := addrs[i].Address
|
||||||
if p.objsInWork.inWork(addr) {
|
if p.objsInWork.inWork(addr) {
|
||||||
// do not process an object
|
// do not process an object
|
||||||
// that is in work
|
// that is in work
|
||||||
|
@ -62,7 +62,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
||||||
|
|
||||||
p.objsInWork.add(addr)
|
p.objsInWork.add(addr)
|
||||||
|
|
||||||
p.processObject(ctx, addr)
|
p.processObject(ctx, addrs[i])
|
||||||
|
|
||||||
p.cache.Add(addr, time.Now())
|
p.cache.Add(addr, time.Now())
|
||||||
p.objsInWork.remove(addr)
|
p.objsInWork.remove(addr)
|
||||||
|
|
|
@ -3,15 +3,15 @@ package policer
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type jobQueue struct {
|
type jobQueue struct {
|
||||||
localStorage *engine.StorageEngine
|
localStorage *engine.StorageEngine
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *jobQueue) Select(cursor *engine.Cursor, count uint32) ([]oid.Address, *engine.Cursor, error) {
|
func (q *jobQueue) Select(cursor *engine.Cursor, count uint32) ([]objectcore.AddressWithType, *engine.Cursor, error) {
|
||||||
var prm engine.ListWithCursorPrm
|
var prm engine.ListWithCursorPrm
|
||||||
prm.WithCursor(cursor)
|
prm.WithCursor(cursor)
|
||||||
prm.WithCount(count)
|
prm.WithCount(count)
|
||||||
|
|
Loading…
Reference in a new issue