diff --git a/pkg/local_object_storage/pilorama/batch.go b/pkg/local_object_storage/pilorama/batch.go index 3065c8370b..5722c68aac 100644 --- a/pkg/local_object_storage/pilorama/batch.go +++ b/pkg/local_object_storage/pilorama/batch.go @@ -50,10 +50,23 @@ func (b *batch) run() { return b.operations[i].Time < b.operations[j].Time }) + b.operations = removeDuplicatesInPlace(b.operations) var lm Move return b.forest.applyOperation(bLog, bTree, b.operations, &lm) }) - for i := range b.operations { + for i := range b.results { b.results[i] <- err } } + +func removeDuplicatesInPlace(a []*Move) []*Move { + equalCount := 0 + for i := 1; i < len(a); i++ { + if a[i].Time == a[i-1].Time { + equalCount++ + } else { + a[i-equalCount] = a[i] + } + } + return a[:len(a)-equalCount] +} diff --git a/pkg/local_object_storage/pilorama/batch_test.go b/pkg/local_object_storage/pilorama/batch_test.go new file mode 100644 index 0000000000..931fce18cd --- /dev/null +++ b/pkg/local_object_storage/pilorama/batch_test.go @@ -0,0 +1,70 @@ +package pilorama + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_removeDuplicatesInPlace(t *testing.T) { + testCases := []struct { + before []int + after []int + }{ + { + before: []int{}, + after: []int{}, + }, + { + before: []int{1}, + after: []int{1}, + }, + { + before: []int{1, 2}, + after: []int{1, 2}, + }, + { + before: []int{1, 2, 3}, + after: []int{1, 2, 3}, + }, + { + before: []int{1, 1, 2}, + after: []int{1, 2}, + }, + { + before: []int{1, 2, 2}, + after: []int{1, 2}, + }, + { + before: []int{1, 2, 2, 3}, + after: []int{1, 2, 3}, + }, + { + before: []int{1, 1, 1}, + after: []int{1}, + }, + { + before: []int{1, 1, 2, 2}, + after: []int{1, 2}, + }, + { + before: []int{1, 1, 1, 2, 3, 3, 3}, + after: []int{1, 2, 3}, + }, + } + + for _, tc := range testCases { + ops := make([]*Move, len(tc.before)) + for i := range ops { + ops[i] = &Move{Meta: Meta{Time: Timestamp(tc.before[i])}} + } + + expected := make([]*Move, len(tc.after)) + for i := range expected { + expected[i] = &Move{Meta: Meta{Time: Timestamp(tc.after[i])}} + } + + actual := removeDuplicatesInPlace(ops) + require.Equal(t, expected, actual, "%d", tc.before) + } +} diff --git a/pkg/local_object_storage/pilorama/forest_test.go b/pkg/local_object_storage/pilorama/forest_test.go index 948ad60897..d56a3f5432 100644 --- a/pkg/local_object_storage/pilorama/forest_test.go +++ b/pkg/local_object_storage/pilorama/forest_test.go @@ -1,6 +1,7 @@ package pilorama import ( + "context" "fmt" "math/rand" "os" @@ -13,6 +14,7 @@ import ( cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) var providers = []struct { @@ -445,6 +447,82 @@ func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Optio }) } +func TestForest_ApplySameOperation(t *testing.T) { + for i := range providers { + t.Run(providers[i].name, func(t *testing.T) { + parallel := providers[i].name != "inmemory" + testForestApplySameOperation(t, providers[i].construct, parallel) + }) + } +} + +func testForestApplySameOperation(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest, parallel bool) { + cid := cidtest.ID() + treeID := "version" + + batchSize := 3 + errG, _ := errgroup.WithContext(context.Background()) + if !parallel { + batchSize = 1 + errG.SetLimit(1) + } + + meta := []Meta{ + {Time: 1, Items: []KeyValue{{AttributeFilename, []byte("1")}, {"attr", []byte{1}}}}, + {Time: 2, Items: []KeyValue{{AttributeFilename, []byte("2")}, {"attr", []byte{1}}}}, + {Time: 3, Items: []KeyValue{{AttributeFilename, []byte("3")}, {"attr", []byte{1}}}}, + } + logs := []Move{ + { + Child: 1, + Parent: RootID, + Meta: meta[0], + }, + { + Child: 2, + Parent: 1, + Meta: meta[1], + }, + { + Child: 1, + Parent: 2, + Meta: meta[2], + }, + } + + check := func(t *testing.T, s Forest) { + testMeta(t, s, cid, treeID, 1, RootID, meta[0]) + testMeta(t, s, cid, treeID, 2, 1, meta[1]) + + nodes, err := s.TreeGetChildren(cid, treeID, RootID) + require.NoError(t, err) + require.Equal(t, []Node{1}, nodes) + + nodes, err = s.TreeGetChildren(cid, treeID, 1) + require.NoError(t, err) + require.Equal(t, []Node{2}, nodes) + } + + t.Run("expected", func(t *testing.T) { + s := constructor(t) + for i := range logs { + require.NoError(t, s.TreeApply(cid, treeID, &logs[i], false)) + } + check(t, s) + }) + + s := constructor(t, WithMaxBatchSize(batchSize)) + require.NoError(t, s.TreeApply(cid, treeID, &logs[0], false)) + for i := 0; i < batchSize; i++ { + errG.Go(func() error { + return s.TreeApply(cid, treeID, &logs[2], false) + }) + } + require.NoError(t, errG.Wait()) + require.NoError(t, s.TreeApply(cid, treeID, &logs[1], false)) + check(t, s) +} + func TestForest_GetOpLog(t *testing.T) { for i := range providers { t.Run(providers[i].name, func(t *testing.T) {