forked from TrueCloudLab/frostfs-node
Hello, I am a robot. #1
9 changed files with 57 additions and 90 deletions
|
@ -51,17 +51,17 @@ func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, a
|
||||||
}
|
}
|
||||||
|
|
||||||
// TreeApply implements the pilorama.Forest interface.
|
// TreeApply implements the pilorama.Forest interface.
|
||||||
func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move, backgroundSync bool) error {
|
func (e *StorageEngine) TreeApply(cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error {
|
||||||
index, lst, err := e.getTreeShard(d.CID, treeID)
|
index, lst, err := e.getTreeShard(cnr, treeID)
|
||||||
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = lst[index].TreeApply(d, treeID, m, backgroundSync)
|
err = lst[index].TreeApply(cnr, treeID, m, backgroundSync)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
|
if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
|
||||||
e.reportShardError(lst[index], "can't perform `TreeApply`", err,
|
e.reportShardError(lst[index], "can't perform `TreeApply`", err,
|
||||||
zap.Stringer("cid", d.CID),
|
zap.Stringer("cid", cnr),
|
||||||
zap.String("tree", treeID))
|
zap.String("tree", treeID))
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -327,11 +327,7 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TreeApply implements the Forest interface.
|
// TreeApply implements the Forest interface.
|
||||||
func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error {
|
func (t *boltForest) TreeApply(cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error {
|
||||||
if !d.checkValid() {
|
|
||||||
return ErrInvalidCIDDescriptor
|
|
||||||
}
|
|
||||||
|
|
||||||
t.modeMtx.RLock()
|
t.modeMtx.RLock()
|
||||||
defer t.modeMtx.RUnlock()
|
defer t.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
@ -344,7 +340,7 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
|
||||||
if backgroundSync {
|
if backgroundSync {
|
||||||
var seen bool
|
var seen bool
|
||||||
err := t.db.View(func(tx *bbolt.Tx) error {
|
err := t.db.View(func(tx *bbolt.Tx) error {
|
||||||
treeRoot := tx.Bucket(bucketName(d.CID, treeID))
|
treeRoot := tx.Bucket(bucketName(cnr, treeID))
|
||||||
if treeRoot == nil {
|
if treeRoot == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -362,7 +358,7 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
|
||||||
}
|
}
|
||||||
|
|
||||||
if t.db.MaxBatchSize == 1 {
|
if t.db.MaxBatchSize == 1 {
|
||||||
fullID := bucketName(d.CID, treeID)
|
fullID := bucketName(cnr, treeID)
|
||||||
return t.db.Update(func(tx *bbolt.Tx) error {
|
return t.db.Update(func(tx *bbolt.Tx) error {
|
||||||
bLog, bTree, err := t.getTreeBuckets(tx, fullID)
|
bLog, bTree, err := t.getTreeBuckets(tx, fullID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -375,11 +371,11 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
|
||||||
}
|
}
|
||||||
|
|
||||||
ch := make(chan error, 1)
|
ch := make(chan error, 1)
|
||||||
t.addBatch(d, treeID, m, ch)
|
t.addBatch(cnr, treeID, m, ch)
|
||||||
return <-ch
|
return <-ch
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan error) {
|
func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) {
|
||||||
t.mtx.Lock()
|
t.mtx.Lock()
|
||||||
for i := 0; i < len(t.batches); i++ {
|
for i := 0; i < len(t.batches); i++ {
|
||||||
t.batches[i].mtx.Lock()
|
t.batches[i].mtx.Lock()
|
||||||
|
@ -391,7 +387,7 @@ func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan e
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
found := t.batches[i].cid.Equals(d.CID) && t.batches[i].treeID == treeID
|
found := t.batches[i].cid.Equals(cnr) && t.batches[i].treeID == treeID
|
||||||
if found {
|
if found {
|
||||||
t.batches[i].results = append(t.batches[i].results, ch)
|
t.batches[i].results = append(t.batches[i].results, ch)
|
||||||
t.batches[i].operations = append(t.batches[i].operations, m)
|
t.batches[i].operations = append(t.batches[i].operations, m)
|
||||||
|
@ -412,7 +408,7 @@ func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan e
|
||||||
}
|
}
|
||||||
b := &batch{
|
b := &batch{
|
||||||
forest: t,
|
forest: t,
|
||||||
cid: d.CID,
|
cid: cnr,
|
||||||
treeID: treeID,
|
treeID: treeID,
|
||||||
results: []chan<- error{ch},
|
results: []chan<- error{ch},
|
||||||
operations: []*Move{m},
|
operations: []*Move{m},
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -93,12 +94,8 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
|
||||||
}
|
}
|
||||||
|
|
||||||
// TreeApply implements the Forest interface.
|
// TreeApply implements the Forest interface.
|
||||||
func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op *Move, _ bool) error {
|
func (f *memoryForest) TreeApply(cnr cid.ID, treeID string, op *Move, _ bool) error {
|
||||||
if !d.checkValid() {
|
fullID := cnr.String() + "/" + treeID
|
||||||
return ErrInvalidCIDDescriptor
|
|
||||||
}
|
|
||||||
|
|
||||||
fullID := d.CID.String() + "/" + treeID
|
|
||||||
s, ok := f.treeMap[fullID]
|
s, ok := f.treeMap[fullID]
|
||||||
if !ok {
|
if !ok {
|
||||||
s = newState()
|
s = newState()
|
||||||
|
|
|
@ -411,21 +411,10 @@ func TestForest_Apply(t *testing.T) {
|
||||||
|
|
||||||
func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
|
func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
|
||||||
cid := cidtest.ID()
|
cid := cidtest.ID()
|
||||||
d := CIDDescriptor{cid, 0, 1}
|
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
|
|
||||||
t.Run("invalid descriptor", func(t *testing.T) {
|
|
||||||
s := constructor(t)
|
|
||||||
err := s.TreeApply(CIDDescriptor{cid, 0, 0}, treeID, &Move{
|
|
||||||
Child: 10,
|
|
||||||
Parent: 0,
|
|
||||||
Meta: Meta{Time: 1, Items: []KeyValue{{"grand", []byte{1}}}},
|
|
||||||
}, false)
|
|
||||||
require.ErrorIs(t, err, ErrInvalidCIDDescriptor)
|
|
||||||
})
|
|
||||||
|
|
||||||
testApply := func(t *testing.T, s Forest, child, parent Node, meta Meta) {
|
testApply := func(t *testing.T, s Forest, child, parent Node, meta Meta) {
|
||||||
require.NoError(t, s.TreeApply(d, treeID, &Move{
|
require.NoError(t, s.TreeApply(cid, treeID, &Move{
|
||||||
Child: child,
|
Child: child,
|
||||||
Parent: parent,
|
Parent: parent,
|
||||||
Meta: meta,
|
Meta: meta,
|
||||||
|
@ -465,7 +454,6 @@ func TestForest_GetOpLog(t *testing.T) {
|
||||||
|
|
||||||
func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
|
func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
|
||||||
cid := cidtest.ID()
|
cid := cidtest.ID()
|
||||||
d := CIDDescriptor{cid, 0, 1}
|
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
logs := []Move{
|
logs := []Move{
|
||||||
{
|
{
|
||||||
|
@ -491,7 +479,7 @@ func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Op
|
||||||
})
|
})
|
||||||
|
|
||||||
for i := range logs {
|
for i := range logs {
|
||||||
require.NoError(t, s.TreeApply(d, treeID, &logs[i], false))
|
require.NoError(t, s.TreeApply(cid, treeID, &logs[i], false))
|
||||||
}
|
}
|
||||||
|
|
||||||
testGetOpLog := func(t *testing.T, height uint64, m Move) {
|
testGetOpLog := func(t *testing.T, height uint64, m Move) {
|
||||||
|
@ -533,13 +521,12 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O
|
||||||
|
|
||||||
cid := cidtest.ID()
|
cid := cidtest.ID()
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
d := CIDDescriptor{cid, 0, 1}
|
|
||||||
|
|
||||||
t.Run("empty state, no panic", func(t *testing.T) {
|
t.Run("empty state, no panic", func(t *testing.T) {
|
||||||
checkExists(t, false, cid, treeID)
|
checkExists(t, false, cid, treeID)
|
||||||
})
|
})
|
||||||
|
|
||||||
require.NoError(t, s.TreeApply(d, treeID, &Move{Parent: 0, Child: 1}, false))
|
require.NoError(t, s.TreeApply(cid, treeID, &Move{Parent: 0, Child: 1}, false))
|
||||||
checkExists(t, true, cid, treeID)
|
checkExists(t, true, cid, treeID)
|
||||||
checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
|
checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
|
||||||
checkExists(t, false, cid, "another tree") // same CID, different tree
|
checkExists(t, false, cid, "another tree") // same CID, different tree
|
||||||
|
@ -570,16 +557,16 @@ func TestApplyTricky1(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
d := CIDDescriptor{CID: cidtest.ID(), Position: 0, Size: 1}
|
cid := cidtest.ID()
|
||||||
for i := range providers {
|
for i := range providers {
|
||||||
t.Run(providers[i].name, func(t *testing.T) {
|
t.Run(providers[i].name, func(t *testing.T) {
|
||||||
s := providers[i].construct(t)
|
s := providers[i].construct(t)
|
||||||
for i := range ops {
|
for i := range ops {
|
||||||
require.NoError(t, s.TreeApply(d, treeID, &ops[i], false))
|
require.NoError(t, s.TreeApply(cid, treeID, &ops[i], false))
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range expected {
|
for i := range expected {
|
||||||
_, parent, err := s.TreeGetMeta(d.CID, treeID, expected[i].child)
|
_, parent, err := s.TreeGetMeta(cid, treeID, expected[i].child)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expected[i].parent, parent)
|
require.Equal(t, expected[i].parent, parent)
|
||||||
}
|
}
|
||||||
|
@ -631,16 +618,16 @@ func TestApplyTricky2(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
d := CIDDescriptor{CID: cidtest.ID(), Position: 0, Size: 1}
|
cid := cidtest.ID()
|
||||||
for i := range providers {
|
for i := range providers {
|
||||||
t.Run(providers[i].name, func(t *testing.T) {
|
t.Run(providers[i].name, func(t *testing.T) {
|
||||||
s := providers[i].construct(t)
|
s := providers[i].construct(t)
|
||||||
for i := range ops {
|
for i := range ops {
|
||||||
require.NoError(t, s.TreeApply(d, treeID, &ops[i], false))
|
require.NoError(t, s.TreeApply(cid, treeID, &ops[i], false))
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range expected {
|
for i := range expected {
|
||||||
_, parent, err := s.TreeGetMeta(d.CID, treeID, expected[i].child)
|
_, parent, err := s.TreeGetMeta(cid, treeID, expected[i].child)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expected[i].parent, parent)
|
require.Equal(t, expected[i].parent, parent)
|
||||||
}
|
}
|
||||||
|
@ -746,12 +733,11 @@ func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _
|
||||||
ops := prepareRandomTree(nodeCount, opCount)
|
ops := prepareRandomTree(nodeCount, opCount)
|
||||||
|
|
||||||
cid := cidtest.ID()
|
cid := cidtest.ID()
|
||||||
d := CIDDescriptor{cid, 0, 1}
|
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
|
|
||||||
expected := constructor(t)
|
expected := constructor(t)
|
||||||
for i := range ops {
|
for i := range ops {
|
||||||
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
|
require.NoError(t, expected.TreeApply(cid, treeID, &ops[i], false))
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < iterCount; i++ {
|
for i := 0; i < iterCount; i++ {
|
||||||
|
@ -766,7 +752,7 @@ func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for op := range ch {
|
for op := range ch {
|
||||||
require.NoError(t, actual.TreeApply(d, treeID, op, false))
|
require.NoError(t, actual.TreeApply(cid, treeID, op, false))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -792,12 +778,11 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ ..
|
||||||
ops := prepareRandomTree(nodeCount, opCount)
|
ops := prepareRandomTree(nodeCount, opCount)
|
||||||
|
|
||||||
cid := cidtest.ID()
|
cid := cidtest.ID()
|
||||||
d := CIDDescriptor{cid, 0, 1}
|
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
|
|
||||||
expected := constructor(t)
|
expected := constructor(t)
|
||||||
for i := range ops {
|
for i := range ops {
|
||||||
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
|
require.NoError(t, expected.TreeApply(cid, treeID, &ops[i], false))
|
||||||
}
|
}
|
||||||
|
|
||||||
const iterCount = 200
|
const iterCount = 200
|
||||||
|
@ -807,7 +792,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ ..
|
||||||
|
|
||||||
actual := constructor(t)
|
actual := constructor(t)
|
||||||
for i := range ops {
|
for i := range ops {
|
||||||
require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false))
|
require.NoError(t, actual.TreeApply(cid, treeID, &ops[i], false))
|
||||||
}
|
}
|
||||||
compareForests(t, expected, actual, cid, treeID, nodeCount)
|
compareForests(t, expected, actual, cid, treeID, nodeCount)
|
||||||
}
|
}
|
||||||
|
@ -889,7 +874,6 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
|
||||||
|
|
||||||
ops := genFunc(b.N)
|
ops := genFunc(b.N)
|
||||||
cid := cidtest.ID()
|
cid := cidtest.ID()
|
||||||
d := CIDDescriptor{cid, 0, 1}
|
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
ch := make(chan int, b.N)
|
ch := make(chan int, b.N)
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
|
@ -901,7 +885,7 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
|
||||||
b.SetParallelism(10)
|
b.SetParallelism(10)
|
||||||
b.RunParallel(func(pb *testing.PB) {
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
if err := s.TreeApply(d, treeID, &ops[<-ch], false); err != nil {
|
if err := s.TreeApply(cid, treeID, &ops[<-ch], false); err != nil {
|
||||||
b.Fatalf("error in `Apply`: %v", err)
|
b.Fatalf("error in `Apply`: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -918,7 +902,6 @@ func TestTreeGetByPath(t *testing.T) {
|
||||||
|
|
||||||
func testTreeGetByPath(t *testing.T, s Forest) {
|
func testTreeGetByPath(t *testing.T, s Forest) {
|
||||||
cid := cidtest.ID()
|
cid := cidtest.ID()
|
||||||
d := CIDDescriptor{cid, 0, 1}
|
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
|
|
||||||
// /
|
// /
|
||||||
|
@ -928,12 +911,12 @@ func testTreeGetByPath(t *testing.T, s Forest) {
|
||||||
// |- cat1.jpg, Version=XXX (4)
|
// |- cat1.jpg, Version=XXX (4)
|
||||||
// |- cat1.jpg, Version=YYY (5)
|
// |- cat1.jpg, Version=YYY (5)
|
||||||
// |- cat2.jpg, Version=ZZZ (6)
|
// |- cat2.jpg, Version=ZZZ (6)
|
||||||
testMove(t, s, 0, 1, 0, d, treeID, "a", "")
|
testMove(t, s, 0, 1, 0, cid, treeID, "a", "")
|
||||||
testMove(t, s, 1, 2, 0, d, treeID, "b", "")
|
testMove(t, s, 1, 2, 0, cid, treeID, "b", "")
|
||||||
testMove(t, s, 2, 3, 1, d, treeID, "cat1.jpg", "TTT")
|
testMove(t, s, 2, 3, 1, cid, treeID, "cat1.jpg", "TTT")
|
||||||
testMove(t, s, 3, 4, 2, d, treeID, "cat1.jpg", "XXX")
|
testMove(t, s, 3, 4, 2, cid, treeID, "cat1.jpg", "XXX")
|
||||||
testMove(t, s, 4, 5, 2, d, treeID, "cat1.jpg", "YYY")
|
testMove(t, s, 4, 5, 2, cid, treeID, "cat1.jpg", "YYY")
|
||||||
testMove(t, s, 5, 6, 2, d, treeID, "cat2.jpg", "ZZZ")
|
testMove(t, s, 5, 6, 2, cid, treeID, "cat2.jpg", "ZZZ")
|
||||||
|
|
||||||
if mf, ok := s.(*memoryForest); ok {
|
if mf, ok := s.(*memoryForest); ok {
|
||||||
single := mf.treeMap[cid.String()+"/"+treeID]
|
single := mf.treeMap[cid.String()+"/"+treeID]
|
||||||
|
@ -970,14 +953,14 @@ func testTreeGetByPath(t *testing.T, s Forest) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func testMove(t *testing.T, s Forest, ts int, node, parent Node, d CIDDescriptor, treeID, filename, version string) {
|
func testMove(t *testing.T, s Forest, ts int, node, parent Node, cid cidSDK.ID, treeID, filename, version string) {
|
||||||
items := make([]KeyValue, 1, 2)
|
items := make([]KeyValue, 1, 2)
|
||||||
items[0] = KeyValue{AttributeFilename, []byte(filename)}
|
items[0] = KeyValue{AttributeFilename, []byte(filename)}
|
||||||
if version != "" {
|
if version != "" {
|
||||||
items = append(items, KeyValue{AttributeVersion, []byte(version)})
|
items = append(items, KeyValue{AttributeVersion, []byte(version)})
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, s.TreeApply(d, treeID, &Move{
|
require.NoError(t, s.TreeApply(cid, treeID, &Move{
|
||||||
Parent: parent,
|
Parent: parent,
|
||||||
Child: node,
|
Child: node,
|
||||||
Meta: Meta{
|
Meta: Meta{
|
||||||
|
|
|
@ -18,7 +18,7 @@ type Forest interface {
|
||||||
TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error)
|
TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error)
|
||||||
// TreeApply applies replicated operation from another node.
|
// TreeApply applies replicated operation from another node.
|
||||||
// If background is true, TreeApply will first check whether an operation exists.
|
// If background is true, TreeApply will first check whether an operation exists.
|
||||||
TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error
|
TreeApply(cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error
|
||||||
// TreeGetByPath returns all nodes corresponding to the path.
|
// TreeGetByPath returns all nodes corresponding to the path.
|
||||||
// The path is constructed by descending from the root using the values of the
|
// The path is constructed by descending from the root using the values of the
|
||||||
// AttributeFilename in meta.
|
// AttributeFilename in meta.
|
||||||
|
|
|
@ -42,7 +42,7 @@ func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr stri
|
||||||
}
|
}
|
||||||
|
|
||||||
// TreeApply implements the pilorama.Forest interface.
|
// TreeApply implements the pilorama.Forest interface.
|
||||||
func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move, backgroundSync bool) error {
|
func (s *Shard) TreeApply(cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error {
|
||||||
if s.pilorama == nil {
|
if s.pilorama == nil {
|
||||||
return ErrPiloramaDisabled
|
return ErrPiloramaDisabled
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,7 @@ func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.M
|
||||||
if s.info.Mode.ReadOnly() {
|
if s.info.Mode.ReadOnly() {
|
||||||
return ErrReadOnlyMode
|
return ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
return s.pilorama.TreeApply(d, treeID, m, backgroundSync)
|
return s.pilorama.TreeApply(cnr, treeID, m, backgroundSync)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TreeGetByPath implements the pilorama.Forest interface.
|
// TreeGetByPath implements the pilorama.Forest interface.
|
||||||
|
|
|
@ -27,7 +27,7 @@ type replicationTask struct {
|
||||||
|
|
||||||
type applyOp struct {
|
type applyOp struct {
|
||||||
treeID string
|
treeID string
|
||||||
pilorama.CIDDescriptor
|
cid cidSDK.ID
|
||||||
pilorama.Move
|
pilorama.Move
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@ func (s *Service) localReplicationWorker() {
|
||||||
case <-s.closeCh:
|
case <-s.closeCh:
|
||||||
return
|
return
|
||||||
case op := <-s.replicateLocalCh:
|
case op := <-s.replicateLocalCh:
|
||||||
err := s.forest.TreeApply(op.CIDDescriptor, op.treeID, &op.Move, false)
|
err := s.forest.TreeApply(op.cid, op.treeID, &op.Move, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error("failed to apply replicated operation",
|
s.log.Error("failed to apply replicated operation",
|
||||||
zap.String("err", err.Error()))
|
zap.String("err", err.Error()))
|
||||||
|
|
|
@ -468,7 +468,7 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
|
||||||
|
|
||||||
key := req.GetSignature().GetKey()
|
key := req.GetSignature().GetKey()
|
||||||
|
|
||||||
_, pos, size, err := s.getContainerInfo(cid, key)
|
_, pos, _, err := s.getContainerInfo(cid, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -486,7 +486,7 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
|
||||||
select {
|
select {
|
||||||
case s.replicateLocalCh <- applyOp{
|
case s.replicateLocalCh <- applyOp{
|
||||||
treeID: req.GetBody().GetTreeId(),
|
treeID: req.GetBody().GetTreeId(),
|
||||||
CIDDescriptor: pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size},
|
cid: cid,
|
||||||
Move: pilorama.Move{
|
Move: pilorama.Move{
|
||||||
Parent: op.GetParentId(),
|
Parent: op.GetParentId(),
|
||||||
Child: op.GetChildId(),
|
Child: op.GetChildId(),
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -40,11 +41,6 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
|
||||||
return ErrNotInContainer
|
return ErrNotInContainer
|
||||||
}
|
}
|
||||||
|
|
||||||
var d pilorama.CIDDescriptor
|
|
||||||
d.CID = cid
|
|
||||||
d.Position = pos
|
|
||||||
d.Size = len(nodes)
|
|
||||||
|
|
||||||
nodes = randomizeNodeOrder(nodes, pos)
|
nodes = randomizeNodeOrder(nodes, pos)
|
||||||
if len(nodes) == 0 {
|
if len(nodes) == 0 {
|
||||||
return nil
|
return nil
|
||||||
|
@ -87,18 +83,18 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tid := range treesToSync {
|
for _, tid := range treesToSync {
|
||||||
h, err := s.forest.TreeLastSyncHeight(d.CID, tid)
|
h, err := s.forest.TreeLastSyncHeight(cid, tid)
|
||||||
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||||
s.log.Warn("could not get last synchronized height for a tree",
|
s.log.Warn("could not get last synchronized height for a tree",
|
||||||
zap.Stringer("cid", d.CID),
|
zap.Stringer("cid", cid),
|
||||||
zap.String("tree", tid))
|
zap.String("tree", tid))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
newHeight := s.synchronizeTree(ctx, d, h, tid, nodes)
|
newHeight := s.synchronizeTree(ctx, cid, h, tid, nodes)
|
||||||
if h < newHeight {
|
if h < newHeight {
|
||||||
if err := s.forest.TreeUpdateLastSyncHeight(d.CID, tid, newHeight); err != nil {
|
if err := s.forest.TreeUpdateLastSyncHeight(cid, tid, newHeight); err != nil {
|
||||||
s.log.Warn("could not update last synchronized height for a tree",
|
s.log.Warn("could not update last synchronized height for a tree",
|
||||||
zap.Stringer("cid", d.CID),
|
zap.Stringer("cid", cid),
|
||||||
zap.String("tree", tid))
|
zap.String("tree", tid))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -118,24 +114,19 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string
|
||||||
return ErrNotInContainer
|
return ErrNotInContainer
|
||||||
}
|
}
|
||||||
|
|
||||||
var d pilorama.CIDDescriptor
|
|
||||||
d.CID = cid
|
|
||||||
d.Position = pos
|
|
||||||
d.Size = len(nodes)
|
|
||||||
|
|
||||||
nodes = randomizeNodeOrder(nodes, pos)
|
nodes = randomizeNodeOrder(nodes, pos)
|
||||||
if len(nodes) == 0 {
|
if len(nodes) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
s.synchronizeTree(ctx, d, 0, treeID, nodes)
|
s.synchronizeTree(ctx, cid, 0, treeID, nodes)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor, from uint64,
|
func (s *Service) synchronizeTree(ctx context.Context, cid cidSDK.ID, from uint64,
|
||||||
treeID string, nodes []netmapSDK.NodeInfo) uint64 {
|
treeID string, nodes []netmapSDK.NodeInfo) uint64 {
|
||||||
s.log.Debug("synchronize tree",
|
s.log.Debug("synchronize tree",
|
||||||
zap.Stringer("cid", d.CID),
|
zap.Stringer("cid", cid),
|
||||||
zap.String("tree", treeID),
|
zap.String("tree", treeID),
|
||||||
zap.Uint64("from", from))
|
zap.Uint64("from", from))
|
||||||
|
|
||||||
|
@ -157,7 +148,7 @@ func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor,
|
||||||
|
|
||||||
treeClient := NewTreeServiceClient(cc)
|
treeClient := NewTreeServiceClient(cc)
|
||||||
for {
|
for {
|
||||||
h, err := s.synchronizeSingle(ctx, d, treeID, height, treeClient)
|
h, err := s.synchronizeSingle(ctx, cid, treeID, height, treeClient)
|
||||||
if height < h {
|
if height < h {
|
||||||
height = h
|
height = h
|
||||||
}
|
}
|
||||||
|
@ -179,9 +170,9 @@ func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor,
|
||||||
return newHeight
|
return newHeight
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescriptor, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
|
func (s *Service) synchronizeSingle(ctx context.Context, cid cidSDK.ID, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
|
||||||
rawCID := make([]byte, sha256.Size)
|
rawCID := make([]byte, sha256.Size)
|
||||||
d.CID.Encode(rawCID)
|
cid.Encode(rawCID)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
newHeight := height
|
newHeight := height
|
||||||
|
@ -211,7 +202,7 @@ func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescripto
|
||||||
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
||||||
return newHeight, err
|
return newHeight, err
|
||||||
}
|
}
|
||||||
if err := s.forest.TreeApply(d, treeID, m, true); err != nil {
|
if err := s.forest.TreeApply(cid, treeID, m, true); err != nil {
|
||||||
return newHeight, err
|
return newHeight, err
|
||||||
}
|
}
|
||||||
if m.Time > newHeight {
|
if m.Time > newHeight {
|
||||||
|
|
Loading…
Reference in a new issue