forked from TrueCloudLab/frostfs-node
[#948] engine: Hide shard IDs in ListWithCursor
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
0f6d8f6eea
commit
164cd10af8
2 changed files with 105 additions and 23 deletions
|
@ -1,7 +1,12 @@
|
||||||
package engine
|
package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
|
core "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -9,7 +14,6 @@ import (
|
||||||
type ListWithCursorPrm struct {
|
type ListWithCursorPrm struct {
|
||||||
count uint32
|
count uint32
|
||||||
cursor string
|
cursor string
|
||||||
shard shard.ID
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithCount sets maximum amount of addresses that ListWithCursor can return.
|
// WithCount sets maximum amount of addresses that ListWithCursor can return.
|
||||||
|
@ -26,12 +30,6 @@ func (p *ListWithCursorPrm) WithCursor(cursor string) *ListWithCursorPrm {
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithShardID sets shard where listing will process.
|
|
||||||
func (p *ListWithCursorPrm) WithShardID(id shard.ID) *ListWithCursorPrm {
|
|
||||||
p.shard = id
|
|
||||||
return p
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
||||||
type ListWithCursorRes struct {
|
type ListWithCursorRes struct {
|
||||||
addrList []*object.Address
|
addrList []*object.Address
|
||||||
|
@ -53,22 +51,99 @@ func (l ListWithCursorRes) Cursor() string {
|
||||||
// Does not include inhumed objects. Use cursor value from response
|
// Does not include inhumed objects. Use cursor value from response
|
||||||
// for consecutive requests.
|
// for consecutive requests.
|
||||||
func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorRes, error) {
|
func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorRes, error) {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
result = make([]*object.Address, 0, prm.count)
|
||||||
|
)
|
||||||
|
|
||||||
|
// 1. Get available shards and sort them.
|
||||||
e.mtx.RLock()
|
e.mtx.RLock()
|
||||||
shardInstance, ok := e.shards[prm.shard.String()]
|
shardIDs := make([]string, 0, len(e.shards))
|
||||||
|
for id := range e.shards {
|
||||||
|
shardIDs = append(shardIDs, id)
|
||||||
|
}
|
||||||
e.mtx.RUnlock()
|
e.mtx.RUnlock()
|
||||||
|
|
||||||
if !ok {
|
if len(shardIDs) == 0 {
|
||||||
return nil, errShardNotFound
|
return nil, core.ErrEndOfListing
|
||||||
}
|
}
|
||||||
|
|
||||||
shardPrm := new(shard.ListWithCursorPrm).WithCursor(prm.cursor).WithCount(prm.count)
|
sort.Slice(shardIDs, func(i, j int) bool {
|
||||||
res, err := shardInstance.ListWithCursor(shardPrm)
|
return shardIDs[i] < shardIDs[j]
|
||||||
|
})
|
||||||
|
|
||||||
|
// 2. Decode shard ID from cursor.
|
||||||
|
cursor := prm.cursor
|
||||||
|
cursorShardID := shardIDs[0]
|
||||||
|
if len(cursor) > 0 {
|
||||||
|
cursorShardID, cursor, err = decodeID(cursor)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Iterate over available shards. Skip unavailable shards.
|
||||||
|
for i := range shardIDs {
|
||||||
|
if len(result) >= int(prm.count) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if shardIDs[i] < cursorShardID {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
e.mtx.RLock()
|
||||||
|
shardInstance, ok := e.shards[shardIDs[i]]
|
||||||
|
e.mtx.RUnlock()
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
count := uint32(int(prm.count) - len(result))
|
||||||
|
shardPrm := new(shard.ListWithCursorPrm).WithCount(count)
|
||||||
|
if shardIDs[i] == cursorShardID {
|
||||||
|
shardPrm.WithCursor(cursor)
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := shardInstance.ListWithCursor(shardPrm)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
result = append(result, res.AddressList()...)
|
||||||
|
cursor = res.Cursor()
|
||||||
|
cursorShardID = shardIDs[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(result) == 0 {
|
||||||
|
return nil, core.ErrEndOfListing
|
||||||
|
}
|
||||||
|
|
||||||
return &ListWithCursorRes{
|
return &ListWithCursorRes{
|
||||||
addrList: res.AddressList(),
|
addrList: result,
|
||||||
cursor: res.Cursor(),
|
cursor: encodeID(cursorShardID, cursor),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func decodeID(cursor string) (shardID string, shardCursor string, err error) {
|
||||||
|
ln := len(cursor)
|
||||||
|
if ln < 2 {
|
||||||
|
return "", "", fmt.Errorf("invalid cursor %s", cursor)
|
||||||
|
}
|
||||||
|
|
||||||
|
idLen, err := strconv.Atoi(cursor[:2])
|
||||||
|
if err != nil {
|
||||||
|
return "", "", fmt.Errorf("invalid cursor %s", cursor)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(cursor) < 2+idLen {
|
||||||
|
return "", "", fmt.Errorf("invalid cursor %s", cursor)
|
||||||
|
}
|
||||||
|
|
||||||
|
return cursor[2 : 2+idLen], cursor[2+idLen:], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func encodeID(id, cursor string) string {
|
||||||
|
prefix := fmt.Sprintf("%02d", len(id))
|
||||||
|
return prefix + id + cursor
|
||||||
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package engine
|
package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -37,16 +38,22 @@ func TestListWithCursor(t *testing.T) {
|
||||||
|
|
||||||
expected = sortAddresses(expected)
|
expected = sortAddresses(expected)
|
||||||
|
|
||||||
for _, shard := range e.DumpInfo().Shards {
|
prm := new(ListWithCursorPrm).WithCount(1)
|
||||||
prm := new(ListWithCursorPrm).WithShardID(*shard.ID).WithCount(total)
|
|
||||||
res, err := e.ListWithCursor(prm)
|
res, err := e.ListWithCursor(prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotEmpty(t, res.AddressList())
|
require.NotEmpty(t, res.AddressList())
|
||||||
|
|
||||||
got = append(got, res.AddressList()...)
|
got = append(got, res.AddressList()...)
|
||||||
|
|
||||||
|
for i := 0; i < total-1; i++ {
|
||||||
|
res, err = e.ListWithCursor(prm.WithCursor(res.Cursor()))
|
||||||
|
if errors.Is(err, core.ErrEndOfListing) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
got = append(got, res.AddressList()...)
|
||||||
|
}
|
||||||
|
|
||||||
_, err = e.ListWithCursor(prm.WithCursor(res.Cursor()))
|
_, err = e.ListWithCursor(prm.WithCursor(res.Cursor()))
|
||||||
require.ErrorIs(t, err, core.ErrEndOfListing)
|
require.ErrorIs(t, err, core.ErrEndOfListing)
|
||||||
}
|
|
||||||
|
|
||||||
got = sortAddresses(got)
|
got = sortAddresses(got)
|
||||||
require.Equal(t, expected, got)
|
require.Equal(t, expected, got)
|
||||||
|
|
Loading…
Reference in a new issue