frostfs-node/pkg/local_object_storage/engine/list.go
Airat Arifullin ada081dfd5
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
ci/woodpecker/push/pre-commit Pipeline was successful
[#19] node: Make policier read shards concurrently
* Introduce ListWithMultiCursor that simultaneously reads objects
  from different shards

Signed-off-by: Airat Arifullin a.arifullin@yadro.com
2023-04-27 11:11:47 +03:00

182 lines
4.8 KiB
Go

package engine
import (
"math/rand"
"sort"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
)
// ErrEndOfListing is returned from an object listing with cursor
// when the storage can't return any more objects after the provided
// cursor. Use nil cursor object to start listing again.
//
// It is the very same error value as shard.ErrEndOfListing, so a comparison
// against either identifier matches.
var ErrEndOfListing = shard.ErrEndOfListing
// Cursor is a type for continuous object listing. Cursor contains shard IDs to read
// and shard cursors that contain state from previous read.
type Cursor struct {
// current is the ID of the shard selected by the latest successful
// nextShard call; it is the empty string until a shard has been selected.
current string
// shardIDs maps a shard ID to a flag that is true once the shard has been
// marked as read and false while it may still be selected for listing.
shardIDs map[string]bool
// shardIDToCursor keeps, per shard ID, the shard-level cursor stored after
// the previous read of that shard. A lookup for a shard without an entry
// yields nil.
shardIDToCursor map[string]*shard.Cursor
}
// getCurrentShardCursor returns the shard-level cursor stored for the shard
// referenced by c.current. The result is nil when nothing is stored under
// that shard ID.
func (c *Cursor) getCurrentShardCursor() *shard.Cursor {
return c.shardIDToCursor[c.current]
}
// setCurrentShardCursor stores sc as the shard-level cursor of the shard
// referenced by c.current, replacing any value stored for it before.
//
// NOTE(review): this writes to shardIDToCursor, so the map must be non-nil;
// newCursor initializes it.
func (c *Cursor) setCurrentShardCursor(sc *shard.Cursor) {
c.shardIDToCursor[c.current] = sc
}
// nextShard selects the shard to read next. It gathers every shard ID that
// is not yet marked as read and stores a pseudo-randomly chosen one of them
// in c.current. It reports false, leaving c.current untouched, when no
// unread shard remains.
func (c *Cursor) nextShard() bool {
	candidates := make([]string, 0, len(c.shardIDs))
	for id, done := range c.shardIDs {
		if done {
			continue
		}
		candidates = append(candidates, id)
	}

	n := len(candidates)
	if n == 0 {
		return false
	}

	c.current = candidates[rand.Intn(n)]
	return true
}
// setShardRead marks the shard with the given ID as read, so that nextShard
// no longer selects it.
func (c *Cursor) setShardRead(shardID string) {
c.shardIDs[shardID] = true
}
// ListWithCursorPrm contains parameters for ListWithCursor operation.
type ListWithCursorPrm struct {
// count is the maximum number of addresses to return; set via WithCount.
count uint32
// cursor is the position to continue listing from; nil starts a new
// listing. Set via WithCursor.
cursor *Cursor
}
// WithCount sets the maximum amount of addresses that ListWithCursor should return.
// With a zero count ListWithCursor returns ErrEndOfListing.
func (p *ListWithCursorPrm) WithCount(count uint32) {
p.count = count
}
// WithCursor sets a cursor for ListWithCursor operation. For the initial
// request, skip this param or use a nil value. For consecutive requests, use
// the value from ListWithCursorRes.
//
// The cursor is stored by pointer and ListWithCursor updates it in place.
func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
p.cursor = cursor
}
// ListWithCursorRes contains values returned from ListWithCursor operation.
type ListWithCursorRes struct {
// addrList holds the addresses collected by the listing call.
addrList []objectcore.AddressWithType
// cursor is the listing position to pass to the next ListWithCursor call.
cursor *Cursor
}
// AddressList returns addresses selected by ListWithCursor operation.
// The slice held by the result is returned as is, without copying.
func (l ListWithCursorRes) AddressList() []objectcore.AddressWithType {
return l.addrList
}
// Cursor returns the cursor for consecutive listing requests. Pass it to
// ListWithCursorPrm.WithCursor to continue the listing.
func (l ListWithCursorRes) Cursor() *Cursor {
return l.cursor
}
// ListWithCursor lists physical objects available in the engine starting
// from the cursor. It includes regular, tombstone and storage group objects.
// Does not include inhumed objects. Use cursor value from the response
// for consecutive requests.
//
// If count param is big enough, then the method reads objects from different shards
// by portions. In this case shards are chosen randomly, if they're not read out yet.
//
// A shard is considered read out for the given cursor when it is no longer
// registered in the engine, when its listing returns an error, or when it
// returns an empty page. Shard errors are not propagated to the caller.
//
// Adding a shard between ListWithCursor does not invalidate the cursor but new shard
// won't be listed.
// Removing a shard between ListWithCursor leads to the undefined behavior
// (e.g. usage of the objects from the removed shard).
//
// The cursor passed in prm is updated in place and is also the one returned
// in the result.
//
// Returns ErrEndOfListing if there are no more objects to return or count
// parameter set to zero.
func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) {
	// Nothing can be returned for a zero count; bail out before allocating
	// anything or touching the cursor.
	if prm.count == 0 {
		return ListWithCursorRes{}, ErrEndOfListing
	}

	// Set initial cursors
	cursor := prm.cursor
	if cursor == nil {
		shardIDs := getSortedShardIDs(e)
		if len(shardIDs) == 0 {
			return ListWithCursorRes{}, ErrEndOfListing
		}
		cursor = newCursor(shardIDs)
	}

	const (
		// splitShardCountLimit is the count starting from which a request is
		// split into smaller per-shard portions.
		splitShardCountLimit = 100
		// shardsNum is the divisor applied to count to get the portion size.
		shardsNum = 4
	)

	batchSize := prm.count
	if batchSize >= splitShardCountLimit {
		batchSize /= shardsNum
	}

	result := make([]objectcore.AddressWithType, 0, prm.count)

	for cursor.nextShard() {
		if len(result) >= int(prm.count) {
			break
		}
		curr := cursor.current

		e.mtx.RLock()
		shardInstance, ok := e.shards[curr]
		e.mtx.RUnlock()
		if !ok {
			// The shard is not registered in the engine anymore.
			cursor.setShardRead(curr)
			continue
		}

		// Ask for what is still missing, but not more than one portion.
		count := prm.count - uint32(len(result))
		if count > batchSize {
			count = batchSize
		}

		var shardPrm shard.ListWithCursorPrm
		shardPrm.WithCount(count)
		shardPrm.WithCursor(cursor.getCurrentShardCursor())

		res, err := shardInstance.ListWithCursor(shardPrm)
		if err != nil {
			// Best effort: a failing (or exhausted) shard is skipped for the
			// rest of this listing instead of failing the whole request.
			cursor.setShardRead(curr)
			continue
		}

		page := res.AddressList()
		if len(page) == 0 {
			// An empty page without an error makes no progress. Without this
			// guard the shard would stay unread and could be selected again
			// and again, so the loop would never terminate.
			cursor.setShardRead(curr)
			continue
		}

		result = append(result, page...)
		cursor.setCurrentShardCursor(res.Cursor())
	}

	if len(result) == 0 {
		return ListWithCursorRes{}, ErrEndOfListing
	}

	return ListWithCursorRes{
		addrList: result,
		cursor:   cursor,
	}, nil
}
// getSortedShardIDs returns the IDs of all shards currently present in the
// engine's shard map, sorted in ascending order. The map is read while the
// engine's read lock is held; sorting happens after the lock is released.
func getSortedShardIDs(e *StorageEngine) []string {
	snapshot := func() []string {
		e.mtx.RLock()
		defer e.mtx.RUnlock()

		ids := make([]string, 0, len(e.shards))
		for id := range e.shards {
			ids = append(ids, id)
		}
		return ids
	}

	ids := snapshot()
	sort.Strings(ids)
	return ids
}
// newCursor builds a Cursor for a fresh listing over the given shards: every
// ID in shardIDs is registered as not yet read, no shard-level cursors are
// stored, and no current shard is selected.
func newCursor(shardIDs []string) *Cursor {
	c := &Cursor{
		shardIDs:        make(map[string]bool, len(shardIDs)),
		shardIDToCursor: make(map[string]*shard.Cursor),
	}
	for _, id := range shardIDs {
		c.shardIDs[id] = false
	}
	return c
}