[#529] objectcore: Fix object content validation

There are old objects where the owner of the object
may not match the one who issued the token.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-07-28 15:44:35 +03:00
parent ab2614ec2d
commit ae81d6660a
10 changed files with 535 additions and 42 deletions

View file

@ -511,6 +511,8 @@ type cfgObject struct {
cfgLocalStorage cfgLocalStorage
tombstoneLifetime uint64
skipSessionTokenIssuerVerification bool
}
type cfgNotifications struct {
@ -677,8 +679,9 @@ func initCfgGRPC() cfgGRPC {
func initCfgObject(appCfg *config.Config) cfgObject {
return cfgObject{
pool: initObjectPool(appCfg),
tombstoneLifetime: objectconfig.TombstoneLifetime(appCfg),
pool: initObjectPool(appCfg),
tombstoneLifetime: objectconfig.TombstoneLifetime(appCfg),
skipSessionTokenIssuerVerification: objectconfig.Put(appCfg).SkipSessionTokenIssuerVerification(),
}
}

View file

@ -51,3 +51,8 @@ func (g PutConfig) PoolSizeLocal() int {
return PutPoolSizeDefault
}
// SkipSessionTokenIssuerVerification returns the value of "skip_session_token_issuer_verification" config parameter or `false“ if is not defined.
func (g PutConfig) SkipSessionTokenIssuerVerification() bool {
return config.BoolSafe(g.cfg, "skip_session_token_issuer_verification")
}

View file

@ -16,6 +16,7 @@ func TestObjectSection(t *testing.T) {
require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeRemote())
require.Equal(t, objectconfig.PutPoolSizeDefault, objectconfig.Put(empty).PoolSizeLocal())
require.EqualValues(t, objectconfig.DefaultTombstoneLifetime, objectconfig.TombstoneLifetime(empty))
require.False(t, objectconfig.Put(empty).SkipSessionTokenIssuerVerification())
})
const path = "../../../../config/example/node"
@ -24,6 +25,7 @@ func TestObjectSection(t *testing.T) {
require.Equal(t, 100, objectconfig.Put(c).PoolSizeRemote())
require.Equal(t, 200, objectconfig.Put(c).PoolSizeLocal())
require.EqualValues(t, 10, objectconfig.TombstoneLifetime(c))
require.True(t, objectconfig.Put(c).SkipSessionTokenIssuerVerification())
}
configtest.ForEachFileType(path, fileConfigTest)

View file

@ -160,8 +160,9 @@ func initObjectService(c *cfg) {
addPolicer(c, keyStorage, c.bgClientCache)
traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
irFetcher := newCachedIRFetcher(createInnerRingFetcher(c))
sPut := createPutSvc(c, keyStorage)
sPut := createPutSvc(c, keyStorage, &irFetcher)
sPutV2 := createPutSvcV2(sPut, keyStorage)
@ -184,7 +185,7 @@ func initObjectService(c *cfg) {
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2)
aclSvc := createACLServiceV2(c, splitSvc)
aclSvc := createACLServiceV2(c, splitSvc, &irFetcher)
var commonSvc objectService.Common
commonSvc.Init(&c.internals, aclSvc)
@ -295,7 +296,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
)
}
func createPutSvc(c *cfg, keyStorage *util.KeyStorage) *putsvc.Service {
func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetcher) *putsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage
var os putsvc.ObjectStorage = engineWithoutNotifications{
@ -320,8 +321,10 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage) *putsvc.Service {
c.netMapSource,
c,
c.cfgNetmap.state,
irFetcher,
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
putsvc.WithLogger(c.log),
putsvc.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
)
}
@ -405,14 +408,13 @@ func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Servi
)
}
func createACLServiceV2(c *cfg, splitSvc *objectService.TransportSplitter) v2.Service {
func createACLServiceV2(c *cfg, splitSvc *objectService.TransportSplitter, irFetcher *cachedIRFetcher) v2.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage
irFetcher := createInnerRingFetcher(c)
return v2.New(
splitSvc,
c.netMapSource,
newCachedIRFetcher(irFetcher),
irFetcher,
acl.NewChecker(
c.cfgNetmap.state,
c.cfgObject.eaclSource,

View file

@ -86,6 +86,7 @@ FROSTFS_REPLICATOR_POOL_SIZE=10
# Object service section
FROSTFS_OBJECT_PUT_POOL_SIZE_REMOTE=100
FROSTFS_OBJECT_PUT_POOL_SIZE_LOCAL=200
FROSTFS_OBJECT_PUT_SKIP_SESSION_TOKEN_ISSUER_VERIFICATION=true
FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
# Storage engine section

View file

@ -130,7 +130,8 @@
},
"put": {
"pool_size_remote": 100,
"pool_size_local": 200
"pool_size_local": 200,
"skip_session_token_issuer_verification": true
}
},
"storage": {

View file

@ -110,6 +110,7 @@ object:
put:
pool_size_remote: 100 # number of async workers for remote PUT operations
pool_size_local: 200 # number of async workers for local PUT operations
skip_session_token_issuer_verification: true # session token issuer verification will be skipped if true
storage:
# note: shard configuration can be omitted for relay node (see `node.relay`)

View file

@ -1,20 +1,26 @@
package object
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/sha256"
"errors"
"fmt"
"strconv"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)
// FormatValidator represents an object format validator.
@ -26,8 +32,16 @@ type FormatValidator struct {
type FormatValidatorOption func(*cfg)
type cfg struct {
netState netmap.State
e LockSource
netState netmap.State
e LockSource
ir InnerRing
netmap netmap.Source
containers container.Source
verifyTokenIssuer bool
}
type InnerRing interface {
InnerRingKeys() ([][]byte, error)
}
// DeleteHandler is an interface of delete queue processor.
@ -159,13 +173,117 @@ func (v *FormatValidator) validateSignatureKey(obj *objectSDK.Object) error {
return v.checkOwnerKey(ownerID, key)
}
if !token.Issuer().Equals(ownerID) {
return fmt.Errorf("(%T) different token issuer and object owner identifiers %s/%s", v, token.Issuer(), ownerID)
if v.verifyTokenIssuer {
signerIsIROrContainerNode, err := v.isIROrContainerNode(obj, binKey)
if err != nil {
return err
}
if signerIsIROrContainerNode {
return nil
}
if !token.Issuer().Equals(ownerID) {
return fmt.Errorf("(%T) different token issuer and object owner identifiers %s/%s", v, token.Issuer(), ownerID)
}
return nil
}
return nil
}
func (v *FormatValidator) isIROrContainerNode(obj *objectSDK.Object, signerKey []byte) (bool, error) {
pKey, err := keys.NewPublicKeyFromBytes(signerKey, elliptic.P256())
if err != nil {
return false, fmt.Errorf("(%T) failed to unmarshal signer public key: %w", v, err)
}
isIR, err := v.isInnerRingKey(pKey.Bytes())
if err != nil {
return false, fmt.Errorf("(%T) failed to check if signer is inner ring node: %w", v, err)
}
if isIR {
return true, nil
}
isContainerNode, err := v.isContainerNode(pKey.Bytes(), obj)
if err != nil {
return false, fmt.Errorf("(%T) failed to check if signer is container node: %w", v, err)
}
return isContainerNode, nil
}
func (v *FormatValidator) isInnerRingKey(key []byte) (bool, error) {
innerRingKeys, err := v.ir.InnerRingKeys()
if err != nil {
return false, err
}
for i := range innerRingKeys {
if bytes.Equal(innerRingKeys[i], key) {
return true, nil
}
}
return false, nil
}
func (v *FormatValidator) isContainerNode(key []byte, obj *objectSDK.Object) (bool, error) {
cnrID, containerIDSet := obj.ContainerID()
if !containerIDSet {
return false, errNilCID
}
cnrIDBin := make([]byte, sha256.Size)
cnrID.Encode(cnrIDBin)
cnr, err := v.containers.Get(cnrID)
if err != nil {
return false, fmt.Errorf("failed to get container (id=%s): %w", cnrID.EncodeToString(), err)
}
lastNetmap, err := netmap.GetLatestNetworkMap(v.netmap)
if err != nil {
return false, fmt.Errorf("failed to get latest netmap: %w", err)
}
isContainerNode, err := v.isContainerNodeKey(lastNetmap, cnr, cnrIDBin, key)
if err != nil {
return false, fmt.Errorf("failed to check latest netmap for container nodes: %w", err)
}
if isContainerNode {
return true, nil
}
previousNetmap, err := netmap.GetPreviousNetworkMap(v.netmap)
if err != nil {
return false, fmt.Errorf("failed to get previous netmap: %w", err)
}
isContainerNode, err = v.isContainerNodeKey(previousNetmap, cnr, cnrIDBin, key)
if err != nil {
return false, fmt.Errorf("failed to check previous netmap for container nodes: %w", err)
}
return isContainerNode, nil
}
func (v *FormatValidator) isContainerNodeKey(nm *netmapSDK.NetMap, cnr *container.Container, cnrIDBin, key []byte) (bool, error) {
cnrVectors, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), cnrIDBin)
if err != nil {
return false, err
}
for i := range cnrVectors {
for j := range cnrVectors[i] {
if bytes.Equal(cnrVectors[i][j].PublicKey(), key) {
return true, nil
}
}
}
return false, nil
}
func (v *FormatValidator) checkOwnerKey(id user.ID, key frostfsecdsa.PublicKey) error {
var id2 user.ID
user.IDFromKey(&id2, (ecdsa.PublicKey)(key))
@ -387,3 +505,31 @@ func WithLockSource(e LockSource) FormatValidatorOption {
c.e = e
}
}
// WithInnerRing return option to set Inner Ring source.
func WithInnerRing(ir InnerRing) FormatValidatorOption {
return func(c *cfg) {
c.ir = ir
}
}
// WithNetmapSource return option to set Netmap source.
func WithNetmapSource(ns netmap.Source) FormatValidatorOption {
return func(c *cfg) {
c.netmap = ns
}
}
// WithContainersSource return option to set Containers source.
func WithContainersSource(cs container.Source) FormatValidatorOption {
return func(c *cfg) {
c.containers = cs
}
}
// WithVerifySessionTokenIssuer return option to set verify session token issuer value.
func WithVerifySessionTokenIssuer(verifySessionTokenIssuer bool) FormatValidatorOption {
return func(c *cfg) {
c.verifyTokenIssuer = verifySessionTokenIssuer
}
}

