neoneo-go/pkg/services/oracle/oracle.go

242 lines
6 KiB
Go
Raw Normal View History

2020-09-25 14:39:11 +00:00
package oracle
import (
"errors"
2020-09-25 14:39:11 +00:00
"net/http"
"net/url"
"sync"
"time"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
"github.com/nspcc-dev/neo-go/pkg/core/state"
2020-09-25 14:39:11 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"go.uber.org/zap"
)
type (
// Oracle represents oracle module capable of talking
// with the external world.
Oracle struct {
Config
// mtx protects setting callbacks.
mtx sync.RWMutex
// accMtx protects account and oracle nodes.
accMtx sync.RWMutex
currAccount *wallet.Account
oracleNodes keys.PublicKeys
oracleSignContract []byte
close chan struct{}
2020-10-07 07:48:41 +00:00
requestCh chan request
requestMap chan map[uint64]*state.OracleRequest
2020-09-25 14:39:11 +00:00
// respMtx protects responses map.
respMtx sync.RWMutex
responses map[uint64]*incompleteTx
2020-10-09 07:44:31 +00:00
// removed contains ids of requests which won't be processed further due to expiration.
removed map[uint64]bool
2020-09-25 14:39:11 +00:00
wallet *wallet.Wallet
}
// Config contains oracle module parameters.
Config struct {
Log *zap.Logger
Network netmode.Magic
MainCfg config.OracleConfiguration
2020-09-25 14:39:11 +00:00
Client HTTPClient
Chain blockchainer.Blockchainer
ResponseHandler Broadcaster
OnTransaction TxCallback
URIValidator URIValidator
OracleScript []byte
OracleResponse []byte
OracleHash util.Uint160
}
// HTTPClient is an interface capable of doing oracle requests.
HTTPClient interface {
Get(string) (*http.Response, error)
}
// Broadcaster broadcasts oracle responses.
Broadcaster interface {
SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte)
2020-10-08 11:50:10 +00:00
Run()
Shutdown()
2020-09-25 14:39:11 +00:00
}
defaultResponseHandler struct{}
// TxCallback executes on new transactions when they are ready to be pooled.
TxCallback = func(tx *transaction.Transaction)
// URIValidator is used to check if provided URL is valid.
URIValidator = func(*url.URL) error
)
const (
// defaultRequestTimeout is default request timeout.
defaultRequestTimeout = time.Second * 5
2020-10-09 07:44:31 +00:00
// defaultMaxTaskTimeout is default timeout for the request to be dropped if it can't be processed.
defaultMaxTaskTimeout = time.Hour
// defaultRefreshInterval is default timeout for the failed request to be reprocessed.
defaultRefreshInterval = time.Minute * 3
2020-09-25 14:39:11 +00:00
)
// NewOracle returns new oracle instance.
func NewOracle(cfg Config) (*Oracle, error) {
o := &Oracle{
Config: cfg,
close: make(chan struct{}),
requestMap: make(chan map[uint64]*state.OracleRequest, 1),
responses: make(map[uint64]*incompleteTx),
2020-10-09 07:44:31 +00:00
removed: make(map[uint64]bool),
}
if o.MainCfg.RequestTimeout == 0 {
o.MainCfg.RequestTimeout = defaultRequestTimeout
2020-09-25 14:39:11 +00:00
}
2020-10-07 07:48:41 +00:00
if o.MainCfg.MaxConcurrentRequests == 0 {
o.MainCfg.MaxConcurrentRequests = defaultMaxConcurrentRequests
}
o.requestCh = make(chan request, o.MainCfg.MaxConcurrentRequests)
2020-10-09 07:44:31 +00:00
if o.MainCfg.MaxTaskTimeout == 0 {
o.MainCfg.MaxTaskTimeout = defaultMaxTaskTimeout
}
if o.MainCfg.RefreshInterval == 0 {
o.MainCfg.RefreshInterval = defaultRefreshInterval
}
2020-09-25 14:39:11 +00:00
var err error
w := cfg.MainCfg.UnlockWallet
2020-09-25 14:39:11 +00:00
if o.wallet, err = wallet.NewWalletFromFile(w.Path); err != nil {
return nil, err
}
haveAccount := false
for _, acc := range o.wallet.Accounts {
if err := acc.Decrypt(w.Password); err == nil {
haveAccount = true
break
}
}
if !haveAccount {
return nil, errors.New("no wallet account could be unlocked")
}
2020-09-25 14:39:11 +00:00
if o.Client == nil {
var client http.Client
client.Transport = &http.Transport{DisableKeepAlives: true}
client.Timeout = o.MainCfg.RequestTimeout
2020-09-25 14:39:11 +00:00
o.Client = &client
}
if o.ResponseHandler == nil {
o.ResponseHandler = defaultResponseHandler{}
}
if o.OnTransaction == nil {
o.OnTransaction = func(*transaction.Transaction) {}
}
if o.URIValidator == nil {
o.URIValidator = defaultURIValidator
}
return o, nil
}
// Shutdown shutdowns Oracle.
func (o *Oracle) Shutdown() {
close(o.close)
2020-10-08 11:50:10 +00:00
o.getBroadcaster().Shutdown()
}
// Run runs must be executed in a separate goroutine.
func (o *Oracle) Run() {
2020-10-07 07:48:41 +00:00
for i := 0; i < o.MainCfg.MaxConcurrentRequests; i++ {
go o.runRequestWorker()
}
2020-10-09 07:44:31 +00:00
tick := time.NewTicker(o.MainCfg.RefreshInterval)
for {
select {
case <-o.close:
2020-10-09 07:44:31 +00:00
tick.Stop()
return
2020-10-09 07:44:31 +00:00
case <-tick.C:
var reprocess []uint64
services: fix Oracle responces mutex Solves the following problem: === RUN TestOracleFull logger.go:130: 2021-02-04T09:25:16.305Z INFO P2PNotaryRequestPayloadPool size is not set or wrong, setting default value {"P2PNotaryRequestPayloadPoolSize": 1000} logger.go:130: 2021-02-04T09:25:16.306Z INFO no storage version found! creating genesis block logger.go:130: 2021-02-04T09:25:27.687Z DEBUG done processing headers {"headerIndex": 1, "blockHeight": 0, "took": "2.413398ms"} logger.go:130: 2021-02-04T09:25:27.696Z DEBUG done processing headers {"headerIndex": 2, "blockHeight": 1, "took": "1.138196ms"} logger.go:130: 2021-02-04T09:25:28.680Z INFO blockchain persist completed {"persistedBlocks": 2, "persistedKeys": 173, "headerHeight": 2, "blockHeight": 2, "took": "166.793µs"} fatal error: sync: Unlock of unlocked RWMutex goroutine 6157 [running]: runtime.throw(0x115dfdb, 0x20) /usr/local/go/src/runtime/panic.go:1116 +0x72 fp=0xc000297ca0 sp=0xc000297c70 pc=0x44f432 sync.throw(0x115dfdb, 0x20) /usr/local/go/src/runtime/panic.go:1102 +0x35 fp=0xc000297cc0 sp=0xc000297ca0 pc=0x44f3b5 sync.(*RWMutex).Unlock(0xc000135300) /usr/local/go/src/sync/rwmutex.go:129 +0xf3 fp=0xc000297d00 sp=0xc000297cc0 pc=0x4a1ac3 github.com/nspcc-dev/neo-go/pkg/services/oracle.(*Oracle).Run(0xc000135180) /go/src/github.com/nspcc-dev/neo-go/pkg/services/oracle/oracle.go:189 +0x82b fp=0xc000297fd8 sp=0xc000297d00 pc=0xe13b0b runtime.goexit() /usr/local/go/src/runtime/asm_amd64.s:1373 +0x1 fp=0xc000297fe0 sp=0xc000297fd8 pc=0x4834d1 created by github.com/nspcc-dev/neo-go/pkg/core.TestOracleFull /go/src/github.com/nspcc-dev/neo-go/pkg/core/oracle_test.go:276 +0x3f1
2021-02-04 09:42:48 +00:00
o.respMtx.Lock()
2020-10-09 07:44:31 +00:00
o.removed = make(map[uint64]bool)
for id, incTx := range o.responses {
incTx.RLock()
since := time.Since(incTx.time)
if since > o.MainCfg.MaxTaskTimeout {
o.removed[id] = true
} else if since > o.MainCfg.RefreshInterval {
reprocess = append(reprocess, id)
}
incTx.RUnlock()
}
for id := range o.removed {
delete(o.responses, id)
}
o.respMtx.Unlock()
for _, id := range reprocess {
o.requestCh <- request{ID: id}
}
case reqs := <-o.requestMap:
2020-10-07 07:48:41 +00:00
for id, req := range reqs {
o.requestCh <- request{
ID: id,
Req: req,
}
}
}
}
}
func (o *Oracle) getOnTransaction() TxCallback {
o.mtx.RLock()
defer o.mtx.RUnlock()
return o.OnTransaction
}
// SetOnTransaction sets callback to pool and broadcast tx.
func (o *Oracle) SetOnTransaction(cb TxCallback) {
o.mtx.Lock()
defer o.mtx.Unlock()
o.OnTransaction = cb
}
2020-09-25 14:39:11 +00:00
func (o *Oracle) getBroadcaster() Broadcaster {
o.mtx.RLock()
defer o.mtx.RUnlock()
return o.ResponseHandler
}
// SetBroadcaster sets callback to broadcast response.
func (o *Oracle) SetBroadcaster(b Broadcaster) {
o.mtx.Lock()
defer o.mtx.Unlock()
2020-10-08 11:50:10 +00:00
o.ResponseHandler.Shutdown()
2020-09-25 14:39:11 +00:00
o.ResponseHandler = b
2020-10-08 11:50:10 +00:00
go b.Run()
2020-09-25 14:39:11 +00:00
}
// SendResponse implements Broadcaster interface.
func (defaultResponseHandler) SendResponse(*keys.PrivateKey, *transaction.OracleResponse, []byte) {
}
2020-10-08 11:50:10 +00:00
// Run implements Broadcaster interface.
func (defaultResponseHandler) Run() {}
// Shutdown implements Broadcaster interface.
func (defaultResponseHandler) Shutdown() {}