From 0f45e3d344da440797800c4adaa3f90f4e7c787b Mon Sep 17 00:00:00 2001 From: aarifullin Date: Mon, 20 Nov 2023 19:35:16 +0300 Subject: [PATCH] [#804] ape: Implement boltdb storage for local overrides Signed-off-by: Airat Arifullin --- cmd/frostfs-node/config.go | 39 ++- cmd/frostfs-node/config/node/config.go | 55 +++- cmd/frostfs-node/control.go | 2 +- cmd/frostfs-node/main.go | 6 + cmd/frostfs-node/object.go | 4 +- cmd/frostfs-node/policy_engine.go | 70 ++++-- internal/logs/logs.go | 2 + pkg/ape/chainbase/boltdb.go | 250 +++++++++++++++++++ pkg/ape/chainbase/inmemory.go | 30 +++ pkg/ape/chainbase/interface.go | 22 ++ pkg/ape/chainbase/option.go | 67 +++++ pkg/core/container/storage.go | 8 - pkg/services/control/server/policy_engine.go | 105 ++------ pkg/services/control/server/server.go | 19 +- pkg/services/object/acl/ape.go | 23 +- 15 files changed, 560 insertions(+), 142 deletions(-) create mode 100644 pkg/ape/chainbase/boltdb.go create mode 100644 pkg/ape/chainbase/inmemory.go create mode 100644 pkg/ape/chainbase/interface.go create mode 100644 pkg/ape/chainbase/option.go diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 2d80a4115..82b01b563 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -29,6 +29,7 @@ import ( replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator" tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" @@ -68,6 +69,7 @@ import ( objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" + "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" neogoutil "github.com/nspcc-dev/neo-go/pkg/util" "github.com/panjf2000/ants/v2" @@ -518,10 +520,7 @@ type cfgObject struct { eaclSource container.EACLSource - // Access policy chain source is used by object service to - // check for operation permissions but this source is also shared with - // control service that dispatches local overrides. - apeChainSource container.AccessPolicyEngineChainSource + cfgAccessPolicyEngine cfgAccessPolicyEngine pool cfgObjectRoutines @@ -542,6 +541,10 @@ type cfgLocalStorage struct { localStorage *engine.StorageEngine } +type cfgAccessPolicyEngine struct { + accessPolicyEngine *accessPolicyEngine +} + type cfgObjectRoutines struct { putRemote *ants.Pool @@ -970,6 +973,34 @@ func initLocalStorage(ctx context.Context, c *cfg) { }) } +func initAccessPolicyEngine(_ context.Context, c *cfg) { + var localOverrideDB chainbase.LocalOverrideDatabase + if nodeconfig.PersistentPolicyRules(c.appCfg).Path() == "" { + c.log.Warn(logs.FrostFSNodePersistentRuleStorageDBPathIsNotSetInmemoryWillBeUsed) + localOverrideDB = chainbase.NewInmemoryLocalOverrideDatabase() + } else { + localOverrideDB = chainbase.NewBoltLocalOverrideDatabase( + chainbase.WithLogger(c.log), + chainbase.WithPath(nodeconfig.PersistentPolicyRules(c.appCfg).Path()), + chainbase.WithPerm(nodeconfig.PersistentPolicyRules(c.appCfg).Perm()), + chainbase.WithNoSync(nodeconfig.PersistentPolicyRules(c.appCfg).NoSync()), + ) + } + + morphRuleStorage := inmemory.NewInmemoryMorphRuleChainStorage() + + ape := newAccessPolicyEngine(morphRuleStorage, localOverrideDB) + c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine = ape + + c.onShutdown(func() { + if err := ape.LocalOverrideDatabaseCore().Close(); err != nil { + c.log.Warn(logs.FrostFSNodeAccessPolicyEngineClosingFailure, + zap.Error(err), + ) + } + }) +} + func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) { var err error diff --git a/cmd/frostfs-node/config/node/config.go b/cmd/frostfs-node/config/node/config.go index 9dfe8ddf4..ac76ad47e 100644 --- a/cmd/frostfs-node/config/node/config.go +++ b/cmd/frostfs-node/config/node/config.go @@ -2,6 +2,7 @@ package nodeconfig import ( "fmt" + "io/fs" "os" "strconv" "time" @@ -30,11 +31,18 @@ type NotificationConfig struct { cfg *config.Config } +// PersistentPolicyRulesConfig is a wrapper over "persistent_policy_rules" config section +// which provides access to persistent policy rules storage configuration of node. +type PersistentPolicyRulesConfig struct { + cfg *config.Config +} + const ( - subsection = "node" - persistentSessionsSubsection = "persistent_sessions" - persistentStateSubsection = "persistent_state" - notificationSubsection = "notification" + subsection = "node" + persistentSessionsSubsection = "persistent_sessions" + persistentStateSubsection = "persistent_state" + notificationSubsection = "notification" + persistentPolicyRulesSubsection = "persistent_policy_rules" attributePrefix = "attribute" @@ -245,3 +253,42 @@ func (n NotificationConfig) KeyPath() string { func (n NotificationConfig) CAPath() string { return config.StringSafe(n.cfg, "ca") } + +const ( + // PermDefault is a default permission bits for local override storage file. + PermDefault = 0o644 +) + +// PersistentPolicyRules returns structure that provides access to "persistent_policy_rules" +// subsection of "node" section. +func PersistentPolicyRules(c *config.Config) PersistentPolicyRulesConfig { + return PersistentPolicyRulesConfig{ + c.Sub(subsection).Sub(persistentPolicyRulesSubsection), + } +} + +// Path returns the value of "path" config parameter. +// +// Returns empty string if missing, for compatibility with older configurations. +func (l PersistentPolicyRulesConfig) Path() string { + return config.StringSafe(l.cfg, "path") +} + +// Perm returns the value of "perm" config parameter as a fs.FileMode. +// +// Returns PermDefault if the value is not a positive number. +func (l PersistentPolicyRulesConfig) Perm() fs.FileMode { + p := config.UintSafe((*config.Config)(l.cfg), "perm") + if p == 0 { + p = PermDefault + } + + return fs.FileMode(p) +} + +// NoSync returns the value of "no_sync" config parameter as a bool value. +// +// Returns false if the value is not a boolean. +func (l PersistentPolicyRulesConfig) NoSync() bool { + return config.BoolSafe((*config.Config)(l.cfg), "no_sync") +} diff --git a/cmd/frostfs-node/control.go b/cmd/frostfs-node/control.go index 30d644803..4f5c5d780 100644 --- a/cmd/frostfs-node/control.go +++ b/cmd/frostfs-node/control.go @@ -51,7 +51,7 @@ func initControlService(c *cfg) { controlSvc.WithTreeService(treeSynchronizer{ c.treeService, }), - controlSvc.WithAPEChainSource(c.cfgObject.apeChainSource), + controlSvc.WithLocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine), ) lis, err := net.Listen("tcp", endpoint) diff --git a/cmd/frostfs-node/main.go b/cmd/frostfs-node/main.go index 2791efcef..d4dfb7e60 100644 --- a/cmd/frostfs-node/main.go +++ b/cmd/frostfs-node/main.go @@ -98,6 +98,12 @@ func initApp(ctx context.Context, c *cfg) { fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Init(ctx)) }) + initAccessPolicyEngine(ctx, c) + initAndLog(c, "access policy engine", func(c *cfg) { + fatalOnErr(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalOverrideDatabaseCore().Open(ctx)) + fatalOnErr(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalOverrideDatabaseCore().Init()) + }) + initAndLog(c, "gRPC", initGRPC) initAndLog(c, "netmap", func(c *cfg) { initNetmapService(ctx, c) }) initAndLog(c, "accounting", func(c *cfg) { initAccountingService(ctx, c) }) diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 79f492190..59827179b 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -157,8 +157,6 @@ func initObjectService(c *cfg) { c.replicator = createReplicator(c, keyStorage, c.bgClientCache) - c.cfgObject.apeChainSource = NewAPESource() - addPolicer(c, keyStorage, c.bgClientCache) traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c) @@ -426,7 +424,7 @@ func createACLServiceV2(c *cfg, splitSvc *objectService.TransportSplitter, irFet c.cfgObject.eaclSource, eaclSDK.NewValidator(), ls), - acl.NewAPEChecker(c.log, c.cfgObject.apeChainSource), + acl.NewAPEChecker(c.log, c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.chainRouter), c.cfgObject.cnrSource, v2.WithLogger(c.log), ) diff --git a/cmd/frostfs-node/policy_engine.go b/cmd/frostfs-node/policy_engine.go index 248cddb11..d04322c2e 100644 --- a/cmd/frostfs-node/policy_engine.go +++ b/cmd/frostfs-node/policy_engine.go @@ -3,33 +3,63 @@ package main import ( "sync" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase" + "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine" - "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory" + "git.frostfs.info/TrueCloudLab/policy-engine/pkg/resource" ) -type apeChainSourceImpl struct { - mtx sync.Mutex - localChainStorage map[cid.ID]engine.LocalOverrideEngine +type accessPolicyEngine struct { + mtx sync.RWMutex + + chainRouter engine.ChainRouter + + morphChainStorage engine.MorphRuleChainStorage + + localOverrideDatabase chainbase.LocalOverrideDatabase } -func NewAPESource() container.AccessPolicyEngineChainSource { - return &apeChainSourceImpl{ - localChainStorage: make(map[cid.ID]engine.LocalOverrideEngine), +var _ engine.LocalOverrideEngine = (*accessPolicyEngine)(nil) + +func newAccessPolicyEngine( + morphChainStorage engine.MorphRuleChainStorage, + localOverrideDatabase chainbase.LocalOverrideDatabase) *accessPolicyEngine { + return &accessPolicyEngine{ + chainRouter: engine.NewDefaultChainRouterWithLocalOverrides( + morphChainStorage, + localOverrideDatabase, + ), + + morphChainStorage: morphChainStorage, + + localOverrideDatabase: localOverrideDatabase, } } -var _ container.AccessPolicyEngineChainSource = (*apeChainSourceImpl)(nil) +func (a *accessPolicyEngine) IsAllowed(name chain.Name, target engine.RequestTarget, r resource.Request) (status chain.Status, found bool, err error) { + a.mtx.RLock() + defer a.mtx.RUnlock() -func (c *apeChainSourceImpl) GetChainSource(cid cid.ID) (engine.LocalOverrideEngine, error) { - c.mtx.Lock() - defer c.mtx.Unlock() - - s, ok := c.localChainStorage[cid] - if ok { - return s, nil - } - c.localChainStorage[cid] = inmemory.NewInMemoryLocalOverrides() - return c.localChainStorage[cid], nil + return a.chainRouter.IsAllowed(name, target, r) +} + +func (a *accessPolicyEngine) MorphRuleChainStorage() engine.MorphRuleChainStorage { + a.mtx.Lock() + defer a.mtx.Unlock() + + return a.morphChainStorage +} + +func (a *accessPolicyEngine) LocalStorage() engine.LocalOverrideStorage { + a.mtx.Lock() + defer a.mtx.Unlock() + + return a.localOverrideDatabase +} + +func (a *accessPolicyEngine) LocalOverrideDatabaseCore() chainbase.DatabaseCore { + a.mtx.Lock() + defer a.mtx.Unlock() + + return a.localOverrideDatabase } diff --git a/internal/logs/logs.go b/internal/logs/logs.go index e1c2cfdee..befa7342a 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -429,7 +429,9 @@ const ( FrostFSNodeFailedToAttachShardToEngine = "failed to attach shard to engine" FrostFSNodeShardAttachedToEngine = "shard attached to engine" FrostFSNodeClosingComponentsOfTheStorageEngine = "closing components of the storage engine..." + FrostFSNodeAccessPolicyEngineClosingFailure = "ape closing failure" FrostFSNodeStorageEngineClosingFailure = "storage engine closing failure" + FrostFSNodePersistentRuleStorageDBPathIsNotSetInmemoryWillBeUsed = "persistent rule storage db path is not set: in-memory will be used" FrostFSNodeAllComponentsOfTheStorageEngineClosedSuccessfully = "all components of the storage engine closed successfully" FrostFSNodeBootstrappingWithTheMaintenanceState = "bootstrapping with the maintenance state" FrostFSNodeBootstrappingWithOnlineState = "bootstrapping with online state" diff --git a/pkg/ape/chainbase/boltdb.go b/pkg/ape/chainbase/boltdb.go new file mode 100644 index 000000000..c6cd2c014 --- /dev/null +++ b/pkg/ape/chainbase/boltdb.go @@ -0,0 +1,250 @@ +package chainbase + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "path/filepath" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" + "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" + policyengine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine" + "github.com/nspcc-dev/neo-go/pkg/util/slice" + "go.etcd.io/bbolt" +) + +type boltLocalOverrideStorage struct { + *cfg + + db *bbolt.DB +} + +var ( + chainBucket = []byte{0} +) + +var ( + ErrChainBucketNotFound = logicerr.New("chain root bucket has not been found") + + ErrChainNotFound = logicerr.New("chain has not been found") + + ErrGlobalNamespaceBucketNotFound = logicerr.New("global namespace bucket has not been found") + + ErrTargetTypeBucketNotFound = logicerr.New("target type bucket has not been found") + + ErrTargetNameBucketNotFound = logicerr.New("target name bucket has not been found") +) + +// NewBoltLocalOverrideDatabase returns storage wrapper for storing access policy engine +// local overrides. +// +// chain storage (chainBucket): +// -> global namespace bucket (nBucket): +// --> target bucket (tBucket) +// ---> target name (resource) bucket (rBucket): +// +// | Key | Value | +// x---------------------x-------------------x +// | chain id (string) | serialized chain | +// x---------------------x-------------------x +// +//nolint:godot +func NewBoltLocalOverrideDatabase(opts ...Option) LocalOverrideDatabase { + c := defaultCfg() + + for i := range opts { + opts[i](c) + } + + return &boltLocalOverrideStorage{ + cfg: c, + } +} + +func (cs *boltLocalOverrideStorage) Init() error { + return cs.db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(chainBucket) + return err + }) +} + +func (cs *boltLocalOverrideStorage) Open(context.Context) error { + err := util.MkdirAllX(filepath.Dir(cs.path), cs.perm) + if err != nil { + return fmt.Errorf("can't create dir %s for the chain DB: %w", cs.path, err) + } + + opts := *bbolt.DefaultOptions + opts.NoSync = cs.noSync + opts.Timeout = 100 * time.Millisecond + + cs.db, err = bbolt.Open(cs.path, cs.perm, &opts) + if err != nil { + return fmt.Errorf("can't open the chain DB: %w", err) + } + + cs.db.MaxBatchSize = cs.maxBatchSize + cs.db.MaxBatchDelay = cs.maxBatchDelay + + return nil +} + +func (cs *boltLocalOverrideStorage) Close() error { + var err error + if cs.db != nil { + err = cs.db.Close() + } + return err +} + +func getTargetBucket(tx *bbolt.Tx, name chain.Name, target policyengine.Target) (*bbolt.Bucket, error) { + cbucket := tx.Bucket(chainBucket) + if cbucket == nil { + return nil, ErrChainBucketNotFound + } + + nbucket := cbucket.Bucket([]byte(name)) + if nbucket == nil { + return nil, fmt.Errorf("global namespace %s: %w", name, ErrGlobalNamespaceBucketNotFound) + } + + typeBucket := nbucket.Bucket([]byte{byte(target.Type)}) + if typeBucket == nil { + return nil, fmt.Errorf("type bucket '%c': %w", target.Type, ErrTargetTypeBucketNotFound) + } + + rbucket := typeBucket.Bucket([]byte(target.Name)) + if rbucket == nil { + return nil, fmt.Errorf("target name bucket %s: %w", target.Name, ErrTargetNameBucketNotFound) + } + + return rbucket, nil +} + +func getTargetBucketCreateIfEmpty(tx *bbolt.Tx, name chain.Name, target policyengine.Target) (*bbolt.Bucket, error) { + cbucket := tx.Bucket(chainBucket) + if cbucket == nil { + return nil, ErrChainBucketNotFound + } + + nbucket := cbucket.Bucket([]byte(name)) + if nbucket == nil { + var err error + nbucket, err = cbucket.CreateBucket([]byte(name)) + if err != nil { + return nil, fmt.Errorf("could not create a bucket for the global chain name %s: %w", name, err) + } + } + + typeBucket := nbucket.Bucket([]byte{byte(target.Type)}) + if typeBucket == nil { + var err error + typeBucket, err = cbucket.CreateBucket([]byte{byte(target.Type)}) + if err != nil { + return nil, fmt.Errorf("could not create a bucket for the target type '%c': %w", target.Type, err) + } + } + + rbucket := typeBucket.Bucket([]byte(target.Name)) + if rbucket == nil { + var err error + rbucket, err = typeBucket.CreateBucket([]byte(target.Name)) + if err != nil { + return nil, fmt.Errorf("could not create a bucket for the target name %s: %w", target.Name, err) + } + } + + return rbucket, nil +} + +func (cs *boltLocalOverrideStorage) AddOverride(name chain.Name, target policyengine.Target, c *chain.Chain) (chain.ID, error) { + if c.ID == "" { + return "", fmt.Errorf("chain ID is not set") + } + + serializedChain := c.Bytes() + + err := cs.db.Update(func(tx *bbolt.Tx) error { + rbuck, err := getTargetBucketCreateIfEmpty(tx, name, target) + if err != nil { + return err + } + return rbuck.Put([]byte(c.ID), serializedChain) + }) + + return c.ID, err +} + +func (cs *boltLocalOverrideStorage) GetOverride(name chain.Name, target policyengine.Target, chainID chain.ID) (*chain.Chain, error) { + var serializedChain []byte + + if err := cs.db.View(func(tx *bbolt.Tx) error { + rbuck, err := getTargetBucket(tx, name, target) + if err != nil { + return err + } + serializedChain = rbuck.Get([]byte(chainID)) + if serializedChain == nil { + return ErrChainNotFound + } + serializedChain = slice.Copy(serializedChain) + return nil + }); err != nil { + return nil, err + } + + c := &chain.Chain{} + if err := json.Unmarshal(serializedChain, c); err != nil { + return nil, err + } + return c, nil +} + +func (cs *boltLocalOverrideStorage) RemoveOverride(name chain.Name, target policyengine.Target, chainID chain.ID) error { + return cs.db.Update(func(tx *bbolt.Tx) error { + rbuck, err := getTargetBucket(tx, name, target) + if err != nil { + return err + } + return rbuck.Delete([]byte(chainID)) + }) +} + +func (cs *boltLocalOverrideStorage) ListOverrides(name chain.Name, target policyengine.Target) ([]*chain.Chain, error) { + var serializedChains [][]byte + var serializedChain []byte + if err := cs.db.View(func(tx *bbolt.Tx) error { + rbuck, err := getTargetBucket(tx, name, target) + if err != nil { + return err + } + return rbuck.ForEach(func(_, v []byte) error { + serializedChain = slice.Copy(v) + serializedChains = append(serializedChains, serializedChain) + return nil + }) + }); err != nil { + if errors.Is(err, ErrGlobalNamespaceBucketNotFound) || errors.Is(err, ErrTargetNameBucketNotFound) { + return []*chain.Chain{}, nil + } + return nil, err + } + chains := make([]*chain.Chain, 0, len(serializedChains)) + for _, serializedChain = range serializedChains { + c := &chain.Chain{} + if err := json.Unmarshal(serializedChain, c); err != nil { + return nil, err + } + chains = append(chains, c) + } + return chains, nil +} + +func (cs *boltLocalOverrideStorage) DropAllOverrides(name chain.Name) error { + return cs.db.Update(func(tx *bbolt.Tx) error { + return tx.DeleteBucket([]byte(name)) + }) +} diff --git a/pkg/ape/chainbase/inmemory.go b/pkg/ape/chainbase/inmemory.go new file mode 100644 index 000000000..27712d959 --- /dev/null +++ b/pkg/ape/chainbase/inmemory.go @@ -0,0 +1,30 @@ +package chainbase + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine" + "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory" +) + +type inmemoryLocalOverrideStorage struct { + engine.LocalOverrideStorage +} + +func NewInmemoryLocalOverrideDatabase() LocalOverrideDatabase { + return &inmemoryLocalOverrideStorage{ + LocalOverrideStorage: inmemory.NewInmemoryLocalStorage(), + } +} + +func (cs *inmemoryLocalOverrideStorage) Init() error { + return nil +} + +func (cs *inmemoryLocalOverrideStorage) Open(_ context.Context) error { + return nil +} + +func (cs *inmemoryLocalOverrideStorage) Close() error { + return nil +} diff --git a/pkg/ape/chainbase/interface.go b/pkg/ape/chainbase/interface.go new file mode 100644 index 000000000..ee445f22c --- /dev/null +++ b/pkg/ape/chainbase/interface.go @@ -0,0 +1,22 @@ +package chainbase + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine" +) + +// DatabaseCore interface provides methods to initialize and manage local override storage +// as database. +type DatabaseCore interface { + Init() error + Open(context.Context) error + Close() error +} + +// LocalOverrideDatabase interface provides methods to manage local override storage +// as database and as the APE's local override storage. +type LocalOverrideDatabase interface { + DatabaseCore + engine.LocalOverrideStorage +} diff --git a/pkg/ape/chainbase/option.go b/pkg/ape/chainbase/option.go new file mode 100644 index 000000000..e547701fb --- /dev/null +++ b/pkg/ape/chainbase/option.go @@ -0,0 +1,67 @@ +package chainbase + +import ( + "io/fs" + "os" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "go.etcd.io/bbolt" + "go.uber.org/zap" +) + +type Option func(*cfg) + +type cfg struct { + path string + perm fs.FileMode + noSync bool + maxBatchDelay time.Duration + maxBatchSize int + log *logger.Logger +} + +func defaultCfg() *cfg { + return &cfg{ + perm: os.ModePerm, + maxBatchDelay: bbolt.DefaultMaxBatchDelay, + maxBatchSize: bbolt.DefaultMaxBatchSize, + log: &logger.Logger{Logger: zap.L()}, + } +} + +func WithPath(path string) Option { + return func(c *cfg) { + c.path = path + } +} + +func WithPerm(perm fs.FileMode) Option { + return func(c *cfg) { + c.perm = perm + } +} + +func WithNoSync(noSync bool) Option { + return func(c *cfg) { + c.noSync = noSync + } +} + +func WithMaxBatchDelay(maxBatchDelay time.Duration) Option { + return func(c *cfg) { + c.maxBatchDelay = maxBatchDelay + } +} + +func WithMaxBatchSize(maxBatchSize int) Option { + return func(c *cfg) { + c.maxBatchSize = maxBatchSize + } +} + +func WithLogger(l *logger.Logger) Option { + return func(c *cfg) { + c.log = l + } +} diff --git a/pkg/core/container/storage.go b/pkg/core/container/storage.go index cc358b436..69854f495 100644 --- a/pkg/core/container/storage.go +++ b/pkg/core/container/storage.go @@ -7,7 +7,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" - "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine" ) // Container groups information about the FrostFS container stored in the FrostFS network. @@ -72,10 +71,3 @@ type EACLSource interface { // eACL table is not in source. GetEACL(cid.ID) (*EACL, error) } - -// AccessPolicyEngineChainSource interface provides methods to access and manipulate -// policy engine chain storage. -type AccessPolicyEngineChainSource interface { - // TODO (aarifullin): Better to use simpler interface instead CachedChainStorage. - GetChainSource(cid cid.ID) (engine.LocalOverrideEngine, error) -} diff --git a/pkg/services/control/server/policy_engine.go b/pkg/services/control/server/policy_engine.go index cbb6ba803..7ffa392a8 100644 --- a/pkg/services/control/server/policy_engine.go +++ b/pkg/services/control/server/policy_engine.go @@ -4,34 +4,24 @@ import ( "context" "errors" "fmt" - "strings" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" engine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine" - nativeschema "git.frostfs.info/TrueCloudLab/policy-engine/schema/native" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) -// extractCID extracts CID from the schema's pattern. -// TODO (aarifullin): This is temporary solution should be replaced by -// resource name validation. -func extractCID(resource string) (cid.ID, error) { - var cidStr string - - // Sscanf requires to make tokens delimited by spaces. - pattern := strings.Replace(nativeschema.ResourceFormatRootContainerObjects, "/", " ", -1) - resource = strings.Replace(resource, "/", " ", -1) - - if _, err := fmt.Sscanf(resource, pattern, &cidStr); err != nil { - err = fmt.Errorf("could not parse the target name '%s' to CID: %w", resource, err) - return cid.ID{}, err +func apeTarget(chainTarget *control.ChainTarget) (engine.Target, error) { + switch chainTarget.GetType() { + case control.ChainTarget_CONTAINER: + return engine.ContainerTarget(chainTarget.GetName()), nil + case control.ChainTarget_NAMESPACE: + return engine.NamespaceTarget(chainTarget.GetName()), nil + default: } - var cid cid.ID - err := cid.DecodeString(cidStr) - return cid, err + return engine.Target{}, status.Error(codes.InvalidArgument, + fmt.Errorf("target type is not supported: %s", chainTarget.GetType().String()).Error()) } func (s *Server) AddChainLocalOverride(_ context.Context, req *control.AddChainLocalOverrideRequest) (*control.AddChainLocalOverrideResponse, error) { @@ -39,34 +29,23 @@ func (s *Server) AddChainLocalOverride(_ context.Context, req *control.AddChainL return nil, status.Error(codes.PermissionDenied, err.Error()) } - target := req.GetBody().GetTarget() - if target.Type != control.ChainTarget_CONTAINER { - return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error()) - } - - cid, err := extractCID(target.GetName()) - if err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) - } - var chain apechain.Chain - if err = chain.DecodeBytes(req.GetBody().GetChain()); err != nil { + if err := chain.DecodeBytes(req.GetBody().GetChain()); err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } - src, err := s.apeChainSrc.GetChainSource(cid) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - s.apeChainCounter.Add(1) // TODO (aarifullin): the such chain id is not well-designed yet. if chain.ID == "" { chain.ID = apechain.ID(fmt.Sprintf("%s:%d", apechain.Ingress, s.apeChainCounter.Load())) } - resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString()) - if _, err = src.LocalStorage().AddOverride(apechain.Ingress, engine.ContainerTarget(resource), &chain); err != nil { + target, err := apeTarget(req.GetBody().GetTarget()) + if err != nil { + return nil, err + } + + if _, err = s.localOverrideStorage.LocalStorage().AddOverride(apechain.Ingress, target, &chain); err != nil { return nil, status.Error(getCodeByLocalStorageErr(err), err.Error()) } @@ -87,23 +66,11 @@ func (s *Server) GetChainLocalOverride(_ context.Context, req *control.GetChainL return nil, status.Error(codes.PermissionDenied, err.Error()) } - target := req.GetBody().GetTarget() - if target.Type != control.ChainTarget_CONTAINER { - return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error()) - } - - cid, err := extractCID(target.GetName()) + target, err := apeTarget(req.GetBody().GetTarget()) if err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) + return nil, err } - - src, err := s.apeChainSrc.GetChainSource(cid) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString()) - chain, err := src.LocalStorage().GetOverride(apechain.Ingress, engine.ContainerTarget(resource), apechain.ID(req.GetBody().GetChainId())) + chain, err := s.localOverrideStorage.LocalStorage().GetOverride(apechain.Ingress, target, apechain.ID(req.GetBody().GetChainId())) if err != nil { return nil, status.Error(getCodeByLocalStorageErr(err), err.Error()) } @@ -125,23 +92,12 @@ func (s *Server) ListChainLocalOverrides(_ context.Context, req *control.ListCha return nil, status.Error(codes.PermissionDenied, err.Error()) } - target := req.GetBody().GetTarget() - if target.Type != control.ChainTarget_CONTAINER { - return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error()) - } - - cid, err := extractCID(target.GetName()) + target, err := apeTarget(req.GetBody().GetTarget()) if err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) + return nil, err } - src, err := s.apeChainSrc.GetChainSource(cid) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString()) - chains, err := src.LocalStorage().ListOverrides(apechain.Ingress, engine.ContainerTarget(resource)) + chains, err := s.localOverrideStorage.LocalStorage().ListOverrides(apechain.Ingress, target) if err != nil { return nil, status.Error(getCodeByLocalStorageErr(err), err.Error()) } @@ -167,23 +123,12 @@ func (s *Server) RemoveChainLocalOverride(_ context.Context, req *control.Remove return nil, status.Error(codes.PermissionDenied, err.Error()) } - target := req.GetBody().GetTarget() - if target.Type != control.ChainTarget_CONTAINER { - return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error()) - } - - cid, err := extractCID(target.GetName()) + target, err := apeTarget(req.GetBody().GetTarget()) if err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) + return nil, err } - src, err := s.apeChainSrc.GetChainSource(cid) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString()) - if err = src.LocalStorage().RemoveOverride(apechain.Ingress, engine.ContainerTarget(resource), apechain.ID(req.GetBody().GetChainId())); err != nil { + if err = s.localOverrideStorage.LocalStorage().RemoveOverride(apechain.Ingress, target, apechain.ID(req.GetBody().GetChainId())); err != nil { return nil, status.Error(getCodeByLocalStorageErr(err), err.Error()) } resp := &control.RemoveChainLocalOverrideResponse{ diff --git a/pkg/services/control/server/server.go b/pkg/services/control/server/server.go index 48bc85200..7cfa93f05 100644 --- a/pkg/services/control/server/server.go +++ b/pkg/services/control/server/server.go @@ -9,6 +9,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" + policyengine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine" ) // Server is an entity that serves @@ -51,6 +52,14 @@ type NodeState interface { ForceMaintenance() error } +// LocalOverrideStorageDecorator interface provides methods to decorate LocalOverrideEngine +// interface methods. +type LocalOverrideStorageDecorator interface { + // LocalStorage method can be decorated by using sync primitives in the case if the local + // override storage state should be consistent for chain router. + LocalStorage() policyengine.LocalOverrideStorage +} + // Option of the Server's constructor. type Option func(*cfg) @@ -65,7 +74,7 @@ type cfg struct { cnrSrc container.Source - apeChainSrc container.AccessPolicyEngineChainSource + localOverrideStorage LocalOverrideStorageDecorator replicator *replicator.Replicator @@ -160,10 +169,10 @@ func WithTreeService(s TreeService) Option { } } -// WithAPEChainSource returns the option to set access policy engine -// chain source. -func WithAPEChainSource(apeChainSrc container.AccessPolicyEngineChainSource) Option { +// WithLocalOverrideStorage returns the option to set access policy engine +// chain override storage. +func WithLocalOverrideStorage(localOverrideStorage LocalOverrideStorageDecorator) Option { return func(c *cfg) { - c.apeChainSrc = apeChainSrc + c.localOverrideStorage = localOverrideStorage } } diff --git a/pkg/services/object/acl/ape.go b/pkg/services/object/acl/ape.go index a48bb5c9f..e2a884c96 100644 --- a/pkg/services/object/acl/ape.go +++ b/pkg/services/object/acl/ape.go @@ -1,10 +1,8 @@ package acl import ( - "errors" "fmt" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" v2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl/v2" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" @@ -12,34 +10,25 @@ import ( policyengine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine" ) -var errAPEChainNoSource = errors.New("could not get ape chain source for the container") - type apeCheckerImpl struct { - log *logger.Logger - apeSrc container.AccessPolicyEngineChainSource + log *logger.Logger + chainRouter policyengine.ChainRouter } -func NewAPEChecker(log *logger.Logger, apeSrc container.AccessPolicyEngineChainSource) v2.APEChainChecker { +func NewAPEChecker(log *logger.Logger, chainRouter policyengine.ChainRouter) v2.APEChainChecker { return &apeCheckerImpl{ - log: log, - apeSrc: apeSrc, + log: log, + chainRouter: chainRouter, } } func (c *apeCheckerImpl) CheckIfRequestPermitted(reqInfo v2.RequestInfo) error { - cnr := reqInfo.ContainerID() - - chainCache, err := c.apeSrc.GetChainSource(cnr) - if err != nil { - return errAPEChainNoSource - } - request := new(Request) request.FromRequestInfo(reqInfo) cnrTarget := getResource(reqInfo).Name() - status, ruleFound, err := chainCache.IsAllowed(apechain.Ingress, policyengine.NewRequestTargetWithContainer(cnrTarget), request) + status, ruleFound, err := c.chainRouter.IsAllowed(apechain.Ingress, policyengine.NewRequestTargetWithContainer(cnrTarget), request) if err != nil { return err }