View file

@ -3,12 +3,17 @@ package object
import (
"context"
"crypto/ecdsa"
"fmt"
"strconv"
"testing"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
@ -106,34 +111,6 @@ func TestFormatValidator_Validate(t *testing.T) {
require.NoError(t, v.Validate(context.Background(), obj, false))
})
t.Run("invalid w/ session token", func(t *testing.T) {
var idOwner user.ID
user.IDFromKey(&idOwner, ownerKey.PrivateKey.PublicKey)
var randomUserID user.ID
randPrivKey, err := keys.NewPrivateKey()
require.NoError(t, err)
user.IDFromKey(&randomUserID, randPrivKey.PrivateKey.PublicKey)
tok := sessiontest.Object()
fsPubKey := frostfsecdsa.PublicKey(*ownerKey.PublicKey())
tok.SetID(uuid.New())
tok.SetAuthKey(&fsPubKey)
tok.SetExp(100500)
tok.SetIat(1)
tok.SetNbf(1)
err = tok.Sign(ownerKey.PrivateKey)
require.NoError(t, err)
obj := objectSDK.New()
obj.SetContainerID(cidtest.ID())
obj.SetSessionToken(tok)
obj.SetOwnerID(&randomUserID)
require.NoError(t, objectSDK.SetIDWithSignature(ownerKey.PrivateKey, obj))
require.Error(t, v.Validate(context.Background(), obj, false)) //invalid owner
})
t.Run("correct w/o session token", func(t *testing.T) {
obj := blankValidObject(&ownerKey.PrivateKey)
@ -284,3 +261,334 @@ func TestFormatValidator_Validate(t *testing.T) {
})
})
}
func TestFormatValidator_ValidateTokenIssuer(t *testing.T) {
const curEpoch = 13
ls := testLockSource{
m: make(map[oid.Address]bool),
}
signer, err := keys.NewPrivateKey()
require.NoError(t, err)
var owner user.ID
ownerPrivKey, err := keys.NewPrivateKey()
require.NoError(t, err)
user.IDFromKey(&owner, ownerPrivKey.PrivateKey.PublicKey)
t.Run("different issuer and owner, verify issuer disabled", func(t *testing.T) {
t.Parallel()
v := NewFormatValidator(
WithNetState(testNetState{
epoch: curEpoch,
}),
WithLockSource(ls),
WithVerifySessionTokenIssuer(false),
)
tok := sessiontest.Object()
fsPubKey := frostfsecdsa.PublicKey(*signer.PublicKey())
tok.SetID(uuid.New())
tok.SetAuthKey(&fsPubKey)
tok.SetExp(100500)
tok.SetIat(1)
tok.SetNbf(1)
require.NoError(t, tok.Sign(signer.PrivateKey))
obj := objectSDK.New()
obj.SetContainerID(cidtest.ID())
obj.SetSessionToken(tok)
obj.SetOwnerID(&owner)
require.NoError(t, objectSDK.SetIDWithSignature(signer.PrivateKey, obj))
require.NoError(t, v.Validate(context.Background(), obj, false))
})
t.Run("different issuer and owner, issuer is IR node, verify issuer enabled", func(t *testing.T) {
t.Parallel()
v := NewFormatValidator(
WithNetState(testNetState{
epoch: curEpoch,
}),
WithLockSource(ls),
WithVerifySessionTokenIssuer(true),
WithInnerRing(&testIRSource{
irNodes: [][]byte{signer.PublicKey().Bytes()},
}),
)
tok := sessiontest.Object()
fsPubKey := frostfsecdsa.PublicKey(*signer.PublicKey())
tok.SetID(uuid.New())
tok.SetAuthKey(&fsPubKey)
tok.SetExp(100500)
tok.SetIat(1)
tok.SetNbf(1)
require.NoError(t, tok.Sign(signer.PrivateKey))
obj := objectSDK.New()
obj.SetContainerID(cidtest.ID())
obj.SetSessionToken(tok)
obj.SetOwnerID(&owner)
require.NoError(t, objectSDK.SetIDWithSignature(signer.PrivateKey, obj))
require.NoError(t, v.Validate(context.Background(), obj, false))
})
t.Run("different issuer and owner, issuer is container node in current epoch, verify issuer enabled", func(t *testing.T) {
t.Parallel()
tok := sessiontest.Object()
fsPubKey := frostfsecdsa.PublicKey(*signer.PublicKey())
tok.SetID(uuid.New())
tok.SetAuthKey(&fsPubKey)
tok.SetExp(100500)
tok.SetIat(1)
tok.SetNbf(1)
require.NoError(t, tok.Sign(signer.PrivateKey))
cnrID := cidtest.ID()
cont := containerSDK.Container{}
cont.Init()
pp := netmap.PlacementPolicy{}
require.NoError(t, pp.DecodeString("REP 1"))
cont.SetPlacementPolicy(pp)
var node netmap.NodeInfo
node.SetPublicKey(signer.PublicKey().Bytes())
currentEpochNM := &netmap.NetMap{}
currentEpochNM.SetEpoch(curEpoch)
currentEpochNM.SetNodes([]netmap.NodeInfo{node})
obj := objectSDK.New()
obj.SetContainerID(cnrID)
obj.SetSessionToken(tok)
obj.SetOwnerID(&owner)
require.NoError(t, objectSDK.SetIDWithSignature(signer.PrivateKey, obj))
v := NewFormatValidator(
WithNetState(testNetState{
epoch: curEpoch,
}),
WithLockSource(ls),
WithVerifySessionTokenIssuer(true),
WithInnerRing(&testIRSource{
irNodes: [][]byte{},
}),
WithContainersSource(
&testContainerSource{
containers: map[cid.ID]*container.Container{
cnrID: {
Value: cont,
},
},
},
),
WithNetmapSource(
&testNetmapSource{
netmaps: map[uint64]*netmap.NetMap{
curEpoch: currentEpochNM,
},
currentEpoch: curEpoch,
},
),
)
require.NoError(t, v.Validate(context.Background(), obj, false))
})
t.Run("different issuer and owner, issuer is container node in previous epoch, verify issuer enabled", func(t *testing.T) {
t.Parallel()
tok := sessiontest.Object()
fsPubKey := frostfsecdsa.PublicKey(*signer.PublicKey())
tok.SetID(uuid.New())
tok.SetAuthKey(&fsPubKey)
tok.SetExp(100500)
tok.SetIat(1)
tok.SetNbf(1)
require.NoError(t, tok.Sign(signer.PrivateKey))
cnrID := cidtest.ID()
cont := containerSDK.Container{}
cont.Init()
pp := netmap.PlacementPolicy{}
require.NoError(t, pp.DecodeString("REP 1"))
cont.SetPlacementPolicy(pp)
var issuerNode netmap.NodeInfo
issuerNode.SetPublicKey(signer.PublicKey().Bytes())
var nonIssuerNode netmap.NodeInfo
nonIssuerKey, err := keys.NewPrivateKey()
require.NoError(t, err)
nonIssuerNode.SetPublicKey(nonIssuerKey.PublicKey().Bytes())
currentEpochNM := &netmap.NetMap{}
currentEpochNM.SetEpoch(curEpoch)
currentEpochNM.SetNodes([]netmap.NodeInfo{nonIssuerNode})
previousEpochNM := &netmap.NetMap{}
previousEpochNM.SetEpoch(curEpoch - 1)
previousEpochNM.SetNodes([]netmap.NodeInfo{issuerNode})
obj := objectSDK.New()
obj.SetContainerID(cnrID)
obj.SetSessionToken(tok)
obj.SetOwnerID(&owner)
require.NoError(t, objectSDK.SetIDWithSignature(signer.PrivateKey, obj))
v := NewFormatValidator(
WithNetState(testNetState{
epoch: curEpoch,
}),
WithLockSource(ls),
WithVerifySessionTokenIssuer(true),
WithInnerRing(&testIRSource{
irNodes: [][]byte{},
}),
WithContainersSource(
&testContainerSource{
containers: map[cid.ID]*container.Container{
cnrID: {
Value: cont,
},
},
},
),
WithNetmapSource(
&testNetmapSource{
netmaps: map[uint64]*netmap.NetMap{
curEpoch: currentEpochNM,
curEpoch - 1: previousEpochNM,
},
currentEpoch: curEpoch,
},
),
)
require.NoError(t, v.Validate(context.Background(), obj, false))
})
t.Run("different issuer and owner, issuer is unknown, verify issuer enabled", func(t *testing.T) {
t.Parallel()
tok := sessiontest.Object()
fsPubKey := frostfsecdsa.PublicKey(*signer.PublicKey())
tok.SetID(uuid.New())
tok.SetAuthKey(&fsPubKey)
tok.SetExp(100500)
tok.SetIat(1)
tok.SetNbf(1)
require.NoError(t, tok.Sign(signer.PrivateKey))
cnrID := cidtest.ID()
cont := containerSDK.Container{}
cont.Init()
pp := netmap.PlacementPolicy{}
require.NoError(t, pp.DecodeString("REP 1"))
cont.SetPlacementPolicy(pp)
var nonIssuerNode1 netmap.NodeInfo
nonIssuerKey1, err := keys.NewPrivateKey()
require.NoError(t, err)
nonIssuerNode1.SetPublicKey(nonIssuerKey1.PublicKey().Bytes())
var nonIssuerNode2 netmap.NodeInfo
nonIssuerKey2, err := keys.NewPrivateKey()
require.NoError(t, err)
nonIssuerNode2.SetPublicKey(nonIssuerKey2.PublicKey().Bytes())
currentEpochNM := &netmap.NetMap{}
currentEpochNM.SetEpoch(curEpoch)
currentEpochNM.SetNodes([]netmap.NodeInfo{nonIssuerNode1})
previousEpochNM := &netmap.NetMap{}
previousEpochNM.SetEpoch(curEpoch - 1)
previousEpochNM.SetNodes([]netmap.NodeInfo{nonIssuerNode2})
obj := objectSDK.New()
obj.SetContainerID(cnrID)
obj.SetSessionToken(tok)
obj.SetOwnerID(&owner)
require.NoError(t, objectSDK.SetIDWithSignature(signer.PrivateKey, obj))
v := NewFormatValidator(
WithNetState(testNetState{
epoch: curEpoch,
}),
WithLockSource(ls),
WithVerifySessionTokenIssuer(true),
WithInnerRing(&testIRSource{
irNodes: [][]byte{},
}),
WithContainersSource(
&testContainerSource{
containers: map[cid.ID]*container.Container{
cnrID: {
Value: cont,
},
},
},
),
WithNetmapSource(
&testNetmapSource{
netmaps: map[uint64]*netmap.NetMap{
curEpoch: currentEpochNM,
curEpoch - 1: previousEpochNM,
},
currentEpoch: curEpoch,
},
),
)
require.Error(t, v.Validate(context.Background(), obj, false))
})
}
type testIRSource struct {
irNodes [][]byte
}
func (s *testIRSource) InnerRingKeys() ([][]byte, error) {
return s.irNodes, nil
}
type testContainerSource struct {
containers map[cid.ID]*container.Container
}
func (s *testContainerSource) Get(cnrID cid.ID) (*container.Container, error) {
if cnr, found := s.containers[cnrID]; found {
return cnr, nil
}
return nil, fmt.Errorf("container not found")
}
func (s *testContainerSource) DeletionInfo(cid.ID) (*container.DelInfo, error) {
return nil, nil
}
type testNetmapSource struct {
netmaps map[uint64]*netmap.NetMap
currentEpoch uint64
}
func (s *testNetmapSource) GetNetMap(diff uint64) (*netmap.NetMap, error) {
if diff >= s.currentEpoch {
return nil, fmt.Errorf("invalid diff")
}
return s.GetNetMapByEpoch(s.currentEpoch - diff)
}
func (s *testNetmapSource) GetNetMapByEpoch(epoch uint64) (*netmap.NetMap, error) {
if nm, found := s.netmaps[epoch]; found {
return nm, nil
}
return nil, fmt.Errorf("netmap not found")
}
func (s *testNetmapSource) Epoch() (uint64, error) {
return s.currentEpoch, nil
}

