[#396] pilorama: Disallow applying same operations
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful

1. In redo() we save the old state.
2. If we do redo() for the same operation twice, the old state will be
   overritten with the new one.
3. This in turn affects undo() and subsequent isAncestor() check.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2023-05-25 18:40:27 +03:00
parent a506da97d6
commit 0ca18653b8
3 changed files with 162 additions and 1 deletions

View file

@ -50,10 +50,23 @@ func (b *batch) run() {
return b.operations[i].Time < b.operations[j].Time
})
b.operations = removeDuplicatesInPlace(b.operations)
var lm Move
return b.forest.applyOperation(bLog, bTree, b.operations, &lm)
})
for i := range b.operations {
for i := range b.results {
b.results[i] <- err
}
}
func removeDuplicatesInPlace(a []*Move) []*Move {
equalCount := 0
for i := 1; i < len(a); i++ {
if a[i].Time == a[i-1].Time {
equalCount++
} else {
a[i-equalCount] = a[i]
}
}
return a[:len(a)-equalCount]
}

View file

@ -0,0 +1,70 @@
package pilorama
import (
"testing"
"github.com/stretchr/testify/require"
)
func Test_removeDuplicatesInPlace(t *testing.T) {
testCases := []struct {
before []int
after []int
}{
{
before: []int{},
after: []int{},
},
{
before: []int{1},
after: []int{1},
},
{
before: []int{1, 2},
after: []int{1, 2},
},
{
before: []int{1, 2, 3},
after: []int{1, 2, 3},
},
{
before: []int{1, 1, 2},
after: []int{1, 2},
},
{
before: []int{1, 2, 2},
after: []int{1, 2},
},
{
before: []int{1, 2, 2, 3},
after: []int{1, 2, 3},
},
{
before: []int{1, 1, 1},
after: []int{1},
},
{
before: []int{1, 1, 2, 2},
after: []int{1, 2},
},
{
before: []int{1, 1, 1, 2, 3, 3, 3},
after: []int{1, 2, 3},
},
}
for _, tc := range testCases {
ops := make([]*Move, len(tc.before))
for i := range ops {
ops[i] = &Move{Meta: Meta{Time: Timestamp(tc.before[i])}}
}
expected := make([]*Move, len(tc.after))
for i := range expected {
expected[i] = &Move{Meta: Meta{Time: Timestamp(tc.after[i])}}
}
actual := removeDuplicatesInPlace(ops)
require.Equal(t, expected, actual, "%d", tc.before)
}
}

View file

@ -1,6 +1,7 @@
package pilorama
import (
"context"
"fmt"
"math/rand"
"os"
@ -13,6 +14,7 @@ import (
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
var providers = []struct {
@ -445,6 +447,82 @@ func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Optio
})
}
func TestForest_ApplySameOperation(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {
parallel := providers[i].name != "inmemory"
testForestApplySameOperation(t, providers[i].construct, parallel)
})
}
}
func testForestApplySameOperation(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest, parallel bool) {
cid := cidtest.ID()
treeID := "version"
batchSize := 3
errG, _ := errgroup.WithContext(context.Background())
if !parallel {
batchSize = 1
errG.SetLimit(1)
}
meta := []Meta{
{Time: 1, Items: []KeyValue{{AttributeFilename, []byte("1")}, {"attr", []byte{1}}}},
{Time: 2, Items: []KeyValue{{AttributeFilename, []byte("2")}, {"attr", []byte{1}}}},
{Time: 3, Items: []KeyValue{{AttributeFilename, []byte("3")}, {"attr", []byte{1}}}},
}
logs := []Move{
{
Child: 1,
Parent: RootID,
Meta: meta[0],
},
{
Child: 2,
Parent: 1,
Meta: meta[1],
},
{
Child: 1,
Parent: 2,
Meta: meta[2],
},
}
check := func(t *testing.T, s Forest) {
testMeta(t, s, cid, treeID, 1, RootID, meta[0])
testMeta(t, s, cid, treeID, 2, 1, meta[1])
nodes, err := s.TreeGetChildren(cid, treeID, RootID)
require.NoError(t, err)
require.Equal(t, []Node{1}, nodes)
nodes, err = s.TreeGetChildren(cid, treeID, 1)
require.NoError(t, err)
require.Equal(t, []Node{2}, nodes)
}
t.Run("expected", func(t *testing.T) {
s := constructor(t)
for i := range logs {
require.NoError(t, s.TreeApply(cid, treeID, &logs[i], false))
}
check(t, s)
})
s := constructor(t, WithMaxBatchSize(batchSize))
require.NoError(t, s.TreeApply(cid, treeID, &logs[0], false))
for i := 0; i < batchSize; i++ {
errG.Go(func() error {
return s.TreeApply(cid, treeID, &logs[2], false)
})
}
require.NoError(t, errG.Wait())
require.NoError(t, s.TreeApply(cid, treeID, &logs[1], false))
check(t, s)
}
func TestForest_GetOpLog(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {