forked from TrueCloudLab/frostfs-sdk-go
[#137] pool: simplify session creation
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
df0573d521
commit
e50e6d2828
4 changed files with 20 additions and 31 deletions
|
@ -21,7 +21,7 @@ func (x ResponseMetaInfo) ResponderKey() []byte {
|
||||||
return x.key
|
return x.key
|
||||||
}
|
}
|
||||||
|
|
||||||
// Epoch return current epoch.
|
// Epoch returns local NeoFS epoch of the server.
|
||||||
func (x ResponseMetaInfo) Epoch() uint64 {
|
func (x ResponseMetaInfo) Epoch() uint64 {
|
||||||
return x.epoch
|
return x.epoch
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,8 +28,6 @@ type ResSessionCreate struct {
|
||||||
id []byte
|
id []byte
|
||||||
|
|
||||||
sessionKey []byte
|
sessionKey []byte
|
||||||
|
|
||||||
exp uint64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *ResSessionCreate) setID(id []byte) {
|
func (x *ResSessionCreate) setID(id []byte) {
|
||||||
|
@ -52,15 +50,6 @@ func (x ResSessionCreate) PublicKey() []byte {
|
||||||
return x.sessionKey
|
return x.sessionKey
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *ResSessionCreate) setExp(exp uint64) {
|
|
||||||
x.exp = exp
|
|
||||||
}
|
|
||||||
|
|
||||||
// Expiration returns epoch number of the token expiration.
|
|
||||||
func (x ResSessionCreate) Expiration() uint64 {
|
|
||||||
return x.exp
|
|
||||||
}
|
|
||||||
|
|
||||||
// SessionCreate opens a session with the node server on the remote endpoint.
|
// SessionCreate opens a session with the node server on the remote endpoint.
|
||||||
// The session lifetime coincides with the server lifetime. Results can be written
|
// The session lifetime coincides with the server lifetime. Results can be written
|
||||||
// to session token which can be later attached to the requests.
|
// to session token which can be later attached to the requests.
|
||||||
|
@ -115,7 +104,6 @@ func (c *Client) SessionCreate(ctx context.Context, prm PrmSessionCreate) (*ResS
|
||||||
|
|
||||||
res.setID(body.GetID())
|
res.setID(body.GetID())
|
||||||
res.setSessionKey(body.GetSessionKey())
|
res.setSessionKey(body.GetSessionKey())
|
||||||
res.setExp(reqBody.GetExpiration())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// process call
|
// process call
|
||||||
|
|
|
@ -64,7 +64,7 @@ func (c *sessionCache) DeleteByPrefix(prefix string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *sessionCache) UpdateEpoch(newEpoch uint64) {
|
func (c *sessionCache) updateEpoch(newEpoch uint64) {
|
||||||
epoch := atomic.LoadUint64(&c.currentEpoch)
|
epoch := atomic.LoadUint64(&c.currentEpoch)
|
||||||
if newEpoch > epoch {
|
if newEpoch > epoch {
|
||||||
atomic.StoreUint64(&c.currentEpoch, newEpoch)
|
atomic.StoreUint64(&c.currentEpoch, newEpoch)
|
||||||
|
|
35
pool/pool.go
35
pool/pool.go
|
@ -532,14 +532,13 @@ func (p *Pool) Dial(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
var healthy bool
|
var healthy bool
|
||||||
cliRes, err := createSessionTokenForDuration(ctx, c, p.rebalanceParams.sessionExpirationDuration)
|
st, err := createSessionTokenForDuration(ctx, c, p.owner, p.rebalanceParams.sessionExpirationDuration)
|
||||||
if err != nil && p.logger != nil {
|
if err != nil && p.logger != nil {
|
||||||
p.logger.Warn("failed to create neofs session token for client",
|
p.logger.Warn("failed to create neofs session token for client",
|
||||||
zap.String("Address", addr),
|
zap.String("Address", addr),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
} else if err == nil {
|
} else if err == nil {
|
||||||
healthy, atLeastOneHealthy = true, true
|
healthy, atLeastOneHealthy = true, true
|
||||||
st := sessionTokenForOwner(p.owner, cliRes)
|
|
||||||
_ = p.cache.Put(formCacheKey(addr, p.key), st)
|
_ = p.cache.Put(formCacheKey(addr, p.key), st)
|
||||||
}
|
}
|
||||||
clientPacks[j] = &clientPack{client: c, healthy: healthy, address: addr}
|
clientPacks[j] = &clientPack{client: c, healthy: healthy, address: addr}
|
||||||
|
@ -587,7 +586,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
|
||||||
prmInit.ResolveNeoFSFailures()
|
prmInit.ResolveNeoFSFailures()
|
||||||
prmInit.SetDefaultPrivateKey(*params.key)
|
prmInit.SetDefaultPrivateKey(*params.key)
|
||||||
prmInit.SetResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {
|
prmInit.SetResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {
|
||||||
cache.UpdateEpoch(info.Epoch())
|
cache.updateEpoch(info.Epoch())
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -787,7 +786,7 @@ func (p *Pool) checkSessionTokenErr(err error, address string) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func createSessionTokenForDuration(ctx context.Context, c client, dur uint64) (*sdkClient.ResSessionCreate, error) {
|
func createSessionTokenForDuration(ctx context.Context, c client, ownerID *owner.ID, dur uint64) (*session.Token, error) {
|
||||||
ni, err := c.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{})
|
ni, err := c.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -795,14 +794,21 @@ func createSessionTokenForDuration(ctx context.Context, c client, dur uint64) (*
|
||||||
|
|
||||||
epoch := ni.Info().CurrentEpoch()
|
epoch := ni.Info().CurrentEpoch()
|
||||||
|
|
||||||
var prm sdkClient.PrmSessionCreate
|
var exp uint64
|
||||||
if math.MaxUint64-epoch < dur {
|
if math.MaxUint64-epoch < dur {
|
||||||
prm.SetExp(math.MaxUint64)
|
exp = math.MaxUint64
|
||||||
} else {
|
} else {
|
||||||
prm.SetExp(epoch + dur)
|
exp = epoch + dur
|
||||||
|
}
|
||||||
|
var prm sdkClient.PrmSessionCreate
|
||||||
|
prm.SetExp(exp)
|
||||||
|
|
||||||
|
res, err := c.SessionCreate(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.SessionCreate(ctx, prm)
|
return sessionTokenForOwner(ownerID, res, exp), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type callContext struct {
|
type callContext struct {
|
||||||
|
@ -860,13 +866,13 @@ func (p *Pool) openDefaultSession(ctx *callContext) error {
|
||||||
|
|
||||||
tok := p.cache.Get(cacheKey)
|
tok := p.cache.Get(cacheKey)
|
||||||
if tok == nil {
|
if tok == nil {
|
||||||
|
var err error
|
||||||
// open new session
|
// open new session
|
||||||
cliRes, err := createSessionTokenForDuration(ctx, ctx.client, p.stokenDuration)
|
tok, err = createSessionTokenForDuration(ctx, ctx.client, owner.NewIDFromPublicKey(&ctx.key.PublicKey), p.stokenDuration)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("session API client: %w", err)
|
return fmt.Errorf("session API client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
tok = sessionTokenForOwner(owner.NewIDFromPublicKey(&ctx.key.PublicKey), cliRes)
|
|
||||||
// cache the opened session
|
// cache the opened session
|
||||||
p.cache.Put(cacheKey, tok)
|
p.cache.Put(cacheKey, tok)
|
||||||
}
|
}
|
||||||
|
@ -1576,18 +1582,13 @@ func (p *Pool) Close() {
|
||||||
<-p.closedCh
|
<-p.closedCh
|
||||||
}
|
}
|
||||||
|
|
||||||
// creates new session token from SessionCreate call result.
|
|
||||||
func (p *Pool) newSessionToken(cliRes *sdkClient.ResSessionCreate) *session.Token {
|
|
||||||
return sessionTokenForOwner(p.owner, cliRes)
|
|
||||||
}
|
|
||||||
|
|
||||||
// creates new session token with specified owner from SessionCreate call result.
|
// creates new session token with specified owner from SessionCreate call result.
|
||||||
func sessionTokenForOwner(id *owner.ID, cliRes *sdkClient.ResSessionCreate) *session.Token {
|
func sessionTokenForOwner(id *owner.ID, cliRes *sdkClient.ResSessionCreate, exp uint64) *session.Token {
|
||||||
st := session.NewToken()
|
st := session.NewToken()
|
||||||
st.SetOwnerID(id)
|
st.SetOwnerID(id)
|
||||||
st.SetID(cliRes.ID())
|
st.SetID(cliRes.ID())
|
||||||
st.SetSessionKey(cliRes.PublicKey())
|
st.SetSessionKey(cliRes.PublicKey())
|
||||||
st.SetExp(cliRes.Expiration())
|
st.SetExp(exp)
|
||||||
|
|
||||||
return st
|
return st
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue