From 926dd2079281a2a04513a6eec0dfdaad6046448b Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Tue, 26 Feb 2019 20:28:38 +0300 Subject: [PATCH] Fix possible data race in pkg/stall (#163) fix #162 --- pkg/peer/stall/stall.go | 33 +++++++++++++++++---------------- pkg/peer/stall/stall_test.go | 13 +++++++++++-- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/pkg/peer/stall/stall.go b/pkg/peer/stall/stall.go index 85bf11858..052a22eda 100644 --- a/pkg/peer/stall/stall.go +++ b/pkg/peer/stall/stall.go @@ -17,7 +17,7 @@ type Detector struct { responseTime time.Duration tickInterval time.Duration - lock sync.Mutex + lock *sync.RWMutex responses map[command.Type]time.Time // The detector is embedded into a peer and the peer watches this quit chan @@ -35,7 +35,7 @@ func NewDetector(rTime time.Duration, tickerInterval time.Duration) *Detector { d := &Detector{ responseTime: rTime, tickInterval: tickerInterval, - lock: sync.Mutex{}, + lock: new(sync.RWMutex), responses: map[command.Type]time.Time{}, Quitch: make(chan struct{}), } @@ -46,24 +46,27 @@ func NewDetector(rTime time.Duration, tickerInterval time.Duration) *Detector { func (d *Detector) loop() { ticker := time.NewTicker(d.tickInterval) -loop: + defer func() { + d.Quit() + d.DeleteAll() + ticker.Stop() + }() + for { select { case <-ticker.C: now := time.Now() - for _, deadline := range d.responses { + d.lock.RLock() + resp := d.responses + d.lock.RUnlock() + for _, deadline := range resp { if now.After(deadline) { fmt.Println("Deadline passed") - ticker.Stop() - break loop + return } } - } } - d.Quit() - d.DeleteAll() - ticker.Stop() } // Quit is a concurrent safe way to call the Quit channel @@ -114,17 +117,16 @@ func (d *Detector) DeleteAll() { // and their deadlines func (d *Detector) GetMessages() map[command.Type]time.Time { var resp map[command.Type]time.Time - d.lock.Lock() + d.lock.RLock() resp = d.responses - d.lock.Unlock() + d.lock.RUnlock() return resp } // when a message is added, we will add a deadline for // expected response func (d *Detector) addMessage(cmd command.Type) []command.Type { - - cmds := []command.Type{} + var cmds []command.Type switch cmd { case command.GetHeaders: @@ -151,8 +153,7 @@ func (d *Detector) addMessage(cmd command.Type) []command.Type { // if receive a message, we will delete it from pending func (d *Detector) removeMessage(cmd command.Type) []command.Type { - - cmds := []command.Type{} + var cmds []command.Type switch cmd { case command.Block: diff --git a/pkg/peer/stall/stall_test.go b/pkg/peer/stall/stall_test.go index 5b6a2b755..b86412b2a 100644 --- a/pkg/peer/stall/stall_test.go +++ b/pkg/peer/stall/stall_test.go @@ -1,6 +1,7 @@ package stall import ( + "sync" "testing" "time" @@ -29,6 +30,7 @@ func TestAddRemoveMessage(t *testing.T) { } type mockPeer struct { + lock *sync.RWMutex online bool detector *Detector } @@ -43,7 +45,9 @@ loop: } } // cleanup + mp.lock.Lock() mp.online = false + mp.lock.Unlock() } func TestDeadlineWorks(t *testing.T) { @@ -51,16 +55,19 @@ func TestDeadlineWorks(t *testing.T) { tickerInterval := 1 * time.Second d := NewDetector(responseTime, tickerInterval) - mp := mockPeer{online: true, detector: d} + mp := mockPeer{online: true, detector: d, lock: new(sync.RWMutex)} go mp.loop() d.AddMessage(command.GetAddr) time.Sleep(responseTime + 1*time.Second) k := make(map[command.Type]time.Time) + d.lock.RLock() assert.Equal(t, k, d.responses) + d.lock.RUnlock() + mp.lock.RLock() assert.Equal(t, false, mp.online) - + mp.lock.RUnlock() } func TestDeadlineShouldNotBeEmpty(t *testing.T) { responseTime := 10 * time.Second @@ -71,5 +78,7 @@ func TestDeadlineShouldNotBeEmpty(t *testing.T) { time.Sleep(1 * time.Second) k := make(map[command.Type]time.Time) + d.lock.RLock() assert.NotEqual(t, k, d.responses) + d.lock.RUnlock() }