diff --git a/pool/pool.go b/pool/pool.go index e33925dd..c475b6ab 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -33,36 +33,65 @@ import ( ) // client represents virtual connection to the single NeoFS network endpoint from which Pool is formed. +// This interface is expected to have exactly one production implementation - clientWrapper. +// Others are expected to be for test purposes only. type client interface { + // see clientWrapper.balanceGet. balanceGet(context.Context, PrmBalanceGet) (*accounting.Decimal, error) + // see clientWrapper.containerPut. containerPut(context.Context, PrmContainerPut) (*cid.ID, error) + // see clientWrapper.containerGet. containerGet(context.Context, PrmContainerGet) (*container.Container, error) + // see clientWrapper.containerList. containerList(context.Context, PrmContainerList) ([]cid.ID, error) + // see clientWrapper.containerDelete. containerDelete(context.Context, PrmContainerDelete) error + // see clientWrapper.containerEACL. containerEACL(context.Context, PrmContainerEACL) (*eacl.Table, error) + // see clientWrapper.containerSetEACL. containerSetEACL(context.Context, PrmContainerSetEACL) error + // see clientWrapper.endpointInfo. endpointInfo(context.Context, prmEndpointInfo) (*netmap.NodeInfo, error) + // see clientWrapper.networkInfo. networkInfo(context.Context, prmNetworkInfo) (*netmap.NetworkInfo, error) + // see clientWrapper.objectPut. objectPut(context.Context, PrmObjectPut) (*oid.ID, error) + // see clientWrapper.objectDelete. objectDelete(context.Context, PrmObjectDelete) error + // see clientWrapper.objectGet. objectGet(context.Context, PrmObjectGet) (*ResGetObject, error) + // see clientWrapper.objectHead. objectHead(context.Context, PrmObjectHead) (*object.Object, error) + // see clientWrapper.objectRange. objectRange(context.Context, PrmObjectRange) (*ResObjectRange, error) + // see clientWrapper.objectSearch. objectSearch(context.Context, PrmObjectSearch) (*ResObjectSearch, error) + // see clientWrapper.sessionCreate. sessionCreate(context.Context, prmCreateSession) (*resCreateSession, error) clientStatus } +// clientStatus provide access to some metrics for connection. type clientStatus interface { + // isHealthy checks if the connection can handle requests. isHealthy() bool + // setHealthy allows set healthy status for connection. + // It's used to update status during Pool.startRebalance routing. setHealthy(bool) bool + // address return address of endpoint. address() string + // currentErrorRate returns current errors rate. + // After specific threshold connection is considered as unhealthy. + // Pool.startRebalance routine can make this connection healthy again. currentErrorRate() uint32 + // overallErrorRate returns the number of all happened errors. overallErrorRate() uint64 + // methodsStatus returns statistic for all used methods. methodsStatus() []statusSnapshot } +// clientStatusMonitor count error rate and other statistics for connection. type clientStatusMonitor struct { addr string healthy *atomic.Bool @@ -74,12 +103,14 @@ type clientStatusMonitor struct { methods []*methodStatus } +// methodStatus provide statistic for specific method. type methodStatus struct { name string mu sync.RWMutex // protect counters statusSnapshot } +// statusSnapshot is statistic for specific method. type statusSnapshot struct { allTime uint64 allRequests uint64 @@ -178,6 +209,7 @@ type clientWrapper struct { clientStatusMonitor } +// wrapperPrm is params to create clientWrapper. type wrapperPrm struct { address string key ecdsa.PrivateKey @@ -186,26 +218,33 @@ type wrapperPrm struct { responseInfoCallback func(sdkClient.ResponseMetaInfo) error } +// setAddress sets endpoint to connect in NeoFS network. func (x *wrapperPrm) setAddress(address string) { x.address = address } +// setKey sets sdkClient.Client private key to be used for the protocol communication by default. func (x *wrapperPrm) setKey(key ecdsa.PrivateKey) { x.key = key } +// setTimeout sets the timeout for connection to be established. func (x *wrapperPrm) setTimeout(timeout time.Duration) { x.timeout = timeout } +// setErrorThreshold sets threshold after reaching which connection is considered unhealthy +// until Pool.startRebalance routing updates its status. func (x *wrapperPrm) setErrorThreshold(threshold uint32) { x.errorThreshold = threshold } +// setResponseInfoCallback sets callback that will be invoked after every response. func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) { x.responseInfoCallback = f } +// newWrapper creates a clientWrapper that implements the client interface. func newWrapper(prm wrapperPrm) (*clientWrapper, error) { var prmInit sdkClient.PrmInit prmInit.SetDefaultPrivateKey(prm.key) @@ -230,6 +269,8 @@ func newWrapper(prm wrapperPrm) (*clientWrapper, error) { return res, nil } +// balanceGet invokes sdkClient.BalanceGet parse response status to error and return result as is. +// Exactly one return value is non-nil. func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (*accounting.Decimal, error) { var cliPrm sdkClient.PrmBalanceGet cliPrm.SetAccount(prm.account) @@ -248,6 +289,8 @@ func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (*acc return res.Amount(), nil } +// containerPut invokes sdkClient.ContainerPut parse response status to error and return result as is. +// It also waits for the container to appear on the network. func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (*cid.ID, error) { start := time.Now() res, err := c.client.ContainerPut(ctx, prm.prmClient) @@ -272,6 +315,8 @@ func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) ( return res.ID(), nil } +// containerGet invokes sdkClient.ContainerGet parse response status to error and return result as is. +// Exactly one return value is non-nil. func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (*container.Container, error) { var cliPrm sdkClient.PrmContainerGet cliPrm.SetContainer(prm.cnrID) @@ -291,6 +336,7 @@ func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) ( return &cnr, nil } +// containerList invokes sdkClient.ContainerList parse response status to error and return result as is. func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList) ([]cid.ID, error) { var cliPrm sdkClient.PrmContainerList cliPrm.SetAccount(prm.ownerID) @@ -308,6 +354,8 @@ func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList) return res.Containers(), nil } +// containerDelete invokes sdkClient.ContainerDelete parse response status to error. +// It also waits for the container to be removed from the network. func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDelete) error { var cliPrm sdkClient.PrmContainerDelete cliPrm.SetContainer(prm.cnrID) @@ -333,6 +381,8 @@ func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDel return waitForContainerRemoved(ctx, c, &prm.cnrID, &prm.waitParams) } +// containerEACL invokes sdkClient.ContainerEACL parse response status to error and return result as is. +// Exactly one return value is non-nil. func (c *clientWrapper) containerEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table, error) { var cliPrm sdkClient.PrmContainerEACL cliPrm.SetContainer(prm.cnrID) @@ -350,6 +400,8 @@ func (c *clientWrapper) containerEACL(ctx context.Context, prm PrmContainerEACL) return res.Table(), nil } +// containerSetEACL invokes sdkClient.ContainerSetEACL parse response status to error. +// It also waits for the EACL to appear on the network. func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSetEACL) error { var cliPrm sdkClient.PrmContainerSetEACL cliPrm.SetTable(prm.table) @@ -386,6 +438,8 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe return nil } +// endpointInfo invokes sdkClient.EndpointInfo parse response status to error and return result as is. +// Exactly one return value is non-nil. func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (*netmap.NodeInfo, error) { start := time.Now() res, err := c.client.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}) @@ -400,6 +454,8 @@ func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (*n return res.NodeInfo(), nil } +// networkInfo invokes sdkClient.NetworkInfo parse response status to error and return result as is. +// Exactly one return value is non-nil. func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (*netmap.NetworkInfo, error) { start := time.Now() res, err := c.client.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{}) @@ -414,6 +470,8 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (*net return res.Info(), nil } +// objectPut writes object to NeoFS. +// Exactly one return value is non-nil. func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.ID, error) { var cliPrm sdkClient.PrmObjectPutInit start := time.Now() @@ -497,6 +555,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.I return &id, nil } +// objectDelete invokes sdkClient.ObjectDelete parse response status to error. func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) error { var cliPrm sdkClient.PrmObjectDelete cliPrm.FromContainer(prm.addr.Container()) @@ -527,6 +586,8 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e return nil } +// objectGet returns reader for object. +// Exactly one return value is non-nil. func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (*ResGetObject, error) { var cliPrm sdkClient.PrmObjectGet cliPrm.FromContainer(prm.addr.Container()) @@ -574,6 +635,8 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (*ResGe return &res, nil } +// objectHead invokes sdkClient.ObjectHead parse response status to error and return result as is. +// Exactly one return value is non-nil. func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (*object.Object, error) { var cliPrm sdkClient.PrmObjectHead cliPrm.FromContainer(prm.addr.Container()) @@ -610,6 +673,8 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (*obj return &obj, nil } +// objectRange returns object range reader. +// Exactly one return value is non-nil. func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (*ResObjectRange, error) { var cliPrm sdkClient.PrmObjectRange cliPrm.FromContainer(prm.addr.Container()) @@ -643,6 +708,8 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (*R }, nil } +// objectSearch invokes sdkClient.ObjectSearchInit parse response status to error and return result as is. +// Exactly one return value is non-nil. func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (*ResObjectSearch, error) { var cliPrm sdkClient.PrmObjectSearch @@ -668,6 +735,8 @@ func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) ( return &ResObjectSearch{r: res}, nil } +// sessionCreate invokes sdkClient.SessionCreate parse response status to error and return result as is. +// Exactly one return value is non-nil. func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession) (*resCreateSession, error) { var cliPrm sdkClient.PrmSessionCreate cliPrm.SetExp(prm.exp) @@ -1403,6 +1472,7 @@ func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) { return nodesParams, nil } +// startRebalance runs loop to monitor connection healthy status. func (p *Pool) startRebalance(ctx context.Context) { ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval) buffers := make([][]float64, len(p.rebalanceParams.nodesParams)) @@ -1798,6 +1868,7 @@ type ResGetObject struct { } // GetObject reads object header and initiates reading an object payload through a remote server using NeoFS API protocol. +// Exactly one return value is non-nil. func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (*ResGetObject, error) { var prmCtx prmContext prmCtx.useDefaultSession() @@ -1823,6 +1894,7 @@ func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (*ResGetObject, } // HeadObject reads object header through a remote server using NeoFS API protocol. +// Exactly one return value is non-nil. func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Object, error) { var prmCtx prmContext prmCtx.useDefaultSession() @@ -1875,6 +1947,7 @@ func (x *ResObjectRange) Close() error { // ObjectRange initiates reading an object's payload range through a remote // server using NeoFS API protocol. +// Exactly one return value is non-nil. func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (*ResObjectRange, error) { var prmCtx prmContext prmCtx.useDefaultSession() @@ -1941,6 +2014,7 @@ func (x *ResObjectSearch) Close() { // The call only opens the transmission channel, explicit fetching of matched objects // is done using the ResObjectSearch. Exactly one return value is non-nil. // Resulting reader must be finally closed. +// Exactly one return value is non-nil. func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (*ResObjectSearch, error) { var prmCtx prmContext prmCtx.useDefaultSession() @@ -1984,6 +2058,7 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (*cid.ID, } // GetContainer reads NeoFS container by ID. +// Exactly one return value is non-nil. func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (*container.Container, error) { cp, err := p.connection() if err != nil { @@ -2020,6 +2095,7 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro } // GetEACL reads eACL table of the NeoFS container. +// Exactly one return value is non-nil. func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table, error) { cp, err := p.connection() if err != nil { @@ -2046,6 +2122,7 @@ func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error { } // Balance requests current balance of the NeoFS account. +// Exactly one return value is non-nil. func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Decimal, error) { cp, err := p.connection() if err != nil { @@ -2142,6 +2219,7 @@ func waitFor(ctx context.Context, params *WaitParams, condition func(context.Con } // NetworkInfo requests information about the NeoFS network of which the remote server is a part. +// Exactly one return value is non-nil. func (p *Pool) NetworkInfo(ctx context.Context) (*netmap.NetworkInfo, error) { cp, err := p.connection() if err != nil {