container: Reduce iterations through container list #1577

Merged
fyrchik merged 3 commits from elebedeva/frostfs-node:feat/container-batching into master 2024-12-28 10:05:35 +00:00
6 changed files with 95 additions and 54 deletions

View file

@ -6,7 +6,7 @@ const (
subsection = "container" subsection = "container"
listStreamSubsection = "list_stream" listStreamSubsection = "list_stream"
// ContainerBatchSizeDefault represents he maximum amount of containers to send via stream at once. // ContainerBatchSizeDefault represents the maximum amount of containers to send via stream at once.
ContainerBatchSizeDefault = 1000 ContainerBatchSizeDefault = 1000
) )

View file

@ -222,6 +222,7 @@ type morphContainerReader struct {
lister interface { lister interface {
ContainersOf(*user.ID) ([]cid.ID, error) ContainersOf(*user.ID) ([]cid.ID, error)
IterateContainersOf(*user.ID, func(cid.ID) error) error
} }
} }
@ -237,6 +238,10 @@ func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
return x.lister.ContainersOf(id) return x.lister.ContainersOf(id)
} }
func (x *morphContainerReader) IterateContainersOf(id *user.ID, processCID func(cid.ID) error) error {
return x.lister.IterateContainersOf(id, processCID)
}
type morphContainerWriter struct { type morphContainerWriter struct {
neoClient *cntClient.Client neoClient *cntClient.Client
} }

View file

@ -210,7 +210,7 @@ func (c *Client) Invoke(ctx context.Context, contract util.Uint160, fee fixedn.F
// TestInvokeIterator invokes contract method returning an iterator and executes cb on each element. // TestInvokeIterator invokes contract method returning an iterator and executes cb on each element.
// If cb returns an error, the session is closed and this error is returned as-is. // If cb returns an error, the session is closed and this error is returned as-is.
// If the remove neo-go node does not support sessions, `unwrap.ErrNoSessionID` is returned. // If the remote neo-go node does not support sessions, `unwrap.ErrNoSessionID` is returned.
// batchSize is the number of items to prefetch: if the number of items in the iterator is less than batchSize, no session will be created. // batchSize is the number of items to prefetch: if the number of items in the iterator is less than batchSize, no session will be created.
// The default batchSize is 100, the default limit from neo-go. // The default batchSize is 100, the default limit from neo-go.
func (c *Client) TestInvokeIterator(cb func(stackitem.Item) error, batchSize int, contract util.Uint160, method string, args ...interface{}) error { func (c *Client) TestInvokeIterator(cb func(stackitem.Item) error, batchSize int, contract util.Uint160, method string, args ...interface{}) error {

View file

@ -2,9 +2,7 @@ package container
import ( import (
"errors" "errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap" "github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
fyrchik marked this conversation as resolved Outdated

This dependency should not be here.
pkg/morph/* communicates with neo-go, pkg/services/container/* implements buisness logic.

This dependency should not be here. `pkg/morph/*` communicates with neo-go, `pkg/services/container/*` implements buisness logic.

Ok, fixed.

Ok, fixed.
@ -16,27 +14,36 @@ import (
// //
// If remote RPC does not support neo-go session API, fallback to List() method. // If remote RPC does not support neo-go session API, fallback to List() method.
func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) { func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
var rawID []byte var cidList []cid.ID
var err error
cb := func(id cid.ID) error {
fyrchik marked this conversation as resolved Outdated

If prm.Stream != nil what are the return values for this function?

If `prm.Stream != nil` what are the return values for this function?

nil, error or nil, nil

`nil, error` or `nil, nil`

That by itself is a good reason to have a separate function.
The code could be reused (ContainersOf can use IterateContainersOf)

That by itself is a good reason to have a separate function. The code could be reused (`ContainersOf` can use `IterateContainersOf`)

Moved iteration over containers from ContainersOf to separate function and added new function which can perform container batch processing.

Moved iteration over containers from `ContainersOf` to separate function and added new function which can perform container batch processing.
cidList = append(cidList, id)
return nil
}
if err = c.IterateContainersOf(idUser, cb); err != nil {
return nil, err
}
return cidList, nil
}
// iterateContainers iterates over a list of container identifiers
// belonging to the specified user of FrostFS system and executes
// `cb` on each element. If idUser is nil, calls it on the list of all containers.
func (c *Client) IterateContainersOf(idUser *user.ID, cb func(item cid.ID) error) error {
var rawID []byte
if idUser != nil { if idUser != nil {
rawID = idUser.WalletBytes() rawID = idUser.WalletBytes()
} }
var cidList []cid.ID itemCb := func(item stackitem.Item) error {
Review

This line was right before the TestInvokeIterator previously, why has it moved?

This line was right before the `TestInvokeIterator` previously, why has it moved?
Review

Accidentally. Moved it back.

Accidentally. Moved it back.
cb := func(item stackitem.Item) error { id, err := getCIDfromStackItem(item)
rawID, err := client.BytesFromStackItem(item)
if err != nil { if err != nil {
return fmt.Errorf("get byte array from stack item (%s): %w", containersOfMethod, err) return err
} }
if err = cb(id); err != nil {
fyrchik marked this conversation as resolved Outdated

Unrelated to the commit.

Unrelated to the commit.
var id cid.ID return err
err = id.Decode(rawID)
if err != nil {
return fmt.Errorf("decode container ID: %w", err)
} }
cidList = append(cidList, id)
return nil return nil
} }
@ -50,13 +57,10 @@ func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
const batchSize = 512 const batchSize = 512
cnrHash := c.client.ContractAddress() cnrHash := c.client.ContractAddress()
err := c.client.Morph().TestInvokeIterator(cb, batchSize, cnrHash, containersOfMethod, rawID) err := c.client.Morph().TestInvokeIterator(itemCb, batchSize, cnrHash, containersOfMethod, rawID)
if err != nil { if err != nil && errors.Is(err, unwrap.ErrNoSessionID) {
if errors.Is(err, unwrap.ErrNoSessionID) { return c.iterate(idUser, cb)
return c.list(idUser)
}
return nil, err
} }
return cidList, nil return err
} }

View file

@ -6,15 +6,16 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
) )
// list returns a list of container identifiers belonging // iterate iterates through a list of container identifiers belonging
// to the specified user of FrostFS system. The list is composed // to the specified user of FrostFS system. The list is composed
// through Container contract call. // through Container contract call.
// //
// Returns the identifiers of all FrostFS containers if pointer // Iterates through the identifiers of all FrostFS containers if pointer
// to user identifier is nil. // to user identifier is nil.
func (c *Client) list(idUser *user.ID) ([]cid.ID, error) { func (c *Client) iterate(idUser *user.ID, cb func(cid.ID) error) error {
var rawID []byte var rawID []byte
if idUser != nil { if idUser != nil {
@ -27,32 +28,41 @@ func (c *Client) list(idUser *user.ID) ([]cid.ID, error) {
res, err := c.client.TestInvoke(prm) res, err := c.client.TestInvoke(prm)
if err != nil { if err != nil {
return nil, fmt.Errorf("test invoke (%s): %w", listMethod, err) return fmt.Errorf("test invoke (%s): %w", listMethod, err)
} else if ln := len(res); ln != 1 { } else if ln := len(res); ln != 1 {
return nil, fmt.Errorf("unexpected stack item count (%s): %d", listMethod, ln) return fmt.Errorf("unexpected stack item count (%s): %d", listMethod, ln)
} }
res, err = client.ArrayFromStackItem(res[0]) res, err = client.ArrayFromStackItem(res[0])
if err != nil { if err != nil {
return nil, fmt.Errorf("get stack item array from stack item (%s): %w", listMethod, err) return fmt.Errorf("get stack item array from stack item (%s): %w", listMethod, err)
} }
cidList := make([]cid.ID, 0, len(res))
for i := range res { for i := range res {
rawID, err := client.BytesFromStackItem(res[i]) id, err := getCIDfromStackItem(res[i])
if err != nil { if err != nil {
return nil, fmt.Errorf("get byte array from stack item (%s): %w", listMethod, err) return err
}
if err = cb(id); err != nil {
return err
}
}
return nil
}
func getCIDfromStackItem(item stackitem.Item) (cid.ID, error) {
rawID, err := client.BytesFromStackItem(item)
if err != nil {
return cid.ID{}, fmt.Errorf("get byte array from stack item (%s): %w", listMethod, err)
aarifullin marked this conversation as resolved Outdated

[32]byte{} -> cid.ID{}? :)

`[32]byte{}` -> `cid.ID{}`? :)

Fixed!

Fixed!

Fixed!

Fixed!
} }
var id cid.ID var id cid.ID
err = id.Decode(rawID) err = id.Decode(rawID)
if err != nil { if err != nil {
return nil, fmt.Errorf("decode container ID: %w", err) return cid.ID{}, fmt.Errorf("decode container ID: %w", err)
} }
return id, nil
cidList = append(cidList, id)
}
return cidList, nil
} }

View file

@ -30,6 +30,7 @@ type Reader interface {
// to the specified user of FrostFS system. Returns the identifiers // to the specified user of FrostFS system. Returns the identifiers
// of all FrostFS containers if pointer to owner identifier is nil. // of all FrostFS containers if pointer to owner identifier is nil.
ContainersOf(*user.ID) ([]cid.ID, error) ContainersOf(*user.ID) ([]cid.ID, error)
IterateContainersOf(*user.ID, func(cid.ID) error) error
fyrchik marked this conversation as resolved Outdated
  1. WithProcessing is something unfamiliar, Iterate is more common.
  2. func([]cid.ID) error looks like unnecessary complexity, why not func(cid.ID) error?
1. `WithProcessing` is something unfamiliar, `Iterate` is more common. 2. `func([]cid.ID) error` looks like unnecessary complexity, why not `func(cid.ID) error`?

The idea was to make it obvious from the method signature that just like ContainersOf returns cid list, new method allows to perform some action on this cid list. E.g. if in func([]cid.ID) error I would append passed cid list to my list, I would get the same result as if I used ContainersOf - []cid.ID and error.

The idea was to make it obvious from the method signature that just like `ContainersOf` returns `cid list`, new method allows to perform some action on this `cid list`. E.g. if in `func([]cid.ID) error` I would append passed `cid list` to my list, I would get the same result as if I used `ContainersOf` - `[]cid.ID` and `error`.

I did as you suggested, code looks more clean now.

I did as you suggested, code looks more clean now.
} }
// Writer is an interface of container storage updater. // Writer is an interface of container storage updater.
@ -201,7 +202,7 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
return res, nil return res, nil
} }
func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error { func (s *morphExecutor) ListStream(ctx context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
fyrchik marked this conversation as resolved Outdated

ListStream signature doesn't have any context.

func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error {

How had the context appeared here?

`ListStream` signature doesn't have any context. ``` func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error { func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error { ``` How had the context appeared here?

That's because ContainerServiceClient requires context and ContainerServiceServer does not.
Please see https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/src/branch/master/api/container/grpc/service_grpc.pb.go

type ContainerServiceClient interface {
  // ...
  	ListStream(ctx context.Context, in *ListStreamRequest, opts ...grpc.CallOption) (ContainerService_ListStreamClient, error)
}
type ContainerServiceServer interface {
  // ...
  	ListStream(*ListStreamRequest, ContainerService_ListStreamServer) error
}
That's because `ContainerServiceClient` requires context and `ContainerServiceServer` does not. Please see https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/src/branch/master/api/container/grpc/service_grpc.pb.go ```go type ContainerServiceClient interface { // ... ListStream(ctx context.Context, in *ListStreamRequest, opts ...grpc.CallOption) (ContainerService_ListStreamClient, error) } ``` ```go type ContainerServiceServer interface { // ... ListStream(*ListStreamRequest, ContainerService_ListStreamServer) error } ```
body := req.GetBody() body := req.GetBody()
idV2 := body.GetOwnerID() idV2 := body.GetOwnerID()
if idV2 == nil { if idV2 == nil {
@ -215,20 +216,41 @@ func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamR
return fmt.Errorf("invalid user ID: %w", err) return fmt.Errorf("invalid user ID: %w", err)
} }
cnrs, err := s.rdr.ContainersOf(&id)
if err != nil {
return err
}
cidList := make([]refs.ContainerID, len(cnrs))
for i := range cnrs {
cnrs[i].WriteToV2(&cidList[i])
}
resBody := new(container.ListStreamResponseBody) resBody := new(container.ListStreamResponseBody)
resBody.SetContainerIDs(cidList)
r := new(container.ListStreamResponse) r := new(container.ListStreamResponse)
r.SetBody(resBody) r.SetBody(resBody)
var cidList []refs.ContainerID
// Amount of containers to send at once.
const batchSize = 1000
aarifullin marked this conversation as resolved Outdated
ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream)

->

ListStream(ctx context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream)

->

case <-ctx.Done()

:)

```go ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) ``` -> ```go ListStream(ctx context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) ``` -> ```go case <-ctx.Done() ``` :)

Fixed, thanks!

Fixed, thanks!
processCID := func(id cid.ID) error {
select {
fyrchik marked this conversation as resolved Outdated

Woulb be nice to check if ctx is cancelled inside this cb.

Woulb be nice to check if ctx is cancelled inside this cb.

Added context check.

Added context check.
case <-ctx.Done():
return ctx.Err()
elebedeva marked this conversation as resolved Outdated

What should we use as batch size here?
Or we could send ids one by one but for a large number of containers it seems like a bad idea.

What should we use as `batch size` here? Or we could send ids one by one but for a large number of containers it seems like a bad idea.
fyrchik marked this conversation as resolved Outdated

This batch size should be different, though.
It is more related to the transport level.
I would use a separate constant here.

This batch size should be different, though. It is more related to the transport level. I would use a separate constant here.

Added new local constant.

Added new local constant.
default:
}
fyrchik marked this conversation as resolved Outdated

After getRefCIDList the old cidList is no longer used, so you can just have cidList = cidList[:0] here.
This saves us an allocation, removes magic constant and allows us to reuse memory.

After `getRefCIDList` the old `cidList` is no longer used, so you can just have `cidList = cidList[:0]` here. This saves us an allocation, removes magic constant and allows us to reuse memory.

Also, you can have var cidList []refs.ContainerID here, and parse each ID separately.
This saves us another slice allocation for conversion stage.

Also, you can have `var cidList []refs.ContainerID` here, and parse each ID separately. This saves us another slice allocation for conversion stage.

You're right, too many unnecessary actions here. Fixed!

You're right, too many unnecessary actions here. Fixed!

Nice. That's what I actually expected from list container streaming 👍

Nice. That's what I actually expected from list container streaming 👍
var refID refs.ContainerID
id.WriteToV2(&refID)
cidList = append(cidList, refID)
if len(cidList) == batchSize {
r.GetBody().SetContainerIDs(cidList)
cidList = cidList[:0]
return stream.Send(r) return stream.Send(r)
}
return nil
}
if err = s.rdr.IterateContainersOf(&id, processCID); err != nil {
return err
}
if len(cidList) > 0 {
r.GetBody().SetContainerIDs(cidList)
return stream.Send(r)
}
return nil
} }