diff --git a/pkg/local_object_storage/engine/list.go b/pkg/local_object_storage/engine/list.go index 64e1e3540..8d887c248 100644 --- a/pkg/local_object_storage/engine/list.go +++ b/pkg/local_object_storage/engine/list.go @@ -1,7 +1,12 @@ package engine import ( + "fmt" + "sort" + "strconv" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + core "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" ) @@ -9,7 +14,6 @@ import ( type ListWithCursorPrm struct { count uint32 cursor string - shard shard.ID } // WithCount sets maximum amount of addresses that ListWithCursor can return. @@ -26,12 +30,6 @@ func (p *ListWithCursorPrm) WithCursor(cursor string) *ListWithCursorPrm { return p } -// WithShardID sets shard where listing will process. -func (p *ListWithCursorPrm) WithShardID(id shard.ID) *ListWithCursorPrm { - p.shard = id - return p -} - // ListWithCursorRes contains values returned from ListWithCursor operation. type ListWithCursorRes struct { addrList []*object.Address @@ -53,22 +51,99 @@ func (l ListWithCursorRes) Cursor() string { // Does not include inhumed objects. Use cursor value from response // for consecutive requests. func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorRes, error) { + var ( + err error + result = make([]*object.Address, 0, prm.count) + ) + + // 1. Get available shards and sort them. e.mtx.RLock() - shardInstance, ok := e.shards[prm.shard.String()] + shardIDs := make([]string, 0, len(e.shards)) + for id := range e.shards { + shardIDs = append(shardIDs, id) + } e.mtx.RUnlock() - if !ok { - return nil, errShardNotFound + if len(shardIDs) == 0 { + return nil, core.ErrEndOfListing } - shardPrm := new(shard.ListWithCursorPrm).WithCursor(prm.cursor).WithCount(prm.count) - res, err := shardInstance.ListWithCursor(shardPrm) - if err != nil { - return nil, err + sort.Slice(shardIDs, func(i, j int) bool { + return shardIDs[i] < shardIDs[j] + }) + + // 2. Decode shard ID from cursor. + cursor := prm.cursor + cursorShardID := shardIDs[0] + if len(cursor) > 0 { + cursorShardID, cursor, err = decodeID(cursor) + if err != nil { + return nil, err + } + } + + // 3. Iterate over available shards. Skip unavailable shards. + for i := range shardIDs { + if len(result) >= int(prm.count) { + break + } + + if shardIDs[i] < cursorShardID { + continue + } + + e.mtx.RLock() + shardInstance, ok := e.shards[shardIDs[i]] + e.mtx.RUnlock() + if !ok { + continue + } + + count := uint32(int(prm.count) - len(result)) + shardPrm := new(shard.ListWithCursorPrm).WithCount(count) + if shardIDs[i] == cursorShardID { + shardPrm.WithCursor(cursor) + } + + res, err := shardInstance.ListWithCursor(shardPrm) + if err != nil { + continue + } + + result = append(result, res.AddressList()...) + cursor = res.Cursor() + cursorShardID = shardIDs[i] + } + + if len(result) == 0 { + return nil, core.ErrEndOfListing } return &ListWithCursorRes{ - addrList: res.AddressList(), - cursor: res.Cursor(), + addrList: result, + cursor: encodeID(cursorShardID, cursor), }, nil } + +func decodeID(cursor string) (shardID string, shardCursor string, err error) { + ln := len(cursor) + if ln < 2 { + return "", "", fmt.Errorf("invalid cursor %s", cursor) + } + + idLen, err := strconv.Atoi(cursor[:2]) + if err != nil { + return "", "", fmt.Errorf("invalid cursor %s", cursor) + } + + if len(cursor) < 2+idLen { + return "", "", fmt.Errorf("invalid cursor %s", cursor) + } + + return cursor[2 : 2+idLen], cursor[2+idLen:], nil +} + +func encodeID(id, cursor string) string { + prefix := fmt.Sprintf("%02d", len(id)) + return prefix + id + cursor +} diff --git a/pkg/local_object_storage/engine/list_test.go b/pkg/local_object_storage/engine/list_test.go index 78d51eedd..ecd1918a1 100644 --- a/pkg/local_object_storage/engine/list_test.go +++ b/pkg/local_object_storage/engine/list_test.go @@ -1,6 +1,7 @@ package engine import ( + "errors" "os" "sort" "testing" @@ -37,17 +38,23 @@ func TestListWithCursor(t *testing.T) { expected = sortAddresses(expected) - for _, shard := range e.DumpInfo().Shards { - prm := new(ListWithCursorPrm).WithShardID(*shard.ID).WithCount(total) - res, err := e.ListWithCursor(prm) - require.NoError(t, err) - require.NotEmpty(t, res.AddressList()) + prm := new(ListWithCursorPrm).WithCount(1) + res, err := e.ListWithCursor(prm) + require.NoError(t, err) + require.NotEmpty(t, res.AddressList()) + got = append(got, res.AddressList()...) + for i := 0; i < total-1; i++ { + res, err = e.ListWithCursor(prm.WithCursor(res.Cursor())) + if errors.Is(err, core.ErrEndOfListing) { + break + } got = append(got, res.AddressList()...) - _, err = e.ListWithCursor(prm.WithCursor(res.Cursor())) - require.ErrorIs(t, err, core.ErrEndOfListing) } + _, err = e.ListWithCursor(prm.WithCursor(res.Cursor())) + require.ErrorIs(t, err, core.ErrEndOfListing) + got = sortAddresses(got) require.Equal(t, expected, got) }