WIP: tree improvements for support/v0.42 #1649
3 changed files with 48 additions and 29 deletions
|
@ -190,7 +190,7 @@ func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move) {
|
||||||
treeID: treeID,
|
treeID: treeID,
|
||||||
op: op,
|
op: op,
|
||||||
}:
|
}:
|
||||||
default:
|
case <-s.closeCh:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -449,7 +449,7 @@ type stackItem struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid cidSDK.ID, b *GetSubTreeRequest_Body, forest pilorama.Forest) error {
|
func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid cidSDK.ID, b *GetSubTreeRequest_Body, forest pilorama.Forest) error {
|
||||||
const batchSize = 1000
|
const batchSize = 100_000
|
||||||
|
|
||||||
// For backward compatibility.
|
// For backward compatibility.
|
||||||
rootIDs := b.GetRootId()
|
rootIDs := b.GetRootId()
|
||||||
|
@ -664,7 +664,7 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
|
||||||
Meta: meta,
|
Meta: meta,
|
||||||
},
|
},
|
||||||
}:
|
}:
|
||||||
default:
|
case <-s.closeCh:
|
||||||
}
|
}
|
||||||
return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil
|
return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -219,38 +219,57 @@ func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
|
||||||
|
|
||||||
rawCID := make([]byte, sha256.Size)
|
rawCID := make([]byte, sha256.Size)
|
||||||
cid.Encode(rawCID)
|
cid.Encode(rawCID)
|
||||||
|
from := height
|
||||||
|
const batchSize = 10_000
|
||||||
|
|
||||||
req := &GetOpLogRequest{
|
for {
|
||||||
Body: &GetOpLogRequest_Body{
|
count := 0
|
||||||
ContainerId: rawCID,
|
req := &GetOpLogRequest{
|
||||||
TreeId: treeID,
|
Body: &GetOpLogRequest_Body{
|
||||||
Height: height,
|
ContainerId: rawCID,
|
||||||
},
|
TreeId: treeID,
|
||||||
}
|
Height: from,
|
||||||
if err := SignMessage(req, s.key); err != nil {
|
},
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
c, err := treeClient.GetOpLog(ctx, req)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't initialize client: %w", err)
|
|
||||||
}
|
|
||||||
res, err := c.Recv()
|
|
||||||
for ; err == nil; res, err = c.Recv() {
|
|
||||||
lm := res.GetBody().GetOperation()
|
|
||||||
m := &pilorama.Move{
|
|
||||||
Parent: lm.GetParentId(),
|
|
||||||
Child: lm.GetChildId(),
|
|
||||||
}
|
}
|
||||||
if err := m.Meta.FromBytes(lm.GetMeta()); err != nil {
|
if err := SignMessage(req, s.key); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
opsCh <- m
|
|
||||||
}
|
streamCtx, cancel := context.WithCancel(ctx)
|
||||||
if !errors.Is(err, io.EOF) {
|
defer cancel()
|
||||||
|
|
||||||
|
c, err := treeClient.GetOpLog(streamCtx, req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't initialize client: %w", err)
|
||||||
|
}
|
||||||
|
res, err := c.Recv()
|
||||||
|
for ; err == nil; res, err = c.Recv() {
|
||||||
|
lm := res.GetBody().GetOperation()
|
||||||
|
m := &pilorama.Move{
|
||||||
|
Parent: lm.GetParentId(),
|
||||||
|
Child: lm.GetChildId(),
|
||||||
|
}
|
||||||
|
if err := m.Meta.FromBytes(lm.GetMeta()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
from = m.Time
|
||||||
|
opsCh <- m
|
||||||
|
count++
|
||||||
|
if count == batchSize {
|
||||||
|
break // c.Recv()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
// count == batchSize
|
||||||
|
// close current stream and start new one
|
||||||
|
cancel()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if errors.Is(err, io.EOF) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// synchronizeTree synchronizes operations getting them from different nodes.
|
// synchronizeTree synchronizes operations getting them from different nodes.
|
||||||
|
|
Loading…
Add table
Reference in a new issue