diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index fb2550a03..be0acf738 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -222,6 +222,7 @@ type morphContainerReader struct { lister interface { ContainersOf(*user.ID) ([]cid.ID, error) + IterateContainersOf(*user.ID, func(cid.ID) error) error } } @@ -237,6 +238,10 @@ func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) { return x.lister.ContainersOf(id) } +func (x *morphContainerReader) IterateContainersOf(id *user.ID, processCID func(cid.ID) error) error { + return x.lister.IterateContainersOf(id, processCID) +} + type morphContainerWriter struct { neoClient *cntClient.Client } diff --git a/pkg/morph/client/container/containers_of.go b/pkg/morph/client/container/containers_of.go index 5fe15be0d..5482a5111 100644 --- a/pkg/morph/client/container/containers_of.go +++ b/pkg/morph/client/container/containers_of.go @@ -2,61 +2,65 @@ package container import ( "errors" - "fmt" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" ) +// We would like to have batch size as big as possible, +// to reduce the number of round-trips and avoid creating sessions. +// The limit depends on 2 things: +// 1. VM limits: max 2048 items on stack. +// 2. JSON encoded size for the item with type = 128k. +// It turns out, that for container ID the second limit is hit first, +// 512 is big enough value and it is beautiful. +const batchSize = 512 + // ContainersOf returns a list of container identifiers belonging // to the specified user of FrostFS system. If idUser is nil, returns the list of all containers. // // If remote RPC does not support neo-go session API, fallback to List() method. func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) { - var rawID []byte + var cidList []cid.ID + var err error + cb := func(id cid.ID) error { + cidList = append(cidList, id) + return nil + } + if err = c.IterateContainersOf(idUser, cb); err != nil { + return nil, err + } + return cidList, nil +} + +// iterateContainers iterates over a list of container identifiers +// belonging to the specified user of FrostFS system and executes +// `cb` on each element. If idUser is nil, calls it on the list of all containers. +func (c *Client) IterateContainersOf(idUser *user.ID, cb func(item cid.ID) error) error { + var rawID []byte if idUser != nil { rawID = idUser.WalletBytes() } - var cidList []cid.ID - cb := func(item stackitem.Item) error { - rawID, err := client.BytesFromStackItem(item) + cnrHash := c.client.ContractAddress() + itemCb := func(item stackitem.Item) error { + id, err := getCIDfromStackItem(item) if err != nil { - return fmt.Errorf("get byte array from stack item (%s): %w", containersOfMethod, err) + return err } - - var id cid.ID - - err = id.Decode(rawID) - if err != nil { - return fmt.Errorf("decode container ID: %w", err) + if err = cb(id); err != nil { + return err } - - cidList = append(cidList, id) return nil } - // We would like to have batch size as big as possible, - // to reduce the number of round-trips and avoid creating sessions. - // The limit depends on 2 things: - // 1. VM limits: max 2048 items on stack. - // 2. JSON encoded size for the item with type = 128k. - // It turns out, that for container ID the second limit is hit first, - // 512 is big enough value and it is beautiful. - const batchSize = 512 - - cnrHash := c.client.ContractAddress() - err := c.client.Morph().TestInvokeIterator(cb, batchSize, cnrHash, containersOfMethod, rawID) - if err != nil { - if errors.Is(err, unwrap.ErrNoSessionID) { - return c.list(idUser) - } - return nil, err + err := c.client.Morph().TestInvokeIterator(itemCb, batchSize, cnrHash, containersOfMethod, rawID) + if err != nil && errors.Is(err, unwrap.ErrNoSessionID) { + return c.iterate(idUser, cb) } - return cidList, nil + return err } diff --git a/pkg/morph/client/container/list.go b/pkg/morph/client/container/list.go index d9719aedd..ca31742be 100644 --- a/pkg/morph/client/container/list.go +++ b/pkg/morph/client/container/list.go @@ -6,15 +6,16 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" + "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" ) -// list returns a list of container identifiers belonging -// to the specified user of FrostFS system. The list is composed +// iterate iterates through container identifiers belonging +// to the specified user of FrostFS system. The iterate is composed // through Container contract call. // // Returns the identifiers of all FrostFS containers if pointer // to user identifier is nil. -func (c *Client) list(idUser *user.ID) ([]cid.ID, error) { +func (c *Client) iterate(idUser *user.ID, cb func(cid.ID) error) error { var rawID []byte if idUser != nil { @@ -27,32 +28,41 @@ func (c *Client) list(idUser *user.ID) ([]cid.ID, error) { res, err := c.client.TestInvoke(prm) if err != nil { - return nil, fmt.Errorf("test invoke (%s): %w", listMethod, err) + return fmt.Errorf("test invoke (%s): %w", listMethod, err) } else if ln := len(res); ln != 1 { - return nil, fmt.Errorf("unexpected stack item count (%s): %d", listMethod, ln) + return fmt.Errorf("unexpected stack item count (%s): %d", listMethod, ln) } res, err = client.ArrayFromStackItem(res[0]) if err != nil { - return nil, fmt.Errorf("get stack item array from stack item (%s): %w", listMethod, err) + return fmt.Errorf("get stack item array from stack item (%s): %w", listMethod, err) } - cidList := make([]cid.ID, 0, len(res)) for i := range res { - rawID, err := client.BytesFromStackItem(res[i]) + id, err := getCIDfromStackItem(res[i]) if err != nil { - return nil, fmt.Errorf("get byte array from stack item (%s): %w", listMethod, err) + return err } - var id cid.ID - - err = id.Decode(rawID) - if err != nil { - return nil, fmt.Errorf("decode container ID: %w", err) + if err = cb(id); err != nil { + return err } - - cidList = append(cidList, id) } - return cidList, nil + return nil +} + +func getCIDfromStackItem(item stackitem.Item) (cid.ID, error) { + rawID, err := client.BytesFromStackItem(item) + if err != nil { + return [32]byte{}, fmt.Errorf("get byte array from stack item (%s): %w", listMethod, err) + } + + var id cid.ID + + err = id.Decode(rawID) + if err != nil { + return [32]byte{}, fmt.Errorf("decode container ID: %w", err) + } + return id, nil } diff --git a/pkg/services/container/morph/executor.go b/pkg/services/container/morph/executor.go index e9d1606f1..1f77357aa 100644 --- a/pkg/services/container/morph/executor.go +++ b/pkg/services/container/morph/executor.go @@ -30,6 +30,7 @@ type Reader interface { // to the specified user of FrostFS system. Returns the identifiers // of all FrostFS containers if pointer to owner identifier is nil. ContainersOf(*user.ID) ([]cid.ID, error) + IterateContainersOf(*user.ID, func(cid.ID) error) error } // Writer is an interface of container storage updater. @@ -201,7 +202,7 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody) return res, nil } -func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error { +func (s *morphExecutor) ListStream(ctx context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error { body := req.GetBody() idV2 := body.GetOwnerID() if idV2 == nil { @@ -215,20 +216,42 @@ func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamR return fmt.Errorf("invalid user ID: %w", err) } - cnrs, err := s.rdr.ContainersOf(&id) - if err != nil { + resBody := new(container.ListStreamResponseBody) + r := new(container.ListStreamResponse) + r.SetBody(resBody) + + var cidList []cid.ID + processCID := func(id cid.ID) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + cidList = append(cidList, id) + if len(cidList) == 512 { // 512 is batch size from pkg/morph/client/container/containers_of.go + r.GetBody().SetContainerIDs(getRefCIDList(cidList)) + cidList = make([]cid.ID, 0, 512) + return stream.Send(r) + } + return nil + } + + if err = s.rdr.IterateContainersOf(&id, processCID); err != nil { return err } + if len(cidList) > 0 { + r.GetBody().SetContainerIDs(getRefCIDList(cidList)) + return stream.Send(r) + } + + return nil +} + +func getRefCIDList(cnrs []cid.ID) []refs.ContainerID { cidList := make([]refs.ContainerID, len(cnrs)) for i := range cnrs { cnrs[i].WriteToV2(&cidList[i]) } - - resBody := new(container.ListStreamResponseBody) - resBody.SetContainerIDs(cidList) - r := new(container.ListStreamResponse) - r.SetBody(resBody) - - return stream.Send(r) + return cidList }