forked from TrueCloudLab/frostfs-sdk-go
Compare commits
6 commits
d48788c7a9
...
6fdbe75517
Author | SHA1 | Date | |
---|---|---|---|
6fdbe75517 | |||
3353940554 | |||
a3b5d4d4f5 | |||
0314b326d3 | |||
0382785763 | |||
548a81d3e6 |
6 changed files with 147 additions and 304 deletions
|
@ -13,9 +13,9 @@ jobs:
|
|||
- name: Setup Go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: '1.20'
|
||||
go-version: '1.21'
|
||||
|
||||
- name: Run commit format checker
|
||||
uses: https://git.alexvan.in/alexvanin/dco-go@v1
|
||||
uses: https://git.frostfs.info/TrueCloudLab/dco-go@v2
|
||||
with:
|
||||
from: 406c2324
|
||||
from: 'origin/${{ github.event.pull_request.base.ref }}'
|
||||
|
|
|
@ -2,12 +2,16 @@ package client
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
v2accounting "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
)
|
||||
|
||||
|
@ -26,6 +30,24 @@ func (x *PrmBalanceGet) SetAccount(id user.ID) {
|
|||
x.accountSet = true
|
||||
}
|
||||
|
||||
func (x *PrmBalanceGet) buildRequest(c *Client) (*v2accounting.BalanceRequest, error) {
|
||||
if !x.accountSet {
|
||||
return nil, errorAccountNotSet
|
||||
}
|
||||
|
||||
var accountV2 refs.OwnerID
|
||||
x.account.WriteToV2(&accountV2)
|
||||
|
||||
var body v2accounting.BalanceRequestBody
|
||||
body.SetOwnerID(&accountV2)
|
||||
|
||||
var req v2accounting.BalanceRequest
|
||||
req.SetBody(&body)
|
||||
|
||||
c.prepareRequest(&req, new(v2session.RequestMetaHeader))
|
||||
return &req, nil
|
||||
}
|
||||
|
||||
// ResBalanceGet groups resulting values of BalanceGet operation.
|
||||
type ResBalanceGet struct {
|
||||
statusRes
|
||||
|
@ -52,57 +74,35 @@ func (x ResBalanceGet) Amount() accounting.Decimal {
|
|||
// Return statuses:
|
||||
// - global (see Client docs).
|
||||
func (c *Client) BalanceGet(ctx context.Context, prm PrmBalanceGet) (*ResBalanceGet, error) {
|
||||
if !prm.accountSet {
|
||||
return nil, errorAccountNotSet
|
||||
req, err := prm.buildRequest(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// form request body
|
||||
var accountV2 refs.OwnerID
|
||||
prm.account.WriteToV2(&accountV2)
|
||||
|
||||
var body v2accounting.BalanceRequestBody
|
||||
body.SetOwnerID(&accountV2)
|
||||
|
||||
// form request
|
||||
var req v2accounting.BalanceRequest
|
||||
|
||||
req.SetBody(&body)
|
||||
|
||||
// init call context
|
||||
|
||||
var (
|
||||
cc contextCall
|
||||
res ResBalanceGet
|
||||
)
|
||||
|
||||
c.initCallContext(&cc)
|
||||
cc.meta = prm.prmCommonMeta
|
||||
cc.req = &req
|
||||
cc.statusRes = &res
|
||||
cc.call = func() (responseV2, error) {
|
||||
return rpcapi.Balance(&c.c, &req, client.WithContext(ctx))
|
||||
if err := signature.SignServiceMessage(&c.prm.key, req); err != nil {
|
||||
return nil, fmt.Errorf("sign request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := rpcapi.Balance(&c.c, req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var res ResBalanceGet
|
||||
res.st, err = c.processResponse(resp)
|
||||
if err != nil || !apistatus.IsSuccessful(res.st) {
|
||||
return &res, err
|
||||
}
|
||||
cc.result = func(r responseV2) {
|
||||
resp := r.(*v2accounting.BalanceResponse)
|
||||
|
||||
const fieldBalance = "balance"
|
||||
|
||||
bal := resp.GetBody().GetBalance()
|
||||
if bal == nil {
|
||||
cc.err = newErrMissingResponseField(fieldBalance)
|
||||
return
|
||||
return &res, newErrMissingResponseField(fieldBalance)
|
||||
}
|
||||
|
||||
cc.err = res.amount.ReadFromV2(*bal)
|
||||
if cc.err != nil {
|
||||
cc.err = newErrInvalidResponseField(fieldBalance, cc.err)
|
||||
if err := res.amount.ReadFromV2(*bal); err != nil {
|
||||
return &res, newErrInvalidResponseField(fieldBalance, err)
|
||||
}
|
||||
}
|
||||
|
||||
// process call
|
||||
if !cc.processCall() {
|
||||
return nil, cc.err
|
||||
}
|
||||
|
||||
return &res, nil
|
||||
}
|
||||
|
|
229
client/common.go
229
client/common.go
|
@ -1,7 +1,6 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
|
@ -13,21 +12,11 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||
)
|
||||
|
||||
// common interface of resulting structures with API status.
|
||||
type resCommon interface {
|
||||
setStatus(apistatus.Status)
|
||||
}
|
||||
|
||||
// structure is embedded to all resulting types in order to inherit status-related methods.
|
||||
type statusRes struct {
|
||||
st apistatus.Status
|
||||
}
|
||||
|
||||
// setStatus implements resCommon interface method.
|
||||
func (x *statusRes) setStatus(st apistatus.Status) {
|
||||
x.st = st
|
||||
}
|
||||
|
||||
// Status returns server's status return.
|
||||
//
|
||||
// Use apistatus package functionality to handle the status.
|
||||
|
@ -85,89 +74,12 @@ var (
|
|||
errorInvalidXHeaders = errors.New("xheaders must be presented only as key-value pairs")
|
||||
)
|
||||
|
||||
// groups all the details required to send a single request and process a response to it.
|
||||
type contextCall struct {
|
||||
// ==================================================
|
||||
// state vars that do not require explicit initialization
|
||||
|
||||
// final error to be returned from client method
|
||||
err error
|
||||
|
||||
// received response
|
||||
resp responseV2
|
||||
|
||||
// ==================================================
|
||||
// shared parameters which are set uniformly on all calls
|
||||
|
||||
// request signing key
|
||||
key ecdsa.PrivateKey
|
||||
|
||||
// callback prior to processing the response by the client
|
||||
callbackResp func(ResponseMetaInfo) error
|
||||
|
||||
// if set, protocol errors will be expanded into a final error
|
||||
resolveAPIFailures bool
|
||||
|
||||
// FrostFS network magic
|
||||
netMagic uint64
|
||||
|
||||
// Meta parameters
|
||||
meta prmCommonMeta
|
||||
|
||||
// ==================================================
|
||||
// custom call parameters
|
||||
|
||||
// structure of the call result
|
||||
statusRes resCommon
|
||||
|
||||
// request to be signed with a key and sent
|
||||
req request
|
||||
|
||||
// function to send a request (unary) and receive a response
|
||||
call func() (responseV2, error)
|
||||
|
||||
// function to send the request (req field)
|
||||
wReq func() error
|
||||
|
||||
// function to recv the response (resp field)
|
||||
rResp func() error
|
||||
|
||||
// function to close the message stream
|
||||
closer func() error
|
||||
|
||||
// function of writing response fields to the resulting structure (optional)
|
||||
result func(v2 responseV2)
|
||||
}
|
||||
|
||||
type request interface {
|
||||
GetMetaHeader() *v2session.RequestMetaHeader
|
||||
SetMetaHeader(*v2session.RequestMetaHeader)
|
||||
SetVerificationHeader(*v2session.RequestVerificationHeader)
|
||||
}
|
||||
|
||||
// sets needed fields of the request meta header.
|
||||
func (x contextCall) prepareRequest() {
|
||||
meta := x.req.GetMetaHeader()
|
||||
if meta == nil {
|
||||
meta = new(v2session.RequestMetaHeader)
|
||||
x.req.SetMetaHeader(meta)
|
||||
}
|
||||
|
||||
if meta.GetTTL() == 0 {
|
||||
meta.SetTTL(2)
|
||||
}
|
||||
|
||||
if meta.GetVersion() == nil {
|
||||
var verV2 refs.Version
|
||||
version.Current().WriteToV2(&verV2)
|
||||
meta.SetVersion(&verV2)
|
||||
}
|
||||
|
||||
meta.SetNetworkMagic(x.netMagic)
|
||||
|
||||
writeXHeadersToMeta(x.meta.xHeaders, meta)
|
||||
}
|
||||
|
||||
func (c *Client) prepareRequest(req request, meta *v2session.RequestMetaHeader) {
|
||||
ttl := meta.GetTTL()
|
||||
if ttl == 0 {
|
||||
|
@ -187,75 +99,6 @@ func (c *Client) prepareRequest(req request, meta *v2session.RequestMetaHeader)
|
|||
req.SetMetaHeader(meta)
|
||||
}
|
||||
|
||||
// prepares, signs and writes the request. Result means success.
|
||||
// If failed, contextCall.err contains the reason.
|
||||
func (x *contextCall) writeRequest() bool {
|
||||
x.prepareRequest()
|
||||
|
||||
x.req.SetVerificationHeader(nil)
|
||||
|
||||
// sign the request
|
||||
x.err = signature.SignServiceMessage(&x.key, x.req)
|
||||
if x.err != nil {
|
||||
x.err = fmt.Errorf("sign request: %w", x.err)
|
||||
return false
|
||||
}
|
||||
|
||||
x.err = x.wReq()
|
||||
if x.err != nil {
|
||||
x.err = fmt.Errorf("write request: %w", x.err)
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// performs common actions of response processing and writes any problem as a result status or client error
|
||||
// (in both cases returns false).
|
||||
//
|
||||
// Actions:
|
||||
// - verify signature (internal);
|
||||
// - call response callback (internal);
|
||||
// - unwrap status error (optional).
|
||||
func (x *contextCall) processResponse() bool {
|
||||
// call response callback if set
|
||||
if x.callbackResp != nil {
|
||||
x.err = x.callbackResp(ResponseMetaInfo{
|
||||
key: x.resp.GetVerificationHeader().GetBodySignature().GetKey(),
|
||||
epoch: x.resp.GetMetaHeader().GetEpoch(),
|
||||
})
|
||||
if x.err != nil {
|
||||
x.err = fmt.Errorf("response callback error: %w", x.err)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// note that we call response callback before signature check since it is expected more lightweight
|
||||
// while verification needs marshaling
|
||||
|
||||
// verify response signature
|
||||
x.err = signature.VerifyServiceMessage(x.resp)
|
||||
if x.err != nil {
|
||||
x.err = fmt.Errorf("invalid response signature: %w", x.err)
|
||||
return false
|
||||
}
|
||||
|
||||
// get result status
|
||||
st := apistatus.FromStatusV2(x.resp.GetMetaHeader().GetStatus())
|
||||
|
||||
// unwrap unsuccessful status and return it
|
||||
// as error if client has been configured so
|
||||
successfulStatus := apistatus.IsSuccessful(st)
|
||||
|
||||
if x.resolveAPIFailures {
|
||||
x.err = apistatus.ErrFromStatus(st)
|
||||
} else {
|
||||
x.statusRes.setStatus(st)
|
||||
}
|
||||
|
||||
return successfulStatus
|
||||
}
|
||||
|
||||
// processResponse verifies response signature and converts status to an error if needed.
|
||||
func (c *Client) processResponse(resp responseV2) (apistatus.Status, error) {
|
||||
if c.prm.cbRespInfo != nil {
|
||||
|
@ -280,78 +123,6 @@ func (c *Client) processResponse(resp responseV2) (apistatus.Status, error) {
|
|||
return st, nil
|
||||
}
|
||||
|
||||
// reads response (if rResp is set) and processes it. Result means success.
|
||||
// If failed, contextCall.err (or statusRes if resolveAPIFailures is set) contains the reason.
|
||||
func (x *contextCall) readResponse() bool {
|
||||
if x.rResp != nil {
|
||||
x.err = x.rResp()
|
||||
if x.err != nil {
|
||||
x.err = fmt.Errorf("read response: %w", x.err)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return x.processResponse()
|
||||
}
|
||||
|
||||
// closes the message stream (if closer is set) and writes the results (if result is set).
|
||||
// Return means success. If failed, contextCall.err contains the reason.
|
||||
func (x *contextCall) close() bool {
|
||||
if x.closer != nil {
|
||||
x.err = x.closer()
|
||||
if x.err != nil {
|
||||
x.err = fmt.Errorf("close RPC: %w", x.err)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// write response to resulting structure
|
||||
if x.result != nil {
|
||||
x.result(x.resp)
|
||||
}
|
||||
|
||||
return x.err == nil
|
||||
}
|
||||
|
||||
// goes through all stages of sending a request and processing a response. Returns true if successful.
|
||||
// If failed, contextCall.err contains the reason.
|
||||
func (x *contextCall) processCall() bool {
|
||||
// set request writer
|
||||
x.wReq = func() error {
|
||||
var err error
|
||||
x.resp, err = x.call()
|
||||
return err
|
||||
}
|
||||
|
||||
// write request
|
||||
ok := x.writeRequest()
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
// read response
|
||||
ok = x.readResponse()
|
||||
if !ok {
|
||||
return x.err == nil
|
||||
}
|
||||
|
||||
// close and write response to resulting structure
|
||||
ok = x.close()
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
return x.err == nil
|
||||
}
|
||||
|
||||
// initializes static cross-call parameters inherited from client.
|
||||
func (c *Client) initCallContext(ctx *contextCall) {
|
||||
ctx.key = c.prm.key
|
||||
ctx.resolveAPIFailures = c.prm.resolveFrostFSErrors
|
||||
ctx.callbackResp = c.prm.cbRespInfo
|
||||
ctx.netMagic = c.prm.netMagic
|
||||
}
|
||||
|
||||
// ExecRaw executes f with underlying git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client.Client
|
||||
// instance. Communicate over the Protocol Buffers protocol in a more flexible way:
|
||||
// most often used to transmit data over a fixed version of the FrostFS protocol, as well
|
||||
|
|
|
@ -17,26 +17,31 @@ import (
|
|||
|
||||
// PrmContainerEACL groups parameters of ContainerEACL operation.
|
||||
type PrmContainerEACL struct {
|
||||
prmCommonMeta
|
||||
// FrostFS request X-Headers.
|
||||
XHeaders []string
|
||||
|
||||
idSet bool
|
||||
id cid.ID
|
||||
ContainerID *cid.ID
|
||||
}
|
||||
|
||||
// SetContainer sets identifier of the FrostFS container to read the eACL table.
|
||||
// Required parameter.
|
||||
//
|
||||
// Deprecated: Use PrmContainerEACL.ContainerID instead.
|
||||
func (x *PrmContainerEACL) SetContainer(id cid.ID) {
|
||||
x.id = id
|
||||
x.idSet = true
|
||||
x.ContainerID = &id
|
||||
}
|
||||
|
||||
func (x *PrmContainerEACL) buildRequest(c *Client) (*v2container.GetExtendedACLRequest, error) {
|
||||
if !x.idSet {
|
||||
if x.ContainerID == nil {
|
||||
return nil, errorMissingContainer
|
||||
}
|
||||
|
||||
if len(x.XHeaders)%2 != 0 {
|
||||
return nil, errorInvalidXHeaders
|
||||
}
|
||||
|
||||
var cidV2 refs.ContainerID
|
||||
x.id.WriteToV2(&cidV2)
|
||||
x.ContainerID.WriteToV2(&cidV2)
|
||||
|
||||
reqBody := new(v2container.GetExtendedACLRequestBody)
|
||||
reqBody.SetContainerID(&cidV2)
|
||||
|
|
98
pool/pool.go
98
pool/pool.go
|
@ -523,8 +523,9 @@ func (c *clientWrapper) containerEACL(ctx context.Context, prm PrmContainerEACL)
|
|||
return eacl.Table{}, err
|
||||
}
|
||||
|
||||
var cliPrm sdkClient.PrmContainerEACL
|
||||
cliPrm.SetContainer(prm.cnrID)
|
||||
cliPrm := sdkClient.PrmContainerEACL{
|
||||
ContainerID: &prm.ContainerID,
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
res, err := cl.ContainerEACL(ctx, cliPrm)
|
||||
|
@ -1521,12 +1522,12 @@ func (x *PrmContainerDelete) SetWaitParams(waitParams WaitParams) {
|
|||
|
||||
// PrmContainerEACL groups parameters of GetEACL operation.
|
||||
type PrmContainerEACL struct {
|
||||
cnrID cid.ID
|
||||
ContainerID cid.ID
|
||||
}
|
||||
|
||||
// SetContainerID specifies identifier of the FrostFS container to read the eACL table.
|
||||
func (x *PrmContainerEACL) SetContainerID(cnrID cid.ID) {
|
||||
x.cnrID = cnrID
|
||||
x.ContainerID = cnrID
|
||||
}
|
||||
|
||||
// PrmContainerSetEACL groups parameters of SetEACL operation.
|
||||
|
@ -2150,7 +2151,7 @@ func (p *Pool) PutObject(ctx context.Context, prm PrmObjectPut) (oid.ID, error)
|
|||
if err != nil {
|
||||
// removes session token from cache in case of token error
|
||||
p.checkSessionTokenErr(err, ctxCall.endpoint)
|
||||
return id, fmt.Errorf("init writing on API client: %w", err)
|
||||
return id, fmt.Errorf("init writing on API client %s: %w", ctxCall.endpoint, err)
|
||||
}
|
||||
|
||||
return id, nil
|
||||
|
@ -2193,7 +2194,7 @@ func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
|
|||
|
||||
return p.call(ctx, &cc, func() error {
|
||||
if err = cc.client.objectDelete(ctx, prm); err != nil {
|
||||
return fmt.Errorf("remove object via client: %w", err)
|
||||
return fmt.Errorf("remove object via client %s: %w", cc.endpoint, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -2244,7 +2245,10 @@ func (p *Pool) GetObject(ctx context.Context, prm PrmObjectGet) (ResGetObject, e
|
|||
|
||||
return res, p.call(ctx, &cc, func() error {
|
||||
res, err = cc.client.objectGet(ctx, prm)
|
||||
return err
|
||||
if err != nil {
|
||||
return fmt.Errorf("get object via client %s: %w", cc.endpoint, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -2266,7 +2270,10 @@ func (p *Pool) HeadObject(ctx context.Context, prm PrmObjectHead) (object.Object
|
|||
|
||||
return obj, p.call(ctx, &cc, func() error {
|
||||
obj, err = cc.client.objectHead(ctx, prm)
|
||||
return err
|
||||
if err != nil {
|
||||
return fmt.Errorf("head object via client %s: %w", cc.endpoint, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -2314,7 +2321,10 @@ func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRa
|
|||
|
||||
return res, p.call(ctx, &cc, func() error {
|
||||
res, err = cc.client.objectRange(ctx, prm)
|
||||
return err
|
||||
if err != nil {
|
||||
return fmt.Errorf("object range via client %s: %w", cc.endpoint, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -2375,7 +2385,10 @@ func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjec
|
|||
|
||||
return res, p.call(ctx, &cc, func() error {
|
||||
res, err = cc.client.objectSearch(ctx, prm)
|
||||
return err
|
||||
if err != nil {
|
||||
return fmt.Errorf("search object via client %s: %w", cc.endpoint, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -2395,7 +2408,12 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (cid.ID, e
|
|||
return cid.ID{}, err
|
||||
}
|
||||
|
||||
return cp.containerPut(ctx, prm)
|
||||
cnrID, err := cp.containerPut(ctx, prm)
|
||||
if err != nil {
|
||||
return cid.ID{}, fmt.Errorf("put container via client '%s': %w", cp.address(), err)
|
||||
}
|
||||
|
||||
return cnrID, nil
|
||||
}
|
||||
|
||||
// GetContainer reads FrostFS container by ID.
|
||||
|
@ -2407,7 +2425,12 @@ func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (container
|
|||
return container.Container{}, err
|
||||
}
|
||||
|
||||
return cp.containerGet(ctx, prm)
|
||||
cnrs, err := cp.containerGet(ctx, prm)
|
||||
if err != nil {
|
||||
return container.Container{}, fmt.Errorf("get container via client '%s': %w", cp.address(), err)
|
||||
}
|
||||
|
||||
return cnrs, nil
|
||||
}
|
||||
|
||||
// ListContainers requests identifiers of the account-owned containers.
|
||||
|
@ -2417,7 +2440,12 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return cp.containerList(ctx, prm)
|
||||
cnrIDs, err := cp.containerList(ctx, prm)
|
||||
if err != nil {
|
||||
return []cid.ID{}, fmt.Errorf("list containers via client '%s': %w", cp.address(), err)
|
||||
}
|
||||
|
||||
return cnrIDs, nil
|
||||
}
|
||||
|
||||
// DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete.
|
||||
|
@ -2434,7 +2462,12 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro
|
|||
return err
|
||||
}
|
||||
|
||||
return cp.containerDelete(ctx, prm)
|
||||
err = cp.containerDelete(ctx, prm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete container via client '%s': %w", cp.address(), err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetEACL reads eACL table of the FrostFS container.
|
||||
|
@ -2446,7 +2479,12 @@ func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (eacl.Table, e
|
|||
return eacl.Table{}, err
|
||||
}
|
||||
|
||||
return cp.containerEACL(ctx, prm)
|
||||
eaclResult, err := cp.containerEACL(ctx, prm)
|
||||
if err != nil {
|
||||
return eacl.Table{}, fmt.Errorf("get EACL via client '%s': %w", cp.address(), err)
|
||||
}
|
||||
|
||||
return eaclResult, nil
|
||||
}
|
||||
|
||||
// SetEACL sends request to update eACL table of the FrostFS container and waits for the operation to complete.
|
||||
|
@ -2463,7 +2501,12 @@ func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return cp.containerSetEACL(ctx, prm)
|
||||
err = cp.containerSetEACL(ctx, prm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("set EACL via client '%s': %w", cp.address(), err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Balance requests current balance of the FrostFS account.
|
||||
|
@ -2475,15 +2518,24 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decim
|
|||
return accounting.Decimal{}, err
|
||||
}
|
||||
|
||||
return cp.balanceGet(ctx, prm)
|
||||
balance, err := cp.balanceGet(ctx, prm)
|
||||
if err != nil {
|
||||
return accounting.Decimal{}, fmt.Errorf("get balance via client '%s': %w", cp.address(), err)
|
||||
}
|
||||
|
||||
return balance, nil
|
||||
}
|
||||
|
||||
// Statistic returns connection statistics.
|
||||
func (p Pool) Statistic() Statistic {
|
||||
stat := Statistic{}
|
||||
for _, inner := range p.innerPools {
|
||||
nodes := make([]string, 0, len(inner.clients))
|
||||
inner.lock.RLock()
|
||||
for _, cl := range inner.clients {
|
||||
if cl.isHealthy() {
|
||||
nodes = append(nodes, cl.address())
|
||||
}
|
||||
node := NodeStatistic{
|
||||
address: cl.address(),
|
||||
methods: cl.methodsStatus(),
|
||||
|
@ -2494,6 +2546,9 @@ func (p Pool) Statistic() Statistic {
|
|||
stat.overallErrors += node.overallErrors
|
||||
}
|
||||
inner.lock.RUnlock()
|
||||
if len(stat.currentNodes) == 0 {
|
||||
stat.currentNodes = nodes
|
||||
}
|
||||
}
|
||||
|
||||
return stat
|
||||
|
@ -2514,7 +2569,7 @@ func waitForContainerPresence(ctx context.Context, cli client, cnrID cid.ID, wai
|
|||
func waitForEACLPresence(ctx context.Context, cli client, cnrID *cid.ID, table *eacl.Table, waitParams *WaitParams) error {
|
||||
var prm PrmContainerEACL
|
||||
if cnrID != nil {
|
||||
prm.SetContainerID(*cnrID)
|
||||
prm.ContainerID = *cnrID
|
||||
}
|
||||
|
||||
return waitFor(ctx, waitParams, func(ctx context.Context) bool {
|
||||
|
@ -2571,7 +2626,12 @@ func (p *Pool) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
|
|||
return netmap.NetworkInfo{}, err
|
||||
}
|
||||
|
||||
return cp.networkInfo(ctx, prmNetworkInfo{})
|
||||
netInfo, err := cp.networkInfo(ctx, prmNetworkInfo{})
|
||||
if err != nil {
|
||||
return netmap.NetworkInfo{}, fmt.Errorf("get network info via client '%s': %w", cp.address(), err)
|
||||
}
|
||||
|
||||
return netInfo, nil
|
||||
}
|
||||
|
||||
// Close closes the Pool and releases all the associated resources.
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
type Statistic struct {
|
||||
overallErrors uint64
|
||||
nodes []NodeStatistic
|
||||
currentNodes []string
|
||||
}
|
||||
|
||||
// OverallErrors returns sum of errors on all connections. It doesn't decrease.
|
||||
|
@ -21,6 +22,12 @@ func (s Statistic) Nodes() []NodeStatistic {
|
|||
return s.nodes
|
||||
}
|
||||
|
||||
// CurrentNodes returns list of nodes of inner pool that has at least one healthy node.
|
||||
// These nodes have the same and the highest priority among the other healthy nodes.
|
||||
func (s Statistic) CurrentNodes() []string {
|
||||
return s.currentNodes
|
||||
}
|
||||
|
||||
// ErrUnknownNode indicate that node with current address is not found in list.
|
||||
var ErrUnknownNode = errors.New("unknown node")
|
||||
|
||||
|
|
Loading…
Reference in a new issue