2020-09-25 14:39:11 +00:00
|
|
|
package oracle
|
|
|
|
|
|
|
|
import (
|
2020-09-28 11:58:04 +00:00
|
|
|
"errors"
|
2020-09-25 14:39:11 +00:00
|
|
|
"net/http"
|
|
|
|
"net/url"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/nspcc-dev/neo-go/pkg/config"
|
|
|
|
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
|
|
|
|
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
|
2020-09-28 11:58:04 +00:00
|
|
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
2020-09-25 14:39:11 +00:00
|
|
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
|
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
|
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
2021-07-18 13:32:10 +00:00
|
|
|
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
2020-09-25 14:39:11 +00:00
|
|
|
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
|
|
|
type (
|
|
|
|
// Oracle represents oracle module capable of talking
|
|
|
|
// with the external world.
|
|
|
|
Oracle struct {
|
|
|
|
Config
|
|
|
|
|
2021-02-15 14:06:00 +00:00
|
|
|
// This fields are readonly thus not protected by mutex.
|
|
|
|
oracleHash util.Uint160
|
|
|
|
oracleResponse []byte
|
|
|
|
oracleScript []byte
|
|
|
|
verifyOffset int
|
|
|
|
|
2020-09-25 14:39:11 +00:00
|
|
|
// mtx protects setting callbacks.
|
|
|
|
mtx sync.RWMutex
|
|
|
|
|
|
|
|
// accMtx protects account and oracle nodes.
|
|
|
|
accMtx sync.RWMutex
|
|
|
|
currAccount *wallet.Account
|
|
|
|
oracleNodes keys.PublicKeys
|
|
|
|
oracleSignContract []byte
|
|
|
|
|
2020-09-28 11:58:04 +00:00
|
|
|
close chan struct{}
|
2020-10-07 07:48:41 +00:00
|
|
|
requestCh chan request
|
2020-09-28 11:58:04 +00:00
|
|
|
requestMap chan map[uint64]*state.OracleRequest
|
|
|
|
|
2021-07-19 18:51:31 +00:00
|
|
|
// respMtx protects responses and pending maps.
|
|
|
|
respMtx sync.RWMutex
|
|
|
|
// running is false until Run() is invoked.
|
|
|
|
running bool
|
|
|
|
// pending contains requests for not yet started service.
|
|
|
|
pending map[uint64]*state.OracleRequest
|
|
|
|
// responses contains active not completely processed requests.
|
2020-09-25 14:39:11 +00:00
|
|
|
responses map[uint64]*incompleteTx
|
2020-10-09 07:44:31 +00:00
|
|
|
// removed contains ids of requests which won't be processed further due to expiration.
|
|
|
|
removed map[uint64]bool
|
2020-09-25 14:39:11 +00:00
|
|
|
|
|
|
|
wallet *wallet.Wallet
|
|
|
|
}
|
|
|
|
|
|
|
|
// Config contains oracle module parameters.
|
|
|
|
Config struct {
|
|
|
|
Log *zap.Logger
|
|
|
|
Network netmode.Magic
|
2020-09-28 11:58:04 +00:00
|
|
|
MainCfg config.OracleConfiguration
|
2020-09-25 14:39:11 +00:00
|
|
|
Client HTTPClient
|
|
|
|
Chain blockchainer.Blockchainer
|
|
|
|
ResponseHandler Broadcaster
|
|
|
|
OnTransaction TxCallback
|
|
|
|
URIValidator URIValidator
|
|
|
|
}
|
|
|
|
|
|
|
|
// HTTPClient is an interface capable of doing oracle requests.
|
|
|
|
HTTPClient interface {
|
2021-05-07 13:36:16 +00:00
|
|
|
Do(*http.Request) (*http.Response, error)
|
2020-09-25 14:39:11 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Broadcaster broadcasts oracle responses.
|
|
|
|
Broadcaster interface {
|
|
|
|
SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte)
|
2020-10-08 11:50:10 +00:00
|
|
|
Run()
|
|
|
|
Shutdown()
|
2020-09-25 14:39:11 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
defaultResponseHandler struct{}
|
|
|
|
|
|
|
|
// TxCallback executes on new transactions when they are ready to be pooled.
|
|
|
|
TxCallback = func(tx *transaction.Transaction)
|
|
|
|
// URIValidator is used to check if provided URL is valid.
|
|
|
|
URIValidator = func(*url.URL) error
|
|
|
|
)
|
|
|
|
|
|
|
|
// defaultRequestTimeout is the request timeout used when none is configured.
const defaultRequestTimeout = 5 * time.Second

// defaultMaxTaskTimeout is the default period after which a request that
// can't be processed is dropped.
const defaultMaxTaskTimeout = time.Hour

// defaultRefreshInterval is the default period after which a failed request
// is reprocessed.
const defaultRefreshInterval = 3 * time.Minute
|
|
|
|
|
|
|
|
// NewOracle returns new oracle instance.
|
|
|
|
func NewOracle(cfg Config) (*Oracle, error) {
|
|
|
|
o := &Oracle{
|
|
|
|
Config: cfg,
|
|
|
|
|
2020-09-28 11:58:04 +00:00
|
|
|
close: make(chan struct{}),
|
|
|
|
requestMap: make(chan map[uint64]*state.OracleRequest, 1),
|
2021-07-19 18:51:31 +00:00
|
|
|
pending: make(map[uint64]*state.OracleRequest),
|
2020-09-28 11:58:04 +00:00
|
|
|
responses: make(map[uint64]*incompleteTx),
|
2020-10-09 07:44:31 +00:00
|
|
|
removed: make(map[uint64]bool),
|
2020-09-28 11:58:04 +00:00
|
|
|
}
|
|
|
|
if o.MainCfg.RequestTimeout == 0 {
|
|
|
|
o.MainCfg.RequestTimeout = defaultRequestTimeout
|
2020-09-25 14:39:11 +00:00
|
|
|
}
|
2021-04-06 14:08:04 +00:00
|
|
|
if o.MainCfg.NeoFS.Timeout == 0 {
|
|
|
|
o.MainCfg.NeoFS.Timeout = defaultRequestTimeout
|
|
|
|
}
|
2020-10-07 07:48:41 +00:00
|
|
|
if o.MainCfg.MaxConcurrentRequests == 0 {
|
|
|
|
o.MainCfg.MaxConcurrentRequests = defaultMaxConcurrentRequests
|
|
|
|
}
|
|
|
|
o.requestCh = make(chan request, o.MainCfg.MaxConcurrentRequests)
|
2020-10-09 07:44:31 +00:00
|
|
|
if o.MainCfg.MaxTaskTimeout == 0 {
|
|
|
|
o.MainCfg.MaxTaskTimeout = defaultMaxTaskTimeout
|
|
|
|
}
|
|
|
|
if o.MainCfg.RefreshInterval == 0 {
|
|
|
|
o.MainCfg.RefreshInterval = defaultRefreshInterval
|
|
|
|
}
|
2020-09-25 14:39:11 +00:00
|
|
|
|
|
|
|
var err error
|
2020-09-28 11:58:04 +00:00
|
|
|
w := cfg.MainCfg.UnlockWallet
|
2020-09-25 14:39:11 +00:00
|
|
|
if o.wallet, err = wallet.NewWalletFromFile(w.Path); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2020-09-28 11:58:04 +00:00
|
|
|
haveAccount := false
|
|
|
|
for _, acc := range o.wallet.Accounts {
|
2021-06-04 11:27:22 +00:00
|
|
|
if err := acc.Decrypt(w.Password, o.wallet.Scrypt); err == nil {
|
2020-09-28 11:58:04 +00:00
|
|
|
haveAccount = true
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if !haveAccount {
|
|
|
|
return nil, errors.New("no wallet account could be unlocked")
|
|
|
|
}
|
|
|
|
|
2020-09-25 14:39:11 +00:00
|
|
|
if o.Client == nil {
|
|
|
|
var client http.Client
|
|
|
|
client.Transport = &http.Transport{DisableKeepAlives: true}
|
2020-09-28 11:58:04 +00:00
|
|
|
client.Timeout = o.MainCfg.RequestTimeout
|
2020-09-25 14:39:11 +00:00
|
|
|
o.Client = &client
|
|
|
|
}
|
|
|
|
if o.ResponseHandler == nil {
|
|
|
|
o.ResponseHandler = defaultResponseHandler{}
|
|
|
|
}
|
|
|
|
if o.OnTransaction == nil {
|
|
|
|
o.OnTransaction = func(*transaction.Transaction) {}
|
|
|
|
}
|
|
|
|
if o.URIValidator == nil {
|
|
|
|
o.URIValidator = defaultURIValidator
|
|
|
|
}
|
|
|
|
return o, nil
|
|
|
|
}
|
|
|
|
|
2020-09-28 11:58:04 +00:00
|
|
|
// Shutdown shutdowns Oracle.
|
|
|
|
func (o *Oracle) Shutdown() {
|
|
|
|
close(o.close)
|
2020-10-08 11:50:10 +00:00
|
|
|
o.getBroadcaster().Shutdown()
|
2020-09-28 11:58:04 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Run runs must be executed in a separate goroutine.
|
|
|
|
func (o *Oracle) Run() {
|
2021-07-19 18:51:31 +00:00
|
|
|
o.respMtx.Lock()
|
|
|
|
if o.running {
|
|
|
|
o.respMtx.Unlock()
|
|
|
|
return
|
|
|
|
}
|
2021-04-02 10:13:26 +00:00
|
|
|
o.Log.Info("starting oracle service")
|
2021-07-19 18:51:31 +00:00
|
|
|
|
|
|
|
o.requestMap <- o.pending // Guaranteed to not block, only AddRequests sends to it.
|
|
|
|
o.pending = nil
|
|
|
|
o.running = true
|
|
|
|
o.respMtx.Unlock()
|
|
|
|
|
2020-10-07 07:48:41 +00:00
|
|
|
for i := 0; i < o.MainCfg.MaxConcurrentRequests; i++ {
|
|
|
|
go o.runRequestWorker()
|
|
|
|
}
|
2020-10-09 07:44:31 +00:00
|
|
|
|
|
|
|
tick := time.NewTicker(o.MainCfg.RefreshInterval)
|
2020-09-28 11:58:04 +00:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-o.close:
|
2020-10-09 07:44:31 +00:00
|
|
|
tick.Stop()
|
2020-09-28 11:58:04 +00:00
|
|
|
return
|
2020-10-09 07:44:31 +00:00
|
|
|
case <-tick.C:
|
|
|
|
var reprocess []uint64
|
services: fix Oracle responces mutex
Solves the following problem:
=== RUN TestOracleFull
logger.go:130: 2021-02-04T09:25:16.305Z INFO P2PNotaryRequestPayloadPool size is not set or wrong, setting default value {"P2PNotaryRequestPayloadPoolSize": 1000}
logger.go:130: 2021-02-04T09:25:16.306Z INFO no storage version found! creating genesis block
logger.go:130: 2021-02-04T09:25:27.687Z DEBUG done processing headers {"headerIndex": 1, "blockHeight": 0, "took": "2.413398ms"}
logger.go:130: 2021-02-04T09:25:27.696Z DEBUG done processing headers {"headerIndex": 2, "blockHeight": 1, "took": "1.138196ms"}
logger.go:130: 2021-02-04T09:25:28.680Z INFO blockchain persist completed {"persistedBlocks": 2, "persistedKeys": 173, "headerHeight": 2, "blockHeight": 2, "took": "166.793µs"}
fatal error: sync: Unlock of unlocked RWMutex
goroutine 6157 [running]:
runtime.throw(0x115dfdb, 0x20)
/usr/local/go/src/runtime/panic.go:1116 +0x72 fp=0xc000297ca0 sp=0xc000297c70 pc=0x44f432
sync.throw(0x115dfdb, 0x20)
/usr/local/go/src/runtime/panic.go:1102 +0x35 fp=0xc000297cc0 sp=0xc000297ca0 pc=0x44f3b5
sync.(*RWMutex).Unlock(0xc000135300)
/usr/local/go/src/sync/rwmutex.go:129 +0xf3 fp=0xc000297d00 sp=0xc000297cc0 pc=0x4a1ac3
github.com/nspcc-dev/neo-go/pkg/services/oracle.(*Oracle).Run(0xc000135180)
/go/src/github.com/nspcc-dev/neo-go/pkg/services/oracle/oracle.go:189 +0x82b fp=0xc000297fd8 sp=0xc000297d00 pc=0xe13b0b
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:1373 +0x1 fp=0xc000297fe0 sp=0xc000297fd8 pc=0x4834d1
created by github.com/nspcc-dev/neo-go/pkg/core.TestOracleFull
/go/src/github.com/nspcc-dev/neo-go/pkg/core/oracle_test.go:276 +0x3f1
2021-02-04 09:42:48 +00:00
|
|
|
o.respMtx.Lock()
|
2020-10-09 07:44:31 +00:00
|
|
|
o.removed = make(map[uint64]bool)
|
|
|
|
for id, incTx := range o.responses {
|
|
|
|
incTx.RLock()
|
|
|
|
since := time.Since(incTx.time)
|
|
|
|
if since > o.MainCfg.MaxTaskTimeout {
|
|
|
|
o.removed[id] = true
|
|
|
|
} else if since > o.MainCfg.RefreshInterval {
|
|
|
|
reprocess = append(reprocess, id)
|
|
|
|
}
|
|
|
|
incTx.RUnlock()
|
|
|
|
}
|
|
|
|
for id := range o.removed {
|
|
|
|
delete(o.responses, id)
|
|
|
|
}
|
|
|
|
o.respMtx.Unlock()
|
|
|
|
|
|
|
|
for _, id := range reprocess {
|
|
|
|
o.requestCh <- request{ID: id}
|
|
|
|
}
|
2020-09-28 11:58:04 +00:00
|
|
|
case reqs := <-o.requestMap:
|
2020-10-07 07:48:41 +00:00
|
|
|
for id, req := range reqs {
|
|
|
|
o.requestCh <- request{
|
|
|
|
ID: id,
|
|
|
|
Req: req,
|
|
|
|
}
|
|
|
|
}
|
2020-09-28 11:58:04 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-15 14:06:00 +00:00
|
|
|
// UpdateNativeContract updates native oracle contract info for tx verification.
|
|
|
|
func (o *Oracle) UpdateNativeContract(script, resp []byte, h util.Uint160, verifyOffset int) {
|
2021-07-18 13:32:10 +00:00
|
|
|
o.oracleScript = slice.Copy(script)
|
|
|
|
o.oracleResponse = slice.Copy(resp)
|
2021-02-15 14:06:00 +00:00
|
|
|
|
|
|
|
o.oracleHash = h
|
|
|
|
o.verifyOffset = verifyOffset
|
|
|
|
}
|
|
|
|
|
2020-09-28 11:58:04 +00:00
|
|
|
func (o *Oracle) getOnTransaction() TxCallback {
|
|
|
|
o.mtx.RLock()
|
|
|
|
defer o.mtx.RUnlock()
|
|
|
|
return o.OnTransaction
|
|
|
|
}
|
|
|
|
|
|
|
|
// SetOnTransaction sets callback to pool and broadcast tx.
|
|
|
|
func (o *Oracle) SetOnTransaction(cb TxCallback) {
|
|
|
|
o.mtx.Lock()
|
|
|
|
defer o.mtx.Unlock()
|
|
|
|
o.OnTransaction = cb
|
|
|
|
}
|
|
|
|
|
2020-09-25 14:39:11 +00:00
|
|
|
func (o *Oracle) getBroadcaster() Broadcaster {
|
|
|
|
o.mtx.RLock()
|
|
|
|
defer o.mtx.RUnlock()
|
|
|
|
return o.ResponseHandler
|
|
|
|
}
|
|
|
|
|
|
|
|
// SetBroadcaster sets callback to broadcast response.
|
|
|
|
func (o *Oracle) SetBroadcaster(b Broadcaster) {
|
|
|
|
o.mtx.Lock()
|
|
|
|
defer o.mtx.Unlock()
|
2020-10-08 11:50:10 +00:00
|
|
|
o.ResponseHandler.Shutdown()
|
2020-09-25 14:39:11 +00:00
|
|
|
o.ResponseHandler = b
|
2020-10-08 11:50:10 +00:00
|
|
|
go b.Run()
|
2020-09-25 14:39:11 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// SendResponse implements Broadcaster interface.
|
|
|
|
func (defaultResponseHandler) SendResponse(*keys.PrivateKey, *transaction.OracleResponse, []byte) {
|
|
|
|
}
|
2020-10-08 11:50:10 +00:00
|
|
|
|
|
|
|
// Run implements Broadcaster interface.
|
|
|
|
func (defaultResponseHandler) Run() {}
|
|
|
|
|
|
|
|
// Shutdown implements Broadcaster interface.
|
|
|
|
func (defaultResponseHandler) Shutdown() {}
|