diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index fb2550a03..be0acf738 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -222,6 +222,7 @@ type morphContainerReader struct { lister interface { ContainersOf(*user.ID) ([]cid.ID, error) + IterateContainersOf(*user.ID, func(cid.ID) error) error } } @@ -237,6 +238,10 @@ func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) { return x.lister.ContainersOf(id) } +func (x *morphContainerReader) IterateContainersOf(id *user.ID, processCID func(cid.ID) error) error { + return x.lister.IterateContainersOf(id, processCID) +} + type morphContainerWriter struct { neoClient *cntClient.Client } diff --git a/pkg/morph/client/container/containers_of.go b/pkg/morph/client/container/containers_of.go index f1f073806..3daf3bbd3 100644 --- a/pkg/morph/client/container/containers_of.go +++ b/pkg/morph/client/container/containers_of.go @@ -53,20 +53,61 @@ func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) { return nil } - if err = c.IterateContainersOf(idUser, cb, handleSessionErr); err != nil { + if err = c.iterateContainers(idUser, cb, handleSessionErr); err != nil { return nil, err } return cidList, nil } -// IterateContainersOf iterates over a list of container identifiers +// IterateContainersOf calls `processCIDList` on a list of container identifiers belonging +// to the specified user of FrostFS system. If idUser is nil, calls it on the list of all containers. +// `processCIDList` is called on a full list of container identifiers if list's length is less than batchSize. +// Otherwise it is called on batchSize'd batches of container identifiers. +func (c *Client) IterateContainersOf(idUser *user.ID, processCID func(cid.ID) error) error { + cb := func(item stackitem.Item) error { + rawID, err := client.BytesFromStackItem(item) + if err != nil { + return fmt.Errorf("get byte array from stack item (%s): %w", containersOfMethod, err) + } + + var id cid.ID + + err = id.Decode(rawID) + if err != nil { + return fmt.Errorf("decode container ID: %w", err) + } + + return processCID(id) + } + + handleSessionErr := func() error { + cidList, err := c.list(idUser) + if err != nil { + return err + } + for _, id := range cidList { + if err = processCID(id); err != nil { + return err + } + } + return nil + } + + if err := c.iterateContainers(idUser, cb, handleSessionErr); err != nil { + return err + } + + return nil +} + +// iterateContainers iterates over a list of container identifiers // belonging to the specified user of FrostFS system and executes // `cb` on each element. // // This method uses `(*Client).TestInvokeIterator()` which can return // `unwrap.ErrNoSessionID` if the remote neo-go node does not support sessions. // So providing handler for this type of errors is required. -func (c *Client) IterateContainersOf(idUser *user.ID, cb func(item stackitem.Item) error, handleSessionError func() error) error { +func (c *Client) iterateContainers(idUser *user.ID, cb func(item stackitem.Item) error, handleSessionError func() error) error { var rawID []byte if idUser != nil { rawID = idUser.WalletBytes() diff --git a/pkg/services/container/morph/executor.go b/pkg/services/container/morph/executor.go index e9d1606f1..a3eca7254 100644 --- a/pkg/services/container/morph/executor.go +++ b/pkg/services/container/morph/executor.go @@ -30,6 +30,7 @@ type Reader interface { // to the specified user of FrostFS system. Returns the identifiers // of all FrostFS containers if pointer to owner identifier is nil. ContainersOf(*user.ID) ([]cid.ID, error) + IterateContainersOf(*user.ID, func(cid.ID) error) error } // Writer is an interface of container storage updater. @@ -215,20 +216,42 @@ func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamR return fmt.Errorf("invalid user ID: %w", err) } - cnrs, err := s.rdr.ContainersOf(&id) - if err != nil { + resBody := new(container.ListStreamResponseBody) + r := new(container.ListStreamResponse) + r.SetBody(resBody) + + var cidList []cid.ID + processCID := func(id cid.ID) error { + select { + case <-stream.Context().Done(): + return stream.Context().Err() + default: + } + cidList = append(cidList, id) + if len(cidList) == 512 { // 512 is batch size from pkg/morph/client/container/containers_of.go + r.GetBody().SetContainerIDs(getRefCIDList(cidList)) + cidList = make([]cid.ID, 0, 512) + return stream.Send(r) + } + return nil + } + + if err = s.rdr.IterateContainersOf(&id, processCID); err != nil { return err } + if len(cidList) > 0 { + r.GetBody().SetContainerIDs(getRefCIDList(cidList)) + return stream.Send(r) + } + + return nil +} + +func getRefCIDList(cnrs []cid.ID) []refs.ContainerID { cidList := make([]refs.ContainerID, len(cnrs)) for i := range cnrs { cnrs[i].WriteToV2(&cidList[i]) } - - resBody := new(container.ListStreamResponseBody) - resBody.SetContainerIDs(cidList) - r := new(container.ListStreamResponse) - r.SetBody(resBody) - - return stream.Send(r) + return cidList }