diff --git a/cmd/frostfs-node/config/container/container.go b/cmd/frostfs-node/config/container/container.go index b0b8043d6..1cd64a6f8 100644 --- a/cmd/frostfs-node/config/container/container.go +++ b/cmd/frostfs-node/config/container/container.go @@ -6,7 +6,7 @@ const ( subsection = "container" listStreamSubsection = "list_stream" - // ContainerBatchSizeDefault represents he maximum amount of containers to send via stream at once. + // ContainerBatchSizeDefault represents the maximum amount of containers to send via stream at once. ContainerBatchSizeDefault = 1000 ) diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index fb2550a03..be0acf738 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -222,6 +222,7 @@ type morphContainerReader struct { lister interface { ContainersOf(*user.ID) ([]cid.ID, error) + IterateContainersOf(*user.ID, func(cid.ID) error) error } } @@ -237,6 +238,10 @@ func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) { return x.lister.ContainersOf(id) } +func (x *morphContainerReader) IterateContainersOf(id *user.ID, processCID func(cid.ID) error) error { + return x.lister.IterateContainersOf(id, processCID) +} + type morphContainerWriter struct { neoClient *cntClient.Client } diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go index 37599e696..01fcc98e5 100644 --- a/pkg/morph/client/client.go +++ b/pkg/morph/client/client.go @@ -210,7 +210,7 @@ func (c *Client) Invoke(ctx context.Context, contract util.Uint160, fee fixedn.F // TestInvokeIterator invokes contract method returning an iterator and executes cb on each element. // If cb returns an error, the session is closed and this error is returned as-is. -// If the remove neo-go node does not support sessions, `unwrap.ErrNoSessionID` is returned. +// If the remote neo-go node does not support sessions, `unwrap.ErrNoSessionID` is returned. // batchSize is the number of items to prefetch: if the number of items in the iterator is less than batchSize, no session will be created. // The default batchSize is 100, the default limit from neo-go. func (c *Client) TestInvokeIterator(cb func(stackitem.Item) error, batchSize int, contract util.Uint160, method string, args ...interface{}) error { diff --git a/pkg/morph/client/container/containers_of.go b/pkg/morph/client/container/containers_of.go index 5fe15be0d..074a586be 100644 --- a/pkg/morph/client/container/containers_of.go +++ b/pkg/morph/client/container/containers_of.go @@ -2,9 +2,7 @@ package container import ( "errors" - "fmt" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap" @@ -16,27 +14,36 @@ import ( // // If remote RPC does not support neo-go session API, fallback to List() method. func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) { - var rawID []byte + var cidList []cid.ID + var err error + cb := func(id cid.ID) error { + cidList = append(cidList, id) + return nil + } + if err = c.IterateContainersOf(idUser, cb); err != nil { + return nil, err + } + return cidList, nil +} + +// iterateContainers iterates over a list of container identifiers +// belonging to the specified user of FrostFS system and executes +// `cb` on each element. If idUser is nil, calls it on the list of all containers. +func (c *Client) IterateContainersOf(idUser *user.ID, cb func(item cid.ID) error) error { + var rawID []byte if idUser != nil { rawID = idUser.WalletBytes() } - var cidList []cid.ID - cb := func(item stackitem.Item) error { - rawID, err := client.BytesFromStackItem(item) + itemCb := func(item stackitem.Item) error { + id, err := getCIDfromStackItem(item) if err != nil { - return fmt.Errorf("get byte array from stack item (%s): %w", containersOfMethod, err) + return err } - - var id cid.ID - - err = id.Decode(rawID) - if err != nil { - return fmt.Errorf("decode container ID: %w", err) + if err = cb(id); err != nil { + return err } - - cidList = append(cidList, id) return nil } @@ -50,13 +57,10 @@ func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) { const batchSize = 512 cnrHash := c.client.ContractAddress() - err := c.client.Morph().TestInvokeIterator(cb, batchSize, cnrHash, containersOfMethod, rawID) - if err != nil { - if errors.Is(err, unwrap.ErrNoSessionID) { - return c.list(idUser) - } - return nil, err + err := c.client.Morph().TestInvokeIterator(itemCb, batchSize, cnrHash, containersOfMethod, rawID) + if err != nil && errors.Is(err, unwrap.ErrNoSessionID) { + return c.iterate(idUser, cb) } - return cidList, nil + return err } diff --git a/pkg/morph/client/container/list.go b/pkg/morph/client/container/list.go index d9719aedd..78ea8278f 100644 --- a/pkg/morph/client/container/list.go +++ b/pkg/morph/client/container/list.go @@ -6,15 +6,16 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" + "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" ) -// list returns a list of container identifiers belonging +// iterate iterates through a list of container identifiers belonging // to the specified user of FrostFS system. The list is composed // through Container contract call. // -// Returns the identifiers of all FrostFS containers if pointer +// Iterates through the identifiers of all FrostFS containers if pointer // to user identifier is nil. -func (c *Client) list(idUser *user.ID) ([]cid.ID, error) { +func (c *Client) iterate(idUser *user.ID, cb func(cid.ID) error) error { var rawID []byte if idUser != nil { @@ -27,32 +28,41 @@ func (c *Client) list(idUser *user.ID) ([]cid.ID, error) { res, err := c.client.TestInvoke(prm) if err != nil { - return nil, fmt.Errorf("test invoke (%s): %w", listMethod, err) + return fmt.Errorf("test invoke (%s): %w", listMethod, err) } else if ln := len(res); ln != 1 { - return nil, fmt.Errorf("unexpected stack item count (%s): %d", listMethod, ln) + return fmt.Errorf("unexpected stack item count (%s): %d", listMethod, ln) } res, err = client.ArrayFromStackItem(res[0]) if err != nil { - return nil, fmt.Errorf("get stack item array from stack item (%s): %w", listMethod, err) + return fmt.Errorf("get stack item array from stack item (%s): %w", listMethod, err) } - cidList := make([]cid.ID, 0, len(res)) for i := range res { - rawID, err := client.BytesFromStackItem(res[i]) + id, err := getCIDfromStackItem(res[i]) if err != nil { - return nil, fmt.Errorf("get byte array from stack item (%s): %w", listMethod, err) + return err } - var id cid.ID - - err = id.Decode(rawID) - if err != nil { - return nil, fmt.Errorf("decode container ID: %w", err) + if err = cb(id); err != nil { + return err } - - cidList = append(cidList, id) } - return cidList, nil + return nil +} + +func getCIDfromStackItem(item stackitem.Item) (cid.ID, error) { + rawID, err := client.BytesFromStackItem(item) + if err != nil { + return cid.ID{}, fmt.Errorf("get byte array from stack item (%s): %w", listMethod, err) + } + + var id cid.ID + + err = id.Decode(rawID) + if err != nil { + return cid.ID{}, fmt.Errorf("decode container ID: %w", err) + } + return id, nil } diff --git a/pkg/services/container/morph/executor.go b/pkg/services/container/morph/executor.go index e9d1606f1..cadf92e19 100644 --- a/pkg/services/container/morph/executor.go +++ b/pkg/services/container/morph/executor.go @@ -30,6 +30,7 @@ type Reader interface { // to the specified user of FrostFS system. Returns the identifiers // of all FrostFS containers if pointer to owner identifier is nil. ContainersOf(*user.ID) ([]cid.ID, error) + IterateContainersOf(*user.ID, func(cid.ID) error) error } // Writer is an interface of container storage updater. @@ -201,7 +202,7 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody) return res, nil } -func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error { +func (s *morphExecutor) ListStream(ctx context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error { body := req.GetBody() idV2 := body.GetOwnerID() if idV2 == nil { @@ -215,20 +216,41 @@ func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamR return fmt.Errorf("invalid user ID: %w", err) } - cnrs, err := s.rdr.ContainersOf(&id) - if err != nil { - return err - } - - cidList := make([]refs.ContainerID, len(cnrs)) - for i := range cnrs { - cnrs[i].WriteToV2(&cidList[i]) - } - resBody := new(container.ListStreamResponseBody) - resBody.SetContainerIDs(cidList) r := new(container.ListStreamResponse) r.SetBody(resBody) - return stream.Send(r) + var cidList []refs.ContainerID + + // Amount of containers to send at once. + const batchSize = 1000 + + processCID := func(id cid.ID) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + var refID refs.ContainerID + id.WriteToV2(&refID) + cidList = append(cidList, refID) + if len(cidList) == batchSize { + r.GetBody().SetContainerIDs(cidList) + cidList = cidList[:0] + return stream.Send(r) + } + return nil + } + + if err = s.rdr.IterateContainersOf(&id, processCID); err != nil { + return err + } + + if len(cidList) > 0 { + r.GetBody().SetContainerIDs(cidList) + return stream.Send(r) + } + + return nil }