pilorama: disallow applying same operations (SUPPORT) #396
4 changed files with 178 additions and 31 deletions
|
@ -50,10 +50,23 @@ func (b *batch) run() {
|
||||||
return b.operations[i].Time < b.operations[j].Time
|
return b.operations[i].Time < b.operations[j].Time
|
||||||
})
|
})
|
||||||
|
|
||||||
|
b.operations = removeDuplicatesInPlace(b.operations)
|
||||||
var lm Move
|
var lm Move
|
||||||
return b.forest.applyOperation(bLog, bTree, b.operations, &lm)
|
return b.forest.applyOperation(bLog, bTree, b.operations, &lm)
|
||||||
})
|
})
|
||||||
for i := range b.operations {
|
for i := range b.results {
|
||||||
b.results[i] <- err
|
b.results[i] <- err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func removeDuplicatesInPlace(a []*Move) []*Move {
|
||||||
|
equalCount := 0
|
||||||
|
for i := 1; i < len(a); i++ {
|
||||||
|
if a[i].Time == a[i-1].Time {
|
||||||
|
equalCount++
|
||||||
|
} else {
|
||||||
|
a[i-equalCount] = a[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return a[:len(a)-equalCount]
|
||||||
|
}
|
||||||
|
|
70
pkg/local_object_storage/pilorama/batch_test.go
Normal file
70
pkg/local_object_storage/pilorama/batch_test.go
Normal file
|
@ -0,0 +1,70 @@
|
||||||
|
package pilorama
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_removeDuplicatesInPlace(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
before []int
|
||||||
|
after []int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
before: []int{},
|
||||||
|
after: []int{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
before: []int{1},
|
||||||
|
after: []int{1},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
before: []int{1, 2},
|
||||||
|
after: []int{1, 2},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
before: []int{1, 2, 3},
|
||||||
|
after: []int{1, 2, 3},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
before: []int{1, 1, 2},
|
||||||
|
after: []int{1, 2},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
before: []int{1, 2, 2},
|
||||||
|
after: []int{1, 2},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
before: []int{1, 2, 2, 3},
|
||||||
|
after: []int{1, 2, 3},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
before: []int{1, 1, 1},
|
||||||
|
after: []int{1},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
before: []int{1, 1, 2, 2},
|
||||||
|
after: []int{1, 2},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
before: []int{1, 1, 1, 2, 3, 3, 3},
|
||||||
|
after: []int{1, 2, 3},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
ops := make([]*Move, len(tc.before))
|
||||||
|
for i := range ops {
|
||||||
|
ops[i] = &Move{Meta: Meta{Time: Timestamp(tc.before[i])}}
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := make([]*Move, len(tc.after))
|
||||||
|
for i := range expected {
|
||||||
|
expected[i] = &Move{Meta: Meta{Time: Timestamp(tc.after[i])}}
|
||||||
|
}
|
||||||
|
|
||||||
|
actual := removeDuplicatesInPlace(ops)
|
||||||
|
require.Equal(t, expected, actual, "%d", tc.before)
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
package pilorama
|
package pilorama
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
|
@ -13,6 +14,7 @@ import (
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
var providers = []struct {
|
var providers = []struct {
|
||||||
|
@ -445,6 +447,82 @@ func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Optio
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestForest_ApplySameOperation(t *testing.T) {
|
||||||
|
for i := range providers {
|
||||||
|
t.Run(providers[i].name, func(t *testing.T) {
|
||||||
|
parallel := providers[i].name != "inmemory"
|
||||||
|
testForestApplySameOperation(t, providers[i].construct, parallel)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testForestApplySameOperation(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest, parallel bool) {
|
||||||
|
cid := cidtest.ID()
|
||||||
|
treeID := "version"
|
||||||
|
|
||||||
|
batchSize := 3
|
||||||
|
errG, _ := errgroup.WithContext(context.Background())
|
||||||
|
if !parallel {
|
||||||
|
batchSize = 1
|
||||||
|
errG.SetLimit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
meta := []Meta{
|
||||||
|
{Time: 1, Items: []KeyValue{{AttributeFilename, []byte("1")}, {"attr", []byte{1}}}},
|
||||||
|
{Time: 2, Items: []KeyValue{{AttributeFilename, []byte("2")}, {"attr", []byte{1}}}},
|
||||||
|
{Time: 3, Items: []KeyValue{{AttributeFilename, []byte("3")}, {"attr", []byte{1}}}},
|
||||||
|
}
|
||||||
|
logs := []Move{
|
||||||
|
{
|
||||||
|
Child: 1,
|
||||||
|
Parent: RootID,
|
||||||
|
Meta: meta[0],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Child: 2,
|
||||||
|
Parent: 1,
|
||||||
|
Meta: meta[1],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Child: 1,
|
||||||
|
Parent: 2,
|
||||||
|
Meta: meta[2],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
check := func(t *testing.T, s Forest) {
|
||||||
|
testMeta(t, s, cid, treeID, 1, RootID, meta[0])
|
||||||
|
testMeta(t, s, cid, treeID, 2, 1, meta[1])
|
||||||
|
|
||||||
|
nodes, err := s.TreeGetChildren(cid, treeID, RootID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, []Node{1}, nodes)
|
||||||
|
|
||||||
|
nodes, err = s.TreeGetChildren(cid, treeID, 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, []Node{2}, nodes)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("expected", func(t *testing.T) {
|
||||||
|
s := constructor(t)
|
||||||
|
for i := range logs {
|
||||||
|
require.NoError(t, s.TreeApply(cid, treeID, &logs[i], false))
|
||||||
|
}
|
||||||
|
check(t, s)
|
||||||
|
})
|
||||||
|
|
||||||
|
s := constructor(t, WithMaxBatchSize(batchSize))
|
||||||
|
require.NoError(t, s.TreeApply(cid, treeID, &logs[0], false))
|
||||||
|
for i := 0; i < batchSize; i++ {
|
||||||
|
errG.Go(func() error {
|
||||||
|
return s.TreeApply(cid, treeID, &logs[2], false)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
require.NoError(t, errG.Wait())
|
||||||
|
require.NoError(t, s.TreeApply(cid, treeID, &logs[1], false))
|
||||||
|
check(t, s)
|
||||||
|
}
|
||||||
|
|
||||||
func TestForest_GetOpLog(t *testing.T) {
|
func TestForest_GetOpLog(t *testing.T) {
|
||||||
for i := range providers {
|
for i := range providers {
|
||||||
t.Run(providers[i].name, func(t *testing.T) {
|
t.Run(providers[i].name, func(t *testing.T) {
|
||||||
|
|
|
@ -206,28 +206,26 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
|
||||||
errG, ctx := errgroup.WithContext(ctx)
|
errG, ctx := errgroup.WithContext(ctx)
|
||||||
errG.SetLimit(1024)
|
errG.SetLimit(1024)
|
||||||
|
|
||||||
var heightMtx sync.Mutex
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
newHeight := height
|
|
||||||
req := &GetOpLogRequest{
|
req := &GetOpLogRequest{
|
||||||
Body: &GetOpLogRequest_Body{
|
Body: &GetOpLogRequest_Body{
|
||||||
ContainerId: rawCID,
|
ContainerId: rawCID,
|
||||||
TreeId: treeID,
|
TreeId: treeID,
|
||||||
Height: newHeight,
|
Height: height,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := SignMessage(req, s.key); err != nil {
|
if err := SignMessage(req, s.key); err != nil {
|
||||||
_ = errG.Wait()
|
_ = errG.Wait()
|
||||||
return newHeight, err
|
return height, err
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := treeClient.GetOpLog(ctx, req)
|
c, err := treeClient.GetOpLog(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = errG.Wait()
|
_ = errG.Wait()
|
||||||
return newHeight, fmt.Errorf("can't initialize client: %w", err)
|
return height, fmt.Errorf("can't initialize client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lastApplied := height
|
||||||
res, err := c.Recv()
|
res, err := c.Recv()
|
||||||
for ; err == nil; res, err = c.Recv() {
|
for ; err == nil; res, err = c.Recv() {
|
||||||
lm := res.GetBody().GetOperation()
|
lm := res.GetBody().GetOperation()
|
||||||
|
@ -237,39 +235,27 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
|
||||||
}
|
}
|
||||||
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
||||||
_ = errG.Wait()
|
_ = errG.Wait()
|
||||||
return newHeight, err
|
return height, err
|
||||||
|
}
|
||||||
|
if lastApplied < m.Meta.Time {
|
||||||
|
lastApplied = m.Meta.Time
|
||||||
}
|
}
|
||||||
errG.Go(func() error {
|
errG.Go(func() error {
|
||||||
err := s.forest.TreeApply(cid, treeID, m, true)
|
return s.forest.TreeApply(cid, treeID, m, true)
|
||||||
heightMtx.Lock()
|
|
||||||
defer heightMtx.Unlock()
|
|
||||||
if err != nil {
|
|
||||||
if newHeight > height {
|
|
||||||
height = newHeight
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if m.Time > newHeight {
|
|
||||||
newHeight = m.Time + 1
|
|
||||||
} else {
|
|
||||||
newHeight++
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// First check local errors: if everything is ok, we can update starting height,
|
||||||
|
// because everything was applied.
|
||||||
applyErr := errG.Wait()
|
applyErr := errG.Wait()
|
||||||
if err == nil {
|
if applyErr != nil {
|
||||||
err = applyErr
|
return height, applyErr
|
||||||
}
|
}
|
||||||
|
|
||||||
heightMtx.Lock()
|
height = lastApplied
|
||||||
if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
|
if err != nil && !errors.Is(err, io.EOF) {
|
||||||
heightMtx.Unlock()
|
return height, err
|
||||||
return newHeight, err
|
|
||||||
}
|
}
|
||||||
height = newHeight
|
|
||||||
heightMtx.Unlock()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue