diff --git a/pkg/local_object_storage/engine/tree.go b/pkg/local_object_storage/engine/tree.go index 8ac0afe834..eb80e25ba1 100644 --- a/pkg/local_object_storage/engine/tree.go +++ b/pkg/local_object_storage/engine/tree.go @@ -51,13 +51,13 @@ func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, a } // TreeApply implements the pilorama.Forest interface. -func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) error { +func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move, backgroundSync bool) error { index, lst, err := e.getTreeShard(d.CID, treeID) if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) { return err } - err = lst[index].TreeApply(d, treeID, m) + err = lst[index].TreeApply(d, treeID, m, backgroundSync) if err != nil { if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled { e.reportShardError(lst[index], "can't perform `TreeApply`", err, diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index a337d65c52..4c4c4e3ce6 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -255,11 +255,31 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 { } // TreeApply implements the Forest interface. -func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move) error { +func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error { if !d.checkValid() { return ErrInvalidCIDDescriptor } + if backgroundSync { + var seen bool + err := t.db.View(func(tx *bbolt.Tx) error { + treeRoot := tx.Bucket(bucketName(d.CID, treeID)) + if treeRoot == nil { + return nil + } + + b := treeRoot.Bucket(logBucket) + + var logKey [8]byte + binary.BigEndian.PutUint64(logKey[:], m.Time) + seen = b.Get(logKey[:]) != nil + return nil + }) + if err != nil || seen { + return err + } + } + return t.db.Batch(func(tx *bbolt.Tx) error { bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID) if err != nil { diff --git a/pkg/local_object_storage/pilorama/forest.go b/pkg/local_object_storage/pilorama/forest.go index 9dd40708a7..ead1334ba9 100644 --- a/pkg/local_object_storage/pilorama/forest.go +++ b/pkg/local_object_storage/pilorama/forest.go @@ -91,7 +91,7 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string } // TreeApply implements the Forest interface. -func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op *Move) error { +func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op *Move, _ bool) error { if !d.checkValid() { return ErrInvalidCIDDescriptor } diff --git a/pkg/local_object_storage/pilorama/forest_test.go b/pkg/local_object_storage/pilorama/forest_test.go index d7ddbc700f..54ae941235 100644 --- a/pkg/local_object_storage/pilorama/forest_test.go +++ b/pkg/local_object_storage/pilorama/forest_test.go @@ -418,7 +418,7 @@ func testForestTreeApply(t *testing.T, constructor func(t testing.TB) Forest) { Child: 10, Parent: 0, Meta: Meta{Time: 1, Items: []KeyValue{{"grand", []byte{1}}}}, - }) + }, false) require.ErrorIs(t, err, ErrInvalidCIDDescriptor) }) @@ -427,7 +427,7 @@ func testForestTreeApply(t *testing.T, constructor func(t testing.TB) Forest) { Child: child, Parent: parent, Meta: meta, - })) + }, false)) } t.Run("add a child, then insert a parent removal", func(t *testing.T) { @@ -489,7 +489,7 @@ func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB) Forest) }) for i := range logs { - require.NoError(t, s.TreeApply(d, treeID, &logs[i])) + require.NoError(t, s.TreeApply(d, treeID, &logs[i], false)) } testGetOpLog := func(t *testing.T, height uint64, m Move) { @@ -537,7 +537,7 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB) Forest) { checkExists(t, false, cid, treeID) }) - require.NoError(t, s.TreeApply(d, treeID, &Move{Parent: 0, Child: 1})) + require.NoError(t, s.TreeApply(d, treeID, &Move{Parent: 0, Child: 1}, false)) checkExists(t, true, cid, treeID) checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree checkExists(t, false, cid, "another tree") // same CID, different tree @@ -573,7 +573,7 @@ func TestApplyTricky1(t *testing.T) { t.Run(providers[i].name, func(t *testing.T) { s := providers[i].construct(t) for i := range ops { - require.NoError(t, s.TreeApply(d, treeID, &ops[i])) + require.NoError(t, s.TreeApply(d, treeID, &ops[i], false)) } for i := range expected { @@ -634,7 +634,7 @@ func TestApplyTricky2(t *testing.T) { t.Run(providers[i].name, func(t *testing.T) { s := providers[i].construct(t) for i := range ops { - require.NoError(t, s.TreeApply(d, treeID, &ops[i])) + require.NoError(t, s.TreeApply(d, treeID, &ops[i], false)) } for i := range expected { @@ -702,7 +702,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore rand.Read(ops[i].Meta.Items[1].Value) } for i := range ops { - require.NoError(t, expected.TreeApply(d, treeID, &ops[i])) + require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false)) } for i := 0; i < iterCount; i++ { @@ -711,7 +711,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore actual := constructor(t) for i := range ops { - require.NoError(t, actual.TreeApply(d, treeID, &ops[i])) + require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false)) } for i := uint64(0); i < nodeCount; i++ { expectedMeta, expectedParent, err := expected.TreeGetMeta(cid, treeID, i) @@ -821,7 +821,7 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { op := <-ch - if err := s.TreeApply(d, treeID, op); err != nil { + if err := s.TreeApply(d, treeID, op, false); err != nil { b.Fatalf("error in `Apply`: %v", err) } } @@ -904,7 +904,7 @@ func testMove(t *testing.T, s Forest, ts int, node, parent Node, d CIDDescriptor Time: uint64(ts), Items: items, }, - })) + }, false)) } func TestGetTrees(t *testing.T) { diff --git a/pkg/local_object_storage/pilorama/interface.go b/pkg/local_object_storage/pilorama/interface.go index 3e1c88f5b9..6ddfb848d0 100644 --- a/pkg/local_object_storage/pilorama/interface.go +++ b/pkg/local_object_storage/pilorama/interface.go @@ -17,7 +17,8 @@ type Forest interface { // Internal nodes in path should have exactly one attribute, otherwise a new node is created. TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) // TreeApply applies replicated operation from another node. - TreeApply(d CIDDescriptor, treeID string, m *Move) error + // If background is true, TreeApply will first check whether an operation exists. + TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error // TreeGetByPath returns all nodes corresponding to the path. // The path is constructed by descending from the root using the values of the // AttributeFilename in meta. diff --git a/pkg/local_object_storage/shard/tree.go b/pkg/local_object_storage/shard/tree.go index f45a06d919..12600fcdad 100644 --- a/pkg/local_object_storage/shard/tree.go +++ b/pkg/local_object_storage/shard/tree.go @@ -42,7 +42,7 @@ func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr stri } // TreeApply implements the pilorama.Forest interface. -func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) error { +func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move, backgroundSync bool) error { if s.pilorama == nil { return ErrPiloramaDisabled } @@ -53,7 +53,7 @@ func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.M if s.info.Mode.ReadOnly() { return ErrReadOnlyMode } - return s.pilorama.TreeApply(d, treeID, m) + return s.pilorama.TreeApply(d, treeID, m, backgroundSync) } // TreeGetByPath implements the pilorama.Forest interface. diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 260bffba5e..9a78b6e353 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -489,7 +489,7 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e Parent: op.GetParentId(), Child: op.GetChildId(), Meta: meta, - }) + }, false) } func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error { diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index 1dbd336ab5..749b730552 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -218,7 +218,7 @@ func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescripto if err := m.Meta.FromBytes(lm.Meta); err != nil { return newHeight, err } - if err := s.forest.TreeApply(d, treeID, m); err != nil { + if err := s.forest.TreeApply(d, treeID, m, true); err != nil { return newHeight, err } if m.Time > newHeight {