180 lines
4.8 KiB
Go
180 lines
4.8 KiB
Go
package engine
|
|
|
|
import (
|
|
"context"
|
|
"math/rand"
|
|
"sort"
|
|
|
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
)
|
|
|
|
// ErrEndOfListing is returned from an object listing with cursor
|
|
// when the storage can't return any more objects after the provided
|
|
// cursor. Use nil cursor object to start listing again.
|
|
var ErrEndOfListing = shard.ErrEndOfListing
|
|
|
|
// Cursor is a type for continuous object listing. Cursor contains shard IDs to read
|
|
// and shard cursors that contain state from previous read.
|
|
type Cursor struct {
|
|
current string
|
|
shardIDs map[string]bool
|
|
shardIDToCursor map[string]*shard.Cursor
|
|
}
|
|
|
|
func (c *Cursor) getCurrentShardCursor() *shard.Cursor {
|
|
return c.shardIDToCursor[c.current]
|
|
}
|
|
|
|
func (c *Cursor) setCurrentShardCursor(sc *shard.Cursor) {
|
|
c.shardIDToCursor[c.current] = sc
|
|
}
|
|
|
|
func (c *Cursor) nextShard() bool {
|
|
var shardsToRead []string
|
|
for shardID, read := range c.shardIDs {
|
|
if !read {
|
|
shardsToRead = append(shardsToRead, shardID)
|
|
}
|
|
}
|
|
if len(shardsToRead) == 0 {
|
|
return false
|
|
}
|
|
c.current = shardsToRead[rand.Intn(len(shardsToRead))]
|
|
return true
|
|
}
|
|
|
|
func (c *Cursor) setShardRead(shardID string) {
|
|
c.shardIDs[shardID] = true
|
|
}
|
|
|
|
// ListWithCursorPrm contains parameters for ListWithCursor operation.
|
|
type ListWithCursorPrm struct {
|
|
count uint32
|
|
cursor *Cursor
|
|
}
|
|
|
|
// WithCount sets the maximum amount of addresses that ListWithCursor should return.
|
|
func (p *ListWithCursorPrm) WithCount(count uint32) {
|
|
p.count = count
|
|
}
|
|
|
|
// WithCursor sets a cursor for ListWithCursor operation. For initial request
|
|
// ignore this param or use nil value. For consecutive requests, use value
|
|
// from ListWithCursorRes.
|
|
func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
|
|
p.cursor = cursor
|
|
}
|
|
|
|
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
|
type ListWithCursorRes struct {
|
|
addrList []objectcore.AddressWithType
|
|
cursor *Cursor
|
|
}
|
|
|
|
// AddressList returns addresses selected by ListWithCursor operation.
|
|
func (l ListWithCursorRes) AddressList() []objectcore.AddressWithType {
|
|
return l.addrList
|
|
}
|
|
|
|
// Cursor returns cursor for consecutive listing requests.
|
|
func (l ListWithCursorRes) Cursor() *Cursor {
|
|
return l.cursor
|
|
}
|
|
|
|
// ListWithCursor lists physical objects available in the engine starting
|
|
// from the cursor. It includes regular, tombstone and storage group objects.
|
|
// Does not include inhumed objects. Use cursor value from the response
|
|
// for consecutive requests.
|
|
//
|
|
// If count param is big enough, then the method reads objects from different shards
|
|
// by portions. In this case shards are chosen randomly, if they're not read out yet.
|
|
//
|
|
// Adding a shard between ListWithCursor does not invalidate the cursor but new shard
|
|
// won't be listed.
|
|
// Removing a shard between ListWithCursor leads to the undefined behavior
|
|
// (e.g. usage of the objects from the removed shard).
|
|
//
|
|
// Returns ErrEndOfListing if there are no more objects to return or count
|
|
// parameter set to zero.
|
|
func (e *StorageEngine) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) {
|
|
result := make([]objectcore.AddressWithType, 0, prm.count)
|
|
|
|
// Set initial cursors
|
|
cursor := prm.cursor
|
|
if cursor == nil {
|
|
shardIDs := getSortedShardIDs(e)
|
|
if len(shardIDs) == 0 {
|
|
return ListWithCursorRes{}, ErrEndOfListing
|
|
}
|
|
cursor = newCursor(shardIDs)
|
|
}
|
|
|
|
const (
|
|
splitShardCountLimit = 100
|
|
shardsNum = 4
|
|
)
|
|
|
|
batchSize := prm.count
|
|
if batchSize >= splitShardCountLimit {
|
|
batchSize /= shardsNum
|
|
}
|
|
|
|
for cursor.nextShard() {
|
|
if len(result) >= int(prm.count) {
|
|
break
|
|
}
|
|
curr := cursor.current
|
|
|
|
e.mtx.RLock()
|
|
shardInstance, ok := e.shards[curr]
|
|
e.mtx.RUnlock()
|
|
if !ok {
|
|
cursor.setShardRead(curr)
|
|
continue
|
|
}
|
|
|
|
count := min(prm.count-uint32(len(result)), batchSize)
|
|
|
|
var shardPrm shard.ListWithCursorPrm
|
|
shardPrm.WithCount(count)
|
|
shardPrm.WithCursor(cursor.getCurrentShardCursor())
|
|
|
|
res, err := shardInstance.ListWithCursor(ctx, shardPrm)
|
|
if err != nil {
|
|
cursor.setShardRead(curr)
|
|
continue
|
|
}
|
|
result = append(result, res.AddressList()...)
|
|
cursor.setCurrentShardCursor(res.Cursor())
|
|
}
|
|
|
|
if len(result) == 0 {
|
|
return ListWithCursorRes{}, ErrEndOfListing
|
|
}
|
|
|
|
return ListWithCursorRes{
|
|
addrList: result,
|
|
cursor: cursor,
|
|
}, nil
|
|
}
|
|
|
|
func getSortedShardIDs(e *StorageEngine) []string {
|
|
e.mtx.RLock()
|
|
shardIDs := make([]string, 0, len(e.shards))
|
|
for id := range e.shards {
|
|
shardIDs = append(shardIDs, id)
|
|
}
|
|
e.mtx.RUnlock()
|
|
sort.Strings(shardIDs)
|
|
return shardIDs
|
|
}
|
|
|
|
func newCursor(shardIDs []string) *Cursor {
|
|
shardIDsMap := make(map[string]bool)
|
|
shardIDToCursor := make(map[string]*shard.Cursor)
|
|
for _, shardID := range shardIDs {
|
|
shardIDsMap[shardID] = false
|
|
}
|
|
return &Cursor{shardIDs: shardIDsMap, shardIDToCursor: shardIDToCursor}
|
|
}
|