From 25d734cbad027d78d714bf0f23cc63238947fb6e Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 7 Oct 2020 10:48:41 +0300 Subject: [PATCH] oracle: process requests concurrently --- pkg/config/oracle_config.go | 13 +++++++------ pkg/services/oracle/oracle.go | 15 ++++++++++++++- pkg/services/oracle/request.go | 25 +++++++++++++++++++++++++ 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/pkg/config/oracle_config.go b/pkg/config/oracle_config.go index 9c2904c3c..62665896f 100644 --- a/pkg/config/oracle_config.go +++ b/pkg/config/oracle_config.go @@ -4,10 +4,11 @@ import "time" // OracleConfiguration is a config for the oracle module. type OracleConfiguration struct { - Enabled bool `yaml:"Enabled"` - AllowPrivateHost bool `yaml:"AllowPrivateHost"` - Nodes []string `yaml:"Nodes"` - RequestTimeout time.Duration `yaml:"RequestTimeout"` - ResponseTimeout time.Duration `yaml:"ResponseTimeout"` - UnlockWallet Wallet `yaml:"UnlockWallet"` + Enabled bool `yaml:"Enabled"` + AllowPrivateHost bool `yaml:"AllowPrivateHost"` + Nodes []string `yaml:"Nodes"` + MaxConcurrentRequests int `yaml:"MaxConcurrentRequests"` + RequestTimeout time.Duration `yaml:"RequestTimeout"` + ResponseTimeout time.Duration `yaml:"ResponseTimeout"` + UnlockWallet Wallet `yaml:"UnlockWallet"` } diff --git a/pkg/services/oracle/oracle.go b/pkg/services/oracle/oracle.go index 4a3fde10d..264636f88 100644 --- a/pkg/services/oracle/oracle.go +++ b/pkg/services/oracle/oracle.go @@ -34,6 +34,7 @@ type ( oracleSignContract []byte close chan struct{} + requestCh chan request requestMap chan map[uint64]*state.OracleRequest // respMtx protects responses map. @@ -93,6 +94,10 @@ func NewOracle(cfg Config) (*Oracle, error) { if o.MainCfg.RequestTimeout == 0 { o.MainCfg.RequestTimeout = defaultRequestTimeout } + if o.MainCfg.MaxConcurrentRequests == 0 { + o.MainCfg.MaxConcurrentRequests = defaultMaxConcurrentRequests + } + o.requestCh = make(chan request, o.MainCfg.MaxConcurrentRequests) var err error w := cfg.MainCfg.UnlockWallet @@ -136,12 +141,20 @@ func (o *Oracle) Shutdown() { // Run runs must be executed in a separate goroutine. func (o *Oracle) Run() { + for i := 0; i < o.MainCfg.MaxConcurrentRequests; i++ { + go o.runRequestWorker() + } for { select { case <-o.close: return case reqs := <-o.requestMap: - o.ProcessRequestsInternal(reqs) + for id, req := range reqs { + o.requestCh <- request{ + ID: id, + Req: req, + } + } } } } diff --git a/pkg/services/oracle/request.go b/pkg/services/oracle/request.go index 06143341a..3a4f7d5db 100644 --- a/pkg/services/oracle/request.go +++ b/pkg/services/oracle/request.go @@ -12,6 +12,31 @@ import ( "go.uber.org/zap" ) +const defaultMaxConcurrentRequests = 10 + +type request struct { + ID uint64 + Req *state.OracleRequest +} + +func (o *Oracle) runRequestWorker() { + for { + select { + case <-o.close: + return + case req := <-o.requestCh: + acc := o.getAccount() + if acc == nil { + continue + } + err := o.processRequest(acc.PrivateKey(), req.ID, req.Req) + if err != nil { + o.Log.Debug("can't process request", zap.Uint64("id", req.ID), zap.Error(err)) + } + } + } +} + // RemoveRequests removes all data associated with requests // which have been processed by oracle contract. func (o *Oracle) RemoveRequests(ids []uint64) {