[#1442] pilorama: Generate timestamp based on node position in the container
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
3caa982283
commit
4437cd7113
13 changed files with 333 additions and 169 deletions
|
@ -9,14 +9,14 @@ import (
|
|||
var _ pilorama.Forest = (*StorageEngine)(nil)
|
||||
|
||||
// TreeMove implements the pilorama.Forest interface.
|
||||
func (e *StorageEngine) TreeMove(cid cidSDK.ID, treeID string, m *pilorama.Move) (*pilorama.LogMove, error) {
|
||||
func (e *StorageEngine) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.LogMove, error) {
|
||||
var err error
|
||||
var lm *pilorama.LogMove
|
||||
for _, sh := range e.sortShardsByWeight(cid) {
|
||||
lm, err = sh.TreeMove(cid, treeID, m)
|
||||
for _, sh := range e.sortShardsByWeight(d.CID) {
|
||||
lm, err = sh.TreeMove(d, treeID, m)
|
||||
if err != nil {
|
||||
e.log.Debug("can't put node in a tree",
|
||||
zap.Stringer("cid", cid),
|
||||
zap.Stringer("cid", d.CID),
|
||||
zap.String("tree", treeID),
|
||||
zap.String("err", err.Error()))
|
||||
continue
|
||||
|
@ -27,14 +27,14 @@ func (e *StorageEngine) TreeMove(cid cidSDK.ID, treeID string, m *pilorama.Move)
|
|||
}
|
||||
|
||||
// TreeAddByPath implements the pilorama.Forest interface.
|
||||
func (e *StorageEngine) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, m []pilorama.KeyValue) ([]pilorama.LogMove, error) {
|
||||
func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr string, path []string, m []pilorama.KeyValue) ([]pilorama.LogMove, error) {
|
||||
var err error
|
||||
var lm []pilorama.LogMove
|
||||
for _, sh := range e.sortShardsByWeight(cid) {
|
||||
lm, err = sh.TreeAddByPath(cid, treeID, attr, path, m)
|
||||
for _, sh := range e.sortShardsByWeight(d.CID) {
|
||||
lm, err = sh.TreeAddByPath(d, treeID, attr, path, m)
|
||||
if err != nil {
|
||||
e.log.Debug("can't put node in a tree",
|
||||
zap.Stringer("cid", cid),
|
||||
zap.Stringer("cid", d.CID),
|
||||
zap.String("tree", treeID),
|
||||
zap.String("err", err.Error()))
|
||||
continue
|
||||
|
@ -45,13 +45,13 @@ func (e *StorageEngine) TreeAddByPath(cid cidSDK.ID, treeID string, attr string,
|
|||
}
|
||||
|
||||
// TreeApply implements the pilorama.Forest interface.
|
||||
func (e *StorageEngine) TreeApply(cid cidSDK.ID, treeID string, m *pilorama.Move) error {
|
||||
func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) error {
|
||||
var err error
|
||||
for _, sh := range e.sortShardsByWeight(cid) {
|
||||
err = sh.TreeApply(cid, treeID, m)
|
||||
for _, sh := range e.sortShardsByWeight(d.CID) {
|
||||
err = sh.TreeApply(d, treeID, m)
|
||||
if err != nil {
|
||||
e.log.Debug("can't put node in a tree",
|
||||
zap.Stringer("cid", cid),
|
||||
zap.Stringer("cid", d.CID),
|
||||
zap.String("tree", treeID),
|
||||
zap.String("err", err.Error()))
|
||||
continue
|
||||
|
|
|
@ -24,6 +24,7 @@ func BenchmarkTreeVsSearch(b *testing.B) {
|
|||
func benchmarkTreeVsSearch(b *testing.B, objCount int) {
|
||||
e, _, _ := newEngineWithErrorThreshold(b, "", 0)
|
||||
cid := cidtest.ID()
|
||||
d := pilorama.CIDDescriptor{CID: cid, Position: 0, Size: 1}
|
||||
treeID := "someTree"
|
||||
|
||||
for i := 0; i < objCount; i++ {
|
||||
|
@ -33,7 +34,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
|
|||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
_, err = e.TreeAddByPath(cid, treeID, pilorama.AttributeFilename, nil,
|
||||
_, err = e.TreeAddByPath(d, treeID, pilorama.AttributeFilename, nil,
|
||||
[]pilorama.KeyValue{{pilorama.AttributeFilename, []byte(strconv.Itoa(i))}})
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
|
|
|
@ -74,15 +74,19 @@ func (t *boltForest) Open() error {
|
|||
func (t *boltForest) Close() error { return t.db.Close() }
|
||||
|
||||
// TreeMove implements the Forest interface.
|
||||
func (t *boltForest) TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove, error) {
|
||||
func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove, error) {
|
||||
if !d.checkValid() {
|
||||
return nil, ErrInvalidCIDDescriptor
|
||||
}
|
||||
|
||||
var lm *LogMove
|
||||
return lm, t.db.Batch(func(tx *bbolt.Tx) error {
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID)
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.Time = t.getLatestTimestamp(bLog)
|
||||
m.Time = t.getLatestTimestamp(bLog, d.Position, d.Size)
|
||||
if m.Child == RootID {
|
||||
m.Child = t.findSpareID(bTree)
|
||||
}
|
||||
|
@ -92,7 +96,10 @@ func (t *boltForest) TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove,
|
|||
}
|
||||
|
||||
// TreeAddByPath implements the Forest interface.
|
||||
func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) {
|
||||
func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) {
|
||||
if !d.checkValid() {
|
||||
return nil, ErrInvalidCIDDescriptor
|
||||
}
|
||||
if !isAttributeInternal(attr) {
|
||||
return nil, ErrNotPathAttribute
|
||||
}
|
||||
|
@ -101,7 +108,7 @@ func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, pa
|
|||
var key [17]byte
|
||||
|
||||
err := t.db.Batch(func(tx *bbolt.Tx) error {
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID)
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -111,12 +118,13 @@ func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, pa
|
|||
return err
|
||||
}
|
||||
|
||||
ts := t.getLatestTimestamp(bLog, d.Position, d.Size)
|
||||
lm = make([]LogMove, len(path)-i+1)
|
||||
for j := i; j < len(path); j++ {
|
||||
lm[j-i].Move = Move{
|
||||
Parent: node,
|
||||
Meta: Meta{
|
||||
Time: t.getLatestTimestamp(bLog),
|
||||
Time: ts,
|
||||
Items: []KeyValue{{Key: attr, Value: []byte(path[j])}},
|
||||
},
|
||||
Child: t.findSpareID(bTree),
|
||||
|
@ -127,13 +135,14 @@ func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, pa
|
|||
return err
|
||||
}
|
||||
|
||||
ts = nextTimestamp(ts, uint64(d.Position), uint64(d.Size))
|
||||
node = lm[j-i].Child
|
||||
}
|
||||
|
||||
lm[len(lm)-1].Move = Move{
|
||||
Parent: node,
|
||||
Meta: Meta{
|
||||
Time: t.getLatestTimestamp(bLog),
|
||||
Time: ts,
|
||||
Items: meta,
|
||||
},
|
||||
Child: t.findSpareID(bTree),
|
||||
|
@ -145,14 +154,15 @@ func (t *boltForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, pa
|
|||
|
||||
// getLatestTimestamp returns timestamp for a new operation which is guaranteed to be bigger than
|
||||
// all timestamps corresponding to already stored operations.
|
||||
// FIXME timestamp should be based on a node position in the container.
|
||||
func (t *boltForest) getLatestTimestamp(bLog *bbolt.Bucket) uint64 {
|
||||
func (t *boltForest) getLatestTimestamp(bLog *bbolt.Bucket, pos, size int) uint64 {
|
||||
var ts uint64
|
||||
|
||||
c := bLog.Cursor()
|
||||
key, _ := c.Last()
|
||||
if len(key) == 0 {
|
||||
return 1
|
||||
if len(key) != 0 {
|
||||
ts = binary.BigEndian.Uint64(key)
|
||||
}
|
||||
return binary.BigEndian.Uint64(key) + 1
|
||||
return nextTimestamp(ts, uint64(pos), uint64(size))
|
||||
}
|
||||
|
||||
// findSpareID returns random unused ID.
|
||||
|
@ -173,9 +183,13 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
|
|||
}
|
||||
|
||||
// TreeApply implements the Forest interface.
|
||||
func (t *boltForest) TreeApply(cid cidSDK.ID, treeID string, m *Move) error {
|
||||
func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move) error {
|
||||
if !d.checkValid() {
|
||||
return ErrInvalidCIDDescriptor
|
||||
}
|
||||
|
||||
return t.db.Batch(func(tx *bbolt.Tx) error {
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, cid, treeID)
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -23,15 +23,19 @@ func NewMemoryForest() ForestStorage {
|
|||
}
|
||||
|
||||
// TreeMove implements the Forest interface.
|
||||
func (f *memoryForest) TreeMove(cid cidSDK.ID, treeID string, op *Move) (*LogMove, error) {
|
||||
fullID := cid.String() + "/" + treeID
|
||||
func (f *memoryForest) TreeMove(d CIDDescriptor, treeID string, op *Move) (*LogMove, error) {
|
||||
if !d.checkValid() {
|
||||
return nil, ErrInvalidCIDDescriptor
|
||||
}
|
||||
|
||||
fullID := d.CID.String() + "/" + treeID
|
||||
s, ok := f.treeMap[fullID]
|
||||
if !ok {
|
||||
s = newState()
|
||||
f.treeMap[fullID] = s
|
||||
}
|
||||
|
||||
op.Time = s.timestamp()
|
||||
op.Time = s.timestamp(d.Position, d.Size)
|
||||
if op.Child == RootID {
|
||||
op.Child = s.findSpareID()
|
||||
}
|
||||
|
@ -42,12 +46,15 @@ func (f *memoryForest) TreeMove(cid cidSDK.ID, treeID string, op *Move) (*LogMov
|
|||
}
|
||||
|
||||
// TreeAddByPath implements the Forest interface.
|
||||
func (f *memoryForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, m []KeyValue) ([]LogMove, error) {
|
||||
func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, m []KeyValue) ([]LogMove, error) {
|
||||
if !d.checkValid() {
|
||||
return nil, ErrInvalidCIDDescriptor
|
||||
}
|
||||
if !isAttributeInternal(attr) {
|
||||
return nil, ErrNotPathAttribute
|
||||
}
|
||||
|
||||
fullID := cid.String() + "/" + treeID
|
||||
fullID := d.CID.String() + "/" + treeID
|
||||
s, ok := f.treeMap[fullID]
|
||||
if !ok {
|
||||
s = newState()
|
||||
|
@ -59,8 +66,10 @@ func (f *memoryForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string,
|
|||
for j := i; j < len(path); j++ {
|
||||
lm[j-i] = s.do(&Move{
|
||||
Parent: node,
|
||||
Meta: Meta{Time: s.timestamp(), Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}},
|
||||
Child: s.findSpareID(),
|
||||
Meta: Meta{
|
||||
Time: s.timestamp(d.Position, d.Size),
|
||||
Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}},
|
||||
Child: s.findSpareID(),
|
||||
})
|
||||
node = lm[j-i].Child
|
||||
s.operations = append(s.operations, lm[j-i])
|
||||
|
@ -70,15 +79,22 @@ func (f *memoryForest) TreeAddByPath(cid cidSDK.ID, treeID string, attr string,
|
|||
copy(mCopy, m)
|
||||
lm[len(lm)-1] = s.do(&Move{
|
||||
Parent: node,
|
||||
Meta: Meta{Time: s.timestamp(), Items: mCopy},
|
||||
Child: s.findSpareID(),
|
||||
Meta: Meta{
|
||||
Time: s.timestamp(d.Position, d.Size),
|
||||
Items: mCopy,
|
||||
},
|
||||
Child: s.findSpareID(),
|
||||
})
|
||||
return lm, nil
|
||||
}
|
||||
|
||||
// TreeApply implements the Forest interface.
|
||||
func (f *memoryForest) TreeApply(cid cidSDK.ID, treeID string, op *Move) error {
|
||||
fullID := cid.String() + "/" + treeID
|
||||
func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op *Move) error {
|
||||
if !d.checkValid() {
|
||||
return ErrInvalidCIDDescriptor
|
||||
}
|
||||
|
||||
fullID := d.CID.String() + "/" + treeID
|
||||
s, ok := f.treeMap[fullID]
|
||||
if !ok {
|
||||
s = newState()
|
||||
|
|
|
@ -59,18 +59,27 @@ func TestForest_TreeMove(t *testing.T) {
|
|||
|
||||
func testForestTreeMove(t *testing.T, s Forest) {
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
||||
meta := []KeyValue{
|
||||
{Key: AttributeVersion, Value: []byte("XXX")},
|
||||
{Key: AttributeFilename, Value: []byte("file.txt")}}
|
||||
lm, err := s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "to"}, meta)
|
||||
lm, err := s.TreeAddByPath(d, treeID, AttributeFilename, []string{"path", "to"}, meta)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 3, len(lm))
|
||||
|
||||
nodeID := lm[2].Child
|
||||
t.Run("invalid descriptor", func(t *testing.T) {
|
||||
_, err = s.TreeMove(CIDDescriptor{cid, 0, 0}, treeID, &Move{
|
||||
Parent: lm[1].Child,
|
||||
Meta: Meta{Items: append(meta, KeyValue{Key: "NewKey", Value: []byte("NewValue")})},
|
||||
Child: nodeID,
|
||||
})
|
||||
require.ErrorIs(t, err, ErrInvalidCIDDescriptor)
|
||||
})
|
||||
t.Run("same parent, update meta", func(t *testing.T) {
|
||||
_, err = s.TreeMove(cid, treeID, &Move{
|
||||
_, err = s.TreeMove(d, treeID, &Move{
|
||||
Parent: lm[1].Child,
|
||||
Meta: Meta{Items: append(meta, KeyValue{Key: "NewKey", Value: []byte("NewValue")})},
|
||||
Child: nodeID,
|
||||
|
@ -82,7 +91,7 @@ func testForestTreeMove(t *testing.T, s Forest) {
|
|||
require.ElementsMatch(t, []Node{nodeID}, nodes)
|
||||
})
|
||||
t.Run("different parent", func(t *testing.T) {
|
||||
_, err = s.TreeMove(cid, treeID, &Move{
|
||||
_, err = s.TreeMove(d, treeID, &Move{
|
||||
Parent: RootID,
|
||||
Meta: Meta{Items: append(meta, KeyValue{Key: "NewKey", Value: []byte("NewValue")})},
|
||||
Child: nodeID,
|
||||
|
@ -109,10 +118,11 @@ func TestMemoryForest_TreeGetChildren(t *testing.T) {
|
|||
|
||||
func testForestTreeGetChildren(t *testing.T, s Forest) {
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
||||
treeAdd := func(t *testing.T, child, parent Node) {
|
||||
_, err := s.TreeMove(cid, treeID, &Move{
|
||||
_, err := s.TreeMove(d, treeID, &Move{
|
||||
Parent: parent,
|
||||
Child: child,
|
||||
})
|
||||
|
@ -165,16 +175,24 @@ func TestForest_TreeAdd(t *testing.T) {
|
|||
|
||||
func testForestTreeAdd(t *testing.T, s Forest) {
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
||||
meta := []KeyValue{
|
||||
{Key: AttributeVersion, Value: []byte("XXX")},
|
||||
{Key: AttributeFilename, Value: []byte("file.txt")}}
|
||||
lm, err := s.TreeMove(cid, treeID, &Move{
|
||||
m := &Move{
|
||||
Parent: RootID,
|
||||
Child: RootID,
|
||||
Meta: Meta{Items: meta},
|
||||
}
|
||||
|
||||
t.Run("invalid descriptor", func(t *testing.T) {
|
||||
_, err := s.TreeMove(CIDDescriptor{cid, 0, 0}, treeID, m)
|
||||
require.ErrorIs(t, err, ErrInvalidCIDDescriptor)
|
||||
})
|
||||
|
||||
lm, err := s.TreeMove(d, treeID, m)
|
||||
require.NoError(t, err)
|
||||
|
||||
testMeta(t, s, cid, treeID, lm.Child, lm.Parent, Meta{Time: lm.Time, Items: meta})
|
||||
|
@ -202,18 +220,23 @@ func TestForest_TreeAddByPath(t *testing.T) {
|
|||
|
||||
func testForestTreeAddByPath(t *testing.T, s Forest) {
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
||||
meta := []KeyValue{
|
||||
{Key: AttributeVersion, Value: []byte("XXX")},
|
||||
{Key: AttributeFilename, Value: []byte("file.txt")}}
|
||||
|
||||
t.Run("invalid descriptor", func(t *testing.T) {
|
||||
_, err := s.TreeAddByPath(CIDDescriptor{cid, 0, 0}, treeID, AttributeFilename, []string{"yyy"}, meta)
|
||||
require.ErrorIs(t, err, ErrInvalidCIDDescriptor)
|
||||
})
|
||||
t.Run("invalid attribute", func(t *testing.T) {
|
||||
_, err := s.TreeAddByPath(cid, treeID, AttributeVersion, []string{"yyy"}, meta)
|
||||
_, err := s.TreeAddByPath(d, treeID, AttributeVersion, []string{"yyy"}, meta)
|
||||
require.ErrorIs(t, err, ErrNotPathAttribute)
|
||||
})
|
||||
|
||||
lm, err := s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "to"}, meta)
|
||||
lm, err := s.TreeAddByPath(d, treeID, AttributeFilename, []string{"path", "to"}, meta)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 3, len(lm))
|
||||
testMeta(t, s, cid, treeID, lm[0].Child, lm[0].Parent, Meta{Time: lm[0].Time, Items: []KeyValue{{AttributeFilename, []byte("path")}}})
|
||||
|
@ -223,7 +246,7 @@ func testForestTreeAddByPath(t *testing.T, s Forest) {
|
|||
testMeta(t, s, cid, treeID, firstID, lm[2].Parent, Meta{Time: lm[2].Time, Items: meta})
|
||||
|
||||
meta[0].Value = []byte("YYY")
|
||||
lm, err = s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "to"}, meta)
|
||||
lm, err = s.TreeAddByPath(d, treeID, AttributeFilename, []string{"path", "to"}, meta)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(lm))
|
||||
|
||||
|
@ -244,7 +267,7 @@ func testForestTreeAddByPath(t *testing.T, s Forest) {
|
|||
|
||||
meta[0].Value = []byte("ZZZ")
|
||||
meta[1].Value = []byte("cat.jpg")
|
||||
lm, err = s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "dir"}, meta)
|
||||
lm, err = s.TreeAddByPath(d, treeID, AttributeFilename, []string{"path", "dir"}, meta)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(lm))
|
||||
testMeta(t, s, cid, treeID, lm[0].Child, lm[0].Parent, Meta{Time: lm[0].Time, Items: []KeyValue{{AttributeFilename, []byte("dir")}}})
|
||||
|
@ -253,7 +276,7 @@ func testForestTreeAddByPath(t *testing.T, s Forest) {
|
|||
t.Run("create internal nodes", func(t *testing.T) {
|
||||
meta[0].Value = []byte("SomeValue")
|
||||
meta[1].Value = []byte("another")
|
||||
lm, err = s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path"}, meta)
|
||||
lm, err = s.TreeAddByPath(d, treeID, AttributeFilename, []string{"path"}, meta)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(lm))
|
||||
|
||||
|
@ -261,7 +284,7 @@ func testForestTreeAddByPath(t *testing.T, s Forest) {
|
|||
|
||||
meta[0].Value = []byte("Leaf")
|
||||
meta[1].Value = []byte("file.txt")
|
||||
lm, err = s.TreeAddByPath(cid, treeID, AttributeFilename, []string{"path", "another"}, meta)
|
||||
lm, err = s.TreeAddByPath(d, treeID, AttributeFilename, []string{"path", "another"}, meta)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(lm))
|
||||
|
||||
|
@ -299,10 +322,21 @@ func TestForest_Apply(t *testing.T) {
|
|||
|
||||
func testForestTreeApply(t *testing.T, constructor func(t testing.TB) Forest) {
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
||||
t.Run("invalid descriptor", func(t *testing.T) {
|
||||
s := constructor(t)
|
||||
err := s.TreeApply(CIDDescriptor{cid, 0, 0}, treeID, &Move{
|
||||
Child: 10,
|
||||
Parent: 0,
|
||||
Meta: Meta{Time: 1, Items: []KeyValue{{"grand", []byte{1}}}},
|
||||
})
|
||||
require.ErrorIs(t, err, ErrInvalidCIDDescriptor)
|
||||
})
|
||||
|
||||
testApply := func(t *testing.T, s Forest, child, parent Node, meta Meta) {
|
||||
require.NoError(t, s.TreeApply(cid, treeID, &Move{
|
||||
require.NoError(t, s.TreeApply(d, treeID, &Move{
|
||||
Child: child,
|
||||
Parent: parent,
|
||||
Meta: meta,
|
||||
|
@ -341,6 +375,7 @@ func TestForest_GetOpLog(t *testing.T) {
|
|||
|
||||
func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB) Forest) {
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
logs := []Move{
|
||||
{
|
||||
|
@ -366,7 +401,7 @@ func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB) Forest)
|
|||
})
|
||||
|
||||
for i := range logs {
|
||||
require.NoError(t, s.TreeApply(cid, treeID, &logs[i]))
|
||||
require.NoError(t, s.TreeApply(d, treeID, &logs[i]))
|
||||
}
|
||||
|
||||
testGetOpLog := func(t *testing.T, height uint64, m Move) {
|
||||
|
@ -407,6 +442,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
|
|||
)
|
||||
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
expected := constructor(t)
|
||||
|
||||
|
@ -444,7 +480,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
|
|||
rand.Read(ops[i].Meta.Items[1].Value)
|
||||
}
|
||||
for i := range ops {
|
||||
require.NoError(t, expected.TreeApply(cid, treeID, &ops[i]))
|
||||
require.NoError(t, expected.TreeApply(d, treeID, &ops[i]))
|
||||
}
|
||||
|
||||
for i := 0; i < iterCount; i++ {
|
||||
|
@ -453,7 +489,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
|
|||
|
||||
actual := constructor(t)
|
||||
for i := range ops {
|
||||
require.NoError(t, actual.TreeApply(cid, treeID, &ops[i]))
|
||||
require.NoError(t, actual.TreeApply(d, treeID, &ops[i]))
|
||||
}
|
||||
for i := uint64(0); i < nodeCount; i++ {
|
||||
expectedMeta, expectedParent, err := expected.TreeGetMeta(cid, treeID, i)
|
||||
|
@ -534,6 +570,7 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
|
|||
|
||||
ops := genFunc(b.N)
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
ch := make(chan *Move, b.N)
|
||||
for i := range ops {
|
||||
|
@ -546,7 +583,7 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
|
|||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
op := <-ch
|
||||
if err := s.TreeApply(cid, treeID, op); err != nil {
|
||||
if err := s.TreeApply(d, treeID, op); err != nil {
|
||||
b.Fatalf("error in `Apply`: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -563,6 +600,7 @@ func TestTreeGetByPath(t *testing.T) {
|
|||
|
||||
func testTreeGetByPath(t *testing.T, s Forest) {
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
||||
// /
|
||||
|
@ -572,12 +610,12 @@ func testTreeGetByPath(t *testing.T, s Forest) {
|
|||
// |- cat1.jpg, Version=XXX (4)
|
||||
// |- cat1.jpg, Version=YYY (5)
|
||||
// |- cat2.jpg, Version=ZZZ (6)
|
||||
testMove(t, s, 0, 1, 0, cid, treeID, "a", "")
|
||||
testMove(t, s, 1, 2, 0, cid, treeID, "b", "")
|
||||
testMove(t, s, 2, 3, 1, cid, treeID, "cat1.jpg", "TTT")
|
||||
testMove(t, s, 3, 4, 2, cid, treeID, "cat1.jpg", "XXX")
|
||||
testMove(t, s, 4, 5, 2, cid, treeID, "cat1.jpg", "YYY")
|
||||
testMove(t, s, 5, 6, 2, cid, treeID, "cat2.jpg", "ZZZ")
|
||||
testMove(t, s, 0, 1, 0, d, treeID, "a", "")
|
||||
testMove(t, s, 1, 2, 0, d, treeID, "b", "")
|
||||
testMove(t, s, 2, 3, 1, d, treeID, "cat1.jpg", "TTT")
|
||||
testMove(t, s, 3, 4, 2, d, treeID, "cat1.jpg", "XXX")
|
||||
testMove(t, s, 4, 5, 2, d, treeID, "cat1.jpg", "YYY")
|
||||
testMove(t, s, 5, 6, 2, d, treeID, "cat2.jpg", "ZZZ")
|
||||
|
||||
if mf, ok := s.(*memoryForest); ok {
|
||||
single := mf.treeMap[cid.String()+"/"+treeID]
|
||||
|
@ -614,14 +652,14 @@ func testTreeGetByPath(t *testing.T, s Forest) {
|
|||
})
|
||||
}
|
||||
|
||||
func testMove(t *testing.T, s Forest, ts int, node, parent Node, cid cidSDK.ID, treeID, filename, version string) {
|
||||
func testMove(t *testing.T, s Forest, ts int, node, parent Node, d CIDDescriptor, treeID, filename, version string) {
|
||||
items := make([]KeyValue, 1, 2)
|
||||
items[0] = KeyValue{AttributeFilename, []byte(filename)}
|
||||
if version != "" {
|
||||
items = append(items, KeyValue{AttributeVersion, []byte(version)})
|
||||
}
|
||||
|
||||
require.NoError(t, s.TreeApply(cid, treeID, &Move{
|
||||
require.NoError(t, s.TreeApply(d, treeID, &Move{
|
||||
Parent: parent,
|
||||
Child: node,
|
||||
Meta: Meta{
|
||||
|
|
|
@ -133,11 +133,11 @@ func (s *state) removeChild(child, parent Node) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *state) timestamp() Timestamp {
|
||||
func (s *state) timestamp(pos, size int) Timestamp {
|
||||
if len(s.operations) == 0 {
|
||||
return 0
|
||||
return nextTimestamp(0, uint64(pos), uint64(size))
|
||||
}
|
||||
return s.operations[len(s.operations)-1].Time + 1
|
||||
return nextTimestamp(s.operations[len(s.operations)-1].Time, uint64(pos), uint64(size))
|
||||
}
|
||||
|
||||
func (s *state) findSpareID() Node {
|
||||
|
|
|
@ -1,19 +1,23 @@
|
|||
package pilorama
|
||||
|
||||
import cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
import (
|
||||
"errors"
|
||||
|
||||
cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
)
|
||||
|
||||
// Forest represents CRDT tree.
|
||||
type Forest interface {
|
||||
// TreeMove moves node in the tree.
|
||||
// If the parent of the move operation is TrashID, the node is removed.
|
||||
// If the child of the move operation is RootID, new ID is generated and added to a tree.
|
||||
TreeMove(cid cidSDK.ID, treeID string, m *Move) (*LogMove, error)
|
||||
TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove, error)
|
||||
// TreeAddByPath adds new node in the tree using provided path.
|
||||
// The path is constructed by descending from the root using the values of the attr in meta.
|
||||
// Internal nodes in path should have exactly one attribute, otherwise a new node is created.
|
||||
TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error)
|
||||
TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error)
|
||||
// TreeApply applies replicated operation from another node.
|
||||
TreeApply(cid cidSDK.ID, treeID string, m *Move) error
|
||||
TreeApply(d CIDDescriptor, treeID string, m *Move) error
|
||||
// TreeGetByPath returns all nodes corresponding to the path.
|
||||
// The path is constructed by descending from the root using the values of the
|
||||
// AttributeFilename in meta.
|
||||
|
@ -42,3 +46,19 @@ const (
|
|||
AttributeFilename = "FileName"
|
||||
AttributeVersion = "Version"
|
||||
)
|
||||
|
||||
// CIDDescriptor contains container ID and information about the node position
|
||||
// in the list of container nodes.
|
||||
type CIDDescriptor struct {
|
||||
CID cidSDK.ID
|
||||
Position int
|
||||
Size int
|
||||
}
|
||||
|
||||
// ErrInvalidCIDDescriptor is returned when info about tne node position
|
||||
// in the container is invalid.
|
||||
var ErrInvalidCIDDescriptor = errors.New("cid descriptor is invalid")
|
||||
|
||||
func (d CIDDescriptor) checkValid() bool {
|
||||
return 0 <= d.Position && d.Position < d.Size
|
||||
}
|
||||
|
|
11
pkg/local_object_storage/pilorama/util.go
Normal file
11
pkg/local_object_storage/pilorama/util.go
Normal file
|
@ -0,0 +1,11 @@
|
|||
package pilorama
|
||||
|
||||
// nextTimestamp accepts the latest local timestamp, node position in a container and container size.
|
||||
// Returns the next timestamp which can be generated by this node.
|
||||
func nextTimestamp(ts Timestamp, pos, size uint64) Timestamp {
|
||||
base := ts/size*size + pos
|
||||
if ts < base {
|
||||
return base
|
||||
}
|
||||
return base + size
|
||||
}
|
38
pkg/local_object_storage/pilorama/util_test.go
Normal file
38
pkg/local_object_storage/pilorama/util_test.go
Normal file
|
@ -0,0 +1,38 @@
|
|||
package pilorama
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNextTimestamp(t *testing.T) {
|
||||
testCases := []struct {
|
||||
latest Timestamp
|
||||
pos, size uint64
|
||||
expected Timestamp
|
||||
}{
|
||||
{0, 0, 1, 1},
|
||||
{2, 0, 1, 3},
|
||||
{0, 0, 2, 2},
|
||||
{0, 1, 2, 1},
|
||||
{10, 0, 4, 12},
|
||||
{11, 0, 4, 12},
|
||||
{12, 0, 4, 16},
|
||||
{10, 1, 4, 13},
|
||||
{11, 1, 4, 13},
|
||||
{12, 1, 4, 13},
|
||||
{10, 2, 4, 14},
|
||||
{11, 2, 4, 14},
|
||||
{12, 2, 4, 14},
|
||||
{10, 3, 4, 11},
|
||||
{11, 3, 4, 15},
|
||||
{12, 3, 4, 15},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
actual := nextTimestamp(tc.latest, tc.pos, tc.size)
|
||||
require.Equal(t, tc.expected, actual,
|
||||
"latest %d, pos %d, size %d", tc.latest, tc.pos, tc.size)
|
||||
}
|
||||
}
|
|
@ -8,18 +8,18 @@ import (
|
|||
var _ pilorama.Forest = (*Shard)(nil)
|
||||
|
||||
// TreeMove implements the pilorama.Forest interface.
|
||||
func (s *Shard) TreeMove(cid cidSDK.ID, treeID string, m *pilorama.Move) (*pilorama.LogMove, error) {
|
||||
return s.pilorama.TreeMove(cid, treeID, m)
|
||||
func (s *Shard) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.LogMove, error) {
|
||||
return s.pilorama.TreeMove(d, treeID, m)
|
||||
}
|
||||
|
||||
// TreeAddByPath implements the pilorama.Forest interface.
|
||||
func (s *Shard) TreeAddByPath(cid cidSDK.ID, treeID string, attr string, path []string, meta []pilorama.KeyValue) ([]pilorama.LogMove, error) {
|
||||
return s.pilorama.TreeAddByPath(cid, treeID, attr, path, meta)
|
||||
func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr string, path []string, meta []pilorama.KeyValue) ([]pilorama.LogMove, error) {
|
||||
return s.pilorama.TreeAddByPath(d, treeID, attr, path, meta)
|
||||
}
|
||||
|
||||
// TreeApply implements the pilorama.Forest interface.
|
||||
func (s *Shard) TreeApply(cid cidSDK.ID, treeID string, m *pilorama.Move) error {
|
||||
return s.pilorama.TreeApply(cid, treeID, m)
|
||||
func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) error {
|
||||
return s.pilorama.TreeApply(d, treeID, m)
|
||||
}
|
||||
|
||||
// TreeGetByPath implements the pilorama.Forest interface.
|
||||
|
|
|
@ -4,25 +4,18 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
var errNoSuitableNode = errors.New("no node was found to execute the request")
|
||||
|
||||
// forEachNode executes callback for each node in the container.
|
||||
// If the node belongs to a container, nil error is returned.
|
||||
// Otherwise, f is executed for each node, stopping if true is returned.
|
||||
func (s *Service) forEachNode(ctx context.Context, cid cidSDK.ID, f func(c TreeServiceClient) bool) error {
|
||||
cntNodes, err := s.getContainerNodes(cid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't get container nodes for %s: %w", cid, err)
|
||||
}
|
||||
|
||||
// forEachNode executes callback for each node in the container until true is returned.
|
||||
// Returns errNoSuitableNode if there was no successful attempt to dial any node.
|
||||
func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo, f func(c TreeServiceClient) bool) error {
|
||||
for _, n := range cntNodes {
|
||||
if bytes.Equal(n.PublicKey(), s.rawPub) {
|
||||
return nil
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
|
||||
cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/eacl"
|
||||
netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -66,19 +67,22 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
|
|||
return nil, err
|
||||
}
|
||||
|
||||
var resp *AddResponse
|
||||
var outErr error
|
||||
err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool {
|
||||
resp, outErr = c.Add(ctx, req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if resp != nil || outErr != nil {
|
||||
ns, pos, size, err := s.getContainerInfo(cid, s.rawPub)
|
||||
if err == errNotInContainer {
|
||||
var resp *AddResponse
|
||||
var outErr error
|
||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||
resp, outErr = c.Add(ctx, req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, outErr
|
||||
}
|
||||
|
||||
log, err := s.forest.TreeMove(cid, b.GetTreeId(), &pilorama.Move{
|
||||
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
|
||||
log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{
|
||||
Parent: b.GetParentId(),
|
||||
Child: pilorama.RootID,
|
||||
Meta: pilorama.Meta{Items: protoToMeta(b.GetMeta())},
|
||||
|
@ -108,15 +112,17 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
|
|||
return nil, err
|
||||
}
|
||||
|
||||
var resp *AddByPathResponse
|
||||
var outErr error
|
||||
err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool {
|
||||
resp, outErr = c.AddByPath(ctx, req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if resp != nil || outErr != nil {
|
||||
ns, pos, size, err := s.getContainerInfo(cid, s.rawPub)
|
||||
if err == errNotInContainer {
|
||||
var resp *AddByPathResponse
|
||||
var outErr error
|
||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||
resp, outErr = c.AddByPath(ctx, req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, outErr
|
||||
}
|
||||
|
||||
|
@ -127,7 +133,8 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
|
|||
attr = pilorama.AttributeFilename
|
||||
}
|
||||
|
||||
logs, err := s.forest.TreeAddByPath(cid, b.GetTreeId(), attr, b.GetPath(), meta)
|
||||
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
|
||||
logs, err := s.forest.TreeAddByPath(d, b.GetTreeId(), attr, b.GetPath(), meta)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -162,15 +169,17 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
|
|||
return nil, err
|
||||
}
|
||||
|
||||
var resp *RemoveResponse
|
||||
var outErr error
|
||||
err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool {
|
||||
resp, outErr = c.Remove(ctx, req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if resp != nil || outErr != nil {
|
||||
ns, pos, size, err := s.getContainerInfo(cid, s.rawPub)
|
||||
if err == errNotInContainer {
|
||||
var resp *RemoveResponse
|
||||
var outErr error
|
||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||
resp, outErr = c.Remove(ctx, req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, outErr
|
||||
}
|
||||
|
||||
|
@ -178,7 +187,8 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
|
|||
return nil, fmt.Errorf("node with ID %d is root and can't be removed", b.GetNodeId())
|
||||
}
|
||||
|
||||
log, err := s.forest.TreeMove(cid, b.GetTreeId(), &pilorama.Move{
|
||||
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
|
||||
log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{
|
||||
Parent: pilorama.TrashID,
|
||||
Child: b.GetNodeId(),
|
||||
})
|
||||
|
@ -205,15 +215,17 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
|
|||
return nil, err
|
||||
}
|
||||
|
||||
var resp *MoveResponse
|
||||
var outErr error
|
||||
err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool {
|
||||
resp, outErr = c.Move(ctx, req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if resp != nil || outErr != nil {
|
||||
ns, pos, size, err := s.getContainerInfo(cid, s.rawPub)
|
||||
if err == errNotInContainer {
|
||||
var resp *MoveResponse
|
||||
var outErr error
|
||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||
resp, outErr = c.Move(ctx, req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, outErr
|
||||
}
|
||||
|
||||
|
@ -221,7 +233,8 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
|
|||
return nil, fmt.Errorf("node with ID %d is root and can't be moved", b.GetNodeId())
|
||||
}
|
||||
|
||||
log, err := s.forest.TreeMove(cid, b.GetTreeId(), &pilorama.Move{
|
||||
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
|
||||
log, err := s.forest.TreeMove(d, b.GetTreeId(), &pilorama.Move{
|
||||
Parent: b.GetParentId(),
|
||||
Child: b.GetNodeId(),
|
||||
Meta: pilorama.Meta{Items: protoToMeta(b.GetMeta())},
|
||||
|
@ -247,15 +260,17 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
|
|||
return nil, err
|
||||
}
|
||||
|
||||
var resp *GetNodeByPathResponse
|
||||
var outErr error
|
||||
err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool {
|
||||
resp, outErr = c.GetNodeByPath(ctx, req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if resp != nil || outErr != nil {
|
||||
ns, _, _, err := s.getContainerInfo(cid, s.rawPub)
|
||||
if err == errNotInContainer {
|
||||
var resp *GetNodeByPathResponse
|
||||
var outErr error
|
||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||
resp, outErr = c.GetNodeByPath(ctx, req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, outErr
|
||||
}
|
||||
|
||||
|
@ -325,22 +340,25 @@ func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeS
|
|||
return err
|
||||
}
|
||||
|
||||
var cli TreeService_GetSubTreeClient
|
||||
var outErr error
|
||||
err = s.forEachNode(srv.Context(), cid, func(c TreeServiceClient) bool {
|
||||
cli, outErr = c.GetSubTree(srv.Context(), req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
} else if outErr != nil {
|
||||
return outErr
|
||||
} else if cli != nil {
|
||||
ns, _, _, err := s.getContainerInfo(cid, s.rawPub)
|
||||
if err == errNotInContainer {
|
||||
var cli TreeService_GetSubTreeClient
|
||||
var outErr error
|
||||
err = s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
|
||||
cli, outErr = c.GetSubTree(srv.Context(), req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
} else if outErr != nil {
|
||||
return outErr
|
||||
}
|
||||
for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() {
|
||||
if err := srv.Send(resp); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
queue := []nodeDepthPair{{[]uint64{b.GetRootId()}, 0}}
|
||||
|
@ -391,18 +409,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
|
|||
return nil, err
|
||||
}
|
||||
|
||||
found := false
|
||||
key := req.GetSignature().GetKey()
|
||||
nodes, _ := s.getContainerNodes(cid)
|
||||
|
||||
loop:
|
||||
for _, n := range nodes {
|
||||
if bytes.Equal(key, n.PublicKey()) {
|
||||
found = true
|
||||
break loop
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
_, pos, size, err := s.getContainerInfo(cid, key)
|
||||
if err == errNotInContainer {
|
||||
return nil, errors.New("`Apply` request must be signed by a container node")
|
||||
}
|
||||
|
||||
|
@ -413,7 +423,8 @@ loop:
|
|||
return nil, fmt.Errorf("can't parse meta-information: %w", err)
|
||||
}
|
||||
|
||||
return nil, s.forest.TreeApply(cid, req.GetBody().GetTreeId(), &pilorama.Move{
|
||||
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
|
||||
return nil, s.forest.TreeApply(d, req.GetBody().GetTreeId(), &pilorama.Move{
|
||||
Parent: op.GetParentId(),
|
||||
Child: op.GetChildId(),
|
||||
Meta: meta,
|
||||
|
@ -428,22 +439,25 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
|
|||
return err
|
||||
}
|
||||
|
||||
var cli TreeService_GetOpLogClient
|
||||
var outErr error
|
||||
err := s.forEachNode(srv.Context(), cid, func(c TreeServiceClient) bool {
|
||||
cli, outErr = c.GetOpLog(srv.Context(), req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
} else if outErr != nil {
|
||||
return outErr
|
||||
} else if cli != nil {
|
||||
ns, _, _, err := s.getContainerInfo(cid, s.rawPub)
|
||||
if err == errNotInContainer {
|
||||
var cli TreeService_GetOpLogClient
|
||||
var outErr error
|
||||
err := s.forEachNode(srv.Context(), ns, func(c TreeServiceClient) bool {
|
||||
cli, outErr = c.GetOpLog(srv.Context(), req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
} else if outErr != nil {
|
||||
return outErr
|
||||
}
|
||||
for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() {
|
||||
if err := srv.Send(resp); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
h := b.GetHeight()
|
||||
|
@ -491,3 +505,21 @@ func metaToProto(arr []pilorama.KeyValue) []*KeyValue {
|
|||
}
|
||||
return meta
|
||||
}
|
||||
|
||||
var errNotInContainer = errors.New("node doesn't belong to a container")
|
||||
|
||||
// getContainerInfo returns the list of container nodes, position in the container for the node
|
||||
// with pub key and total amount of nodes in all replicas.
|
||||
func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeInfo, int, int, error) {
|
||||
cntNodes, err := s.getContainerNodes(cid)
|
||||
if err != nil {
|
||||
return nil, 0, 0, err
|
||||
}
|
||||
|
||||
for i, node := range cntNodes {
|
||||
if bytes.Equal(node.PublicKey(), pub) {
|
||||
return cntNodes, i, len(cntNodes), nil
|
||||
}
|
||||
}
|
||||
return nil, 0, 0, errNotInContainer
|
||||
}
|
||||
|
|
|
@ -88,7 +88,8 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
|
|||
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
||||
return newHeight, err
|
||||
}
|
||||
if err := s.forest.TreeApply(cid, treeID, m); err != nil {
|
||||
d := pilorama.CIDDescriptor{CID: cid}
|
||||
if err := s.forest.TreeApply(d, treeID, m); err != nil {
|
||||
return newHeight, err
|
||||
}
|
||||
if m.Time > newHeight {
|
||||
|
|
Loading…
Reference in a new issue