From 0c45ff8f51d4b07e04b9b51f398c5c813ef7e1e2 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 8 Jul 2022 23:25:22 +0300 Subject: [PATCH] rpc: simplify result.Invoke creation, remove needless deps Change stack items before marshaling them which makes code in result package much simpler and not requiring interop, iterator and storage dependencies that clients shouldn't care about. This also changes SessionBackedByMPT behavior, now instead of waiting for traverseiterator call it'll rerun the script immediately if a new session is created. --- pkg/rpc/response/result/invoke.go | 151 ++++++-------------- pkg/rpc/server/server.go | 223 +++++++++++++++--------------- 2 files changed, 157 insertions(+), 217 deletions(-) diff --git a/pkg/rpc/response/result/invoke.go b/pkg/rpc/response/result/invoke.go index e51cb4042..2cd78bb13 100644 --- a/pkg/rpc/response/result/invoke.go +++ b/pkg/rpc/response/result/invoke.go @@ -5,10 +5,7 @@ import ( "fmt" "github.com/google/uuid" - "github.com/nspcc-dev/neo-go/pkg/core/interop" - "github.com/nspcc-dev/neo-go/pkg/core/interop/iterator" "github.com/nspcc-dev/neo-go/pkg/core/state" - "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/storage/dboper" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/vm/invocations" @@ -18,57 +15,23 @@ import ( // Invoke represents a code invocation result and is used by several RPC calls // that invoke functions, scripts and generic bytecode. type Invoke struct { - State string - GasConsumed int64 - Script []byte - Stack []stackitem.Item - FaultException string - Notifications []state.NotificationEvent - Transaction *transaction.Transaction - Diagnostics *InvokeDiag - maxIteratorResultItems int - Session uuid.UUID - finalize func() - registerIterator RegisterIterator + State string + GasConsumed int64 + Script []byte + Stack []stackitem.Item + FaultException string + Notifications []state.NotificationEvent + Transaction *transaction.Transaction + Diagnostics *InvokeDiag + Session uuid.UUID } -// RegisterIterator is a callback used to register new iterator on the server side. -type RegisterIterator func(sessionID string, item stackitem.Item, id int, finalize func()) (uuid.UUID, error) - // InvokeDiag is an additional diagnostic data for invocation. type InvokeDiag struct { Changes []dboper.Operation `json:"storagechanges"` Invocations []*invocations.Tree `json:"invokedcontracts"` } -// NewInvoke returns a new Invoke structure with the given fields set. -func NewInvoke(ic *interop.Context, script []byte, faultException string, registerIterator RegisterIterator, maxIteratorResultItems int) *Invoke { - var diag *InvokeDiag - tree := ic.VM.GetInvocationTree() - if tree != nil { - diag = &InvokeDiag{ - Invocations: tree.Calls, - Changes: storage.BatchToOperations(ic.DAO.GetBatch()), - } - } - notifications := ic.Notifications - if notifications == nil { - notifications = make([]state.NotificationEvent, 0) - } - return &Invoke{ - State: ic.VM.State().String(), - GasConsumed: ic.VM.GasConsumed(), - Script: script, - Stack: ic.VM.Estack().ToArray(), - FaultException: faultException, - Notifications: notifications, - Diagnostics: diag, - finalize: ic.Finalize, - maxIteratorResultItems: maxIteratorResultItems, - registerIterator: registerIterator, - } -} - type invokeAux struct { State string `json:"state"` GasConsumed int64 `json:"gasconsumed,string"` @@ -107,85 +70,55 @@ type Iterator struct { Truncated bool } -// Finalize releases resources occupied by Iterators created at the script invocation. -// This method will be called automatically on Invoke marshalling or by the Server's -// sessions handler. -func (r *Invoke) Finalize() { - if r.finalize != nil { - r.finalize() +// MarshalJSON implements the json.Marshaler. +func (r Iterator) MarshalJSON() ([]byte, error) { + var iaux iteratorAux + iaux.Type = stackitem.InteropT.String() + if r.ID != nil { + iaux.Interface = iteratorInterfaceName + iaux.ID = r.ID.String() + } else { + value := make([]json.RawMessage, len(r.Values)) + for i := range r.Values { + var err error + value[i], err = stackitem.ToJSONWithTypes(r.Values[i]) + if err != nil { + return nil, err + } + } + iaux.Value = value + iaux.Truncated = r.Truncated } + return json.Marshal(iaux) } // MarshalJSON implements the json.Marshaler. func (r Invoke) MarshalJSON() ([]byte, error) { var ( - st json.RawMessage - err error - faultSep string - arr = make([]json.RawMessage, len(r.Stack)) - sessionsEnabled = r.registerIterator != nil - sessionID string + st json.RawMessage + err error + faultSep string + arr = make([]json.RawMessage, len(r.Stack)) ) if len(r.FaultException) != 0 { faultSep = " / " } -arrloop: for i := range arr { var data []byte - if (r.Stack[i].Type() == stackitem.InteropT) && iterator.IsIterator(r.Stack[i]) { - if sessionsEnabled { - if sessionID == "" { - sessionID = uuid.NewString() - } - iteratorID, err := r.registerIterator(sessionID, r.Stack[i], i, r.finalize) - if err != nil { - // Call finalizer immediately, there can't be race between server and marshaller because session wasn't added to server's session pool. - r.Finalize() - return nil, fmt.Errorf("failed to register iterator session: %w", err) - } - data, err = json.Marshal(iteratorAux{ - Type: stackitem.InteropT.String(), - Interface: iteratorInterfaceName, - ID: iteratorID.String(), - }) - if err != nil { - r.FaultException += fmt.Sprintf("%sjson error: failed to marshal iterator: %v", faultSep, err) - break - } - } else { - iteratorValues, truncated := iterator.ValuesTruncated(r.Stack[i], r.maxIteratorResultItems) - value := make([]json.RawMessage, len(iteratorValues)) - for j := range iteratorValues { - value[j], err = stackitem.ToJSONWithTypes(iteratorValues[j]) - if err != nil { - r.FaultException += fmt.Sprintf("%sjson error: %v", faultSep, err) - break arrloop - } - } - data, err = json.Marshal(iteratorAux{ - Type: stackitem.InteropT.String(), - Value: value, - Truncated: truncated, - }) - if err != nil { - r.FaultException += fmt.Sprintf("%sjson error: %v", faultSep, err) - break - } - } + + iter, ok := r.Stack[i].Value().(Iterator) + if (r.Stack[i].Type() == stackitem.InteropT) && ok { + data, err = json.Marshal(iter) } else { data, err = stackitem.ToJSONWithTypes(r.Stack[i]) - if err != nil { - r.FaultException += fmt.Sprintf("%sjson error: %v", faultSep, err) - break - } + } + if err != nil { + r.FaultException += fmt.Sprintf("%sjson error: %v", faultSep, err) + break } arr[i] = data } - if !sessionsEnabled || sessionID == "" { - // Call finalizer manually if iterators are disabled or there's no unnested iterators on estack. - defer r.Finalize() - } if err == nil { st, err = json.Marshal(arr) if err != nil { @@ -196,6 +129,10 @@ arrloop: if r.Transaction != nil { txbytes = r.Transaction.Bytes() } + var sessionID string + if r.Session != (uuid.UUID{}) { + sessionID = r.Session.String() + } aux := &invokeAux{ GasConsumed: r.GasConsumed, Script: r.Script, diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 0e0b86e47..28ccfa246 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -32,6 +32,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -101,29 +102,14 @@ type ( // or from historic MPT-based invocation. In the second case, iteratorIdentifiers are supposed // to be filled during the first `traverseiterator` call using corresponding params. iteratorIdentifiers []*iteratorIdentifier - // params stores invocation params for historic MPT-based iterator traversing. It is nil in case - // of default non-MPT-based sessions mechanism enabled. - params *invocationParams - timer *time.Timer - finalize func() + timer *time.Timer + finalize func() } - // iteratorIdentifier represents Iterator on the server side, holding iterator ID, Iterator stackitem - // and iterator index on stack. + // iteratorIdentifier represents Iterator on the server side, holding iterator ID and Iterator stackitem. iteratorIdentifier struct { ID string - // Item represents Iterator stackitem. It is nil if SessionBackedByMPT is set to true and no `traverseiterator` - // call was called for the corresponding session. + // Item represents Iterator stackitem. Item stackitem.Item - // StackIndex represents Iterator stackitem index on the stack. It can be used only for SessionBackedByMPT configuration. - StackIndex int - } - // invocationParams is a set of parameters used for invoke* calls. - invocationParams struct { - Trigger trigger.Type - Script []byte - ContractScriptHash util.Uint160 - Transaction *transaction.Transaction - NextBlockHeight uint32 } ) @@ -350,9 +336,7 @@ func (s *Server) Shutdown() { for _, session := range s.sessions { // Concurrent iterator traversal may still be in process, thus need to protect iteratorIdentifiers access. session.iteratorsLock.Lock() - if session.finalize != nil { - session.finalize() - } + session.finalize() if !session.timer.Stop() { <-session.timer.C } @@ -2036,64 +2020,117 @@ func (s *Server) runScriptInVM(t trigger.Type, script []byte, contractScriptHash if err != nil { faultException = err.Error() } - var registerIterator result.RegisterIterator - if s.config.SessionEnabled { - registerIterator = func(sessionID string, item stackitem.Item, stackIndex int, finalize func()) (uuid.UUID, error) { - iterID := uuid.New() + items := ic.VM.Estack().ToArray() + sess := s.postProcessExecStack(items) + var id uuid.UUID + + if sess != nil { + // b == nil only when we're not using MPT-backed storage, therefore + // the second attempt won't stop here. + if s.config.SessionBackedByMPT && b == nil { + ic.Finalize() + b, err = s.getFakeNextBlock(ic.Block.Index) + if err != nil { + return nil, response.NewInternalServerError(fmt.Sprintf("unable to prepare block for historic call: %s", err)) + } + // Rerun with MPT-backed storage. + return s.runScriptInVM(t, script, contractScriptHash, tx, b, verbose) + } + id = uuid.New() + sessionID := id.String() + sess.finalize = ic.Finalize + sess.timer = time.AfterFunc(time.Second*time.Duration(s.config.SessionExpirationTime), func() { s.sessionsLock.Lock() + defer s.sessionsLock.Unlock() + if len(s.sessions) == 0 { + return + } sess, ok := s.sessions[sessionID] if !ok { - if len(s.sessions) >= s.config.SessionPoolSize { - return uuid.UUID{}, errors.New("max capacity reached") - } - timer := time.AfterFunc(time.Second*time.Duration(s.config.SessionExpirationTime), func() { - s.sessionsLock.Lock() - defer s.sessionsLock.Unlock() - if len(s.sessions) == 0 { - return - } - sess, ok := s.sessions[sessionID] - if !ok { - return - } - sess.iteratorsLock.Lock() - if sess.finalize != nil { - sess.finalize() - } - delete(s.sessions, sessionID) - sess.iteratorsLock.Unlock() - }) - sess = &session{ - finalize: finalize, - timer: timer, - } - if s.config.SessionBackedByMPT { - sess.params = &invocationParams{ - Trigger: t, - Script: script, - ContractScriptHash: contractScriptHash, - Transaction: tx, - NextBlockHeight: ic.Block.Index, - } - // Call finalizer manually if MPT-based iterator sessions are enabled. If disabled, then register finalizator. - if finalize != nil { - finalize() - sess.finalize = nil - } - item = nil - } + return } - sess.iteratorIdentifiers = append(sess.iteratorIdentifiers, &iteratorIdentifier{ - ID: iterID.String(), - Item: item, - StackIndex: stackIndex, - }) - s.sessions[sessionID] = sess + sess.iteratorsLock.Lock() + sess.finalize() + delete(s.sessions, sessionID) + sess.iteratorsLock.Unlock() + }) + s.sessionsLock.Lock() + if len(s.sessions) >= s.config.SessionPoolSize { + ic.Finalize() s.sessionsLock.Unlock() - return iterID, nil + return nil, response.NewInternalServerError("max session capacity reached") + } + s.sessions[sessionID] = sess + s.sessionsLock.Unlock() + } else { + ic.Finalize() + } + var diag *result.InvokeDiag + tree := ic.VM.GetInvocationTree() + if tree != nil { + diag = &result.InvokeDiag{ + Invocations: tree.Calls, + Changes: storage.BatchToOperations(ic.DAO.GetBatch()), } } - return result.NewInvoke(ic, script, faultException, registerIterator, s.config.MaxIteratorResultItems), nil + notifications := ic.Notifications + if notifications == nil { + notifications = make([]state.NotificationEvent, 0) + } + res := &result.Invoke{ + State: ic.VM.State().String(), + GasConsumed: ic.VM.GasConsumed(), + Script: script, + Stack: items, + FaultException: faultException, + Notifications: notifications, + Diagnostics: diag, + Session: id, + } + + return res, nil +} + +// postProcessExecStack changes iterator interop items according to the server configuration. +// It does modifications in-place, but it returns a session if any iterator was registered. +func (s *Server) postProcessExecStack(stack []stackitem.Item) *session { + var sess session + + for i, v := range stack { + var id uuid.UUID + + stack[i], id = s.registerOrDumpIterator(v) + if id != (uuid.UUID{}) { + sess.iteratorIdentifiers = append(sess.iteratorIdentifiers, &iteratorIdentifier{ + ID: id.String(), + Item: v, + }) + } + } + if len(sess.iteratorIdentifiers) != 0 { + return &sess + } + return nil +} + +// registerOrDumpIterator changes iterator interop stack items into result.Iterator +// interop stack items and returns a uuid for it if sessions are enabled. All the other stack +// items are not changed. +func (s *Server) registerOrDumpIterator(item stackitem.Item) (stackitem.Item, uuid.UUID) { + var iterID uuid.UUID + + if (item.Type() != stackitem.InteropT) || !iterator.IsIterator(item) { + return item, iterID + } + var resIterator result.Iterator + + if s.config.SessionEnabled { + iterID = uuid.New() + resIterator.ID = &iterID + } else { + resIterator.Values, resIterator.Truncated = iterator.ValuesTruncated(item, s.config.MaxIteratorResultItems) + } + return stackitem.NewInterop(resIterator), iterID } func (s *Server) traverseIterator(reqParams params.Params) (interface{}, *response.Error) { @@ -2132,43 +2169,11 @@ func (s *Server) traverseIterator(reqParams params.Params) (interface{}, *respon s.sessionsLock.Unlock() var ( - iIDStr = iID.String() - iVals []stackitem.Item - respErr *response.Error + iIDStr = iID.String() + iVals []stackitem.Item ) for _, it := range session.iteratorIdentifiers { if iIDStr == it.ID { - // If SessionBackedByMPT is enabled, then use MPT-backed historic call to retrieve and traverse iterator. - // Otherwise, iterator stackitem is ready and can be used. - if s.config.SessionBackedByMPT && it.Item == nil { - var ( - b *block.Block - ic *interop.Context - ) - b, err = s.getFakeNextBlock(session.params.NextBlockHeight) - if err != nil { - session.iteratorsLock.Unlock() - return nil, response.NewInternalServerError(fmt.Sprintf("unable to prepare block for historic call: %s", err)) - } - ic, respErr = s.prepareInvocationContext(session.params.Trigger, session.params.Script, session.params.ContractScriptHash, session.params.Transaction, b, false) - if respErr != nil { - session.iteratorsLock.Unlock() - return nil, respErr - } - _ = ic.VM.Run() // No error check because FAULTed invocations could also contain iterator on stack. - stack := ic.VM.Estack().ToArray() - - // Fill in the whole set of iterators for the current session in order not to repeat test invocation one more time for other session iterators. - for _, itID := range session.iteratorIdentifiers { - j := itID.StackIndex - if (stack[j].Type() != stackitem.InteropT) || !iterator.IsIterator(stack[j]) { - session.iteratorsLock.Unlock() - return nil, response.NewInternalServerError(fmt.Sprintf("inconsistent historic call result: expected %s, got %s at stack position #%d", stackitem.InteropT, stack[j].Type(), j)) - } - session.iteratorIdentifiers[j].Item = stack[j] - } - session.finalize = ic.Finalize - } iVals = iterator.Values(it.Item, count) break } @@ -2201,9 +2206,7 @@ func (s *Server) terminateSession(reqParams params.Params) (interface{}, *respon // Iterators access Seek channel under the hood; finalizer closes this channel, thus, // we need to perform finalisation under iteratorsLock. session.iteratorsLock.Lock() - if session.finalize != nil { - session.finalize() - } + session.finalize() if !session.timer.Stop() { <-session.timer.C }