forked from TrueCloudLab/frostfs-node
[#1577] container: Reduce iterations through container list
* Separated iteration through container ids from `ContainersOf()` so that it could be reused. * When listing containers we used to iterate through the the whole list of containers twice: first when reading from a contract, then when sending them. Now we can send batches of containers when reading from the contract. Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
This commit is contained in:
parent
6fe34d266a
commit
242f0095d0
4 changed files with 94 additions and 53 deletions
|
@ -30,6 +30,7 @@ type Reader interface {
|
|||
// to the specified user of FrostFS system. Returns the identifiers
|
||||
// of all FrostFS containers if pointer to owner identifier is nil.
|
||||
ContainersOf(*user.ID) ([]cid.ID, error)
|
||||
IterateContainersOf(*user.ID, func(cid.ID) error) error
|
||||
}
|
||||
|
||||
// Writer is an interface of container storage updater.
|
||||
|
@ -201,7 +202,7 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
|
||||
func (s *morphExecutor) ListStream(ctx context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
|
||||
body := req.GetBody()
|
||||
idV2 := body.GetOwnerID()
|
||||
if idV2 == nil {
|
||||
|
@ -215,20 +216,41 @@ func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamR
|
|||
return fmt.Errorf("invalid user ID: %w", err)
|
||||
}
|
||||
|
||||
cnrs, err := s.rdr.ContainersOf(&id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cidList := make([]refs.ContainerID, len(cnrs))
|
||||
for i := range cnrs {
|
||||
cnrs[i].WriteToV2(&cidList[i])
|
||||
}
|
||||
|
||||
resBody := new(container.ListStreamResponseBody)
|
||||
resBody.SetContainerIDs(cidList)
|
||||
r := new(container.ListStreamResponse)
|
||||
r.SetBody(resBody)
|
||||
|
||||
return stream.Send(r)
|
||||
var cidList []refs.ContainerID
|
||||
|
||||
// Amount of containers to send at once.
|
||||
const batchSize = 1000
|
||||
|
||||
processCID := func(id cid.ID) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
var refID refs.ContainerID
|
||||
id.WriteToV2(&refID)
|
||||
cidList = append(cidList, refID)
|
||||
if len(cidList) == batchSize {
|
||||
r.GetBody().SetContainerIDs(cidList)
|
||||
cidList = cidList[:0]
|
||||
return stream.Send(r)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if err = s.rdr.IterateContainersOf(&id, processCID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(cidList) > 0 {
|
||||
r.GetBody().SetContainerIDs(cidList)
|
||||
return stream.Send(r)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue