forked from TrueCloudLab/frostfs-sdk-go
[#283] pool: Add latency calculating
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
99e185690e
commit
423804de84
1 changed files with 102 additions and 25 deletions
113
pool/pool.go
113
pool/pool.go
|
@ -58,15 +58,20 @@ type clientStatus interface {
|
||||||
isHealthy() bool
|
isHealthy() bool
|
||||||
setHealthy(bool) bool
|
setHealthy(bool) bool
|
||||||
address() string
|
address() string
|
||||||
errorRate() uint32
|
currentErrorRate() uint32
|
||||||
|
overallErrorRate() uint64
|
||||||
resetErrorCounter()
|
resetErrorCounter()
|
||||||
|
latency() time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type clientStatusMonitor struct {
|
type clientStatusMonitor struct {
|
||||||
addr string
|
addr string
|
||||||
healthy *atomic.Bool
|
healthy *atomic.Bool
|
||||||
errorCount *atomic.Uint32
|
currentErrorCount *atomic.Uint32
|
||||||
|
overallErrorCount *atomic.Uint64
|
||||||
errorThreshold uint32
|
errorThreshold uint32
|
||||||
|
allTime *atomic.Uint64
|
||||||
|
allRequests *atomic.Uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// clientWrapper is used by default, alternative implementations are intended for testing purposes only.
|
// clientWrapper is used by default, alternative implementations are intended for testing purposes only.
|
||||||
|
@ -115,8 +120,11 @@ func newWrapper(prm wrapperPrm) (*clientWrapper, error) {
|
||||||
clientStatusMonitor: &clientStatusMonitor{
|
clientStatusMonitor: &clientStatusMonitor{
|
||||||
addr: prm.address,
|
addr: prm.address,
|
||||||
healthy: atomic.NewBool(true),
|
healthy: atomic.NewBool(true),
|
||||||
errorCount: atomic.NewUint32(0),
|
currentErrorCount: atomic.NewUint32(0),
|
||||||
|
overallErrorCount: atomic.NewUint64(0),
|
||||||
errorThreshold: prm.errorThreshold,
|
errorThreshold: prm.errorThreshold,
|
||||||
|
allTime: atomic.NewUint64(0),
|
||||||
|
allRequests: atomic.NewUint64(0),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -138,15 +146,20 @@ func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (*acc
|
||||||
var cliPrm sdkClient.PrmBalanceGet
|
var cliPrm sdkClient.PrmBalanceGet
|
||||||
cliPrm.SetAccount(prm.account)
|
cliPrm.SetAccount(prm.account)
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
res, err := c.client.BalanceGet(ctx, cliPrm)
|
res, err := c.client.BalanceGet(ctx, cliPrm)
|
||||||
|
c.incRequests(time.Since(start))
|
||||||
if err = c.handleError(res.Status(), err); err != nil {
|
if err = c.handleError(res.Status(), err); err != nil {
|
||||||
return nil, fmt.Errorf("balance get on client: %w", err)
|
return nil, fmt.Errorf("balance get on client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return res.Amount(), nil
|
return res.Amount(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (*cid.ID, error) {
|
func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (*cid.ID, error) {
|
||||||
|
start := time.Now()
|
||||||
res, err := c.client.ContainerPut(ctx, prm.prmClient)
|
res, err := c.client.ContainerPut(ctx, prm.prmClient)
|
||||||
|
c.incRequests(time.Since(start))
|
||||||
if err = c.handleError(res.Status(), err); err != nil {
|
if err = c.handleError(res.Status(), err); err != nil {
|
||||||
return nil, fmt.Errorf("container put on client: %w", err)
|
return nil, fmt.Errorf("container put on client: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -167,13 +180,14 @@ func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (
|
||||||
var cliPrm sdkClient.PrmContainerGet
|
var cliPrm sdkClient.PrmContainerGet
|
||||||
cliPrm.SetContainer(prm.cnrID)
|
cliPrm.SetContainer(prm.cnrID)
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
res, err := c.client.ContainerGet(ctx, cliPrm)
|
res, err := c.client.ContainerGet(ctx, cliPrm)
|
||||||
|
c.incRequests(time.Since(start))
|
||||||
if err = c.handleError(res.Status(), err); err != nil {
|
if err = c.handleError(res.Status(), err); err != nil {
|
||||||
return nil, fmt.Errorf("container get on client: %w", err)
|
return nil, fmt.Errorf("container get on client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cnr := res.Container()
|
cnr := res.Container()
|
||||||
|
|
||||||
return &cnr, nil
|
return &cnr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -181,7 +195,9 @@ func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList)
|
||||||
var cliPrm sdkClient.PrmContainerList
|
var cliPrm sdkClient.PrmContainerList
|
||||||
cliPrm.SetAccount(prm.ownerID)
|
cliPrm.SetAccount(prm.ownerID)
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
res, err := c.client.ContainerList(ctx, cliPrm)
|
res, err := c.client.ContainerList(ctx, cliPrm)
|
||||||
|
c.incRequests(time.Since(start))
|
||||||
if err = c.handleError(res.Status(), err); err != nil {
|
if err = c.handleError(res.Status(), err); err != nil {
|
||||||
return nil, fmt.Errorf("container list on client: %w", err)
|
return nil, fmt.Errorf("container list on client: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -195,7 +211,9 @@ func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDel
|
||||||
cliPrm.WithinSession(prm.stoken)
|
cliPrm.WithinSession(prm.stoken)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
res, err := c.client.ContainerDelete(ctx, cliPrm)
|
res, err := c.client.ContainerDelete(ctx, cliPrm)
|
||||||
|
c.incRequests(time.Since(start))
|
||||||
if err = c.handleError(res.Status(), err); err != nil {
|
if err = c.handleError(res.Status(), err); err != nil {
|
||||||
return fmt.Errorf("container delete on client: %w", err)
|
return fmt.Errorf("container delete on client: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -211,7 +229,9 @@ func (c *clientWrapper) containerEACL(ctx context.Context, prm PrmContainerEACL)
|
||||||
var cliPrm sdkClient.PrmContainerEACL
|
var cliPrm sdkClient.PrmContainerEACL
|
||||||
cliPrm.SetContainer(prm.cnrID)
|
cliPrm.SetContainer(prm.cnrID)
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
res, err := c.client.ContainerEACL(ctx, cliPrm)
|
res, err := c.client.ContainerEACL(ctx, cliPrm)
|
||||||
|
c.incRequests(time.Since(start))
|
||||||
if err = c.handleError(res.Status(), err); err != nil {
|
if err = c.handleError(res.Status(), err); err != nil {
|
||||||
return nil, fmt.Errorf("get eacl on client: %w", err)
|
return nil, fmt.Errorf("get eacl on client: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -226,7 +246,9 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
|
||||||
cliPrm.WithinSession(prm.session)
|
cliPrm.WithinSession(prm.session)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
res, err := c.client.ContainerSetEACL(ctx, cliPrm)
|
res, err := c.client.ContainerSetEACL(ctx, cliPrm)
|
||||||
|
c.incRequests(time.Since(start))
|
||||||
if err = c.handleError(res.Status(), err); err != nil {
|
if err = c.handleError(res.Status(), err); err != nil {
|
||||||
return fmt.Errorf("set eacl on client: %w", err)
|
return fmt.Errorf("set eacl on client: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -249,7 +271,9 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (*netmap.NodeInfo, error) {
|
func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (*netmap.NodeInfo, error) {
|
||||||
|
start := time.Now()
|
||||||
res, err := c.client.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{})
|
res, err := c.client.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{})
|
||||||
|
c.incRequests(time.Since(start))
|
||||||
if err = c.handleError(res.Status(), err); err != nil {
|
if err = c.handleError(res.Status(), err); err != nil {
|
||||||
return nil, fmt.Errorf("endpoint info on client: %w", err)
|
return nil, fmt.Errorf("endpoint info on client: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -257,7 +281,9 @@ func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (*n
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (*netmap.NetworkInfo, error) {
|
func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (*netmap.NetworkInfo, error) {
|
||||||
|
start := time.Now()
|
||||||
res, err := c.client.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{})
|
res, err := c.client.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{})
|
||||||
|
c.incRequests(time.Since(start))
|
||||||
if err = c.handleError(res.Status(), err); err != nil {
|
if err = c.handleError(res.Status(), err); err != nil {
|
||||||
return nil, fmt.Errorf("network info on client: %w", err)
|
return nil, fmt.Errorf("network info on client: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -266,7 +292,9 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (*net
|
||||||
|
|
||||||
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.ID, error) {
|
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.ID, error) {
|
||||||
var cliPrm sdkClient.PrmObjectPutInit
|
var cliPrm sdkClient.PrmObjectPutInit
|
||||||
|
start := time.Now()
|
||||||
wObj, err := c.client.ObjectPutInit(ctx, cliPrm)
|
wObj, err := c.client.ObjectPutInit(ctx, cliPrm)
|
||||||
|
c.incRequests(time.Since(start))
|
||||||
if err = c.handleError(nil, err); err != nil {
|
if err = c.handleError(nil, err); err != nil {
|
||||||
return nil, fmt.Errorf("init writing on API client: %w", err)
|
return nil, fmt.Errorf("init writing on API client: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -308,7 +336,10 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.I
|
||||||
for {
|
for {
|
||||||
n, err = prm.payload.Read(buf)
|
n, err = prm.payload.Read(buf)
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
if !wObj.WritePayloadChunk(buf[:n]) {
|
start = time.Now()
|
||||||
|
successWrite := wObj.WritePayloadChunk(buf[:n])
|
||||||
|
c.incRequests(time.Since(start))
|
||||||
|
if !successWrite {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -354,7 +385,10 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e
|
||||||
if prm.key != nil {
|
if prm.key != nil {
|
||||||
cliPrm.UseKey(*prm.key)
|
cliPrm.UseKey(*prm.key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
res, err := c.client.ObjectDelete(ctx, cliPrm)
|
res, err := c.client.ObjectDelete(ctx, cliPrm)
|
||||||
|
c.incRequests(time.Since(start))
|
||||||
if err = c.handleError(res.Status(), err); err != nil {
|
if err = c.handleError(res.Status(), err); err != nil {
|
||||||
return fmt.Errorf("delete object on client: %w", err)
|
return fmt.Errorf("delete object on client: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -385,13 +419,19 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (*ResGe
|
||||||
rObj.UseKey(*prm.key)
|
rObj.UseKey(*prm.key)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !rObj.ReadHeader(&res.Header) {
|
start := time.Now()
|
||||||
|
successReadHeader := rObj.ReadHeader(&res.Header)
|
||||||
|
c.incRequests(time.Since(start))
|
||||||
|
if !successReadHeader {
|
||||||
rObjRes, err := rObj.Close()
|
rObjRes, err := rObj.Close()
|
||||||
err = c.handleError(rObjRes.Status(), err)
|
err = c.handleError(rObjRes.Status(), err)
|
||||||
return nil, fmt.Errorf("read header: %w", err)
|
return nil, fmt.Errorf("read header: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
res.Payload = (*objectReadCloser)(rObj)
|
res.Payload = &objectReadCloser{
|
||||||
|
reader: rObj,
|
||||||
|
elapsedTimeCallback: c.incRequests,
|
||||||
|
}
|
||||||
|
|
||||||
return &res, nil
|
return &res, nil
|
||||||
}
|
}
|
||||||
|
@ -415,7 +455,9 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (*obj
|
||||||
|
|
||||||
var obj object.Object
|
var obj object.Object
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
res, err := c.client.ObjectHead(ctx, cliPrm)
|
res, err := c.client.ObjectHead(ctx, cliPrm)
|
||||||
|
c.incRequests(time.Since(start))
|
||||||
if err = c.handleError(res.Status(), err); err != nil {
|
if err = c.handleError(res.Status(), err); err != nil {
|
||||||
return nil, fmt.Errorf("read object header via client: %w", err)
|
return nil, fmt.Errorf("read object header via client: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -441,7 +483,9 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (*R
|
||||||
cliPrm.WithBearerToken(*prm.btoken)
|
cliPrm.WithBearerToken(*prm.btoken)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
res, err := c.client.ObjectRangeInit(ctx, cliPrm)
|
res, err := c.client.ObjectRangeInit(ctx, cliPrm)
|
||||||
|
c.incRequests(time.Since(start))
|
||||||
if err = c.handleError(nil, err); err != nil {
|
if err = c.handleError(nil, err); err != nil {
|
||||||
return nil, fmt.Errorf("init payload range reading on client: %w", err)
|
return nil, fmt.Errorf("init payload range reading on client: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -449,7 +493,10 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (*R
|
||||||
res.UseKey(*prm.key)
|
res.UseKey(*prm.key)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &ResObjectRange{payload: res}, nil
|
return &ResObjectRange{
|
||||||
|
payload: res,
|
||||||
|
elapsedTimeCallback: c.incRequests,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (*ResObjectSearch, error) {
|
func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (*ResObjectSearch, error) {
|
||||||
|
@ -482,7 +529,9 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
|
||||||
cliPrm.SetExp(prm.exp)
|
cliPrm.SetExp(prm.exp)
|
||||||
cliPrm.UseKey(prm.key)
|
cliPrm.UseKey(prm.key)
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
res, err := c.client.SessionCreate(ctx, cliPrm)
|
res, err := c.client.SessionCreate(ctx, cliPrm)
|
||||||
|
c.incRequests(time.Since(start))
|
||||||
if err = c.handleError(res.Status(), err); err != nil {
|
if err = c.handleError(res.Status(), err); err != nil {
|
||||||
return nil, fmt.Errorf("session creation on client: %w", err)
|
return nil, fmt.Errorf("session creation on client: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -505,17 +554,35 @@ func (c *clientStatusMonitor) address() string {
|
||||||
return c.addr
|
return c.addr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientStatusMonitor) errorRate() uint32 {
|
func (c *clientStatusMonitor) incErrorRate() {
|
||||||
return c.errorCount.Load()
|
c.currentErrorCount.Inc()
|
||||||
|
c.overallErrorCount.Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *clientStatusMonitor) currentErrorRate() uint32 {
|
||||||
|
return c.currentErrorCount.Load()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *clientStatusMonitor) overallErrorRate() uint64 {
|
||||||
|
return c.overallErrorCount.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientStatusMonitor) resetErrorCounter() {
|
func (c *clientStatusMonitor) resetErrorCounter() {
|
||||||
c.errorCount.Store(0)
|
c.currentErrorCount.Store(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *clientStatusMonitor) latency() time.Duration {
|
||||||
|
return time.Duration(c.allTime.Load() / c.allRequests.Load())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *clientStatusMonitor) incRequests(elapsed time.Duration) {
|
||||||
|
c.allTime.Add(uint64(elapsed))
|
||||||
|
c.allRequests.Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error {
|
func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.errorCount.Inc()
|
c.incErrorRate()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -524,8 +591,8 @@ func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error
|
||||||
case apistatus.ServerInternal, *apistatus.ServerInternal,
|
case apistatus.ServerInternal, *apistatus.ServerInternal,
|
||||||
apistatus.WrongMagicNumber, *apistatus.WrongMagicNumber,
|
apistatus.WrongMagicNumber, *apistatus.WrongMagicNumber,
|
||||||
apistatus.SignatureVerification, *apistatus.SignatureVerification:
|
apistatus.SignatureVerification, *apistatus.SignatureVerification:
|
||||||
c.errorCount.Inc()
|
c.incErrorRate()
|
||||||
if c.errorCount.Load() >= c.errorThreshold {
|
if c.currentErrorRate() >= c.errorThreshold {
|
||||||
c.setHealthy(false)
|
c.setHealthy(false)
|
||||||
c.resetErrorCounter()
|
c.resetErrorCounter()
|
||||||
}
|
}
|
||||||
|
@ -1549,16 +1616,22 @@ func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type objectReadCloser sdkClient.ObjectReader
|
type objectReadCloser struct {
|
||||||
|
reader *sdkClient.ObjectReader
|
||||||
|
elapsedTimeCallback func(time.Duration)
|
||||||
|
}
|
||||||
|
|
||||||
// Read implements io.Reader of the object payload.
|
// Read implements io.Reader of the object payload.
|
||||||
func (x *objectReadCloser) Read(p []byte) (int, error) {
|
func (x *objectReadCloser) Read(p []byte) (int, error) {
|
||||||
return (*sdkClient.ObjectReader)(x).Read(p)
|
start := time.Now()
|
||||||
|
n, err := x.reader.Read(p)
|
||||||
|
x.elapsedTimeCallback(time.Since(start))
|
||||||
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close implements io.Closer of the object payload.
|
// Close implements io.Closer of the object payload.
|
||||||
func (x *objectReadCloser) Close() error {
|
func (x *objectReadCloser) Close() error {
|
||||||
_, err := (*sdkClient.ObjectReader)(x).Close()
|
_, err := x.reader.Close()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1627,11 +1700,15 @@ func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Objec
|
||||||
// usage is unsafe.
|
// usage is unsafe.
|
||||||
type ResObjectRange struct {
|
type ResObjectRange struct {
|
||||||
payload *sdkClient.ObjectRangeReader
|
payload *sdkClient.ObjectRangeReader
|
||||||
|
elapsedTimeCallback func(time.Duration)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read implements io.Reader of the object payload.
|
// Read implements io.Reader of the object payload.
|
||||||
func (x *ResObjectRange) Read(p []byte) (int, error) {
|
func (x *ResObjectRange) Read(p []byte) (int, error) {
|
||||||
return x.payload.Read(p)
|
start := time.Now()
|
||||||
|
n, err := x.payload.Read(p)
|
||||||
|
x.elapsedTimeCallback(time.Since(start))
|
||||||
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close ends reading the payload range and returns the result of the operation
|
// Close ends reading the payload range and returns the result of the operation
|
||||||
|
|
Loading…
Reference in a new issue