WIP: tree improvements for support/v0.42 #1649
3 changed files with 48 additions and 29 deletions
|
@ -190,7 +190,7 @@ func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move) {
|
|||
treeID: treeID,
|
||||
op: op,
|
||||
}:
|
||||
default:
|
||||
case <-s.closeCh:
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -449,7 +449,7 @@ type stackItem struct {
|
|||
}
|
||||
|
||||
func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid cidSDK.ID, b *GetSubTreeRequest_Body, forest pilorama.Forest) error {
|
||||
const batchSize = 1000
|
||||
const batchSize = 100_000
|
||||
|
||||
// For backward compatibility.
|
||||
rootIDs := b.GetRootId()
|
||||
|
@ -664,7 +664,7 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
|
|||
Meta: meta,
|
||||
},
|
||||
}:
|
||||
default:
|
||||
case <-s.closeCh:
|
||||
}
|
||||
return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil
|
||||
}
|
||||
|
|
|
@ -219,38 +219,57 @@ func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
|
|||
|
||||
rawCID := make([]byte, sha256.Size)
|
||||
cid.Encode(rawCID)
|
||||
from := height
|
||||
const batchSize = 10_000
|
||||
|
||||
req := &GetOpLogRequest{
|
||||
Body: &GetOpLogRequest_Body{
|
||||
ContainerId: rawCID,
|
||||
TreeId: treeID,
|
||||
Height: height,
|
||||
},
|
||||
}
|
||||
if err := SignMessage(req, s.key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c, err := treeClient.GetOpLog(ctx, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't initialize client: %w", err)
|
||||
}
|
||||
res, err := c.Recv()
|
||||
for ; err == nil; res, err = c.Recv() {
|
||||
lm := res.GetBody().GetOperation()
|
||||
m := &pilorama.Move{
|
||||
Parent: lm.GetParentId(),
|
||||
Child: lm.GetChildId(),
|
||||
for {
|
||||
count := 0
|
||||
req := &GetOpLogRequest{
|
||||
Body: &GetOpLogRequest_Body{
|
||||
ContainerId: rawCID,
|
||||
TreeId: treeID,
|
||||
Height: from,
|
||||
},
|
||||
}
|
||||
if err := m.Meta.FromBytes(lm.GetMeta()); err != nil {
|
||||
if err := SignMessage(req, s.key); err != nil {
|
||||
return err
|
||||
}
|
||||
opsCh <- m
|
||||
}
|
||||
if !errors.Is(err, io.EOF) {
|
||||
|
||||
streamCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
c, err := treeClient.GetOpLog(streamCtx, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't initialize client: %w", err)
|
||||
}
|
||||
res, err := c.Recv()
|
||||
for ; err == nil; res, err = c.Recv() {
|
||||
lm := res.GetBody().GetOperation()
|
||||
m := &pilorama.Move{
|
||||
Parent: lm.GetParentId(),
|
||||
Child: lm.GetChildId(),
|
||||
}
|
||||
if err := m.Meta.FromBytes(lm.GetMeta()); err != nil {
|
||||
return err
|
||||
}
|
||||
from = m.Time
|
||||
opsCh <- m
|
||||
count++
|
||||
if count == batchSize {
|
||||
break // c.Recv()
|
||||
}
|
||||
}
|
||||
if err == nil {
|
||||
// count == batchSize
|
||||
// close current stream and start new one
|
||||
cancel()
|
||||
continue
|
||||
}
|
||||
if errors.Is(err, io.EOF) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// synchronizeTree synchronizes operations getting them from different nodes.
|
||||
|
|
Loading…
Add table
Reference in a new issue