[#19] node: Make policer read shards concurrently #257

Merged
fyrchik merged 1 commit from aarifullin/frostfs-node:feature/19-list_mul_cursor into master 2023-04-27 08:16:43 +00:00
2 changed files with 187 additions and 92 deletions

View file

@ -1,6 +1,7 @@
package engine package engine
import ( import (
"math/rand"
"sort" "sort"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@ -12,10 +13,38 @@ import (
// cursor. Use nil cursor object to start listing again. // cursor. Use nil cursor object to start listing again.
var ErrEndOfListing = shard.ErrEndOfListing var ErrEndOfListing = shard.ErrEndOfListing
// Cursor is a type for continuous object listing. // Cursor is a type for continuous object listing. Cursor contains shard IDs to read
// and shard cursors that contain state from previous read.
type Cursor struct { type Cursor struct {
shardID string current string
shardCursor *shard.Cursor shardIDs map[string]bool
shardIDToCursor map[string]*shard.Cursor
}
// getCurrentShardCursor looks up the shard cursor stored under the shard ID
// held in c.current. The map lookup yields nil when nothing has been stored
// for that ID.
func (c *Cursor) getCurrentShardCursor() *shard.Cursor {
	stored := c.shardIDToCursor[c.current]
	return stored
}
// setCurrentShardCursor stores sc under the shard ID held in c.current,
// overwriting whatever cursor was stored for that ID before.
func (c *Cursor) setCurrentShardCursor(sc *shard.Cursor) {
	id := c.current
	c.shardIDToCursor[id] = sc
}
func (c *Cursor) nextShard() bool {
var shardsToRead []string
for shardID, read := range c.shardIDs {
if !read {
shardsToRead = append(shardsToRead, shardID)
fyrchik marked this conversation as resolved Outdated

From your POV, do these methods really improve readability?

From your POV, do these methods really improve readability?

Outdated. Removed this method

Outdated. Removed this method
}
}
if len(shardsToRead) == 0 {
return false
}
c.current = shardsToRead[rand.Intn(len(shardsToRead))]
return true
}
func (c *Cursor) setShardRead(shardID string) {
c.shardIDs[shardID] = true

So we have a list of shardIDs in each element of this slice. Can we share both them and curr?

So we have a list of `shardIDs` in each element of this slice. Can we share both them and `curr`?

Outdated. Removed this method

Outdated. Removed this method
} }
// ListWithCursorPrm contains parameters for ListWithCursor operation. // ListWithCursorPrm contains parameters for ListWithCursor operation.
@ -57,65 +86,69 @@ func (l ListWithCursorRes) Cursor() *Cursor {
// Does not include inhumed objects. Use cursor value from the response // Does not include inhumed objects. Use cursor value from the response
// for consecutive requests. // for consecutive requests.
// //
// If the count param is big enough, the method reads objects from different shards
// in portions. In this case, shards are chosen randomly among those not read out yet.
//
// Adding a shard between ListWithCursor calls does not invalidate the cursor, but the
// new shard won't be listed.
// Removing a shard between ListWithCursor calls leads to undefined behavior
// (e.g. usage of the objects from the removed shard).
//
fyrchik marked this conversation as resolved Outdated

What if it is == 0?

What if it is == 0?

Nothing wrong, but let's return ListWithCursorRes{}, ErrEndOfListing.

Also, len(cursor.shardCursorWrappers) <= int(prm.count). See above how it's adjusted

Nothing wrong, but let's return `ListWithCursorRes{}, ErrEndOfListing`. Also, `len(cursor.shardCursorWrappers) <= int(prm.count)`. See above how it's adjusted
carpawell marked this conversation as resolved Outdated

is that planned to be solved?

(also, dot at the end of the sentence)

is that planned to be solved? (also, dot at the end of the sentence)

is that planned to be solved?

I don't think so. Anyway, it's rather a warning for the user.

From my POV, it's fine when an iterator gets invalidated because data has been changed

> is that planned to be solved? I don't think so. Anyway, it's rather a warning for the user. From my POV, it's fine when an iterator gets invalidated because data has been changed
// Returns ErrEndOfListing if there are no more objects to return or count // Returns ErrEndOfListing if there are no more objects to return or count
// parameter set to zero. // parameter set to zero.
func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) { func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) {
result := make([]objectcore.AddressWithType, 0, prm.count) result := make([]objectcore.AddressWithType, 0, prm.count)
// 1. Get available shards and sort them. // Set initial cursors
e.mtx.RLock()
shardIDs := make([]string, 0, len(e.shards))
for id := range e.shards {
shardIDs = append(shardIDs, id)
}
e.mtx.RUnlock()
if len(shardIDs) == 0 {
return ListWithCursorRes{}, ErrEndOfListing
}
sort.Slice(shardIDs, func(i, j int) bool {
return shardIDs[i] < shardIDs[j]
})
// 2. Prepare cursor object.
cursor := prm.cursor cursor := prm.cursor
if cursor == nil { if cursor == nil {
cursor = &Cursor{shardID: shardIDs[0]} shardIDs := getSortedShardIDs(e)
if len(shardIDs) == 0 {
return ListWithCursorRes{}, ErrEndOfListing
}
cursor = newCursor(shardIDs)
} }
// 3. Iterate over available shards. Skip unavailable shards. const (
for i := range shardIDs { splitShardCountLimit = 100
shardsNum = 4
)
batchSize := prm.count
if batchSize >= splitShardCountLimit {
batchSize /= shardsNum
carpawell marked this conversation as resolved Outdated

usually we do just var wg sync.WaitGroup

usually we do just `var wg sync.WaitGroup`

I should get used to it :) Fixed!

I should get used to it :) Fixed!
}
for cursor.nextShard() {
if len(result) >= int(prm.count) { if len(result) >= int(prm.count) {
break break
} }
curr := cursor.current
if shardIDs[i] < cursor.shardID {
continue
}
e.mtx.RLock() e.mtx.RLock()
shardInstance, ok := e.shards[shardIDs[i]] shardInstance, ok := e.shards[curr]
e.mtx.RUnlock() e.mtx.RUnlock()
if !ok { if !ok {
cursor.setShardRead(curr)
continue continue
} }
count := uint32(int(prm.count) - len(result)) count := prm.count - uint32(len(result))
if count > batchSize {
count = batchSize
}
var shardPrm shard.ListWithCursorPrm var shardPrm shard.ListWithCursorPrm
shardPrm.WithCount(count) shardPrm.WithCount(count)
if shardIDs[i] == cursor.shardID { shardPrm.WithCursor(cursor.getCurrentShardCursor())
shardPrm.WithCursor(cursor.shardCursor)
}
res, err := shardInstance.ListWithCursor(shardPrm) res, err := shardInstance.ListWithCursor(shardPrm)
if err != nil { if err != nil {
cursor.setShardRead(curr)
continue continue
} }
result = append(result, res.AddressList()...) result = append(result, res.AddressList()...)
cursor.shardCursor = res.Cursor() cursor.setCurrentShardCursor(res.Cursor())
cursor.shardID = shardIDs[i]
} }
if len(result) == 0 { if len(result) == 0 {
@ -127,3 +160,23 @@ func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes
cursor: cursor, cursor: cursor,
}, nil }, nil
} }
func getSortedShardIDs(e *StorageEngine) []string {
e.mtx.RLock()
shardIDs := make([]string, 0, len(e.shards))
for id := range e.shards {
shardIDs = append(shardIDs, id)
}
e.mtx.RUnlock()
sort.Strings(shardIDs)
return shardIDs
fyrchik marked this conversation as resolved Outdated

Why did you decide to implement a separate engine method instead of a simple wrapper in the policer package?

Why did you decide to implement a separate engine method instead of a simple wrapper in the policer package?

Briefly: no way

I tried many ways to manage reading shards from the policer but nothing succeeded. If ListWithCursor is used, then shards are re-read a few times. We don't need it

Briefly: no way I tried many ways to manage reading shards from the policer but nothing succeeded. If `ListWithCursor` is used, then shards are re-read a few times. We don't need it

Ok, could you describe the behaviour in the doc-comment?
I am specifically interested in (1) what do we do if the shard is removed/added and in (2) error handling (one of the shards went degraded -> we get error on list).

Ok, could you describe the behaviour in the doc-comment? I am specifically interested in (1) what do we do if the shard is removed/added and in (2) error handling (one of the shards went degraded -> we get error on list).

Please, check new comments for the method. If it lacks something, I'll add

Please, check new comments for the method. If it lacks something, I'll add
}
dstepanov-yadro marked this conversation as resolved Outdated

Does the funlen linter mark this function as a function with too many lines of code?
I propose to divide this function: getShardIDs, createMultiCursor, listWithMultiCursor and so on.

Does the `funlen` linter mark this function as a function with too many lines of code? I propose to divide this function: `getShardIDs`, `createMultiCursor`, `listWithMultiCursor` and so on.

Ok, fair point

Ok, fair point

Added getShardIDs and createMultiCursor but left listWithMultiCursor because the function became uglier with that :) But funlen is fine with this function anyway

Added `getShardIDs` and `createMultiCursor` but left `listWithMultiCursor` because the function became uglier with that :) But `funlen` is fine with this function anyway
func newCursor(shardIDs []string) *Cursor {

We already have similar methods like sortedShards, is it possible/easy to reuse/rewrite them somehow?

We already have similar methods like `sortedShards`, is it possible/easy to reuse/rewrite them somehow?

Sorry, but I have not found anything with name sortedShards

Sorry, but I have not found anything with name `sortedShards`
shardIDsMap := make(map[string]bool)
shardIDToCursor := make(map[string]*shard.Cursor)
for _, shardID := range shardIDs {
shardIDsMap[shardID] = false
}
return &Cursor{shardIDs: shardIDsMap, shardIDToCursor: shardIDToCursor}
}

View file

