[#0] tree: Split GetOpLog stream

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-02-19 10:30:48 +03:00
parent 45dd5a692f
commit 134e5324d7
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0

View file

@ -219,38 +219,57 @@ func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
rawCID := make([]byte, sha256.Size)
cid.Encode(rawCID)
from := height
const batchSize = 10_000
req := &GetOpLogRequest{
Body: &GetOpLogRequest_Body{
ContainerId: rawCID,
TreeId: treeID,
Height: height,
},
}
if err := SignMessage(req, s.key); err != nil {
return err
}
c, err := treeClient.GetOpLog(ctx, req)
if err != nil {
return fmt.Errorf("can't initialize client: %w", err)
}
res, err := c.Recv()
for ; err == nil; res, err = c.Recv() {
lm := res.GetBody().GetOperation()
m := &pilorama.Move{
Parent: lm.GetParentId(),
Child: lm.GetChildId(),
for {
count := 0
req := &GetOpLogRequest{
Body: &GetOpLogRequest_Body{
ContainerId: rawCID,
TreeId: treeID,
Height: from,
},
}
if err := m.Meta.FromBytes(lm.GetMeta()); err != nil {
if err := SignMessage(req, s.key); err != nil {
return err
}
opsCh <- m
}
if !errors.Is(err, io.EOF) {
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
c, err := treeClient.GetOpLog(streamCtx, req)
if err != nil {
return fmt.Errorf("can't initialize client: %w", err)
}
res, err := c.Recv()
for ; err == nil; res, err = c.Recv() {
lm := res.GetBody().GetOperation()
m := &pilorama.Move{
Parent: lm.GetParentId(),
Child: lm.GetChildId(),
}
if err := m.Meta.FromBytes(lm.GetMeta()); err != nil {
return err
}
from = m.Time
opsCh <- m
count++
if count == batchSize {
break // c.Recv()
}
}
if err == nil {
// count == batchSize
// close current stream and start new one
cancel()
continue
}
if errors.Is(err, io.EOF) {
return nil
}
return err
}
return nil
}
// synchronizeTree synchronizes operations getting them from different nodes.