forked from TrueCloudLab/frostfs-sdk-go
[#83] pool: Support changes of client.Client
interface
Handle status results and return status failures in `error` return. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
87cbd4b701
commit
639ab1e443
1 changed files with 246 additions and 30 deletions
276
pool/pool.go
276
pool/pool.go
|
@ -3,6 +3,7 @@ package pool
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -13,6 +14,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/client"
|
"github.com/nspcc-dev/neofs-sdk-go/client"
|
||||||
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/container"
|
"github.com/nspcc-dev/neofs-sdk-go/container"
|
||||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/eacl"
|
"github.com/nspcc-dev/neofs-sdk-go/eacl"
|
||||||
|
@ -111,7 +113,7 @@ type Container interface {
|
||||||
GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error)
|
GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error)
|
||||||
ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error)
|
ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error)
|
||||||
DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error
|
DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error
|
||||||
GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*client.EACLWithSignature, error)
|
GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error)
|
||||||
SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error
|
SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error
|
||||||
AnnounceContainerUsedSpace(ctx context.Context, announce []container.UsedSpaceAnnouncement, opts ...CallOption) error
|
AnnounceContainerUsedSpace(ctx context.Context, announce []container.UsedSpaceAnnouncement, opts ...CallOption) error
|
||||||
}
|
}
|
||||||
|
@ -178,11 +180,18 @@ type pool struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
||||||
|
wallet, err := owner.NEO3WalletFromPublicKey(&options.Key.PublicKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
cache, err := NewCache()
|
cache, err := NewCache()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't create cache: %w", err)
|
return nil, fmt.Errorf("couldn't create cache: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ownerID := owner.NewIDFromNeo3Wallet(wallet)
|
||||||
|
|
||||||
clientPacks := make([]*clientPack, len(options.weights))
|
clientPacks := make([]*clientPack, len(options.weights))
|
||||||
var atLeastOneHealthy bool
|
var atLeastOneHealthy bool
|
||||||
for i, address := range options.addresses {
|
for i, address := range options.addresses {
|
||||||
|
@ -193,13 +202,16 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var healthy bool
|
var healthy bool
|
||||||
st, err := c.CreateSession(ctx, options.SessionExpirationEpoch)
|
cliRes, err := c.CreateSession(ctx, options.SessionExpirationEpoch)
|
||||||
if err != nil && options.Logger != nil {
|
if err != nil && options.Logger != nil {
|
||||||
options.Logger.Warn("failed to create neofs session token for client",
|
options.Logger.Warn("failed to create neofs session token for client",
|
||||||
zap.String("address", address),
|
zap.String("address", address),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
} else if err == nil {
|
} else if err == nil {
|
||||||
healthy, atLeastOneHealthy = true, true
|
healthy, atLeastOneHealthy = true, true
|
||||||
|
|
||||||
|
st := sessionTokenForOwner(ownerID, cliRes)
|
||||||
|
|
||||||
_ = cache.Put(formCacheKey(address, options.Key), st)
|
_ = cache.Put(formCacheKey(address, options.Key), st)
|
||||||
}
|
}
|
||||||
clientPacks[i] = &clientPack{client: c, healthy: healthy, address: address}
|
clientPacks[i] = &clientPack{client: c, healthy: healthy, address: address}
|
||||||
|
@ -211,11 +223,6 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
||||||
|
|
||||||
source := rand.NewSource(time.Now().UnixNano())
|
source := rand.NewSource(time.Now().UnixNano())
|
||||||
sampler := NewSampler(options.weights, source)
|
sampler := NewSampler(options.weights, source)
|
||||||
wallet, err := owner.NEO3WalletFromPublicKey(&options.Key.PublicKey)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
ownerID := owner.NewIDFromNeo3Wallet(wallet)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
pool := &pool{
|
pool := &pool{
|
||||||
|
@ -272,10 +279,12 @@ func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, bu
|
||||||
if ok {
|
if ok {
|
||||||
bufferWeights[i] = options.weights[i]
|
bufferWeights[i] = options.weights[i]
|
||||||
if !cp.healthy {
|
if !cp.healthy {
|
||||||
if tkn, err := client.CreateSession(ctx, options.SessionExpirationEpoch); err != nil {
|
if cliRes, err := client.CreateSession(ctx, options.SessionExpirationEpoch); err != nil {
|
||||||
ok = false
|
ok = false
|
||||||
bufferWeights[i] = 0
|
bufferWeights[i] = 0
|
||||||
} else {
|
} else {
|
||||||
|
tkn := p.newSessionToken(cliRes)
|
||||||
|
|
||||||
_ = p.cache.Put(formCacheKey(cp.address, p.key), tkn)
|
_ = p.cache.Put(formCacheKey(cp.address, p.key), tkn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -376,10 +385,13 @@ func (p *pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, []client
|
||||||
cacheKey := formCacheKey(cp.address, key)
|
cacheKey := formCacheKey(cp.address, key)
|
||||||
sessionToken = p.cache.Get(cacheKey)
|
sessionToken = p.cache.Get(cacheKey)
|
||||||
if sessionToken == nil {
|
if sessionToken == nil {
|
||||||
sessionToken, err = cp.client.CreateSession(ctx, math.MaxUint32, clientCallOptions...)
|
cliRes, err := cp.client.CreateSession(ctx, math.MaxUint32, clientCallOptions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sessionToken = p.newSessionToken(cliRes)
|
||||||
|
|
||||||
_ = p.cache.Put(cacheKey, sessionToken)
|
_ = p.cache.Put(cacheKey, sessionToken)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -412,11 +424,21 @@ func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, op
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
res, err := cp.client.PutObject(ctx, params, options...)
|
res, err := cp.client.PutObject(ctx, params, options...)
|
||||||
|
if err == nil {
|
||||||
|
// reflect status failures in err
|
||||||
|
err = apistatus.ErrFromStatus(res.Status())
|
||||||
|
}
|
||||||
|
|
||||||
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
||||||
opts = append(opts, retry())
|
opts = append(opts, retry())
|
||||||
return p.PutObject(ctx, params, opts...)
|
return p.PutObject(ctx, params, opts...)
|
||||||
}
|
}
|
||||||
return res, err
|
|
||||||
|
if err != nil { // here err already carries both status and client errors
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.ID(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error {
|
func (p *pool) DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error {
|
||||||
|
@ -425,11 +447,20 @@ func (p *pool) DeleteObject(ctx context.Context, params *client.DeleteObjectPara
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = cp.client.DeleteObject(ctx, params, options...)
|
|
||||||
|
res, err := cp.client.DeleteObject(ctx, params, options...)
|
||||||
|
if err == nil {
|
||||||
|
// reflect status failures in err
|
||||||
|
err = apistatus.ErrFromStatus(res.Status())
|
||||||
|
}
|
||||||
|
|
||||||
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
||||||
opts = append(opts, retry())
|
opts = append(opts, retry())
|
||||||
return p.DeleteObject(ctx, params, opts...)
|
return p.DeleteObject(ctx, params, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// here err already carries both status and client errors
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -440,11 +471,21 @@ func (p *pool) GetObject(ctx context.Context, params *client.GetObjectParams, op
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
res, err := cp.client.GetObject(ctx, params, options...)
|
res, err := cp.client.GetObject(ctx, params, options...)
|
||||||
|
if err == nil {
|
||||||
|
// reflect status failures in err
|
||||||
|
err = apistatus.ErrFromStatus(res.Status())
|
||||||
|
}
|
||||||
|
|
||||||
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
||||||
opts = append(opts, retry())
|
opts = append(opts, retry())
|
||||||
return p.GetObject(ctx, params, opts...)
|
return p.GetObject(ctx, params, opts...)
|
||||||
}
|
}
|
||||||
return res, err
|
|
||||||
|
if err != nil { // here err already carries both status and client errors
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Object(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error) {
|
func (p *pool) GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error) {
|
||||||
|
@ -453,12 +494,22 @@ func (p *pool) GetObjectHeader(ctx context.Context, params *client.ObjectHeaderP
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
res, err := cp.client.GetObjectHeader(ctx, params, options...)
|
res, err := cp.client.HeadObject(ctx, params, options...)
|
||||||
|
if err == nil {
|
||||||
|
// reflect status failures in err
|
||||||
|
err = apistatus.ErrFromStatus(res.Status())
|
||||||
|
}
|
||||||
|
|
||||||
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
||||||
opts = append(opts, retry())
|
opts = append(opts, retry())
|
||||||
return p.GetObjectHeader(ctx, params, opts...)
|
return p.GetObjectHeader(ctx, params, opts...)
|
||||||
}
|
}
|
||||||
return res, err
|
|
||||||
|
if err != nil { // here err already carries both status and client errors
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Object(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, opts ...CallOption) ([]byte, error) {
|
func (p *pool) ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, opts ...CallOption) ([]byte, error) {
|
||||||
|
@ -468,11 +519,31 @@ func (p *pool) ObjectPayloadRangeData(ctx context.Context, params *client.RangeD
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
res, err := cp.client.ObjectPayloadRangeData(ctx, params, options...)
|
res, err := cp.client.ObjectPayloadRangeData(ctx, params, options...)
|
||||||
|
if err == nil {
|
||||||
|
// reflect status failures in err
|
||||||
|
err = apistatus.ErrFromStatus(res.Status())
|
||||||
|
}
|
||||||
|
|
||||||
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
||||||
opts = append(opts, retry())
|
opts = append(opts, retry())
|
||||||
return p.ObjectPayloadRangeData(ctx, params, opts...)
|
return p.ObjectPayloadRangeData(ctx, params, opts...)
|
||||||
}
|
}
|
||||||
return res, err
|
|
||||||
|
if err != nil { // here err already carries both status and client errors
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Data(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func copyRangeChecksumParams(prm *client.RangeChecksumParams) *client.RangeChecksumParams {
|
||||||
|
var prmCopy client.RangeChecksumParams
|
||||||
|
|
||||||
|
prmCopy.WithAddress(prm.Address())
|
||||||
|
prmCopy.WithSalt(prm.Salt())
|
||||||
|
prmCopy.WithRangeList(prm.RangeList()...)
|
||||||
|
|
||||||
|
return &prmCopy
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error) {
|
func (p *pool) ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error) {
|
||||||
|
@ -481,12 +552,40 @@ func (p *pool) ObjectPayloadRangeSHA256(ctx context.Context, params *client.Rang
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
res, err := cp.client.ObjectPayloadRangeSHA256(ctx, params, options...)
|
|
||||||
|
// FIXME: pretty bad approach but we should not mutate params through the pointer
|
||||||
|
// If non-SHA256 algo is set then we need to reset it.
|
||||||
|
params = copyRangeChecksumParams(params)
|
||||||
|
// SHA256 by default, no need to do smth
|
||||||
|
|
||||||
|
res, err := cp.client.HashObjectPayloadRanges(ctx, params, options...)
|
||||||
|
if err == nil {
|
||||||
|
// reflect status failures in err
|
||||||
|
err = apistatus.ErrFromStatus(res.Status())
|
||||||
|
}
|
||||||
|
|
||||||
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
||||||
opts = append(opts, retry())
|
opts = append(opts, retry())
|
||||||
return p.ObjectPayloadRangeSHA256(ctx, params, opts...)
|
return p.ObjectPayloadRangeSHA256(ctx, params, opts...)
|
||||||
}
|
}
|
||||||
return res, err
|
|
||||||
|
if err != nil { // here err already carries both status and client errors
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cliHashes := res.Hashes()
|
||||||
|
|
||||||
|
hs := make([][sha256.Size]byte, len(cliHashes))
|
||||||
|
|
||||||
|
for i := range cliHashes {
|
||||||
|
if ln := len(cliHashes[i]); ln != sha256.Size {
|
||||||
|
return nil, fmt.Errorf("invalid SHA256 checksum size %d", ln)
|
||||||
|
}
|
||||||
|
|
||||||
|
copy(hs[i][:], cliHashes[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
return hs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][64]byte, error) {
|
func (p *pool) ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][64]byte, error) {
|
||||||
|
@ -495,12 +594,40 @@ func (p *pool) ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChe
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
res, err := cp.client.ObjectPayloadRangeTZ(ctx, params, options...)
|
|
||||||
|
// FIXME: pretty bad approach but we should not mutate params through the pointer
|
||||||
|
// We need to set Tillich-Zemor algo.
|
||||||
|
params = copyRangeChecksumParams(params)
|
||||||
|
params.TZ()
|
||||||
|
|
||||||
|
res, err := cp.client.HashObjectPayloadRanges(ctx, params, options...)
|
||||||
|
if err == nil {
|
||||||
|
// reflect status failures in err
|
||||||
|
err = apistatus.ErrFromStatus(res.Status())
|
||||||
|
}
|
||||||
|
|
||||||
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
||||||
opts = append(opts, retry())
|
opts = append(opts, retry())
|
||||||
return p.ObjectPayloadRangeTZ(ctx, params, opts...)
|
return p.ObjectPayloadRangeTZ(ctx, params, opts...)
|
||||||
}
|
}
|
||||||
return res, err
|
|
||||||
|
if err != nil { // here err already carries both status and client errors
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cliHashes := res.Hashes()
|
||||||
|
|
||||||
|
hs := make([][client.TZSize]byte, len(cliHashes))
|
||||||
|
|
||||||
|
for i := range cliHashes {
|
||||||
|
if ln := len(cliHashes[i]); ln != client.TZSize {
|
||||||
|
return nil, fmt.Errorf("invalid TZ checksum size %d", ln)
|
||||||
|
}
|
||||||
|
|
||||||
|
copy(hs[i][:], cliHashes[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
return hs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) SearchObject(ctx context.Context, params *client.SearchObjectParams, opts ...CallOption) ([]*object.ID, error) {
|
func (p *pool) SearchObject(ctx context.Context, params *client.SearchObjectParams, opts ...CallOption) ([]*object.ID, error) {
|
||||||
|
@ -509,12 +636,22 @@ func (p *pool) SearchObject(ctx context.Context, params *client.SearchObjectPara
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
res, err := cp.client.SearchObject(ctx, params, options...)
|
res, err := cp.client.SearchObjects(ctx, params, options...)
|
||||||
|
if err == nil {
|
||||||
|
// reflect status failures in err
|
||||||
|
err = apistatus.ErrFromStatus(res.Status())
|
||||||
|
}
|
||||||
|
|
||||||
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
||||||
opts = append(opts, retry())
|
opts = append(opts, retry())
|
||||||
return p.SearchObject(ctx, params, opts...)
|
return p.SearchObject(ctx, params, opts...)
|
||||||
}
|
}
|
||||||
return res, err
|
|
||||||
|
if err != nil { // here err already carries both status and client errors
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.IDList(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error) {
|
func (p *pool) PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error) {
|
||||||
|
@ -524,11 +661,21 @@ func (p *pool) PutContainer(ctx context.Context, cnr *container.Container, opts
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
res, err := cp.client.PutContainer(ctx, cnr, options...)
|
res, err := cp.client.PutContainer(ctx, cnr, options...)
|
||||||
|
if err == nil {
|
||||||
|
// reflect status failures in err
|
||||||
|
err = apistatus.ErrFromStatus(res.Status())
|
||||||
|
}
|
||||||
|
|
||||||
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
||||||
opts = append(opts, retry())
|
opts = append(opts, retry())
|
||||||
return p.PutContainer(ctx, cnr, opts...)
|
return p.PutContainer(ctx, cnr, opts...)
|
||||||
}
|
}
|
||||||
return res, err
|
|
||||||
|
if err != nil { // here err already carries both status and client errors
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.ID(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error) {
|
func (p *pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error) {
|
||||||
|
@ -538,11 +685,21 @@ func (p *pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
res, err := cp.client.GetContainer(ctx, cid, options...)
|
res, err := cp.client.GetContainer(ctx, cid, options...)
|
||||||
|
if err == nil {
|
||||||
|
// reflect status failures in err
|
||||||
|
err = apistatus.ErrFromStatus(res.Status())
|
||||||
|
}
|
||||||
|
|
||||||
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
||||||
opts = append(opts, retry())
|
opts = append(opts, retry())
|
||||||
return p.GetContainer(ctx, cid, opts...)
|
return p.GetContainer(ctx, cid, opts...)
|
||||||
}
|
}
|
||||||
return res, err
|
|
||||||
|
if err != nil { // here err already carries both status and client errors
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Container(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error) {
|
func (p *pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error) {
|
||||||
|
@ -552,11 +709,21 @@ func (p *pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...Ca
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
res, err := cp.client.ListContainers(ctx, ownerID, options...)
|
res, err := cp.client.ListContainers(ctx, ownerID, options...)
|
||||||
|
if err == nil {
|
||||||
|
// reflect status failures in err
|
||||||
|
err = apistatus.ErrFromStatus(res.Status())
|
||||||
|
}
|
||||||
|
|
||||||
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
||||||
opts = append(opts, retry())
|
opts = append(opts, retry())
|
||||||
return p.ListContainers(ctx, ownerID, opts...)
|
return p.ListContainers(ctx, ownerID, opts...)
|
||||||
}
|
}
|
||||||
return res, err
|
|
||||||
|
if err != nil { // here err already carries both status and client errors
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.IDList(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error {
|
func (p *pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error {
|
||||||
|
@ -565,26 +732,44 @@ func (p *pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOpt
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = cp.client.DeleteContainer(ctx, cid, options...)
|
res, err := cp.client.DeleteContainer(ctx, cid, options...)
|
||||||
|
if err == nil {
|
||||||
|
// reflect status failures in err
|
||||||
|
err = apistatus.ErrFromStatus(res.Status())
|
||||||
|
}
|
||||||
|
|
||||||
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
||||||
opts = append(opts, retry())
|
opts = append(opts, retry())
|
||||||
return p.DeleteContainer(ctx, cid, opts...)
|
return p.DeleteContainer(ctx, cid, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// here err already carries both status and client errors
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*client.EACLWithSignature, error) {
|
func (p *pool) GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error) {
|
||||||
cfg := cfgFromOpts(opts...)
|
cfg := cfgFromOpts(opts...)
|
||||||
cp, options, err := p.conn(ctx, cfg)
|
cp, options, err := p.conn(ctx, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
res, err := cp.client.GetEACL(ctx, cid, options...)
|
res, err := cp.client.EACL(ctx, cid, options...)
|
||||||
|
if err == nil {
|
||||||
|
// reflect status failures in err
|
||||||
|
err = apistatus.ErrFromStatus(res.Status())
|
||||||
|
}
|
||||||
|
|
||||||
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
||||||
opts = append(opts, retry())
|
opts = append(opts, retry())
|
||||||
return p.GetEACL(ctx, cid, opts...)
|
return p.GetEACL(ctx, cid, opts...)
|
||||||
}
|
}
|
||||||
return res, err
|
|
||||||
|
if err != nil { // here err already carries both status and client errors
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Table(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error {
|
func (p *pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error {
|
||||||
|
@ -593,11 +778,19 @@ func (p *pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOptio
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = cp.client.SetEACL(ctx, table, options...)
|
res, err := cp.client.SetEACL(ctx, table, options...)
|
||||||
|
if err == nil {
|
||||||
|
// reflect status failures in err
|
||||||
|
err = apistatus.ErrFromStatus(res.Status())
|
||||||
|
}
|
||||||
|
|
||||||
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
||||||
opts = append(opts, retry())
|
opts = append(opts, retry())
|
||||||
return p.SetEACL(ctx, table, opts...)
|
return p.SetEACL(ctx, table, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// here err already carries both status and client errors
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -607,11 +800,19 @@ func (p *pool) AnnounceContainerUsedSpace(ctx context.Context, announce []contai
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = cp.client.AnnounceContainerUsedSpace(ctx, announce, options...)
|
res, err := cp.client.AnnounceContainerUsedSpace(ctx, announce, options...)
|
||||||
|
if err == nil {
|
||||||
|
// reflect status failures in err
|
||||||
|
err = apistatus.ErrFromStatus(res.Status())
|
||||||
|
}
|
||||||
|
|
||||||
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry {
|
||||||
opts = append(opts, retry())
|
opts = append(opts, retry())
|
||||||
return p.AnnounceContainerUsedSpace(ctx, announce, opts...)
|
return p.AnnounceContainerUsedSpace(ctx, announce, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// here err already carries both status and client errors
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -647,3 +848,18 @@ func (p *pool) Close() {
|
||||||
p.cancel()
|
p.cancel()
|
||||||
<-p.closedCh
|
<-p.closedCh
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// creates new session token from CreateSession call result.
|
||||||
|
func (p *pool) newSessionToken(cliRes *client.CreateSessionRes) *session.Token {
|
||||||
|
return sessionTokenForOwner(p.owner, cliRes)
|
||||||
|
}
|
||||||
|
|
||||||
|
// creates new session token with specified owner from CreateSession call result.
|
||||||
|
func sessionTokenForOwner(id *owner.ID, cliRes *client.CreateSessionRes) *session.Token {
|
||||||
|
st := session.NewToken()
|
||||||
|
st.SetOwnerID(id)
|
||||||
|
st.SetID(cliRes.ID())
|
||||||
|
st.SetSessionKey(cliRes.SessionKey())
|
||||||
|
|
||||||
|
return st
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue