From f4ac75423c1231662a79669cdd9cb9956b93b0d3 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Fri, 22 Jul 2022 11:06:16 +0300 Subject: [PATCH] [#283] pool: Store methods status in slice Signed-off-by: Denis Kirillov --- pool/mock_test.go | 2 +- pool/pool.go | 141 ++++++++++++++++++++++++++++++------------- pool/pool_test.go | 13 +--- pool/sampler_test.go | 2 +- pool/statistic.go | 4 +- 5 files changed, 106 insertions(+), 56 deletions(-) diff --git a/pool/mock_test.go b/pool/mock_test.go index 0286f6da..3af6db9c 100644 --- a/pool/mock_test.go +++ b/pool/mock_test.go @@ -32,7 +32,7 @@ type mockClient struct { func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient { return &mockClient{ key: key, - clientStatusMonitor: newTestStatusMonitor(addr), + clientStatusMonitor: newClientStatusMonitor(addr, 10), } } diff --git a/pool/pool.go b/pool/pool.go index 6d9d9c31..e33925dd 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -60,7 +60,7 @@ type clientStatus interface { address() string currentErrorRate() uint32 overallErrorRate() uint64 - methodsStatus() map[string]methodStatus + methodsStatus() []statusSnapshot } type clientStatusMonitor struct { @@ -71,33 +71,106 @@ type clientStatusMonitor struct { mu sync.RWMutex // protect counters currentErrorCount uint32 overallErrorCount uint64 - methods map[string]methodStatus + methods []*methodStatus } type methodStatus struct { - name string + name string + mu sync.RWMutex // protect counters + statusSnapshot +} + +type statusSnapshot struct { allTime uint64 allRequests uint64 } +// MethodIndex index of method in list of statuses in clientStatusMonitor. +type MethodIndex int + const ( - methodBalanceGet = "balanceGet" - methodContainerPut = "containerPut" - methodContainerGet = "containerGet" - methodContainerList = "containerList" - methodContainerDelete = "containerDelete" - methodContainerEACL = "containerEACL" - methodContainerSetEACL = "containerSetEACL" - methodEndpointInfo = "endpointInfo" - methodNetworkInfo = "networkInfo" - methodObjectPut = "objectPut" - methodObjectDelete = "objectDelete" - methodObjectGet = "objectGet" - methodObjectHead = "objectHead" - methodObjectRange = "objectRange" - methodSessionCreate = "sessionCreate" + methodBalanceGet MethodIndex = iota + methodContainerPut + methodContainerGet + methodContainerList + methodContainerDelete + methodContainerEACL + methodContainerSetEACL + methodEndpointInfo + methodNetworkInfo + methodObjectPut + methodObjectDelete + methodObjectGet + methodObjectHead + methodObjectRange + methodSessionCreate ) +// String implements fmt.Stringer. +func (m MethodIndex) String() string { + switch m { + case methodBalanceGet: + return "balanceGet" + case methodContainerPut: + return "containerPut" + case methodContainerGet: + return "containerGet" + case methodContainerList: + return "containerList" + case methodContainerDelete: + return "containerDelete" + case methodContainerEACL: + return "containerEACL" + case methodContainerSetEACL: + return "containerSetEACL" + case methodEndpointInfo: + return "endpointInfo" + case methodNetworkInfo: + return "networkInfo" + case methodObjectPut: + return "objectPut" + case methodObjectDelete: + return "objectDelete" + case methodObjectGet: + return "objectGet" + case methodObjectHead: + return "objectHead" + case methodObjectRange: + return "objectRange" + case methodSessionCreate: + return "sessionCreate" + default: + return "unknown" + } +} + +func newClientStatusMonitor(addr string, errorThreshold uint32) clientStatusMonitor { + methods := make([]*methodStatus, methodSessionCreate+1) + for i := methodBalanceGet; i <= methodSessionCreate; i++ { + methods[i] = &methodStatus{name: i.String()} + } + + return clientStatusMonitor{ + addr: addr, + healthy: atomic.NewBool(true), + errorThreshold: errorThreshold, + methods: methods, + } +} + +func (m *methodStatus) snapshot() statusSnapshot { + m.mu.RLock() + defer m.mu.RUnlock() + return m.statusSnapshot +} + +func (m *methodStatus) incRequests(elapsed time.Duration) { + m.mu.Lock() + defer m.mu.Unlock() + m.allTime += uint64(elapsed) + m.allRequests++ +} + // clientWrapper is used by default, alternative implementations are intended for testing purposes only. type clientWrapper struct { client sdkClient.Client @@ -139,12 +212,8 @@ func newWrapper(prm wrapperPrm) (*clientWrapper, error) { prmInit.SetResponseInfoCallback(prm.responseInfoCallback) res := &clientWrapper{ - key: prm.key, - clientStatusMonitor: clientStatusMonitor{ - addr: prm.address, - healthy: atomic.NewBool(true), - errorThreshold: prm.errorThreshold, - }, + key: prm.key, + clientStatusMonitor: newClientStatusMonitor(prm.address, prm.errorThreshold), } res.client.Init(prmInit) @@ -656,28 +725,18 @@ func (c *clientStatusMonitor) overallErrorRate() uint64 { return c.overallErrorCount } -func (c *clientStatusMonitor) methodsStatus() map[string]methodStatus { - c.mu.RLock() - defer c.mu.RUnlock() - - result := make(map[string]methodStatus) - for key, val := range c.methods { - result[key] = val +func (c *clientStatusMonitor) methodsStatus() []statusSnapshot { + result := make([]statusSnapshot, len(c.methods)) + for i, val := range c.methods { + result[i] = val.snapshot() } return result } -func (c *clientStatusMonitor) incRequests(elapsed time.Duration, method string) { - c.mu.Lock() - defer c.mu.Unlock() - methodStat, ok := c.methods[method] - if !ok { - methodStat.name = method - } - methodStat.allTime += uint64(elapsed) - methodStat.allRequests++ - c.methods[method] = methodStat +func (c *clientStatusMonitor) incRequests(elapsed time.Duration, method MethodIndex) { + methodStat := c.methods[method] + methodStat.incRequests(elapsed) } func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error { diff --git a/pool/pool_test.go b/pool/pool_test.go index 9a7f7bbf..33706750 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -17,7 +17,6 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/session" "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/stretchr/testify/require" - "go.uber.org/atomic" "go.uber.org/zap" ) @@ -508,16 +507,8 @@ func TestWaitPresence(t *testing.T) { }) } -func newTestStatusMonitor(addr string) clientStatusMonitor { - return clientStatusMonitor{ - addr: addr, - healthy: atomic.NewBool(true), - errorThreshold: 10, - } -} - func TestStatusMonitor(t *testing.T) { - monitor := newTestStatusMonitor("") + monitor := newClientStatusMonitor("", 10) monitor.errorThreshold = 3 count := 10 @@ -530,7 +521,7 @@ func TestStatusMonitor(t *testing.T) { } func TestHandleError(t *testing.T) { - monitor := newTestStatusMonitor("") + monitor := newClientStatusMonitor("", 10) for i, tc := range []struct { status apistatus.Status diff --git a/pool/sampler_test.go b/pool/sampler_test.go index ddd37216..f83a8118 100644 --- a/pool/sampler_test.go +++ b/pool/sampler_test.go @@ -63,7 +63,7 @@ func newNetmapMock(name string, needErr bool) *clientMock { } return &clientMock{ clientWrapper: clientWrapper{ - clientStatusMonitor: newTestStatusMonitor(""), + clientStatusMonitor: newClientStatusMonitor("", 10), }, name: name, err: err, diff --git a/pool/statistic.go b/pool/statistic.go index c80027ee..3a4f4242 100644 --- a/pool/statistic.go +++ b/pool/statistic.go @@ -39,7 +39,7 @@ func (s Statistic) Node(address string) (*NodeStatistic, error) { // NodeStatistic is metrics of certain connections. type NodeStatistic struct { address string - methods map[string]methodStatus + methods []statusSnapshot overallErrors uint64 currentErrors uint32 } @@ -144,7 +144,7 @@ func (n NodeStatistic) AverageCreateSession() time.Duration { return n.averageTime(methodSessionCreate) } -func (n NodeStatistic) averageTime(method string) time.Duration { +func (n NodeStatistic) averageTime(method MethodIndex) time.Duration { stat := n.methods[method] if stat.allRequests == 0 { return 0