oracle: only process new requests after initial sync

If an oracle node is resynchronized from the genesis the service receives all
requests from all blocks via AddRequests() invoked from the native
contract. Almost all of them are long obsolete and need to be removed, native
oracle contract will try to do that with RemoveRequests() calls, but they
won't change anything.

So queue up all "initial" requests in special map and manage it directly
before the module is Run() which happens after synchronization
completion. Then process any requests that are still active and work with new
blocks as usual.
This commit is contained in:
Roman Khimov 2021-07-19 21:51:31 +03:00
parent 0bad413e23
commit 3a93977b7b
3 changed files with 100 additions and 10 deletions

View file

@ -9,6 +9,7 @@ import (
"net/url"
"path"
"strings"
"sync"
"testing"
"time"
@ -54,7 +55,7 @@ func getTestOracle(t *testing.T, bc *Blockchain, walletPath, pass string) (
m := make(map[uint64]*responseWithSig)
ch := make(chan *transaction.Transaction, 5)
orcCfg := getOracleConfig(t, bc, walletPath, pass)
orcCfg.ResponseHandler = saveToMapBroadcaster{m}
orcCfg.ResponseHandler = &saveToMapBroadcaster{m: m}
orcCfg.OnTransaction = saveTxToChan(ch)
orcCfg.URIValidator = func(u *url.URL) error {
if strings.HasPrefix(u.Host, "private") {
@ -298,18 +299,74 @@ func TestOracleFull(t *testing.T) {
require.True(t, txes[0].HasAttribute(transaction.OracleResponseT))
}
type saveToMapBroadcaster struct {
m map[uint64]*responseWithSig
func TestNotYetRunningOracle(t *testing.T) {
bc := initTestChain(t, nil, nil)
acc, orc, _, _ := getTestOracle(t, bc, "./testdata/oracle2.json", "two")
mp := bc.GetMemPool()
orc.OnTransaction = func(tx *transaction.Transaction) { _ = mp.Add(tx, bc) }
bc.SetOracle(orc)
cs := getOracleContractState(bc.contracts.Oracle.Hash, bc.contracts.Std.Hash)
require.NoError(t, bc.contracts.Management.PutContractState(bc.dao, cs))
go bc.Run()
bc.setNodesByRole(t, true, noderoles.Oracle, keys.PublicKeys{acc.PrivateKey().PublicKey()})
var req state.OracleRequest
var reqs = make(map[uint64]*state.OracleRequest)
for i := uint64(0); i < 3; i++ {
reqs[i] = &req
}
orc.AddRequests(reqs) // 0, 1, 2 added to pending.
var ids = []uint64{0, 1}
orc.RemoveRequests(ids) // 0, 1 removed from pending, 2 left.
reqs = make(map[uint64]*state.OracleRequest)
for i := uint64(3); i < 5; i++ {
reqs[i] = &req
}
orc.AddRequests(reqs) // 3, 4 added to pending -> 2, 3, 4 in pending.
ids = []uint64{3}
orc.RemoveRequests(ids) // 3 removed from pending -> 2, 4 in pending.
go orc.Run()
t.Cleanup(orc.Shutdown)
require.Eventually(t, func() bool { return mp.Count() == 2 },
time.Second*3, time.Millisecond*200)
txes := mp.GetVerifiedTransactions()
require.Len(t, txes, 2)
var txids []uint64
for _, tx := range txes {
for _, attr := range tx.Attributes {
if attr.Type == transaction.OracleResponseT {
resp := attr.Value.(*transaction.OracleResponse)
txids = append(txids, resp.ID)
}
}
}
require.Len(t, txids, 2)
require.Contains(t, txids, uint64(2))
require.Contains(t, txids, uint64(4))
}
func (b saveToMapBroadcaster) SendResponse(_ *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) {
type saveToMapBroadcaster struct {
mtx sync.RWMutex
m map[uint64]*responseWithSig
}
func (b *saveToMapBroadcaster) SendResponse(_ *keys.PrivateKey, resp *transaction.OracleResponse, txSig []byte) {
b.mtx.Lock()
defer b.mtx.Unlock()
b.m[resp.ID] = &responseWithSig{
resp: resp,
txSig: txSig,
}
}
func (saveToMapBroadcaster) Run() {}
func (saveToMapBroadcaster) Shutdown() {}
func (*saveToMapBroadcaster) Run() {}
func (*saveToMapBroadcaster) Shutdown() {}
type responseWithSig struct {
resp *transaction.OracleResponse

View file

@ -43,8 +43,13 @@ type (
requestCh chan request
requestMap chan map[uint64]*state.OracleRequest
// respMtx protects responses map.
respMtx sync.RWMutex
// respMtx protects responses and pending maps.
respMtx sync.RWMutex
// running is false until Run() is invoked.
running bool
// pending contains requests for not yet started service.
pending map[uint64]*state.OracleRequest
// responses contains active not completely processed requests.
responses map[uint64]*incompleteTx
// removed contains ids of requests which won't be processed further due to expiration.
removed map[uint64]bool
@ -102,6 +107,7 @@ func NewOracle(cfg Config) (*Oracle, error) {
close: make(chan struct{}),
requestMap: make(chan map[uint64]*state.OracleRequest, 1),
pending: make(map[uint64]*state.OracleRequest),
responses: make(map[uint64]*incompleteTx),
removed: make(map[uint64]bool),
}
@ -165,7 +171,18 @@ func (o *Oracle) Shutdown() {
// Run runs must be executed in a separate goroutine.
func (o *Oracle) Run() {
o.respMtx.Lock()
if o.running {
o.respMtx.Unlock()
return
}
o.Log.Info("starting oracle service")
o.requestMap <- o.pending // Guaranteed to not block, only AddRequests sends to it.
o.pending = nil
o.running = true
o.respMtx.Unlock()
for i := 0; i < o.MainCfg.MaxConcurrentRequests; i++ {
go o.runRequestWorker()
}

View file

@ -46,8 +46,14 @@ func (o *Oracle) runRequestWorker() {
func (o *Oracle) RemoveRequests(ids []uint64) {
o.respMtx.Lock()
defer o.respMtx.Unlock()
for _, id := range ids {
delete(o.responses, id)
if !o.running {
for _, id := range ids {
delete(o.pending, id)
}
} else {
for _, id := range ids {
delete(o.responses, id)
}
}
}
@ -57,6 +63,16 @@ func (o *Oracle) AddRequests(reqs map[uint64]*state.OracleRequest) {
return
}
o.respMtx.Lock()
if !o.running {
for id, r := range reqs {
o.pending[id] = r
}
o.respMtx.Unlock()
return
}
o.respMtx.Unlock()
select {
case o.requestMap <- reqs:
default: