From 7e16bea126dc3423f5b1499521b25f767a3ba113 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 25 Sep 2020 17:39:11 +0300 Subject: [PATCH] network: implement Oracle module --- pkg/core/native/oracle.go | 3 +- pkg/core/oracle_test.go | 314 ++++++++++++++++++++++ pkg/services/oracle/network.go | 56 ++++ pkg/services/oracle/network_test.go | 18 ++ pkg/services/oracle/nodes.go | 69 +++++ pkg/services/oracle/oracle.go | 126 +++++++++ pkg/services/oracle/request.go | 117 ++++++++ pkg/services/oracle/response.go | 149 ++++++++++ pkg/services/oracle/testdata/oracle1.json | 1 + pkg/services/oracle/testdata/oracle2.json | 1 + pkg/services/oracle/transaction.go | 107 ++++++++ 11 files changed, 959 insertions(+), 2 deletions(-) create mode 100644 pkg/core/oracle_test.go create mode 100644 pkg/services/oracle/network.go create mode 100644 pkg/services/oracle/network_test.go create mode 100644 pkg/services/oracle/nodes.go create mode 100644 pkg/services/oracle/oracle.go create mode 100644 pkg/services/oracle/request.go create mode 100644 pkg/services/oracle/response.go create mode 100644 pkg/services/oracle/testdata/oracle1.json create mode 100644 pkg/services/oracle/testdata/oracle2.json create mode 100644 pkg/services/oracle/transaction.go diff --git a/pkg/core/native/oracle.go b/pkg/core/native/oracle.go index 9c84570b5..460b9222c 100644 --- a/pkg/core/native/oracle.go +++ b/pkg/core/native/oracle.go @@ -69,8 +69,7 @@ func newOracle() *Oracle { o := &Oracle{ContractMD: *interop.NewContractMD(nativenames.Oracle, oracleContractID)} w := io.NewBufBinWriter() - emit.Int(w.BinWriter, 0) - emit.Opcodes(w.BinWriter, opcode.NEWARRAY) + emit.Opcodes(w.BinWriter, opcode.NEWARRAY0) emit.Int(w.BinWriter, int64(callflag.All)) emit.String(w.BinWriter, "finish") emit.Bytes(w.BinWriter, o.Hash.BytesBE()) diff --git a/pkg/core/oracle_test.go b/pkg/core/oracle_test.go new file mode 100644 index 000000000..f6888f291 --- /dev/null +++ b/pkg/core/oracle_test.go @@ -0,0 +1,314 @@ +package core + +import ( + "bytes" + "errors" + gio "io" + "io/ioutil" + "net/http" + "net/url" + "path" + "strings" + "testing" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/config/netmode" + "github.com/nspcc-dev/neo-go/pkg/core/native" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/services/oracle" + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neo-go/pkg/wallet" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +const oracleModulePath = "../services/oracle/" + +func getTestOracle(t *testing.T, bc *Blockchain, walletPath, pass string) ( + *wallet.Account, + *oracle.Oracle, + map[uint64]*responseWithSig, + chan *transaction.Transaction) { + + m := make(map[uint64]*responseWithSig) + ch := make(chan *transaction.Transaction, 5) + orcCfg := oracle.Config{ + Log: zaptest.NewLogger(t), + Network: netmode.UnitTestNet, + Wallet: config.Wallet{ + Path: path.Join(oracleModulePath, walletPath), + Password: pass, + }, + Chain: bc, + Client: newDefaultHTTPClient(), + ResponseHandler: saveToMapBroadcaster{m}, + OnTransaction: saveTxToChan(ch), + URIValidator: func(u *url.URL) error { + if strings.HasPrefix(u.Host, "private") { + return errors.New("private network") + } + return nil + }, + OracleScript: bc.contracts.Oracle.NEF.Script, + OracleResponse: bc.contracts.Oracle.GetOracleResponseScript(), + OracleHash: bc.contracts.Oracle.Hash, + } + orc, err := oracle.NewOracle(orcCfg) + require.NoError(t, err) + + w, err := wallet.NewWalletFromFile(path.Join(oracleModulePath, walletPath)) + require.NoError(t, err) + require.NoError(t, w.Accounts[0].Decrypt(pass)) + return w.Accounts[0], orc, m, ch +} + +// Compatibility test from C# code. +// https://github.com/neo-project/neo-modules/blob/master/tests/Neo.Plugins.OracleService.Tests/UT_OracleService.cs#L61 +func TestCreateResponseTx(t *testing.T) { + bc := newTestChain(t) + defer bc.Close() + + require.Equal(t, int64(30), bc.GetBaseExecFee()) + require.Equal(t, int64(1000), bc.FeePerByte()) + acc, orc, _, _ := getTestOracle(t, bc, "./testdata/oracle1.json", "one") + req := &state.OracleRequest{ + OriginalTxID: util.Uint256{}, + GasForResponse: 100000000, + URL: "https://127.0.0.1/test", + Filter: new(string), + CallbackContract: util.Uint160{}, + CallbackMethod: "callback", + UserData: []byte{}, + } + resp := &transaction.OracleResponse{ + ID: 1, + Code: transaction.Success, + Result: []byte{0}, + } + require.NoError(t, bc.contracts.Oracle.PutRequestInternal(1, req, bc.dao)) + orc.UpdateOracleNodes(keys.PublicKeys{acc.PrivateKey().PublicKey()}) + tx, err := orc.CreateResponseTx(int64(req.GasForResponse), 1, resp) + require.NoError(t, err) + assert.Equal(t, 167, tx.Size()) + assert.Equal(t, int64(2216640), tx.NetworkFee) + assert.Equal(t, int64(97783360), tx.SystemFee) +} + +func TestOracle(t *testing.T) { + bc := newTestChain(t) + defer bc.Close() + + oracleCtr := bc.contracts.Oracle + acc1, orc1, m1, ch1 := getTestOracle(t, bc, "./testdata/oracle1.json", "one") + acc2, orc2, m2, ch2 := getTestOracle(t, bc, "./testdata/oracle2.json", "two") + oracleNodes := keys.PublicKeys{acc1.PrivateKey().PublicKey(), acc2.PrivateKey().PublicKey()} + // Must be set in native contract for tx verification. + bc.setNodesByRole(t, true, native.RoleOracle, oracleNodes) + orc1.UpdateOracleNodes(oracleNodes.Copy()) + orc2.UpdateOracleNodes(oracleNodes.Copy()) + + cs := getOracleContractState(bc.contracts.Oracle.Hash) + require.NoError(t, bc.contracts.Management.PutContractState(bc.dao, cs)) + + putOracleRequest(t, cs.Hash, bc, "http://get.1234", nil, "handle", []byte{}, 10_000_000) + putOracleRequest(t, cs.Hash, bc, "http://get.1234", nil, "handle", []byte{}, 10_000_000) + putOracleRequest(t, cs.Hash, bc, "http://get.timeout", nil, "handle", []byte{}, 10_000_000) + putOracleRequest(t, cs.Hash, bc, "http://get.notfound", nil, "handle", []byte{}, 10_000_000) + putOracleRequest(t, cs.Hash, bc, "http://get.forbidden", nil, "handle", []byte{}, 10_000_000) + putOracleRequest(t, cs.Hash, bc, "http://private.url", nil, "handle", []byte{}, 10_000_000) + putOracleRequest(t, cs.Hash, bc, "http://get.big", nil, "handle", []byte{}, 10_000_000) + putOracleRequest(t, cs.Hash, bc, "http://get.maxallowed", nil, "handle", []byte{}, 10_000_000) + putOracleRequest(t, cs.Hash, bc, "http://get.maxallowed", nil, "handle", []byte{}, 100_000_000) + + checkResp := func(t *testing.T, id uint64, resp *transaction.OracleResponse) *state.OracleRequest { + req, err := oracleCtr.GetRequestInternal(bc.dao, id) + require.NoError(t, err) + + reqs := map[uint64]*state.OracleRequest{id: req} + orc1.AddRequests(reqs) + require.NotNil(t, m1[id]) + require.Equal(t, resp, m1[id].resp) + require.Empty(t, ch1) + return req + } + + // Checks if tx is ready and valid. + checkEmitTx := func(t *testing.T, ch chan *transaction.Transaction) { + require.Len(t, ch, 1) + tx := <-ch + require.NoError(t, bc.verifyAndPoolTx(tx, bc.GetMemPool(), bc)) + } + + t.Run("NormalRequest", func(t *testing.T) { + resp := &transaction.OracleResponse{ + ID: 1, + Code: transaction.Success, + Result: []byte{1, 2, 3, 4}, + } + req := checkResp(t, 1, resp) + + reqs := map[uint64]*state.OracleRequest{1: req} + orc2.AddRequests(reqs) + require.Equal(t, resp, m2[1].resp) + require.Empty(t, ch2) + + orc1.AddResponse(acc2.PrivateKey().PublicKey(), m2[1].resp.ID, m2[1].txSig) + checkEmitTx(t, ch1) + + t.Run("FirstOtherThenMe", func(t *testing.T) { + const reqID = 2 + + resp := &transaction.OracleResponse{ + ID: reqID, + Code: transaction.Success, + Result: []byte{1, 2, 3, 4}, + } + req := checkResp(t, reqID, resp) + orc2.AddResponse(acc1.PrivateKey().PublicKey(), reqID, m1[reqID].txSig) + require.Empty(t, ch2) + + reqs := map[uint64]*state.OracleRequest{reqID: req} + orc2.AddRequests(reqs) + require.Equal(t, resp, m2[reqID].resp) + checkEmitTx(t, ch2) + }) + }) + t.Run("Invalid", func(t *testing.T) { + t.Run("Timeout", func(t *testing.T) { + checkResp(t, 3, &transaction.OracleResponse{ + ID: 3, + Code: transaction.Timeout, + }) + }) + t.Run("NotFound", func(t *testing.T) { + checkResp(t, 4, &transaction.OracleResponse{ + ID: 4, + Code: transaction.NotFound, + }) + }) + t.Run("Forbidden", func(t *testing.T) { + checkResp(t, 5, &transaction.OracleResponse{ + ID: 5, + Code: transaction.Forbidden, + }) + }) + t.Run("PrivateNetwork", func(t *testing.T) { + checkResp(t, 6, &transaction.OracleResponse{ + ID: 6, + Code: transaction.Forbidden, + }) + }) + t.Run("Big", func(t *testing.T) { + checkResp(t, 7, &transaction.OracleResponse{ + ID: 7, + Code: transaction.ResponseTooLarge, + }) + }) + t.Run("MaxAllowedSmallGAS", func(t *testing.T) { + checkResp(t, 8, &transaction.OracleResponse{ + ID: 8, + Code: transaction.InsufficientFunds, + }) + }) + }) + t.Run("MaxAllowedEnoughGAS", func(t *testing.T) { + checkResp(t, 9, &transaction.OracleResponse{ + ID: 9, + Code: transaction.Success, + Result: make([]byte, transaction.MaxOracleResultSize), + }) + }) +} + +type saveToMapBroadcaster struct { + m map[uint64]*responseWithSig +} + +func (b saveToMapBroadcaster) SendResponse(_ *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) { + b.m[resp.ID] = &responseWithSig{ + resp: resp, + txSig: txSig, + } +} + +type responseWithSig struct { + resp *transaction.OracleResponse + txSig []byte +} + +func saveTxToChan(ch chan *transaction.Transaction) oracle.TxCallback { + return func(tx *transaction.Transaction) { + ch <- tx + } +} + +type ( + // httpClient implements oracle.HTTPClient with + // mocked URL or responses. + httpClient struct { + responses map[string]testResponse + } + + testResponse struct { + code int + body []byte + } +) + +// Get implements oracle.HTTPClient interface. +func (c *httpClient) Get(url string) (*http.Response, error) { + resp, ok := c.responses[url] + if ok { + return &http.Response{ + StatusCode: resp.code, + Body: newResponseBody(resp.body), + }, nil + } + return nil, errors.New("error during request") +} + +func newDefaultHTTPClient() oracle.HTTPClient { + return &httpClient{ + responses: map[string]testResponse{ + "http://get.1234": { + code: http.StatusOK, + body: []byte{1, 2, 3, 4}, + }, + "http://get.4321": { + code: http.StatusOK, + body: []byte{4, 3, 2, 1}, + }, + "http://get.timeout": { + code: http.StatusRequestTimeout, + body: []byte{}, + }, + "http://get.notfound": { + code: http.StatusNotFound, + body: []byte{}, + }, + "http://get.forbidden": { + code: http.StatusForbidden, + body: []byte{}, + }, + "http://private.url": { + code: http.StatusOK, + body: []byte("passwords"), + }, + "http://get.big": { + code: http.StatusOK, + body: make([]byte, transaction.MaxOracleResultSize+1), + }, + "http://get.maxallowed": { + code: http.StatusOK, + body: make([]byte, transaction.MaxOracleResultSize), + }, + }, + } +} + +func newResponseBody(resp []byte) gio.ReadCloser { + return ioutil.NopCloser(bytes.NewReader(resp)) +} diff --git a/pkg/services/oracle/network.go b/pkg/services/oracle/network.go new file mode 100644 index 000000000..3ab8e0705 --- /dev/null +++ b/pkg/services/oracle/network.go @@ -0,0 +1,56 @@ +package oracle + +import ( + "errors" + "net" + "net/url" +) + +// reservedCIDRs is a list of ip addresses for private networks. +// https://tools.ietf.org/html/rfc6890 +var reservedCIDRs = []string{ + // IPv4 + "10.0.0.0/8", + "100.64.0.0/10", + "172.16.0.0/12", + "192.0.0.0/24", + "192.168.0.0/16", + "198.18.0.0/15", + // IPv6 + "fc00::/7", +} + +var privateNets = make([]net.IPNet, 0, len(reservedCIDRs)) + +func init() { + for i := range reservedCIDRs { + _, ipNet, err := net.ParseCIDR(reservedCIDRs[i]) + if err != nil { + panic(err) + } + privateNets = append(privateNets, *ipNet) + } +} + +func defaultURIValidator(u *url.URL) error { + ip, err := net.ResolveIPAddr("ip", u.Hostname()) + if err != nil { + return err + } + if isReserved(ip.IP) { + return errors.New("IP is not global unicast") + } + return nil +} + +func isReserved(ip net.IP) bool { + if !ip.IsGlobalUnicast() { + return true + } + for i := range privateNets { + if privateNets[i].Contains(ip) { + return true + } + } + return false +} diff --git a/pkg/services/oracle/network_test.go b/pkg/services/oracle/network_test.go new file mode 100644 index 000000000..2e1791c5d --- /dev/null +++ b/pkg/services/oracle/network_test.go @@ -0,0 +1,18 @@ +package oracle + +import ( + "net" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestIsReserved(t *testing.T) { + require.True(t, isReserved(net.IPv4zero)) + require.True(t, isReserved(net.IPv4(10, 0, 0, 1))) + require.True(t, isReserved(net.IPv4(192, 168, 0, 1))) + require.True(t, isReserved(net.IPv6interfacelocalallnodes)) + require.True(t, isReserved(net.IPv6loopback)) + + require.False(t, isReserved(net.IPv4(8, 8, 8, 8))) +} diff --git a/pkg/services/oracle/nodes.go b/pkg/services/oracle/nodes.go new file mode 100644 index 000000000..ee0434868 --- /dev/null +++ b/pkg/services/oracle/nodes.go @@ -0,0 +1,69 @@ +package oracle + +import ( + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/encoding/address" + "github.com/nspcc-dev/neo-go/pkg/smartcontract" + "github.com/nspcc-dev/neo-go/pkg/wallet" + "go.uber.org/zap" +) + +// UpdateOracleNodes updates oracle nodes list. +func (o *Oracle) UpdateOracleNodes(oracleNodes keys.PublicKeys) { + o.accMtx.Lock() + defer o.accMtx.Unlock() + + old := o.oracleNodes + if isEqual := len(old) == len(oracleNodes); isEqual { + for i := range old { + if !old[i].Equal(oracleNodes[i]) { + isEqual = false + break + } + } + if isEqual { + return + } + } + + var acc *wallet.Account + for i := range oracleNodes { + acc = o.wallet.GetAccount(oracleNodes[i].GetScriptHash()) + if acc != nil { + if acc.PrivateKey() != nil { + break + } + err := acc.Decrypt(o.Wallet.Password) + if err != nil { + o.Log.Error("can't unlock account", + zap.String("address", address.Uint160ToString(acc.Contract.ScriptHash())), + zap.Error(err)) + o.currAccount = nil + return + } + break + } + } + + o.currAccount = acc + o.oracleSignContract, _ = smartcontract.CreateDefaultMultiSigRedeemScript(oracleNodes) + o.oracleNodes = oracleNodes +} + +func (o *Oracle) getAccount() *wallet.Account { + o.accMtx.RLock() + defer o.accMtx.RUnlock() + return o.currAccount +} + +func (o *Oracle) getOracleNodes() keys.PublicKeys { + o.accMtx.RLock() + defer o.accMtx.RUnlock() + return o.oracleNodes +} + +func (o *Oracle) getOracleSignContract() []byte { + o.accMtx.RLock() + defer o.accMtx.RUnlock() + return o.oracleSignContract +} diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go new file mode 100644 index 000000000..64682c196 --- /dev/null +++ b/pkg/services/oracle/oracle.go @@ -0,0 +1,126 @@ +package oracle + +import ( + "net/http" + "net/url" + "sync" + "time" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/config/netmode" + "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neo-go/pkg/wallet" + "go.uber.org/zap" +) + +type ( + // Oracle represents oracle module capable of talking + // with the external world. + Oracle struct { + Config + + // mtx protects setting callbacks. + mtx sync.RWMutex + + // accMtx protects account and oracle nodes. + accMtx sync.RWMutex + currAccount *wallet.Account + oracleNodes keys.PublicKeys + oracleSignContract []byte + + // respMtx protects responses map. + respMtx sync.RWMutex + responses map[uint64]*incompleteTx + + wallet *wallet.Wallet + } + + // Config contains oracle module parameters. + Config struct { + Log *zap.Logger + Network netmode.Magic + Wallet config.Wallet + Client HTTPClient + Chain blockchainer.Blockchainer + ResponseHandler Broadcaster + OnTransaction TxCallback + URIValidator URIValidator + OracleScript []byte + OracleResponse []byte + OracleHash util.Uint160 + } + + // HTTPClient is an interface capable of doing oracle requests. + HTTPClient interface { + Get(string) (*http.Response, error) + } + + // Broadcaster broadcasts oracle responses. + Broadcaster interface { + SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) + } + + defaultResponseHandler struct{} + + // TxCallback executes on new transactions when they are ready to be pooled. + TxCallback = func(tx *transaction.Transaction) + // URIValidator is used to check if provided URL is valid. + URIValidator = func(*url.URL) error +) + +const ( + // defaultRequestTimeout is default request timeout. + defaultRequestTimeout = time.Second * 5 +) + +// NewOracle returns new oracle instance. +func NewOracle(cfg Config) (*Oracle, error) { + o := &Oracle{ + Config: cfg, + + responses: make(map[uint64]*incompleteTx), + } + + var err error + w := cfg.Wallet + if o.wallet, err = wallet.NewWalletFromFile(w.Path); err != nil { + return nil, err + } + + if o.Client == nil { + var client http.Client + client.Transport = &http.Transport{DisableKeepAlives: true} + client.Timeout = defaultRequestTimeout + o.Client = &client + } + if o.ResponseHandler == nil { + o.ResponseHandler = defaultResponseHandler{} + } + if o.OnTransaction == nil { + o.OnTransaction = func(*transaction.Transaction) {} + } + if o.URIValidator == nil { + o.URIValidator = defaultURIValidator + } + return o, nil +} + +func (o *Oracle) getBroadcaster() Broadcaster { + o.mtx.RLock() + defer o.mtx.RUnlock() + return o.ResponseHandler +} + +// SetBroadcaster sets callback to broadcast response. +func (o *Oracle) SetBroadcaster(b Broadcaster) { + o.mtx.Lock() + defer o.mtx.Unlock() + o.ResponseHandler = b +} + +// SendResponse implements Broadcaster interface. +func (defaultResponseHandler) SendResponse(*keys.PrivateKey, *transaction.OracleResponse, []byte) { +} diff --git a/pkg/services/oracle/request.go b/pkg/services/oracle/request.go new file mode 100644 index 000000000..a3b5f232b --- /dev/null +++ b/pkg/services/oracle/request.go @@ -0,0 +1,117 @@ +package oracle + +import ( + "errors" + "net/http" + "net/url" + + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "go.uber.org/zap" +) + +// RemoveRequests removes all data associated with requests +// which have been processed by oracle contract. +func (o *Oracle) RemoveRequests(ids []uint64) { + o.respMtx.Lock() + defer o.respMtx.Unlock() + for _, id := range ids { + delete(o.responses, id) + } +} + +// AddRequests saves all requests in-fly for further processing. +func (o *Oracle) AddRequests(reqs map[uint64]*state.OracleRequest) { + acc := o.getAccount() + if acc == nil { + return + } + + // Process actual requests. + for id := range reqs { + if err := o.processRequest(acc.PrivateKey(), id, reqs[id]); err != nil { + o.Log.Debug("can't process request", zap.Error(err)) + } + } +} + +func (o *Oracle) processRequest(priv *keys.PrivateKey, id uint64, req *state.OracleRequest) error { + resp := &transaction.OracleResponse{ID: id} + u, err := url.ParseRequestURI(req.URL) + if err == nil { + err = o.URIValidator(u) + } + if err != nil { + resp.Code = transaction.Forbidden + } else if u.Scheme == "http" { + r, err := o.Client.Get(req.URL) + switch { + case err != nil: + resp.Code = transaction.Error + case r.StatusCode == http.StatusOK: + result, err := readResponse(r.Body, transaction.MaxOracleResultSize) + if err != nil { + if errors.Is(err, ErrResponseTooLarge) { + resp.Code = transaction.ResponseTooLarge + } else { + resp.Code = transaction.Error + } + break + } + resp.Code = transaction.Success + resp.Result = result + case r.StatusCode == http.StatusForbidden: + resp.Code = transaction.Forbidden + case r.StatusCode == http.StatusNotFound: + resp.Code = transaction.NotFound + case r.StatusCode == http.StatusRequestTimeout: + resp.Code = transaction.Timeout + default: + resp.Code = transaction.Error + } + } + + currentHeight := o.Chain.BlockHeight() + _, h, err := o.Chain.GetTransaction(req.OriginalTxID) + if err != nil { + if !errors.Is(err, storage.ErrKeyNotFound) { + return err + } + // The only reason tx can be not found is if it wasn't yet persisted from DAO. + h = currentHeight + } + tx, err := o.CreateResponseTx(int64(req.GasForResponse), h, resp) + if err != nil { + return err + } + backupTx, err := o.CreateResponseTx(int64(req.GasForResponse), h, &transaction.OracleResponse{ + ID: id, + Code: transaction.ConsensusUnreachable, + }) + if err != nil { + return err + } + + incTx := o.getResponse(id) + incTx.Lock() + incTx.tx = tx + incTx.backupTx = backupTx + incTx.reverifyTx() + + txSig := priv.Sign(tx.GetSignedPart()) + incTx.addResponse(priv.PublicKey(), txSig, false) + + backupSig := priv.Sign(backupTx.GetSignedPart()) + incTx.addResponse(priv.PublicKey(), backupSig, true) + + readyTx, ready := incTx.finalize(o.getOracleNodes()) + incTx.Unlock() + + o.getBroadcaster().SendResponse(priv, resp, txSig) + if ready { + o.OnTransaction(readyTx) + } + return nil +} diff --git a/pkg/services/oracle/response.go b/pkg/services/oracle/response.go new file mode 100644 index 000000000..bb9f0896a --- /dev/null +++ b/pkg/services/oracle/response.go @@ -0,0 +1,149 @@ +package oracle + +import ( + "encoding/hex" + "errors" + gio "io" + + "github.com/nspcc-dev/neo-go/pkg/core/fee" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/hash" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag" + "github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest" + "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" + "github.com/nspcc-dev/neo-go/pkg/vm" + "go.uber.org/zap" +) + +func (o *Oracle) getResponse(reqID uint64) *incompleteTx { + o.respMtx.Lock() + defer o.respMtx.Unlock() + incTx, ok := o.responses[reqID] + if !ok { + incTx = newIncompleteTx() + o.responses[reqID] = incTx + } + return incTx +} + +// AddResponse processes oracle response from node pub. +// sig is response transaction signature. +func (o *Oracle) AddResponse(pub *keys.PublicKey, reqID uint64, txSig []byte) { + incTx := o.getResponse(reqID) + + incTx.Lock() + isBackup := false + if incTx.tx != nil { + ok := pub.Verify(txSig, incTx.tx.GetSignedHash().BytesBE()) + if !ok { + ok = pub.Verify(txSig, incTx.backupTx.GetSignedHash().BytesBE()) + if !ok { + o.Log.Debug("invalid response signature", + zap.String("pub", hex.EncodeToString(pub.Bytes()))) + incTx.Unlock() + return + } + isBackup = true + } + } + incTx.addResponse(pub, txSig, isBackup) + readyTx, ready := incTx.finalize(o.getOracleNodes()) + incTx.Unlock() + + if ready { + o.OnTransaction(readyTx) + } +} + +// ErrResponseTooLarge is returned when response exceeds max allowed size. +var ErrResponseTooLarge = errors.New("too big response") + +func readResponse(rc gio.ReadCloser, limit int) ([]byte, error) { + defer rc.Close() + + buf := make([]byte, limit+1) + n, err := gio.ReadFull(rc, buf) + if err == gio.ErrUnexpectedEOF && n <= limit { + return buf[:n], nil + } + if err == nil || n > limit { + return nil, ErrResponseTooLarge + } + return nil, err +} + +// CreateResponseTx creates unsigned oracle response transaction. +func (o *Oracle) CreateResponseTx(gasForResponse int64, height uint32, resp *transaction.OracleResponse) (*transaction.Transaction, error) { + tx := transaction.New(o.Network, o.OracleResponse, 0) + tx.Nonce = uint32(resp.ID) + tx.ValidUntilBlock = height + transaction.MaxValidUntilBlockIncrement + tx.Attributes = []transaction.Attribute{{ + Type: transaction.OracleResponseT, + Value: resp, + }} + + oracleSignContract := o.getOracleSignContract() + tx.Signers = []transaction.Signer{ + { + Account: o.OracleHash, + Scopes: transaction.None, + }, + { + Account: hash.Hash160(oracleSignContract), + Scopes: transaction.None, + }, + } + tx.Scripts = []transaction.Witness{ + {}, // native contract witness is fixed, second witness is set later. + } + + // Calculate network fee. + size := io.GetVarSize(tx) + tx.Scripts = append(tx.Scripts, transaction.Witness{VerificationScript: oracleSignContract}) + + gasConsumed, ok := o.testVerify(tx) + if !ok { + return nil, errors.New("can't verify transaction") + } + tx.NetworkFee += gasConsumed + + netFee, sizeDelta := fee.Calculate(o.Chain.GetPolicer().GetBaseExecFee(), tx.Scripts[1].VerificationScript) + tx.NetworkFee += netFee + size += sizeDelta + + currNetFee := tx.NetworkFee + int64(size)*o.Chain.FeePerByte() + if currNetFee > gasForResponse { + attrSize := io.GetVarSize(tx.Attributes) + resp.Code = transaction.InsufficientFunds + resp.Result = nil + size = size - attrSize + io.GetVarSize(tx.Attributes) + } + tx.NetworkFee += int64(size) * o.Chain.FeePerByte() // 233 + + // Calculate system fee. + tx.SystemFee = gasForResponse - tx.NetworkFee + return tx, nil +} + +func (o *Oracle) testVerify(tx *transaction.Transaction) (int64, bool) { + v := o.Chain.GetTestVM(trigger.Verification, tx, nil) + v.GasLimit = o.Chain.GetPolicer().GetMaxVerificationGAS() + v.LoadScriptWithHash(o.OracleScript, o.OracleHash, callflag.ReadStates) + v.Estack().PushVal(manifest.MethodVerify) + + ok := isVerifyOk(v) + return v.GasConsumed(), ok +} + +func isVerifyOk(v *vm.VM) bool { + if err := v.Run(); err != nil { + return false + } + if v.Estack().Len() != 1 { + return false + } + ok, err := v.Estack().Pop().Item().TryBool() + return err == nil && ok +} diff --git a/pkg/services/oracle/testdata/oracle1.json b/pkg/services/oracle/testdata/oracle1.json new file mode 100644 index 000000000..ae817cdbf --- /dev/null +++ b/pkg/services/oracle/testdata/oracle1.json @@ -0,0 +1 @@ +{"version":"3.0","accounts":[{"address":"NMy1PN9GCdGc26YFG7JruYg7UBStw2pPKN","key":"6PYML6dDTMXJBD7ywRwiCAhseCPToWkMfvPUViuxiXM6s5oi7ggf4ho3AK","label":"","contract":{"script":"DCEDNxK01e1DnGA+TiGU3H4DKUuGliSz89/NuZCbVvA2u0wLQZVEDXg=","parameters":[{"name":"parameter0","type":"Signature"}],"deployed":false},"lock":false,"isdefault":false}],"scrypt":{"n":16384,"r":8,"p":8},"extra":{"Tokens":null}} diff --git a/pkg/services/oracle/testdata/oracle2.json b/pkg/services/oracle/testdata/oracle2.json new file mode 100644 index 000000000..d5d131b5f --- /dev/null +++ b/pkg/services/oracle/testdata/oracle2.json @@ -0,0 +1 @@ +{"version":"3.0","accounts":[{"address":"NU7QxQXULbmZU7kaWUaeF3r9v3zimU42bV","key":"6PYKv77p5wihN64XaPB5Nbci1sCLV5CrzSu8GKv7UHXHRtytfLt8zfrMgT","label":"","contract":{"script":"DCEDEXzwIl4Jhvsj98GYIPFFiedeb1QdP8T79uSBSDNsiswLQZVEDXg=","parameters":[{"name":"parameter0","type":"Signature"}],"deployed":false},"lock":false,"isdefault":false}],"scrypt":{"n":16384,"r":8,"p":8},"extra":{"Tokens":null}} diff --git a/pkg/services/oracle/transaction.go b/pkg/services/oracle/transaction.go new file mode 100644 index 000000000..7f64d5bd3 --- /dev/null +++ b/pkg/services/oracle/transaction.go @@ -0,0 +1,107 @@ +package oracle + +import ( + "sync" + + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/smartcontract" + "github.com/nspcc-dev/neo-go/pkg/vm/emit" +) + +type ( + incompleteTx struct { + sync.RWMutex + // tx is oracle response transaction. + tx *transaction.Transaction + // sigs contains signature from every oracle node. + sigs map[string]*txSignature + // backupTx is backup transaction. + backupTx *transaction.Transaction + // backupSigs contains signatures of backup tx. + backupSigs map[string]*txSignature + } + + txSignature struct { + // pub is cached public key. + pub *keys.PublicKey + // ok is true if signature was verified. + ok bool + // sig is tx signature. + sig []byte + } +) + +func newIncompleteTx() *incompleteTx { + return &incompleteTx{ + sigs: make(map[string]*txSignature), + backupSigs: make(map[string]*txSignature), + } +} + +func (t *incompleteTx) reverifyTx() { + txHash := t.tx.GetSignedHash() + backupHash := t.backupTx.GetSignedHash() + for pub, sig := range t.sigs { + if !sig.ok { + sig.ok = sig.pub.Verify(sig.sig, txHash.BytesBE()) + if !sig.ok && sig.pub.Verify(sig.sig, backupHash.BytesBE()) { + t.backupSigs[pub] = &txSignature{ + pub: sig.pub, + ok: true, + sig: sig.sig, + } + } + } + } +} + +func (t *incompleteTx) addResponse(pub *keys.PublicKey, sig []byte, isBackup bool) { + tx, sigs := t.tx, t.sigs + if isBackup { + tx, sigs = t.backupTx, t.backupSigs + } + sigs[string(pub.Bytes())] = &txSignature{ + pub: pub, + ok: tx != nil, + sig: sig, + } + +} + +// finalize checks is either main or backup tx has sufficient number of signatures and returns +// tx and bool value indicating if it is ready to be broadcasted. +func (t *incompleteTx) finalize(oracleNodes keys.PublicKeys) (*transaction.Transaction, bool) { + if finalizeTx(oracleNodes, t.tx, t.sigs) { + return t.tx, true + } + return t.backupTx, finalizeTx(oracleNodes, t.backupTx, t.backupSigs) +} + +func finalizeTx(oracleNodes keys.PublicKeys, tx *transaction.Transaction, txSigs map[string]*txSignature) bool { + if tx == nil { + return false + } + m := smartcontract.GetDefaultHonestNodeCount(len(oracleNodes)) + sigs := make([][]byte, 0, m) + for _, pub := range oracleNodes { + sig, ok := txSigs[string(pub.Bytes())] + if ok && sig.ok { + sigs = append(sigs, sig.sig) + if len(sigs) == m { + break + } + } + } + if len(sigs) != m { + return false + } + + w := io.NewBufBinWriter() + for i := range sigs { + emit.Bytes(w.BinWriter, sigs[i]) + } + tx.Scripts[1].InvocationScript = w.Bytes() + return true +}