Merge pull request #2050 from nspcc-dev/fix-network-test-race

Fix some test failures
This commit is contained in:
Roman Khimov 2021-07-08 14:50:50 +03:00 committed by GitHub
commit 7bc7457fbe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 37 additions and 25 deletions

View file

@ -97,13 +97,14 @@ jobs:
- run: apk update && apk add git make curl tar - run: apk update && apk add git make curl tar
- checkout - checkout
- gomod - gomod
- setup_remote_docker - setup_remote_docker:
version: 20.10.6
- run: - run:
name: Install Docker client name: Install Docker client
command: | command: |
set -x set -x
VER="17.03.0-ce" VER="20.10.6"
curl -L -o /tmp/docker-$VER.tgz https://get.docker.com/builds/Linux/x86_64/docker-$VER.tgz curl -L -o /tmp/docker-$VER.tgz https://download.docker.com/linux/static/stable/x86_64/docker-$VER.tgz
tar -xz -C /tmp -f /tmp/docker-$VER.tgz tar -xz -C /tmp -f /tmp/docker-$VER.tgz
mv /tmp/docker/* /usr/bin mv /tmp/docker/* /usr/bin
- run: make image - run: make image

View file

@ -325,10 +325,14 @@ func (m *Management) updateWithData(ic *interop.Context, args []stackitem.Item)
// Update updates contract's script and/or manifest in the given DAO. // Update updates contract's script and/or manifest in the given DAO.
// It doesn't run _deploy method and doesn't emit notification. // It doesn't run _deploy method and doesn't emit notification.
func (m *Management) Update(d dao.DAO, hash util.Uint160, neff *nef.File, manif *manifest.Manifest) (*state.Contract, error) { func (m *Management) Update(d dao.DAO, hash util.Uint160, neff *nef.File, manif *manifest.Manifest) (*state.Contract, error) {
contract, err := m.GetContract(d, hash) var contract state.Contract
oldcontract, err := m.GetContract(d, hash)
if err != nil { if err != nil {
return nil, errors.New("contract doesn't exist") return nil, errors.New("contract doesn't exist")
} }
contract = *oldcontract // Make a copy, don't ruin (potentially) cached contract.
// if NEF was provided, update the contract script // if NEF was provided, update the contract script
if neff != nil { if neff != nil {
m.markUpdated(hash) m.markUpdated(hash)
@ -351,11 +355,11 @@ func (m *Management) Update(d dao.DAO, hash util.Uint160, neff *nef.File, manif
return nil, err return nil, err
} }
contract.UpdateCounter++ contract.UpdateCounter++
err = m.PutContractState(d, contract) err = m.PutContractState(d, &contract)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return contract, nil return &contract, nil
} }
// destroy is an implementation of destroy update method, it's run under // destroy is an implementation of destroy update method, it's run under

View file

@ -27,12 +27,13 @@ func newFakeTransp(s *Server) Transporter {
} }
func (ft *fakeTransp) Dial(addr string, timeout time.Duration) error { func (ft *fakeTransp) Dial(addr string, timeout time.Duration) error {
var ret error
if atomic.LoadInt32(&ft.retFalse) > 0 {
ret = errors.New("smth bad happened")
}
ft.dialCh <- addr ft.dialCh <- addr
if atomic.LoadInt32(&ft.retFalse) > 0 { return ret
return errors.New("smth bad happened")
}
return nil
} }
func (ft *fakeTransp) Accept() { func (ft *fakeTransp) Accept() {
if ft.started.Load() { if ft.started.Load() {
@ -56,7 +57,7 @@ func (ft *fakeTransp) Close() {
func TestDefaultDiscoverer(t *testing.T) { func TestDefaultDiscoverer(t *testing.T) {
ts := &fakeTransp{} ts := &fakeTransp{}
ts.dialCh = make(chan string) ts.dialCh = make(chan string)
d := NewDefaultDiscovery(nil, time.Second/2, ts) d := NewDefaultDiscovery(nil, time.Second/16, ts)
var set1 = []string{"1.1.1.1:10333", "2.2.2.2:10333"} var set1 = []string{"1.1.1.1:10333", "2.2.2.2:10333"}
sort.Strings(set1) sort.Strings(set1)
@ -86,13 +87,9 @@ func TestDefaultDiscoverer(t *testing.T) {
t.Fatalf("timeout expecting for transport dial") t.Fatalf("timeout expecting for transport dial")
} }
} }
// Updated asynchronously. require.Eventually(t, func() bool { return len(d.UnconnectedPeers()) == 0 }, 2*time.Second, 50*time.Millisecond)
if len(d.UnconnectedPeers()) != 0 {
time.Sleep(time.Second)
}
sort.Strings(dialled) sort.Strings(dialled)
assert.Equal(t, 0, d.PoolCount()) assert.Equal(t, 0, d.PoolCount())
assert.Equal(t, 0, len(d.UnconnectedPeers()))
assert.Equal(t, 0, len(d.BadPeers())) assert.Equal(t, 0, len(d.BadPeers()))
assert.Equal(t, 0, len(d.GoodPeers())) assert.Equal(t, 0, len(d.GoodPeers()))
require.Equal(t, set1, dialled) require.Equal(t, set1, dialled)
@ -167,11 +164,7 @@ func TestDefaultDiscoverer(t *testing.T) {
assert.Equal(t, set1[i], dialledBad[i*connRetries+j]) assert.Equal(t, set1[i], dialledBad[i*connRetries+j])
} }
} }
// Updated asynchronously. require.Eventually(t, func() bool { return len(d.BadPeers()) == len(set1) }, 2*time.Second, 50*time.Millisecond)
if len(d.BadPeers()) != len(set1) {
time.Sleep(time.Second)
}
assert.Equal(t, len(set1), len(d.BadPeers()))
assert.Equal(t, 0, len(d.GoodPeers())) assert.Equal(t, 0, len(d.GoodPeers()))
assert.Equal(t, 0, len(d.UnconnectedPeers())) assert.Equal(t, 0, len(d.UnconnectedPeers()))

View file

@ -16,6 +16,7 @@ type TCPTransport struct {
listener net.Listener listener net.Listener
bindAddr string bindAddr string
lock sync.RWMutex lock sync.RWMutex
quit bool
} }
var reClosedNetwork = regexp.MustCompile(".* use of closed network connection") var reClosedNetwork = regexp.MustCompile(".* use of closed network connection")
@ -50,16 +51,24 @@ func (t *TCPTransport) Accept() {
} }
t.lock.Lock() t.lock.Lock()
if t.quit {
t.lock.Unlock()
l.Close()
return
}
t.listener = l t.listener = l
t.lock.Unlock() t.lock.Unlock()
for { for {
conn, err := l.Accept() conn, err := l.Accept()
if err != nil { if err != nil {
t.log.Warn("TCP accept error", zap.Error(err)) t.lock.Lock()
if t.isCloseError(err) { quit := t.quit
t.lock.Unlock()
if t.isCloseError(err) && quit {
break break
} }
t.log.Warn("TCP accept error", zap.Error(err))
continue continue
} }
p := NewTCPPeer(conn, t.server) p := NewTCPPeer(conn, t.server)
@ -83,6 +92,7 @@ func (t *TCPTransport) Close() {
if t.listener != nil { if t.listener != nil {
t.listener.Close() t.listener.Close()
} }
t.quit = true
t.lock.Unlock() t.lock.Unlock()
} }

View file

@ -21,6 +21,10 @@ const testOverflow = false
func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, isFinished *atomic.Bool) { func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, isFinished *atomic.Bool) {
for { for {
err := ws.SetReadDeadline(time.Now().Add(time.Second)) err := ws.SetReadDeadline(time.Now().Add(time.Second))
if isFinished.Load() {
require.Error(t, err)
break
}
require.NoError(t, err) require.NoError(t, err)
_, body, err := ws.ReadMessage() _, body, err := ws.ReadMessage()
if isFinished.Load() { if isFinished.Load() {
@ -89,11 +93,11 @@ func TestSubscriptions(t *testing.T) {
var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event"} var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event"}
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }()
go rpcSrv.coreServer.Start(make(chan error)) go rpcSrv.coreServer.Start(make(chan error))
defer rpcSrv.coreServer.Shutdown() defer rpcSrv.coreServer.Shutdown()
defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }()
for _, feed := range subFeeds { for _, feed := range subFeeds {
s := callSubscribe(t, c, respMsgs, fmt.Sprintf(`["%s"]`, feed)) s := callSubscribe(t, c, respMsgs, fmt.Sprintf(`["%s"]`, feed))