container: Reduce iterations through container list #1577
|
@ -6,7 +6,7 @@ const (
|
||||||
subsection = "container"
|
subsection = "container"
|
||||||
listStreamSubsection = "list_stream"
|
listStreamSubsection = "list_stream"
|
||||||
|
|
||||||
// ContainerBatchSizeDefault represents he maximum amount of containers to send via stream at once.
|
// ContainerBatchSizeDefault represents the maximum amount of containers to send via stream at once.
|
||||||
ContainerBatchSizeDefault = 1000
|
ContainerBatchSizeDefault = 1000
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -222,6 +222,7 @@ type morphContainerReader struct {
|
||||||
|
|
||||||
lister interface {
|
lister interface {
|
||||||
ContainersOf(*user.ID) ([]cid.ID, error)
|
ContainersOf(*user.ID) ([]cid.ID, error)
|
||||||
|
IterateContainersOf(*user.ID, func(cid.ID) error) error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,6 +238,10 @@ func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
|
||||||
return x.lister.ContainersOf(id)
|
return x.lister.ContainersOf(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x *morphContainerReader) IterateContainersOf(id *user.ID, processCID func(cid.ID) error) error {
|
||||||
|
return x.lister.IterateContainersOf(id, processCID)
|
||||||
|
}
|
||||||
|
|
||||||
type morphContainerWriter struct {
|
type morphContainerWriter struct {
|
||||||
neoClient *cntClient.Client
|
neoClient *cntClient.Client
|
||||||
}
|
}
|
||||||
|
|
|
@ -210,7 +210,7 @@ func (c *Client) Invoke(ctx context.Context, contract util.Uint160, fee fixedn.F
|
||||||
|
|
||||||
// TestInvokeIterator invokes contract method returning an iterator and executes cb on each element.
|
// TestInvokeIterator invokes contract method returning an iterator and executes cb on each element.
|
||||||
// If cb returns an error, the session is closed and this error is returned as-is.
|
// If cb returns an error, the session is closed and this error is returned as-is.
|
||||||
// If the remove neo-go node does not support sessions, `unwrap.ErrNoSessionID` is returned.
|
// If the remote neo-go node does not support sessions, `unwrap.ErrNoSessionID` is returned.
|
||||||
// batchSize is the number of items to prefetch: if the number of items in the iterator is less than batchSize, no session will be created.
|
// batchSize is the number of items to prefetch: if the number of items in the iterator is less than batchSize, no session will be created.
|
||||||
// The default batchSize is 100, the default limit from neo-go.
|
// The default batchSize is 100, the default limit from neo-go.
|
||||||
func (c *Client) TestInvokeIterator(cb func(stackitem.Item) error, batchSize int, contract util.Uint160, method string, args ...interface{}) error {
|
func (c *Client) TestInvokeIterator(cb func(stackitem.Item) error, batchSize int, contract util.Uint160, method string, args ...interface{}) error {
|
||||||
|
|
|
@ -2,9 +2,7 @@ package container
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
|
@ -16,27 +14,36 @@ import (
|
||||||
//
|
//
|
||||||
// If remote RPC does not support neo-go session API, fallback to List() method.
|
// If remote RPC does not support neo-go session API, fallback to List() method.
|
||||||
func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
|
func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
|
||||||
var rawID []byte
|
var cidList []cid.ID
|
||||||
|
var err error
|
||||||
|
|
||||||
|
cb := func(id cid.ID) error {
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
If If `prm.Stream != nil` what are the return values for this function?
elebedeva
commented
`nil, error` or `nil, nil`
fyrchik
commented
That by itself is a good reason to have a separate function. That by itself is a good reason to have a separate function.
The code could be reused (`ContainersOf` can use `IterateContainersOf`)
elebedeva
commented
Moved iteration over containers from Moved iteration over containers from `ContainersOf` to separate function and added new function which can perform container batch processing.
|
|||||||
|
cidList = append(cidList, id)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err = c.IterateContainersOf(idUser, cb); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return cidList, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// iterateContainers iterates over a list of container identifiers
|
||||||
|
// belonging to the specified user of FrostFS system and executes
|
||||||
|
// `cb` on each element. If idUser is nil, calls it on the list of all containers.
|
||||||
|
func (c *Client) IterateContainersOf(idUser *user.ID, cb func(item cid.ID) error) error {
|
||||||
|
var rawID []byte
|
||||||
if idUser != nil {
|
if idUser != nil {
|
||||||
rawID = idUser.WalletBytes()
|
rawID = idUser.WalletBytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
var cidList []cid.ID
|
itemCb := func(item stackitem.Item) error {
|
||||||
fyrchik
commented
This line was right before the This line was right before the `TestInvokeIterator` previously, why has it moved?
elebedeva
commented
Accidentally. Moved it back. Accidentally. Moved it back.
|
|||||||
cb := func(item stackitem.Item) error {
|
id, err := getCIDfromStackItem(item)
|
||||||
rawID, err := client.BytesFromStackItem(item)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get byte array from stack item (%s): %w", containersOfMethod, err)
|
return err
|
||||||
}
|
}
|
||||||
|
if err = cb(id); err != nil {
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Unrelated to the commit. Unrelated to the commit.
|
|||||||
var id cid.ID
|
return err
|
||||||
|
|
||||||
err = id.Decode(rawID)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("decode container ID: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cidList = append(cidList, id)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,13 +57,10 @@ func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
|
||||||
const batchSize = 512
|
const batchSize = 512
|
||||||
|
|
||||||
cnrHash := c.client.ContractAddress()
|
cnrHash := c.client.ContractAddress()
|
||||||
err := c.client.Morph().TestInvokeIterator(cb, batchSize, cnrHash, containersOfMethod, rawID)
|
err := c.client.Morph().TestInvokeIterator(itemCb, batchSize, cnrHash, containersOfMethod, rawID)
|
||||||
if err != nil {
|
if err != nil && errors.Is(err, unwrap.ErrNoSessionID) {
|
||||||
if errors.Is(err, unwrap.ErrNoSessionID) {
|
return c.iterate(idUser, cb)
|
||||||
return c.list(idUser)
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return cidList, nil
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,15 +6,16 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
|
||||||
)
|
)
|
||||||
|
|
||||||
// list returns a list of container identifiers belonging
|
// iterate iterates through a list of container identifiers belonging
|
||||||
// to the specified user of FrostFS system. The list is composed
|
// to the specified user of FrostFS system. The list is composed
|
||||||
// through Container contract call.
|
// through Container contract call.
|
||||||
//
|
//
|
||||||
// Returns the identifiers of all FrostFS containers if pointer
|
// Iterates through the identifiers of all FrostFS containers if pointer
|
||||||
// to user identifier is nil.
|
// to user identifier is nil.
|
||||||
func (c *Client) list(idUser *user.ID) ([]cid.ID, error) {
|
func (c *Client) iterate(idUser *user.ID, cb func(cid.ID) error) error {
|
||||||
var rawID []byte
|
var rawID []byte
|
||||||
|
|
||||||
if idUser != nil {
|
if idUser != nil {
|
||||||
|
@ -27,32 +28,41 @@ func (c *Client) list(idUser *user.ID) ([]cid.ID, error) {
|
||||||
|
|
||||||
res, err := c.client.TestInvoke(prm)
|
res, err := c.client.TestInvoke(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("test invoke (%s): %w", listMethod, err)
|
return fmt.Errorf("test invoke (%s): %w", listMethod, err)
|
||||||
} else if ln := len(res); ln != 1 {
|
} else if ln := len(res); ln != 1 {
|
||||||
return nil, fmt.Errorf("unexpected stack item count (%s): %d", listMethod, ln)
|
return fmt.Errorf("unexpected stack item count (%s): %d", listMethod, ln)
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err = client.ArrayFromStackItem(res[0])
|
res, err = client.ArrayFromStackItem(res[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("get stack item array from stack item (%s): %w", listMethod, err)
|
return fmt.Errorf("get stack item array from stack item (%s): %w", listMethod, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cidList := make([]cid.ID, 0, len(res))
|
|
||||||
for i := range res {
|
for i := range res {
|
||||||
rawID, err := client.BytesFromStackItem(res[i])
|
id, err := getCIDfromStackItem(res[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("get byte array from stack item (%s): %w", listMethod, err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var id cid.ID
|
if err = cb(id); err != nil {
|
||||||
|
return err
|
||||||
err = id.Decode(rawID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("decode container ID: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cidList = append(cidList, id)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return cidList, nil
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getCIDfromStackItem(item stackitem.Item) (cid.ID, error) {
|
||||||
|
rawID, err := client.BytesFromStackItem(item)
|
||||||
|
if err != nil {
|
||||||
|
return cid.ID{}, fmt.Errorf("get byte array from stack item (%s): %w", listMethod, err)
|
||||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
`[32]byte{}` -> `cid.ID{}`? :)
elebedeva
commented
Fixed! Fixed!
elebedeva
commented
Fixed! Fixed!
|
|||||||
|
}
|
||||||
|
|
||||||
|
var id cid.ID
|
||||||
|
|
||||||
|
err = id.Decode(rawID)
|
||||||
|
if err != nil {
|
||||||
|
return cid.ID{}, fmt.Errorf("decode container ID: %w", err)
|
||||||
|
}
|
||||||
|
return id, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ type Reader interface {
|
||||||
// to the specified user of FrostFS system. Returns the identifiers
|
// to the specified user of FrostFS system. Returns the identifiers
|
||||||
// of all FrostFS containers if pointer to owner identifier is nil.
|
// of all FrostFS containers if pointer to owner identifier is nil.
|
||||||
ContainersOf(*user.ID) ([]cid.ID, error)
|
ContainersOf(*user.ID) ([]cid.ID, error)
|
||||||
|
IterateContainersOf(*user.ID, func(cid.ID) error) error
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
1. `WithProcessing` is something unfamiliar, `Iterate` is more common.
2. `func([]cid.ID) error` looks like unnecessary complexity, why not `func(cid.ID) error`?
elebedeva
commented
The idea was to make it obvious from the method signature that just like The idea was to make it obvious from the method signature that just like `ContainersOf` returns `cid list`, new method allows to perform some action on this `cid list`. E.g. if in `func([]cid.ID) error` I would append passed `cid list` to my list, I would get the same result as if I used `ContainersOf` - `[]cid.ID` and `error`.
elebedeva
commented
I did as you suggested, code looks more clean now. I did as you suggested, code looks more clean now.
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Writer is an interface of container storage updater.
|
// Writer is an interface of container storage updater.
|
||||||
|
@ -201,7 +202,7 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
|
func (s *morphExecutor) ListStream(ctx context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
How had the context appeared here? `ListStream` signature doesn't have any context.
```
func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error {
```
How had the context appeared here?
elebedeva
commented
That's because
That's because `ContainerServiceClient` requires context and `ContainerServiceServer` does not.
Please see https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/src/branch/master/api/container/grpc/service_grpc.pb.go
```go
type ContainerServiceClient interface {
// ...
ListStream(ctx context.Context, in *ListStreamRequest, opts ...grpc.CallOption) (ContainerService_ListStreamClient, error)
}
```
```go
type ContainerServiceServer interface {
// ...
ListStream(*ListStreamRequest, ContainerService_ListStreamServer) error
}
```
|
|||||||
body := req.GetBody()
|
body := req.GetBody()
|
||||||
idV2 := body.GetOwnerID()
|
idV2 := body.GetOwnerID()
|
||||||
if idV2 == nil {
|
if idV2 == nil {
|
||||||
|
@ -215,20 +216,41 @@ func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamR
|
||||||
return fmt.Errorf("invalid user ID: %w", err)
|
return fmt.Errorf("invalid user ID: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cnrs, err := s.rdr.ContainersOf(&id)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
cidList := make([]refs.ContainerID, len(cnrs))
|
|
||||||
for i := range cnrs {
|
|
||||||
cnrs[i].WriteToV2(&cidList[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
resBody := new(container.ListStreamResponseBody)
|
resBody := new(container.ListStreamResponseBody)
|
||||||
resBody.SetContainerIDs(cidList)
|
|
||||||
r := new(container.ListStreamResponse)
|
r := new(container.ListStreamResponse)
|
||||||
r.SetBody(resBody)
|
r.SetBody(resBody)
|
||||||
|
|
||||||
return stream.Send(r)
|
var cidList []refs.ContainerID
|
||||||
|
|
||||||
|
// Amount of containers to send at once.
|
||||||
|
const batchSize = 1000
|
||||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
->
->
:) ```go
ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream)
```
->
```go
ListStream(ctx context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream)
```
->
```go
case <-ctx.Done()
```
:)
elebedeva
commented
Fixed, thanks! Fixed, thanks!
|
|||||||
|
|
||||||
|
processCID := func(id cid.ID) error {
|
||||||
|
select {
|
||||||
fyrchik marked this conversation as resolved
Outdated
dstepanov-yadro
commented
Woulb be nice to check if ctx is cancelled inside this cb. Woulb be nice to check if ctx is cancelled inside this cb.
elebedeva
commented
Added context check. Added context check.
|
|||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
elebedeva marked this conversation as resolved
Outdated
elebedeva
commented
What should we use as What should we use as `batch size` here?
Or we could send ids one by one but for a large number of containers it seems like a bad idea.
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
This batch size should be different, though. This batch size should be different, though.
It is more related to the transport level.
I would use a separate constant here.
elebedeva
commented
Added new local constant. Added new local constant.
|
|||||||
|
default:
|
||||||
|
}
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
After After `getRefCIDList` the old `cidList` is no longer used, so you can just have `cidList = cidList[:0]` here.
This saves us an allocation, removes magic constant and allows us to reuse memory.
fyrchik
commented
Also, you can have Also, you can have `var cidList []refs.ContainerID` here, and parse each ID separately.
This saves us another slice allocation for conversion stage.
elebedeva
commented
You're right, too many unnecessary actions here. Fixed! You're right, too many unnecessary actions here. Fixed!
|
|||||||
|
|
||||||
aarifullin
commented
Nice. That's what I actually expected from list container streaming 👍 Nice. That's what I actually expected from list container streaming 👍
|
|||||||
|
var refID refs.ContainerID
|
||||||
|
id.WriteToV2(&refID)
|
||||||
|
cidList = append(cidList, refID)
|
||||||
|
if len(cidList) == batchSize {
|
||||||
|
r.GetBody().SetContainerIDs(cidList)
|
||||||
|
cidList = cidList[:0]
|
||||||
|
return stream.Send(r)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = s.rdr.IterateContainersOf(&id, processCID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(cidList) > 0 {
|
||||||
|
r.GetBody().SetContainerIDs(cidList)
|
||||||
|
return stream.Send(r)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
This dependency should not be here.
pkg/morph/*
communicates with neo-go,pkg/services/container/*
implements buisness logic.Ok, fixed.