[#291] pool: Improve docs

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
remotes/fyrchik/fuzz-tests
Denis Kirillov 2022-07-22 17:41:44 +03:00 committed by fyrchik
parent 90f1cc7a1a
commit 828cfdc5bf
1 changed files with 78 additions and 0 deletions

View File

@ -33,36 +33,65 @@ import (
)
// client represents virtual connection to the single NeoFS network endpoint from which Pool is formed.
// This interface is expected to have exactly one production implementation - clientWrapper.
// Others are expected to be for test purposes only.
type client interface {
// see clientWrapper.balanceGet.
balanceGet(context.Context, PrmBalanceGet) (*accounting.Decimal, error)
// see clientWrapper.containerPut.
containerPut(context.Context, PrmContainerPut) (*cid.ID, error)
// see clientWrapper.containerGet.
containerGet(context.Context, PrmContainerGet) (*container.Container, error)
// see clientWrapper.containerList.
containerList(context.Context, PrmContainerList) ([]cid.ID, error)
// see clientWrapper.containerDelete.
containerDelete(context.Context, PrmContainerDelete) error
// see clientWrapper.containerEACL.
containerEACL(context.Context, PrmContainerEACL) (*eacl.Table, error)
// see clientWrapper.containerSetEACL.
containerSetEACL(context.Context, PrmContainerSetEACL) error
// see clientWrapper.endpointInfo.
endpointInfo(context.Context, prmEndpointInfo) (*netmap.NodeInfo, error)
// see clientWrapper.networkInfo.
networkInfo(context.Context, prmNetworkInfo) (*netmap.NetworkInfo, error)
// see clientWrapper.objectPut.
objectPut(context.Context, PrmObjectPut) (*oid.ID, error)
// see clientWrapper.objectDelete.
objectDelete(context.Context, PrmObjectDelete) error
// see clientWrapper.objectGet.
objectGet(context.Context, PrmObjectGet) (*ResGetObject, error)
// see clientWrapper.objectHead.
objectHead(context.Context, PrmObjectHead) (*object.Object, error)
// see clientWrapper.objectRange.
objectRange(context.Context, PrmObjectRange) (*ResObjectRange, error)
// see clientWrapper.objectSearch.
objectSearch(context.Context, PrmObjectSearch) (*ResObjectSearch, error)
// see clientWrapper.sessionCreate.
sessionCreate(context.Context, prmCreateSession) (*resCreateSession, error)
clientStatus
}
// clientStatus provide access to some metrics for connection.
type clientStatus interface {
// isHealthy checks if the connection can handle requests.
isHealthy() bool
// setHealthy allows set healthy status for connection.
// It's used to update status during Pool.startRebalance routing.
setHealthy(bool) bool
// address return address of endpoint.
address() string
// currentErrorRate returns current errors rate.
// After specific threshold connection is considered as unhealthy.
// Pool.startRebalance routine can make this connection healthy again.
currentErrorRate() uint32
// overallErrorRate returns the number of all happened errors.
overallErrorRate() uint64
// methodsStatus returns statistic for all used methods.
methodsStatus() []statusSnapshot
}
// clientStatusMonitor count error rate and other statistics for connection.
type clientStatusMonitor struct {
addr string
healthy *atomic.Bool
@ -74,12 +103,14 @@ type clientStatusMonitor struct {
methods []*methodStatus
}
// methodStatus provide statistic for specific method.
type methodStatus struct {
name string
mu sync.RWMutex // protect counters
statusSnapshot
}
// statusSnapshot is statistic for specific method.
type statusSnapshot struct {
allTime uint64
allRequests uint64
@ -178,6 +209,7 @@ type clientWrapper struct {
clientStatusMonitor
}
// wrapperPrm is params to create clientWrapper.
type wrapperPrm struct {
address string
key ecdsa.PrivateKey
@ -186,26 +218,33 @@ type wrapperPrm struct {
responseInfoCallback func(sdkClient.ResponseMetaInfo) error
}
// setAddress sets endpoint to connect in NeoFS network.
func (x *wrapperPrm) setAddress(address string) {
x.address = address
}
// setKey sets sdkClient.Client private key to be used for the protocol communication by default.
func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) {
x.key = key
}
// setTimeout sets the timeout for connection to be established.
func (x *wrapperPrm) setTimeout(timeout time.Duration) {
x.timeout = timeout
}
// setErrorThreshold sets threshold after reaching which connection is considered unhealthy
// until Pool.startRebalance routing updates its status.
func (x *wrapperPrm) setErrorThreshold(threshold uint32) {
x.errorThreshold = threshold
}
// setResponseInfoCallback sets callback that will be invoked after every response.
func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) {
x.responseInfoCallback = f
}
// newWrapper creates a clientWrapper that implements the client interface.
func newWrapper(prm wrapperPrm) (*clientWrapper, error) {
var prmInit sdkClient.PrmInit
prmInit.SetDefaultPrivateKey(prm.key)
@ -230,6 +269,8 @@ func newWrapper(prm wrapperPrm) (*clientWrapper, error) {
return res, nil
}
// balanceGet invokes sdkClient.BalanceGet parse response status to error and return result as is.
// Exactly one return value is non-nil.
func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (*accounting.Decimal, error) {
var cliPrm sdkClient.PrmBalanceGet
cliPrm.SetAccount(prm.account)
@ -248,6 +289,8 @@ func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (*acc
return res.Amount(), nil
}
// containerPut invokes sdkClient.ContainerPut parse response status to error and return result as is.
// It also waits for the container to appear on the network.
func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (*cid.ID, error) {
start := time.Now()
res, err := c.client.ContainerPut(ctx, prm.prmClient)
@ -272,6 +315,8 @@ func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (
return res.ID(), nil
}
// containerGet invokes sdkClient.ContainerGet parse response status to error and return result as is.
// Exactly one return value is non-nil.
func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (*container.Container, error) {
var cliPrm sdkClient.PrmContainerGet
cliPrm.SetContainer(prm.cnrID)
@ -291,6 +336,7 @@ func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (
return &cnr, nil
}
// containerList invokes sdkClient.ContainerList parse response status to error and return result as is.
func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) {
var cliPrm sdkClient.PrmContainerList
cliPrm.SetAccount(prm.ownerID)
@ -308,6 +354,8 @@ func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList)
return res.Containers(), nil
}
// containerDelete invokes sdkClient.ContainerDelete parse response status to error.
// It also waits for the container to be removed from the network.
func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDelete) error {
var cliPrm sdkClient.PrmContainerDelete
cliPrm.SetContainer(prm.cnrID)
@ -333,6 +381,8 @@ func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDel
return waitForContainerRemoved(ctx, c, &prm.cnrID, &prm.waitParams)
}
// containerEACL invokes sdkClient.ContainerEACL parse response status to error and return result as is.
// Exactly one return value is non-nil.
func (c *clientWrapper) containerEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table, error) {
var cliPrm sdkClient.PrmContainerEACL
cliPrm.SetContainer(prm.cnrID)
@ -350,6 +400,8 @@ func (c *clientWrapper) containerEACL(ctx context.Context, prm PrmContainerEACL)
return res.Table(), nil
}
// containerSetEACL invokes sdkClient.ContainerSetEACL parse response status to error.
// It also waits for the EACL to appear on the network.
func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
var cliPrm sdkClient.PrmContainerSetEACL
cliPrm.SetTable(prm.table)
@ -386,6 +438,8 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
return nil
}
// endpointInfo invokes sdkClient.EndpointInfo parse response status to error and return result as is.
// Exactly one return value is non-nil.
func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (*netmap.NodeInfo, error) {
start := time.Now()
res, err := c.client.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{})
@ -400,6 +454,8 @@ func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (*n
return res.NodeInfo(), nil
}
// networkInfo invokes sdkClient.NetworkInfo parse response status to error and return result as is.
// Exactly one return value is non-nil.
func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (*netmap.NetworkInfo, error) {
start := time.Now()
res, err := c.client.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{})
@ -414,6 +470,8 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (*net
return res.Info(), nil
}
// objectPut writes object to NeoFS.
// Exactly one return value is non-nil.
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.ID, error) {
var cliPrm sdkClient.PrmObjectPutInit
start := time.Now()
@ -497,6 +555,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.I
return &id, nil
}
// objectDelete invokes sdkClient.ObjectDelete parse response status to error.
func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error {
var cliPrm sdkClient.PrmObjectDelete
cliPrm.FromContainer(prm.addr.Container())
@ -527,6 +586,8 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e
return nil
}
// objectGet returns reader for object.
// Exactly one return value is non-nil.
func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (*ResGetObject, error) {
var cliPrm sdkClient.PrmObjectGet
cliPrm.FromContainer(prm.addr.Container())
@ -574,6 +635,8 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (*ResGe
return &res, nil
}
// objectHead invokes sdkClient.ObjectHead parse response status to error and return result as is.
// Exactly one return value is non-nil.
func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (*object.Object, error) {
var cliPrm sdkClient.PrmObjectHead
cliPrm.FromContainer(prm.addr.Container())
@ -610,6 +673,8 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (*obj
return &obj, nil
}
// objectRange returns object range reader.
// Exactly one return value is non-nil.
func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (*ResObjectRange, error) {
var cliPrm sdkClient.PrmObjectRange
cliPrm.FromContainer(prm.addr.Container())
@ -643,6 +708,8 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (*R
}, nil
}
// objectSearch invokes sdkClient.ObjectSearchInit parse response status to error and return result as is.
// Exactly one return value is non-nil.
func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (*ResObjectSearch, error) {
var cliPrm sdkClient.PrmObjectSearch
@ -668,6 +735,8 @@ func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (
return &ResObjectSearch{r: res}, nil
}
// sessionCreate invokes sdkClient.SessionCreate parse response status to error and return result as is.
// Exactly one return value is non-nil.
func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession) (*resCreateSession, error) {
var cliPrm sdkClient.PrmSessionCreate
cliPrm.SetExp(prm.exp)
@ -1403,6 +1472,7 @@ func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
return nodesParams, nil
}
// startRebalance runs loop to monitor connection healthy status.
func (p *Pool) startRebalance(ctx context.Context) {
ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
buffers := make([][]float64, len(p.rebalanceParams.nodesParams))
@ -1798,6 +1868,7 @@ type ResGetObject struct {
}
// GetObject reads object header and initiates reading an object payload through a remote server using NeoFS API protocol.
// Exactly one return value is non-nil.
func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (*ResGetObject, error) {
var prmCtx prmContext
prmCtx.useDefaultSession()
@ -1823,6 +1894,7 @@ func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (*ResGetObject,
}
// HeadObject reads object header through a remote server using NeoFS API protocol.
// Exactly one return value is non-nil.
func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Object, error) {
var prmCtx prmContext
prmCtx.useDefaultSession()
@ -1875,6 +1947,7 @@ func (x *ResObjectRange) Close() error {
// ObjectRange initiates reading an object's payload range through a remote
// server using NeoFS API protocol.
// Exactly one return value is non-nil.
func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (*ResObjectRange, error) {
var prmCtx prmContext
prmCtx.useDefaultSession()
@ -1941,6 +2014,7 @@ func (x *ResObjectSearch) Close() {
// The call only opens the transmission channel, explicit fetching of matched objects
// is done using the ResObjectSearch. Exactly one return value is non-nil.
// Resulting reader must be finally closed.
// Exactly one return value is non-nil.
func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (*ResObjectSearch, error) {
var prmCtx prmContext
prmCtx.useDefaultSession()
@ -1984,6 +2058,7 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (*cid.ID,
}
// GetContainer reads NeoFS container by ID.
// Exactly one return value is non-nil.
func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (*container.Container, error) {
cp, err := p.connection()
if err != nil {
@ -2020,6 +2095,7 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro
}
// GetEACL reads eACL table of the NeoFS container.
// Exactly one return value is non-nil.
func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table, error) {
cp, err := p.connection()
if err != nil {
@ -2046,6 +2122,7 @@ func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
}
// Balance requests current balance of the NeoFS account.
// Exactly one return value is non-nil.
func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Decimal, error) {
cp, err := p.connection()
if err != nil {
@ -2142,6 +2219,7 @@ func waitFor(ctx context.Context, params *WaitParams, condition func(context.Con
}
// NetworkInfo requests information about the NeoFS network of which the remote server is a part.
// Exactly one return value is non-nil.
func (p *Pool) NetworkInfo(ctx context.Context) (*netmap.NetworkInfo, error) {
cp, err := p.connection()
if err != nil {