[#1577] container: Reduce iterations through container list
Some checks failed
Vulncheck / Vulncheck (pull_request) Has been cancelled
Tests and linters / Tests (pull_request) Failing after 14m5s
Tests and linters / Staticcheck (pull_request) Has been cancelled
Tests and linters / Lint (pull_request) Has been cancelled
Tests and linters / Run gofumpt (pull_request) Has been cancelled
Tests and linters / gopls check (pull_request) Has been cancelled
Tests and linters / Tests with -race (pull_request) Has been cancelled
Pre-commit hooks / Pre-commit (pull_request) Has been cancelled
DCO action / DCO (pull_request) Has been cancelled
Build / Build Components (pull_request) Has been cancelled

* Separated iteration through container ids from `ContainersOf()`
  so that it could be reused.
* When listing containers we used to iterate through the
  the whole list of containers twice: first when reading from
  a contract, then when sending them. Now we can send batches
  of containers when reading from the contract.

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
This commit is contained in:
Ekaterina Lebedeva 2024-12-25 14:25:29 +03:00
parent 6fe34d266a
commit 76692f019f
4 changed files with 101 additions and 59 deletions

View file

@ -222,6 +222,7 @@ type morphContainerReader struct {
lister interface { lister interface {
ContainersOf(*user.ID) ([]cid.ID, error) ContainersOf(*user.ID) ([]cid.ID, error)
IterateContainersOf(*user.ID, func(cid.ID) error) error
} }
} }
@ -237,6 +238,10 @@ func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
return x.lister.ContainersOf(id) return x.lister.ContainersOf(id)
} }
func (x *morphContainerReader) IterateContainersOf(id *user.ID, processCID func(cid.ID) error) error {
return x.lister.IterateContainersOf(id, processCID)
}
type morphContainerWriter struct { type morphContainerWriter struct {
neoClient *cntClient.Client neoClient *cntClient.Client
} }

View file

@ -2,61 +2,65 @@ package container
import ( import (
"errors" "errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap" "github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
) )
// We would like to have batch size as big as possible,
// to reduce the number of round-trips and avoid creating sessions.
// The limit depends on 2 things:
// 1. VM limits: max 2048 items on stack.
// 2. JSON encoded size for the item with type = 128k.
// It turns out, that for container ID the second limit is hit first,
// 512 is big enough value and it is beautiful.
const batchSize = 512
// ContainersOf returns a list of container identifiers belonging // ContainersOf returns a list of container identifiers belonging
// to the specified user of FrostFS system. If idUser is nil, returns the list of all containers. // to the specified user of FrostFS system. If idUser is nil, returns the list of all containers.
// //
// If remote RPC does not support neo-go session API, fallback to List() method. // If remote RPC does not support neo-go session API, fallback to List() method.
func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) { func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
var rawID []byte var cidList []cid.ID
var err error
cb := func(id cid.ID) error {
cidList = append(cidList, id)
return nil
}
if err = c.IterateContainersOf(idUser, cb); err != nil {
return nil, err
}
return cidList, nil
}
// iterateContainers iterates over a list of container identifiers
// belonging to the specified user of FrostFS system and executes
// `cb` on each element. If idUser is nil, calls it on the list of all containers.
func (c *Client) IterateContainersOf(idUser *user.ID, cb func(item cid.ID) error) error {
var rawID []byte
if idUser != nil { if idUser != nil {
rawID = idUser.WalletBytes() rawID = idUser.WalletBytes()
} }
var cidList []cid.ID cnrHash := c.client.ContractAddress()
cb := func(item stackitem.Item) error { itemCb := func(item stackitem.Item) error {
rawID, err := client.BytesFromStackItem(item) id, err := getCIDfromStackItem(item)
if err != nil { if err != nil {
return fmt.Errorf("get byte array from stack item (%s): %w", containersOfMethod, err) return err
} }
if err = cb(id); err != nil {
var id cid.ID return err
err = id.Decode(rawID)
if err != nil {
return fmt.Errorf("decode container ID: %w", err)
} }
cidList = append(cidList, id)
return nil return nil
} }
// We would like to have batch size as big as possible, err := c.client.Morph().TestInvokeIterator(itemCb, batchSize, cnrHash, containersOfMethod, rawID)
// to reduce the number of round-trips and avoid creating sessions. if err != nil && errors.Is(err, unwrap.ErrNoSessionID) {
// The limit depends on 2 things: return c.iterate(idUser, cb)
// 1. VM limits: max 2048 items on stack.
// 2. JSON encoded size for the item with type = 128k.
// It turns out, that for container ID the second limit is hit first,
// 512 is big enough value and it is beautiful.
const batchSize = 512
cnrHash := c.client.ContractAddress()
err := c.client.Morph().TestInvokeIterator(cb, batchSize, cnrHash, containersOfMethod, rawID)
if err != nil {
if errors.Is(err, unwrap.ErrNoSessionID) {
return c.list(idUser)
}
return nil, err
} }
return cidList, nil return err
} }

View file

