Do not use session iterators when unnecessary #902
3 changed files with 55 additions and 12 deletions
|
@ -13,6 +13,7 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/management"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/management"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
|
"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/vm/emit"
|
"github.com/nspcc-dev/neo-go/pkg/vm/emit"
|
||||||
|
@ -143,7 +144,12 @@ func dumpCustomZoneHashes(cmd *cobra.Command, nnsHash util.Uint160, zone string,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
sessionID, iter, err := unwrap.SessionIterator(inv.Call(nnsHash, "tokens"))
|
script, err := smartcontract.CreateCallAndPrefetchIteratorScript(nnsHash, "tokens", nnsMaxTokens)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("create prefetch script: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
arr, sessionID, iter, err := unwrap.ArrayAndSessionIterator(inv.Run(script))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, unwrap.ErrNoSessionID) {
|
if errors.Is(err, unwrap.ErrNoSessionID) {
|
||||||
items, err := unwrap.Array(inv.CallAndExpandIterator(nnsHash, "tokens", nnsMaxTokens))
|
items, err := unwrap.Array(inv.CallAndExpandIterator(nnsHash, "tokens", nnsMaxTokens))
|
||||||
|
@ -160,6 +166,10 @@ func dumpCustomZoneHashes(cmd *cobra.Command, nnsHash util.Uint160, zone string,
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
for i := range arr {
|
||||||
|
processItem(arr[i])
|
||||||
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
_ = inv.TerminateSession(sessionID)
|
_ = inv.TerminateSession(sessionID)
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
||||||
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
|
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
"github.com/google/uuid"
|
||||||
lru "github.com/hashicorp/golang-lru/v2"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
|
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
|
@ -24,8 +25,10 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/nep17"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/nep17"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/rolemgmt"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/rolemgmt"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
|
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/vm"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
|
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
|
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||||
|
@ -204,16 +207,27 @@ func (c *Client) Invoke(contract util.Uint160, fee fixedn.Fixed8, method string,
|
||||||
return vub, nil
|
return vub, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// defaultPrefetchBatchSize is the default number of items to prefetch.
|
||||||
|
// It is dependent on VM limits (2048 items on stack), the default works for simple items.
|
||||||
|
// For example, to iterate over 2 field structs, the limit should be divided by 3 = 1 (struct itself) + 2 (fields).
|
||||||
|
const defaultPrefetchBatchSize = vm.MaxStackSize - 16
|
||||||
|
|||||||
|
|
||||||
// TestInvokeIterator invokes contract method returning an iterator and executes cb on each element.
|
// TestInvokeIterator invokes contract method returning an iterator and executes cb on each element.
|
||||||
// If cb returns an error, the session is closed and this error is returned as-is.
|
// If cb returns an error, the session is closed and this error is returned as-is.
|
||||||
// If the remove neo-go node does not support sessions, `unwrap.ErrNoSessionID` is returned.
|
// If the remove neo-go node does not support sessions, `unwrap.ErrNoSessionID` is returned.
|
||||||
func (c *Client) TestInvokeIterator(cb func(stackitem.Item) error, contract util.Uint160, method string, args ...interface{}) error {
|
// batchSize is the number of items to prefetch: if the number of items in the iterator is less than batchSize, no session will be created.
|
||||||
|
// The default batchSize is 2000 (VM is limited by having 2048 items on stack, so if each iterator item is simple, 2000 items won't hit the limit).
|
||||||
|
func (c *Client) TestInvokeIterator(cb func(stackitem.Item) error, batchSize int, contract util.Uint160, method string, args ...interface{}) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
success := false
|
success := false
|
||||||
defer func() {
|
defer func() {
|
||||||
c.metrics.ObserveInvoke("TestInvokeIterator", contract.String(), method, success, time.Since(start))
|
c.metrics.ObserveInvoke("TestInvokeIterator", contract.String(), method, success, time.Since(start))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
if batchSize <= 0 {
|
||||||
|
batchSize = defaultPrefetchBatchSize
|
||||||
|
}
|
||||||
|
|
||||||
c.switchLock.RLock()
|
c.switchLock.RLock()
|
||||||
defer c.switchLock.RUnlock()
|
defer c.switchLock.RUnlock()
|
||||||
|
|
||||||
|
@ -221,35 +235,54 @@ func (c *Client) TestInvokeIterator(cb func(stackitem.Item) error, contract util
|
||||||
return ErrConnectionLost
|
return ErrConnectionLost
|
||||||
}
|
}
|
||||||
|
|
||||||
val, err := c.rpcActor.Call(contract, method, args...)
|
script, err := smartcontract.CreateCallAndPrefetchIteratorScript(contract, method, batchSize, args...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
val, err := c.rpcActor.Run(script)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else if val.State != HaltState {
|
} else if val.State != HaltState {
|
||||||
return wrapFrostFSError(¬HaltStateError{state: val.State, exception: val.FaultException})
|
return wrapFrostFSError(¬HaltStateError{state: val.State, exception: val.FaultException})
|
||||||
}
|
}
|
||||||
|
|
||||||
sid, r, err := unwrap.SessionIterator(val, err)
|
arr, sid, r, err := unwrap.ArrayAndSessionIterator(val, err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
for i := range arr {
|
||||||
|
if err := cb(arr[i]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (sid == uuid.UUID{}) {
|
||||||
|
success = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
_ = c.rpcActor.TerminateSession(sid)
|
_ = c.rpcActor.TerminateSession(sid)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
items, err := c.rpcActor.TraverseIterator(sid, &r, 0)
|
for {
|
||||||
for err == nil && len(items) != 0 {
|
items, err := c.rpcActor.TraverseIterator(sid, &r, batchSize)
|
||||||
for i := range items {
|
if err != nil {
|
||||||
if err = cb(items[i]); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
items, err = c.rpcActor.TraverseIterator(sid, &r, 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
success = err == nil
|
for i := range items {
|
||||||
|
if err := cb(items[i]); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if len(items) < batchSize {
|
||||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
Do you need
? Do you need `shouldStop` flag at all?
```go
for {
items, err := c.rpcActor.TraverseIterator(sid, &r, batchSize)
/*...*/
if len(items) < batchSize {
break
}
}
```
?
fyrchik
commented
Fixed Fixed
|
|||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
success = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// TestInvoke invokes contract method locally in neo-go node. This method should
|
// TestInvoke invokes contract method locally in neo-go node. This method should
|
||||||
// be used to read data from smart-contract.
|
// be used to read data from smart-contract.
|
||||||
|
|
|
@ -41,7 +41,7 @@ func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cnrHash := c.client.ContractAddress()
|
cnrHash := c.client.ContractAddress()
|
||||||
err := c.client.Morph().TestInvokeIterator(cb, cnrHash, containersOfMethod, rawID)
|
err := c.client.Morph().TestInvokeIterator(cb, 0, cnrHash, containersOfMethod, rawID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, unwrap.ErrNoSessionID) {
|
if errors.Is(err, unwrap.ErrNoSessionID) {
|
||||||
return c.List(idUser)
|
return c.List(idUser)
|
||||||
|
|
Loading…
Reference in a new issue
-16
is for bookkeeping, should be more than enough for "default" cases