From 4437cd71138c0aabdb30e8d83d9d1d1980daa767 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 27 May 2022 15:55:02 +0300 Subject: [PATCH] [#1442] pilorama: Generate timestamp based on node position in the container Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/engine/tree.go | 24 +-- pkg/local_object_storage/engine/tree_test.go | 3 +- pkg/local_object_storage/pilorama/boltdb.go | 42 ++-- pkg/local_object_storage/pilorama/forest.go | 38 +++- .../pilorama/forest_test.go | 86 +++++--- pkg/local_object_storage/pilorama/inmemory.go | 6 +- .../pilorama/interface.go | 28 ++- pkg/local_object_storage/pilorama/util.go | 11 + .../pilorama/util_test.go | 38 ++++ pkg/local_object_storage/shard/tree.go | 12 +- pkg/services/tree/redirect.go | 15 +- pkg/services/tree/service.go | 196 ++++++++++-------- pkg/services/tree/sync.go | 3 +- 13 files changed, 333 insertions(+), 169 deletions(-) create mode 100644 pkg/local_object_storage/pilorama/util.go create mode 100644 pkg/local_object_storage/pilorama/util_test.go diff --git a/pkg/local_object_storage/engine/tree.go b/pkg/local_object_storage/engine/tree.go index a800b75b6..43ac27f5a 100644 --- a/pkg/local_object_storage/engine/tree.go +++ b/pkg/local_object_storage/engine/tree.go @@ -9,14 +9,14 @@ import ( var _ pilorama.Forest = (*StorageEngine)(nil) // TreeMove implements the pilorama.Forest interface. -func (e *StorageEngine) TreeMove(cid cidSDK.ID, treeID string, m *pilorama.Move) (*pilorama.LogMove, error) { +func (e *StorageEngine) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.LogMove, error) { var err error var lm *pilorama.LogMove - for _, sh := range e.sortShardsByWeight(cid) { - lm, err = sh.TreeMove(cid, treeID, m) + for _, sh := range e.sortShardsByWeight(d.CID) { + lm, err = sh.TreeMove(d, treeID, m) if err != nil { e.log.Debug("can't put node in a tree", - zap.Stringer("cid", cid), + zap.Stringer("cid", d.CID), zap.String("tree", treeID), zap.String("err", err.Error())) continue @@ -27,14 +27,14 @@ func (e *StorageEngine) TreeMove(cid cidSDK.ID, treeID string, m *pilorama.Move) } // TreeAddByPath implements the pilorama.Forest interface. -func (e *StorageEngine) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, m []pilorama.KeyValue) ([]pilorama.LogMove, error) { +func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr string, path []string, m []pilorama.KeyValue) ([]pilorama.LogMove, error) { var err error var lm []pilorama.LogMove - for _, sh := range e.sortShardsByWeight(cid) { - lm, err = sh.TreeAddByPath(cid, treeID, attr, path, m) + for _, sh := range e.sortShardsByWeight(d.CID) { + lm, err = sh.TreeAddByPath(d, treeID, attr, path, m) if err != nil { e.log.Debug("can't put node in a tree", - zap.Stringer("cid", cid), + zap.Stringer("cid", d.CID), zap.String("tree", treeID), zap.String("err", err.Error())) continue @@ -45,13 +45,13 @@ func (e *StorageEngine) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, } // TreeApply implements the pilorama.Forest interface. -func (e *StorageEngine) TreeApply(cid cidSDK.ID, treeID string, m *pilorama.Move) error { +func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) error { var err error - for _, sh := range e.sortShardsByWeight(cid) { - err = sh.TreeApply(cid, treeID, m) + for _, sh := range e.sortShardsByWeight(d.CID) { + err = sh.TreeApply(d, treeID, m) if err != nil { e.log.Debug("can't put node in a tree", - zap.Stringer("cid", cid), + zap.Stringer("cid", d.CID), zap.String("tree", treeID), zap.String("err", err.Error())) continue diff --git a/pkg/local_object_storage/engine/tree_test.go b/pkg/local_object_storage/engine/tree_test.go index ef79deb8e..1c5b569ea 100644 --- a/pkg/local_object_storage/engine/tree_test.go +++ b/pkg/local_object_storage/engine/tree_test.go @@ -24,6 +24,7 @@ func BenchmarkTreeVsSearch(b *testing.B) { func benchmarkTreeVsSearch(b *testing.B, objCount int) { e, _, _ := newEngineWithErrorThreshold(b, "", 0) cid := cidtest.ID() + d := pilorama.CIDDescriptor{CID: cid, Position: 0, Size: 1} treeID := "someTree" for i := 0; i < objCount; i++ { @@ -33,7 +34,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) { if err != nil { b.Fatal(err) } - _, err = e.TreeAddByPath(cid, treeID, pilorama.AttributeFilename, nil, + _, err = e.TreeAddByPath(d, treeID, pilorama.AttributeFilename, nil, []pilorama.KeyValue{{pilorama.AttributeFilename, []byte(strconv.Itoa(i))}}) if err != nil { b.Fatal(err) diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index 687dcaa7a..e1444aab8 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -74,15 +74,19 @@ func (t *boltForest) Open() error { func (t *boltForest) Close() error { return t.db.Close() } // TreeMove implements the Forest interface. -func (t *boltForest) TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove, error) { +func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove, error) { + if !d.checkValid() { + return nil, ErrInvalidCIDDescriptor + } + var lm *LogMove return lm, t.db.Batch(func(tx *bbolt.Tx) error { - bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID) + bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID) if err != nil { return err } - m.Time = t.getLatestTimestamp(bLog) + m.Time = t.getLatestTimestamp(bLog, d.Position, d.Size) if m.Child == RootID { m.Child = t.findSpareID(bTree) } @@ -92,7 +96,10 @@ func (t *boltForest) TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove, } // TreeAddByPath implements the Forest interface. -func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) { +func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) { + if !d.checkValid() { + return nil, ErrInvalidCIDDescriptor + } if !isAttributeInternal(attr) { return nil, ErrNotPathAttribute } @@ -101,7 +108,7 @@ func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, pa var key [17]byte err := t.db.Batch(func(tx *bbolt.Tx) error { - bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID) + bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID) if err != nil { return err } @@ -111,12 +118,13 @@ func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, pa return err } + ts := t.getLatestTimestamp(bLog, d.Position, d.Size) lm = make([]LogMove, len(path)-i+1) for j := i; j < len(path); j++ { lm[j-i].Move = Move{ Parent: node, Meta: Meta{ - Time: t.getLatestTimestamp(bLog), + Time: ts, Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}, }, Child: t.findSpareID(bTree), @@ -127,13 +135,14 @@ func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, pa return err } + ts = nextTimestamp(ts, uint64(d.Position), uint64(d.Size)) node = lm[j-i].Child } lm[len(lm)-1].Move = Move{ Parent: node, Meta: Meta{ - Time: t.getLatestTimestamp(bLog), + Time: ts, Items: meta, }, Child: t.findSpareID(bTree), @@ -145,14 +154,15 @@ func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, pa // getLatestTimestamp returns timestamp for a new operation which is guaranteed to be bigger than // all timestamps corresponding to already stored operations. -// FIXME timestamp should be based on a node position in the container. -func (t *boltForest) getLatestTimestamp(bLog *bbolt.Bucket) uint64 { +func (t *boltForest) getLatestTimestamp(bLog *bbolt.Bucket, pos, size int) uint64 { + var ts uint64 + c := bLog.Cursor() key, _ := c.Last() - if len(key) == 0 { - return 1 + if len(key) != 0 { + ts = binary.BigEndian.Uint64(key) } - return binary.BigEndian.Uint64(key) + 1 + return nextTimestamp(ts, uint64(pos), uint64(size)) } // findSpareID returns random unused ID. @@ -173,9 +183,13 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 { } // TreeApply implements the Forest interface. -func (t *boltForest) TreeApply(cid cidSDK.ID, treeID string, m *Move) error { +func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move) error { + if !d.checkValid() { + return ErrInvalidCIDDescriptor + } + return t.db.Batch(func(tx *bbolt.Tx) error { - bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID) + bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID) if err != nil { return err } diff --git a/pkg/local_object_storage/pilorama/forest.go b/pkg/local_object_storage/pilorama/forest.go index 21d143aa9..d5f674ae4 100644 --- a/pkg/local_object_storage/pilorama/forest.go +++ b/pkg/local_object_storage/pilorama/forest.go @@ -23,15 +23,19 @@ func NewMemoryForest() ForestStorage { } // TreeMove implements the Forest interface. -func (f *memoryForest) TreeMove(cid cidSDK.ID, treeID string, op *Move) (*LogMove, error) { - fullID := cid.String() + "/" + treeID +func (f *memoryForest) TreeMove(d CIDDescriptor, treeID string, op *Move) (*LogMove, error) { + if !d.checkValid() { + return nil, ErrInvalidCIDDescriptor + } + + fullID := d.CID.String() + "/" + treeID s, ok := f.treeMap[fullID] if !ok { s = newState() f.treeMap[fullID] = s } - op.Time = s.timestamp() + op.Time = s.timestamp(d.Position, d.Size) if op.Child == RootID { op.Child = s.findSpareID() } @@ -42,12 +46,15 @@ func (f *memoryForest) TreeMove(cid cidSDK.ID, treeID string, op *Move) (*LogMov } // TreeAddByPath implements the Forest interface. -func (f *memoryForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, m []KeyValue) ([]LogMove, error) { +func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, m []KeyValue) ([]LogMove, error) { + if !d.checkValid() { + return nil, ErrInvalidCIDDescriptor + } if !isAttributeInternal(attr) { return nil, ErrNotPathAttribute } - fullID := cid.String() + "/" + treeID + fullID := d.CID.String() + "/" + treeID s, ok := f.treeMap[fullID] if !ok { s = newState() @@ -59,8 +66,10 @@ func (f *memoryForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, for j := i; j < len(path); j++ { lm[j-i] = s.do(&Move{ Parent: node, - Meta: Meta{Time: s.timestamp(), Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}}, - Child: s.findSpareID(), + Meta: Meta{ + Time: s.timestamp(d.Position, d.Size), + Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}}, + Child: s.findSpareID(), }) node = lm[j-i].Child s.operations = append(s.operations, lm[j-i]) @@ -70,15 +79,22 @@ func (f *memoryForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, copy(mCopy, m) lm[len(lm)-1] = s.do(&Move{ Parent: node, - Meta: Meta{Time: s.timestamp(), Items: mCopy}, - Child: s.findSpareID(), + Meta: Meta{ + Time: s.timestamp(d.Position, d.Size), + Items: mCopy, + }, + Child: s.findSpareID(), }) return lm, nil } // TreeApply implements the Forest interface. -func (f *memoryForest) TreeApply(cid cidSDK.ID, treeID string, op *Move) error { - fullID := cid.String() + "/" + treeID +func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op *Move) error { + if !d.checkValid() { + return ErrInvalidCIDDescriptor + } + + fullID := d.CID.String() + "/" + treeID s, ok := f.treeMap[fullID] if !ok { s = newState() diff --git a/pkg/local_object_storage/pilorama/forest_test.go b/pkg/local_object_storage/pilorama/forest_test.go index fe1ce52d4..20ca79399 100644 --- a/pkg/local_object_storage/pilorama/forest_test.go +++ b/pkg/local_object_storage/pilorama/forest_test.go @@ -59,18 +59,27 @@ func TestForest_TreeMove(t *testing.T) { func testForestTreeMove(t *testing.T, s Forest) { cid := cidtest.ID() + d := CIDDescriptor{cid, 0, 1} treeID := "version" meta := []KeyValue{ {Key: AttributeVersion, Value: []byte("XXX")}, {Key: AttributeFilename, Value: []byte("file.txt")}} - lm, err := s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "to"}, meta) + lm, err := s.TreeAddByPath(d, treeID, AttributeFilename, []string{"path", "to"}, meta) require.NoError(t, err) require.Equal(t, 3, len(lm)) nodeID := lm[2].Child + t.Run("invalid descriptor", func(t *testing.T) { + _, err = s.TreeMove(CIDDescriptor{cid, 0, 0}, treeID, &Move{ + Parent: lm[1].Child, + Meta: Meta{Items: append(meta, KeyValue{Key: "NewKey", Value: []byte("NewValue")})}, + Child: nodeID, + }) + require.ErrorIs(t, err, ErrInvalidCIDDescriptor) + }) t.Run("same parent, update meta", func(t *testing.T) { - _, err = s.TreeMove(cid, treeID, &Move{ + _, err = s.TreeMove(d, treeID, &Move{ Parent: lm[1].Child, Meta: Meta{Items: append(meta, KeyValue{Key: "NewKey", Value: []byte("NewValue")})}, Child: nodeID, @@ -82,7 +91,7 @@ func testForestTreeMove(t *testing.T, s Forest) { require.ElementsMatch(t, []Node{nodeID}, nodes) }) t.Run("different parent", func(t *testing.T) { - _, err = s.TreeMove(cid, treeID, &Move{ + _, err = s.TreeMove(d, treeID, &Move{ Parent: RootID, Meta: Meta{Items: append(meta, KeyValue{Key: "NewKey", Value: []byte("NewValue")})}, Child: nodeID, @@ -109,10 +118,11 @@ func TestMemoryForest_TreeGetChildren(t *testing.T) { func testForestTreeGetChildren(t *testing.T, s Forest) { cid := cidtest.ID() + d := CIDDescriptor{cid, 0, 1} treeID := "version" treeAdd := func(t *testing.T, child, parent Node) { - _, err := s.TreeMove(cid, treeID, &Move{ + _, err := s.TreeMove(d, treeID, &Move{ Parent: parent, Child: child, }) @@ -165,16 +175,24 @@ func TestForest_TreeAdd(t *testing.T) { func testForestTreeAdd(t *testing.T, s Forest) { cid := cidtest.ID() + d := CIDDescriptor{cid, 0, 1} treeID := "version" meta := []KeyValue{ {Key: AttributeVersion, Value: []byte("XXX")}, {Key: AttributeFilename, Value: []byte("file.txt")}} - lm, err := s.TreeMove(cid, treeID, &Move{ + m := &Move{ Parent: RootID, Child: RootID, Meta: Meta{Items: meta}, + } + + t.Run("invalid descriptor", func(t *testing.T) { + _, err := s.TreeMove(CIDDescriptor{cid, 0, 0}, treeID, m) + require.ErrorIs(t, err, ErrInvalidCIDDescriptor) }) + + lm, err := s.TreeMove(d, treeID, m) require.NoError(t, err) testMeta(t, s, cid, treeID, lm.Child, lm.Parent, Meta{Time: lm.Time, Items: meta}) @@ -202,18 +220,23 @@ func TestForest_TreeAddByPath(t *testing.T) { func testForestTreeAddByPath(t *testing.T, s Forest) { cid := cidtest.ID() + d := CIDDescriptor{cid, 0, 1} treeID := "version" meta := []KeyValue{ {Key: AttributeVersion, Value: []byte("XXX")}, {Key: AttributeFilename, Value: []byte("file.txt")}} + t.Run("invalid descriptor", func(t *testing.T) { + _, err := s.TreeAddByPath(CIDDescriptor{cid, 0, 0}, treeID, AttributeFilename, []string{"yyy"}, meta) + require.ErrorIs(t, err, ErrInvalidCIDDescriptor) + }) t.Run("invalid attribute", func(t *testing.T) { - _, err := s.TreeAddByPath(cid, treeID, AttributeVersion, []string{"yyy"}, meta) + _, err := s.TreeAddByPath(d, treeID, AttributeVersion, []string{"yyy"}, meta) require.ErrorIs(t, err, ErrNotPathAttribute) }) - lm, err := s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "to"}, meta) + lm, err := s.TreeAddByPath(d, treeID, AttributeFilename, []string{"path", "to"}, meta) require.NoError(t, err) require.Equal(t, 3, len(lm)) testMeta(t, s, cid, treeID, lm[0].Child, lm[0].Parent, Meta{Time: lm[0].Time, Items: []KeyValue{{AttributeFilename, []byte("path")}}}) @@ -223,7 +246,7 @@ func testForestTreeAddByPath(t *testing.T, s Forest) { testMeta(t, s, cid, treeID, firstID, lm[2].Parent, Meta{Time: lm[2].Time, Items: meta}) meta[0].Value = []byte("YYY") - lm, err = s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "to"}, meta) + lm, err = s.TreeAddByPath(d, treeID, AttributeFilename, []string{"path", "to"}, meta) require.NoError(t, err) require.Equal(t, 1, len(lm)) @@ -244,7 +267,7 @@ func testForestTreeAddByPath(t *testing.T, s Forest) { meta[0].Value = []byte("ZZZ") meta[1].Value = []byte("cat.jpg") - lm, err = s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "dir"}, meta) + lm, err = s.TreeAddByPath(d, treeID, AttributeFilename, []string{"path", "dir"}, meta) require.NoError(t, err) require.Equal(t, 2, len(lm)) testMeta(t, s, cid, treeID, lm[0].Child, lm[0].Parent, Meta{Time: lm[0].Time, Items: []KeyValue{{AttributeFilename, []byte("dir")}}}) @@ -253,7 +276,7 @@ func testForestTreeAddByPath(t *testing.T, s Forest) { t.Run("create internal nodes", func(t *testing.T) { meta[0].Value = []byte("SomeValue") meta[1].Value = []byte("another") - lm, err = s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path"}, meta) + lm, err = s.TreeAddByPath(d, treeID, AttributeFilename, []string{"path"}, meta) require.NoError(t, err) require.Equal(t, 1, len(lm)) @@ -261,7 +284,7 @@ func testForestTreeAddByPath(t *testing.T, s Forest) { meta[0].Value = []byte("Leaf") meta[1].Value = []byte("file.txt") - lm, err = s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "another"}, meta) + lm, err = s.TreeAddByPath(d, treeID, AttributeFilename, []string{"path", "another"}, meta) require.NoError(t, err) require.Equal(t, 2, len(lm)) @@ -299,10 +322,21 @@ func TestForest_Apply(t *testing.T) { func testForestTreeApply(t *testing.T, constructor func(t testing.TB) Forest) { cid := cidtest.ID() + d := CIDDescriptor{cid, 0, 1} treeID := "version" + t.Run("invalid descriptor", func(t *testing.T) { + s := constructor(t) + err := s.TreeApply(CIDDescriptor{cid, 0, 0}, treeID, &Move{ + Child: 10, + Parent: 0, + Meta: Meta{Time: 1, Items: []KeyValue{{"grand", []byte{1}}}}, + }) + require.ErrorIs(t, err, ErrInvalidCIDDescriptor) + }) + testApply := func(t *testing.T, s Forest, child, parent Node, meta Meta) { - require.NoError(t, s.TreeApply(cid, treeID, &Move{ + require.NoError(t, s.TreeApply(d, treeID, &Move{ Child: child, Parent: parent, Meta: meta, @@ -341,6 +375,7 @@ func TestForest_GetOpLog(t *testing.T) { func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB) Forest) { cid := cidtest.ID() + d := CIDDescriptor{cid, 0, 1} treeID := "version" logs := []Move{ { @@ -366,7 +401,7 @@ func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB) Forest) }) for i := range logs { - require.NoError(t, s.TreeApply(cid, treeID, &logs[i])) + require.NoError(t, s.TreeApply(d, treeID, &logs[i])) } testGetOpLog := func(t *testing.T, height uint64, m Move) { @@ -407,6 +442,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore ) cid := cidtest.ID() + d := CIDDescriptor{cid, 0, 1} treeID := "version" expected := constructor(t) @@ -444,7 +480,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore rand.Read(ops[i].Meta.Items[1].Value) } for i := range ops { - require.NoError(t, expected.TreeApply(cid, treeID, &ops[i])) + require.NoError(t, expected.TreeApply(d, treeID, &ops[i])) } for i := 0; i < iterCount; i++ { @@ -453,7 +489,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore actual := constructor(t) for i := range ops { - require.NoError(t, actual.TreeApply(cid, treeID, &ops[i])) + require.NoError(t, actual.TreeApply(d, treeID, &ops[i])) } for i := uint64(0); i < nodeCount; i++ { expectedMeta, expectedParent, err := expected.TreeGetMeta(cid, treeID, i) @@ -534,6 +570,7 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) { ops := genFunc(b.N) cid := cidtest.ID() + d := CIDDescriptor{cid, 0, 1} treeID := "version" ch := make(chan *Move, b.N) for i := range ops { @@ -546,7 +583,7 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { op := <-ch - if err := s.TreeApply(cid, treeID, op); err != nil { + if err := s.TreeApply(d, treeID, op); err != nil { b.Fatalf("error in `Apply`: %v", err) } } @@ -563,6 +600,7 @@ func TestTreeGetByPath(t *testing.T) { func testTreeGetByPath(t *testing.T, s Forest) { cid := cidtest.ID() + d := CIDDescriptor{cid, 0, 1} treeID := "version" // / @@ -572,12 +610,12 @@ func testTreeGetByPath(t *testing.T, s Forest) { // |- cat1.jpg, Version=XXX (4) // |- cat1.jpg, Version=YYY (5) // |- cat2.jpg, Version=ZZZ (6) - testMove(t, s, 0, 1, 0, cid, treeID, "a", "") - testMove(t, s, 1, 2, 0, cid, treeID, "b", "") - testMove(t, s, 2, 3, 1, cid, treeID, "cat1.jpg", "TTT") - testMove(t, s, 3, 4, 2, cid, treeID, "cat1.jpg", "XXX") - testMove(t, s, 4, 5, 2, cid, treeID, "cat1.jpg", "YYY") - testMove(t, s, 5, 6, 2, cid, treeID, "cat2.jpg", "ZZZ") + testMove(t, s, 0, 1, 0, d, treeID, "a", "") + testMove(t, s, 1, 2, 0, d, treeID, "b", "") + testMove(t, s, 2, 3, 1, d, treeID, "cat1.jpg", "TTT") + testMove(t, s, 3, 4, 2, d, treeID, "cat1.jpg", "XXX") + testMove(t, s, 4, 5, 2, d, treeID, "cat1.jpg", "YYY") + testMove(t, s, 5, 6, 2, d, treeID, "cat2.jpg", "ZZZ") if mf, ok := s.(*memoryForest); ok { single := mf.treeMap[cid.String()+"/"+treeID] @@ -614,14 +652,14 @@ func testTreeGetByPath(t *testing.T, s Forest) { }) } -func testMove(t *testing.T, s Forest, ts int, node, parent Node, cid cidSDK.ID, treeID, filename, version string) { +func testMove(t *testing.T, s Forest, ts int, node, parent Node, d CIDDescriptor, treeID, filename, version string) { items := make([]KeyValue, 1, 2) items[0] = KeyValue{AttributeFilename, []byte(filename)} if version != "" { items = append(items, KeyValue{AttributeVersion, []byte(version)}) } - require.NoError(t, s.TreeApply(cid, treeID, &Move{ + require.NoError(t, s.TreeApply(d, treeID, &Move{ Parent: parent, Child: node, Meta: Meta{ diff --git a/pkg/local_object_storage/pilorama/inmemory.go b/pkg/local_object_storage/pilorama/inmemory.go index 505cca26d..c15de658b 100644 --- a/pkg/local_object_storage/pilorama/inmemory.go +++ b/pkg/local_object_storage/pilorama/inmemory.go @@ -133,11 +133,11 @@ func (s *state) removeChild(child, parent Node) { } } -func (s *state) timestamp() Timestamp { +func (s *state) timestamp(pos, size int) Timestamp { if len(s.operations) == 0 { - return 0 + return nextTimestamp(0, uint64(pos), uint64(size)) } - return s.operations[len(s.operations)-1].Time + 1 + return nextTimestamp(s.operations[len(s.operations)-1].Time, uint64(pos), uint64(size)) } func (s *state) findSpareID() Node { diff --git a/pkg/local_object_storage/pilorama/interface.go b/pkg/local_object_storage/pilorama/interface.go index 9dc9ca7dc..92c47b6af 100644 --- a/pkg/local_object_storage/pilorama/interface.go +++ b/pkg/local_object_storage/pilorama/interface.go @@ -1,19 +1,23 @@ package pilorama -import cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id" +import ( + "errors" + + cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id" +) // Forest represents CRDT tree. type Forest interface { // TreeMove moves node in the tree. // If the parent of the move operation is TrashID, the node is removed. // If the child of the move operation is RootID, new ID is generated and added to a tree. - TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove, error) + TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove, error) // TreeAddByPath adds new node in the tree using provided path. // The path is constructed by descending from the root using the values of the attr in meta. // Internal nodes in path should have exactly one attribute, otherwise a new node is created. - TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) + TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) // TreeApply applies replicated operation from another node. - TreeApply(cid cidSDK.ID, treeID string, m *Move) error + TreeApply(d CIDDescriptor, treeID string, m *Move) error // TreeGetByPath returns all nodes corresponding to the path. // The path is constructed by descending from the root using the values of the // AttributeFilename in meta. @@ -42,3 +46,19 @@ const ( AttributeFilename = "FileName" AttributeVersion = "Version" ) + +// CIDDescriptor contains container ID and information about the node position +// in the list of container nodes. +type CIDDescriptor struct { + CID cidSDK.ID + Position int + Size int +} + +// ErrInvalidCIDDescriptor is returned when info about tne node position +// in the container is invalid. +var ErrInvalidCIDDescriptor = errors.New("cid descriptor is invalid") + +func (d CIDDescriptor) checkValid() bool { + return 0 <= d.Position && d.Position < d.Size +} diff --git a/pkg/local_object_storage/pilorama/util.go b/pkg/local_object_storage/pilorama/util.go new file mode 100644 index 000000000..53b7e1d50 --- /dev/null +++ b/pkg/local_object_storage/pilorama/util.go @@ -0,0 +1,11 @@ +package pilorama + +// nextTimestamp accepts the latest local timestamp, node position in a container and container size. +// Returns the next timestamp which can be generated by this node. +func nextTimestamp(ts Timestamp, pos, size uint64) Timestamp { + base := ts/size*size + pos + if ts < base { + return base + } + return base + size +} diff --git a/pkg/local_object_storage/pilorama/util_test.go b/pkg/local_object_storage/pilorama/util_test.go new file mode 100644 index 000000000..bfa141c70 --- /dev/null +++ b/pkg/local_object_storage/pilorama/util_test.go @@ -0,0 +1,38 @@ +package pilorama + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNextTimestamp(t *testing.T) { + testCases := []struct { + latest Timestamp + pos, size uint64 + expected Timestamp + }{ + {0, 0, 1, 1}, + {2, 0, 1, 3}, + {0, 0, 2, 2}, + {0, 1, 2, 1}, + {10, 0, 4, 12}, + {11, 0, 4, 12}, + {12, 0, 4, 16}, + {10, 1, 4, 13}, + {11, 1, 4, 13}, + {12, 1, 4, 13}, + {10, 2, 4, 14}, + {11, 2, 4, 14}, + {12, 2, 4, 14}, + {10, 3, 4, 11}, + {11, 3, 4, 15}, + {12, 3, 4, 15}, + } + + for _, tc := range testCases { + actual := nextTimestamp(tc.latest, tc.pos, tc.size) + require.Equal(t, tc.expected, actual, + "latest %d, pos %d, size %d", tc.latest, tc.pos, tc.size) + } +} diff --git a/pkg/local_object_storage/shard/tree.go b/pkg/local_object_storage/shard/tree.go index 36e62220a..fae5e5341 100644 --- a/pkg/local_object_storage/shard/tree.go +++ b/pkg/local_object_storage/shard/tree.go @@ -8,18 +8,18 @@ import ( var _ pilorama.Forest = (*Shard)(nil) // TreeMove implements the pilorama.Forest interface. -func (s *Shard) TreeMove(cid cidSDK.ID, treeID string, m *pilorama.Move) (*pilorama.LogMove, error) { - return s.pilorama.TreeMove(cid, treeID, m) +func (s *Shard) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.LogMove, error) { + return s.pilorama.TreeMove(d, treeID, m) } // TreeAddByPath implements the pilorama.Forest interface. -func (s *Shard) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, meta []pilorama.KeyValue) ([]pilorama.LogMove, error) { - return s.pilorama.TreeAddByPath(cid, treeID, attr, path, meta) +func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr string, path []string, meta []pilorama.KeyValue) ([]pilorama.LogMove, error) { + return s.pilorama.TreeAddByPath(d, treeID, attr, path, meta) } // TreeApply implements the pilorama.Forest interface. -func (s *Shard) TreeApply(cid cidSDK.ID, treeID string, m *pilorama.Move) error { - return s.pilorama.TreeApply(cid, treeID, m) +func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) error { + return s.pilorama.TreeApply(d, treeID, m) } // TreeGetByPath implements the pilorama.Forest interface. diff --git a/pkg/services/tree/redirect.go b/pkg/services/tree/redirect.go index 95cbccff9..304ed0d7d 100644 --- a/pkg/services/tree/redirect.go +++ b/pkg/services/tree/redirect.go @@ -4,25 +4,18 @@ import ( "bytes" "context" "errors" - "fmt" "github.com/nspcc-dev/neofs-node/pkg/network" - cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id" + netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap" "go.uber.org/zap" "google.golang.org/grpc" ) var errNoSuitableNode = errors.New("no node was found to execute the request") -// forEachNode executes callback for each node in the container. -// If the node belongs to a container, nil error is returned. -// Otherwise, f is executed for each node, stopping if true is returned. -func (s *Service) forEachNode(ctx context.Context, cid cidSDK.ID, f func(c TreeServiceClient) bool) error { - cntNodes, err := s.getContainerNodes(cid) - if err != nil { - return fmt.Errorf("can't get container nodes for %s: %w", cid, err) - } - +// forEachNode executes callback for each node in the container until true is returned. +// Returns errNoSuitableNode if there was no successful attempt to dial any node. +func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo, f func(c TreeServiceClient) bool) error { for _, n := range cntNodes { if bytes.Equal(n.PublicKey(), s.rawPub) { return nil diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 835b863b8..b0d86ce10 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -9,6 +9,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama" cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/eacl" + netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap" "go.uber.org/zap" ) @@ -66,19 +67,22 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error return nil, err } - var resp *AddResponse - var outErr error - err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool { - resp, outErr = c.Add(ctx, req) - return true - }) - if err != nil { - return nil, err - } else if resp != nil || outErr != nil { + ns, pos, size, err := s.getContainerInfo(cid, s.rawPub) + if err == errNotInContainer { + var resp *AddResponse + var outErr error + err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool { + resp, outErr = c.Add(ctx, req) + return true + }) + if err != nil { + return nil, err + } return resp, outErr } - log, err := s.forest.TreeMove(cid, b.GetTreeId(), &pilorama.Move{ + d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size} + log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{ Parent: b.GetParentId(), Child: pilorama.RootID, Meta: pilorama.Meta{Items: protoToMeta(b.GetMeta())}, @@ -108,15 +112,17 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP return nil, err } - var resp *AddByPathResponse - var outErr error - err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool { - resp, outErr = c.AddByPath(ctx, req) - return true - }) - if err != nil { - return nil, err - } else if resp != nil || outErr != nil { + ns, pos, size, err := s.getContainerInfo(cid, s.rawPub) + if err == errNotInContainer { + var resp *AddByPathResponse + var outErr error + err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool { + resp, outErr = c.AddByPath(ctx, req) + return true + }) + if err != nil { + return nil, err + } return resp, outErr } @@ -127,7 +133,8 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP attr = pilorama.AttributeFilename } - logs, err := s.forest.TreeAddByPath(cid, b.GetTreeId(), attr, b.GetPath(), meta) + d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size} + logs, err := s.forest.TreeAddByPath(d, b.GetTreeId(), attr, b.GetPath(), meta) if err != nil { return nil, err } @@ -162,15 +169,17 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon return nil, err } - var resp *RemoveResponse - var outErr error - err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool { - resp, outErr = c.Remove(ctx, req) - return true - }) - if err != nil { - return nil, err - } else if resp != nil || outErr != nil { + ns, pos, size, err := s.getContainerInfo(cid, s.rawPub) + if err == errNotInContainer { + var resp *RemoveResponse + var outErr error + err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool { + resp, outErr = c.Remove(ctx, req) + return true + }) + if err != nil { + return nil, err + } return resp, outErr } @@ -178,7 +187,8 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon return nil, fmt.Errorf("node with ID %d is root and can't be removed", b.GetNodeId()) } - log, err := s.forest.TreeMove(cid, b.GetTreeId(), &pilorama.Move{ + d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size} + log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{ Parent: pilorama.TrashID, Child: b.GetNodeId(), }) @@ -205,15 +215,17 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er return nil, err } - var resp *MoveResponse - var outErr error - err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool { - resp, outErr = c.Move(ctx, req) - return true - }) - if err != nil { - return nil, err - } else if resp != nil || outErr != nil { + ns, pos, size, err := s.getContainerInfo(cid, s.rawPub) + if err == errNotInContainer { + var resp *MoveResponse + var outErr error + err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool { + resp, outErr = c.Move(ctx, req) + return true + }) + if err != nil { + return nil, err + } return resp, outErr } @@ -221,7 +233,8 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er return nil, fmt.Errorf("node with ID %d is root and can't be moved", b.GetNodeId()) } - log, err := s.forest.TreeMove(cid, b.GetTreeId(), &pilorama.Move{ + d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size} + log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{ Parent: b.GetParentId(), Child: b.GetNodeId(), Meta: pilorama.Meta{Items: protoToMeta(b.GetMeta())}, @@ -247,15 +260,17 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) return nil, err } - var resp *GetNodeByPathResponse - var outErr error - err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool { - resp, outErr = c.GetNodeByPath(ctx, req) - return true - }) - if err != nil { - return nil, err - } else if resp != nil || outErr != nil { + ns, _, _, err := s.getContainerInfo(cid, s.rawPub) + if err == errNotInContainer { + var resp *GetNodeByPathResponse + var outErr error + err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool { + resp, outErr = c.GetNodeByPath(ctx, req) + return true + }) + if err != nil { + return nil, err + } return resp, outErr } @@ -325,22 +340,25 @@ func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeS return err } - var cli TreeService_GetSubTreeClient - var outErr error - err = s.forEachNode(srv.Context(), cid, func(c TreeServiceClient) bool { - cli, outErr = c.GetSubTree(srv.Context(), req) - return true - }) - if err != nil { - return err - } else if outErr != nil { - return outErr - } else if cli != nil { + ns, _, _, err := s.getContainerInfo(cid, s.rawPub) + if err == errNotInContainer { + var cli TreeService_GetSubTreeClient + var outErr error + err = s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool { + cli, outErr = c.GetSubTree(srv.Context(), req) + return true + }) + if err != nil { + return err + } else if outErr != nil { + return outErr + } for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() { if err := srv.Send(resp); err != nil { return err } } + return nil } queue := []nodeDepthPair{{[]uint64{b.GetRootId()}, 0}} @@ -391,18 +409,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e return nil, err } - found := false key := req.GetSignature().GetKey() - nodes, _ := s.getContainerNodes(cid) -loop: - for _, n := range nodes { - if bytes.Equal(key, n.PublicKey()) { - found = true - break loop - } - } - if !found { + _, pos, size, err := s.getContainerInfo(cid, key) + if err == errNotInContainer { return nil, errors.New("`Apply` request must be signed by a container node") } @@ -413,7 +423,8 @@ loop: return nil, fmt.Errorf("can't parse meta-information: %w", err) } - return nil, s.forest.TreeApply(cid, req.GetBody().GetTreeId(), &pilorama.Move{ + d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size} + return nil, s.forest.TreeApply(d, req.GetBody().GetTreeId(), &pilorama.Move{ Parent: op.GetParentId(), Child: op.GetChildId(), Meta: meta, @@ -428,22 +439,25 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) return err } - var cli TreeService_GetOpLogClient - var outErr error - err := s.forEachNode(srv.Context(), cid, func(c TreeServiceClient) bool { - cli, outErr = c.GetOpLog(srv.Context(), req) - return true - }) - if err != nil { - return err - } else if outErr != nil { - return outErr - } else if cli != nil { + ns, _, _, err := s.getContainerInfo(cid, s.rawPub) + if err == errNotInContainer { + var cli TreeService_GetOpLogClient + var outErr error + err := s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool { + cli, outErr = c.GetOpLog(srv.Context(), req) + return true + }) + if err != nil { + return err + } else if outErr != nil { + return outErr + } for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() { if err := srv.Send(resp); err != nil { return err } } + return nil } h := b.GetHeight() @@ -491,3 +505,21 @@ func metaToProto(arr []pilorama.KeyValue) []*KeyValue { } return meta } + +var errNotInContainer = errors.New("node doesn't belong to a container") + +// getContainerInfo returns the list of container nodes, position in the container for the node +// with pub key and total amount of nodes in all replicas. +func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeInfo, int, int, error) { + cntNodes, err := s.getContainerNodes(cid) + if err != nil { + return nil, 0, 0, err + } + + for i, node := range cntNodes { + if bytes.Equal(node.PublicKey(), pub) { + return cntNodes, i, len(cntNodes), nil + } + } + return nil, 0, 0, errNotInContainer +} diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index 5824c3f49..663c87385 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -88,7 +88,8 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri if err := m.Meta.FromBytes(lm.Meta); err != nil { return newHeight, err } - if err := s.forest.TreeApply(cid, treeID, m); err != nil { + d := pilorama.CIDDescriptor{CID: cid} + if err := s.forest.TreeApply(d, treeID, m); err != nil { return newHeight, err } if m.Time > newHeight {