From 639ab1e443f54a47b469591e6cb9b075a840462f Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 17 Nov 2021 14:22:32 +0300 Subject: [PATCH] [#83] pool: Support changes of `client.Client` interface Handle status results and return status failures in `error` return. Signed-off-by: Leonard Lyubich --- pool/pool.go | 276 +++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 246 insertions(+), 30 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index 1315d257..24d7a123 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -3,6 +3,7 @@ package pool import ( "context" "crypto/ecdsa" + "crypto/sha256" "encoding/hex" "errors" "fmt" @@ -13,6 +14,7 @@ import ( "time" "github.com/nspcc-dev/neofs-sdk-go/client" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/eacl" @@ -111,7 +113,7 @@ type Container interface { GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error - GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*client.EACLWithSignature, error) + GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error AnnounceContainerUsedSpace(ctx context.Context, announce []container.UsedSpaceAnnouncement, opts ...CallOption) error } @@ -178,11 +180,18 @@ type pool struct { } func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { + wallet, err := owner.NEO3WalletFromPublicKey(&options.Key.PublicKey) + if err != nil { + return nil, err + } + cache, err := NewCache() if err != nil { return nil, fmt.Errorf("couldn't create cache: %w", err) } + ownerID := owner.NewIDFromNeo3Wallet(wallet) + clientPacks := make([]*clientPack, len(options.weights)) var atLeastOneHealthy bool for i, address := range options.addresses { @@ -193,13 +202,16 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { return nil, err } var healthy bool - st, err := c.CreateSession(ctx, options.SessionExpirationEpoch) + cliRes, err := c.CreateSession(ctx, options.SessionExpirationEpoch) if err != nil && options.Logger != nil { options.Logger.Warn("failed to create neofs session token for client", zap.String("address", address), zap.Error(err)) } else if err == nil { healthy, atLeastOneHealthy = true, true + + st := sessionTokenForOwner(ownerID, cliRes) + _ = cache.Put(formCacheKey(address, options.Key), st) } clientPacks[i] = &clientPack{client: c, healthy: healthy, address: address} @@ -211,11 +223,6 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) { source := rand.NewSource(time.Now().UnixNano()) sampler := NewSampler(options.weights, source) - wallet, err := owner.NEO3WalletFromPublicKey(&options.Key.PublicKey) - if err != nil { - return nil, err - } - ownerID := owner.NewIDFromNeo3Wallet(wallet) ctx, cancel := context.WithCancel(ctx) pool := &pool{ @@ -272,10 +279,12 @@ func updateNodesHealth(ctx context.Context, p *pool, options *BuilderOptions, bu if ok { bufferWeights[i] = options.weights[i] if !cp.healthy { - if tkn, err := client.CreateSession(ctx, options.SessionExpirationEpoch); err != nil { + if cliRes, err := client.CreateSession(ctx, options.SessionExpirationEpoch); err != nil { ok = false bufferWeights[i] = 0 } else { + tkn := p.newSessionToken(cliRes) + _ = p.cache.Put(formCacheKey(cp.address, p.key), tkn) } } @@ -376,10 +385,13 @@ func (p *pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, []client cacheKey := formCacheKey(cp.address, key) sessionToken = p.cache.Get(cacheKey) if sessionToken == nil { - sessionToken, err = cp.client.CreateSession(ctx, math.MaxUint32, clientCallOptions...) + cliRes, err := cp.client.CreateSession(ctx, math.MaxUint32, clientCallOptions...) if err != nil { return nil, nil, err } + + sessionToken = p.newSessionToken(cliRes) + _ = p.cache.Put(cacheKey, sessionToken) } } @@ -412,11 +424,21 @@ func (p *pool) PutObject(ctx context.Context, params *client.PutObjectParams, op return nil, err } res, err := cp.client.PutObject(ctx, params, options...) + if err == nil { + // reflect status failures in err + err = apistatus.ErrFromStatus(res.Status()) + } + if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.PutObject(ctx, params, opts...) } - return res, err + + if err != nil { // here err already carries both status and client errors + return nil, err + } + + return res.ID(), nil } func (p *pool) DeleteObject(ctx context.Context, params *client.DeleteObjectParams, opts ...CallOption) error { @@ -425,11 +447,20 @@ func (p *pool) DeleteObject(ctx context.Context, params *client.DeleteObjectPara if err != nil { return err } - err = cp.client.DeleteObject(ctx, params, options...) + + res, err := cp.client.DeleteObject(ctx, params, options...) + if err == nil { + // reflect status failures in err + err = apistatus.ErrFromStatus(res.Status()) + } + if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.DeleteObject(ctx, params, opts...) } + + // here err already carries both status and client errors + return err } @@ -440,11 +471,21 @@ func (p *pool) GetObject(ctx context.Context, params *client.GetObjectParams, op return nil, err } res, err := cp.client.GetObject(ctx, params, options...) + if err == nil { + // reflect status failures in err + err = apistatus.ErrFromStatus(res.Status()) + } + if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.GetObject(ctx, params, opts...) } - return res, err + + if err != nil { // here err already carries both status and client errors + return nil, err + } + + return res.Object(), nil } func (p *pool) GetObjectHeader(ctx context.Context, params *client.ObjectHeaderParams, opts ...CallOption) (*object.Object, error) { @@ -453,12 +494,22 @@ func (p *pool) GetObjectHeader(ctx context.Context, params *client.ObjectHeaderP if err != nil { return nil, err } - res, err := cp.client.GetObjectHeader(ctx, params, options...) + res, err := cp.client.HeadObject(ctx, params, options...) + if err == nil { + // reflect status failures in err + err = apistatus.ErrFromStatus(res.Status()) + } + if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.GetObjectHeader(ctx, params, opts...) } - return res, err + + if err != nil { // here err already carries both status and client errors + return nil, err + } + + return res.Object(), nil } func (p *pool) ObjectPayloadRangeData(ctx context.Context, params *client.RangeDataParams, opts ...CallOption) ([]byte, error) { @@ -468,11 +519,31 @@ func (p *pool) ObjectPayloadRangeData(ctx context.Context, params *client.RangeD return nil, err } res, err := cp.client.ObjectPayloadRangeData(ctx, params, options...) + if err == nil { + // reflect status failures in err + err = apistatus.ErrFromStatus(res.Status()) + } + if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.ObjectPayloadRangeData(ctx, params, opts...) } - return res, err + + if err != nil { // here err already carries both status and client errors + return nil, err + } + + return res.Data(), nil +} + +func copyRangeChecksumParams(prm *client.RangeChecksumParams) *client.RangeChecksumParams { + var prmCopy client.RangeChecksumParams + + prmCopy.WithAddress(prm.Address()) + prmCopy.WithSalt(prm.Salt()) + prmCopy.WithRangeList(prm.RangeList()...) + + return &prmCopy } func (p *pool) ObjectPayloadRangeSHA256(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][32]byte, error) { @@ -481,12 +552,40 @@ func (p *pool) ObjectPayloadRangeSHA256(ctx context.Context, params *client.Rang if err != nil { return nil, err } - res, err := cp.client.ObjectPayloadRangeSHA256(ctx, params, options...) + + // FIXME: pretty bad approach but we should not mutate params through the pointer + // If non-SHA256 algo is set then we need to reset it. + params = copyRangeChecksumParams(params) + // SHA256 by default, no need to do smth + + res, err := cp.client.HashObjectPayloadRanges(ctx, params, options...) + if err == nil { + // reflect status failures in err + err = apistatus.ErrFromStatus(res.Status()) + } + if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.ObjectPayloadRangeSHA256(ctx, params, opts...) } - return res, err + + if err != nil { // here err already carries both status and client errors + return nil, err + } + + cliHashes := res.Hashes() + + hs := make([][sha256.Size]byte, len(cliHashes)) + + for i := range cliHashes { + if ln := len(cliHashes[i]); ln != sha256.Size { + return nil, fmt.Errorf("invalid SHA256 checksum size %d", ln) + } + + copy(hs[i][:], cliHashes[i]) + } + + return hs, nil } func (p *pool) ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChecksumParams, opts ...CallOption) ([][64]byte, error) { @@ -495,12 +594,40 @@ func (p *pool) ObjectPayloadRangeTZ(ctx context.Context, params *client.RangeChe if err != nil { return nil, err } - res, err := cp.client.ObjectPayloadRangeTZ(ctx, params, options...) + + // FIXME: pretty bad approach but we should not mutate params through the pointer + // We need to set Tillich-Zemor algo. + params = copyRangeChecksumParams(params) + params.TZ() + + res, err := cp.client.HashObjectPayloadRanges(ctx, params, options...) + if err == nil { + // reflect status failures in err + err = apistatus.ErrFromStatus(res.Status()) + } + if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.ObjectPayloadRangeTZ(ctx, params, opts...) } - return res, err + + if err != nil { // here err already carries both status and client errors + return nil, err + } + + cliHashes := res.Hashes() + + hs := make([][client.TZSize]byte, len(cliHashes)) + + for i := range cliHashes { + if ln := len(cliHashes[i]); ln != client.TZSize { + return nil, fmt.Errorf("invalid TZ checksum size %d", ln) + } + + copy(hs[i][:], cliHashes[i]) + } + + return hs, nil } func (p *pool) SearchObject(ctx context.Context, params *client.SearchObjectParams, opts ...CallOption) ([]*object.ID, error) { @@ -509,12 +636,22 @@ func (p *pool) SearchObject(ctx context.Context, params *client.SearchObjectPara if err != nil { return nil, err } - res, err := cp.client.SearchObject(ctx, params, options...) + res, err := cp.client.SearchObjects(ctx, params, options...) + if err == nil { + // reflect status failures in err + err = apistatus.ErrFromStatus(res.Status()) + } + if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.SearchObject(ctx, params, opts...) } - return res, err + + if err != nil { // here err already carries both status and client errors + return nil, err + } + + return res.IDList(), nil } func (p *pool) PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error) { @@ -524,11 +661,21 @@ func (p *pool) PutContainer(ctx context.Context, cnr *container.Container, opts return nil, err } res, err := cp.client.PutContainer(ctx, cnr, options...) + if err == nil { + // reflect status failures in err + err = apistatus.ErrFromStatus(res.Status()) + } + if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.PutContainer(ctx, cnr, opts...) } - return res, err + + if err != nil { // here err already carries both status and client errors + return nil, err + } + + return res.ID(), nil } func (p *pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error) { @@ -538,11 +685,21 @@ func (p *pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption return nil, err } res, err := cp.client.GetContainer(ctx, cid, options...) + if err == nil { + // reflect status failures in err + err = apistatus.ErrFromStatus(res.Status()) + } + if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.GetContainer(ctx, cid, opts...) } - return res, err + + if err != nil { // here err already carries both status and client errors + return nil, err + } + + return res.Container(), nil } func (p *pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error) { @@ -552,11 +709,21 @@ func (p *pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...Ca return nil, err } res, err := cp.client.ListContainers(ctx, ownerID, options...) + if err == nil { + // reflect status failures in err + err = apistatus.ErrFromStatus(res.Status()) + } + if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.ListContainers(ctx, ownerID, opts...) } - return res, err + + if err != nil { // here err already carries both status and client errors + return nil, err + } + + return res.IDList(), nil } func (p *pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error { @@ -565,26 +732,44 @@ func (p *pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOpt if err != nil { return err } - err = cp.client.DeleteContainer(ctx, cid, options...) + res, err := cp.client.DeleteContainer(ctx, cid, options...) + if err == nil { + // reflect status failures in err + err = apistatus.ErrFromStatus(res.Status()) + } + if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.DeleteContainer(ctx, cid, opts...) } + + // here err already carries both status and client errors + return err } -func (p *pool) GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*client.EACLWithSignature, error) { +func (p *pool) GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error) { cfg := cfgFromOpts(opts...) cp, options, err := p.conn(ctx, cfg) if err != nil { return nil, err } - res, err := cp.client.GetEACL(ctx, cid, options...) + res, err := cp.client.EACL(ctx, cid, options...) + if err == nil { + // reflect status failures in err + err = apistatus.ErrFromStatus(res.Status()) + } + if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.GetEACL(ctx, cid, opts...) } - return res, err + + if err != nil { // here err already carries both status and client errors + return nil, err + } + + return res.Table(), nil } func (p *pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error { @@ -593,11 +778,19 @@ func (p *pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOptio if err != nil { return err } - err = cp.client.SetEACL(ctx, table, options...) + res, err := cp.client.SetEACL(ctx, table, options...) + if err == nil { + // reflect status failures in err + err = apistatus.ErrFromStatus(res.Status()) + } + if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.SetEACL(ctx, table, opts...) } + + // here err already carries both status and client errors + return err } @@ -607,11 +800,19 @@ func (p *pool) AnnounceContainerUsedSpace(ctx context.Context, announce []contai if err != nil { return err } - err = cp.client.AnnounceContainerUsedSpace(ctx, announce, options...) + res, err := cp.client.AnnounceContainerUsedSpace(ctx, announce, options...) + if err == nil { + // reflect status failures in err + err = apistatus.ErrFromStatus(res.Status()) + } + if p.checkSessionTokenErr(err, cp.address) && !cfg.isRetry { opts = append(opts, retry()) return p.AnnounceContainerUsedSpace(ctx, announce, opts...) } + + // here err already carries both status and client errors + return err } @@ -647,3 +848,18 @@ func (p *pool) Close() { p.cancel() <-p.closedCh } + +// creates new session token from CreateSession call result. +func (p *pool) newSessionToken(cliRes *client.CreateSessionRes) *session.Token { + return sessionTokenForOwner(p.owner, cliRes) +} + +// creates new session token with specified owner from CreateSession call result. +func sessionTokenForOwner(id *owner.ID, cliRes *client.CreateSessionRes) *session.Token { + st := session.NewToken() + st.SetOwnerID(id) + st.SetID(cliRes.ID()) + st.SetSessionKey(cliRes.SessionKey()) + + return st +}