[#1577] container: Reduce iterations through container list
All checks were successful
DCO action / DCO (pull_request) Successful in 4m9s
Pre-commit hooks / Pre-commit (pull_request) Successful in 4m58s
Tests and linters / Run gofumpt (pull_request) Successful in 4m53s
Vulncheck / Vulncheck (pull_request) Successful in 4m58s
Build / Build Components (pull_request) Successful in 5m35s
Tests and linters / gopls check (pull_request) Successful in 5m54s
Tests and linters / Staticcheck (pull_request) Successful in 6m0s
Tests and linters / Tests (pull_request) Successful in 6m22s
Tests and linters / Lint (pull_request) Successful in 6m33s
Tests and linters / Tests with -race (pull_request) Successful in 6m30s

* When listing containers we used to iterate through the
  the whole list of containers twice: first when reading from
  a contract, then when sending them. Now we can send batches
  of containers when reading from the contract.

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
This commit is contained in:
Ekaterina Lebedeva 2024-12-25 14:27:47 +03:00
parent f57891dbda
commit ce7fc30350
3 changed files with 77 additions and 11 deletions

View file

@ -222,6 +222,7 @@ type morphContainerReader struct {
lister interface {
ContainersOf(*user.ID) ([]cid.ID, error)
ContainersOfWithProcessing(*user.ID, func([]cid.ID) error) error
}
}
@ -237,6 +238,10 @@ func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
return x.lister.ContainersOf(id)
}
func (x *morphContainerReader) ContainersOfWithProcessing(id *user.ID, processCIDList func([]cid.ID) error) error {
return x.lister.ContainersOfWithProcessing(id, processCIDList)
}
type morphContainerWriter struct {
neoClient *cntClient.Client
}

View file

@ -59,6 +59,59 @@ func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
return cidList, nil
}
// ContainersOfWithProcessing calls `processCIDList` on a list of container identifiers belonging
// to the specified user of FrostFS system. If idUser is nil, calls it on the list of all containers.
// `processCIDList` is called on a full list of container identifiers if list's length is less than batchSize.
// Otherwise it is called on batchSize'd batches of container identifiers.
func (c *Client) ContainersOfWithProcessing(idUser *user.ID, processCIDList func([]cid.ID) error) error {
var cidList []cid.ID
var err error
var batchProcessed bool
cb := func(item stackitem.Item) error {
rawID, err := client.BytesFromStackItem(item)
if err != nil {
return fmt.Errorf("get byte array from stack item (%s): %w", containersOfMethod, err)
}
var id cid.ID
err = id.Decode(rawID)
if err != nil {
return fmt.Errorf("decode container ID: %w", err)
}
cidList = append(cidList, id)
if len(cidList) == batchSize {
batchProcessed = true
cidListCopy := make([]cid.ID, batchSize)
copy(cidListCopy, cidList)
cidList = make([]cid.ID, 0, batchSize)
return processCIDList(cidListCopy)
}
batchProcessed = false
return nil
}
handleSessionErr := func() error {
cidList, err = c.list(idUser)
if err != nil {
return err
}
return processCIDList(cidList)
}
if err := c.IterateContainersOf(idUser, cb, handleSessionErr); err != nil {
return err
}
if !batchProcessed {
return processCIDList(cidList)
}
return nil
}
// IterateContainersOf iterates over a list of container identifiers
// belonging to the specified user of FrostFS system and executes
// `cb` on each element.

View file

@ -30,6 +30,13 @@ type Reader interface {
// to the specified user of FrostFS system. Returns the identifiers
// of all FrostFS containers if pointer to owner identifier is nil.
ContainersOf(*user.ID) ([]cid.ID, error)
ContainersOfWithProcessing(*user.ID, func([]cid.ID) error) error
}
type ContainersOfClientPrm struct {
OwnerID *user.ID
Stream containerSvc.ListStream
Resp *container.ListStreamResponse
}
// Writer is an interface of container storage updater.
@ -215,20 +222,21 @@ func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamR
return fmt.Errorf("invalid user ID: %w", err)
}
cnrs, err := s.rdr.ContainersOf(&id)
if err != nil {
return err
}
resBody := new(container.ListStreamResponseBody)
r := new(container.ListStreamResponse)
r.SetBody(resBody)
processCIDList := func(cidList []cid.ID) error {
r.GetBody().SetContainerIDs(getRefCIDList(cidList))
return stream.Send(r)
}
return s.rdr.ContainersOfWithProcessing(&id, processCIDList)
}
func getRefCIDList(cnrs []cid.ID) []refs.ContainerID {
cidList := make([]refs.ContainerID, len(cnrs))
for i := range cnrs {
cnrs[i].WriteToV2(&cidList[i])
}
resBody := new(container.ListStreamResponseBody)
resBody.SetContainerIDs(cidList)
r := new(container.ListStreamResponse)
r.SetBody(resBody)
return stream.Send(r)
return cidList
}