From ada081dfd566341d38dc5633dc63783095e2c20c Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Sun, 16 Apr 2023 17:03:42 +0300 Subject: [PATCH] [#19] node: Make policier read shards concurrently * Introduce ListWithMultiCursor that simultaneously reads objects from different shards Signed-off-by: Airat Arifullin a.arifullin@yadro.com --- pkg/local_object_storage/engine/list.go | 123 ++++++++++----- pkg/local_object_storage/engine/list_test.go | 156 ++++++++++++------- 2 files changed, 187 insertions(+), 92 deletions(-) diff --git a/pkg/local_object_storage/engine/list.go b/pkg/local_object_storage/engine/list.go index 8644a7f7e..8781416f4 100644 --- a/pkg/local_object_storage/engine/list.go +++ b/pkg/local_object_storage/engine/list.go @@ -1,6 +1,7 @@ package engine import ( + "math/rand" "sort" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" @@ -12,10 +13,38 @@ import ( // cursor. Use nil cursor object to start listing again. var ErrEndOfListing = shard.ErrEndOfListing -// Cursor is a type for continuous object listing. +// Cursor is a type for continuous object listing. Cursor contains shard IDs to read +// and shard cursors that contain state from previous read. type Cursor struct { - shardID string - shardCursor *shard.Cursor + current string + shardIDs map[string]bool + shardIDToCursor map[string]*shard.Cursor +} + +func (c *Cursor) getCurrentShardCursor() *shard.Cursor { + return c.shardIDToCursor[c.current] +} + +func (c *Cursor) setCurrentShardCursor(sc *shard.Cursor) { + c.shardIDToCursor[c.current] = sc +} + +func (c *Cursor) nextShard() bool { + var shardsToRead []string + for shardID, read := range c.shardIDs { + if !read { + shardsToRead = append(shardsToRead, shardID) + } + } + if len(shardsToRead) == 0 { + return false + } + c.current = shardsToRead[rand.Intn(len(shardsToRead))] + return true +} + +func (c *Cursor) setShardRead(shardID string) { + c.shardIDs[shardID] = true } // ListWithCursorPrm contains parameters for ListWithCursor operation. @@ -57,65 +86,69 @@ func (l ListWithCursorRes) Cursor() *Cursor { // Does not include inhumed objects. Use cursor value from the response // for consecutive requests. // +// If count param is big enough, then the method reads objects from different shards +// by portions. In this case shards are chosen randomly, if they're not read out yet. +// +// Adding a shard between ListWithCursor does not invalidate the cursor but new shard +// won't be listed. +// Removing a shard between ListWithCursor leads to the undefined behavior +// (e.g. usage of the objects from the removed shard). +// // Returns ErrEndOfListing if there are no more objects to return or count // parameter set to zero. func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) { result := make([]objectcore.AddressWithType, 0, prm.count) - // 1. Get available shards and sort them. - e.mtx.RLock() - shardIDs := make([]string, 0, len(e.shards)) - for id := range e.shards { - shardIDs = append(shardIDs, id) - } - e.mtx.RUnlock() - - if len(shardIDs) == 0 { - return ListWithCursorRes{}, ErrEndOfListing - } - - sort.Slice(shardIDs, func(i, j int) bool { - return shardIDs[i] < shardIDs[j] - }) - - // 2. Prepare cursor object. + // Set initial cursors cursor := prm.cursor if cursor == nil { - cursor = &Cursor{shardID: shardIDs[0]} + shardIDs := getSortedShardIDs(e) + if len(shardIDs) == 0 { + return ListWithCursorRes{}, ErrEndOfListing + } + cursor = newCursor(shardIDs) } - // 3. Iterate over available shards. Skip unavailable shards. - for i := range shardIDs { + const ( + splitShardCountLimit = 100 + shardsNum = 4 + ) + + batchSize := prm.count + if batchSize >= splitShardCountLimit { + batchSize /= shardsNum + } + + for cursor.nextShard() { if len(result) >= int(prm.count) { break } - - if shardIDs[i] < cursor.shardID { - continue - } + curr := cursor.current e.mtx.RLock() - shardInstance, ok := e.shards[shardIDs[i]] + shardInstance, ok := e.shards[curr] e.mtx.RUnlock() if !ok { + cursor.setShardRead(curr) continue } - count := uint32(int(prm.count) - len(result)) + count := prm.count - uint32(len(result)) + if count > batchSize { + count = batchSize + } + var shardPrm shard.ListWithCursorPrm shardPrm.WithCount(count) - if shardIDs[i] == cursor.shardID { - shardPrm.WithCursor(cursor.shardCursor) - } + shardPrm.WithCursor(cursor.getCurrentShardCursor()) res, err := shardInstance.ListWithCursor(shardPrm) if err != nil { + cursor.setShardRead(curr) continue } - result = append(result, res.AddressList()...) - cursor.shardCursor = res.Cursor() - cursor.shardID = shardIDs[i] + cursor.setCurrentShardCursor(res.Cursor()) } if len(result) == 0 { @@ -127,3 +160,23 @@ func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes cursor: cursor, }, nil } + +func getSortedShardIDs(e *StorageEngine) []string { + e.mtx.RLock() + shardIDs := make([]string, 0, len(e.shards)) + for id := range e.shards { + shardIDs = append(shardIDs, id) + } + e.mtx.RUnlock() + sort.Strings(shardIDs) + return shardIDs +} + +func newCursor(shardIDs []string) *Cursor { + shardIDsMap := make(map[string]bool) + shardIDToCursor := make(map[string]*shard.Cursor) + for _, shardID := range shardIDs { + shardIDsMap[shardID] = false + } + return &Cursor{shardIDs: shardIDsMap, shardIDToCursor: shardIDToCursor} +} diff --git a/pkg/local_object_storage/engine/list_test.go b/pkg/local_object_storage/engine/list_test.go index fde799d05..44062be68 100644 --- a/pkg/local_object_storage/engine/list_test.go +++ b/pkg/local_object_storage/engine/list_test.go @@ -2,77 +2,119 @@ package engine import ( "context" - "errors" + "fmt" "os" + "path/filepath" "sort" "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) -func TestListWithCursor(t *testing.T) { - s1 := testNewShard(t, 1) - s2 := testNewShard(t, 2) - e := testNewEngine(t).setInitializedShards(t, s1, s2).engine - - t.Cleanup(func() { - e.Close() - os.RemoveAll(t.Name()) - }) - - const total = 20 - - expected := make([]object.AddressWithType, 0, total) - got := make([]object.AddressWithType, 0, total) - - for i := 0; i < total; i++ { - containerID := cidtest.ID() - obj := testutil.GenerateObjectWithCID(containerID) - - var prm PutPrm - prm.WithObject(obj) - - err := e.Put(context.Background(), prm) - require.NoError(t, err) - expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)}) - } - - expected = sortAddresses(expected) - - var prm ListWithCursorPrm - prm.WithCount(1) - - res, err := e.ListWithCursor(prm) - require.NoError(t, err) - require.NotEmpty(t, res.AddressList()) - got = append(got, res.AddressList()...) - - for i := 0; i < total-1; i++ { - prm.WithCursor(res.Cursor()) - - res, err = e.ListWithCursor(prm) - if errors.Is(err, ErrEndOfListing) { - break - } - got = append(got, res.AddressList()...) - } - - prm.WithCursor(res.Cursor()) - - _, err = e.ListWithCursor(prm) - require.ErrorIs(t, err, ErrEndOfListing) - - got = sortAddresses(got) - require.Equal(t, expected, got) -} - func sortAddresses(addrWithType []object.AddressWithType) []object.AddressWithType { sort.Slice(addrWithType, func(i, j int) bool { return addrWithType[i].Address.EncodeToString() < addrWithType[j].Address.EncodeToString() }) return addrWithType } + +func TestListWithCursor(t *testing.T) { + tests := []struct { + name string + shardNum int + objectNum int + batchSize uint32 + }{ + { + name: "one shard, few objects, small batch size", + shardNum: 1, + objectNum: 2, + batchSize: 1, + }, + { + name: "one shard, many objects, big batch size", + shardNum: 1, + objectNum: 53, + batchSize: 100, + }, + { + name: "many shards, many objects, small batch size", + shardNum: 6, + objectNum: 66, + batchSize: 1, + }, + { + name: "many shards, many objects, big batch size", + shardNum: 6, + objectNum: 99, + batchSize: 100, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := testNewEngine(t).setShardsNumOpts(t, tt.shardNum, func(id int) []shard.Option { + return []shard.Option{ + shard.WithLogger(&logger.Logger{Logger: zap.L()}), + shard.WithBlobStorOptions( + blobstor.WithStorages( + newStorages(filepath.Join(t.Name(), tt.name, fmt.Sprintf("%d.blobstor", id)), + 1<<20))), + shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(t.Name(), tt.name, fmt.Sprintf("%d.pilorama", id)))), + shard.WithMetaBaseOptions( + meta.WithPath(filepath.Join(t.Name(), tt.name, fmt.Sprintf("%d.metabase", id))), + meta.WithPermissions(0700), + meta.WithEpochState(epochState{}), + )} + }).engine + require.NoError(t, e.Open()) + require.NoError(t, e.Init(context.Background())) + + t.Cleanup(func() { + e.Close() + os.RemoveAll(t.Name()) + }) + + expected := make([]object.AddressWithType, 0, tt.objectNum) + got := make([]object.AddressWithType, 0, tt.objectNum) + + for i := 0; i < tt.objectNum; i++ { + containerID := cidtest.ID() + obj := testutil.GenerateObjectWithCIDWithPayload(containerID, []byte{'a'}) + + var prm PutPrm + prm.WithObject(obj) + + err := e.Put(context.Background(), prm) + require.NoError(t, err) + expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)}) + } + expected = sortAddresses(expected) + + var prm ListWithCursorPrm + prm.count = tt.batchSize + for { + res, err := e.ListWithCursor(prm) + if err == ErrEndOfListing { + require.Empty(t, res.AddressList()) + break + } + require.NotEmpty(t, res.AddressList()) + got = append(got, res.AddressList()...) + prm.cursor = res.Cursor() + } + + got = sortAddresses(got) + require.Equal(t, expected, got) + }) + } +}