forked from TrueCloudLab/frostfs-node
[#1255] node/session: Add persistent session storage
Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
929c9851a6
commit
455b9fb325
7 changed files with 331 additions and 20 deletions
72
pkg/services/session/storage/persistent/executor.go
Normal file
72
pkg/services/session/storage/persistent/executor.go
Normal file
|
@ -0,0 +1,72 @@
|
|||
package persistent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/session"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/session/storage"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/owner"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
// Create inits a new private session token using information
|
||||
// from corresponding request, saves it to bolt database (and
|
||||
// encrypts private keys if storage has been configured so).
|
||||
// Returns response that is filled with just created token's
|
||||
// ID and public key for it.
|
||||
func (s *TokenStore) Create(ctx context.Context, body *session.CreateRequestBody) (*session.CreateResponseBody, error) {
|
||||
ownerBytes, err := owner.NewIDFromV2(body.GetOwnerID()).Marshal()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
uidBytes, err := storage.NewTokenID()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not generate token ID: %w", err)
|
||||
}
|
||||
|
||||
sk, err := keys.NewPrivateKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
value, err := packToken(body.GetExpiration(), &sk.PrivateKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.db.Update(func(tx *bbolt.Tx) error {
|
||||
rootBucket := tx.Bucket(sessionsBucket)
|
||||
|
||||
ownerBucket, err := rootBucket.CreateBucketIfNotExists(ownerBytes)
|
||||
if err != nil {
|
||||
return fmt.Errorf(
|
||||
"could not get/create %s owner bucket: %w",
|
||||
hex.EncodeToString(ownerBytes),
|
||||
err,
|
||||
)
|
||||
}
|
||||
|
||||
err = ownerBucket.Put(uidBytes, value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not put session token for %s oid: %w",
|
||||
hex.EncodeToString(ownerBytes),
|
||||
err,
|
||||
)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not save token to persistent storage: %w", err)
|
||||
}
|
||||
|
||||
res := new(session.CreateResponseBody)
|
||||
res.SetID(uidBytes)
|
||||
res.SetSessionKey(sk.PublicKey().Bytes())
|
||||
|
||||
return res, nil
|
||||
}
|
38
pkg/services/session/storage/persistent/options.go
Normal file
38
pkg/services/session/storage/persistent/options.go
Normal file
|
@ -0,0 +1,38 @@
|
|||
package persistent
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type cfg struct {
|
||||
l *zap.Logger
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
// Option allows setting optional parameters of the TokenStore.
|
||||
type Option func(*cfg)
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
l: zap.L(),
|
||||
timeout: 100 * time.Millisecond,
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger returns an option to specify
|
||||
// logger.
|
||||
func WithLogger(v *zap.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.l = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithTimeout returns option to specify
|
||||
// database connection timeout.
|
||||
func WithTimeout(v time.Duration) Option {
|
||||
return func(c *cfg) {
|
||||
c.timeout = v
|
||||
}
|
||||
}
|
130
pkg/services/session/storage/persistent/storage.go
Normal file
130
pkg/services/session/storage/persistent/storage.go
Normal file
|
@ -0,0 +1,130 @@
|
|||
package persistent
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/session/storage"
|
||||
ownerSDK "github.com/nspcc-dev/neofs-sdk-go/owner"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// TokenStore is a wrapper around persistent K:V db that
|
||||
// allows creating (storing), retrieving and expiring
|
||||
// (removing) session tokens.
|
||||
type TokenStore struct {
|
||||
db *bbolt.DB
|
||||
|
||||
l *zap.Logger
|
||||
}
|
||||
|
||||
var sessionsBucket = []byte("sessions")
|
||||
|
||||
// NewTokenStore creates, initializes and returns a new TokenStore instance.
|
||||
//
|
||||
// The elements of the instance are stored in bolt DB.
|
||||
func NewTokenStore(path string, opts ...Option) (*TokenStore, error) {
|
||||
cfg := defaultCfg()
|
||||
|
||||
for _, o := range opts {
|
||||
o(cfg)
|
||||
}
|
||||
|
||||
db, err := bbolt.Open(path, 0600,
|
||||
&bbolt.Options{
|
||||
Timeout: cfg.timeout,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't open bbolt at %s: %w", path, err)
|
||||
}
|
||||
|
||||
err = db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(sessionsBucket)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
_ = db.Close()
|
||||
|
||||
return nil, fmt.Errorf("could not init session bucket: %w", err)
|
||||
}
|
||||
|
||||
return &TokenStore{db: db, l: cfg.l}, nil
|
||||
}
|
||||
|
||||
// Get returns private token corresponding to the given identifiers.
|
||||
//
|
||||
// Returns nil is there is no element in storage.
|
||||
func (s *TokenStore) Get(ownerID *ownerSDK.ID, tokenID []byte) (t *storage.PrivateToken) {
|
||||
ownerBytes, err := ownerID.Marshal()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = s.db.View(func(tx *bbolt.Tx) error {
|
||||
rootBucket := tx.Bucket(sessionsBucket)
|
||||
|
||||
ownerBucket := rootBucket.Bucket(ownerBytes)
|
||||
if ownerBucket == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
rawToken := ownerBucket.Get(tokenID)
|
||||
if rawToken == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
t, err = unpackToken(rawToken)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
s.l.Error("could not get session from persistent storage",
|
||||
zap.Error(err),
|
||||
zap.Stringer("ownerID", ownerID),
|
||||
zap.String("tokenID", hex.EncodeToString(tokenID)),
|
||||
)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// RemoveOld removes all tokens expired since provided epoch.
|
||||
func (s *TokenStore) RemoveOld(epoch uint64) {
|
||||
err := s.db.Update(func(tx *bbolt.Tx) error {
|
||||
rootBucket := tx.Bucket(sessionsBucket)
|
||||
|
||||
// iterating over ownerIDs
|
||||
return iterateNestedBuckets(rootBucket, func(b *bbolt.Bucket) error {
|
||||
c := b.Cursor()
|
||||
var err error
|
||||
|
||||
// iterating over fixed ownerID's tokens
|
||||
for k, v := c.First(); k != nil; k, v = c.Next() {
|
||||
if epochFromToken(v) <= epoch {
|
||||
err = c.Delete()
|
||||
if err != nil {
|
||||
s.l.Error("could not delete %s token",
|
||||
zap.String("token_id", hex.EncodeToString(k)),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
s.l.Error("could not clean up expired tokens",
|
||||
zap.Uint64("epoch", epoch),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes database connection.
|
||||
func (s *TokenStore) Close() error {
|
||||
return s.db.Close()
|
||||
}
|
59
pkg/services/session/storage/persistent/util.go
Normal file
59
pkg/services/session/storage/persistent/util.go
Normal file
|
@ -0,0 +1,59 @@
|
|||
package persistent
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"crypto/x509"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/session/storage"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
const expOffset = 8
|
||||
|
||||
func packToken(exp uint64, key *ecdsa.PrivateKey) ([]byte, error) {
|
||||
rawKey, err := x509.MarshalECPrivateKey(key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not marshal private key: %w", err)
|
||||
}
|
||||
|
||||
res := make([]byte, expOffset, expOffset+len(rawKey))
|
||||
binary.LittleEndian.PutUint64(res, exp)
|
||||
|
||||
res = append(res, rawKey...)
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func unpackToken(raw []byte) (*storage.PrivateToken, error) {
|
||||
epoch := binary.LittleEndian.Uint64(raw[:expOffset])
|
||||
|
||||
key, err := x509.ParseECPrivateKey(raw[expOffset:])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not unmarshal private key: %w", err)
|
||||
}
|
||||
|
||||
return storage.NewPrivateToken(key, epoch), nil
|
||||
}
|
||||
|
||||
func epochFromToken(rawToken []byte) uint64 {
|
||||
return binary.LittleEndian.Uint64(rawToken)
|
||||
}
|
||||
|
||||
func iterateNestedBuckets(b *bbolt.Bucket, fn func(b *bbolt.Bucket) error) error {
|
||||
c := b.Cursor()
|
||||
|
||||
for k, v := c.First(); k != nil; k, v = c.Next() {
|
||||
// nil value is a hallmark
|
||||
// of the nested buckets
|
||||
if v == nil {
|
||||
err := fn(b.Bucket(k))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/mr-tron/base58"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/session"
|
||||
|
@ -18,30 +17,21 @@ func (s *TokenStore) Create(ctx context.Context, body *session.CreateRequestBody
|
|||
panic(err)
|
||||
}
|
||||
|
||||
uid, err := uuid.NewRandom()
|
||||
uidBytes, err := storage.NewTokenID()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not generate token ID: %w", err)
|
||||
}
|
||||
|
||||
uidBytes, err := uid.MarshalBinary()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not marshal token ID: %w", err)
|
||||
}
|
||||
|
||||
sk, err := keys.NewPrivateKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
privateToken := new(storage.PrivateToken)
|
||||
privateToken.SetSessionKey(&sk.PrivateKey)
|
||||
privateToken.SetExpiredAt(body.GetExpiration())
|
||||
|
||||
s.mtx.Lock()
|
||||
s.tokens[key{
|
||||
tokenID: base58.Encode(uidBytes),
|
||||
ownerID: base58.Encode(ownerBytes),
|
||||
}] = privateToken
|
||||
}] = storage.NewPrivateToken(&sk.PrivateKey, body.GetExpiration())
|
||||
s.mtx.Unlock()
|
||||
|
||||
res := new(session.CreateResponseBody)
|
||||
|
|
|
@ -11,14 +11,13 @@ type PrivateToken struct {
|
|||
exp uint64
|
||||
}
|
||||
|
||||
// SetSessionKey sets a private session key.
|
||||
func (t *PrivateToken) SetSessionKey(sessionKey *ecdsa.PrivateKey) {
|
||||
t.sessionKey = sessionKey
|
||||
// NewPrivateToken returns new private token based on the
|
||||
// passed values.
|
||||
func NewPrivateToken(sk *ecdsa.PrivateKey, exp uint64) *PrivateToken {
|
||||
return &PrivateToken{
|
||||
sessionKey: sk,
|
||||
exp: exp,
|
||||
}
|
||||
|
||||
// SetExpiredAt sets epoch number until token is valid.
|
||||
func (t *PrivateToken) SetExpiredAt(exp uint64) {
|
||||
t.exp = exp
|
||||
}
|
||||
|
||||
// SessionKey returns the private session key.
|
||||
|
|
23
pkg/services/session/storage/util.go
Normal file
23
pkg/services/session/storage/util.go
Normal file
|
@ -0,0 +1,23 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// NewTokenID generates new ID for a token
|
||||
// based on UUID.
|
||||
func NewTokenID() ([]byte, error) {
|
||||
uid, err := uuid.NewRandom()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not generate UUID: %w", err)
|
||||
}
|
||||
|
||||
uidBytes, err := uid.MarshalBinary()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not marshal marshal UUID: %w", err)
|
||||
}
|
||||
|
||||
return uidBytes, nil
|
||||
}
|
Loading…
Reference in a new issue