[#2165] pilorama: Optimize TreeApply when used for synchronization

Because synchronization _most likely_ will have apply already existing
operations, it is much faster to check their presence in a read
transaction. However, always doing this will degrade the perfomance
for normal `Apply`. And, let's be honest, it is already not good.
Thus we add a separate parameter which specifies whether this logic is
enabled.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2022-12-19 19:01:57 +03:00 committed by Anton Nikiforov
parent f9fcd85363
commit b4e90cdf51
8 changed files with 40 additions and 19 deletions

View file

@ -51,13 +51,13 @@ func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, a
}
// TreeApply implements the pilorama.Forest interface.
func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) error {
func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move, backgroundSync bool) error {
index, lst, err := e.getTreeShard(d.CID, treeID)
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
return err
}
err = lst[index].TreeApply(d, treeID, m)
err = lst[index].TreeApply(d, treeID, m, backgroundSync)
if err != nil {
if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
e.reportShardError(lst[index], "can't perform `TreeApply`", err,

View file

@ -255,11 +255,31 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
}
// TreeApply implements the Forest interface.
func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move) error {
func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error {
if !d.checkValid() {
return ErrInvalidCIDDescriptor
}
if backgroundSync {
var seen bool
err := t.db.View(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(bucketName(d.CID, treeID))
if treeRoot == nil {
return nil
}
b := treeRoot.Bucket(logBucket)
var logKey [8]byte
binary.BigEndian.PutUint64(logKey[:], m.Time)
seen = b.Get(logKey[:]) != nil
return nil
})
if err != nil || seen {
return err
}
}
return t.db.Batch(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID)
if err != nil {

View file

@ -91,7 +91,7 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
}
// TreeApply implements the Forest interface.
func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op *Move) error {
func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op *Move, _ bool) error {
if !d.checkValid() {
return ErrInvalidCIDDescriptor
}

View file

@ -418,7 +418,7 @@ func testForestTreeApply(t *testing.T, constructor func(t testing.TB) Forest) {
Child: 10,
Parent: 0,
Meta: Meta{Time: 1, Items: []KeyValue{{"grand", []byte{1}}}},
})
}, false)
require.ErrorIs(t, err, ErrInvalidCIDDescriptor)
})
@ -427,7 +427,7 @@ func testForestTreeApply(t *testing.T, constructor func(t testing.TB) Forest) {
Child: child,
Parent: parent,
Meta: meta,
}))
}, false))
}
t.Run("add a child, then insert a parent removal", func(t *testing.T) {
@ -489,7 +489,7 @@ func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB) Forest)
})
for i := range logs {
require.NoError(t, s.TreeApply(d, treeID, &logs[i]))
require.NoError(t, s.TreeApply(d, treeID, &logs[i], false))
}
testGetOpLog := func(t *testing.T, height uint64, m Move) {
@ -537,7 +537,7 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB) Forest) {
checkExists(t, false, cid, treeID)
})
require.NoError(t, s.TreeApply(d, treeID, &Move{Parent: 0, Child: 1}))
require.NoError(t, s.TreeApply(d, treeID, &Move{Parent: 0, Child: 1}, false))
checkExists(t, true, cid, treeID)
checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
checkExists(t, false, cid, "another tree") // same CID, different tree
@ -573,7 +573,7 @@ func TestApplyTricky1(t *testing.T) {
t.Run(providers[i].name, func(t *testing.T) {
s := providers[i].construct(t)
for i := range ops {
require.NoError(t, s.TreeApply(d, treeID, &ops[i]))
require.NoError(t, s.TreeApply(d, treeID, &ops[i], false))
}
for i := range expected {
@ -634,7 +634,7 @@ func TestApplyTricky2(t *testing.T) {
t.Run(providers[i].name, func(t *testing.T) {
s := providers[i].construct(t)
for i := range ops {
require.NoError(t, s.TreeApply(d, treeID, &ops[i]))
require.NoError(t, s.TreeApply(d, treeID, &ops[i], false))
}
for i := range expected {
@ -702,7 +702,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
rand.Read(ops[i].Meta.Items[1].Value)
}
for i := range ops {
require.NoError(t, expected.TreeApply(d, treeID, &ops[i]))
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
}
for i := 0; i < iterCount; i++ {
@ -711,7 +711,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
actual := constructor(t)
for i := range ops {
require.NoError(t, actual.TreeApply(d, treeID, &ops[i]))
require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false))
}
for i := uint64(0); i < nodeCount; i++ {
expectedMeta, expectedParent, err := expected.TreeGetMeta(cid, treeID, i)
@ -821,7 +821,7 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
op := <-ch
if err := s.TreeApply(d, treeID, op); err != nil {
if err := s.TreeApply(d, treeID, op, false); err != nil {
b.Fatalf("error in `Apply`: %v", err)
}
}
@ -904,7 +904,7 @@ func testMove(t *testing.T, s Forest, ts int, node, parent Node, d CIDDescriptor
Time: uint64(ts),
Items: items,
},
}))
}, false))
}
func TestGetTrees(t *testing.T) {

View file

@ -17,7 +17,8 @@ type Forest interface {
// Internal nodes in path should have exactly one attribute, otherwise a new node is created.
TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error)
// TreeApply applies replicated operation from another node.
TreeApply(d CIDDescriptor, treeID string, m *Move) error
// If background is true, TreeApply will first check whether an operation exists.
TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error
// TreeGetByPath returns all nodes corresponding to the path.
// The path is constructed by descending from the root using the values of the
// AttributeFilename in meta.

View file

@ -42,7 +42,7 @@ func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr stri
}
// TreeApply implements the pilorama.Forest interface.
func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) error {
func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move, backgroundSync bool) error {
if s.pilorama == nil {
return ErrPiloramaDisabled
}
@ -53,7 +53,7 @@ func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.M
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
return s.pilorama.TreeApply(d, treeID, m)
return s.pilorama.TreeApply(d, treeID, m, backgroundSync)
}
// TreeGetByPath implements the pilorama.Forest interface.

View file

@ -489,7 +489,7 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
Parent: op.GetParentId(),
Child: op.GetChildId(),
Meta: meta,
})
}, false)
}
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {

View file

@ -218,7 +218,7 @@ func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescripto
if err := m.Meta.FromBytes(lm.Meta); err != nil {
return newHeight, err
}
if err := s.forest.TreeApply(d, treeID, m); err != nil {
if err := s.forest.TreeApply(d, treeID, m, true); err != nil {
return newHeight, err
}
if m.Time > newHeight {