forked from TrueCloudLab/frostfs-sdk-go
[#283] pool: Store methods status in slice
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
b4f4ee4f79
commit
f4ac75423c
5 changed files with 106 additions and 56 deletions
|
@ -32,7 +32,7 @@ type mockClient struct {
|
||||||
func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient {
|
func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient {
|
||||||
return &mockClient{
|
return &mockClient{
|
||||||
key: key,
|
key: key,
|
||||||
clientStatusMonitor: newTestStatusMonitor(addr),
|
clientStatusMonitor: newClientStatusMonitor(addr, 10),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
137
pool/pool.go
137
pool/pool.go
|
@ -60,7 +60,7 @@ type clientStatus interface {
|
||||||
address() string
|
address() string
|
||||||
currentErrorRate() uint32
|
currentErrorRate() uint32
|
||||||
overallErrorRate() uint64
|
overallErrorRate() uint64
|
||||||
methodsStatus() map[string]methodStatus
|
methodsStatus() []statusSnapshot
|
||||||
}
|
}
|
||||||
|
|
||||||
type clientStatusMonitor struct {
|
type clientStatusMonitor struct {
|
||||||
|
@ -71,33 +71,106 @@ type clientStatusMonitor struct {
|
||||||
mu sync.RWMutex // protect counters
|
mu sync.RWMutex // protect counters
|
||||||
currentErrorCount uint32
|
currentErrorCount uint32
|
||||||
overallErrorCount uint64
|
overallErrorCount uint64
|
||||||
methods map[string]methodStatus
|
methods []*methodStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
type methodStatus struct {
|
type methodStatus struct {
|
||||||
name string
|
name string
|
||||||
|
mu sync.RWMutex // protect counters
|
||||||
|
statusSnapshot
|
||||||
|
}
|
||||||
|
|
||||||
|
type statusSnapshot struct {
|
||||||
allTime uint64
|
allTime uint64
|
||||||
allRequests uint64
|
allRequests uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MethodIndex index of method in list of statuses in clientStatusMonitor.
|
||||||
|
type MethodIndex int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
methodBalanceGet = "balanceGet"
|
methodBalanceGet MethodIndex = iota
|
||||||
methodContainerPut = "containerPut"
|
methodContainerPut
|
||||||
methodContainerGet = "containerGet"
|
methodContainerGet
|
||||||
methodContainerList = "containerList"
|
methodContainerList
|
||||||
methodContainerDelete = "containerDelete"
|
methodContainerDelete
|
||||||
methodContainerEACL = "containerEACL"
|
methodContainerEACL
|
||||||
methodContainerSetEACL = "containerSetEACL"
|
methodContainerSetEACL
|
||||||
methodEndpointInfo = "endpointInfo"
|
methodEndpointInfo
|
||||||
methodNetworkInfo = "networkInfo"
|
methodNetworkInfo
|
||||||
methodObjectPut = "objectPut"
|
methodObjectPut
|
||||||
methodObjectDelete = "objectDelete"
|
methodObjectDelete
|
||||||
methodObjectGet = "objectGet"
|
methodObjectGet
|
||||||
methodObjectHead = "objectHead"
|
methodObjectHead
|
||||||
methodObjectRange = "objectRange"
|
methodObjectRange
|
||||||
methodSessionCreate = "sessionCreate"
|
methodSessionCreate
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// String implements fmt.Stringer.
|
||||||
|
func (m MethodIndex) String() string {
|
||||||
|
switch m {
|
||||||
|
case methodBalanceGet:
|
||||||
|
return "balanceGet"
|
||||||
|
case methodContainerPut:
|
||||||
|
return "containerPut"
|
||||||
|
case methodContainerGet:
|
||||||
|
return "containerGet"
|
||||||
|
case methodContainerList:
|
||||||
|
return "containerList"
|
||||||
|
case methodContainerDelete:
|
||||||
|
return "containerDelete"
|
||||||
|
case methodContainerEACL:
|
||||||
|
return "containerEACL"
|
||||||
|
case methodContainerSetEACL:
|
||||||
|
return "containerSetEACL"
|
||||||
|
case methodEndpointInfo:
|
||||||
|
return "endpointInfo"
|
||||||
|
case methodNetworkInfo:
|
||||||
|
return "networkInfo"
|
||||||
|
case methodObjectPut:
|
||||||
|
return "objectPut"
|
||||||
|
case methodObjectDelete:
|
||||||
|
return "objectDelete"
|
||||||
|
case methodObjectGet:
|
||||||
|
return "objectGet"
|
||||||
|
case methodObjectHead:
|
||||||
|
return "objectHead"
|
||||||
|
case methodObjectRange:
|
||||||
|
return "objectRange"
|
||||||
|
case methodSessionCreate:
|
||||||
|
return "sessionCreate"
|
||||||
|
default:
|
||||||
|
return "unknown"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newClientStatusMonitor(addr string, errorThreshold uint32) clientStatusMonitor {
|
||||||
|
methods := make([]*methodStatus, methodSessionCreate+1)
|
||||||
|
for i := methodBalanceGet; i <= methodSessionCreate; i++ {
|
||||||
|
methods[i] = &methodStatus{name: i.String()}
|
||||||
|
}
|
||||||
|
|
||||||
|
return clientStatusMonitor{
|
||||||
|
addr: addr,
|
||||||
|
healthy: atomic.NewBool(true),
|
||||||
|
errorThreshold: errorThreshold,
|
||||||
|
methods: methods,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *methodStatus) snapshot() statusSnapshot {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
return m.statusSnapshot
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *methodStatus) incRequests(elapsed time.Duration) {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
m.allTime += uint64(elapsed)
|
||||||
|
m.allRequests++
|
||||||
|
}
|
||||||
|
|
||||||
// clientWrapper is used by default, alternative implementations are intended for testing purposes only.
|
// clientWrapper is used by default, alternative implementations are intended for testing purposes only.
|
||||||
type clientWrapper struct {
|
type clientWrapper struct {
|
||||||
client sdkClient.Client
|
client sdkClient.Client
|
||||||
|
@ -140,11 +213,7 @@ func newWrapper(prm wrapperPrm) (*clientWrapper, error) {
|
||||||
|
|
||||||
res := &clientWrapper{
|
res := &clientWrapper{
|
||||||
key: prm.key,
|
key: prm.key,
|
||||||
clientStatusMonitor: clientStatusMonitor{
|
clientStatusMonitor: newClientStatusMonitor(prm.address, prm.errorThreshold),
|
||||||
addr: prm.address,
|
|
||||||
healthy: atomic.NewBool(true),
|
|
||||||
errorThreshold: prm.errorThreshold,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
res.client.Init(prmInit)
|
res.client.Init(prmInit)
|
||||||
|
@ -656,28 +725,18 @@ func (c *clientStatusMonitor) overallErrorRate() uint64 {
|
||||||
return c.overallErrorCount
|
return c.overallErrorCount
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientStatusMonitor) methodsStatus() map[string]methodStatus {
|
func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {
|
||||||
c.mu.RLock()
|
result := make([]statusSnapshot, len(c.methods))
|
||||||
defer c.mu.RUnlock()
|
for i, val := range c.methods {
|
||||||
|
result[i] = val.snapshot()
|
||||||
result := make(map[string]methodStatus)
|
|
||||||
for key, val := range c.methods {
|
|
||||||
result[key] = val
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientStatusMonitor) incRequests(elapsed time.Duration, method string) {
|
func (c *clientStatusMonitor) incRequests(elapsed time.Duration, method MethodIndex) {
|
||||||
c.mu.Lock()
|
methodStat := c.methods[method]
|
||||||
defer c.mu.Unlock()
|
methodStat.incRequests(elapsed)
|
||||||
methodStat, ok := c.methods[method]
|
|
||||||
if !ok {
|
|
||||||
methodStat.name = method
|
|
||||||
}
|
|
||||||
methodStat.allTime += uint64(elapsed)
|
|
||||||
methodStat.allRequests++
|
|
||||||
c.methods[method] = methodStat
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error {
|
func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error {
|
||||||
|
|
|
@ -17,7 +17,6 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/session"
|
"github.com/nspcc-dev/neofs-sdk-go/session"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/user"
|
"github.com/nspcc-dev/neofs-sdk-go/user"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.uber.org/atomic"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -508,16 +507,8 @@ func TestWaitPresence(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestStatusMonitor(addr string) clientStatusMonitor {
|
|
||||||
return clientStatusMonitor{
|
|
||||||
addr: addr,
|
|
||||||
healthy: atomic.NewBool(true),
|
|
||||||
errorThreshold: 10,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStatusMonitor(t *testing.T) {
|
func TestStatusMonitor(t *testing.T) {
|
||||||
monitor := newTestStatusMonitor("")
|
monitor := newClientStatusMonitor("", 10)
|
||||||
monitor.errorThreshold = 3
|
monitor.errorThreshold = 3
|
||||||
|
|
||||||
count := 10
|
count := 10
|
||||||
|
@ -530,7 +521,7 @@ func TestStatusMonitor(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHandleError(t *testing.T) {
|
func TestHandleError(t *testing.T) {
|
||||||
monitor := newTestStatusMonitor("")
|
monitor := newClientStatusMonitor("", 10)
|
||||||
|
|
||||||
for i, tc := range []struct {
|
for i, tc := range []struct {
|
||||||
status apistatus.Status
|
status apistatus.Status
|
||||||
|
|
|
@ -63,7 +63,7 @@ func newNetmapMock(name string, needErr bool) *clientMock {
|
||||||
}
|
}
|
||||||
return &clientMock{
|
return &clientMock{
|
||||||
clientWrapper: clientWrapper{
|
clientWrapper: clientWrapper{
|
||||||
clientStatusMonitor: newTestStatusMonitor(""),
|
clientStatusMonitor: newClientStatusMonitor("", 10),
|
||||||
},
|
},
|
||||||
name: name,
|
name: name,
|
||||||
err: err,
|
err: err,
|
||||||
|
|
|
@ -39,7 +39,7 @@ func (s Statistic) Node(address string) (*NodeStatistic, error) {
|
||||||
// NodeStatistic is metrics of certain connections.
|
// NodeStatistic is metrics of certain connections.
|
||||||
type NodeStatistic struct {
|
type NodeStatistic struct {
|
||||||
address string
|
address string
|
||||||
methods map[string]methodStatus
|
methods []statusSnapshot
|
||||||
overallErrors uint64
|
overallErrors uint64
|
||||||
currentErrors uint32
|
currentErrors uint32
|
||||||
}
|
}
|
||||||
|
@ -144,7 +144,7 @@ func (n NodeStatistic) AverageCreateSession() time.Duration {
|
||||||
return n.averageTime(methodSessionCreate)
|
return n.averageTime(methodSessionCreate)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n NodeStatistic) averageTime(method string) time.Duration {
|
func (n NodeStatistic) averageTime(method MethodIndex) time.Duration {
|
||||||
stat := n.methods[method]
|
stat := n.methods[method]
|
||||||
if stat.allRequests == 0 {
|
if stat.allRequests == 0 {
|
||||||
return 0
|
return 0
|
||||||
|
|
Loading…
Reference in a new issue