@ -2,77 +2,119 @@ package engine
import ( import (
"context" "context"
"errors" "fmt"
"os" "os"
"path/filepath"
"sort" "sort"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap"
) )
// TestListWithCursor puts a fixed number of objects into an engine built
// from two shards, then lists them back with the count parameter set to 1,
// feeding the cursor from each response into the next request. It checks
// that the collected addresses equal the stored ones and that one more
// request after the loop reports ErrEndOfListing.
func TestListWithCursor(t *testing.T) {
	s1 := testNewShard(t, 1)
	s2 := testNewShard(t, 2)
	e := testNewEngine(t).setInitializedShards(t, s1, s2).engine

	t.Cleanup(func() {
		e.Close()
		os.RemoveAll(t.Name())
	})

	const total = 20

	expected := make([]object.AddressWithType, 0, total)
	got := make([]object.AddressWithType, 0, total)

	for i := 0; i < total; i++ {
		containerID := cidtest.ID()
		obj := testutil.GenerateObjectWithCID(containerID)

		var prm PutPrm
		prm.WithObject(obj)

		err := e.Put(context.Background(), prm)
		require.NoError(t, err)
		expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
	}

	expected = sortAddresses(expected)

	var prm ListWithCursorPrm
	prm.WithCount(1)

	// The first request is made without a cursor.
	res, err := e.ListWithCursor(prm)
	require.NoError(t, err)
	require.NotEmpty(t, res.AddressList())
	got = append(got, res.AddressList()...)

	for i := 0; i < total-1; i++ {
		prm.WithCursor(res.Cursor())

		res, err = e.ListWithCursor(prm)
		if errors.Is(err, ErrEndOfListing) {
			break
		}
		// Fail on any other error instead of appending from a failed
		// response and reusing its cursor for the next request.
		require.NoError(t, err)
		got = append(got, res.AddressList()...)
	}

	prm.WithCursor(res.Cursor())

	_, err = e.ListWithCursor(prm)
	require.ErrorIs(t, err, ErrEndOfListing)

	got = sortAddresses(got)
	require.Equal(t, expected, got)
}
func sortAddresses(addrWithType []object.AddressWithType) []object.AddressWithType { func sortAddresses(addrWithType []object.AddressWithType) []object.AddressWithType {
sort.Slice(addrWithType, func(i, j int) bool { sort.Slice(addrWithType, func(i, j int) bool {
return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString() return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString()
}) })
return addrWithType return addrWithType
} }
func TestListWithCursor(t *testing.T) {
tests := []struct {
name string
shardNum int
objectNum int
batchSize uint32
}{
{
name: "one shard, few objects, small batch size",
shardNum: 1,
objectNum: 2,
batchSize: 1,
},
{
name: "one shard, many objects, big batch size",
shardNum: 1,
objectNum: 53,
batchSize: 100,
},
{
name: "many shards, many objects, small batch size",
shardNum: 6,
objectNum: 66,
batchSize: 1,
},
{
name: "many shards, many objects, big batch size",
shardNum: 6,
objectNum: 99,
batchSize: 100,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := testNewEngine(t).setShardsNumOpts(t, tt.shardNum, func(id int) []shard.Option {
return []shard.Option{
shard.WithLogger(&logger.Logger{Logger: zap.L()}),
shard.WithBlobStorOptions(
fyrchik marked this conversation as resolved Outdated

What is many in this context? I mean what is the difference between 3 and 6?

What is `many` in this context? I mean what is the difference between 3 and 6?

Removed extra cases

Removed extra cases
blobstor.WithStorages(
newStorages(filepath.Join(t.Name(), tt.name, fmt.Sprintf("%d.blobstor", id)),
1<<20))),
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(t.Name(), tt.name, fmt.Sprintf("%d.pilorama", id)))),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(t.Name(), tt.name, fmt.Sprintf("%d.metabase", id))),
meta.WithPermissions(0700),
meta.WithEpochState(epochState{}),
)}
}).engine
require.NoError(t, e.Open())
require.NoError(t, e.Init(context.Background()))
t.Cleanup(func() {
e.Close()
os.RemoveAll(t.Name())
})
expected := make([]object.AddressWithType, 0, tt.objectNum)
got := make([]object.AddressWithType, 0, tt.objectNum)
for i := 0; i < tt.objectNum; i++ {
containerID := cidtest.ID()
obj := testutil.GenerateObjectWithCIDWithPayload(containerID, []byte{'a'})
var prm PutPrm
prm.WithObject(obj)
err := e.Put(context.Background(), prm)
require.NoError(t, err)
expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)})
}
expected = sortAddresses(expected)
var prm ListWithCursorPrm
prm.count = tt.batchSize
for {
res, err := e.ListWithCursor(prm)
if err == ErrEndOfListing {
require.Empty(t, res.AddressList())
break
}
require.NotEmpty(t, res.AddressList())
got = append(got, res.AddressList()...)
prm.cursor = res.Cursor()
}
got = sortAddresses(got)
require.Equal(t, expected, got)
})
}
}