package oracle import ( "errors" "net/http" "net/url" "sync" "time" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/nspcc-dev/neo-go/pkg/wallet" "go.uber.org/zap" ) type ( // Oracle represents oracle module capable of talking // with the external world. Oracle struct { Config // This fields are readonly thus not protected by mutex. oracleHash util.Uint160 oracleResponse []byte oracleScript []byte verifyOffset int // mtx protects setting callbacks. mtx sync.RWMutex // accMtx protects account and oracle nodes. accMtx sync.RWMutex currAccount *wallet.Account oracleNodes keys.PublicKeys oracleSignContract []byte close chan struct{} requestCh chan request requestMap chan map[uint64]*state.OracleRequest // respMtx protects responses and pending maps. respMtx sync.RWMutex // running is false until Run() is invoked. running bool // pending contains requests for not yet started service. pending map[uint64]*state.OracleRequest // responses contains active not completely processed requests. responses map[uint64]*incompleteTx // removed contains ids of requests which won't be processed further due to expiration. removed map[uint64]bool wallet *wallet.Wallet } // Config contains oracle module parameters. Config struct { Log *zap.Logger Network netmode.Magic MainCfg config.OracleConfiguration Client HTTPClient Chain blockchainer.Blockchainer ResponseHandler Broadcaster OnTransaction TxCallback URIValidator URIValidator } // HTTPClient is an interface capable of doing oracle requests. HTTPClient interface { Do(*http.Request) (*http.Response, error) } // Broadcaster broadcasts oracle responses. Broadcaster interface { SendResponse(priv *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) Run() Shutdown() } defaultResponseHandler struct{} // TxCallback executes on new transactions when they are ready to be pooled. TxCallback = func(tx *transaction.Transaction) // URIValidator is used to check if provided URL is valid. URIValidator = func(*url.URL) error ) const ( // defaultRequestTimeout is default request timeout. defaultRequestTimeout = time.Second * 5 // defaultMaxTaskTimeout is default timeout for the request to be dropped if it can't be processed. defaultMaxTaskTimeout = time.Hour // defaultRefreshInterval is default timeout for the failed request to be reprocessed. defaultRefreshInterval = time.Minute * 3 ) // NewOracle returns new oracle instance. func NewOracle(cfg Config) (*Oracle, error) { o := &Oracle{ Config: cfg, close: make(chan struct{}), requestMap: make(chan map[uint64]*state.OracleRequest, 1), pending: make(map[uint64]*state.OracleRequest), responses: make(map[uint64]*incompleteTx), removed: make(map[uint64]bool), } if o.MainCfg.RequestTimeout == 0 { o.MainCfg.RequestTimeout = defaultRequestTimeout } if o.MainCfg.NeoFS.Timeout == 0 { o.MainCfg.NeoFS.Timeout = defaultRequestTimeout } if o.MainCfg.MaxConcurrentRequests == 0 { o.MainCfg.MaxConcurrentRequests = defaultMaxConcurrentRequests } o.requestCh = make(chan request, o.MainCfg.MaxConcurrentRequests) if o.MainCfg.MaxTaskTimeout == 0 { o.MainCfg.MaxTaskTimeout = defaultMaxTaskTimeout } if o.MainCfg.RefreshInterval == 0 { o.MainCfg.RefreshInterval = defaultRefreshInterval } var err error w := cfg.MainCfg.UnlockWallet if o.wallet, err = wallet.NewWalletFromFile(w.Path); err != nil { return nil, err } haveAccount := false for _, acc := range o.wallet.Accounts { if err := acc.Decrypt(w.Password, o.wallet.Scrypt); err == nil { haveAccount = true break } } if !haveAccount { return nil, errors.New("no wallet account could be unlocked") } if o.Client == nil { var client http.Client client.Transport = &http.Transport{DisableKeepAlives: true} client.Timeout = o.MainCfg.RequestTimeout o.Client = &client } if o.ResponseHandler == nil { o.ResponseHandler = defaultResponseHandler{} } if o.OnTransaction == nil { o.OnTransaction = func(*transaction.Transaction) {} } if o.URIValidator == nil { o.URIValidator = defaultURIValidator } return o, nil } // Shutdown shutdowns Oracle. func (o *Oracle) Shutdown() { close(o.close) o.getBroadcaster().Shutdown() } // Run runs must be executed in a separate goroutine. func (o *Oracle) Run() { o.respMtx.Lock() if o.running { o.respMtx.Unlock() return } o.Log.Info("starting oracle service") o.requestMap <- o.pending // Guaranteed to not block, only AddRequests sends to it. o.pending = nil o.running = true o.respMtx.Unlock() for i := 0; i < o.MainCfg.MaxConcurrentRequests; i++ { go o.runRequestWorker() } tick := time.NewTicker(o.MainCfg.RefreshInterval) for { select { case <-o.close: tick.Stop() return case <-tick.C: var reprocess []uint64 o.respMtx.Lock() o.removed = make(map[uint64]bool) for id, incTx := range o.responses { incTx.RLock() since := time.Since(incTx.time) if since > o.MainCfg.MaxTaskTimeout { o.removed[id] = true } else if since > o.MainCfg.RefreshInterval { reprocess = append(reprocess, id) } incTx.RUnlock() } for id := range o.removed { delete(o.responses, id) } o.respMtx.Unlock() for _, id := range reprocess { o.requestCh <- request{ID: id} } case reqs := <-o.requestMap: for id, req := range reqs { o.requestCh <- request{ ID: id, Req: req, } } } } } // UpdateNativeContract updates native oracle contract info for tx verification. func (o *Oracle) UpdateNativeContract(script, resp []byte, h util.Uint160, verifyOffset int) { o.oracleScript = slice.Copy(script) o.oracleResponse = slice.Copy(resp) o.oracleHash = h o.verifyOffset = verifyOffset } func (o *Oracle) getOnTransaction() TxCallback { o.mtx.RLock() defer o.mtx.RUnlock() return o.OnTransaction } // SetOnTransaction sets callback to pool and broadcast tx. func (o *Oracle) SetOnTransaction(cb TxCallback) { o.mtx.Lock() defer o.mtx.Unlock() o.OnTransaction = cb } func (o *Oracle) getBroadcaster() Broadcaster { o.mtx.RLock() defer o.mtx.RUnlock() return o.ResponseHandler } // SetBroadcaster sets callback to broadcast response. func (o *Oracle) SetBroadcaster(b Broadcaster) { o.mtx.Lock() defer o.mtx.Unlock() o.ResponseHandler.Shutdown() o.ResponseHandler = b go b.Run() } // SendResponse implements Broadcaster interface. func (defaultResponseHandler) SendResponse(*keys.PrivateKey, *transaction.OracleResponse, []byte) { } // Run implements Broadcaster interface. func (defaultResponseHandler) Run() {} // Shutdown implements Broadcaster interface. func (defaultResponseHandler) Shutdown() {}