package engine

import (
	"context"
	"math/rand"
	"sort"

	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
)

// ErrEndOfListing is the error a cursor-based object listing returns once
// the storage has no more objects past the supplied cursor. Pass a nil
// cursor to start a new listing from the beginning.
var ErrEndOfListing = shard.ErrEndOfListing

// Cursor carries the state of a continuous object listing between calls:
// the set of shard IDs to read and, per shard, the shard-level cursor
// holding the state of the previous read.
type Cursor struct {
	// current is the ID of the shard selected by the latest nextShard call.
	current string
	// shardIDs maps a shard ID to a flag that is true once the shard has
	// been marked as read and must not be selected again.
	shardIDs map[string]bool
	// shardIDToCursor stores the latest shard-level cursor per shard ID.
	shardIDToCursor map[string]*shard.Cursor
}

// getCurrentShardCursor returns the shard-level cursor stored for the
// currently selected shard; it is nil when nothing has been stored yet.
func (c *Cursor) getCurrentShardCursor() *shard.Cursor {
	return c.shardIDToCursor[c.current]
}

// setCurrentShardCursor stores next as the shard-level cursor of the
// currently selected shard.
func (c *Cursor) setCurrentShardCursor(next *shard.Cursor) {
	c.shardIDToCursor[c.current] = next
}

// nextShard selects, uniformly at random, one of the shards that are not
// marked as read yet and makes it the current one. It returns false, leaving
// the current shard untouched, when every shard has already been read.
func (c *Cursor) nextShard() bool {
	var pending []string
	for id, done := range c.shardIDs {
		if done {
			continue
		}
		pending = append(pending, id)
	}

	if n := len(pending); n > 0 {
		c.current = pending[rand.Intn(n)]
		return true
	}
	return false
}

// setShardRead marks the shard with the given ID as read, excluding it from
// further nextShard selections.
func (c *Cursor) setShardRead(shardID string) {
	c.shardIDs[shardID] = true
}

// ListWithCursorPrm groups the parameters of the ListWithCursor operation.
type ListWithCursorPrm struct {
	count  uint32
	cursor *Cursor
}

// WithCount sets the upper bound on the number of addresses that
// ListWithCursor should return.
func (p *ListWithCursorPrm) WithCount(count uint32) {
	p.count = count
}

// WithCursor sets the cursor to continue listing from. Leave it unset or
// pass nil for the initial request; for consecutive requests pass the value
// taken from ListWithCursorRes.
func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
	p.cursor = cursor
}

// ListWithCursorRes contains values returned from ListWithCursor operation.
type ListWithCursorRes struct { addrList []objectcore.Info cursor *Cursor } // AddressList returns addresses selected by ListWithCursor operation. func (l ListWithCursorRes) AddressList() []objectcore.Info { return l.addrList } // Cursor returns cursor for consecutive listing requests. func (l ListWithCursorRes) Cursor() *Cursor { return l.cursor } // ListWithCursor lists physical objects available in the engine starting // from the cursor. It includes regular, tombstone and storage group objects. // Does not include inhumed objects. Use cursor value from the response // for consecutive requests. // // If count param is big enough, then the method reads objects from different shards // by portions. In this case shards are chosen randomly, if they're not read out yet. // // Adding a shard between ListWithCursor does not invalidate the cursor but new shard // won't be listed. // Removing a shard between ListWithCursor leads to the undefined behavior // (e.g. usage of the objects from the removed shard). // // Returns ErrEndOfListing if there are no more objects to return or count // parameter set to zero. 
func (e *StorageEngine) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.ListWithCursor") defer span.End() defer elapsed("ListWithCursor", e.metrics.AddMethodDuration)() result := make([]objectcore.Info, 0, prm.count) // Set initial cursors cursor := prm.cursor if cursor == nil { shardIDs := getSortedShardIDs(e) if len(shardIDs) == 0 { return ListWithCursorRes{}, ErrEndOfListing } cursor = newCursor(shardIDs) } const ( splitShardCountLimit = 100 shardsNum = 4 ) batchSize := prm.count if batchSize >= splitShardCountLimit { batchSize /= shardsNum } for cursor.nextShard() { if len(result) >= int(prm.count) { break } curr := cursor.current e.mtx.RLock() shardInstance, ok := e.shards[curr] e.mtx.RUnlock() if !ok { cursor.setShardRead(curr) continue } count := min(prm.count-uint32(len(result)), batchSize) var shardPrm shard.ListWithCursorPrm shardPrm.WithCount(count) shardPrm.WithCursor(cursor.getCurrentShardCursor()) res, err := shardInstance.ListWithCursor(ctx, shardPrm) if err != nil { cursor.setShardRead(curr) continue } result = append(result, res.AddressList()...) cursor.setCurrentShardCursor(res.Cursor()) } if len(result) == 0 { return ListWithCursorRes{}, ErrEndOfListing } return ListWithCursorRes{ addrList: result, cursor: cursor, }, nil } func getSortedShardIDs(e *StorageEngine) []string { e.mtx.RLock() shardIDs := make([]string, 0, len(e.shards)) for id := range e.shards { shardIDs = append(shardIDs, id) } e.mtx.RUnlock() sort.Strings(shardIDs) return shardIDs } func newCursor(shardIDs []string) *Cursor { shardIDsMap := make(map[string]bool) shardIDToCursor := make(map[string]*shard.Cursor) for _, shardID := range shardIDs { shardIDsMap[shardID] = false } return &Cursor{shardIDs: shardIDsMap, shardIDToCursor: shardIDToCursor} }