diff --git a/pkg/rpc/response/result/invoke.go b/pkg/rpc/response/result/invoke.go index e51cb4042..2cd78bb13 100644 --- a/pkg/rpc/response/result/invoke.go +++ b/pkg/rpc/response/result/invoke.go @@ -5,10 +5,7 @@ import ( "fmt" "github.com/google/uuid" - "github.com/nspcc-dev/neo-go/pkg/core/interop" - "github.com/nspcc-dev/neo-go/pkg/core/interop/iterator" "github.com/nspcc-dev/neo-go/pkg/core/state" - "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/storage/dboper" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/vm/invocations" @@ -18,57 +15,23 @@ import ( // Invoke represents a code invocation result and is used by several RPC calls // that invoke functions, scripts and generic bytecode. type Invoke struct { - State string - GasConsumed int64 - Script []byte - Stack []stackitem.Item - FaultException string - Notifications []state.NotificationEvent - Transaction *transaction.Transaction - Diagnostics *InvokeDiag - maxIteratorResultItems int - Session uuid.UUID - finalize func() - registerIterator RegisterIterator + State string + GasConsumed int64 + Script []byte + Stack []stackitem.Item + FaultException string + Notifications []state.NotificationEvent + Transaction *transaction.Transaction + Diagnostics *InvokeDiag + Session uuid.UUID } -// RegisterIterator is a callback used to register new iterator on the server side. -type RegisterIterator func(sessionID string, item stackitem.Item, id int, finalize func()) (uuid.UUID, error) - // InvokeDiag is an additional diagnostic data for invocation. type InvokeDiag struct { Changes []dboper.Operation `json:"storagechanges"` Invocations []*invocations.Tree `json:"invokedcontracts"` } -// NewInvoke returns a new Invoke structure with the given fields set. -func NewInvoke(ic *interop.Context, script []byte, faultException string, registerIterator RegisterIterator, maxIteratorResultItems int) *Invoke { - var diag *InvokeDiag - tree := ic.VM.GetInvocationTree() - if tree != nil { - diag = &InvokeDiag{ - Invocations: tree.Calls, - Changes: storage.BatchToOperations(ic.DAO.GetBatch()), - } - } - notifications := ic.Notifications - if notifications == nil { - notifications = make([]state.NotificationEvent, 0) - } - return &Invoke{ - State: ic.VM.State().String(), - GasConsumed: ic.VM.GasConsumed(), - Script: script, - Stack: ic.VM.Estack().ToArray(), - FaultException: faultException, - Notifications: notifications, - Diagnostics: diag, - finalize: ic.Finalize, - maxIteratorResultItems: maxIteratorResultItems, - registerIterator: registerIterator, - } -} - type invokeAux struct { State string `json:"state"` GasConsumed int64 `json:"gasconsumed,string"` @@ -107,85 +70,55 @@ type Iterator struct { Truncated bool } -// Finalize releases resources occupied by Iterators created at the script invocation. -// This method will be called automatically on Invoke marshalling or by the Server's -// sessions handler. -func (r *Invoke) Finalize() { - if r.finalize != nil { - r.finalize() +// MarshalJSON implements the json.Marshaler. +func (r Iterator) MarshalJSON() ([]byte, error) { + var iaux iteratorAux + iaux.Type = stackitem.InteropT.String() + if r.ID != nil { + iaux.Interface = iteratorInterfaceName + iaux.ID = r.ID.String() + } else { + value := make([]json.RawMessage, len(r.Values)) + for i := range r.Values { + var err error + value[i], err = stackitem.ToJSONWithTypes(r.Values[i]) + if err != nil { + return nil, err + } + } + iaux.Value = value + iaux.Truncated = r.Truncated } + return json.Marshal(iaux) } // MarshalJSON implements the json.Marshaler. func (r Invoke) MarshalJSON() ([]byte, error) { var ( - st json.RawMessage - err error - faultSep string - arr = make([]json.RawMessage, len(r.Stack)) - sessionsEnabled = r.registerIterator != nil - sessionID string + st json.RawMessage + err error + faultSep string + arr = make([]json.RawMessage, len(r.Stack)) ) if len(r.FaultException) != 0 { faultSep = " / " } -arrloop: for i := range arr { var data []byte - if (r.Stack[i].Type() == stackitem.InteropT) && iterator.IsIterator(r.Stack[i]) { - if sessionsEnabled { - if sessionID == "" { - sessionID = uuid.NewString() - } - iteratorID, err := r.registerIterator(sessionID, r.Stack[i], i, r.finalize) - if err != nil { - // Call finalizer immediately, there can't be race between server and marshaller because session wasn't added to server's session pool. - r.Finalize() - return nil, fmt.Errorf("failed to register iterator session: %w", err) - } - data, err = json.Marshal(iteratorAux{ - Type: stackitem.InteropT.String(), - Interface: iteratorInterfaceName, - ID: iteratorID.String(), - }) - if err != nil { - r.FaultException += fmt.Sprintf("%sjson error: failed to marshal iterator: %v", faultSep, err) - break - } - } else { - iteratorValues, truncated := iterator.ValuesTruncated(r.Stack[i], r.maxIteratorResultItems) - value := make([]json.RawMessage, len(iteratorValues)) - for j := range iteratorValues { - value[j], err = stackitem.ToJSONWithTypes(iteratorValues[j]) - if err != nil { - r.FaultException += fmt.Sprintf("%sjson error: %v", faultSep, err) - break arrloop - } - } - data, err = json.Marshal(iteratorAux{ - Type: stackitem.InteropT.String(), - Value: value, - Truncated: truncated, - }) - if err != nil { - r.FaultException += fmt.Sprintf("%sjson error: %v", faultSep, err) - break - } - } + + iter, ok := r.Stack[i].Value().(Iterator) + if (r.Stack[i].Type() == stackitem.InteropT) && ok { + data, err = json.Marshal(iter) } else { data, err = stackitem.ToJSONWithTypes(r.Stack[i]) - if err != nil { - r.FaultException += fmt.Sprintf("%sjson error: %v", faultSep, err) - break - } + } + if err != nil { + r.FaultException += fmt.Sprintf("%sjson error: %v", faultSep, err) + break } arr[i] = data } - if !sessionsEnabled || sessionID == "" { - // Call finalizer manually if iterators are disabled or there's no unnested iterators on estack. - defer r.Finalize() - } if err == nil { st, err = json.Marshal(arr) if err != nil { @@ -196,6 +129,10 @@ arrloop: if r.Transaction != nil { txbytes = r.Transaction.Bytes() } + var sessionID string + if r.Session != (uuid.UUID{}) { + sessionID = r.Session.String() + } aux := &invokeAux{ GasConsumed: r.GasConsumed, Script: r.Script, diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 0e0b86e47..28ccfa246 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -32,6 +32,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -101,29 +102,14 @@ type ( // or from historic MPT-based invocation. In the second case, iteratorIdentifiers are supposed // to be filled during the first `traverseiterator` call using corresponding params. iteratorIdentifiers []*iteratorIdentifier - // params stores invocation params for historic MPT-based iterator traversing. It is nil in case - // of default non-MPT-based sessions mechanism enabled. - params *invocationParams - timer *time.Timer - finalize func() + timer *time.Timer + finalize func() } - // iteratorIdentifier represents Iterator on the server side, holding iterator ID, Iterator stackitem - // and iterator index on stack. + // iteratorIdentifier represents Iterator on the server side, holding iterator ID and Iterator stackitem. iteratorIdentifier struct { ID string - // Item represents Iterator stackitem. It is nil if SessionBackedByMPT is set to true and no `traverseiterator` - // call was called for the corresponding session. + // Item represents Iterator stackitem. Item stackitem.Item - // StackIndex represents Iterator stackitem index on the stack. It can be used only for SessionBackedByMPT configuration. - StackIndex int - } - // invocationParams is a set of parameters used for invoke* calls. - invocationParams struct { - Trigger trigger.Type - Script []byte - ContractScriptHash util.Uint160 - Transaction *transaction.Transaction - NextBlockHeight uint32 } ) @@ -350,9 +336,7 @@ func (s *Server) Shutdown() { for _, session := range s.sessions { // Concurrent iterator traversal may still be in process, thus need to protect iteratorIdentifiers access. session.iteratorsLock.Lock() - if session.finalize != nil { - session.finalize() - } + session.finalize() if !session.timer.Stop() { <-session.timer.C } @@ -2036,64 +2020,117 @@ func (s *Server) runScriptInVM(t trigger.Type, script []byte, contractScriptHash if err != nil { faultException = err.Error() } - var registerIterator result.RegisterIterator - if s.config.SessionEnabled { - registerIterator = func(sessionID string, item stackitem.Item, stackIndex int, finalize func()) (uuid.UUID, error) { - iterID := uuid.New() + items := ic.VM.Estack().ToArray() + sess := s.postProcessExecStack(items) + var id uuid.UUID + + if sess != nil { + // b == nil only when we're not using MPT-backed storage, therefore + // the second attempt won't stop here. + if s.config.SessionBackedByMPT && b == nil { + ic.Finalize() + b, err = s.getFakeNextBlock(ic.Block.Index) + if err != nil { + return nil, response.NewInternalServerError(fmt.Sprintf("unable to prepare block for historic call: %s", err)) + } + // Rerun with MPT-backed storage. + return s.runScriptInVM(t, script, contractScriptHash, tx, b, verbose) + } + id = uuid.New() + sessionID := id.String() + sess.finalize = ic.Finalize + sess.timer = time.AfterFunc(time.Second*time.Duration(s.config.SessionExpirationTime), func() { s.sessionsLock.Lock() + defer s.sessionsLock.Unlock() + if len(s.sessions) == 0 { + return + } sess, ok := s.sessions[sessionID] if !ok { - if len(s.sessions) >= s.config.SessionPoolSize { - return uuid.UUID{}, errors.New("max capacity reached") - } - timer := time.AfterFunc(time.Second*time.Duration(s.config.SessionExpirationTime), func() { - s.sessionsLock.Lock() - defer s.sessionsLock.Unlock() - if len(s.sessions) == 0 { - return - } - sess, ok := s.sessions[sessionID] - if !ok { - return - } - sess.iteratorsLock.Lock() - if sess.finalize != nil { - sess.finalize() - } - delete(s.sessions, sessionID) - sess.iteratorsLock.Unlock() - }) - sess = &session{ - finalize: finalize, - timer: timer, - } - if s.config.SessionBackedByMPT { - sess.params = &invocationParams{ - Trigger: t, - Script: script, - ContractScriptHash: contractScriptHash, - Transaction: tx, - NextBlockHeight: ic.Block.Index, - } - // Call finalizer manually if MPT-based iterator sessions are enabled. If disabled, then register finalizator. - if finalize != nil { - finalize() - sess.finalize = nil - } - item = nil - } + return } - sess.iteratorIdentifiers = append(sess.iteratorIdentifiers, &iteratorIdentifier{ - ID: iterID.String(), - Item: item, - StackIndex: stackIndex, - }) - s.sessions[sessionID] = sess + sess.iteratorsLock.Lock() + sess.finalize() + delete(s.sessions, sessionID) + sess.iteratorsLock.Unlock() + }) + s.sessionsLock.Lock() + if len(s.sessions) >= s.config.SessionPoolSize { + ic.Finalize() s.sessionsLock.Unlock() - return iterID, nil + return nil, response.NewInternalServerError("max session capacity reached") + } + s.sessions[sessionID] = sess + s.sessionsLock.Unlock() + } else { + ic.Finalize() + } + var diag *result.InvokeDiag + tree := ic.VM.GetInvocationTree() + if tree != nil { + diag = &result.InvokeDiag{ + Invocations: tree.Calls, + Changes: storage.BatchToOperations(ic.DAO.GetBatch()), } } - return result.NewInvoke(ic, script, faultException, registerIterator, s.config.MaxIteratorResultItems), nil + notifications := ic.Notifications + if notifications == nil { + notifications = make([]state.NotificationEvent, 0) + } + res := &result.Invoke{ + State: ic.VM.State().String(), + GasConsumed: ic.VM.GasConsumed(), + Script: script, + Stack: items, + FaultException: faultException, + Notifications: notifications, + Diagnostics: diag, + Session: id, + } + + return res, nil +} + +// postProcessExecStack changes iterator interop items according to the server configuration. +// It does modifications in-place, but it returns a session if any iterator was registered. +func (s *Server) postProcessExecStack(stack []stackitem.Item) *session { + var sess session + + for i, v := range stack { + var id uuid.UUID + + stack[i], id = s.registerOrDumpIterator(v) + if id != (uuid.UUID{}) { + sess.iteratorIdentifiers = append(sess.iteratorIdentifiers, &iteratorIdentifier{ + ID: id.String(), + Item: v, + }) + } + } + if len(sess.iteratorIdentifiers) != 0 { + return &sess + } + return nil +} + +// registerOrDumpIterator changes iterator interop stack items into result.Iterator +// interop stack items and returns a uuid for it if sessions are enabled. All the other stack +// items are not changed. +func (s *Server) registerOrDumpIterator(item stackitem.Item) (stackitem.Item, uuid.UUID) { + var iterID uuid.UUID + + if (item.Type() != stackitem.InteropT) || !iterator.IsIterator(item) { + return item, iterID + } + var resIterator result.Iterator + + if s.config.SessionEnabled { + iterID = uuid.New() + resIterator.ID = &iterID + } else { + resIterator.Values, resIterator.Truncated = iterator.ValuesTruncated(item, s.config.MaxIteratorResultItems) + } + return stackitem.NewInterop(resIterator), iterID } func (s *Server) traverseIterator(reqParams params.Params) (interface{}, *response.Error) { @@ -2132,43 +2169,11 @@ func (s *Server) traverseIterator(reqParams params.Params) (interface{}, *respon s.sessionsLock.Unlock() var ( - iIDStr = iID.String() - iVals []stackitem.Item - respErr *response.Error + iIDStr = iID.String() + iVals []stackitem.Item ) for _, it := range session.iteratorIdentifiers { if iIDStr == it.ID { - // If SessionBackedByMPT is enabled, then use MPT-backed historic call to retrieve and traverse iterator. - // Otherwise, iterator stackitem is ready and can be used. - if s.config.SessionBackedByMPT && it.Item == nil { - var ( - b *block.Block - ic *interop.Context - ) - b, err = s.getFakeNextBlock(session.params.NextBlockHeight) - if err != nil { - session.iteratorsLock.Unlock() - return nil, response.NewInternalServerError(fmt.Sprintf("unable to prepare block for historic call: %s", err)) - } - ic, respErr = s.prepareInvocationContext(session.params.Trigger, session.params.Script, session.params.ContractScriptHash, session.params.Transaction, b, false) - if respErr != nil { - session.iteratorsLock.Unlock() - return nil, respErr - } - _ = ic.VM.Run() // No error check because FAULTed invocations could also contain iterator on stack. - stack := ic.VM.Estack().ToArray() - - // Fill in the whole set of iterators for the current session in order not to repeat test invocation one more time for other session iterators. - for _, itID := range session.iteratorIdentifiers { - j := itID.StackIndex - if (stack[j].Type() != stackitem.InteropT) || !iterator.IsIterator(stack[j]) { - session.iteratorsLock.Unlock() - return nil, response.NewInternalServerError(fmt.Sprintf("inconsistent historic call result: expected %s, got %s at stack position #%d", stackitem.InteropT, stack[j].Type(), j)) - } - session.iteratorIdentifiers[j].Item = stack[j] - } - session.finalize = ic.Finalize - } iVals = iterator.Values(it.Item, count) break } @@ -2201,9 +2206,7 @@ func (s *Server) terminateSession(reqParams params.Params) (interface{}, *respon // Iterators access Seek channel under the hood; finalizer closes this channel, thus, // we need to perform finalisation under iteratorsLock. session.iteratorsLock.Lock() - if session.finalize != nil { - session.finalize() - } + session.finalize() if !session.timer.Stop() { <-session.timer.C }