View file

@ -29,6 +29,14 @@ type ClientConstructor interface {
Get(client.NodeInfo) (client.MultiAddressClient, error)
}
type InnerRing interface {
InnerRingKeys() ([][]byte, error)
}
type FormatValidatorConfig interface {
VerifySessionTokenIssuer() bool
}
type cfg struct {
keyStorage *objutil.KeyStorage
@ -51,6 +59,8 @@ type cfg struct {
clientConstructor ClientConstructor
log *logger.Logger
verifySessionTokenIssuer bool
}
func NewService(ks *objutil.KeyStorage,
@ -61,6 +71,7 @@ func NewService(ks *objutil.KeyStorage,
ns netmap.Source,
nk netmap.AnnouncedKeys,
nst netmap.State,
ir InnerRing,
opts ...Option) *Service {
c := &cfg{
remotePool: util.NewPseudoWorkerPool(),
@ -80,7 +91,14 @@ func NewService(ks *objutil.KeyStorage,
opts[i](c)
}
c.fmtValidator = object.NewFormatValidator(object.WithLockSource(os), object.WithNetState(nst))
c.fmtValidator = object.NewFormatValidator(
object.WithLockSource(os),
object.WithNetState(nst),
object.WithInnerRing(ir),
object.WithNetmapSource(ns),
object.WithContainersSource(cs),
object.WithVerifySessionTokenIssuer(c.verifySessionTokenIssuer),
)
return &Service{
cfg: c,
@ -104,3 +122,9 @@ func WithLogger(l *logger.Logger) Option {
c.log = l
}
}
func WithVerifySessionTokenIssuer(v bool) Option {
return func(c *cfg) {
c.verifySessionTokenIssuer = v
}
}