From 0ca18653b8c6f4d6e85934f0c9724fd3bdbac7e4 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 25 May 2023 18:40:27 +0300 Subject: [PATCH 1/2] [#396] pilorama: Disallow applying same operations 1. In redo() we save the old state. 2. If we do redo() for the same operation twice, the old state will be overritten with the new one. 3. This in turn affects undo() and subsequent isAncestor() check. Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/pilorama/batch.go | 15 +++- .../pilorama/batch_test.go | 70 +++++++++++++++++ .../pilorama/forest_test.go | 78 +++++++++++++++++++ 3 files changed, 162 insertions(+), 1 deletion(-) create mode 100644 pkg/local_object_storage/pilorama/batch_test.go diff --git a/pkg/local_object_storage/pilorama/batch.go b/pkg/local_object_storage/pilorama/batch.go index 3065c8370..5722c68aa 100644 --- a/pkg/local_object_storage/pilorama/batch.go +++ b/pkg/local_object_storage/pilorama/batch.go @@ -50,10 +50,23 @@ func (b *batch) run() { return b.operations[i].Time < b.operations[j].Time }) + b.operations = removeDuplicatesInPlace(b.operations) var lm Move return b.forest.applyOperation(bLog, bTree, b.operations, &lm) }) - for i := range b.operations { + for i := range b.results { b.results[i] <- err } } + +func removeDuplicatesInPlace(a []*Move) []*Move { + equalCount := 0 + for i := 1; i < len(a); i++ { + if a[i].Time == a[i-1].Time { + equalCount++ + } else { + a[i-equalCount] = a[i] + } + } + return a[:len(a)-equalCount] +} diff --git a/pkg/local_object_storage/pilorama/batch_test.go b/pkg/local_object_storage/pilorama/batch_test.go new file mode 100644 index 000000000..931fce18c --- /dev/null +++ b/pkg/local_object_storage/pilorama/batch_test.go @@ -0,0 +1,70 @@ +package pilorama + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_removeDuplicatesInPlace(t *testing.T) { + testCases := []struct { + before []int + after []int + }{ + { + before: []int{}, + after: []int{}, + }, + { + before: []int{1}, + after: []int{1}, + }, + { + before: []int{1, 2}, + after: []int{1, 2}, + }, + { + before: []int{1, 2, 3}, + after: []int{1, 2, 3}, + }, + { + before: []int{1, 1, 2}, + after: []int{1, 2}, + }, + { + before: []int{1, 2, 2}, + after: []int{1, 2}, + }, + { + before: []int{1, 2, 2, 3}, + after: []int{1, 2, 3}, + }, + { + before: []int{1, 1, 1}, + after: []int{1}, + }, + { + before: []int{1, 1, 2, 2}, + after: []int{1, 2}, + }, + { + before: []int{1, 1, 1, 2, 3, 3, 3}, + after: []int{1, 2, 3}, + }, + } + + for _, tc := range testCases { + ops := make([]*Move, len(tc.before)) + for i := range ops { + ops[i] = &Move{Meta: Meta{Time: Timestamp(tc.before[i])}} + } + + expected := make([]*Move, len(tc.after)) + for i := range expected { + expected[i] = &Move{Meta: Meta{Time: Timestamp(tc.after[i])}} + } + + actual := removeDuplicatesInPlace(ops) + require.Equal(t, expected, actual, "%d", tc.before) + } +} diff --git a/pkg/local_object_storage/pilorama/forest_test.go b/pkg/local_object_storage/pilorama/forest_test.go index 948ad6089..d56a3f543 100644 --- a/pkg/local_object_storage/pilorama/forest_test.go +++ b/pkg/local_object_storage/pilorama/forest_test.go @@ -1,6 +1,7 @@ package pilorama import ( + "context" "fmt" "math/rand" "os" @@ -13,6 +14,7 @@ import ( cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) var providers = []struct { @@ -445,6 +447,82 @@ func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Optio }) } +func TestForest_ApplySameOperation(t *testing.T) { + for i := range providers { + t.Run(providers[i].name, func(t *testing.T) { + parallel := providers[i].name != "inmemory" + testForestApplySameOperation(t, providers[i].construct, parallel) + }) + } +} + +func testForestApplySameOperation(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest, parallel bool) { + cid := cidtest.ID() + treeID := "version" + + batchSize := 3 + errG, _ := errgroup.WithContext(context.Background()) + if !parallel { + batchSize = 1 + errG.SetLimit(1) + } + + meta := []Meta{ + {Time: 1, Items: []KeyValue{{AttributeFilename, []byte("1")}, {"attr", []byte{1}}}}, + {Time: 2, Items: []KeyValue{{AttributeFilename, []byte("2")}, {"attr", []byte{1}}}}, + {Time: 3, Items: []KeyValue{{AttributeFilename, []byte("3")}, {"attr", []byte{1}}}}, + } + logs := []Move{ + { + Child: 1, + Parent: RootID, + Meta: meta[0], + }, + { + Child: 2, + Parent: 1, + Meta: meta[1], + }, + { + Child: 1, + Parent: 2, + Meta: meta[2], + }, + } + + check := func(t *testing.T, s Forest) { + testMeta(t, s, cid, treeID, 1, RootID, meta[0]) + testMeta(t, s, cid, treeID, 2, 1, meta[1]) + + nodes, err := s.TreeGetChildren(cid, treeID, RootID) + require.NoError(t, err) + require.Equal(t, []Node{1}, nodes) + + nodes, err = s.TreeGetChildren(cid, treeID, 1) + require.NoError(t, err) + require.Equal(t, []Node{2}, nodes) + } + + t.Run("expected", func(t *testing.T) { + s := constructor(t) + for i := range logs { + require.NoError(t, s.TreeApply(cid, treeID, &logs[i], false)) + } + check(t, s) + }) + + s := constructor(t, WithMaxBatchSize(batchSize)) + require.NoError(t, s.TreeApply(cid, treeID, &logs[0], false)) + for i := 0; i < batchSize; i++ { + errG.Go(func() error { + return s.TreeApply(cid, treeID, &logs[2], false) + }) + } + require.NoError(t, errG.Wait()) + require.NoError(t, s.TreeApply(cid, treeID, &logs[1], false)) + check(t, s) +} + func TestForest_GetOpLog(t *testing.T) { for i := range providers { t.Run(providers[i].name, func(t *testing.T) { -- 2.45.2 From 785bbd5095e3ff29b2ce668bdaf4054be4815f7e Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 26 May 2023 17:00:33 +0300 Subject: [PATCH 2/2] [#396] treesvc: properly remember last height on shutdown Previously `newHeight` was updated in parallel, so that applying operation at height H did not imply successful TreeApply() for H-1. And because we have no context in TreeUpdateLastSyncHeight(), invalid starting height could be written if the context was canceled. In this commit we return the new height only if all operations were successfully applied. Signed-off-by: Evgenii Stratonikov --- pkg/services/tree/sync.go | 46 ++++++++++++++------------------------- 1 file changed, 16 insertions(+), 30 deletions(-) diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index ed87eac45..81e4a8e44 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -206,28 +206,26 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri errG, ctx := errgroup.WithContext(ctx) errG.SetLimit(1024) - var heightMtx sync.Mutex - for { - newHeight := height req := &GetOpLogRequest{ Body: &GetOpLogRequest_Body{ ContainerId: rawCID, TreeId: treeID, - Height: newHeight, + Height: height, }, } if err := SignMessage(req, s.key); err != nil { _ = errG.Wait() - return newHeight, err + return height, err } c, err := treeClient.GetOpLog(ctx, req) if err != nil { _ = errG.Wait() - return newHeight, fmt.Errorf("can't initialize client: %w", err) + return height, fmt.Errorf("can't initialize client: %w", err) } + lastApplied := height res, err := c.Recv() for ; err == nil; res, err = c.Recv() { lm := res.GetBody().GetOperation() @@ -237,39 +235,27 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri } if err := m.Meta.FromBytes(lm.Meta); err != nil { _ = errG.Wait() - return newHeight, err + return height, err + } + if lastApplied < m.Meta.Time { + lastApplied = m.Meta.Time } errG.Go(func() error { - err := s.forest.TreeApply(cid, treeID, m, true) - heightMtx.Lock() - defer heightMtx.Unlock() - if err != nil { - if newHeight > height { - height = newHeight - } - return err - } - if m.Time > newHeight { - newHeight = m.Time + 1 - } else { - newHeight++ - } - return nil + return s.forest.TreeApply(cid, treeID, m, true) }) } + // First check local errors: if everything is ok, we can update starting height, + // because everything was applied. applyErr := errG.Wait() - if err == nil { - err = applyErr + if applyErr != nil { + return height, applyErr } - heightMtx.Lock() - if height == newHeight || err != nil && !errors.Is(err, io.EOF) { - heightMtx.Unlock() - return newHeight, err + height = lastApplied + if err != nil && !errors.Is(err, io.EOF) { + return height, err } - height = newHeight - heightMtx.Unlock() } } -- 2.45.2