object/put: Persist session token till the end of a session

Previously a token could've expired in the middle of an object.PUT
stream, leading to upload being interrupted. This is bad, because user
doesn't always now what is the right values for the session token
lifetime. More than that, setting it to a very high value will
eventually blow up the session token database.

In this commit we read the session token once and reuse it for the whole
stream duration.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2023-03-14 13:31:30 +03:00
parent 4e244686cf
commit 2b6435a80c
2 changed files with 15 additions and 19 deletions

View file

@ -2,6 +2,7 @@ package putsvc
import ( import (
"context" "context"
"crypto/ecdsa"
"fmt" "fmt"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
@ -17,7 +18,7 @@ import (
type remoteTarget struct { type remoteTarget struct {
ctx context.Context ctx context.Context
keyStorage *util.KeyStorage privateKey *ecdsa.PrivateKey
commonPrm *util.CommonPrm commonPrm *util.CommonPrm
@ -50,20 +51,6 @@ func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta)
} }
func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
var sessionInfo *util.SessionInfo
if tok := t.commonPrm.SessionToken(); tok != nil {
sessionInfo = &util.SessionInfo{
ID: tok.ID(),
Owner: tok.Issuer(),
}
}
key, err := t.keyStorage.GetKey(sessionInfo)
if err != nil {
return nil, fmt.Errorf("(%T) could not receive private key: %w", t, err)
}
c, err := t.clientConstructor.Get(t.nodeInfo) c, err := t.clientConstructor.Get(t.nodeInfo)
if err != nil { if err != nil {
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err) return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
@ -73,7 +60,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
prm.SetContext(t.ctx) prm.SetContext(t.ctx)
prm.SetClient(c) prm.SetClient(c)
prm.SetPrivateKey(key) prm.SetPrivateKey(t.privateKey)
prm.SetSessionToken(t.commonPrm.SessionToken()) prm.SetSessionToken(t.commonPrm.SessionToken())
prm.SetBearerToken(t.commonPrm.BearerToken()) prm.SetBearerToken(t.commonPrm.BearerToken())
prm.SetXHeaders(t.commonPrm.XHeaders()) prm.SetXHeaders(t.commonPrm.XHeaders())
@ -116,13 +103,18 @@ func (p *RemotePutPrm) WithObject(v *object.Object) *RemotePutPrm {
// PutObject sends object to remote node. // PutObject sends object to remote node.
func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error { func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
key, err := s.keyStorage.GetKey(nil)
if err != nil {
return err
}
t := &remoteTarget{ t := &remoteTarget{
ctx: ctx, ctx: ctx,
keyStorage: s.keyStorage, privateKey: key,
clientConstructor: s.clientConstructor, clientConstructor: s.clientConstructor,
} }
err := clientcore.NodeInfoFromRawNetmapElement(&t.nodeInfo, netmapCore.Node(p.node)) err = clientcore.NodeInfoFromRawNetmapElement(&t.nodeInfo, netmapCore.Node(p.node))
if err != nil { if err != nil {
return fmt.Errorf("parse client node info: %w", err) return fmt.Errorf("parse client node info: %w", err)
} }

View file

@ -2,6 +2,7 @@ package putsvc
import ( import (
"context" "context"
"crypto/ecdsa"
"errors" "errors"
"fmt" "fmt"
@ -20,6 +21,8 @@ type Streamer struct {
ctx context.Context ctx context.Context
sessionKey *ecdsa.PrivateKey
target transformer.ObjectTarget target transformer.ObjectTarget
relay func(client.NodeInfo, client.MultiAddressClient) error relay func(client.NodeInfo, client.MultiAddressClient) error
@ -115,6 +118,7 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
} }
} }
p.sessionKey = sessionKey
p.target = &validatingTarget{ p.target = &validatingTarget{
fmt: p.fmtValidator, fmt: p.fmtValidator,
unpreparedObject: true, unpreparedObject: true,
@ -227,7 +231,7 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
rt := &remoteTarget{ rt := &remoteTarget{
ctx: p.ctx, ctx: p.ctx,
keyStorage: p.keyStorage, privateKey: p.sessionKey,
commonPrm: prm.common, commonPrm: prm.common,
clientConstructor: p.clientConstructor, clientConstructor: p.clientConstructor,
} }