From 0a160ee93b16a0e7596abc3134615aa6d6243055 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 27 Apr 2023 18:49:19 +0300 Subject: [PATCH] *: use CompareAndSwap instead of CAS for atomics go.uber.org/atomic deprecated CAS methods in version 1.10 (that introduced CompareAndSwap), so we need to fix it. Signed-off-by: Roman Khimov --- pkg/consensus/consensus.go | 4 ++-- pkg/network/bqueue/queue.go | 2 +- pkg/network/server.go | 6 +++--- pkg/rpcclient/wsclient.go | 2 +- pkg/services/metrics/metrics.go | 4 ++-- pkg/services/notary/notary.go | 4 ++-- pkg/services/rpcsrv/server.go | 4 ++-- pkg/services/rpcsrv/subscription_test.go | 14 +++++++------- pkg/services/stateroot/validators.go | 4 ++-- 9 files changed, 22 insertions(+), 22 deletions(-) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 7cd1e52d0..9a29f8c2d 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -276,7 +276,7 @@ func (s *service) Name() string { } func (s *service) Start() { - if s.started.CAS(false, true) { + if s.started.CompareAndSwap(false, true) { s.log.Info("starting consensus service") b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block! s.lastTimestamp = b.Timestamp @@ -288,7 +288,7 @@ func (s *service) Start() { // Shutdown implements the Service interface. func (s *service) Shutdown() { - if s.started.CAS(true, false) { + if s.started.CompareAndSwap(true, false) { s.log.Info("stopping consensus service") close(s.quit) <-s.finished diff --git a/pkg/network/bqueue/queue.go b/pkg/network/bqueue/queue.go index 1cf8ec548..c80c6e04a 100644 --- a/pkg/network/bqueue/queue.go +++ b/pkg/network/bqueue/queue.go @@ -153,7 +153,7 @@ func (bq *Queue) LastQueued() (uint32, int) { // Discard stops the queue and prevents it from accepting more blocks to enqueue. func (bq *Queue) Discard() { - if bq.discarded.CAS(false, true) { + if bq.discarded.CompareAndSwap(false, true) { bq.queueLock.Lock() close(bq.checkBlocks) // Technically we could bq.queue = nil, but this would cost diff --git a/pkg/network/server.go b/pkg/network/server.go index 1cb62d637..4e5fc6363 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -541,7 +541,7 @@ func (s *Server) tryStartServices() { return } - if s.IsInSync() && s.syncReached.CAS(false, true) { + if s.IsInSync() && s.syncReached.CompareAndSwap(false, true) { s.log.Info("node reached synchronized state, starting services") if s.chain.P2PSigExtensionsEnabled() { s.notaryRequestPool.RunSubscriptions() // WSClient is also a subscriber. @@ -1277,14 +1277,14 @@ func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *ato old := lastRequestedHeight.Load() if old <= currHeight { needHeight = currHeight + 1 - if !lastRequestedHeight.CAS(old, needHeight) { + if !lastRequestedHeight.CompareAndSwap(old, needHeight) { continue } } else if old < currHeight+(bqueue.CacheSize-payload.MaxHashesCount) { needHeight = currHeight + 1 if peerHeight > old+payload.MaxHashesCount { needHeight = old + payload.MaxHashesCount - if !lastRequestedHeight.CAS(old, needHeight) { + if !lastRequestedHeight.CompareAndSwap(old, needHeight) { continue } } diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index b77e03f4b..3ec07198f 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -509,7 +509,7 @@ func NewWS(ctx context.Context, endpoint string, opts WSOptions) (*WSClient, err // Close closes connection to the remote side rendering this client instance // unusable. func (c *WSClient) Close() { - if c.closeCalled.CAS(false, true) { + if c.closeCalled.CompareAndSwap(false, true) { c.setCloseErr(errConnClosedByUser) // Closing shutdown channel sends a signal to wsWriter to break out of the // loop. In doing so it does ws.Close() closing the network connection diff --git a/pkg/services/metrics/metrics.go b/pkg/services/metrics/metrics.go index e814e1eff..883c1fabd 100644 --- a/pkg/services/metrics/metrics.go +++ b/pkg/services/metrics/metrics.go @@ -35,7 +35,7 @@ func NewService(name string, httpServers []*http.Server, cfg config.BasicService // Start runs http service with the exposed endpoint on the configured port. func (ms *Service) Start() error { if ms.config.Enabled { - if !ms.started.CAS(false, true) { + if !ms.started.CompareAndSwap(false, true) { ms.log.Info("service already started") return nil } @@ -66,7 +66,7 @@ func (ms *Service) ShutDown() { if !ms.config.Enabled { return } - if !ms.started.CAS(true, false) { + if !ms.started.CompareAndSwap(true, false) { return } for _, srv := range ms.http { diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index e3b6bea96..d2a47e2ae 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -171,7 +171,7 @@ func (n *Notary) Name() string { // Start runs a Notary module in a separate goroutine. // The Notary only starts once, subsequent calls to Start are no-op. func (n *Notary) Start() { - if !n.started.CAS(false, true) { + if !n.started.CompareAndSwap(false, true) { return } n.Config.Log.Info("starting notary service") @@ -221,7 +221,7 @@ drainLoop: // to Shutdown on the same instance are no-op. The instance that was stopped can // not be started again by calling Start (use a new instance if needed). func (n *Notary) Shutdown() { - if !n.started.CAS(true, false) { + if !n.started.CompareAndSwap(true, false) { return } n.Config.Log.Info("stopping notary service") diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index 116f56c29..b57d227a5 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -342,7 +342,7 @@ func (s *Server) Start() { s.log.Info("RPC server is not enabled") return } - if !s.started.CAS(false, true) { + if !s.started.CompareAndSwap(false, true) { s.log.Info("RPC server already started") return } @@ -397,7 +397,7 @@ func (s *Server) Start() { // that was stopped can not be started again by calling Start (use a new // instance if needed). func (s *Server) Shutdown() { - if !s.started.CAS(true, false) { + if !s.started.CompareAndSwap(true, false) { return } // Signal to websocket writer routines and handleSubEvents. diff --git a/pkg/services/rpcsrv/subscription_test.go b/pkg/services/rpcsrv/subscription_test.go index ba654a5b3..52ff0c6fb 100644 --- a/pkg/services/rpcsrv/subscription_test.go +++ b/pkg/services/rpcsrv/subscription_test.go @@ -156,7 +156,7 @@ func TestSubscriptions(t *testing.T) { for _, id := range subIDs { callUnsubscribe(t, c, respMsgs, id) } - finishedFlag.CAS(false, true) + finishedFlag.CompareAndSwap(false, true) c.Close() } @@ -312,7 +312,7 @@ func TestFilteredSubscriptions(t *testing.T) { callUnsubscribe(t, c, respMsgs, subID) callUnsubscribe(t, c, respMsgs, blockSubID) - finishedFlag.CAS(false, true) + finishedFlag.CompareAndSwap(false, true) c.Close() }) } @@ -408,7 +408,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) { callUnsubscribe(t, c, respMsgs, subID) }) } - finishedFlag.CAS(false, true) + finishedFlag.CompareAndSwap(false, true) c.Close() } @@ -448,7 +448,7 @@ func TestFilteredBlockSubscriptions(t *testing.T) { require.Equal(t, 3, int(primary)) } callUnsubscribe(t, c, respMsgs, blockSubID) - finishedFlag.CAS(false, true) + finishedFlag.CompareAndSwap(false, true) c.Close() } @@ -477,7 +477,7 @@ func TestMaxSubscriptions(t *testing.T) { } } - finishedFlag.CAS(false, true) + finishedFlag.CompareAndSwap(false, true) c.Close() } @@ -519,7 +519,7 @@ func TestBadSubUnsub(t *testing.T) { t.Run("subscribe", testF(t, subCases)) t.Run("unsubscribe", testF(t, unsubCases)) - finishedFlag.CAS(false, true) + finishedFlag.CompareAndSwap(false, true) c.Close() } @@ -614,6 +614,6 @@ func TestSubscriptionOverflow(t *testing.T) { // `Missed` is the last event and there is nothing afterwards. require.Equal(t, 0, len(respMsgs)) - finishedFlag.CAS(false, true) + finishedFlag.CompareAndSwap(false, true) c.Close() } diff --git a/pkg/services/stateroot/validators.go b/pkg/services/stateroot/validators.go index 4e266cb1c..a62e75892 100644 --- a/pkg/services/stateroot/validators.go +++ b/pkg/services/stateroot/validators.go @@ -25,7 +25,7 @@ func (s *service) Name() string { // Start runs service instance in a separate goroutine. // The service only starts once, subsequent calls to Start are no-op. func (s *service) Start() { - if !s.started.CAS(false, true) { + if !s.started.CompareAndSwap(false, true) { return } s.log.Info("starting state validation service") @@ -68,7 +68,7 @@ drainloop: // to Shutdown on the same instance are no-op. The instance that was stopped can // not be started again by calling Start (use a new instance if needed). func (s *service) Shutdown() { - if !s.started.CAS(true, false) { + if !s.started.CompareAndSwap(true, false) { return } s.log.Info("stopping state validation service")