[#19] node: Make policier read shards concurrently #257
2 changed files with 187 additions and 92 deletions
|
@ -1,6 +1,7 @@
|
||||||
package engine
|
package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
@ -12,10 +13,38 @@ import (
|
||||||
// cursor. Use nil cursor object to start listing again.
|
// cursor. Use nil cursor object to start listing again.
|
||||||
var ErrEndOfListing = shard.ErrEndOfListing
|
var ErrEndOfListing = shard.ErrEndOfListing
|
||||||
|
|
||||||
// Cursor is a type for continuous object listing.
|
// Cursor is a type for continuous object listing. Cursor contains shard IDs to read
|
||||||
|
// and shard cursors that contain state from previous read.
|
||||||
type Cursor struct {
|
type Cursor struct {
|
||||||
shardID string
|
current string
|
||||||
shardCursor *shard.Cursor
|
shardIDs map[string]bool
|
||||||
|
shardIDToCursor map[string]*shard.Cursor
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cursor) getCurrentShardCursor() *shard.Cursor {
|
||||||
|
return c.shardIDToCursor[c.current]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cursor) setCurrentShardCursor(sc *shard.Cursor) {
|
||||||
|
c.shardIDToCursor[c.current] = sc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cursor) nextShard() bool {
|
||||||
|
var shardsToRead []string
|
||||||
|
for shardID, read := range c.shardIDs {
|
||||||
|
if !read {
|
||||||
|
shardsToRead = append(shardsToRead, shardID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(shardsToRead) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
c.current = shardsToRead[rand.Intn(len(shardsToRead))]
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cursor) setShardRead(shardID string) {
|
||||||
|
c.shardIDs[shardID] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListWithCursorPrm contains parameters for ListWithCursor operation.
|
// ListWithCursorPrm contains parameters for ListWithCursor operation.
|
||||||
|
@ -57,65 +86,69 @@ func (l ListWithCursorRes) Cursor() *Cursor {
|
||||||
// Does not include inhumed objects. Use cursor value from the response
|
// Does not include inhumed objects. Use cursor value from the response
|
||||||
// for consecutive requests.
|
// for consecutive requests.
|
||||||
//
|
//
|
||||||
|
// If count param is big enough, then the method reads objects from different shards
|
||||||
|
// by portions. In this case shards are chosen randomly, if they're not read out yet.
|
||||||
|
//
|
||||||
|
// Adding a shard between ListWithCursor does not invalidate the cursor but new shard
|
||||||
|
// won't be listed.
|
||||||
|
// Removing a shard between ListWithCursor leads to the undefined behavior
|
||||||
|
// (e.g. usage of the objects from the removed shard).
|
||||||
|
//
|
||||||
// Returns ErrEndOfListing if there are no more objects to return or count
|
// Returns ErrEndOfListing if there are no more objects to return or count
|
||||||
// parameter set to zero.
|
// parameter set to zero.
|
||||||
func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) {
|
func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) {
|
||||||
result := make([]objectcore.AddressWithType, 0, prm.count)
|
result := make([]objectcore.AddressWithType, 0, prm.count)
|
||||||
|
|
||||||
// 1. Get available shards and sort them.
|
// Set initial cursors
|
||||||
e.mtx.RLock()
|
cursor := prm.cursor
|
||||||
shardIDs := make([]string, 0, len(e.shards))
|
if cursor == nil {
|
||||||
for id := range e.shards {
|
shardIDs := getSortedShardIDs(e)
|
||||||
shardIDs = append(shardIDs, id)
|
|
||||||
}
|
|
||||||
e.mtx.RUnlock()
|
|
||||||
|
|
||||||
if len(shardIDs) == 0 {
|
if len(shardIDs) == 0 {
|
||||||
return ListWithCursorRes{}, ErrEndOfListing
|
return ListWithCursorRes{}, ErrEndOfListing
|
||||||
}
|
}
|
||||||
|
cursor = newCursor(shardIDs)
|
||||||
sort.Slice(shardIDs, func(i, j int) bool {
|
|
||||||
return shardIDs[i] < shardIDs[j]
|
|
||||||
})
|
|
||||||
|
|
||||||
// 2. Prepare cursor object.
|
|
||||||
cursor := prm.cursor
|
|
||||||
if cursor == nil {
|
|
||||||
cursor = &Cursor{shardID: shardIDs[0]}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. Iterate over available shards. Skip unavailable shards.
|
const (
|
||||||
for i := range shardIDs {
|
splitShardCountLimit = 100
|
||||||
|
shardsNum = 4
|
||||||
|
)
|
||||||
|
|
||||||
|
batchSize := prm.count
|
||||||
|
if batchSize >= splitShardCountLimit {
|
||||||
|
batchSize /= shardsNum
|
||||||
|
}
|
||||||
|
|
||||||
|
for cursor.nextShard() {
|
||||||
if len(result) >= int(prm.count) {
|
if len(result) >= int(prm.count) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
curr := cursor.current
|
||||||
if shardIDs[i] < cursor.shardID {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
e.mtx.RLock()
|
e.mtx.RLock()
|
||||||
shardInstance, ok := e.shards[shardIDs[i]]
|
shardInstance, ok := e.shards[curr]
|
||||||
e.mtx.RUnlock()
|
e.mtx.RUnlock()
|
||||||
if !ok {
|
if !ok {
|
||||||
|
cursor.setShardRead(curr)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
count := uint32(int(prm.count) - len(result))
|
count := prm.count - uint32(len(result))
|
||||||
|
if count > batchSize {
|
||||||
|
count = batchSize
|
||||||
|
}
|
||||||
|
|
||||||
var shardPrm shard.ListWithCursorPrm
|
var shardPrm shard.ListWithCursorPrm
|
||||||
shardPrm.WithCount(count)
|
shardPrm.WithCount(count)
|
||||||
if shardIDs[i] == cursor.shardID {
|
shardPrm.WithCursor(cursor.getCurrentShardCursor())
|
||||||
shardPrm.WithCursor(cursor.shardCursor)
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := shardInstance.ListWithCursor(shardPrm)
|
res, err := shardInstance.ListWithCursor(shardPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cursor.setShardRead(curr)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
result = append(result, res.AddressList()...)
|
result = append(result, res.AddressList()...)
|
||||||
cursor.shardCursor = res.Cursor()
|
cursor.setCurrentShardCursor(res.Cursor())
|
||||||
cursor.shardID = shardIDs[i]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(result) == 0 {
|
if len(result) == 0 {
|
||||||
|
@ -127,3 +160,23 @@ func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes
|
||||||
cursor: cursor,
|
cursor: cursor,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getSortedShardIDs(e *StorageEngine) []string {
|
||||||
|
e.mtx.RLock()
|
||||||
|
shardIDs := make([]string, 0, len(e.shards))
|
||||||
|
for id := range e.shards {
|
||||||
|
shardIDs = append(shardIDs, id)
|
||||||
|
}
|
||||||
|
e.mtx.RUnlock()
|
||||||
|
sort.Strings(shardIDs)
|
||||||
|
return shardIDs
|
||||||
|
}
|
||||||
|
|
||||||
|
func newCursor(shardIDs []string) *Cursor {
|
||||||
|
shardIDsMap := make(map[string]bool)
|
||||||
|
shardIDToCursor := make(map[string]*shard.Cursor)
|
||||||
|
for _, shardID := range shardIDs {
|
||||||
|
shardIDsMap[shardID] = false
|
||||||
|
}
|
||||||
|
return &Cursor{shardIDs: shardIDsMap, shardIDToCursor: shardIDToCursor}
|
||||||
|
}
|
||||||
|
|
|
@ -2,36 +2,94 @@ package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func sortAddresses(addrWithType []object.AddressWithType) []object.AddressWithType {
|
||||||
|
sort.Slice(addrWithType, func(i, j int) bool {
|
||||||
|
return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString()
|
||||||
|
})
|
||||||
|
return addrWithType
|
||||||
|
}
|
||||||
|
|
||||||
func TestListWithCursor(t *testing.T) {
|
func TestListWithCursor(t *testing.T) {
|
||||||
s1 := testNewShard(t, 1)
|
tests := []struct {
|
||||||
s2 := testNewShard(t, 2)
|
name string
|
||||||
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
|
shardNum int
|
||||||
|
objectNum int
|
||||||
|
batchSize uint32
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "one shard, few objects, small batch size",
|
||||||
|
shardNum: 1,
|
||||||
|
objectNum: 2,
|
||||||
|
batchSize: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "one shard, many objects, big batch size",
|
||||||
|
shardNum: 1,
|
||||||
|
objectNum: 53,
|
||||||
|
batchSize: 100,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "many shards, many objects, small batch size",
|
||||||
|
shardNum: 6,
|
||||||
|
objectNum: 66,
|
||||||
|
batchSize: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "many shards, many objects, big batch size",
|
||||||
|
shardNum: 6,
|
||||||
|
objectNum: 99,
|
||||||
|
batchSize: 100,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
e := testNewEngine(t).setShardsNumOpts(t, tt.shardNum, func(id int) []shard.Option {
|
||||||
|
return []shard.Option{
|
||||||
|
shard.WithLogger(&logger.Logger{Logger: zap.L()}),
|
||||||
|
shard.WithBlobStorOptions(
|
||||||
|
blobstor.WithStorages(
|
||||||
|
newStorages(filepath.Join(t.Name(), tt.name, fmt.Sprintf("%d.blobstor", id)),
|
||||||
|
1<<20))),
|
||||||
|
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(t.Name(), tt.name, fmt.Sprintf("%d.pilorama", id)))),
|
||||||
|
shard.WithMetaBaseOptions(
|
||||||
|
meta.WithPath(filepath.Join(t.Name(), tt.name, fmt.Sprintf("%d.metabase", id))),
|
||||||
|
meta.WithPermissions(0700),
|
||||||
|
meta.WithEpochState(epochState{}),
|
||||||
|
)}
|
||||||
|
}).engine
|
||||||
|
require.NoError(t, e.Open())
|
||||||
|
require.NoError(t, e.Init(context.Background()))
|
||||||
|
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
e.Close()
|
e.Close()
|
||||||
os.RemoveAll(t.Name())
|
os.RemoveAll(t.Name())
|
||||||
})
|
})
|
||||||
|
|
||||||
const total = 20
|
expected := make([]object.AddressWithType, 0, tt.objectNum)
|
||||||
|
got := make([]object.AddressWithType, 0, tt.objectNum)
|
||||||
|
|
||||||
expected := make([]object.AddressWithType, 0, total)
|
for i := 0; i < tt.objectNum; i++ {
|
||||||
got := make([]object.AddressWithType, 0, total)
|
|
||||||
|
|
||||||
for i := 0; i < total; i++ {
|
|
||||||
containerID := cidtest.ID()
|
containerID := cidtest.ID()
|
||||||
obj := testutil.GenerateObjectWithCID(containerID)
|
obj := testutil.GenerateObjectWithCIDWithPayload(containerID, []byte{'a'})
|
||||||
|
|
||||||
var prm PutPrm
|
var prm PutPrm
|
||||||
prm.WithObject(obj)
|
prm.WithObject(obj)
|
||||||
|
@ -40,39 +98,23 @@ func TestListWithCursor(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
|
expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
|
||||||
}
|
}
|
||||||
|
|
||||||
expected = sortAddresses(expected)
|
expected = sortAddresses(expected)
|
||||||
|
|
||||||
var prm ListWithCursorPrm
|
var prm ListWithCursorPrm
|
||||||
prm.WithCount(1)
|
prm.count = tt.batchSize
|
||||||
|
for {
|
||||||
res, err := e.ListWithCursor(prm)
|
res, err := e.ListWithCursor(prm)
|
||||||
require.NoError(t, err)
|
if err == ErrEndOfListing {
|
||||||
require.NotEmpty(t, res.AddressList())
|
require.Empty(t, res.AddressList())
|
||||||
got = append(got, res.AddressList()...)
|
|
||||||
|
|
||||||
for i := 0; i < total-1; i++ {
|
|
||||||
prm.WithCursor(res.Cursor())
|
|
||||||
|
|
||||||
res, err = e.ListWithCursor(prm)
|
|
||||||
if errors.Is(err, ErrEndOfListing) {
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
require.NotEmpty(t, res.AddressList())
|
||||||
got = append(got, res.AddressList()...)
|
got = append(got, res.AddressList()...)
|
||||||
|
prm.cursor = res.Cursor()
|
||||||
}
|
}
|
||||||
|
|
||||||
prm.WithCursor(res.Cursor())
|
|
||||||
|
|
||||||
_, err = e.ListWithCursor(prm)
|
|
||||||
require.ErrorIs(t, err, ErrEndOfListing)
|
|
||||||
|
|
||||||
got = sortAddresses(got)
|
got = sortAddresses(got)
|
||||||
require.Equal(t, expected, got)
|
require.Equal(t, expected, got)
|
||||||
}
|
|
||||||
|
|
||||||
func sortAddresses(addrWithType []object.AddressWithType) []object.AddressWithType {
|
|
||||||
sort.Slice(addrWithType, func(i, j int) bool {
|
|
||||||
return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString()
|
|
||||||
})
|
})
|
||||||
return addrWithType
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue