[#283] pool: Compute avg time for every method

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-07-19 16:13:59 +03:00 committed by fyrchik
parent 54145916a9
commit b4f4ee4f79
2 changed files with 155 additions and 48 deletions

View file

@ -60,8 +60,7 @@ type clientStatus interface {
address() string address() string
currentErrorRate() uint32 currentErrorRate() uint32
overallErrorRate() uint64 overallErrorRate() uint64
latency() time.Duration methodsStatus() map[string]methodStatus
requests() uint64
} }
type clientStatusMonitor struct { type clientStatusMonitor struct {
@ -72,10 +71,33 @@ type clientStatusMonitor struct {
mu sync.RWMutex // protect counters mu sync.RWMutex // protect counters
currentErrorCount uint32 currentErrorCount uint32
overallErrorCount uint64 overallErrorCount uint64
methods map[string]methodStatus
}
type methodStatus struct {
name string
allTime uint64 allTime uint64
allRequests uint64 allRequests uint64
} }
const (
methodBalanceGet = "balanceGet"
methodContainerPut = "containerPut"
methodContainerGet = "containerGet"
methodContainerList = "containerList"
methodContainerDelete = "containerDelete"
methodContainerEACL = "containerEACL"
methodContainerSetEACL = "containerSetEACL"
methodEndpointInfo = "endpointInfo"
methodNetworkInfo = "networkInfo"
methodObjectPut = "objectPut"
methodObjectDelete = "objectDelete"
methodObjectGet = "objectGet"
methodObjectHead = "objectHead"
methodObjectRange = "objectRange"
methodSessionCreate = "sessionCreate"
)
// clientWrapper is used by default, alternative implementations are intended for testing purposes only. // clientWrapper is used by default, alternative implementations are intended for testing purposes only.
type clientWrapper struct { type clientWrapper struct {
client sdkClient.Client client sdkClient.Client
@ -145,7 +167,7 @@ func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (*acc
start := time.Now() start := time.Now()
res, err := c.client.BalanceGet(ctx, cliPrm) res, err := c.client.BalanceGet(ctx, cliPrm)
c.incRequests(time.Since(start)) c.incRequests(time.Since(start), methodBalanceGet)
var st apistatus.Status var st apistatus.Status
if res != nil { if res != nil {
st = res.Status() st = res.Status()
@ -160,7 +182,7 @@ func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (*acc
func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (*cid.ID, error) { func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (*cid.ID, error) {
start := time.Now() start := time.Now()
res, err := c.client.ContainerPut(ctx, prm.prmClient) res, err := c.client.ContainerPut(ctx, prm.prmClient)
c.incRequests(time.Since(start)) c.incRequests(time.Since(start), methodContainerPut)
var st apistatus.Status var st apistatus.Status
if res != nil { if res != nil {
st = res.Status() st = res.Status()
@ -187,7 +209,7 @@ func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (
start := time.Now() start := time.Now()
res, err := c.client.ContainerGet(ctx, cliPrm) res, err := c.client.ContainerGet(ctx, cliPrm)
c.incRequests(time.Since(start)) c.incRequests(time.Since(start), methodContainerGet)
var st apistatus.Status var st apistatus.Status
if res != nil { if res != nil {
st = res.Status() st = res.Status()
@ -206,7 +228,7 @@ func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList)
start := time.Now() start := time.Now()
res, err := c.client.ContainerList(ctx, cliPrm) res, err := c.client.ContainerList(ctx, cliPrm)
c.incRequests(time.Since(start)) c.incRequests(time.Since(start), methodContainerList)
var st apistatus.Status var st apistatus.Status
if res != nil { if res != nil {
st = res.Status() st = res.Status()
@ -226,7 +248,7 @@ func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDel
start := time.Now() start := time.Now()
res, err := c.client.ContainerDelete(ctx, cliPrm) res, err := c.client.ContainerDelete(ctx, cliPrm)
c.incRequests(time.Since(start)) c.incRequests(time.Since(start), methodContainerDelete)
var st apistatus.Status var st apistatus.Status
if res != nil { if res != nil {
st = res.Status() st = res.Status()
@ -248,7 +270,7 @@ func (c *clientWrapper) containerEACL(ctx context.Context, prm PrmContainerEACL)
start := time.Now() start := time.Now()
res, err := c.client.ContainerEACL(ctx, cliPrm) res, err := c.client.ContainerEACL(ctx, cliPrm)
c.incRequests(time.Since(start)) c.incRequests(time.Since(start), methodContainerEACL)
var st apistatus.Status var st apistatus.Status
if res != nil { if res != nil {
st = res.Status() st = res.Status()
@ -269,7 +291,7 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
start := time.Now() start := time.Now()
res, err := c.client.ContainerSetEACL(ctx, cliPrm) res, err := c.client.ContainerSetEACL(ctx, cliPrm)
c.incRequests(time.Since(start)) c.incRequests(time.Since(start), methodContainerSetEACL)
var st apistatus.Status var st apistatus.Status
if res != nil { if res != nil {
st = res.Status() st = res.Status()
@ -298,7 +320,7 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (*netmap.NodeInfo, error) { func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (*netmap.NodeInfo, error) {
start := time.Now() start := time.Now()
res, err := c.client.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}) res, err := c.client.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{})
c.incRequests(time.Since(start)) c.incRequests(time.Since(start), methodEndpointInfo)
var st apistatus.Status var st apistatus.Status
if res != nil { if res != nil {
st = res.Status() st = res.Status()
@ -312,7 +334,7 @@ func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (*n
func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (*netmap.NetworkInfo, error) { func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (*netmap.NetworkInfo, error) {
start := time.Now() start := time.Now()
res, err := c.client.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{}) res, err := c.client.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{})
c.incRequests(time.Since(start)) c.incRequests(time.Since(start), methodNetworkInfo)
var st apistatus.Status var st apistatus.Status
if res != nil { if res != nil {
st = res.Status() st = res.Status()
@ -327,7 +349,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.I
var cliPrm sdkClient.PrmObjectPutInit var cliPrm sdkClient.PrmObjectPutInit
start := time.Now() start := time.Now()
wObj, err := c.client.ObjectPutInit(ctx, cliPrm) wObj, err := c.client.ObjectPutInit(ctx, cliPrm)
c.incRequests(time.Since(start)) c.incRequests(time.Since(start), methodObjectPut)
if err = c.handleError(nil, err); err != nil { if err = c.handleError(nil, err); err != nil {
return nil, fmt.Errorf("init writing on API client: %w", err) return nil, fmt.Errorf("init writing on API client: %w", err)
} }
@ -371,7 +393,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.I
if n > 0 { if n > 0 {
start = time.Now() start = time.Now()
successWrite := wObj.WritePayloadChunk(buf[:n]) successWrite := wObj.WritePayloadChunk(buf[:n])
c.incRequests(time.Since(start)) c.incRequests(time.Since(start), methodObjectPut)
if !successWrite { if !successWrite {
break break
} }
@ -425,7 +447,7 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e
start := time.Now() start := time.Now()
res, err := c.client.ObjectDelete(ctx, cliPrm) res, err := c.client.ObjectDelete(ctx, cliPrm)
c.incRequests(time.Since(start)) c.incRequests(time.Since(start), methodObjectDelete)
var st apistatus.Status var st apistatus.Status
if res != nil { if res != nil {
st = res.Status() st = res.Status()
@ -462,7 +484,7 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (*ResGe
start := time.Now() start := time.Now()
successReadHeader := rObj.ReadHeader(&res.Header) successReadHeader := rObj.ReadHeader(&res.Header)
c.incRequests(time.Since(start)) c.incRequests(time.Since(start), methodObjectGet)
if !successReadHeader { if !successReadHeader {
rObjRes, err := rObj.Close() rObjRes, err := rObj.Close()
var st apistatus.Status var st apistatus.Status
@ -475,7 +497,9 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (*ResGe
res.Payload = &objectReadCloser{ res.Payload = &objectReadCloser{
reader: rObj, reader: rObj,
elapsedTimeCallback: c.incRequests, elapsedTimeCallback: func(elapsed time.Duration) {
c.incRequests(elapsed, methodObjectGet)
},
} }
return &res, nil return &res, nil
@ -502,7 +526,7 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (*obj
start := time.Now() start := time.Now()
res, err := c.client.ObjectHead(ctx, cliPrm) res, err := c.client.ObjectHead(ctx, cliPrm)
c.incRequests(time.Since(start)) c.incRequests(time.Since(start), methodObjectHead)
var st apistatus.Status var st apistatus.Status
if res != nil { if res != nil {
st = res.Status() st = res.Status()
@ -534,7 +558,7 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (*R
start := time.Now() start := time.Now()
res, err := c.client.ObjectRangeInit(ctx, cliPrm) res, err := c.client.ObjectRangeInit(ctx, cliPrm)
c.incRequests(time.Since(start)) c.incRequests(time.Since(start), methodObjectRange)
if err = c.handleError(nil, err); err != nil { if err = c.handleError(nil, err); err != nil {
return nil, fmt.Errorf("init payload range reading on client: %w", err) return nil, fmt.Errorf("init payload range reading on client: %w", err)
} }
@ -544,7 +568,9 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (*R
return &ResObjectRange{ return &ResObjectRange{
payload: res, payload: res,
elapsedTimeCallback: c.incRequests, elapsedTimeCallback: func(elapsed time.Duration) {
c.incRequests(elapsed, methodObjectRange)
},
}, nil }, nil
} }
@ -580,7 +606,7 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
start := time.Now() start := time.Now()
res, err := c.client.SessionCreate(ctx, cliPrm) res, err := c.client.SessionCreate(ctx, cliPrm)
c.incRequests(time.Since(start)) c.incRequests(time.Since(start), methodSessionCreate)
var st apistatus.Status var st apistatus.Status
if res != nil { if res != nil {
st = res.Status() st = res.Status()
@ -630,26 +656,28 @@ func (c *clientStatusMonitor) overallErrorRate() uint64 {
return c.overallErrorCount return c.overallErrorCount
} }
func (c *clientStatusMonitor) latency() time.Duration { func (c *clientStatusMonitor) methodsStatus() map[string]methodStatus {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
if c.allRequests == 0 {
return 0 result := make(map[string]methodStatus)
for key, val := range c.methods {
result[key] = val
} }
return time.Duration(c.allTime / c.allRequests)
return result
} }
func (c *clientStatusMonitor) requests() uint64 { func (c *clientStatusMonitor) incRequests(elapsed time.Duration, method string) {
c.mu.RLock()
defer c.mu.RUnlock()
return c.allRequests
}
func (c *clientStatusMonitor) incRequests(elapsed time.Duration) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
c.allTime += uint64(elapsed) methodStat, ok := c.methods[method]
c.allRequests++ if !ok {
methodStat.name = method
}
methodStat.allTime += uint64(elapsed)
methodStat.allRequests++
c.methods[method] = methodStat
} }
func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error { func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error {
@ -1976,8 +2004,7 @@ func (p Pool) Statistic() Statistic {
for _, cl := range inner.clients { for _, cl := range inner.clients {
node := NodeStatistic{ node := NodeStatistic{
address: cl.address(), address: cl.address(),
latency: cl.latency(), methods: cl.methodsStatus(),
requests: cl.requests(),
overallErrors: cl.overallErrorRate(), overallErrors: cl.overallErrorRate(),
currentErrors: cl.currentErrorRate(), currentErrors: cl.currentErrorRate(),
} }

View file

@ -39,8 +39,7 @@ func (s Statistic) Node(address string) (*NodeStatistic, error) {
// NodeStatistic is metrics of certain connections. // NodeStatistic is metrics of certain connections.
type NodeStatistic struct { type NodeStatistic struct {
address string address string
latency time.Duration methods map[string]methodStatus
requests uint64
overallErrors uint64 overallErrors uint64
currentErrors uint32 currentErrors uint32
} }
@ -57,17 +56,98 @@ func (n NodeStatistic) CurrentErrors() uint32 {
return n.currentErrors return n.currentErrors
} }
// Latency returns average latency for node request.
func (n NodeStatistic) Latency() time.Duration {
return n.latency
}
// Requests returns number of requests. // Requests returns number of requests.
func (n NodeStatistic) Requests() uint64 { func (n NodeStatistic) Requests() (requests uint64) {
return n.requests for _, val := range n.methods {
requests += val.allRequests
}
return requests
} }
// Address returns node endpoint address. // Address returns node endpoint address.
func (n NodeStatistic) Address() string { func (n NodeStatistic) Address() string {
return n.address return n.address
} }
// AverageGetBalance returns average time to perform BalanceGet request.
func (n NodeStatistic) AverageGetBalance() time.Duration {
return n.averageTime(methodBalanceGet)
}
// AveragePutContainer returns average time to perform ContainerPut request.
func (n NodeStatistic) AveragePutContainer() time.Duration {
return n.averageTime(methodContainerPut)
}
// AverageGetContainer returns average time to perform ContainerGet request.
func (n NodeStatistic) AverageGetContainer() time.Duration {
return n.averageTime(methodContainerGet)
}
// AverageListContainer returns average time to perform ContainerList request.
func (n NodeStatistic) AverageListContainer() time.Duration {
return n.averageTime(methodContainerList)
}
// AverageDeleteContainer returns average time to perform ContainerDelete request.
func (n NodeStatistic) AverageDeleteContainer() time.Duration {
return n.averageTime(methodContainerDelete)
}
// AverageGetContainerEACL returns average time to perform ContainerEACL request.
func (n NodeStatistic) AverageGetContainerEACL() time.Duration {
return n.averageTime(methodContainerEACL)
}
// AverageSetContainerEACL returns average time to perform ContainerSetEACL request.
func (n NodeStatistic) AverageSetContainerEACL() time.Duration {
return n.averageTime(methodContainerSetEACL)
}
// AverageEndpointInfo returns average time to perform EndpointInfo request.
func (n NodeStatistic) AverageEndpointInfo() time.Duration {
return n.averageTime(methodEndpointInfo)
}
// AverageNetworkInfo returns average time to perform NetworkInfo request.
func (n NodeStatistic) AverageNetworkInfo() time.Duration {
return n.averageTime(methodNetworkInfo)
}
// AveragePutObject returns average time to perform ObjectPut request.
func (n NodeStatistic) AveragePutObject() time.Duration {
return n.averageTime(methodObjectPut)
}
// AverageDeleteObject returns average time to perform ObjectDelete request.
func (n NodeStatistic) AverageDeleteObject() time.Duration {
return n.averageTime(methodObjectDelete)
}
// AverageGetObject returns average time to perform ObjectGet request.
func (n NodeStatistic) AverageGetObject() time.Duration {
return n.averageTime(methodObjectGet)
}
// AverageHeadObject returns average time to perform ObjectHead request.
func (n NodeStatistic) AverageHeadObject() time.Duration {
return n.averageTime(methodObjectHead)
}
// AverageRangeObject returns average time to perform ObjectRange request.
func (n NodeStatistic) AverageRangeObject() time.Duration {
return n.averageTime(methodObjectRange)
}
// AverageCreateSession returns average time to perform SessionCreate request.
func (n NodeStatistic) AverageCreateSession() time.Duration {
return n.averageTime(methodSessionCreate)
}
func (n NodeStatistic) averageTime(method string) time.Duration {
stat := n.methods[method]
if stat.allRequests == 0 {
return 0
}
return time.Duration(stat.allTime / stat.allRequests)
}