From 209cf1f45ff5fd20520483d0c9d96baadf73b89a Mon Sep 17 00:00:00 2001 From: aarifullin Date: Mon, 25 Sep 2023 16:05:51 +0300 Subject: [PATCH] Debug --- pkg/local_object_storage/engine/tree.go | 5 +- pkg/local_object_storage/pilorama/batch.go | 6 + .../pilorama/bench_test.go | 2 +- pkg/local_object_storage/pilorama/boltdb.go | 110 ++++++++++++- pkg/local_object_storage/pilorama/forest.go | 3 +- .../pilorama/forest_test.go | 96 ++++++++++-- .../pilorama/interface.go | 3 +- pkg/local_object_storage/shard/tree.go | 5 +- pkg/services/tree/replicator.go | 2 +- pkg/services/tree/sync.go | 146 +++++++++++++++++- pkg/services/tree/sync_test.go | 2 +- 11 files changed, 349 insertions(+), 31 deletions(-) diff --git a/pkg/local_object_storage/engine/tree.go b/pkg/local_object_storage/engine/tree.go index 55b9f9c5..4aaf9e66 100644 --- a/pkg/local_object_storage/engine/tree.go +++ b/pkg/local_object_storage/engine/tree.go @@ -7,6 +7,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "go.opentelemetry.io/otel/attribute" @@ -79,7 +80,7 @@ func (e *StorageEngine) TreeAddByPath(ctx context.Context, d pilorama.CIDDescrip } // TreeApply implements the pilorama.Forest interface. -func (e *StorageEngine) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error { +func (e *StorageEngine) TreeApply(l *logger.Logger, ctx context.Context, cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeApply", trace.WithAttributes( attribute.String("container_id", cnr.EncodeToString()), @@ -94,7 +95,7 @@ func (e *StorageEngine) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID str return err } - err = lst[index].TreeApply(ctx, cnr, treeID, m, backgroundSync) + err = lst[index].TreeApply(l, ctx, cnr, treeID, m, backgroundSync) if err != nil { if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled { e.reportShardError(lst[index], "can't perform `TreeApply`", err, diff --git a/pkg/local_object_storage/pilorama/batch.go b/pkg/local_object_storage/pilorama/batch.go index c65488b7..0fb328e9 100644 --- a/pkg/local_object_storage/pilorama/batch.go +++ b/pkg/local_object_storage/pilorama/batch.go @@ -6,8 +6,10 @@ import ( "sync" "time" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "go.etcd.io/bbolt" + "go.uber.org/zap" ) type batch struct { @@ -22,6 +24,7 @@ type batch struct { treeID string results []chan<- error operations []*Move + log *logger.Logger } func (b *batch) trigger() { @@ -35,6 +38,9 @@ func (b *batch) trigger() { func (b *batch) run() { fullID := bucketName(b.cid, b.treeID) + if b.log != nil { + b.log.Info("boltForest.TreeApply.batch.run()", zap.Stringer("cid", b.cid), zap.String("tree_id", b.treeID)) + } err := b.forest.db.Update(func(tx *bbolt.Tx) error { bLog, bTree, err := b.forest.getTreeBuckets(tx, fullID) if err != nil { diff --git a/pkg/local_object_storage/pilorama/bench_test.go b/pkg/local_object_storage/pilorama/bench_test.go index 3d5ff1a7..0067567b 100644 --- a/pkg/local_object_storage/pilorama/bench_test.go +++ b/pkg/local_object_storage/pilorama/bench_test.go @@ -47,7 +47,7 @@ func BenchmarkCreate(b *testing.B) { Child: Node(i + 1), Parent: RootID, } - if err := f.TreeApply(ctx, cid, treeID, op, true); err != nil { + if err := f.TreeApply(nil, ctx, cid, treeID, op, true); err != nil { b.FailNow() } } diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index 2689e345..58b58acd 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -17,12 +17,14 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "github.com/nspcc-dev/neo-go/pkg/io" "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" ) type boltForest struct { @@ -474,7 +476,18 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 { } // TreeApply implements the Forest interface. -func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error { +func (t *boltForest) TreeApply(log *logger.Logger, ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error { + if log != nil { + log.Info("boltForest.TreeApply", + zap.Stringer("cid", cnr), + zap.String("tree_id", treeID), + zap.Bool("background_sync", backgroundSync), + zap.Uint64("m.Time", m.Time), + zap.Uint64("m.Parent", m.Parent), + zap.Uint64("m.Child", m.Child), + ) + } + var ( startedAt = time.Now() success = false @@ -519,12 +532,46 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string return nil }) if err != nil || seen { + if err == nil { + if log != nil { + log.Info("boltForest.TreeApply.err != nil || seen ", + zap.Stringer("cid", cnr), + zap.String("tree_id", treeID), + zap.Bool("background_sync", backgroundSync), + zap.Uint64("m.Time", m.Time), + zap.Uint64("m.Parent", m.Parent), + zap.Uint64("m.Child", m.Child), + ) + } + } else { + if log != nil { + log.Info("boltForest.TreeApply.err != nil || seen ", + zap.Stringer("cid", cnr), + zap.String("tree_id", treeID), + zap.Bool("background_sync", backgroundSync), + zap.Uint64("m.Time", m.Time), + zap.Uint64("m.Parent", m.Parent), + zap.Uint64("m.Child", m.Child), + zap.Error(err), + ) + } + } success = err == nil return metaerr.Wrap(err) } } if t.db.MaxBatchSize == 1 { + if log != nil { + log.Info("boltForest.TreeApply.t.db.MaxBatchSize == 1", + zap.Stringer("cid", cnr), + zap.String("tree_id", treeID), + zap.Bool("background_sync", backgroundSync), + zap.Uint64("m.Time", m.Time), + zap.Uint64("m.Parent", m.Parent), + zap.Uint64("m.Child", m.Child), + ) + } fullID := bucketName(cnr, treeID) err := metaerr.Wrap(t.db.Update(func(tx *bbolt.Tx) error { bLog, bTree, err := t.getTreeBuckets(tx, fullID) @@ -533,6 +580,16 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string } var lm Move + if log != nil { + log.Info("boltForest.TreeApply.applyOperation", + zap.Stringer("cid", cnr), + zap.String("tree_id", treeID), + zap.Bool("background_sync", backgroundSync), + zap.Uint64("m.Time", m.Time), + zap.Uint64("m.Parent", m.Parent), + zap.Uint64("m.Child", m.Child), + ) + } return t.applyOperation(bLog, bTree, []*Move{m}, &lm) })) success = err == nil @@ -540,13 +597,23 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string } ch := make(chan error, 1) - t.addBatch(cnr, treeID, m, ch) + if log != nil { + log.Info("boltForest.TreeApply.t.db.addBatch", + zap.Stringer("cid", cnr), + zap.String("tree_id", treeID), + zap.Bool("background_sync", backgroundSync), + zap.Uint64("m.Time", m.Time), + zap.Uint64("m.Parent", m.Parent), + zap.Uint64("m.Child", m.Child), + ) + } + t.addBatch(log, cnr, treeID, m, ch) err := <-ch success = err == nil return metaerr.Wrap(err) } -func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) { +func (t *boltForest) addBatch(log *logger.Logger, cnr cidSDK.ID, treeID string, m *Move, ch chan error) { t.mtx.Lock() for i := 0; i < len(t.batches); i++ { t.batches[i].mtx.Lock() @@ -560,14 +627,41 @@ func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan err found := t.batches[i].cid.Equals(cnr) && t.batches[i].treeID == treeID if found { + if log != nil { + log.Info("boltForest.TreeApply.t.db.addBatch.found", + zap.Stringer("cid", cnr), + zap.String("tree_id", treeID), + zap.Uint64("m.Time", m.Time), + zap.Uint64("m.Parent", m.Parent), + zap.Uint64("m.Child", m.Child), + ) + } t.batches[i].results = append(t.batches[i].results, ch) t.batches[i].operations = append(t.batches[i].operations, m) if len(t.batches[i].operations) == t.db.MaxBatchSize { + if log != nil { + log.Info("boltForest.TreeApply.t.db.addBatch.len(t.batches[i].operations) == t.db.MaxBatchSize", + zap.Stringer("cid", cnr), + zap.String("tree_id", treeID), + zap.Uint64("m.Time", m.Time), + zap.Uint64("m.Parent", m.Parent), + zap.Uint64("m.Child", m.Child), + ) + } t.batches[i].timer.Stop() t.batches[i].timer = nil t.batches[i].mtx.Unlock() b := t.batches[i] t.mtx.Unlock() + if log != nil { + log.Info("boltForest.TreeApply.t.db.addBatch.trigger()", + zap.Stringer("cid", cnr), + zap.String("tree_id", treeID), + zap.Uint64("m.Time", m.Time), + zap.Uint64("m.Parent", m.Parent), + zap.Uint64("m.Child", m.Child), + ) + } b.trigger() return } @@ -577,6 +671,15 @@ func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan err } t.batches[i].mtx.Unlock() } + if log != nil { + log.Info("boltForest.TreeApply.t.db.addBatch.b := &batch", + zap.Stringer("cid", cnr), + zap.String("tree_id", treeID), + zap.Uint64("m.Time", m.Time), + zap.Uint64("m.Parent", m.Parent), + zap.Uint64("m.Child", m.Child), + ) + } b := &batch{ forest: t, cid: cnr, @@ -585,6 +688,7 @@ func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan err operations: []*Move{m}, } b.mtx.Lock() + b.log = log b.timer = time.AfterFunc(t.db.MaxBatchDelay, b.trigger) b.mtx.Unlock() t.batches = append(t.batches, b) diff --git a/pkg/local_object_storage/pilorama/forest.go b/pkg/local_object_storage/pilorama/forest.go index e5612d2b..7fa16b47 100644 --- a/pkg/local_object_storage/pilorama/forest.go +++ b/pkg/local_object_storage/pilorama/forest.go @@ -6,6 +6,7 @@ import ( "strings" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" ) @@ -95,7 +96,7 @@ func (f *memoryForest) TreeAddByPath(_ context.Context, d CIDDescriptor, treeID } // TreeApply implements the Forest interface. -func (f *memoryForest) TreeApply(_ context.Context, cnr cid.ID, treeID string, op *Move, _ bool) error { +func (f *memoryForest) TreeApply(l *logger.Logger, _ context.Context, cnr cid.ID, treeID string, op *Move, _ bool) error { fullID := cnr.String() + "/" + treeID s, ok := f.treeMap[fullID] if !ok { diff --git a/pkg/local_object_storage/pilorama/forest_test.go b/pkg/local_object_storage/pilorama/forest_test.go index 8e7fec20..3bb3588b 100644 --- a/pkg/local_object_storage/pilorama/forest_test.go +++ b/pkg/local_object_storage/pilorama/forest_test.go @@ -3,6 +3,7 @@ package pilorama import ( "context" "crypto/rand" + "encoding/binary" "fmt" mrand "math/rand" "path/filepath" @@ -11,13 +12,78 @@ import ( "testing" "time" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" "golang.org/x/sync/errgroup" ) +func TestT(t *testing.T) { + db, err := bbolt.Open("/home/aarifullin/ws/frostfs-dev-env/s03pil1", 0600, nil) + require.NoError(t, err) + defer db.Close() + + var cid cid.ID + err = cid.DecodeString("FTDvMtUCL6Nxb2mDUguAWgkU1LFqrnsLTX5o4DhQtEH") + require.NoError(t, err) + + //treeID := "version" + + //key := stateKey(make([]byte, 9), 3060020799381655479) + + err = db.View(func(tx *bbolt.Tx) error { + return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { + // Print the name of each bucket + + // if name[0] == 'm' { + // t.Logf("%s", name) + + if len(name) > 32 { + name = name[:32] + } else { + return nil + } + + var cid cidSDK.ID + dErr := cid.Decode(name) + if dErr != nil { + return nil + } + + if "FTDvMtUCL6Nxb2mDUguAWgkU1LFqrnsLTX5o4DhQtEH" != cid.EncodeToString() { + return nil + } + + dataBucket := []byte{0} + bb := b.Bucket(dataBucket) + _ = bb + + bb.ForEach(func(k, v []byte) error { + //2987480043680844786 + if k[0] == 's' { + nodeID := binary.LittleEndian.Uint64(k[1:]) + //binary.LittleEndian.PutUint64(key[1:], child) + t.Logf("nodeID = %d", nodeID) + } + return nil + }) + + // treeRoot := make([]byte, 32+len("version")) + // cid.Encode(treeRoot) + // copy(treeRoot[32:], treeID) + // return treeRoot + + // if cid.EncodeToString() == "FTDvMtUCL6Nxb2mDUguAWgkU1LFqrnsLTX5o4DhQtEH" { + // } + return nil + }) + }) + require.NoError(t, err) +} + var providers = []struct { name string construct func(t testing.TB, opts ...Option) Forest @@ -424,7 +490,7 @@ func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Optio treeID := "version" testApply := func(t *testing.T, s Forest, child, parent Node, meta Meta) { - require.NoError(t, s.TreeApply(context.Background(), cid, treeID, &Move{ + require.NoError(t, s.TreeApply(nil, context.Background(), cid, treeID, &Move{ Child: child, Parent: parent, Meta: meta, @@ -514,20 +580,20 @@ func testForestApplySameOperation(t *testing.T, constructor func(t testing.TB, _ t.Run("expected", func(t *testing.T) { s := constructor(t) for i := range logs { - require.NoError(t, s.TreeApply(ctx, cid, treeID, &logs[i], false)) + require.NoError(t, s.TreeApply(nil, ctx, cid, treeID, &logs[i], false)) } check(t, s) }) s := constructor(t, WithMaxBatchSize(batchSize)) - require.NoError(t, s.TreeApply(ctx, cid, treeID, &logs[0], false)) + require.NoError(t, s.TreeApply(nil, ctx, cid, treeID, &logs[0], false)) for i := 0; i < batchSize; i++ { errG.Go(func() error { - return s.TreeApply(ctx, cid, treeID, &logs[2], false) + return s.TreeApply(nil, ctx, cid, treeID, &logs[2], false) }) } require.NoError(t, errG.Wait()) - require.NoError(t, s.TreeApply(ctx, cid, treeID, &logs[1], false)) + require.NoError(t, s.TreeApply(nil, ctx, cid, treeID, &logs[1], false)) check(t, s) } @@ -566,7 +632,7 @@ func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Op }) for i := range logs { - require.NoError(t, s.TreeApply(context.Background(), cid, treeID, &logs[i], false)) + require.NoError(t, s.TreeApply(nil, context.Background(), cid, treeID, &logs[i], false)) } testGetOpLog := func(t *testing.T, height uint64, m Move) { @@ -613,7 +679,7 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O checkExists(t, false, cid, treeID) }) - require.NoError(t, s.TreeApply(context.Background(), cid, treeID, &Move{Meta: Meta{Time: 11}, Parent: 0, Child: 1}, false)) + require.NoError(t, s.TreeApply(nil, context.Background(), cid, treeID, &Move{Meta: Meta{Time: 11}, Parent: 0, Child: 1}, false)) checkExists(t, true, cid, treeID) height, err := s.TreeHeight(context.Background(), cid, treeID) @@ -658,7 +724,7 @@ func TestApplyTricky1(t *testing.T) { t.Run(providers[i].name, func(t *testing.T) { s := providers[i].construct(t) for i := range ops { - require.NoError(t, s.TreeApply(context.Background(), cid, treeID, &ops[i], false)) + require.NoError(t, s.TreeApply(nil, context.Background(), cid, treeID, &ops[i], false)) } for i := range expected { @@ -719,7 +785,7 @@ func TestApplyTricky2(t *testing.T) { t.Run(providers[i].name, func(t *testing.T) { s := providers[i].construct(t) for i := range ops { - require.NoError(t, s.TreeApply(context.Background(), cid, treeID, &ops[i], false)) + require.NoError(t, s.TreeApply(nil, context.Background(), cid, treeID, &ops[i], false)) } for i := range expected { @@ -827,7 +893,7 @@ func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _ expected := constructor(t, WithNoSync(true)) for i := range ops { - require.NoError(t, expected.TreeApply(context.Background(), cid, treeID, &ops[i], false)) + require.NoError(t, expected.TreeApply(nil, context.Background(), cid, treeID, &ops[i], false)) } for i := 0; i < iterCount; i++ { @@ -842,7 +908,7 @@ func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _ go func() { defer wg.Done() for op := range ch { - require.NoError(t, actual.TreeApply(context.Background(), cid, treeID, op, false)) + require.NoError(t, actual.TreeApply(nil, context.Background(), cid, treeID, op, false)) } }() } @@ -872,7 +938,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ .. expected := constructor(t, WithNoSync(true)) for i := range ops { - require.NoError(t, expected.TreeApply(context.Background(), cid, treeID, &ops[i], false)) + require.NoError(t, expected.TreeApply(nil, context.Background(), cid, treeID, &ops[i], false)) } const iterCount = 200 @@ -882,7 +948,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ .. actual := constructor(t, WithNoSync(true)) for i := range ops { - require.NoError(t, actual.TreeApply(context.Background(), cid, treeID, &ops[i], false)) + require.NoError(t, actual.TreeApply(nil, context.Background(), cid, treeID, &ops[i], false)) } compareForests(t, expected, actual, cid, treeID, nodeCount) } @@ -975,7 +1041,7 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) { b.SetParallelism(10) b.RunParallel(func(pb *testing.PB) { for pb.Next() { - if err := s.TreeApply(context.Background(), cid, treeID, &ops[<-ch], false); err != nil { + if err := s.TreeApply(nil, context.Background(), cid, treeID, &ops[<-ch], false); err != nil { b.Fatalf("error in `Apply`: %v", err) } } @@ -1050,7 +1116,7 @@ func testMove(t *testing.T, s Forest, ts int, node, parent Node, cid cidSDK.ID, items = append(items, KeyValue{AttributeVersion, []byte(version)}) } - require.NoError(t, s.TreeApply(context.Background(), cid, treeID, &Move{ + require.NoError(t, s.TreeApply(nil, context.Background(), cid, treeID, &Move{ Parent: parent, Child: node, Meta: Meta{ diff --git a/pkg/local_object_storage/pilorama/interface.go b/pkg/local_object_storage/pilorama/interface.go index e7f7eb51..6dcafca4 100644 --- a/pkg/local_object_storage/pilorama/interface.go +++ b/pkg/local_object_storage/pilorama/interface.go @@ -5,6 +5,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" ) @@ -20,7 +21,7 @@ type Forest interface { TreeAddByPath(ctx context.Context, d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error) // TreeApply applies replicated operation from another node. // If background is true, TreeApply will first check whether an operation exists. - TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error + TreeApply(l *logger.Logger, ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error // TreeGetByPath returns all nodes corresponding to the path. // The path is constructed by descending from the root using the values of the // AttributeFilename in meta. diff --git a/pkg/local_object_storage/shard/tree.go b/pkg/local_object_storage/shard/tree.go index 7795b820..731e4790 100644 --- a/pkg/local_object_storage/shard/tree.go +++ b/pkg/local_object_storage/shard/tree.go @@ -6,6 +6,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "go.opentelemetry.io/otel/attribute" @@ -79,7 +80,7 @@ func (s *Shard) TreeAddByPath(ctx context.Context, d pilorama.CIDDescriptor, tre } // TreeApply implements the pilorama.Forest interface. -func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error { +func (s *Shard) TreeApply(l *logger.Logger, ctx context.Context, cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApply", trace.WithAttributes( attribute.String("shard_id", s.ID().String()), @@ -103,7 +104,7 @@ func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m * if s.info.Mode.NoMetabase() { return ErrDegradedMode } - return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync) + return s.pilorama.TreeApply(l, ctx, cnr, treeID, m, backgroundSync) } // TreeGetByPath implements the pilorama.Forest interface. diff --git a/pkg/services/tree/replicator.go b/pkg/services/tree/replicator.go index 0ca30273..3e3ebbd0 100644 --- a/pkg/services/tree/replicator.go +++ b/pkg/services/tree/replicator.go @@ -54,7 +54,7 @@ func (s *Service) localReplicationWorker(ctx context.Context) { ), ) - err := s.forest.TreeApply(ctx, op.cid, op.treeID, &op.Move, false) + err := s.forest.TreeApply(nil, ctx, op.cid, op.treeID, &op.Move, false) if err != nil { s.log.Error(logs.TreeFailedToApplyReplicatedOperation, zap.String("err", err.Error())) diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index 9cff8b35..3b74669d 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -8,6 +8,7 @@ import ( "io" "math" "math/rand" + "strings" "sync" "time" @@ -38,17 +39,30 @@ const defaultSyncWorkerCount = 20 // tree IDs from the other container nodes. Returns ErrNotInContainer if the node // is not included in the container. func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error { + s.log.Info("Service.synchronizeAllTrees.synchronizeAllTrees.Start") nodes, pos, err := s.getContainerNodes(cid) if err != nil { return fmt.Errorf("can't get container nodes: %w", err) } + s.log.Info("Service.synchronizeAllTrees.getContainerNodes", + zap.Int("nodes_length", len(nodes)), + zap.Stringer("cid", cid), + ) if pos < 0 { + s.log.Info("Service.synchronizeAllTrees.ErrNotInContainer", + zap.Int("nodes_length", len(nodes)), + zap.Stringer("cid", cid), + ) return ErrNotInContainer } nodes = randomizeNodeOrder(nodes, pos) if len(nodes) == 0 { + s.log.Info("Service.synchronizeAllTrees.len(nodes) == 0", + zap.Int("nodes_length", len(nodes)), + zap.Stringer("cid", cid), + ) return nil } @@ -61,6 +75,10 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error { }, } + s.log.Info("Service.synchronizeAllTrees.SignMessage", + zap.Int("nodes_length", len(nodes)), + zap.Stringer("cid", cid)) + err = SignMessage(req, s.key) if err != nil { return fmt.Errorf("could not sign request: %w", err) @@ -70,14 +88,28 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error { var treesToSync []string var outErr error + s.log.Info("Service.synchronizeAllTrees.forEachNode", + zap.Int("nodes_length", len(nodes)), + zap.Stringer("cid", cid)) + err = s.forEachNode(ctx, nodes, func(c TreeServiceClient) bool { + s.log.Info("Service.synchronizeAllTrees.TreeList", + zap.Int("nodes_length", len(nodes)), + zap.Stringer("cid", cid)) resp, outErr = c.TreeList(ctx, req) if outErr != nil { + s.log.Info("Service.synchronizeAllTrees.outErr != nil", + zap.Int("nodes_length", len(nodes)), + zap.Stringer("cid", cid)) return false } treesToSync = resp.GetBody().GetIds() + s.log.Info("Service.synchronizeAllTrees.treesToSync", + zap.Int("nodes_length", len(nodes)), + zap.Stringer("cid", cid), zap.String("treeToSync", strings.Join(treesToSync, ", "))) + return true }) if err != nil { @@ -89,6 +121,9 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error { } for _, tid := range treesToSync { + s.log.Info("Service.synchronizeAllTrees.TreeLastSyncHeight", + zap.Int("nodes_length", len(nodes)), + zap.Stringer("cid", cid), zap.String("treeToSync", strings.Join(treesToSync, ", "))) h, err := s.forest.TreeLastSyncHeight(ctx, cid, tid) if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) { s.log.Warn(logs.TreeCouldNotGetLastSynchronizedHeightForATree, @@ -98,6 +133,10 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error { } newHeight := s.synchronizeTree(ctx, cid, h, tid, nodes) if h < newHeight { + s.log.Info("Service.synchronizeAllTrees.TreeUpdateLastSyncHeight", + zap.Int("new_height", int(newHeight)), + zap.Int("nodes_length", len(nodes)), + zap.Stringer("cid", cid), zap.String("treeToSync", strings.Join(treesToSync, ", "))) if err := s.forest.TreeUpdateLastSyncHeight(ctx, cid, tid, newHeight); err != nil { s.log.Warn(logs.TreeCouldNotUpdateLastSynchronizedHeightForATree, zap.Stringer("cid", cid), @@ -111,12 +150,21 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error { // SynchronizeTree tries to synchronize log starting from the last stored height. func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string) error { + s.log.Info("Service.synchronizeAllTrees.synchronizeAllTrees.SynchronizeTree") nodes, pos, err := s.getContainerNodes(cid) if err != nil { return fmt.Errorf("can't get container nodes: %w", err) } + s.log.Info("Service.synchronizeAllTrees.SynchronizeTree", + zap.Int("nodes_length", len(nodes)), + zap.Stringer("cid", cid), + ) if pos < 0 { + s.log.Info("Service.synchronizeAllTrees.ErrNotInContainer", + zap.Int("nodes_length", len(nodes)), + zap.Stringer("cid", cid), + ) return ErrNotInContainer } @@ -125,12 +173,17 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string return nil } + s.log.Info("Service.synchronizeAllTrees.synchronizeTree", + zap.String("tree_id", treeID), + zap.Int("nodes_length", len(nodes)), + zap.Stringer("cid", cid), + ) s.synchronizeTree(ctx, cid, 0, treeID, nodes) return nil } // mergeOperationStreams performs merge sort for node operation streams to one stream. -func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 { +func mergeOperationStreams(s *Service, streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 { defer close(merged) ms := make([]*pilorama.Move, len(streams)) @@ -148,6 +201,10 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram // If all ops have been successfully applied, we must start from the last // operation height from the stream B. This height is stored in minStreamedLastHeight. var minStreamedLastHeight uint64 = math.MaxUint64 + if s != nil { + s.log.Debug("(*Service).synchronizeTree.mergeOperationStreams", + zap.Uint64("from", minStreamedLastHeight)) + } for { var minTimeMoveTime uint64 = math.MaxUint64 @@ -156,18 +213,49 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram if m != nil && minTimeMoveTime > m.Time { minTimeMoveTime = m.Time minTimeMoveIndex = i + if s != nil { + s.log.Debug("(*Service).synchronizeTree.mergeOperationStreams", + zap.Uint64("minTimeMoveTime", minTimeMoveTime), + zap.Int("minTimeMoveIndex", minTimeMoveIndex), + ) + } } } if minTimeMoveIndex == -1 { + if s != nil { + s.log.Debug("(*Service).synchronizeTree.mergeOperationStreams.break", + zap.Uint64("minTimeMoveTime", minTimeMoveTime), + zap.Int("minTimeMoveIndex", minTimeMoveIndex), + ) + } break } merged <- ms[minTimeMoveIndex] height := ms[minTimeMoveIndex].Time + + if s != nil { + s.log.Debug("(*Service).synchronizeTree.mergeOperationStreams. before if", + zap.Uint64("m.Parent", ms[minTimeMoveIndex].Parent), + zap.Uint64("m.Child", ms[minTimeMoveIndex].Child), + zap.Int("height", int(ms[minTimeMoveIndex].Time)), + ) + } if ms[minTimeMoveIndex] = <-streams[minTimeMoveIndex]; ms[minTimeMoveIndex] == nil { if minStreamedLastHeight > height { + if s != nil { + s.log.Debug("(*Service).synchronizeTree.mergeOperationStreams.minStreamedLastHeight > height", + zap.Uint64("minStreamedLastHeight", height), + ) + } minStreamedLastHeight = height + } else { + if s != nil { + s.log.Debug("(*Service).synchronizeTree.mergeOperationStreams.minStreamedLastHeight <= height", + zap.Uint64("height", height), + ) + } } } } @@ -178,7 +266,7 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string, operationStream <-chan *pilorama.Move) uint64 { errGroup, _ := errgroup.WithContext(ctx) - const workersCount = 1024 + const workersCount = 4 errGroup.SetLimit(workersCount) // We run TreeApply concurrently for the operation batch. Let's consider two operations @@ -188,21 +276,49 @@ func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID s var unappliedOperationHeight uint64 = math.MaxUint64 var heightMtx sync.Mutex + s.log.Debug("(*Service).applyOperationStream", + zap.Uint64("unappliedOperationHeight", unappliedOperationHeight)) + var prev *pilorama.Move for m := range operationStream { m := m + s.log.Debug("(*Service).applyOperationStream.for m := range operationStream", + zap.Uint64("m.Parent", m.Parent), + zap.Uint64("m.Child", m.Child), + zap.Uint64("m.Time", m.Time)) + // skip already applied op if prev != nil && prev.Time == m.Time { + s.log.Debug("(*Service).applyOperationStream.skip already applied op", + zap.Uint64("m.Parent", m.Parent), + zap.Uint64("m.Child", m.Child), + zap.Uint64("m.Time", m.Time)) continue } prev = m errGroup.Go(func() error { - if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil { + s.log.Debug("(*Service).TreeApply", + zap.Uint64("m.Parent", m.Parent), + zap.Uint64("m.Child", m.Child), + zap.Uint64("m.Time", m.Time), + zap.Uint64("unappliedOperationHeight", unappliedOperationHeight)) + if err := s.forest.TreeApply(s.log, ctx, cid, treeID, m, true); err != nil { heightMtx.Lock() if m.Time < unappliedOperationHeight { unappliedOperationHeight = m.Time + s.log.Debug("(*Service).applyOperationStream.m.Time < unappliedOperationHeight", + zap.Uint64("m.Parent", m.Parent), + zap.Uint64("m.Child", m.Child), + zap.Uint64("m.Time", m.Time), + zap.Uint64("unappliedOperationHeight", unappliedOperationHeight)) + } else { + s.log.Debug("(*Service).applyOperationStream.m.Time >= unappliedOperationHeight", + zap.Uint64("m.Parent", m.Parent), + zap.Uint64("m.Child", m.Child), + zap.Uint64("m.Time", m.Time), + zap.Uint64("unappliedOperationHeight", unappliedOperationHeight)) } heightMtx.Unlock() return err @@ -216,6 +332,9 @@ func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID s func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string, height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move) (uint64, error) { + s.log.Debug("(*Service).startStream", + zap.Stringer("cid", cid), + zap.Uint64("height", height)) rawCID := make([]byte, sha256.Size) cid.Encode(rawCID) @@ -247,11 +366,22 @@ func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string, return 0, err } opsCh <- m + s.log.Debug("(*Service).startStream.m", + zap.Stringer("cid", cid), + zap.Uint64("height", height), zap.Uint64("m.Parent", m.Parent), + zap.Uint64("m.Child", m.Child), + zap.Uint64("m.Time", m.Time)) } if height == newHeight || err != nil && !errors.Is(err, io.EOF) { + s.log.Debug("(*Service).startStream.if height == newHeight", + zap.Stringer("cid", cid), + zap.Uint64("height", height), zap.Uint64("newHeight", newHeight)) return newHeight, err } height = newHeight + s.log.Debug("(*Service).startStream.if height = newHeight", + zap.Stringer("cid", cid), + zap.Uint64("height", height), zap.Uint64("newHeight", newHeight)) } } @@ -278,11 +408,19 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, merged := make(chan *pilorama.Move) var minStreamedLastHeight uint64 errGroup.Go(func() error { - minStreamedLastHeight = mergeOperationStreams(nodeOperationStreams, merged) + s.log.Debug("(*Service).synchronizeTree.mergeOperationStreams", + zap.Stringer("cid", cid), + zap.String("tree", treeID), + zap.Uint64("from", from)) + minStreamedLastHeight = mergeOperationStreams(nil, nodeOperationStreams, merged) return nil }) var minUnappliedHeight uint64 errGroup.Go(func() error { + s.log.Debug("(*Service).synchronizeTree.applyOperationStream", + zap.Stringer("cid", cid), + zap.String("tree", treeID), + zap.Uint64("from", from)) minUnappliedHeight = s.applyOperationStream(ctx, cid, treeID, merged) return nil }) diff --git a/pkg/services/tree/sync_test.go b/pkg/services/tree/sync_test.go index 190b4ccb..2a186ddf 100644 --- a/pkg/services/tree/sync_test.go +++ b/pkg/services/tree/sync_test.go @@ -66,7 +66,7 @@ func Test_mergeOperationStreams(t *testing.T) { merged := make(chan *pilorama.Move, 1) min := make(chan uint64) go func() { - min <- mergeOperationStreams(nodeOpChans, merged) + min <- mergeOperationStreams(nil, nodeOpChans, merged) }() var res []uint64