@ -6,15 +6,16 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
) )
// list returns a list of container identifiers belonging // iterate iterates through container identifiers belonging
// to the specified user of FrostFS system. The list is composed // to the specified user of FrostFS system. The iterate is composed
// through Container contract call. // through Container contract call.
// //
// Returns the identifiers of all FrostFS containers if pointer // Returns the identifiers of all FrostFS containers if pointer
// to user identifier is nil. // to user identifier is nil.
func (c *Client) list(idUser *user.ID) ([]cid.ID, error) { func (c *Client) iterate(idUser *user.ID, cb func(cid.ID) error) error {
var rawID []byte var rawID []byte
if idUser != nil { if idUser != nil {
@ -27,32 +28,41 @@ func (c *Client) list(idUser *user.ID) ([]cid.ID, error) {
res, err := c.client.TestInvoke(prm) res, err := c.client.TestInvoke(prm)
if err != nil { if err != nil {
return nil, fmt.Errorf("test invoke (%s): %w", listMethod, err) return fmt.Errorf("test invoke (%s): %w", listMethod, err)
} else if ln := len(res); ln != 1 { } else if ln := len(res); ln != 1 {
return nil, fmt.Errorf("unexpected stack item count (%s): %d", listMethod, ln) return fmt.Errorf("unexpected stack item count (%s): %d", listMethod, ln)
} }
res, err = client.ArrayFromStackItem(res[0]) res, err = client.ArrayFromStackItem(res[0])
if err != nil { if err != nil {
return nil, fmt.Errorf("get stack item array from stack item (%s): %w", listMethod, err) return fmt.Errorf("get stack item array from stack item (%s): %w", listMethod, err)
} }
cidList := make([]cid.ID, 0, len(res))
for i := range res { for i := range res {
rawID, err := client.BytesFromStackItem(res[i]) id, err := getCIDfromStackItem(res[i])
if err != nil { if err != nil {
return nil, fmt.Errorf("get byte array from stack item (%s): %w", listMethod, err) return err
}
if err = cb(id); err != nil {
return err
}
}
return nil
}
func getCIDfromStackItem(item stackitem.Item) (cid.ID, error) {
rawID, err := client.BytesFromStackItem(item)
if err != nil {
return [32]byte{}, fmt.Errorf("get byte array from stack item (%s): %w", listMethod, err)
} }
var id cid.ID var id cid.ID
err = id.Decode(rawID) err = id.Decode(rawID)
if err != nil { if err != nil {
return nil, fmt.Errorf("decode container ID: %w", err) return [32]byte{}, fmt.Errorf("decode container ID: %w", err)
} }
return id, nil
cidList = append(cidList, id)
}
return cidList, nil
} }

View file

@ -30,6 +30,7 @@ type Reader interface {
// to the specified user of FrostFS system. Returns the identifiers // to the specified user of FrostFS system. Returns the identifiers
// of all FrostFS containers if pointer to owner identifier is nil. // of all FrostFS containers if pointer to owner identifier is nil.
ContainersOf(*user.ID) ([]cid.ID, error) ContainersOf(*user.ID) ([]cid.ID, error)
IterateContainersOf(*user.ID, func(cid.ID) error) error
} }
// Writer is an interface of container storage updater. // Writer is an interface of container storage updater.
@ -201,7 +202,7 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
return res, nil return res, nil
} }
func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error { func (s *morphExecutor) ListStream(ctx context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
body := req.GetBody() body := req.GetBody()
idV2 := body.GetOwnerID() idV2 := body.GetOwnerID()
if idV2 == nil { if idV2 == nil {
@ -215,20 +216,42 @@ func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamR
return fmt.Errorf("invalid user ID: %w", err) return fmt.Errorf("invalid user ID: %w", err)
} }
cnrs, err := s.rdr.ContainersOf(&id) resBody := new(container.ListStreamResponseBody)
if err != nil { r := new(container.ListStreamResponse)
r.SetBody(resBody)
var cidList []cid.ID
processCID := func(id cid.ID) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
cidList = append(cidList, id)
if len(cidList) == 512 { // 512 is batch size from pkg/morph/client/container/containers_of.go
r.GetBody().SetContainerIDs(getRefCIDList(cidList))
cidList = make([]cid.ID, 0, 512)
return stream.Send(r)
}
return nil
}
if err = s.rdr.IterateContainersOf(&id, processCID); err != nil {
return err return err
} }
if len(cidList) > 0 {
r.GetBody().SetContainerIDs(getRefCIDList(cidList))
return stream.Send(r)
}
return nil
}
func getRefCIDList(cnrs []cid.ID) []refs.ContainerID {
cidList := make([]refs.ContainerID, len(cnrs)) cidList := make([]refs.ContainerID, len(cnrs))
for i := range cnrs { for i := range cnrs {
cnrs[i].WriteToV2(&cidList[i]) cnrs[i].WriteToV2(&cidList[i])
} }
return cidList
resBody := new(container.ListStreamResponseBody)
resBody.SetContainerIDs(cidList)
r := new(container.ListStreamResponse)
r.SetBody(resBody)
return stream.Send(r)
} }