[#398] pilorama: Disallow applying same operations
ci/woodpecker/pr/pre-commit Pipeline was successful Details

1. In redo() we save the old state.
2. If we do redo() for the same operation twice, the old state will be
   overritten with the new one.
3. This in turn affects undo() and subsequent isAncestor() check.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
pull/398/head
Evgenii Stratonikov 2023-05-25 18:40:27 +03:00
parent 2613351008
commit 307738de18
3 changed files with 162 additions and 1 deletions

View File

@ -50,10 +50,23 @@ func (b *batch) run() {
return b.operations[i].Time < b.operations[j].Time
})
b.operations = removeDuplicatesInPlace(b.operations)
var lm Move
return b.forest.applyOperation(bLog, bTree, b.operations, &lm)
})
for i := range b.operations {
for i := range b.results {
b.results[i] <- err
}
}
func removeDuplicatesInPlace(a []*Move) []*Move {
equalCount := 0
for i := 1; i < len(a); i++ {
if a[i].Time == a[i-1].Time {
equalCount++
} else {
a[i-equalCount] = a[i]
}
}
return a[:len(a)-equalCount]
}

View File

@ -0,0 +1,70 @@
package pilorama
import (
"testing"
"github.com/stretchr/testify/require"
)
func Test_removeDuplicatesInPlace(t *testing.T) {
testCases := []struct {
before []int
after []int
}{
{
before: []int{},
after: []int{},
},
{
before: []int{1},
after: []int{1},
},
{
before: []int{1, 2},
after: []int{1, 2},
},
{
before: []int{1, 2, 3},
after: []int{1, 2, 3},
},
{
before: []int{1, 1, 2},
after: []int{1, 2},
},
{
before: []int{1, 2, 2},
after: []int{1, 2},
},
{
before: []int{1, 2, 2, 3},
after: []int{1, 2, 3},
},
{
before: []int{1, 1, 1},
after: []int{1},
},
{
before: []int{1, 1, 2, 2},
after: []int{1, 2},
},
{
before: []int{1, 1, 1, 2, 3, 3, 3},
after: []int{1, 2, 3},
},
}
for _, tc := range testCases {
ops := make([]*Move, len(tc.before))
for i := range ops {
ops[i] = &Move{Meta: Meta{Time: Timestamp(tc.before[i])}}
}
expected := make([]*Move, len(tc.after))
for i := range expected {
expected[i] = &Move{Meta: Meta{Time: Timestamp(tc.after[i])}}
}
actual := removeDuplicatesInPlace(ops)
require.Equal(t, expected, actual, "%d", tc.before)
}
}

View File

@ -13,6 +13,7 @@ import (
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
var providers = []struct {
@ -444,6 +445,83 @@ func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Optio
})
}
func TestForest_ApplySameOperation(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {
parallel := providers[i].name != "inmemory"
testForestApplySameOperation(t, providers[i].construct, parallel)
})
}
}
func testForestApplySameOperation(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest, parallel bool) {
cid := cidtest.ID()
treeID := "version"
batchSize := 3
ctx := context.Background()
errG, _ := errgroup.WithContext(ctx)
if !parallel {
batchSize = 1
errG.SetLimit(1)
}
meta := []Meta{
{Time: 1, Items: []KeyValue{{AttributeFilename, []byte("1")}, {"attr", []byte{1}}}},
{Time: 2, Items: []KeyValue{{AttributeFilename, []byte("2")}, {"attr", []byte{1}}}},
{Time: 3, Items: []KeyValue{{AttributeFilename, []byte("3")}, {"attr", []byte{1}}}},
}
logs := []Move{
{
Child: 1,
Parent: RootID,
Meta: meta[0],
},
{
Child: 2,
Parent: 1,
Meta: meta[1],
},
{
Child: 1,
Parent: 2,
Meta: meta[2],
},
}
check := func(t *testing.T, s Forest) {
testMeta(t, s, cid, treeID, 1, RootID, meta[0])
testMeta(t, s, cid, treeID, 2, 1, meta[1])
nodes, err := s.TreeGetChildren(ctx, cid, treeID, RootID)
require.NoError(t, err)
require.Equal(t, []Node{1}, nodes)
nodes, err = s.TreeGetChildren(ctx, cid, treeID, 1)
require.NoError(t, err)
require.Equal(t, []Node{2}, nodes)
}
t.Run("expected", func(t *testing.T) {
s := constructor(t)
for i := range logs {
require.NoError(t, s.TreeApply(ctx, cid, treeID, &logs[i], false))
}
check(t, s)
})
s := constructor(t, WithMaxBatchSize(batchSize))
require.NoError(t, s.TreeApply(ctx, cid, treeID, &logs[0], false))
for i := 0; i < batchSize; i++ {
errG.Go(func() error {
return s.TreeApply(ctx, cid, treeID, &logs[2], false)
})
}
require.NoError(t, errG.Wait())
require.NoError(t, s.TreeApply(ctx, cid, treeID, &logs[1], false))
check(t, s)
}
func TestForest_GetOpLog(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {