[#1577] container: Reduce iterations through container list
All checks were successful
DCO action / DCO (pull_request) Successful in 5m21s
Vulncheck / Vulncheck (pull_request) Successful in 5m29s
Tests and linters / Run gofumpt (pull_request) Successful in 5m39s
Pre-commit hooks / Pre-commit (pull_request) Successful in 5m52s
Build / Build Components (pull_request) Successful in 6m23s
Tests and linters / Staticcheck (pull_request) Successful in 6m46s
Tests and linters / gopls check (pull_request) Successful in 7m4s
Tests and linters / Tests (pull_request) Successful in 7m29s
Tests and linters / Tests with -race (pull_request) Successful in 7m33s
Tests and linters / Lint (pull_request) Successful in 7m37s
All checks were successful
DCO action / DCO (pull_request) Successful in 5m21s
Vulncheck / Vulncheck (pull_request) Successful in 5m29s
Tests and linters / Run gofumpt (pull_request) Successful in 5m39s
Pre-commit hooks / Pre-commit (pull_request) Successful in 5m52s
Build / Build Components (pull_request) Successful in 6m23s
Tests and linters / Staticcheck (pull_request) Successful in 6m46s
Tests and linters / gopls check (pull_request) Successful in 7m4s
Tests and linters / Tests (pull_request) Successful in 7m29s
Tests and linters / Tests with -race (pull_request) Successful in 7m33s
Tests and linters / Lint (pull_request) Successful in 7m37s
* When listing containers we used to iterate through the the whole list of containers twice: first when reading from a contract, then when sending them. Now we can send batches of containers when reading from the contract. Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
This commit is contained in:
parent
2f54f7970e
commit
f1a1f3c568
3 changed files with 81 additions and 12 deletions
|
@ -222,6 +222,7 @@ type morphContainerReader struct {
|
||||||
|
|
||||||
lister interface {
|
lister interface {
|
||||||
ContainersOf(*user.ID) ([]cid.ID, error)
|
ContainersOf(*user.ID) ([]cid.ID, error)
|
||||||
|
IterateContainersOf(*user.ID, func(cid.ID) error) error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,6 +238,10 @@ func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
|
||||||
return x.lister.ContainersOf(id)
|
return x.lister.ContainersOf(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x *morphContainerReader) IterateContainersOf(id *user.ID, processCID func(cid.ID) error) error {
|
||||||
|
return x.lister.IterateContainersOf(id, processCID)
|
||||||
|
}
|
||||||
|
|
||||||
type morphContainerWriter struct {
|
type morphContainerWriter struct {
|
||||||
neoClient *cntClient.Client
|
neoClient *cntClient.Client
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,20 +53,61 @@ func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = c.IterateContainersOf(idUser, cb, handleSessionErr); err != nil {
|
if err = c.iterateContainers(idUser, cb, handleSessionErr); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return cidList, nil
|
return cidList, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateContainersOf iterates over a list of container identifiers
|
// IterateContainersOf calls `processCIDList` on a list of container identifiers belonging
|
||||||
|
// to the specified user of FrostFS system. If idUser is nil, calls it on the list of all containers.
|
||||||
|
// `processCIDList` is called on a full list of container identifiers if list's length is less than batchSize.
|
||||||
|
// Otherwise it is called on batchSize'd batches of container identifiers.
|
||||||
|
func (c *Client) IterateContainersOf(idUser *user.ID, processCID func(cid.ID) error) error {
|
||||||
|
cb := func(item stackitem.Item) error {
|
||||||
|
rawID, err := client.BytesFromStackItem(item)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get byte array from stack item (%s): %w", containersOfMethod, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var id cid.ID
|
||||||
|
|
||||||
|
err = id.Decode(rawID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("decode container ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return processCID(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
handleSessionErr := func() error {
|
||||||
|
cidList, err := c.list(idUser)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, id := range cidList {
|
||||||
|
if err = processCID(id); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.iterateContainers(idUser, cb, handleSessionErr); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// iterateContainers iterates over a list of container identifiers
|
||||||
// belonging to the specified user of FrostFS system and executes
|
// belonging to the specified user of FrostFS system and executes
|
||||||
// `cb` on each element.
|
// `cb` on each element.
|
||||||
//
|
//
|
||||||
// This method uses `(*Client).TestInvokeIterator()` which can return
|
// This method uses `(*Client).TestInvokeIterator()` which can return
|
||||||
// `unwrap.ErrNoSessionID` if the remote neo-go node does not support sessions.
|
// `unwrap.ErrNoSessionID` if the remote neo-go node does not support sessions.
|
||||||
// So providing handler for this type of errors is required.
|
// So providing handler for this type of errors is required.
|
||||||
func (c *Client) IterateContainersOf(idUser *user.ID, cb func(item stackitem.Item) error, handleSessionError func() error) error {
|
func (c *Client) iterateContainers(idUser *user.ID, cb func(item stackitem.Item) error, handleSessionError func() error) error {
|
||||||
var rawID []byte
|
var rawID []byte
|
||||||
if idUser != nil {
|
if idUser != nil {
|
||||||
rawID = idUser.WalletBytes()
|
rawID = idUser.WalletBytes()
|
||||||
|
|
|
@ -30,6 +30,7 @@ type Reader interface {
|
||||||
// to the specified user of FrostFS system. Returns the identifiers
|
// to the specified user of FrostFS system. Returns the identifiers
|
||||||
// of all FrostFS containers if pointer to owner identifier is nil.
|
// of all FrostFS containers if pointer to owner identifier is nil.
|
||||||
ContainersOf(*user.ID) ([]cid.ID, error)
|
ContainersOf(*user.ID) ([]cid.ID, error)
|
||||||
|
IterateContainersOf(*user.ID, func(cid.ID) error) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Writer is an interface of container storage updater.
|
// Writer is an interface of container storage updater.
|
||||||
|
@ -215,20 +216,42 @@ func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamR
|
||||||
return fmt.Errorf("invalid user ID: %w", err)
|
return fmt.Errorf("invalid user ID: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cnrs, err := s.rdr.ContainersOf(&id)
|
resBody := new(container.ListStreamResponseBody)
|
||||||
if err != nil {
|
r := new(container.ListStreamResponse)
|
||||||
|
r.SetBody(resBody)
|
||||||
|
|
||||||
|
var cidList []cid.ID
|
||||||
|
processCID := func(id cid.ID) error {
|
||||||
|
select {
|
||||||
|
case <-stream.Context().Done():
|
||||||
|
return stream.Context().Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
cidList = append(cidList, id)
|
||||||
|
if len(cidList) == 512 { // 512 is batch size from pkg/morph/client/container/containers_of.go
|
||||||
|
r.GetBody().SetContainerIDs(getRefCIDList(cidList))
|
||||||
|
cidList = make([]cid.ID, 0, 512)
|
||||||
|
return stream.Send(r)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = s.rdr.IterateContainersOf(&id, processCID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(cidList) > 0 {
|
||||||
|
r.GetBody().SetContainerIDs(getRefCIDList(cidList))
|
||||||
|
return stream.Send(r)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getRefCIDList(cnrs []cid.ID) []refs.ContainerID {
|
||||||
cidList := make([]refs.ContainerID, len(cnrs))
|
cidList := make([]refs.ContainerID, len(cnrs))
|
||||||
for i := range cnrs {
|
for i := range cnrs {
|
||||||
cnrs[i].WriteToV2(&cidList[i])
|
cnrs[i].WriteToV2(&cidList[i])
|
||||||
}
|
}
|
||||||
|
return cidList
|
||||||
resBody := new(container.ListStreamResponseBody)
|
|
||||||
resBody.SetContainerIDs(cidList)
|
|
||||||
r := new(container.ListStreamResponse)
|
|
||||||
r.SetBody(resBody)
|
|
||||||
|
|
||||||
return stream.Send(r)
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue