[#19] node: Make policier read shards concurrently #257
|
@ -1,6 +1,7 @@
|
||||||
package engine
|
package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
@ -12,10 +13,38 @@ import (
|
||||||
// cursor. Use nil cursor object to start listing again.
|
// cursor. Use nil cursor object to start listing again.
|
||||||
var ErrEndOfListing = shard.ErrEndOfListing
|
var ErrEndOfListing = shard.ErrEndOfListing
|
||||||
|
|
||||||
// Cursor is a type for continuous object listing.
|
// Cursor is a type for continuous object listing. Cursor contains shard IDs to read
|
||||||
|
// and shard cursors that contain state from previous read.
|
||||||
type Cursor struct {
|
type Cursor struct {
|
||||||
shardID string
|
current string
|
||||||
shardCursor *shard.Cursor
|
shardIDs map[string]bool
|
||||||
|
shardIDToCursor map[string]*shard.Cursor
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cursor) getCurrentShardCursor() *shard.Cursor {
|
||||||
|
return c.shardIDToCursor[c.current]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cursor) setCurrentShardCursor(sc *shard.Cursor) {
|
||||||
|
c.shardIDToCursor[c.current] = sc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cursor) nextShard() bool {
|
||||||
|
var shardsToRead []string
|
||||||
|
for shardID, read := range c.shardIDs {
|
||||||
|
if !read {
|
||||||
|
shardsToRead = append(shardsToRead, shardID)
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
|
}
|
||||||
|
}
|
||||||
|
if len(shardsToRead) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
c.current = shardsToRead[rand.Intn(len(shardsToRead))]
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cursor) setShardRead(shardID string) {
|
||||||
|
c.shardIDs[shardID] = true
|
||||||
fyrchik
commented
So we have a list of So we have a list of `shardIDs` in each element of this slice. Can we share both them and `curr`?
aarifullin
commented
Outdated. Removed this method Outdated. Removed this method
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ListWithCursorPrm contains parameters for ListWithCursor operation.
|
// ListWithCursorPrm contains parameters for ListWithCursor operation.
|
||||||
|
@ -57,65 +86,69 @@ func (l ListWithCursorRes) Cursor() *Cursor {
|
||||||
// Does not include inhumed objects. Use cursor value from the response
|
// Does not include inhumed objects. Use cursor value from the response
|
||||||
// for consecutive requests.
|
// for consecutive requests.
|
||||||
//
|
//
|
||||||
|
// If count param is big enough, then the method reads objects from different shards
|
||||||
|
// by portions. In this case shards are chosen randomly, if they're not read out yet.
|
||||||
|
//
|
||||||
|
// Adding a shard between ListWithCursor does not invalidate the cursor but new shard
|
||||||
|
// won't be listed.
|
||||||
|
// Removing a shard between ListWithCursor leads to the undefined behavior
|
||||||
|
// (e.g. usage of the objects from the removed shard).
|
||||||
|
//
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
What if it is == 0? What if it is == 0?
aarifullin
commented
Nothing wrong, but let's return Also, Nothing wrong, but let's return `ListWithCursorRes{}, ErrEndOfListing`.
Also, `len(cursor.shardCursorWrappers) <= int(prm.count)`. See above how it's adjusted
carpawell marked this conversation as resolved
Outdated
carpawell
commented
is that planned to be solved? (also, dot at the end of the sentence) is that planned to be solved?
(also, dot at the end of the sentence)
aarifullin
commented
I don't think so. Anyway, it's rather warning for user. From my POV, it's fine when an iterator gets invalidated because data has been changed > is that planned to be solved?
I don't think so. Anyway, it's rather warning for user.
From my POV, it's fine when an iterator gets invalidated because data has been changed
|
|||||||
// Returns ErrEndOfListing if there are no more objects to return or count
|
// Returns ErrEndOfListing if there are no more objects to return or count
|
||||||
// parameter set to zero.
|
// parameter set to zero.
|
||||||
func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) {
|
func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) {
|
||||||
result := make([]objectcore.AddressWithType, 0, prm.count)
|
result := make([]objectcore.AddressWithType, 0, prm.count)
|
||||||
|
|
||||||
// 1. Get available shards and sort them.
|
// Set initial cursors
|
||||||
e.mtx.RLock()
|
|
||||||
shardIDs := make([]string, 0, len(e.shards))
|
|
||||||
for id := range e.shards {
|
|
||||||
shardIDs = append(shardIDs, id)
|
|
||||||
}
|
|
||||||
e.mtx.RUnlock()
|
|
||||||
|
|
||||||
if len(shardIDs) == 0 {
|
|
||||||
return ListWithCursorRes{}, ErrEndOfListing
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Slice(shardIDs, func(i, j int) bool {
|
|
||||||
return shardIDs[i] < shardIDs[j]
|
|
||||||
})
|
|
||||||
|
|
||||||
// 2. Prepare cursor object.
|
|
||||||
cursor := prm.cursor
|
cursor := prm.cursor
|
||||||
if cursor == nil {
|
if cursor == nil {
|
||||||
cursor = &Cursor{shardID: shardIDs[0]}
|
shardIDs := getSortedShardIDs(e)
|
||||||
|
if len(shardIDs) == 0 {
|
||||||
|
return ListWithCursorRes{}, ErrEndOfListing
|
||||||
|
}
|
||||||
|
cursor = newCursor(shardIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. Iterate over available shards. Skip unavailable shards.
|
const (
|
||||||
for i := range shardIDs {
|
splitShardCountLimit = 100
|
||||||
|
shardsNum = 4
|
||||||
|
)
|
||||||
|
|
||||||
|
batchSize := prm.count
|
||||||
|
if batchSize >= splitShardCountLimit {
|
||||||
|
batchSize /= shardsNum
|
||||||
carpawell marked this conversation as resolved
Outdated
carpawell
commented
usually we do just usually we do just `var wg sync.WaitGroup`
aarifullin
commented
I should get used to it :) Fixed! I should get used to it :) Fixed!
|
|||||||
|
}
|
||||||
|
|
||||||
|
for cursor.nextShard() {
|
||||||
if len(result) >= int(prm.count) {
|
if len(result) >= int(prm.count) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
curr := cursor.current
|
||||||
if shardIDs[i] < cursor.shardID {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
e.mtx.RLock()
|
e.mtx.RLock()
|
||||||
shardInstance, ok := e.shards[shardIDs[i]]
|
shardInstance, ok := e.shards[curr]
|
||||||
e.mtx.RUnlock()
|
e.mtx.RUnlock()
|
||||||
if !ok {
|
if !ok {
|
||||||
|
cursor.setShardRead(curr)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
count := uint32(int(prm.count) - len(result))
|
count := prm.count - uint32(len(result))
|
||||||
|
if count > batchSize {
|
||||||
|
count = batchSize
|
||||||
|
}
|
||||||
|
|
||||||
var shardPrm shard.ListWithCursorPrm
|
var shardPrm shard.ListWithCursorPrm
|
||||||
shardPrm.WithCount(count)
|
shardPrm.WithCount(count)
|
||||||
if shardIDs[i] == cursor.shardID {
|
shardPrm.WithCursor(cursor.getCurrentShardCursor())
|
||||||
shardPrm.WithCursor(cursor.shardCursor)
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := shardInstance.ListWithCursor(shardPrm)
|
res, err := shardInstance.ListWithCursor(shardPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cursor.setShardRead(curr)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
result = append(result, res.AddressList()...)
|
result = append(result, res.AddressList()...)
|
||||||
cursor.shardCursor = res.Cursor()
|
cursor.setCurrentShardCursor(res.Cursor())
|
||||||
cursor.shardID = shardIDs[i]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(result) == 0 {
|
if len(result) == 0 {
|
||||||
|
@ -127,3 +160,23 @@ func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes
|
||||||
cursor: cursor,
|
cursor: cursor,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getSortedShardIDs(e *StorageEngine) []string {
|
||||||
|
e.mtx.RLock()
|
||||||
|
shardIDs := make([]string, 0, len(e.shards))
|
||||||
|
for id := range e.shards {
|
||||||
|
shardIDs = append(shardIDs, id)
|
||||||
|
}
|
||||||
|
e.mtx.RUnlock()
|
||||||
|
sort.Strings(shardIDs)
|
||||||
|
return shardIDs
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why did you decide to implement a separate engine method instead of a simple wrapper in the policer package? Why did you decide to implement a separate engine method instead of a simple wrapper in the policer package?
aarifullin
commented
Briefly: no way I tried many ways to manage reading shards from the policer but nothing succeeded. If Briefly: no way
I tried many ways to manage reading shards from the policer but nothing succeeded. If `ListWithCursor` is used, then shards are re-read few times. We don't need it
fyrchik
commented
Ok, could you describe the behaviour in the doc-comment? Ok, could you describe the behaviour in the doc-comment?
I am specifically interested in (1) what do we do if the shard is removed/added and in (2) error handling (one of the shards went degraded -> we get error on list).
aarifullin
commented
Please, check new comments for the method. If it lacks something, I'll add Please, check new comments for the method. If it lacks something, I'll add
|
|||||||
|
}
|
||||||
dstepanov-yadro marked this conversation as resolved
Outdated
dstepanov-yadro
commented
Does Does `funlen` linter marks this function as function with too many lines of code?
I propose to divide this function: `getShardIDs`, `createMultiCursor`, `listWithMultiCursor` and so on.
aarifullin
commented
Ok, fare point Ok, fare point
aarifullin
commented
Added Added `getShardIDs` and `createMultiCursor` but left `listWithMultiCursor` because the function became uglier with that :) But `funlen` is fine with this function anyway
|
|||||||
|
|
||||||
|
func newCursor(shardIDs []string) *Cursor {
|
||||||
fyrchik
commented
We already have similar methods like We already have similar methods like `sortedShards`, it is possible/easy to reuse/rewrite them somehow?
aarifullin
commented
Sorry, but I have not found anything with name Sorry, but I have not found anything with name `sortedShards`
|
|||||||
|
shardIDsMap := make(map[string]bool)
|
||||||
|
shardIDToCursor := make(map[string]*shard.Cursor)
|
||||||
|
for _, shardID := range shardIDs {
|
||||||
|
shardIDsMap[shardID] = false
|
||||||
|
}
|
||||||
|
return &Cursor{shardIDs: shardIDsMap, shardIDToCursor: shardIDToCursor}
|
||||||
|
}
|
||||||
|
|
|
@ -2,77 +2,119 @@ package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestListWithCursor(t *testing.T) {
|
|
||||||
s1 := testNewShard(t, 1)
|
|
||||||
s2 := testNewShard(t, 2)
|
|
||||||
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
|
|
||||||
|
|
||||||
t.Cleanup(func() {
|
|
||||||
e.Close()
|
|
||||||
os.RemoveAll(t.Name())
|
|
||||||
})
|
|
||||||
|
|
||||||
const total = 20
|
|
||||||
|
|
||||||
expected := make([]object.AddressWithType, 0, total)
|
|
||||||
got := make([]object.AddressWithType, 0, total)
|
|
||||||
|
|
||||||
for i := 0; i < total; i++ {
|
|
||||||
containerID := cidtest.ID()
|
|
||||||
obj := testutil.GenerateObjectWithCID(containerID)
|
|
||||||
|
|
||||||
var prm PutPrm
|
|
||||||
prm.WithObject(obj)
|
|
||||||
|
|
||||||
err := e.Put(context.Background(), prm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
|
|
||||||
}
|
|
||||||
|
|
||||||
expected = sortAddresses(expected)
|
|
||||||
|
|
||||||
var prm ListWithCursorPrm
|
|
||||||
prm.WithCount(1)
|
|
||||||
|
|
||||||
res, err := e.ListWithCursor(prm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.NotEmpty(t, res.AddressList())
|
|
||||||
got = append(got, res.AddressList()...)
|
|
||||||
|
|
||||||
for i := 0; i < total-1; i++ {
|
|
||||||
prm.WithCursor(res.Cursor())
|
|
||||||
|
|
||||||
res, err = e.ListWithCursor(prm)
|
|
||||||
if errors.Is(err, ErrEndOfListing) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
got = append(got, res.AddressList()...)
|
|
||||||
}
|
|
||||||
|
|
||||||
prm.WithCursor(res.Cursor())
|
|
||||||
|
|
||||||
_, err = e.ListWithCursor(prm)
|
|
||||||
require.ErrorIs(t, err, ErrEndOfListing)
|
|
||||||
|
|
||||||
got = sortAddresses(got)
|
|
||||||
require.Equal(t, expected, got)
|
|
||||||
}
|
|
||||||
|
|
||||||
func sortAddresses(addrWithType []object.AddressWithType) []object.AddressWithType {
|
func sortAddresses(addrWithType []object.AddressWithType) []object.AddressWithType {
|
||||||
sort.Slice(addrWithType, func(i, j int) bool {
|
sort.Slice(addrWithType, func(i, j int) bool {
|
||||||
return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString()
|
return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString()
|
||||||
})
|
})
|
||||||
return addrWithType
|
return addrWithType
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestListWithCursor(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
shardNum int
|
||||||
|
objectNum int
|
||||||
|
batchSize uint32
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "one shard, few objects, small batch size",
|
||||||
|
shardNum: 1,
|
||||||
|
objectNum: 2,
|
||||||
|
batchSize: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "one shard, many objects, big batch size",
|
||||||
|
shardNum: 1,
|
||||||
|
objectNum: 53,
|
||||||
|
batchSize: 100,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "many shards, many objects, small batch size",
|
||||||
|
shardNum: 6,
|
||||||
|
objectNum: 66,
|
||||||
|
batchSize: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "many shards, many objects, big batch size",
|
||||||
|
shardNum: 6,
|
||||||
|
objectNum: 99,
|
||||||
|
batchSize: 100,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
e := testNewEngine(t).setShardsNumOpts(t, tt.shardNum, func(id int) []shard.Option {
|
||||||
|
return []shard.Option{
|
||||||
|
shard.WithLogger(&logger.Logger{Logger: zap.L()}),
|
||||||
|
shard.WithBlobStorOptions(
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
What is What is `many` in this context? I mean what is the difference between 3 and 6?
aarifullin
commented
Removed extra cases Removed extra cases
|
|||||||
|
blobstor.WithStorages(
|
||||||
|
newStorages(filepath.Join(t.Name(), tt.name, fmt.Sprintf("%d.blobstor", id)),
|
||||||
|
1<<20))),
|
||||||
|
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(t.Name(), tt.name, fmt.Sprintf("%d.pilorama", id)))),
|
||||||
|
shard.WithMetaBaseOptions(
|
||||||
|
meta.WithPath(filepath.Join(t.Name(), tt.name, fmt.Sprintf("%d.metabase", id))),
|
||||||
|
meta.WithPermissions(0700),
|
||||||
|
meta.WithEpochState(epochState{}),
|
||||||
|
)}
|
||||||
|
}).engine
|
||||||
|
require.NoError(t, e.Open())
|
||||||
|
require.NoError(t, e.Init(context.Background()))
|
||||||
|
|
||||||
|
t.Cleanup(func() {
|
||||||
|
e.Close()
|
||||||
|
os.RemoveAll(t.Name())
|
||||||
|
})
|
||||||
|
|
||||||
|
expected := make([]object.AddressWithType, 0, tt.objectNum)
|
||||||
|
got := make([]object.AddressWithType, 0, tt.objectNum)
|
||||||
|
|
||||||
|
for i := 0; i < tt.objectNum; i++ {
|
||||||
|
containerID := cidtest.ID()
|
||||||
|
obj := testutil.GenerateObjectWithCIDWithPayload(containerID, []byte{'a'})
|
||||||
|
|
||||||
|
var prm PutPrm
|
||||||
|
prm.WithObject(obj)
|
||||||
|
|
||||||
|
err := e.Put(context.Background(), prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
|
||||||
|
}
|
||||||
|
expected = sortAddresses(expected)
|
||||||
|
|
||||||
|
var prm ListWithCursorPrm
|
||||||
|
prm.count = tt.batchSize
|
||||||
|
for {
|
||||||
|
res, err := e.ListWithCursor(prm)
|
||||||
|
if err == ErrEndOfListing {
|
||||||
|
require.Empty(t, res.AddressList())
|
||||||
|
break
|
||||||
|
}
|
||||||
|
require.NotEmpty(t, res.AddressList())
|
||||||
|
got = append(got, res.AddressList()...)
|
||||||
|
prm.cursor = res.Cursor()
|
||||||
|
}
|
||||||
|
|
||||||
|
got = sortAddresses(got)
|
||||||
|
require.Equal(t, expected, got)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
From your POV, do this methods really improve readability?
Outdated. Removed this method