WIP: tree improvements for support/v0.42 #1649

Draft
dstepanov-yadro wants to merge 3 commits from dstepanov-yadro/frostfs-node:support/credit_systems into support/v0.42
3 changed files with 48 additions and 29 deletions

View file

@ -190,7 +190,7 @@ func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move) {
treeID: treeID,
op: op,
}:
default:
case <-s.closeCh:
}
}

View file

@ -449,7 +449,7 @@ type stackItem struct {
}
func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid cidSDK.ID, b *GetSubTreeRequest_Body, forest pilorama.Forest) error {
const batchSize = 1000
const batchSize = 100_000
// For backward compatibility.
rootIDs := b.GetRootId()
@ -664,7 +664,7 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
Meta: meta,
},
}:
default:
case <-s.closeCh:
}
return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil
}

View file

@ -219,38 +219,57 @@ func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
rawCID := make([]byte, sha256.Size)
cid.Encode(rawCID)
from := height
const batchSize = 10_000
req := &GetOpLogRequest{
Body: &GetOpLogRequest_Body{
ContainerId: rawCID,
TreeId: treeID,
Height: height,
},
}
if err := SignMessage(req, s.key); err != nil {
return err
}
c, err := treeClient.GetOpLog(ctx, req)
if err != nil {
return fmt.Errorf("can't initialize client: %w", err)
}
res, err := c.Recv()
for ; err == nil; res, err = c.Recv() {
lm := res.GetBody().GetOperation()
m := &pilorama.Move{
Parent: lm.GetParentId(),
Child: lm.GetChildId(),
for {
count := 0
req := &GetOpLogRequest{
Body: &GetOpLogRequest_Body{
ContainerId: rawCID,
TreeId: treeID,
Height: from,
},
}
if err := m.Meta.FromBytes(lm.GetMeta()); err != nil {
if err := SignMessage(req, s.key); err != nil {
return err
}
opsCh <- m
}
if !errors.Is(err, io.EOF) {
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
c, err := treeClient.GetOpLog(streamCtx, req)
if err != nil {
return fmt.Errorf("can't initialize client: %w", err)
}
res, err := c.Recv()
for ; err == nil; res, err = c.Recv() {
lm := res.GetBody().GetOperation()
m := &pilorama.Move{
Parent: lm.GetParentId(),
Child: lm.GetChildId(),
}
if err := m.Meta.FromBytes(lm.GetMeta()); err != nil {
return err
}
from = m.Time
opsCh <- m
count++
if count == batchSize {
break // c.Recv()
}
}
if err == nil {
// count == batchSize
// close current stream and start new one
cancel()
continue
}
if errors.Is(err, io.EOF) {
return nil
}
return err
}
return nil
}
// synchronizeTree synchronizes operations getting them from different nodes.