ape: Implement boltdb storage for local overrides #820
15 changed files with 560 additions and 142 deletions
|
@ -29,6 +29,7 @@ import (
|
|||
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
||||
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
|
@ -68,6 +69,7 @@ import (
|
|||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
neogoutil "github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
|
@ -518,10 +520,7 @@ type cfgObject struct {
|
|||
|
||||
eaclSource container.EACLSource
|
||||
|
||||
// Access policy chain source is used by object service to
|
||||
// check for operation permissions but this source is also shared with
|
||||
// control service that dispatches local overrides.
|
||||
apeChainSource container.AccessPolicyEngineChainSource
|
||||
cfgAccessPolicyEngine cfgAccessPolicyEngine
|
||||
|
||||
pool cfgObjectRoutines
|
||||
|
||||
|
@ -542,6 +541,10 @@ type cfgLocalStorage struct {
|
|||
localStorage *engine.StorageEngine
|
||||
}
|
||||
|
||||
type cfgAccessPolicyEngine struct {
|
||||
accessPolicyEngine *accessPolicyEngine
|
||||
}
|
||||
|
||||
type cfgObjectRoutines struct {
|
||||
putRemote *ants.Pool
|
||||
|
||||
|
@ -970,6 +973,34 @@ func initLocalStorage(ctx context.Context, c *cfg) {
|
|||
})
|
||||
}
|
||||
|
||||
func initAccessPolicyEngine(_ context.Context, c *cfg) {
|
||||
var localOverrideDB chainbase.LocalOverrideDatabase
|
||||
if nodeconfig.PersistentPolicyRules(c.appCfg).Path() == "" {
|
||||
c.log.Warn(logs.FrostFSNodePersistentRuleStorageDBPathIsNotSetInmemoryWillBeUsed)
|
||||
localOverrideDB = chainbase.NewInmemoryLocalOverrideDatabase()
|
||||
} else {
|
||||
localOverrideDB = chainbase.NewBoltLocalOverrideDatabase(
|
||||
chainbase.WithLogger(c.log),
|
||||
chainbase.WithPath(nodeconfig.PersistentPolicyRules(c.appCfg).Path()),
|
||||
chainbase.WithPerm(nodeconfig.PersistentPolicyRules(c.appCfg).Perm()),
|
||||
chainbase.WithNoSync(nodeconfig.PersistentPolicyRules(c.appCfg).NoSync()),
|
||||
)
|
||||
}
|
||||
|
||||
morphRuleStorage := inmemory.NewInmemoryMorphRuleChainStorage()
|
||||
|
||||
ape := newAccessPolicyEngine(morphRuleStorage, localOverrideDB)
|
||||
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine = ape
|
||||
|
||||
c.onShutdown(func() {
|
||||
if err := ape.LocalOverrideDatabaseCore().Close(); err != nil {
|
||||
c.log.Warn(logs.FrostFSNodeAccessPolicyEngineClosingFailure,
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
|
||||
var err error
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package nodeconfig
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
@ -30,11 +31,18 @@ type NotificationConfig struct {
|
|||
cfg *config.Config
|
||||
}
|
||||
|
||||
// PersistentPolicyRulesConfig is a wrapper over "persistent_policy_rules" config section
|
||||
// which provides access to persistent policy rules storage configuration of node.
|
||||
type PersistentPolicyRulesConfig struct {
|
||||
cfg *config.Config
|
||||
}
|
||||
|
||||
const (
|
||||
subsection = "node"
|
||||
persistentSessionsSubsection = "persistent_sessions"
|
||||
persistentStateSubsection = "persistent_state"
|
||||
notificationSubsection = "notification"
|
||||
persistentPolicyRulesSubsection = "persistent_policy_rules"
|
||||
|
||||
attributePrefix = "attribute"
|
||||
|
||||
|
@ -245,3 +253,42 @@ func (n NotificationConfig) KeyPath() string {
|
|||
func (n NotificationConfig) CAPath() string {
|
||||
return config.StringSafe(n.cfg, "ca")
|
||||
}
|
||||
|
||||
const (
|
||||
// PermDefault is a default permission bits for local override storage file.
|
||||
PermDefault = 0o644
|
||||
)
|
||||
|
||||
// PersistentPolicyRules returns structure that provides access to "persistent_policy_rules"
|
||||
// subsection of "node" section.
|
||||
func PersistentPolicyRules(c *config.Config) PersistentPolicyRulesConfig {
|
||||
return PersistentPolicyRulesConfig{
|
||||
c.Sub(subsection).Sub(persistentPolicyRulesSubsection),
|
||||
}
|
||||
}
|
||||
|
||||
// Path returns the value of "path" config parameter.
|
||||
//
|
||||
// Returns empty string if missing, for compatibility with older configurations.
|
||||
func (l PersistentPolicyRulesConfig) Path() string {
|
||||
return config.StringSafe(l.cfg, "path")
|
||||
}
|
||||
|
||||
// Perm returns the value of "perm" config parameter as a fs.FileMode.
|
||||
//
|
||||
// Returns PermDefault if the value is not a positive number.
|
||||
func (l PersistentPolicyRulesConfig) Perm() fs.FileMode {
|
||||
p := config.UintSafe((*config.Config)(l.cfg), "perm")
|
||||
if p == 0 {
|
||||
p = PermDefault
|
||||
}
|
||||
|
||||
return fs.FileMode(p)
|
||||
}
|
||||
|
||||
// NoSync returns the value of "no_sync" config parameter as a bool value.
|
||||
//
|
||||
// Returns false if the value is not a boolean.
|
||||
func (l PersistentPolicyRulesConfig) NoSync() bool {
|
||||
return config.BoolSafe((*config.Config)(l.cfg), "no_sync")
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ func initControlService(c *cfg) {
|
|||
controlSvc.WithTreeService(treeSynchronizer{
|
||||
c.treeService,
|
||||
}),
|
||||
controlSvc.WithAPEChainSource(c.cfgObject.apeChainSource),
|
||||
controlSvc.WithLocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine),
|
||||
)
|
||||
|
||||
lis, err := net.Listen("tcp", endpoint)
|
||||
|
|
|
@ -98,6 +98,12 @@ func initApp(ctx context.Context, c *cfg) {
|
|||
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Init(ctx))
|
||||
})
|
||||
|
||||
initAccessPolicyEngine(ctx, c)
|
||||
initAndLog(c, "access policy engine", func(c *cfg) {
|
||||
fatalOnErr(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalOverrideDatabaseCore().Open(ctx))
|
||||
fatalOnErr(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalOverrideDatabaseCore().Init())
|
||||
})
|
||||
|
||||
initAndLog(c, "gRPC", initGRPC)
|
||||
initAndLog(c, "netmap", func(c *cfg) { initNetmapService(ctx, c) })
|
||||
initAndLog(c, "accounting", func(c *cfg) { initAccountingService(ctx, c) })
|
||||
|
|
|
@ -157,8 +157,6 @@ func initObjectService(c *cfg) {
|
|||
|
||||
c.replicator = createReplicator(c, keyStorage, c.bgClientCache)
|
||||
|
||||
c.cfgObject.apeChainSource = NewAPESource()
|
||||
|
||||
addPolicer(c, keyStorage, c.bgClientCache)
|
||||
|
||||
traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
|
||||
|
@ -426,7 +424,7 @@ func createACLServiceV2(c *cfg, splitSvc *objectService.TransportSplitter, irFet
|
|||
c.cfgObject.eaclSource,
|
||||
eaclSDK.NewValidator(),
|
||||
ls),
|
||||
acl.NewAPEChecker(c.log, c.cfgObject.apeChainSource),
|
||||
acl.NewAPEChecker(c.log, c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.chainRouter),
|
||||
c.cfgObject.cnrSource,
|
||||
v2.WithLogger(c.log),
|
||||
)
|
||||
|
|
|
@ -3,33 +3,63 @@ package main
|
|||
import (
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/resource"
|
||||
)
|
||||
|
||||
type apeChainSourceImpl struct {
|
||||
mtx sync.Mutex
|
||||
localChainStorage map[cid.ID]engine.LocalOverrideEngine
|
||||
type accessPolicyEngine struct {
|
||||
mtx sync.RWMutex
|
||||
|
||||
chainRouter engine.ChainRouter
|
||||
|
||||
morphChainStorage engine.MorphRuleChainStorage
|
||||
|
||||
localOverrideDatabase chainbase.LocalOverrideDatabase
|
||||
}
|
||||
|
||||
func NewAPESource() container.AccessPolicyEngineChainSource {
|
||||
return &apeChainSourceImpl{
|
||||
localChainStorage: make(map[cid.ID]engine.LocalOverrideEngine),
|
||||
var _ engine.LocalOverrideEngine = (*accessPolicyEngine)(nil)
|
||||
|
||||
func newAccessPolicyEngine(
|
||||
morphChainStorage engine.MorphRuleChainStorage,
|
||||
localOverrideDatabase chainbase.LocalOverrideDatabase) *accessPolicyEngine {
|
||||
return &accessPolicyEngine{
|
||||
chainRouter: engine.NewDefaultChainRouterWithLocalOverrides(
|
||||
morphChainStorage,
|
||||
localOverrideDatabase,
|
||||
),
|
||||
|
||||
morphChainStorage: morphChainStorage,
|
||||
|
||||
localOverrideDatabase: localOverrideDatabase,
|
||||
}
|
||||
}
|
||||
|
||||
var _ container.AccessPolicyEngineChainSource = (*apeChainSourceImpl)(nil)
|
||||
func (a *accessPolicyEngine) IsAllowed(name chain.Name, target engine.RequestTarget, r resource.Request) (status chain.Status, found bool, err error) {
|
||||
a.mtx.RLock()
|
||||
defer a.mtx.RUnlock()
|
||||
|
||||
func (c *apeChainSourceImpl) GetChainSource(cid cid.ID) (engine.LocalOverrideEngine, error) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
return a.chainRouter.IsAllowed(name, target, r)
|
||||
}
|
||||
|
||||
s, ok := c.localChainStorage[cid]
|
||||
if ok {
|
||||
return s, nil
|
||||
func (a *accessPolicyEngine) MorphRuleChainStorage() engine.MorphRuleChainStorage {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
|
||||
return a.morphChainStorage
|
||||
}
|
||||
c.localChainStorage[cid] = inmemory.NewInMemoryLocalOverrides()
|
||||
return c.localChainStorage[cid], nil
|
||||
|
||||
func (a *accessPolicyEngine) LocalStorage() engine.LocalOverrideStorage {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
|
||||
return a.localOverrideDatabase
|
||||
}
|
||||
|
||||
func (a *accessPolicyEngine) LocalOverrideDatabaseCore() chainbase.DatabaseCore {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
|
||||
return a.localOverrideDatabase
|
||||
}
|
||||
|
|
|
@ -429,7 +429,9 @@ const (
|
|||
FrostFSNodeFailedToAttachShardToEngine = "failed to attach shard to engine"
|
||||
FrostFSNodeShardAttachedToEngine = "shard attached to engine"
|
||||
FrostFSNodeClosingComponentsOfTheStorageEngine = "closing components of the storage engine..."
|
||||
FrostFSNodeAccessPolicyEngineClosingFailure = "ape closing failure"
|
||||
FrostFSNodeStorageEngineClosingFailure = "storage engine closing failure"
|
||||
FrostFSNodePersistentRuleStorageDBPathIsNotSetInmemoryWillBeUsed = "persistent rule storage db path is not set: in-memory will be used"
|
||||
FrostFSNodeAllComponentsOfTheStorageEngineClosedSuccessfully = "all components of the storage engine closed successfully"
|
||||
FrostFSNodeBootstrappingWithTheMaintenanceState = "bootstrapping with the maintenance state"
|
||||
FrostFSNodeBootstrappingWithOnlineState = "bootstrapping with online state"
|
||||
|
|
250
pkg/ape/chainbase/boltdb.go
Normal file
250
pkg/ape/chainbase/boltdb.go
Normal file
|
@ -0,0 +1,250 @@
|
|||
package chainbase
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||
policyengine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
type boltLocalOverrideStorage struct {
|
||||
*cfg
|
||||
|
||||
db *bbolt.DB
|
||||
}
|
||||
|
||||
var (
|
||||
chainBucket = []byte{0}
|
||||
)
|
||||
|
||||
var (
|
||||
ErrChainBucketNotFound = logicerr.New("chain root bucket has not been found")
|
||||
|
||||
ErrChainNotFound = logicerr.New("chain has not been found")
|
||||
|
||||
ErrGlobalNamespaceBucketNotFound = logicerr.New("global namespace bucket has not been found")
|
||||
|
||||
ErrTargetTypeBucketNotFound = logicerr.New("target type bucket has not been found")
|
||||
|
||||
ErrTargetNameBucketNotFound = logicerr.New("target name bucket has not been found")
|
||||
)
|
||||
|
||||
// NewBoltLocalOverrideDatabase returns storage wrapper for storing access policy engine
|
||||
// local overrides.
|
||||
//
|
||||
// chain storage (chainBucket):
|
||||
// -> global namespace bucket (nBucket):
|
||||
// --> target bucket (tBucket)
|
||||
// ---> target name (resource) bucket (rBucket):
|
||||
//
|
||||
// | Key | Value |
|
||||
// x---------------------x-------------------x
|
||||
// | chain id (string) | serialized chain |
|
||||
// x---------------------x-------------------x
|
||||
//
|
||||
//nolint:godot
|
||||
func NewBoltLocalOverrideDatabase(opts ...Option) LocalOverrideDatabase {
|
||||
c := defaultCfg()
|
||||
|
||||
for i := range opts {
|
||||
opts[i](c)
|
||||
}
|
||||
|
||||
return &boltLocalOverrideStorage{
|
||||
cfg: c,
|
||||
}
|
||||
}
|
||||
|
||||
func (cs *boltLocalOverrideStorage) Init() error {
|
||||
return cs.db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(chainBucket)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (cs *boltLocalOverrideStorage) Open(context.Context) error {
|
||||
err := util.MkdirAllX(filepath.Dir(cs.path), cs.perm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create dir %s for the chain DB: %w", cs.path, err)
|
||||
}
|
||||
|
||||
opts := *bbolt.DefaultOptions
|
||||
opts.NoSync = cs.noSync
|
||||
opts.Timeout = 100 * time.Millisecond
|
||||
|
||||
cs.db, err = bbolt.Open(cs.path, cs.perm, &opts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't open the chain DB: %w", err)
|
||||
}
|
||||
|
||||
cs.db.MaxBatchSize = cs.maxBatchSize
|
||||
cs.db.MaxBatchDelay = cs.maxBatchDelay
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *boltLocalOverrideStorage) Close() error {
|
||||
var err error
|
||||
if cs.db != nil {
|
||||
err = cs.db.Close()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func getTargetBucket(tx *bbolt.Tx, name chain.Name, target policyengine.Target) (*bbolt.Bucket, error) {
|
||||
cbucket := tx.Bucket(chainBucket)
|
||||
if cbucket == nil {
|
||||
return nil, ErrChainBucketNotFound
|
||||
}
|
||||
|
||||
nbucket := cbucket.Bucket([]byte(name))
|
||||
if nbucket == nil {
|
||||
return nil, fmt.Errorf("global namespace %s: %w", name, ErrGlobalNamespaceBucketNotFound)
|
||||
}
|
||||
|
||||
typeBucket := nbucket.Bucket([]byte{byte(target.Type)})
|
||||
if typeBucket == nil {
|
||||
return nil, fmt.Errorf("type bucket '%c': %w", target.Type, ErrTargetTypeBucketNotFound)
|
||||
}
|
||||
|
||||
rbucket := typeBucket.Bucket([]byte(target.Name))
|
||||
if rbucket == nil {
|
||||
return nil, fmt.Errorf("target name bucket %s: %w", target.Name, ErrTargetNameBucketNotFound)
|
||||
}
|
||||
|
||||
return rbucket, nil
|
||||
}
|
||||
|
||||
func getTargetBucketCreateIfEmpty(tx *bbolt.Tx, name chain.Name, target policyengine.Target) (*bbolt.Bucket, error) {
|
||||
cbucket := tx.Bucket(chainBucket)
|
||||
if cbucket == nil {
|
||||
return nil, ErrChainBucketNotFound
|
||||
}
|
||||
|
||||
nbucket := cbucket.Bucket([]byte(name))
|
||||
if nbucket == nil {
|
||||
var err error
|
||||
nbucket, err = cbucket.CreateBucket([]byte(name))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create a bucket for the global chain name %s: %w", name, err)
|
||||
}
|
||||
}
|
||||
|
||||
typeBucket := nbucket.Bucket([]byte{byte(target.Type)})
|
||||
if typeBucket == nil {
|
||||
var err error
|
||||
typeBucket, err = cbucket.CreateBucket([]byte{byte(target.Type)})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create a bucket for the target type '%c': %w", target.Type, err)
|
||||
}
|
||||
}
|
||||
|
||||
rbucket := typeBucket.Bucket([]byte(target.Name))
|
||||
if rbucket == nil {
|
||||
var err error
|
||||
rbucket, err = typeBucket.CreateBucket([]byte(target.Name))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create a bucket for the target name %s: %w", target.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
return rbucket, nil
|
||||
}
|
||||
|
||||
func (cs *boltLocalOverrideStorage) AddOverride(name chain.Name, target policyengine.Target, c *chain.Chain) (chain.ID, error) {
|
||||
if c.ID == "" {
|
||||
return "", fmt.Errorf("chain ID is not set")
|
||||
}
|
||||
|
||||
serializedChain := c.Bytes()
|
||||
|
||||
err := cs.db.Update(func(tx *bbolt.Tx) error {
|
||||
rbuck, err := getTargetBucketCreateIfEmpty(tx, name, target)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return rbuck.Put([]byte(c.ID), serializedChain)
|
||||
})
|
||||
|
||||
return c.ID, err
|
||||
}
|
||||
|
||||
func (cs *boltLocalOverrideStorage) GetOverride(name chain.Name, target policyengine.Target, chainID chain.ID) (*chain.Chain, error) {
|
||||
var serializedChain []byte
|
||||
|
||||
if err := cs.db.View(func(tx *bbolt.Tx) error {
|
||||
rbuck, err := getTargetBucket(tx, name, target)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
serializedChain = rbuck.Get([]byte(chainID))
|
||||
if serializedChain == nil {
|
||||
return ErrChainNotFound
|
||||
}
|
||||
serializedChain = slice.Copy(serializedChain)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := &chain.Chain{}
|
||||
if err := json.Unmarshal(serializedChain, c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (cs *boltLocalOverrideStorage) RemoveOverride(name chain.Name, target policyengine.Target, chainID chain.ID) error {
|
||||
return cs.db.Update(func(tx *bbolt.Tx) error {
|
||||
rbuck, err := getTargetBucket(tx, name, target)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return rbuck.Delete([]byte(chainID))
|
||||
})
|
||||
}
|
||||
|
||||
func (cs *boltLocalOverrideStorage) ListOverrides(name chain.Name, target policyengine.Target) ([]*chain.Chain, error) {
|
||||
var serializedChains [][]byte
|
||||
var serializedChain []byte
|
||||
if err := cs.db.View(func(tx *bbolt.Tx) error {
|
||||
rbuck, err := getTargetBucket(tx, name, target)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return rbuck.ForEach(func(_, v []byte) error {
|
||||
serializedChain = slice.Copy(v)
|
||||
serializedChains = append(serializedChains, serializedChain)
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
if errors.Is(err, ErrGlobalNamespaceBucketNotFound) || errors.Is(err, ErrTargetNameBucketNotFound) {
|
||||
return []*chain.Chain{}, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
chains := make([]*chain.Chain, 0, len(serializedChains))
|
||||
for _, serializedChain = range serializedChains {
|
||||
c := &chain.Chain{}
|
||||
if err := json.Unmarshal(serializedChain, c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
chains = append(chains, c)
|
||||
}
|
||||
return chains, nil
|
||||
}
|
||||
|
||||
func (cs *boltLocalOverrideStorage) DropAllOverrides(name chain.Name) error {
|
||||
return cs.db.Update(func(tx *bbolt.Tx) error {
|
||||
return tx.DeleteBucket([]byte(name))
|
||||
})
|
||||
}
|
30
pkg/ape/chainbase/inmemory.go
Normal file
30
pkg/ape/chainbase/inmemory.go
Normal file
|
@ -0,0 +1,30 @@
|
|||
package chainbase
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
|
||||
)
|
||||
|
||||
type inmemoryLocalOverrideStorage struct {
|
||||
engine.LocalOverrideStorage
|
||||
}
|
||||
|
||||
func NewInmemoryLocalOverrideDatabase() LocalOverrideDatabase {
|
||||
return &inmemoryLocalOverrideStorage{
|
||||
LocalOverrideStorage: inmemory.NewInmemoryLocalStorage(),
|
||||
}
|
||||
}
|
||||
|
||||
func (cs *inmemoryLocalOverrideStorage) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *inmemoryLocalOverrideStorage) Open(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *inmemoryLocalOverrideStorage) Close() error {
|
||||
return nil
|
||||
}
|
22
pkg/ape/chainbase/interface.go
Normal file
22
pkg/ape/chainbase/interface.go
Normal file
|
@ -0,0 +1,22 @@
|
|||
package chainbase
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
)
|
||||
|
||||
// DatabaseCore interface provides methods to initialize and manage local override storage
|
||||
// as database.
|
||||
type DatabaseCore interface {
|
||||
Init() error
|
||||
Open(context.Context) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
// LocalOverrideDatabase interface provides methods to manage local override storage
|
||||
// as database and as the APE's local override storage.
|
||||
type LocalOverrideDatabase interface {
|
||||
DatabaseCore
|
||||
engine.LocalOverrideStorage
|
||||
}
|
67
pkg/ape/chainbase/option.go
Normal file
67
pkg/ape/chainbase/option.go
Normal file
|
@ -0,0 +1,67 @@
|
|||
package chainbase
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type Option func(*cfg)
|
||||
|
||||
type cfg struct {
|
||||
path string
|
||||
perm fs.FileMode
|
||||
noSync bool
|
||||
maxBatchDelay time.Duration
|
||||
maxBatchSize int
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
perm: os.ModePerm,
|
||||
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
|
||||
maxBatchSize: bbolt.DefaultMaxBatchSize,
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
}
|
||||
}
|
||||
|
||||
func WithPath(path string) Option {
|
||||
return func(c *cfg) {
|
||||
c.path = path
|
||||
}
|
||||
}
|
||||
|
||||
func WithPerm(perm fs.FileMode) Option {
|
||||
return func(c *cfg) {
|
||||
c.perm = perm
|
||||
}
|
||||
}
|
||||
|
||||
func WithNoSync(noSync bool) Option {
|
||||
return func(c *cfg) {
|
||||
c.noSync = noSync
|
||||
}
|
||||
}
|
||||
|
||||
func WithMaxBatchDelay(maxBatchDelay time.Duration) Option {
|
||||
return func(c *cfg) {
|
||||
c.maxBatchDelay = maxBatchDelay
|
||||
}
|
||||
}
|
||||
|
||||
func WithMaxBatchSize(maxBatchSize int) Option {
|
||||
return func(c *cfg) {
|
||||
c.maxBatchSize = maxBatchSize
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.log = l
|
||||
}
|
||||
}
|
|
@ -7,7 +7,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
)
|
||||
|
||||
// Container groups information about the FrostFS container stored in the FrostFS network.
|
||||
|
@ -72,10 +71,3 @@ type EACLSource interface {
|
|||
// eACL table is not in source.
|
||||
GetEACL(cid.ID) (*EACL, error)
|
||||
}
|
||||
|
||||
// AccessPolicyEngineChainSource interface provides methods to access and manipulate
|
||||
// policy engine chain storage.
|
||||
type AccessPolicyEngineChainSource interface {
|
||||
// TODO (aarifullin): Better to use simpler interface instead CachedChainStorage.
|
||||
GetChainSource(cid cid.ID) (engine.LocalOverrideEngine, error)
|
||||
}
|
||||
|
|
|
@ -4,34 +4,24 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||
engine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
nativeschema "git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// extractCID extracts CID from the schema's pattern.
|
||||
// TODO (aarifullin): This is temporary solution should be replaced by
|
||||
// resource name validation.
|
||||
func extractCID(resource string) (cid.ID, error) {
|
||||
var cidStr string
|
||||
|
||||
// Sscanf requires to make tokens delimited by spaces.
|
||||
pattern := strings.Replace(nativeschema.ResourceFormatRootContainerObjects, "/", " ", -1)
|
||||
resource = strings.Replace(resource, "/", " ", -1)
|
||||
|
||||
if _, err := fmt.Sscanf(resource, pattern, &cidStr); err != nil {
|
||||
err = fmt.Errorf("could not parse the target name '%s' to CID: %w", resource, err)
|
||||
return cid.ID{}, err
|
||||
func apeTarget(chainTarget *control.ChainTarget) (engine.Target, error) {
|
||||
switch chainTarget.GetType() {
|
||||
case control.ChainTarget_CONTAINER:
|
||||
return engine.ContainerTarget(chainTarget.GetName()), nil
|
||||
case control.ChainTarget_NAMESPACE:
|
||||
return engine.NamespaceTarget(chainTarget.GetName()), nil
|
||||
default:
|
||||
}
|
||||
var cid cid.ID
|
||||
err := cid.DecodeString(cidStr)
|
||||
return cid, err
|
||||
return engine.Target{}, status.Error(codes.InvalidArgument,
|
||||
fmt.Errorf("target type is not supported: %s", chainTarget.GetType().String()).Error())
|
||||
}
|
||||
|
||||
func (s *Server) AddChainLocalOverride(_ context.Context, req *control.AddChainLocalOverrideRequest) (*control.AddChainLocalOverrideResponse, error) {
|
||||
|
@ -39,34 +29,23 @@ func (s *Server) AddChainLocalOverride(_ context.Context, req *control.AddChainL
|
|||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
target := req.GetBody().GetTarget()
|
||||
if target.Type != control.ChainTarget_CONTAINER {
|
||||
return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error())
|
||||
}
|
||||
|
||||
cid, err := extractCID(target.GetName())
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
var chain apechain.Chain
|
||||
if err = chain.DecodeBytes(req.GetBody().GetChain()); err != nil {
|
||||
if err := chain.DecodeBytes(req.GetBody().GetChain()); err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
src, err := s.apeChainSrc.GetChainSource(cid)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
s.apeChainCounter.Add(1)
|
||||
// TODO (aarifullin): the such chain id is not well-designed yet.
|
||||
if chain.ID == "" {
|
||||
chain.ID = apechain.ID(fmt.Sprintf("%s:%d", apechain.Ingress, s.apeChainCounter.Load()))
|
||||
}
|
||||
|
||||
resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString())
|
||||
if _, err = src.LocalStorage().AddOverride(apechain.Ingress, engine.ContainerTarget(resource), &chain); err != nil {
|
||||
target, err := apeTarget(req.GetBody().GetTarget())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if _, err = s.localOverrideStorage.LocalStorage().AddOverride(apechain.Ingress, target, &chain); err != nil {
|
||||
return nil, status.Error(getCodeByLocalStorageErr(err), err.Error())
|
||||
}
|
||||
|
||||
|
@ -87,23 +66,11 @@ func (s *Server) GetChainLocalOverride(_ context.Context, req *control.GetChainL
|
|||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
target := req.GetBody().GetTarget()
|
||||
if target.Type != control.ChainTarget_CONTAINER {
|
||||
return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error())
|
||||
}
|
||||
|
||||
cid, err := extractCID(target.GetName())
|
||||
target, err := apeTarget(req.GetBody().GetTarget())
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
src, err := s.apeChainSrc.GetChainSource(cid)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString())
|
||||
chain, err := src.LocalStorage().GetOverride(apechain.Ingress, engine.ContainerTarget(resource), apechain.ID(req.GetBody().GetChainId()))
|
||||
chain, err := s.localOverrideStorage.LocalStorage().GetOverride(apechain.Ingress, target, apechain.ID(req.GetBody().GetChainId()))
|
||||
if err != nil {
|
||||
return nil, status.Error(getCodeByLocalStorageErr(err), err.Error())
|
||||
}
|
||||
|
@ -125,23 +92,12 @@ func (s *Server) ListChainLocalOverrides(_ context.Context, req *control.ListCha
|
|||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
target := req.GetBody().GetTarget()
|
||||
if target.Type != control.ChainTarget_CONTAINER {
|
||||
return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error())
|
||||
}
|
||||
|
||||
cid, err := extractCID(target.GetName())
|
||||
target, err := apeTarget(req.GetBody().GetTarget())
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
src, err := s.apeChainSrc.GetChainSource(cid)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString())
|
||||
chains, err := src.LocalStorage().ListOverrides(apechain.Ingress, engine.ContainerTarget(resource))
|
||||
chains, err := s.localOverrideStorage.LocalStorage().ListOverrides(apechain.Ingress, target)
|
||||
if err != nil {
|
||||
return nil, status.Error(getCodeByLocalStorageErr(err), err.Error())
|
||||
}
|
||||
|
@ -167,23 +123,12 @@ func (s *Server) RemoveChainLocalOverride(_ context.Context, req *control.Remove
|
|||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
target := req.GetBody().GetTarget()
|
||||
if target.Type != control.ChainTarget_CONTAINER {
|
||||
return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error())
|
||||
}
|
||||
|
||||
cid, err := extractCID(target.GetName())
|
||||
target, err := apeTarget(req.GetBody().GetTarget())
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
src, err := s.apeChainSrc.GetChainSource(cid)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString())
|
||||
if err = src.LocalStorage().RemoveOverride(apechain.Ingress, engine.ContainerTarget(resource), apechain.ID(req.GetBody().GetChainId())); err != nil {
|
||||
if err = s.localOverrideStorage.LocalStorage().RemoveOverride(apechain.Ingress, target, apechain.ID(req.GetBody().GetChainId())); err != nil {
|
||||
return nil, status.Error(getCodeByLocalStorageErr(err), err.Error())
|
||||
}
|
||||
resp := &control.RemoveChainLocalOverrideResponse{
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||
policyengine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
)
|
||||
|
||||
// Server is an entity that serves
|
||||
|
@ -51,6 +52,14 @@ type NodeState interface {
|
|||
ForceMaintenance() error
|
||||
}
|
||||
|
||||
// LocalOverrideStorageDecorator interface provides methods to decorate LocalOverrideEngine
|
||||
// interface methods.
|
||||
type LocalOverrideStorageDecorator interface {
|
||||
// LocalStorage method can be decorated by using sync primitives in the case if the local
|
||||
// override storage state should be consistent for chain router.
|
||||
LocalStorage() policyengine.LocalOverrideStorage
|
||||
}
|
||||
|
||||
// Option of the Server's constructor.
|
||||
type Option func(*cfg)
|
||||
|
||||
|
@ -65,7 +74,7 @@ type cfg struct {
|
|||
|
||||
cnrSrc container.Source
|
||||
|
||||
apeChainSrc container.AccessPolicyEngineChainSource
|
||||
localOverrideStorage LocalOverrideStorageDecorator
|
||||
|
||||
replicator *replicator.Replicator
|
||||
|
||||
|
@ -160,10 +169,10 @@ func WithTreeService(s TreeService) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithAPEChainSource returns the option to set access policy engine
|
||||
// chain source.
|
||||
func WithAPEChainSource(apeChainSrc container.AccessPolicyEngineChainSource) Option {
|
||||
// WithLocalOverrideStorage returns the option to set access policy engine
|
||||
// chain override storage.
|
||||
func WithLocalOverrideStorage(localOverrideStorage LocalOverrideStorageDecorator) Option {
|
||||
return func(c *cfg) {
|
||||
c.apeChainSrc = apeChainSrc
|
||||
c.localOverrideStorage = localOverrideStorage
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,10 +1,8 @@
|
|||
package acl
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
v2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl/v2"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
|
@ -12,34 +10,25 @@ import (
|
|||
policyengine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
)
|
||||
|
||||
var errAPEChainNoSource = errors.New("could not get ape chain source for the container")
|
||||
|
||||
type apeCheckerImpl struct {
|
||||
log *logger.Logger
|
||||
apeSrc container.AccessPolicyEngineChainSource
|
||||
chainRouter policyengine.ChainRouter
|
||||
}
|
||||
|
||||
func NewAPEChecker(log *logger.Logger, apeSrc container.AccessPolicyEngineChainSource) v2.APEChainChecker {
|
||||
func NewAPEChecker(log *logger.Logger, chainRouter policyengine.ChainRouter) v2.APEChainChecker {
|
||||
return &apeCheckerImpl{
|
||||
log: log,
|
||||
apeSrc: apeSrc,
|
||||
chainRouter: chainRouter,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *apeCheckerImpl) CheckIfRequestPermitted(reqInfo v2.RequestInfo) error {
|
||||
cnr := reqInfo.ContainerID()
|
||||
|
||||
chainCache, err := c.apeSrc.GetChainSource(cnr)
|
||||
if err != nil {
|
||||
return errAPEChainNoSource
|
||||
}
|
||||
|
||||
request := new(Request)
|
||||
request.FromRequestInfo(reqInfo)
|
||||
|
||||
cnrTarget := getResource(reqInfo).Name()
|
||||
|
||||
status, ruleFound, err := chainCache.IsAllowed(apechain.Ingress, policyengine.NewRequestTargetWithContainer(cnrTarget), request)
|
||||
status, ruleFound, err := c.chainRouter.IsAllowed(apechain.Ingress, policyengine.NewRequestTargetWithContainer(cnrTarget), request)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue