From 45dd5a692f5a2beae5b6886ad552387abc631343 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 19 Feb 2025 09:57:37 +0300 Subject: [PATCH 1/3] [#0] tree: Increase batch size Signed-off-by: Dmitrii Stepanov --- pkg/services/tree/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 2012f53d2..a27b6c6a3 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -449,7 +449,7 @@ type stackItem struct { } func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid cidSDK.ID, b *GetSubTreeRequest_Body, forest pilorama.Forest) error { - const batchSize = 1000 + const batchSize = 100_000 // For backward compatibility. rootIDs := b.GetRootId() -- 2.45.3 From 134e5324d7c33d6c38160204a499ccc43f8cb753 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 19 Feb 2025 10:30:48 +0300 Subject: [PATCH 2/3] [#0] tree: Split GetOpLog stream Signed-off-by: Dmitrii Stepanov --- pkg/services/tree/sync.go | 71 +++++++++++++++++++++++++-------------- 1 file changed, 45 insertions(+), 26 deletions(-) diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index 0f85f50b1..45b62c51a 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -219,38 +219,57 @@ func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string, rawCID := make([]byte, sha256.Size) cid.Encode(rawCID) + from := height + const batchSize = 10_000 - req := &GetOpLogRequest{ - Body: &GetOpLogRequest_Body{ - ContainerId: rawCID, - TreeId: treeID, - Height: height, - }, - } - if err := SignMessage(req, s.key); err != nil { - return err - } - - c, err := treeClient.GetOpLog(ctx, req) - if err != nil { - return fmt.Errorf("can't initialize client: %w", err) - } - res, err := c.Recv() - for ; err == nil; res, err = c.Recv() { - lm := res.GetBody().GetOperation() - m := &pilorama.Move{ - Parent: lm.GetParentId(), - Child: lm.GetChildId(), + for { + count := 0 + req := &GetOpLogRequest{ + Body: &GetOpLogRequest_Body{ + ContainerId: rawCID, + TreeId: treeID, + Height: from, + }, } - if err := m.Meta.FromBytes(lm.GetMeta()); err != nil { + if err := SignMessage(req, s.key); err != nil { return err } - opsCh <- m - } - if !errors.Is(err, io.EOF) { + + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + + c, err := treeClient.GetOpLog(streamCtx, req) + if err != nil { + return fmt.Errorf("can't initialize client: %w", err) + } + res, err := c.Recv() + for ; err == nil; res, err = c.Recv() { + lm := res.GetBody().GetOperation() + m := &pilorama.Move{ + Parent: lm.GetParentId(), + Child: lm.GetChildId(), + } + if err := m.Meta.FromBytes(lm.GetMeta()); err != nil { + return err + } + from = m.Time + opsCh <- m + count++ + if count == batchSize { + break // c.Recv() + } + } + if err == nil { + // count == batchSize + // close current stream and start new one + cancel() + continue + } + if errors.Is(err, io.EOF) { + return nil + } return err } - return nil } // synchronizeTree synchronizes operations getting them from different nodes. -- 2.45.3 From 0a3a36e74254189baabc1c064d0e5da35dafb978 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 19 Feb 2025 11:00:58 +0300 Subject: [PATCH 3/3] [#0] tree: Synchronous replication Signed-off-by: Dmitrii Stepanov --- pkg/services/tree/replicator.go | 2 +- pkg/services/tree/service.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/services/tree/replicator.go b/pkg/services/tree/replicator.go index 346198b3c..0949cd4c6 100644 --- a/pkg/services/tree/replicator.go +++ b/pkg/services/tree/replicator.go @@ -190,7 +190,7 @@ func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move) { treeID: treeID, op: op, }: - default: + case <-s.closeCh: } } diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index a27b6c6a3..9878bfecc 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -664,7 +664,7 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e Meta: meta, }, }: - default: + case <-s.closeCh: } return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil } -- 2.45.3