[#19] node: Make policier read shards concurrently

* Introduce ListWithMultiCursor that simultaneously reads objects
  from different shards

Signed-off-by: Airat Arifullin a.arifullin@yadro.com
This commit is contained in:
Airat Arifullin 2023-04-16 17:03:42 +03:00
parent 1f4061c0e2
commit ada081dfd5
2 changed files with 187 additions and 92 deletions

View file

@ -1,6 +1,7 @@
package engine
import (
"math/rand"
"sort"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@ -12,10 +13,38 @@ import (
// cursor. Use nil cursor object to start listing again.
var ErrEndOfListing = shard.ErrEndOfListing
// Cursor is a type for continuous object listing.
// Cursor is a type for continuous object listing. Cursor contains shard IDs to read
// and shard cursors that contain state from previous read.
type Cursor struct {
shardID string
shardCursor *shard.Cursor
current string
shardIDs map[string]bool
shardIDToCursor map[string]*shard.Cursor
}
func (c *Cursor) getCurrentShardCursor() *shard.Cursor {
return c.shardIDToCursor[c.current]
}
func (c *Cursor) setCurrentShardCursor(sc *shard.Cursor) {
c.shardIDToCursor[c.current] = sc
}
func (c *Cursor) nextShard() bool {
var shardsToRead []string
for shardID, read := range c.shardIDs {
if !read {
shardsToRead = append(shardsToRead, shardID)
}
}
if len(shardsToRead) == 0 {
return false
}
c.current = shardsToRead[rand.Intn(len(shardsToRead))]
return true
}
func (c *Cursor) setShardRead(shardID string) {
c.shardIDs[shardID] = true
}
// ListWithCursorPrm contains parameters for ListWithCursor operation.
@ -57,65 +86,69 @@ func (l ListWithCursorRes) Cursor() *Cursor {
// Does not include inhumed objects. Use cursor value from the response
// for consecutive requests.
//
// If count param is big enough, then the method reads objects from different shards
// by portions. In this case shards are chosen randomly, if they're not read out yet.
//
// Adding a shard between ListWithCursor does not invalidate the cursor but new shard
// won't be listed.
// Removing a shard between ListWithCursor leads to the undefined behavior
// (e.g. usage of the objects from the removed shard).
//
// Returns ErrEndOfListing if there are no more objects to return or count
// parameter set to zero.
func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) {
result := make([]objectcore.AddressWithType, 0, prm.count)
// 1. Get available shards and sort them.
e.mtx.RLock()
shardIDs := make([]string, 0, len(e.shards))
for id := range e.shards {
shardIDs = append(shardIDs, id)
}
e.mtx.RUnlock()
if len(shardIDs) == 0 {
return ListWithCursorRes{}, ErrEndOfListing
}
sort.Slice(shardIDs, func(i, j int) bool {
return shardIDs[i] < shardIDs[j]
})
// 2. Prepare cursor object.
// Set initial cursors
cursor := prm.cursor
if cursor == nil {
cursor = &Cursor{shardID: shardIDs[0]}
shardIDs := getSortedShardIDs(e)
if len(shardIDs) == 0 {
return ListWithCursorRes{}, ErrEndOfListing
}
cursor = newCursor(shardIDs)
}
// 3. Iterate over available shards. Skip unavailable shards.
for i := range shardIDs {
const (
splitShardCountLimit = 100
shardsNum = 4
)
batchSize := prm.count
if batchSize >= splitShardCountLimit {
batchSize /= shardsNum
}
for cursor.nextShard() {
if len(result) >= int(prm.count) {
break
}
if shardIDs[i] < cursor.shardID {
continue
}
curr := cursor.current
e.mtx.RLock()
shardInstance, ok := e.shards[shardIDs[i]]
shardInstance, ok := e.shards[curr]
e.mtx.RUnlock()
if !ok {
cursor.setShardRead(curr)
continue
}
count := uint32(int(prm.count) - len(result))
count := prm.count - uint32(len(result))
if count > batchSize {
count = batchSize
}
var shardPrm shard.ListWithCursorPrm
shardPrm.WithCount(count)
if shardIDs[i] == cursor.shardID {
shardPrm.WithCursor(cursor.shardCursor)
}
shardPrm.WithCursor(cursor.getCurrentShardCursor())
res, err := shardInstance.ListWithCursor(shardPrm)
if err != nil {
cursor.setShardRead(curr)
continue
}
result = append(result, res.AddressList()...)
cursor.shardCursor = res.Cursor()
cursor.shardID = shardIDs[i]
cursor.setCurrentShardCursor(res.Cursor())
}
if len(result) == 0 {
@ -127,3 +160,23 @@ func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes
cursor: cursor,
}, nil
}
func getSortedShardIDs(e *StorageEngine) []string {
e.mtx.RLock()
shardIDs := make([]string, 0, len(e.shards))
for id := range e.shards {
shardIDs = append(shardIDs, id)
}
e.mtx.RUnlock()
sort.Strings(shardIDs)
return shardIDs
}
func newCursor(shardIDs []string) *Cursor {
shardIDsMap := make(map[string]bool)
shardIDToCursor := make(map[string]*shard.Cursor)
for _, shardID := range shardIDs {
shardIDsMap[shardID] = false
}
return &Cursor{shardIDs: shardIDsMap, shardIDToCursor: shardIDToCursor}
}

View file

@ -2,77 +2,119 @@ package engine
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"sort"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func TestListWithCursor(t *testing.T) {
s1 := testNewShard(t, 1)
s2 := testNewShard(t, 2)
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
t.Cleanup(func() {
e.Close()
os.RemoveAll(t.Name())
})
const total = 20
expected := make([]object.AddressWithType, 0, total)
got := make([]object.AddressWithType, 0, total)
for i := 0; i < total; i++ {
containerID := cidtest.ID()
obj := testutil.GenerateObjectWithCID(containerID)
var prm PutPrm
prm.WithObject(obj)
err := e.Put(context.Background(), prm)
require.NoError(t, err)
expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
}
expected = sortAddresses(expected)
var prm ListWithCursorPrm
prm.WithCount(1)
res, err := e.ListWithCursor(prm)
require.NoError(t, err)
require.NotEmpty(t, res.AddressList())
got = append(got, res.AddressList()...)
for i := 0; i < total-1; i++ {
prm.WithCursor(res.Cursor())
res, err = e.ListWithCursor(prm)
if errors.Is(err, ErrEndOfListing) {
break
}
got = append(got, res.AddressList()...)
}
prm.WithCursor(res.Cursor())
_, err = e.ListWithCursor(prm)
require.ErrorIs(t, err, ErrEndOfListing)
got = sortAddresses(got)
require.Equal(t, expected, got)
}
func sortAddresses(addrWithType []object.AddressWithType) []object.AddressWithType {
sort.Slice(addrWithType, func(i, j int) bool {
return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString()
})
return addrWithType
}
func TestListWithCursor(t *testing.T) {
tests := []struct {
name string
shardNum int
objectNum int
batchSize uint32
}{
{
name: "one shard, few objects, small batch size",
shardNum: 1,
objectNum: 2,
batchSize: 1,
},
{
name: "one shard, many objects, big batch size",
shardNum: 1,
objectNum: 53,
batchSize: 100,
},
{
name: "many shards, many objects, small batch size",
shardNum: 6,
objectNum: 66,
batchSize: 1,
},
{
name: "many shards, many objects, big batch size",
shardNum: 6,
objectNum: 99,
batchSize: 100,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := testNewEngine(t).setShardsNumOpts(t, tt.shardNum, func(id int) []shard.Option {
return []shard.Option{
shard.WithLogger(&logger.Logger{Logger: zap.L()}),
shard.WithBlobStorOptions(
blobstor.WithStorages(
newStorages(filepath.Join(t.Name(), tt.name, fmt.Sprintf("%d.blobstor", id)),
1<<20))),
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(t.Name(), tt.name, fmt.Sprintf("%d.pilorama", id)))),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(t.Name(), tt.name, fmt.Sprintf("%d.metabase", id))),
meta.WithPermissions(0700),
meta.WithEpochState(epochState{}),
)}
}).engine
require.NoError(t, e.Open())
require.NoError(t, e.Init(context.Background()))
t.Cleanup(func() {
e.Close()
os.RemoveAll(t.Name())
})
expected := make([]object.AddressWithType, 0, tt.objectNum)
got := make([]object.AddressWithType, 0, tt.objectNum)
for i := 0; i < tt.objectNum; i++ {
containerID := cidtest.ID()
obj := testutil.GenerateObjectWithCIDWithPayload(containerID, []byte{'a'})
var prm PutPrm
prm.WithObject(obj)
err := e.Put(context.Background(), prm)
require.NoError(t, err)
expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
}
expected = sortAddresses(expected)
var prm ListWithCursorPrm
prm.count = tt.batchSize
for {
res, err := e.ListWithCursor(prm)
if err == ErrEndOfListing {
require.Empty(t, res.AddressList())
break
}
require.NotEmpty(t, res.AddressList())
got = append(got, res.AddressList()...)
prm.cursor = res.Cursor()
}
got = sortAddresses(got)
require.Equal(t, expected, got)
})
}
}