forked from TrueCloudLab/frostfs-sdk-go
[#159] Add object context to session tokens
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
70c3644e2b
commit
61f9c3392b
1 changed files with 66 additions and 28 deletions
94
pool/pool.go
94
pool/pool.go
|
@ -15,6 +15,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
sessionv2 "github.com/nspcc-dev/neofs-api-go/v2/session"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/accounting"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/client"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/container"
|
||||
|
@ -182,6 +183,8 @@ type CallOption func(config *callConfig)
|
|||
|
||||
type callConfig struct {
|
||||
useDefaultSession bool
|
||||
verb sessionv2.ObjectSessionVerb
|
||||
addr *address.Address
|
||||
|
||||
key *ecdsa.PrivateKey
|
||||
btoken *token.BearerToken
|
||||
|
@ -212,6 +215,18 @@ func useDefaultSession() CallOption {
|
|||
}
|
||||
}
|
||||
|
||||
func useAddress(addr *address.Address) CallOption {
|
||||
return func(config *callConfig) {
|
||||
config.addr = addr
|
||||
}
|
||||
}
|
||||
|
||||
func useVerb(verb sessionv2.ObjectSessionVerb) CallOption {
|
||||
return func(config *callConfig) {
|
||||
config.verb = verb
|
||||
}
|
||||
}
|
||||
|
||||
func cfgFromOpts(opts ...CallOption) *callConfig {
|
||||
var cfg = new(callConfig)
|
||||
for _, opt := range opts {
|
||||
|
@ -283,11 +298,7 @@ func newPool(ctx context.Context, options *BuilderOptions) (Pool, error) {
|
|||
} else if err == nil {
|
||||
healthy, atLeastOneHealthy = true, true
|
||||
st := sessionTokenForOwner(ownerID, cliRes)
|
||||
|
||||
// sign the session token and cache it on success
|
||||
if err = st.Sign(options.Key); err == nil {
|
||||
_ = cache.Put(formCacheKey(addr, options.Key), st)
|
||||
}
|
||||
_ = cache.Put(formCacheKey(addr, options.Key), st)
|
||||
}
|
||||
clientPacks[j] = &clientPack{client: c, healthy: healthy, address: addr}
|
||||
}
|
||||
|
@ -580,6 +591,7 @@ type callContext struct {
|
|||
// flag to open default session if session token is missing
|
||||
sessionDefault bool
|
||||
sessionTarget func(session.Token)
|
||||
sessionContext *session.ObjectContext
|
||||
}
|
||||
|
||||
func (p *pool) initCallContext(ctx *callContext, cfg *callConfig) error {
|
||||
|
@ -603,6 +615,11 @@ func (p *pool) initCallContext(ctx *callContext, cfg *callConfig) error {
|
|||
|
||||
// note that we don't override session provided by the caller
|
||||
ctx.sessionDefault = cfg.stoken == nil && cfg.useDefaultSession
|
||||
if ctx.sessionDefault {
|
||||
ctx.sessionContext = session.NewObjectContext()
|
||||
ctx.sessionContext.ToV2().SetVerb(cfg.verb)
|
||||
ctx.sessionContext.ApplyTo(cfg.addr)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -631,30 +648,27 @@ func (p *pool) openDefaultSession(ctx *callContext) error {
|
|||
cacheKey := formCacheKey(ctx.endpoint, ctx.key)
|
||||
|
||||
tok := p.cache.Get(cacheKey)
|
||||
if tok != nil {
|
||||
// use cached token
|
||||
ctx.sessionTarget(*tok)
|
||||
return nil
|
||||
if tok == nil {
|
||||
// open new session
|
||||
cliRes, err := createSessionTokenForDuration(ctx, ctx.client, p.stokenDuration)
|
||||
if err != nil {
|
||||
return fmt.Errorf("session API client: %w", err)
|
||||
}
|
||||
|
||||
tok = sessionTokenForOwner(owner.NewIDFromPublicKey(&ctx.key.PublicKey), cliRes)
|
||||
// cache the opened session
|
||||
p.cache.Put(cacheKey, tok)
|
||||
}
|
||||
|
||||
// open new session
|
||||
cliRes, err := createSessionTokenForDuration(ctx, ctx.client, p.stokenDuration)
|
||||
if err != nil {
|
||||
return fmt.Errorf("session API client: %w", err)
|
||||
}
|
||||
|
||||
tok = sessionTokenForOwner(owner.NewIDFromPublicKey(&ctx.key.PublicKey), cliRes)
|
||||
tokToSign := *tok
|
||||
tokToSign.SetContext(ctx.sessionContext)
|
||||
|
||||
// sign the token
|
||||
err = tok.Sign(ctx.key)
|
||||
if err != nil {
|
||||
if err := tokToSign.Sign(ctx.key); err != nil {
|
||||
return fmt.Errorf("sign token of the opened session: %w", err)
|
||||
}
|
||||
|
||||
// cache the opened session
|
||||
p.cache.Put(cacheKey, tok)
|
||||
|
||||
ctx.sessionTarget(*tok)
|
||||
ctx.sessionTarget(tokToSign)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -685,7 +699,10 @@ func (p *pool) callWithRetry(ctx *callContextWithRetry, f func() error) error {
|
|||
}
|
||||
|
||||
func (p *pool) PutObject(ctx context.Context, hdr object.Object, payload io.Reader, opts ...CallOption) (*oid.ID, error) {
|
||||
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
|
||||
cfg := cfgFromOpts(append(opts,
|
||||
useDefaultSession(),
|
||||
useVerb(sessionv2.ObjectVerbPut),
|
||||
useAddress(newAddressFromCnrID(hdr.ContainerID())))...)
|
||||
|
||||
// Put object is different from other object service methods. Put request
|
||||
// can't be resent in case of session token failures (i.e. session token is
|
||||
|
@ -791,7 +808,10 @@ func (p *pool) PutObject(ctx context.Context, hdr object.Object, payload io.Read
|
|||
}
|
||||
|
||||
func (p *pool) DeleteObject(ctx context.Context, addr address.Address, opts ...CallOption) error {
|
||||
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
|
||||
cfg := cfgFromOpts(append(opts,
|
||||
useDefaultSession(),
|
||||
useVerb(sessionv2.ObjectVerbDelete),
|
||||
useAddress(&addr))...)
|
||||
|
||||
var prm client.PrmObjectDelete
|
||||
|
||||
|
@ -843,7 +863,10 @@ type ResGetObject struct {
|
|||
}
|
||||
|
||||
func (p *pool) GetObject(ctx context.Context, addr address.Address, opts ...CallOption) (*ResGetObject, error) {
|
||||
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
|
||||
cfg := cfgFromOpts(append(opts,
|
||||
useDefaultSession(),
|
||||
useVerb(sessionv2.ObjectVerbGet),
|
||||
useAddress(&addr))...)
|
||||
|
||||
var prm client.PrmObjectGet
|
||||
|
||||
|
@ -892,7 +915,10 @@ func (p *pool) GetObject(ctx context.Context, addr address.Address, opts ...Call
|
|||
}
|
||||
|
||||
func (p *pool) HeadObject(ctx context.Context, addr address.Address, opts ...CallOption) (*object.Object, error) {
|
||||
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
|
||||
cfg := cfgFromOpts(append(opts,
|
||||
useDefaultSession(),
|
||||
useVerb(sessionv2.ObjectVerbHead),
|
||||
useAddress(&addr))...)
|
||||
|
||||
var prm client.PrmObjectHead
|
||||
|
||||
|
@ -951,7 +977,10 @@ func (x *ResObjectRange) Close() error {
|
|||
}
|
||||
|
||||
func (p *pool) ObjectRange(ctx context.Context, addr address.Address, off, ln uint64, opts ...CallOption) (*ResObjectRange, error) {
|
||||
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
|
||||
cfg := cfgFromOpts(append(opts,
|
||||
useDefaultSession(),
|
||||
useVerb(sessionv2.ObjectVerbRange),
|
||||
useAddress(&addr))...)
|
||||
|
||||
var prm client.PrmObjectRange
|
||||
|
||||
|
@ -1024,7 +1053,10 @@ func (x *ResObjectSearch) Close() {
|
|||
}
|
||||
|
||||
func (p *pool) SearchObjects(ctx context.Context, idCnr cid.ID, filters object.SearchFilters, opts ...CallOption) (*ResObjectSearch, error) {
|
||||
cfg := cfgFromOpts(append(opts, useDefaultSession())...)
|
||||
cfg := cfgFromOpts(append(opts,
|
||||
useDefaultSession(),
|
||||
useVerb(sessionv2.ObjectVerbSearch),
|
||||
useAddress(newAddressFromCnrID(&idCnr)))...)
|
||||
|
||||
var prm client.PrmObjectSearch
|
||||
|
||||
|
@ -1265,3 +1297,9 @@ func sessionTokenForOwner(id *owner.ID, cliRes *client.ResSessionCreate) *sessio
|
|||
|
||||
return st
|
||||
}
|
||||
|
||||
func newAddressFromCnrID(cnrID *cid.ID) *address.Address {
|
||||
addr := address.NewAddress()
|
||||
addr.SetContainerID(cnrID)
|
||||
return addr
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue