rpc: simplify result.Invoke creation, remove needless deps

Change stack items before marshaling them which makes code in result package
much simpler and not requiring interop, iterator and storage dependencies that
clients shouldn't care about.

This also changes SessionBackedByMPT behavior, now instead of waiting for
traverseiterator call it'll rerun the script immediately if a new session is
created.
This commit is contained in:
Roman Khimov 2022-07-08 23:25:22 +03:00
parent 96c4e61063
commit 0c45ff8f51
2 changed files with 157 additions and 217 deletions

View file

@ -5,10 +5,7 @@ import (
"fmt" "fmt"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/nspcc-dev/neo-go/pkg/core/interop"
"github.com/nspcc-dev/neo-go/pkg/core/interop/iterator"
"github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/storage/dboper" "github.com/nspcc-dev/neo-go/pkg/core/storage/dboper"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/vm/invocations" "github.com/nspcc-dev/neo-go/pkg/vm/invocations"
@ -26,49 +23,15 @@ type Invoke struct {
Notifications []state.NotificationEvent Notifications []state.NotificationEvent
Transaction *transaction.Transaction Transaction *transaction.Transaction
Diagnostics *InvokeDiag Diagnostics *InvokeDiag
maxIteratorResultItems int
Session uuid.UUID Session uuid.UUID
finalize func()
registerIterator RegisterIterator
} }
// RegisterIterator is a callback used to register new iterator on the server side.
type RegisterIterator func(sessionID string, item stackitem.Item, id int, finalize func()) (uuid.UUID, error)
// InvokeDiag is an additional diagnostic data for invocation. // InvokeDiag is an additional diagnostic data for invocation.
type InvokeDiag struct { type InvokeDiag struct {
Changes []dboper.Operation `json:"storagechanges"` Changes []dboper.Operation `json:"storagechanges"`
Invocations []*invocations.Tree `json:"invokedcontracts"` Invocations []*invocations.Tree `json:"invokedcontracts"`
} }
// NewInvoke returns a new Invoke structure with the given fields set.
func NewInvoke(ic *interop.Context, script []byte, faultException string, registerIterator RegisterIterator, maxIteratorResultItems int) *Invoke {
var diag *InvokeDiag
tree := ic.VM.GetInvocationTree()
if tree != nil {
diag = &InvokeDiag{
Invocations: tree.Calls,
Changes: storage.BatchToOperations(ic.DAO.GetBatch()),
}
}
notifications := ic.Notifications
if notifications == nil {
notifications = make([]state.NotificationEvent, 0)
}
return &Invoke{
State: ic.VM.State().String(),
GasConsumed: ic.VM.GasConsumed(),
Script: script,
Stack: ic.VM.Estack().ToArray(),
FaultException: faultException,
Notifications: notifications,
Diagnostics: diag,
finalize: ic.Finalize,
maxIteratorResultItems: maxIteratorResultItems,
registerIterator: registerIterator,
}
}
type invokeAux struct { type invokeAux struct {
State string `json:"state"` State string `json:"state"`
GasConsumed int64 `json:"gasconsumed,string"` GasConsumed int64 `json:"gasconsumed,string"`
@ -107,13 +70,26 @@ type Iterator struct {
Truncated bool Truncated bool
} }
// Finalize releases resources occupied by Iterators created at the script invocation. // MarshalJSON implements the json.Marshaler.
// This method will be called automatically on Invoke marshalling or by the Server's func (r Iterator) MarshalJSON() ([]byte, error) {
// sessions handler. var iaux iteratorAux
func (r *Invoke) Finalize() { iaux.Type = stackitem.InteropT.String()
if r.finalize != nil { if r.ID != nil {
r.finalize() iaux.Interface = iteratorInterfaceName
iaux.ID = r.ID.String()
} else {
value := make([]json.RawMessage, len(r.Values))
for i := range r.Values {
var err error
value[i], err = stackitem.ToJSONWithTypes(r.Values[i])
if err != nil {
return nil, err
} }
}
iaux.Value = value
iaux.Truncated = r.Truncated
}
return json.Marshal(iaux)
} }
// MarshalJSON implements the json.Marshaler. // MarshalJSON implements the json.Marshaler.
@ -123,69 +99,26 @@ func (r Invoke) MarshalJSON() ([]byte, error) {
err error err error
faultSep string faultSep string
arr = make([]json.RawMessage, len(r.Stack)) arr = make([]json.RawMessage, len(r.Stack))
sessionsEnabled = r.registerIterator != nil
sessionID string
) )
if len(r.FaultException) != 0 { if len(r.FaultException) != 0 {
faultSep = " / " faultSep = " / "
} }
arrloop:
for i := range arr { for i := range arr {
var data []byte var data []byte
if (r.Stack[i].Type() == stackitem.InteropT) && iterator.IsIterator(r.Stack[i]) {
if sessionsEnabled { iter, ok := r.Stack[i].Value().(Iterator)
if sessionID == "" { if (r.Stack[i].Type() == stackitem.InteropT) && ok {
sessionID = uuid.NewString() data, err = json.Marshal(iter)
}
iteratorID, err := r.registerIterator(sessionID, r.Stack[i], i, r.finalize)
if err != nil {
// Call finalizer immediately, there can't be race between server and marshaller because session wasn't added to server's session pool.
r.Finalize()
return nil, fmt.Errorf("failed to register iterator session: %w", err)
}
data, err = json.Marshal(iteratorAux{
Type: stackitem.InteropT.String(),
Interface: iteratorInterfaceName,
ID: iteratorID.String(),
})
if err != nil {
r.FaultException += fmt.Sprintf("%sjson error: failed to marshal iterator: %v", faultSep, err)
break
}
} else {
iteratorValues, truncated := iterator.ValuesTruncated(r.Stack[i], r.maxIteratorResultItems)
value := make([]json.RawMessage, len(iteratorValues))
for j := range iteratorValues {
value[j], err = stackitem.ToJSONWithTypes(iteratorValues[j])
if err != nil {
r.FaultException += fmt.Sprintf("%sjson error: %v", faultSep, err)
break arrloop
}
}
data, err = json.Marshal(iteratorAux{
Type: stackitem.InteropT.String(),
Value: value,
Truncated: truncated,
})
if err != nil {
r.FaultException += fmt.Sprintf("%sjson error: %v", faultSep, err)
break
}
}
} else { } else {
data, err = stackitem.ToJSONWithTypes(r.Stack[i]) data, err = stackitem.ToJSONWithTypes(r.Stack[i])
}
if err != nil { if err != nil {
r.FaultException += fmt.Sprintf("%sjson error: %v", faultSep, err) r.FaultException += fmt.Sprintf("%sjson error: %v", faultSep, err)
break break
} }
}
arr[i] = data arr[i] = data
} }
if !sessionsEnabled || sessionID == "" {
// Call finalizer manually if iterators are disabled or there's no unnested iterators on estack.
defer r.Finalize()
}
if err == nil { if err == nil {
st, err = json.Marshal(arr) st, err = json.Marshal(arr)
if err != nil { if err != nil {
@ -196,6 +129,10 @@ arrloop:
if r.Transaction != nil { if r.Transaction != nil {
txbytes = r.Transaction.Bytes() txbytes = r.Transaction.Bytes()
} }
var sessionID string
if r.Session != (uuid.UUID{}) {
sessionID = r.Session.String()
}
aux := &invokeAux{ aux := &invokeAux{
GasConsumed: r.GasConsumed, GasConsumed: r.GasConsumed,
Script: r.Script, Script: r.Script,

View file

@ -32,6 +32,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/core/native"
"github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -101,29 +102,14 @@ type (
// or from historic MPT-based invocation. In the second case, iteratorIdentifiers are supposed // or from historic MPT-based invocation. In the second case, iteratorIdentifiers are supposed
// to be filled during the first `traverseiterator` call using corresponding params. // to be filled during the first `traverseiterator` call using corresponding params.
iteratorIdentifiers []*iteratorIdentifier iteratorIdentifiers []*iteratorIdentifier
// params stores invocation params for historic MPT-based iterator traversing. It is nil in case
// of default non-MPT-based sessions mechanism enabled.
params *invocationParams
timer *time.Timer timer *time.Timer
finalize func() finalize func()
} }
// iteratorIdentifier represents Iterator on the server side, holding iterator ID, Iterator stackitem // iteratorIdentifier represents Iterator on the server side, holding iterator ID and Iterator stackitem.
// and iterator index on stack.
iteratorIdentifier struct { iteratorIdentifier struct {
ID string ID string
// Item represents Iterator stackitem. It is nil if SessionBackedByMPT is set to true and no `traverseiterator` // Item represents Iterator stackitem.
// call was called for the corresponding session.
Item stackitem.Item Item stackitem.Item
// StackIndex represents Iterator stackitem index on the stack. It can be used only for SessionBackedByMPT configuration.
StackIndex int
}
// invocationParams is a set of parameters used for invoke* calls.
invocationParams struct {
Trigger trigger.Type
Script []byte
ContractScriptHash util.Uint160
Transaction *transaction.Transaction
NextBlockHeight uint32
} }
) )
@ -350,9 +336,7 @@ func (s *Server) Shutdown() {
for _, session := range s.sessions { for _, session := range s.sessions {
// Concurrent iterator traversal may still be in process, thus need to protect iteratorIdentifiers access. // Concurrent iterator traversal may still be in process, thus need to protect iteratorIdentifiers access.
session.iteratorsLock.Lock() session.iteratorsLock.Lock()
if session.finalize != nil {
session.finalize() session.finalize()
}
if !session.timer.Stop() { if !session.timer.Stop() {
<-session.timer.C <-session.timer.C
} }
@ -2036,17 +2020,26 @@ func (s *Server) runScriptInVM(t trigger.Type, script []byte, contractScriptHash
if err != nil { if err != nil {
faultException = err.Error() faultException = err.Error()
} }
var registerIterator result.RegisterIterator items := ic.VM.Estack().ToArray()
if s.config.SessionEnabled { sess := s.postProcessExecStack(items)
registerIterator = func(sessionID string, item stackitem.Item, stackIndex int, finalize func()) (uuid.UUID, error) { var id uuid.UUID
iterID := uuid.New()
s.sessionsLock.Lock() if sess != nil {
sess, ok := s.sessions[sessionID] // b == nil only when we're not using MPT-backed storage, therefore
if !ok { // the second attempt won't stop here.
if len(s.sessions) >= s.config.SessionPoolSize { if s.config.SessionBackedByMPT && b == nil {
return uuid.UUID{}, errors.New("max capacity reached") ic.Finalize()
b, err = s.getFakeNextBlock(ic.Block.Index)
if err != nil {
return nil, response.NewInternalServerError(fmt.Sprintf("unable to prepare block for historic call: %s", err))
} }
timer := time.AfterFunc(time.Second*time.Duration(s.config.SessionExpirationTime), func() { // Rerun with MPT-backed storage.
return s.runScriptInVM(t, script, contractScriptHash, tx, b, verbose)
}
id = uuid.New()
sessionID := id.String()
sess.finalize = ic.Finalize
sess.timer = time.AfterFunc(time.Second*time.Duration(s.config.SessionExpirationTime), func() {
s.sessionsLock.Lock() s.sessionsLock.Lock()
defer s.sessionsLock.Unlock() defer s.sessionsLock.Unlock()
if len(s.sessions) == 0 { if len(s.sessions) == 0 {
@ -2057,43 +2050,87 @@ func (s *Server) runScriptInVM(t trigger.Type, script []byte, contractScriptHash
return return
} }
sess.iteratorsLock.Lock() sess.iteratorsLock.Lock()
if sess.finalize != nil {
sess.finalize() sess.finalize()
}
delete(s.sessions, sessionID) delete(s.sessions, sessionID)
sess.iteratorsLock.Unlock() sess.iteratorsLock.Unlock()
}) })
sess = &session{ s.sessionsLock.Lock()
finalize: finalize, if len(s.sessions) >= s.config.SessionPoolSize {
timer: timer, ic.Finalize()
s.sessionsLock.Unlock()
return nil, response.NewInternalServerError("max session capacity reached")
} }
if s.config.SessionBackedByMPT {
sess.params = &invocationParams{
Trigger: t,
Script: script,
ContractScriptHash: contractScriptHash,
Transaction: tx,
NextBlockHeight: ic.Block.Index,
}
// Call finalizer manually if MPT-based iterator sessions are enabled. If disabled, then register finalizator.
if finalize != nil {
finalize()
sess.finalize = nil
}
item = nil
}
}
sess.iteratorIdentifiers = append(sess.iteratorIdentifiers, &iteratorIdentifier{
ID: iterID.String(),
Item: item,
StackIndex: stackIndex,
})
s.sessions[sessionID] = sess s.sessions[sessionID] = sess
s.sessionsLock.Unlock() s.sessionsLock.Unlock()
return iterID, nil } else {
ic.Finalize()
}
var diag *result.InvokeDiag
tree := ic.VM.GetInvocationTree()
if tree != nil {
diag = &result.InvokeDiag{
Invocations: tree.Calls,
Changes: storage.BatchToOperations(ic.DAO.GetBatch()),
} }
} }
return result.NewInvoke(ic, script, faultException, registerIterator, s.config.MaxIteratorResultItems), nil notifications := ic.Notifications
if notifications == nil {
notifications = make([]state.NotificationEvent, 0)
}
res := &result.Invoke{
State: ic.VM.State().String(),
GasConsumed: ic.VM.GasConsumed(),
Script: script,
Stack: items,
FaultException: faultException,
Notifications: notifications,
Diagnostics: diag,
Session: id,
}
return res, nil
}
// postProcessExecStack changes iterator interop items according to the server configuration.
// It does modifications in-place, but it returns a session if any iterator was registered.
func (s *Server) postProcessExecStack(stack []stackitem.Item) *session {
var sess session
for i, v := range stack {
var id uuid.UUID
stack[i], id = s.registerOrDumpIterator(v)
if id != (uuid.UUID{}) {
sess.iteratorIdentifiers = append(sess.iteratorIdentifiers, &iteratorIdentifier{
ID: id.String(),
Item: v,
})
}
}
if len(sess.iteratorIdentifiers) != 0 {
return &sess
}
return nil
}
// registerOrDumpIterator changes iterator interop stack items into result.Iterator
// interop stack items and returns a uuid for it if sessions are enabled. All the other stack
// items are not changed.
func (s *Server) registerOrDumpIterator(item stackitem.Item) (stackitem.Item, uuid.UUID) {
var iterID uuid.UUID
if (item.Type() != stackitem.InteropT) || !iterator.IsIterator(item) {
return item, iterID
}
var resIterator result.Iterator
if s.config.SessionEnabled {
iterID = uuid.New()
resIterator.ID = &iterID
} else {
resIterator.Values, resIterator.Truncated = iterator.ValuesTruncated(item, s.config.MaxIteratorResultItems)
}
return stackitem.NewInterop(resIterator), iterID
} }
func (s *Server) traverseIterator(reqParams params.Params) (interface{}, *response.Error) { func (s *Server) traverseIterator(reqParams params.Params) (interface{}, *response.Error) {
@ -2134,41 +2171,9 @@ func (s *Server) traverseIterator(reqParams params.Params) (interface{}, *respon
var ( var (
iIDStr = iID.String() iIDStr = iID.String()
iVals []stackitem.Item iVals []stackitem.Item
respErr *response.Error
) )
for _, it := range session.iteratorIdentifiers { for _, it := range session.iteratorIdentifiers {
if iIDStr == it.ID { if iIDStr == it.ID {
// If SessionBackedByMPT is enabled, then use MPT-backed historic call to retrieve and traverse iterator.
// Otherwise, iterator stackitem is ready and can be used.
if s.config.SessionBackedByMPT && it.Item == nil {
var (
b *block.Block
ic *interop.Context
)
b, err = s.getFakeNextBlock(session.params.NextBlockHeight)
if err != nil {
session.iteratorsLock.Unlock()
return nil, response.NewInternalServerError(fmt.Sprintf("unable to prepare block for historic call: %s", err))
}
ic, respErr = s.prepareInvocationContext(session.params.Trigger, session.params.Script, session.params.ContractScriptHash, session.params.Transaction, b, false)
if respErr != nil {
session.iteratorsLock.Unlock()
return nil, respErr
}
_ = ic.VM.Run() // No error check because FAULTed invocations could also contain iterator on stack.
stack := ic.VM.Estack().ToArray()
// Fill in the whole set of iterators for the current session in order not to repeat test invocation one more time for other session iterators.
for _, itID := range session.iteratorIdentifiers {
j := itID.StackIndex
if (stack[j].Type() != stackitem.InteropT) || !iterator.IsIterator(stack[j]) {
session.iteratorsLock.Unlock()
return nil, response.NewInternalServerError(fmt.Sprintf("inconsistent historic call result: expected %s, got %s at stack position #%d", stackitem.InteropT, stack[j].Type(), j))
}
session.iteratorIdentifiers[j].Item = stack[j]
}
session.finalize = ic.Finalize
}
iVals = iterator.Values(it.Item, count) iVals = iterator.Values(it.Item, count)
break break
} }
@ -2201,9 +2206,7 @@ func (s *Server) terminateSession(reqParams params.Params) (interface{}, *respon
// Iterators access Seek channel under the hood; finalizer closes this channel, thus, // Iterators access Seek channel under the hood; finalizer closes this channel, thus,
// we need to perform finalisation under iteratorsLock. // we need to perform finalisation under iteratorsLock.
session.iteratorsLock.Lock() session.iteratorsLock.Lock()
if session.finalize != nil {
session.finalize() session.finalize()
}
if !session.timer.Stop() { if !session.timer.Stop() {
<-session.timer.C <-session.timer.C
} }