frostfs-node/pkg/local_object_storage/engine/list.go
Pavel Karpy 19a6ca7896 [#1502] node: Store lock object on every container node
Includes extending listing methods in the Storage Engine with object types.
It allows tuning replication/policer algorithms: container nodes do
not remove `LOCK` objects as redundant and try to fulfill `LOCK` placement
on the other container nodes.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
2022-11-14 12:29:40 +03:00

129 lines
3.3 KiB
Go

package engine
import (
"sort"
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
)
// ErrEndOfListing is returned from an object listing with cursor
// when the storage can't return any more objects after the provided
// cursor. Use a nil cursor to start listing again from the beginning.
//
// It is the same error value as shard.ErrEndOfListing, so either name
// can be used when comparing errors.
var ErrEndOfListing = shard.ErrEndOfListing
// Cursor is a type for continuous object listing. It records the position
// reached by a ListWithCursor call so that the next call can resume from it.
type Cursor struct {
// shardID is the ID of the shard the listing last read from; shards whose
// ID sorts before it are skipped on the next call.
shardID string
// shardCursor is the position inside the shard identified by shardID, as
// returned by that shard's own ListWithCursor. It is nil for a fresh cursor.
shardCursor *shard.Cursor
}
// ListWithCursorPrm contains parameters for ListWithCursor operation.
// Fill it with WithCount and WithCursor.
type ListWithCursorPrm struct {
// count is the maximum number of addresses a single call may return.
count uint32
// cursor is the position to resume from; nil starts from the first shard.
cursor *Cursor
}
// WithCount caps the number of addresses that a single ListWithCursor
// call may return. With a zero count ListWithCursor reports
// ErrEndOfListing without listing anything.
func (p *ListWithCursorPrm) WithCount(count uint32) {
	p.count = count
}
// WithCursor sets the position from which ListWithCursor resumes listing.
// Leave it unset (or pass nil) for the initial request; for every
// consecutive request pass the cursor taken from the previous
// ListWithCursorRes.
func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
	p.cursor = cursor
}
// ListWithCursorRes contains values returned from ListWithCursor operation.
type ListWithCursorRes struct {
// addrList holds the addresses (with object types) gathered from the shards.
addrList []objectcore.AddressWithType
// cursor is the position to pass to the next ListWithCursor call.
cursor *Cursor
}
// AddressList returns the typed addresses collected by the ListWithCursor
// operation. The returned slice is the one stored in the result, not a copy.
func (l ListWithCursorRes) AddressList() []objectcore.AddressWithType {
	return l.addrList
}
// Cursor returns the position reached by the listing. Pass it to
// ListWithCursorPrm.WithCursor to continue with the next batch.
func (l ListWithCursorRes) Cursor() *Cursor {
	return l.cursor
}
// ListWithCursor lists physical objects available in the engine starting
// from the cursor. It includes regular, tombstone and storage group objects.
// Does not include inhumed objects. The engine itself does not filter the
// addresses: it concatenates whatever the shards' own ListWithCursor return.
// Use cursor value from the response for consecutive requests.
//
// Shards are visited in ascending order of their string IDs. Shards that
// sort before the cursor's shard are skipped, the cursor's shard is resumed
// from the stored shard-level position, and later shards are listed from
// their beginning. Listing stops once prm.count addresses are collected.
//
// The cursor passed in prm is never modified; the cursor in the result is
// a distinct object.
//
// Returns ErrEndOfListing if there are no more objects to return, if the
// engine has no shards, or if the count parameter is set to zero.
func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) {
	result := make([]objectcore.AddressWithType, 0, prm.count)

	// 1. Snapshot the IDs of the currently attached shards and sort them,
	// so that every call walks the shards in the same order.
	e.mtx.RLock()
	shardIDs := make([]string, 0, len(e.shards))
	for id := range e.shards {
		shardIDs = append(shardIDs, id)
	}
	e.mtx.RUnlock()

	if len(shardIDs) == 0 {
		return ListWithCursorRes{}, ErrEndOfListing
	}

	sort.Strings(shardIDs)

	// 2. Prepare cursor object. Work on a private (shallow) copy: the loop
	// below advances the cursor, and writing through the caller's pointer
	// would silently move a cursor the caller may still want to reuse.
	cursor := &Cursor{shardID: shardIDs[0]}
	if prm.cursor != nil {
		*cursor = *prm.cursor
	}

	// 3. Iterate over available shards. Skip unavailable shards.
	for _, id := range shardIDs {
		if len(result) >= int(prm.count) {
			break
		}

		// Shards ordered before the cursor's shard were handled by
		// previous calls.
		if id < cursor.shardID {
			continue
		}

		// The lock was released after the snapshot, so the shard may have
		// been removed in the meantime.
		e.mtx.RLock()
		shardInstance, ok := e.shards[id]
		e.mtx.RUnlock()
		if !ok {
			continue
		}

		// Ask only for what is still missing; the guard at the top of the
		// loop keeps the difference positive.
		var shardPrm shard.ListWithCursorPrm
		shardPrm.WithCount(prm.count - uint32(len(result)))

		// Only the shard the cursor points at is resumed from its stored
		// position; any later shard is listed from its beginning.
		if id == cursor.shardID {
			shardPrm.WithCursor(cursor.shardCursor)
		}

		res, err := shardInstance.ListWithCursor(shardPrm)
		if err != nil {
			// Deliberate best effort: a shard that cannot list (presumably
			// also one that reports its own end of listing) is skipped and
			// the next shard is tried.
			continue
		}

		result = append(result, res.AddressList()...)
		cursor.shardCursor = res.Cursor()
		cursor.shardID = id
	}

	if len(result) == 0 {
		return ListWithCursorRes{}, ErrEndOfListing
	}

	return ListWithCursorRes{
		addrList: result,
		cursor:   cursor,
	}, nil
}