[#948] engine: Add ListWithCursor method

Interface for shard.ListWithCursor invocations.

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2021-10-27 17:51:58 +03:00 committed by Alex Vanin
parent 5b6be7bc1c
commit c02c7bee5b
2 changed files with 134 additions and 0 deletions

View file

@ -0,0 +1,74 @@
package engine
import (
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
)
// ListWithCursorPrm contains parameters for ListWithCursor operation.
type ListWithCursorPrm struct {
count uint32
cursor string
shard shard.ID
}
// WithCount sets maximum amount of addresses that ListWithCursor can return.
func (p *ListWithCursorPrm) WithCount(count uint32) *ListWithCursorPrm {
p.count = count
return p
}
// WithCursor sets cursor for ListWithCursor operation. For initial request
// ignore this param or use empty string. For continues requests, use value
// from ListWithCursorRes.
func (p *ListWithCursorPrm) WithCursor(cursor string) *ListWithCursorPrm {
p.cursor = cursor
return p
}
// WithShardID sets shard where listing will process.
func (p *ListWithCursorPrm) WithShardID(id shard.ID) *ListWithCursorPrm {
p.shard = id
return p
}
// ListWithCursorRes contains values returned from ListWithCursor operation.
type ListWithCursorRes struct {
addrList []*object.Address
cursor string
}
// AddressList returns addresses selected by ListWithCursor operation.
func (l ListWithCursorRes) AddressList() []*object.Address {
return l.addrList
}
// Cursor returns cursor for consecutive listing requests.
func (l ListWithCursorRes) Cursor() string {
return l.cursor
}
// ListWithCursor lists physical objects available in specified shard starting
// from cursor. Includes regular,tombstone and storage group objects.
// Does not include inhumed objects. Use cursor value from response
// for consecutive requests.
func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorRes, error) {
e.mtx.RLock()
shardInstance, ok := e.shards[prm.shard.String()]
e.mtx.RUnlock()
if !ok {
return nil, errShardNotFound
}
shardPrm := new(shard.ListWithCursorPrm).WithCursor(prm.cursor).WithCount(prm.count)
res, err := shardInstance.ListWithCursor(shardPrm)
if err != nil {
return nil, err
}
return &ListWithCursorRes{
addrList: res.AddressList(),
cursor: res.Cursor(),
}, nil
}

View file

@ -0,0 +1,60 @@
package engine
import (
"os"
"sort"
"testing"
cidtest "github.com/nspcc-dev/neofs-api-go/pkg/container/id/test"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/stretchr/testify/require"
)
func TestListWithCursor(t *testing.T) {
s1 := testNewShard(t, 1)
s2 := testNewShard(t, 2)
e := testNewEngineWithShards(s1, s2)
t.Cleanup(func() {
e.Close()
os.RemoveAll(t.Name())
})
const total = 20
expected := make([]*object.Address, 0, total)
got := make([]*object.Address, 0, total)
for i := 0; i < total; i++ {
containerID := cidtest.Generate()
obj := generateRawObjectWithCID(t, containerID)
prm := new(PutPrm).WithObject(obj.Object())
_, err := e.Put(prm)
require.NoError(t, err)
expected = append(expected, obj.Object().Address())
}
expected = sortAddresses(expected)
for _, shard := range e.DumpInfo().Shards {
prm := new(ListWithCursorPrm).WithShardID(*shard.ID).WithCount(total)
res, err := e.ListWithCursor(prm)
require.NoError(t, err)
require.NotEmpty(t, res.AddressList())
got = append(got, res.AddressList()...)
_, err = e.ListWithCursor(prm.WithCursor(res.Cursor()))
require.ErrorIs(t, err, meta.ErrEndOfListing)
}
got = sortAddresses(got)
require.Equal(t, expected, got)
}
func sortAddresses(addr []*object.Address) []*object.Address {
sort.Slice(addr, func(i, j int) bool {
return addr[i].String() < addr[j].String()
})
return addr
}