container: Reduce iterations through container list #1577
6 changed files with 95 additions and 54 deletions
|
@ -6,7 +6,7 @@ const (
|
||||||
subsection = "container"
|
subsection = "container"
|
||||||
listStreamSubsection = "list_stream"
|
listStreamSubsection = "list_stream"
|
||||||
|
|
||||||
// ContainerBatchSizeDefault represents he maximum amount of containers to send via stream at once.
|
// ContainerBatchSizeDefault represents the maximum amount of containers to send via stream at once.
|
||||||
ContainerBatchSizeDefault = 1000
|
ContainerBatchSizeDefault = 1000
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -222,6 +222,7 @@ type morphContainerReader struct {
|
||||||
|
|
||||||
lister interface {
|
lister interface {
|
||||||
ContainersOf(*user.ID) ([]cid.ID, error)
|
ContainersOf(*user.ID) ([]cid.ID, error)
|
||||||
|
IterateContainersOf(*user.ID, func(cid.ID) error) error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,6 +238,10 @@ func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
|
||||||
return x.lister.ContainersOf(id)
|
return x.lister.ContainersOf(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x *morphContainerReader) IterateContainersOf(id *user.ID, processCID func(cid.ID) error) error {
|
||||||
|
return x.lister.IterateContainersOf(id, processCID)
|
||||||
|
}
|
||||||
|
|
||||||
type morphContainerWriter struct {
|
type morphContainerWriter struct {
|
||||||
neoClient *cntClient.Client
|
neoClient *cntClient.Client
|
||||||
}
|
}
|
||||||
|
|
|
@ -210,7 +210,7 @@ func (c *Client) Invoke(ctx context.Context, contract util.Uint160, fee fixedn.F
|
||||||
|
|
||||||
// TestInvokeIterator invokes contract method returning an iterator and executes cb on each element.
|
// TestInvokeIterator invokes contract method returning an iterator and executes cb on each element.
|
||||||
// If cb returns an error, the session is closed and this error is returned as-is.
|
// If cb returns an error, the session is closed and this error is returned as-is.
|
||||||
// If the remove neo-go node does not support sessions, `unwrap.ErrNoSessionID` is returned.
|
// If the remote neo-go node does not support sessions, `unwrap.ErrNoSessionID` is returned.
|
||||||
// batchSize is the number of items to prefetch: if the number of items in the iterator is less than batchSize, no session will be created.
|
// batchSize is the number of items to prefetch: if the number of items in the iterator is less than batchSize, no session will be created.
|
||||||
// The default batchSize is 100, the default limit from neo-go.
|
// The default batchSize is 100, the default limit from neo-go.
|
||||||
func (c *Client) TestInvokeIterator(cb func(stackitem.Item) error, batchSize int, contract util.Uint160, method string, args ...interface{}) error {
|
func (c *Client) TestInvokeIterator(cb func(stackitem.Item) error, batchSize int, contract util.Uint160, method string, args ...interface{}) error {
|
||||||
|
|
|
@ -2,9 +2,7 @@ package container
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
||||||
|
@ -16,27 +14,36 @@ import (
|
||||||
//
|
//
|
||||||
// If remote RPC does not support neo-go session API, fallback to List() method.
|
// If remote RPC does not support neo-go session API, fallback to List() method.
|
||||||
func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
|
func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
|
||||||
var rawID []byte
|
var cidList []cid.ID
|
||||||
|
var err error
|
||||||
|
|
||||||
|
cb := func(id cid.ID) error {
|
||||||
|
cidList = append(cidList, id)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err = c.IterateContainersOf(idUser, cb); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return cidList, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// iterateContainers iterates over a list of container identifiers
|
||||||
|
// belonging to the specified user of FrostFS system and executes
|
||||||
|
// `cb` on each element. If idUser is nil, calls it on the list of all containers.
|
||||||
|
func (c *Client) IterateContainersOf(idUser *user.ID, cb func(item cid.ID) error) error {
|
||||||
|
var rawID []byte
|
||||||
if idUser != nil {
|
if idUser != nil {
|
||||||
rawID = idUser.WalletBytes()
|
rawID = idUser.WalletBytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
var cidList []cid.ID
|
itemCb := func(item stackitem.Item) error {
|
||||||
|
|||||||
cb := func(item stackitem.Item) error {
|
id, err := getCIDfromStackItem(item)
|
||||||
rawID, err := client.BytesFromStackItem(item)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get byte array from stack item (%s): %w", containersOfMethod, err)
|
return err
|
||||||
}
|
}
|
||||||
|
if err = cb(id); err != nil {
|
||||||
var id cid.ID
|
return err
|
||||||
|
|
||||||
err = id.Decode(rawID)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("decode container ID: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cidList = append(cidList, id)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,13 +57,10 @@ func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
|
||||||
const batchSize = 512
|
const batchSize = 512
|
||||||
|
|
||||||
cnrHash := c.client.ContractAddress()
|
cnrHash := c.client.ContractAddress()
|
||||||
err := c.client.Morph().TestInvokeIterator(cb, batchSize, cnrHash, containersOfMethod, rawID)
|
err := c.client.Morph().TestInvokeIterator(itemCb, batchSize, cnrHash, containersOfMethod, rawID)
|
||||||
if err != nil {
|
if err != nil && errors.Is(err, unwrap.ErrNoSessionID) {
|
||||||
if errors.Is(err, unwrap.ErrNoSessionID) {
|
return c.iterate(idUser, cb)
|
||||||
return c.list(idUser)
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return cidList, nil
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,15 +6,16 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
|
||||||
)
|
)
|
||||||
|
|
||||||
// list returns a list of container identifiers belonging
|
// iterate iterates through a list of container identifiers belonging
|
||||||
// to the specified user of FrostFS system. The list is composed
|
// to the specified user of FrostFS system. The list is composed
|
||||||
// through Container contract call.
|
// through Container contract call.
|
||||||
//
|
//
|
||||||
// Returns the identifiers of all FrostFS containers if pointer
|
// Iterates through the identifiers of all FrostFS containers if pointer
|
||||||
// to user identifier is nil.
|
// to user identifier is nil.
|
||||||
func (c *Client) list(idUser *user.ID) ([]cid.ID, error) {
|
func (c *Client) iterate(idUser *user.ID, cb func(cid.ID) error) error {
|
||||||
var rawID []byte
|
var rawID []byte
|
||||||
|
|
||||||
if idUser != nil {
|
if idUser != nil {
|
||||||
|
@ -27,32 +28,41 @@ func (c *Client) list(idUser *user.ID) ([]cid.ID, error) {
|
||||||
|
|
||||||
res, err := c.client.TestInvoke(prm)
|
res, err := c.client.TestInvoke(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("test invoke (%s): %w", listMethod, err)
|
return fmt.Errorf("test invoke (%s): %w", listMethod, err)
|
||||||
} else if ln := len(res); ln != 1 {
|
} else if ln := len(res); ln != 1 {
|
||||||
return nil, fmt.Errorf("unexpected stack item count (%s): %d", listMethod, ln)
|
return fmt.Errorf("unexpected stack item count (%s): %d", listMethod, ln)
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err = client.ArrayFromStackItem(res[0])
|
res, err = client.ArrayFromStackItem(res[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("get stack item array from stack item (%s): %w", listMethod, err)
|
return fmt.Errorf("get stack item array from stack item (%s): %w", listMethod, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cidList := make([]cid.ID, 0, len(res))
|
|
||||||
for i := range res {
|
for i := range res {
|
||||||
rawID, err := client.BytesFromStackItem(res[i])
|
id, err := getCIDfromStackItem(res[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("get byte array from stack item (%s): %w", listMethod, err)
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = cb(id); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getCIDfromStackItem(item stackitem.Item) (cid.ID, error) {
|
||||||
|
rawID, err := client.BytesFromStackItem(item)
|
||||||
|
if err != nil {
|
||||||
|
return cid.ID{}, fmt.Errorf("get byte array from stack item (%s): %w", listMethod, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var id cid.ID
|
var id cid.ID
|
||||||
|
|
||||||
err = id.Decode(rawID)
|
err = id.Decode(rawID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("decode container ID: %w", err)
|
return cid.ID{}, fmt.Errorf("decode container ID: %w", err)
|
||||||
}
|
}
|
||||||
|
return id, nil
|
||||||
cidList = append(cidList, id)
|
|
||||||
}
|
|
||||||
|
|
||||||
return cidList, nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ type Reader interface {
|
||||||
// to the specified user of FrostFS system. Returns the identifiers
|
// to the specified user of FrostFS system. Returns the identifiers
|
||||||
// of all FrostFS containers if pointer to owner identifier is nil.
|
// of all FrostFS containers if pointer to owner identifier is nil.
|
||||||
ContainersOf(*user.ID) ([]cid.ID, error)
|
ContainersOf(*user.ID) ([]cid.ID, error)
|
||||||
|
IterateContainersOf(*user.ID, func(cid.ID) error) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Writer is an interface of container storage updater.
|
// Writer is an interface of container storage updater.
|
||||||
|
@ -201,7 +202,7 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
|
func (s *morphExecutor) ListStream(ctx context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
|
||||||
body := req.GetBody()
|
body := req.GetBody()
|
||||||
idV2 := body.GetOwnerID()
|
idV2 := body.GetOwnerID()
|
||||||
if idV2 == nil {
|
if idV2 == nil {
|
||||||
|
@ -215,20 +216,41 @@ func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamR
|
||||||
return fmt.Errorf("invalid user ID: %w", err)
|
return fmt.Errorf("invalid user ID: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cnrs, err := s.rdr.ContainersOf(&id)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
cidList := make([]refs.ContainerID, len(cnrs))
|
|
||||||
for i := range cnrs {
|
|
||||||
cnrs[i].WriteToV2(&cidList[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
resBody := new(container.ListStreamResponseBody)
|
resBody := new(container.ListStreamResponseBody)
|
||||||
resBody.SetContainerIDs(cidList)
|
|
||||||
r := new(container.ListStreamResponse)
|
r := new(container.ListStreamResponse)
|
||||||
r.SetBody(resBody)
|
r.SetBody(resBody)
|
||||||
|
|
||||||
|
var cidList []refs.ContainerID
|
||||||
|
|
||||||
|
// Amount of containers to send at once.
|
||||||
|
const batchSize = 1000
|
||||||
|
|
||||||
|
processCID := func(id cid.ID) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
var refID refs.ContainerID
|
||||||
|
id.WriteToV2(&refID)
|
||||||
|
cidList = append(cidList, refID)
|
||||||
|
if len(cidList) == batchSize {
|
||||||
|
r.GetBody().SetContainerIDs(cidList)
|
||||||
|
cidList = cidList[:0]
|
||||||
return stream.Send(r)
|
return stream.Send(r)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = s.rdr.IterateContainersOf(&id, processCID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(cidList) > 0 {
|
||||||
|
r.GetBody().SetContainerIDs(cidList)
|
||||||
|
return stream.Send(r)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue
This line was right before the
TestInvokeIterator
previously, why has it moved?Accidentally. Moved it back.