ape: Implement boltdb storage for local overrides #820

Merged
fyrchik merged 1 commit from aarifullin/frostfs-node:feature/804_override_storage into master 2023-12-11 09:41:56 +00:00
15 changed files with 560 additions and 142 deletions

View file

@ -29,6 +29,7 @@ import (
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -68,6 +69,7 @@ import (
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
neogoutil "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/panjf2000/ants/v2"
@ -518,10 +520,7 @@ type cfgObject struct {
eaclSource container.EACLSource
// Access policy chain source is used by object service to
// check for operation permissions but this source is also shared with
// control service that dispatches local overrides.
apeChainSource container.AccessPolicyEngineChainSource
cfgAccessPolicyEngine cfgAccessPolicyEngine
pool cfgObjectRoutines
@ -542,6 +541,10 @@ type cfgLocalStorage struct {
localStorage *engine.StorageEngine
}
type cfgAccessPolicyEngine struct {
accessPolicyEngine *accessPolicyEngine
}
type cfgObjectRoutines struct {
putRemote *ants.Pool
@ -970,6 +973,34 @@ func initLocalStorage(ctx context.Context, c *cfg) {
})
}
func initAccessPolicyEngine(_ context.Context, c *cfg) {
var localOverrideDB chainbase.LocalOverrideDatabase
if nodeconfig.PersistentPolicyRules(c.appCfg).Path() == "" {
c.log.Warn(logs.FrostFSNodePersistentRuleStorageDBPathIsNotSetInmemoryWillBeUsed)
dstepanov-yadro marked this conversation as resolved Outdated

Info -> Warn

Info -> Warn

Fixed

Fixed
localOverrideDB = chainbase.NewInmemoryLocalOverrideDatabase()
} else {
localOverrideDB = chainbase.NewBoltLocalOverrideDatabase(
chainbase.WithLogger(c.log),
chainbase.WithPath(nodeconfig.PersistentPolicyRules(c.appCfg).Path()),
chainbase.WithPerm(nodeconfig.PersistentPolicyRules(c.appCfg).Perm()),
chainbase.WithNoSync(nodeconfig.PersistentPolicyRules(c.appCfg).NoSync()),
)
}
morphRuleStorage := inmemory.NewInmemoryMorphRuleChainStorage()
ape := newAccessPolicyEngine(morphRuleStorage, localOverrideDB)
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine = ape
c.onShutdown(func() {
if err := ape.LocalOverrideDatabaseCore().Close(); err != nil {
c.log.Warn(logs.FrostFSNodeAccessPolicyEngineClosingFailure,
zap.Error(err),
)
}
})
}
func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
var err error

View file

@ -2,6 +2,7 @@ package nodeconfig
import (
"fmt"
"io/fs"
"os"
"strconv"
"time"
@ -30,11 +31,18 @@ type NotificationConfig struct {
cfg *config.Config
}
// PersistentPolicyRulesConfig is a wrapper over "persistent_policy_rules" config section
// which provides access to persistent policy rules storage configuration of node.
type PersistentPolicyRulesConfig struct {
cfg *config.Config
}
const (
subsection = "node"
persistentSessionsSubsection = "persistent_sessions"
persistentStateSubsection = "persistent_state"
notificationSubsection = "notification"
subsection = "node"
persistentSessionsSubsection = "persistent_sessions"
persistentStateSubsection = "persistent_state"
fyrchik marked this conversation as resolved Outdated

It is too generic for a top-level name, can we use sth like persistent_policy_rules?

It is too generic for a top-level name, can we use sth like `persistent_policy_rules`?

Fixed

Fixed
notificationSubsection = "notification"
persistentPolicyRulesSubsection = "persistent_policy_rules"
attributePrefix = "attribute"
@ -245,3 +253,42 @@ func (n NotificationConfig) KeyPath() string {
func (n NotificationConfig) CAPath() string {
return config.StringSafe(n.cfg, "ca")
}
const (
fyrchik marked this conversation as resolved Outdated

Why not 0o640?

Why not `0o640`?

Fixed

Fixed
// PermDefault is a default permission bits for local override storage file.
PermDefault = 0o644
)
// PersistentPolicyRules returns structure that provides access to "persistent_policy_rules"
// subsection of "node" section.
func PersistentPolicyRules(c *config.Config) PersistentPolicyRulesConfig {
return PersistentPolicyRulesConfig{
c.Sub(subsection).Sub(persistentPolicyRulesSubsection),
}
}
// Path returns the value of "path" config parameter.
//
// Returns empty string if missing, for compatibility with older configurations.
func (l PersistentPolicyRulesConfig) Path() string {
return config.StringSafe(l.cfg, "path")
}
// Perm returns the value of "perm" config parameter as a fs.FileMode.
//
// Returns PermDefault if the value is not a positive number.
func (l PersistentPolicyRulesConfig) Perm() fs.FileMode {
p := config.UintSafe((*config.Config)(l.cfg), "perm")
if p == 0 {
p = PermDefault
}
return fs.FileMode(p)
}
// NoSync returns the value of "no_sync" config parameter as a bool value.
//
// Returns false if the value is not a boolean.
func (l PersistentPolicyRulesConfig) NoSync() bool {
return config.BoolSafe((*config.Config)(l.cfg), "no_sync")
}

View file

@ -51,7 +51,7 @@ func initControlService(c *cfg) {
controlSvc.WithTreeService(treeSynchronizer{
c.treeService,
}),
controlSvc.WithAPEChainSource(c.cfgObject.apeChainSource),
controlSvc.WithLocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine),
)
lis, err := net.Listen("tcp", endpoint)

View file

@ -98,6 +98,12 @@ func initApp(ctx context.Context, c *cfg) {
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Init(ctx))
})
initAccessPolicyEngine(ctx, c)
initAndLog(c, "access policy engine", func(c *cfg) {
fatalOnErr(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalOverrideDatabaseCore().Open(ctx))
fatalOnErr(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalOverrideDatabaseCore().Init())
})
initAndLog(c, "gRPC", initGRPC)
initAndLog(c, "netmap", func(c *cfg) { initNetmapService(ctx, c) })
initAndLog(c, "accounting", func(c *cfg) { initAccountingService(ctx, c) })

View file

@ -157,8 +157,6 @@ func initObjectService(c *cfg) {
c.replicator = createReplicator(c, keyStorage, c.bgClientCache)
c.cfgObject.apeChainSource = NewAPESource()
addPolicer(c, keyStorage, c.bgClientCache)
traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
@ -426,7 +424,7 @@ func createACLServiceV2(c *cfg, splitSvc *objectService.TransportSplitter, irFet
c.cfgObject.eaclSource,
eaclSDK.NewValidator(),
ls),
acl.NewAPEChecker(c.log, c.cfgObject.apeChainSource),
acl.NewAPEChecker(c.log, c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.chainRouter),
c.cfgObject.cnrSource,
v2.WithLogger(c.log),
)

View file

@ -3,33 +3,63 @@ package main
import (
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/resource"
)
type apeChainSourceImpl struct {
mtx sync.Mutex
localChainStorage map[cid.ID]engine.LocalOverrideEngine
type accessPolicyEngine struct {
mtx sync.RWMutex
chainRouter engine.ChainRouter
morphChainStorage engine.MorphRuleChainStorage
localOverrideDatabase chainbase.LocalOverrideDatabase
}
func NewAPESource() container.AccessPolicyEngineChainSource {
return &apeChainSourceImpl{
localChainStorage: make(map[cid.ID]engine.LocalOverrideEngine),
var _ engine.LocalOverrideEngine = (*accessPolicyEngine)(nil)
func newAccessPolicyEngine(
morphChainStorage engine.MorphRuleChainStorage,
fyrchik marked this conversation as resolved Outdated
  1. Why is it public?
  2. What is the benefit of using functional options here?
1. Why is it public? 2. What is the benefit of using functional options here?
  1. I've made it private
  2. inmemory implementations are used as default if no options are passed
1. I've made it private 2. `inmemory` implementations are used as default if no options are passed

I mean why do we use functional options and not some struct?
If path to db is empty we can use in-memory and log. Frankly, I would refuse to start unless we explicitly want in-memory (most likely not).

I mean why do we use _functional_ options and not some struct? If path to db is empty we can use in-memory and log. Frankly, I would refuse to start unless we explicitly want in-memory (most likely not).

If path to db is empty we can use in-memory

I liked the idea. I remove func options at all and pass only two parameters to the constructor that are initialized explicitly

> If path to db is empty we can use in-memory I liked the idea. I remove func options at all and pass only two parameters to the constructor that are initialized explicitly
localOverrideDatabase chainbase.LocalOverrideDatabase) *accessPolicyEngine {
return &accessPolicyEngine{
chainRouter: engine.NewDefaultChainRouterWithLocalOverrides(
morphChainStorage,
localOverrideDatabase,
),
morphChainStorage: morphChainStorage,
localOverrideDatabase: localOverrideDatabase,
}
}
var _ container.AccessPolicyEngineChainSource = (*apeChainSourceImpl)(nil)
func (a *accessPolicyEngine) IsAllowed(name chain.Name, target engine.RequestTarget, r resource.Request) (status chain.Status, found bool, err error) {
a.mtx.RLock()
defer a.mtx.RUnlock()
func (c *apeChainSourceImpl) GetChainSource(cid cid.ID) (engine.LocalOverrideEngine, error) {
c.mtx.Lock()
defer c.mtx.Unlock()
s, ok := c.localChainStorage[cid]
if ok {
return s, nil
}
c.localChainStorage[cid] = inmemory.NewInMemoryLocalOverrides()
return c.localChainStorage[cid], nil
return a.chainRouter.IsAllowed(name, target, r)
}
func (a *accessPolicyEngine) MorphRuleChainStorage() engine.MorphRuleChainStorage {
a.mtx.Lock()
defer a.mtx.Unlock()
return a.morphChainStorage
}
dstepanov-yadro marked this conversation as resolved Outdated

I think it must be RLock too.

I think it must be `RLock` too.

I have put a.mtx.Lock() because MorphRuleChainStorage is changable. We may add rules to policy-contract via engine.MorphRuleChainStorage methods.

If it is RLock-ed for adding new rules (write) then chain routing for a request will be incorrect

I have put `a.mtx.Lock()` because `MorphRuleChainStorage` is changable. We may add rules to policy-contract via `engine.MorphRuleChainStorage` methods. If it is `RLock`-ed for adding new rules (write) then chain routing for a request will be incorrect
func (a *accessPolicyEngine) LocalStorage() engine.LocalOverrideStorage {
a.mtx.Lock()
defer a.mtx.Unlock()
return a.localOverrideDatabase
}
func (a *accessPolicyEngine) LocalOverrideDatabaseCore() chainbase.DatabaseCore {
a.mtx.Lock()
defer a.mtx.Unlock()
return a.localOverrideDatabase
}

View file

@ -429,7 +429,9 @@ const (
FrostFSNodeFailedToAttachShardToEngine = "failed to attach shard to engine"
FrostFSNodeShardAttachedToEngine = "shard attached to engine"
FrostFSNodeClosingComponentsOfTheStorageEngine = "closing components of the storage engine..."
FrostFSNodeAccessPolicyEngineClosingFailure = "ape closing failure"
FrostFSNodeStorageEngineClosingFailure = "storage engine closing failure"
FrostFSNodePersistentRuleStorageDBPathIsNotSetInmemoryWillBeUsed = "persistent rule storage db path is not set: in-memory will be used"
FrostFSNodeAllComponentsOfTheStorageEngineClosedSuccessfully = "all components of the storage engine closed successfully"
FrostFSNodeBootstrappingWithTheMaintenanceState = "bootstrapping with the maintenance state"
FrostFSNodeBootstrappingWithOnlineState = "bootstrapping with online state"

250
pkg/ape/chainbase/boltdb.go Normal file
View file

@ -0,0 +1,250 @@
package chainbase
import (
"context"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
policyengine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
"go.etcd.io/bbolt"
)
type boltLocalOverrideStorage struct {
*cfg
db *bbolt.DB
}
var (
chainBucket = []byte{0}
)
var (
ErrChainBucketNotFound = logicerr.New("chain root bucket has not been found")
ErrChainNotFound = logicerr.New("chain has not been found")
ErrGlobalNamespaceBucketNotFound = logicerr.New("global namespace bucket has not been found")
ErrTargetTypeBucketNotFound = logicerr.New("target type bucket has not been found")
ErrTargetNameBucketNotFound = logicerr.New("target name bucket has not been found")
)
// NewBoltLocalOverrideDatabase returns storage wrapper for storing access policy engine
// local overrides.
//
// chain storage (chainBucket):
// -> global namespace bucket (nBucket):
// --> target bucket (tBucket)
// ---> target name (resource) bucket (rBucket):
//
// | Key | Value |
// x---------------------x-------------------x
// | chain id (string) | serialized chain |
// x---------------------x-------------------x
//
//nolint:godot
func NewBoltLocalOverrideDatabase(opts ...Option) LocalOverrideDatabase {
c := defaultCfg()
for i := range opts {
opts[i](c)
}
return &boltLocalOverrideStorage{
cfg: c,
}
}
func (cs *boltLocalOverrideStorage) Init() error {
return cs.db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(chainBucket)
return err
})
}
func (cs *boltLocalOverrideStorage) Open(context.Context) error {
err := util.MkdirAllX(filepath.Dir(cs.path), cs.perm)
if err != nil {
return fmt.Errorf("can't create dir %s for the chain DB: %w", cs.path, err)
}
opts := *bbolt.DefaultOptions
opts.NoSync = cs.noSync
opts.Timeout = 100 * time.Millisecond
cs.db, err = bbolt.Open(cs.path, cs.perm, &opts)
if err != nil {
return fmt.Errorf("can't open the chain DB: %w", err)
}
cs.db.MaxBatchSize = cs.maxBatchSize
cs.db.MaxBatchDelay = cs.maxBatchDelay
return nil
}
func (cs *boltLocalOverrideStorage) Close() error {
var err error
if cs.db != nil {
err = cs.db.Close()
}
return err
}
func getTargetBucket(tx *bbolt.Tx, name chain.Name, target policyengine.Target) (*bbolt.Bucket, error) {
cbucket := tx.Bucket(chainBucket)
if cbucket == nil {
return nil, ErrChainBucketNotFound
}
nbucket := cbucket.Bucket([]byte(name))
if nbucket == nil {
return nil, fmt.Errorf("global namespace %s: %w", name, ErrGlobalNamespaceBucketNotFound)
}
typeBucket := nbucket.Bucket([]byte{byte(target.Type)})
if typeBucket == nil {
return nil, fmt.Errorf("type bucket '%c': %w", target.Type, ErrTargetTypeBucketNotFound)
}
rbucket := typeBucket.Bucket([]byte(target.Name))
if rbucket == nil {
return nil, fmt.Errorf("target name bucket %s: %w", target.Name, ErrTargetNameBucketNotFound)
}
return rbucket, nil
}
fyrchik marked this conversation as resolved Outdated

Marshaling inside Update is a bad idea, can we move it outside?

Marshaling inside `Update` is a bad idea, can we move it outside?

You are correct. Placing the unmarshalling within transaction is really bad idea. I've taken these out of Update and View for all methods

You are correct. Placing the unmarshalling within transaction is really bad idea. I've taken these out of `Update` and `View` for all methods

For View it is not so bad, it blocks only remapping.

For `View` it is not so bad, it blocks only remapping.
func getTargetBucketCreateIfEmpty(tx *bbolt.Tx, name chain.Name, target policyengine.Target) (*bbolt.Bucket, error) {
cbucket := tx.Bucket(chainBucket)
if cbucket == nil {
return nil, ErrChainBucketNotFound
}
nbucket := cbucket.Bucket([]byte(name))
if nbucket == nil {
var err error
nbucket, err = cbucket.CreateBucket([]byte(name))
if err != nil {
return nil, fmt.Errorf("could not create a bucket for the global chain name %s: %w", name, err)
}
}
typeBucket := nbucket.Bucket([]byte{byte(target.Type)})
if typeBucket == nil {
var err error
fyrchik marked this conversation as resolved Outdated

What about storing chain by key? This way we have O(1) update and listing can be done with iteration.

What about storing chain by key? This way we have O(1) update and listing can be done with iteration.

OK.

I have made Bucket: Name -> Bucket: Resource -> key (chainID): value (marshalled chain)

OK. I have made `Bucket: Name` -> `Bucket: Resource` -> `key (chainID): value (marshalled chain)`
typeBucket, err = cbucket.CreateBucket([]byte{byte(target.Type)})
if err != nil {
dstepanov-yadro marked this conversation as resolved Outdated

Get result must be copied to other slice: returned value is valid only for transaction lifetime.

`Get` result must be copied to other slice: returned value is valid only for transaction lifetime.

You are correct. The doc really says

"The returned value is only valid for the life of the transaction".

Fixed

You are correct. The doc really says > "The returned value is only valid for the life of the transaction". Fixed
return nil, fmt.Errorf("could not create a bucket for the target type '%c': %w", target.Type, err)
}
}
rbucket := typeBucket.Bucket([]byte(target.Name))
if rbucket == nil {
var err error
rbucket, err = typeBucket.CreateBucket([]byte(target.Name))
if err != nil {
return nil, fmt.Errorf("could not create a bucket for the target name %s: %w", target.Name, err)
}
}
return rbucket, nil
}
func (cs *boltLocalOverrideStorage) AddOverride(name chain.Name, target policyengine.Target, c *chain.Chain) (chain.ID, error) {
if c.ID == "" {
return "", fmt.Errorf("chain ID is not set")
}
serializedChain := c.Bytes()
err := cs.db.Update(func(tx *bbolt.Tx) error {
rbuck, err := getTargetBucketCreateIfEmpty(tx, name, target)
if err != nil {
return err
}
return rbuck.Put([]byte(c.ID), serializedChain)
})
return c.ID, err
}
func (cs *boltLocalOverrideStorage) GetOverride(name chain.Name, target policyengine.Target, chainID chain.ID) (*chain.Chain, error) {
var serializedChain []byte
dstepanov-yadro marked this conversation as resolved Outdated

This []byte slice requires to be copied too

This `[]byte` slice requires to be copied too

Oh, thanks! Fixed

Oh, thanks! Fixed
if err := cs.db.View(func(tx *bbolt.Tx) error {
rbuck, err := getTargetBucket(tx, name, target)
if err != nil {
return err
}
serializedChain = rbuck.Get([]byte(chainID))
if serializedChain == nil {
return ErrChainNotFound
}
serializedChain = slice.Copy(serializedChain)
return nil
}); err != nil {
return nil, err
}
c := &chain.Chain{}
if err := json.Unmarshal(serializedChain, c); err != nil {
return nil, err
}
return c, nil
}
func (cs *boltLocalOverrideStorage) RemoveOverride(name chain.Name, target policyengine.Target, chainID chain.ID) error {
return cs.db.Update(func(tx *bbolt.Tx) error {
rbuck, err := getTargetBucket(tx, name, target)
if err != nil {
return err
}
return rbuck.Delete([]byte(chainID))
})
}
func (cs *boltLocalOverrideStorage) ListOverrides(name chain.Name, target policyengine.Target) ([]*chain.Chain, error) {
var serializedChains [][]byte
var serializedChain []byte
if err := cs.db.View(func(tx *bbolt.Tx) error {
rbuck, err := getTargetBucket(tx, name, target)
if err != nil {
return err
}
return rbuck.ForEach(func(_, v []byte) error {
serializedChain = slice.Copy(v)
serializedChains = append(serializedChains, serializedChain)
return nil
})
}); err != nil {
if errors.Is(err, ErrGlobalNamespaceBucketNotFound) || errors.Is(err, ErrTargetNameBucketNotFound) {
return []*chain.Chain{}, nil
}
return nil, err
}
chains := make([]*chain.Chain, 0, len(serializedChains))
for _, serializedChain = range serializedChains {
c := &chain.Chain{}
if err := json.Unmarshal(serializedChain, c); err != nil {
return nil, err
}
chains = append(chains, c)
}
return chains, nil
}
func (cs *boltLocalOverrideStorage) DropAllOverrides(name chain.Name) error {
return cs.db.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(name))
})
}

View file

@ -0,0 +1,30 @@
package chainbase
import (
"context"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
)
type inmemoryLocalOverrideStorage struct {
engine.LocalOverrideStorage
}
func NewInmemoryLocalOverrideDatabase() LocalOverrideDatabase {
return &inmemoryLocalOverrideStorage{
LocalOverrideStorage: inmemory.NewInmemoryLocalStorage(),
}
}
func (cs *inmemoryLocalOverrideStorage) Init() error {
return nil
}
func (cs *inmemoryLocalOverrideStorage) Open(_ context.Context) error {
return nil
}
func (cs *inmemoryLocalOverrideStorage) Close() error {
return nil
}

View file

@ -0,0 +1,22 @@
package chainbase
import (
"context"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
)
// DatabaseCore interface provides methods to initialize and manage local override storage
// as database.
type DatabaseCore interface {
Init() error
Open(context.Context) error
Close() error
}
// LocalOverrideDatabase interface provides methods to manage local override storage
// as database and as the APE's local override storage.
type LocalOverrideDatabase interface {
DatabaseCore
engine.LocalOverrideStorage
}

View file

@ -0,0 +1,67 @@
package chainbase
import (
"io/fs"
"os"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
type Option func(*cfg)
type cfg struct {
path string
perm fs.FileMode
noSync bool
maxBatchDelay time.Duration
maxBatchSize int
log *logger.Logger
}
func defaultCfg() *cfg {
return &cfg{
perm: os.ModePerm,
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
maxBatchSize: bbolt.DefaultMaxBatchSize,
log: &logger.Logger{Logger: zap.L()},
}
}
func WithPath(path string) Option {
return func(c *cfg) {
c.path = path
}
}
func WithPerm(perm fs.FileMode) Option {
return func(c *cfg) {
c.perm = perm
}
}
func WithNoSync(noSync bool) Option {
return func(c *cfg) {
c.noSync = noSync
}
}
func WithMaxBatchDelay(maxBatchDelay time.Duration) Option {
return func(c *cfg) {
c.maxBatchDelay = maxBatchDelay
}
}
func WithMaxBatchSize(maxBatchSize int) Option {
return func(c *cfg) {
c.maxBatchSize = maxBatchSize
}
}
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
fyrchik marked this conversation as resolved Outdated

What is the usecase for this option?

What is the usecase for this option?

Removed

Removed
c.log = l
}
}

View file

@ -7,7 +7,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
)
// Container groups information about the FrostFS container stored in the FrostFS network.
@ -72,10 +71,3 @@ type EACLSource interface {
// eACL table is not in source.
GetEACL(cid.ID) (*EACL, error)
}
// AccessPolicyEngineChainSource interface provides methods to access and manipulate
// policy engine chain storage.
type AccessPolicyEngineChainSource interface {
// TODO (aarifullin): Better to use simpler interface instead CachedChainStorage.
GetChainSource(cid cid.ID) (engine.LocalOverrideEngine, error)
}

View file

@ -4,34 +4,24 @@ import (
"context"
"errors"
"fmt"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
engine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
nativeschema "git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// extractCID extracts CID from the schema's pattern.
// TODO (aarifullin): This is temporary solution should be replaced by
// resource name validation.
func extractCID(resource string) (cid.ID, error) {
var cidStr string
// Sscanf requires to make tokens delimited by spaces.
pattern := strings.Replace(nativeschema.ResourceFormatRootContainerObjects, "/", " ", -1)
resource = strings.Replace(resource, "/", " ", -1)
if _, err := fmt.Sscanf(resource, pattern, &cidStr); err != nil {
err = fmt.Errorf("could not parse the target name '%s' to CID: %w", resource, err)
return cid.ID{}, err
func apeTarget(chainTarget *control.ChainTarget) (engine.Target, error) {
switch chainTarget.GetType() {
case control.ChainTarget_CONTAINER:
return engine.ContainerTarget(chainTarget.GetName()), nil
case control.ChainTarget_NAMESPACE:
return engine.NamespaceTarget(chainTarget.GetName()), nil
default:
}
var cid cid.ID
err := cid.DecodeString(cidStr)
return cid, err
return engine.Target{}, status.Error(codes.InvalidArgument,
fmt.Errorf("target type is not supported: %s", chainTarget.GetType().String()).Error())
}
func (s *Server) AddChainLocalOverride(_ context.Context, req *control.AddChainLocalOverrideRequest) (*control.AddChainLocalOverrideResponse, error) {
@ -39,34 +29,23 @@ func (s *Server) AddChainLocalOverride(_ context.Context, req *control.AddChainL
return nil, status.Error(codes.PermissionDenied, err.Error())
}
target := req.GetBody().GetTarget()
if target.Type != control.ChainTarget_CONTAINER {
return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error())
}
cid, err := extractCID(target.GetName())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
var chain apechain.Chain
if err = chain.DecodeBytes(req.GetBody().GetChain()); err != nil {
if err := chain.DecodeBytes(req.GetBody().GetChain()); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
src, err := s.apeChainSrc.GetChainSource(cid)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
s.apeChainCounter.Add(1)
// TODO (aarifullin): the such chain id is not well-designed yet.
if chain.ID == "" {
chain.ID = apechain.ID(fmt.Sprintf("%s:%d", apechain.Ingress, s.apeChainCounter.Load()))
}
resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString())
if _, err = src.LocalStorage().AddOverride(apechain.Ingress, engine.ContainerTarget(resource), &chain); err != nil {
target, err := apeTarget(req.GetBody().GetTarget())
if err != nil {
return nil, err
}
if _, err = s.localOverrideStorage.LocalStorage().AddOverride(apechain.Ingress, target, &chain); err != nil {
return nil, status.Error(getCodeByLocalStorageErr(err), err.Error())
}
@ -87,23 +66,11 @@ func (s *Server) GetChainLocalOverride(_ context.Context, req *control.GetChainL
return nil, status.Error(codes.PermissionDenied, err.Error())
}
target := req.GetBody().GetTarget()
if target.Type != control.ChainTarget_CONTAINER {
return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error())
}
cid, err := extractCID(target.GetName())
target, err := apeTarget(req.GetBody().GetTarget())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
return nil, err
}
src, err := s.apeChainSrc.GetChainSource(cid)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString())
chain, err := src.LocalStorage().GetOverride(apechain.Ingress, engine.ContainerTarget(resource), apechain.ID(req.GetBody().GetChainId()))
chain, err := s.localOverrideStorage.LocalStorage().GetOverride(apechain.Ingress, target, apechain.ID(req.GetBody().GetChainId()))
if err != nil {
return nil, status.Error(getCodeByLocalStorageErr(err), err.Error())
}
@ -125,23 +92,12 @@ func (s *Server) ListChainLocalOverrides(_ context.Context, req *control.ListCha
return nil, status.Error(codes.PermissionDenied, err.Error())
}
target := req.GetBody().GetTarget()
if target.Type != control.ChainTarget_CONTAINER {
return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error())
}
cid, err := extractCID(target.GetName())
target, err := apeTarget(req.GetBody().GetTarget())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
return nil, err
}
src, err := s.apeChainSrc.GetChainSource(cid)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString())
chains, err := src.LocalStorage().ListOverrides(apechain.Ingress, engine.ContainerTarget(resource))
chains, err := s.localOverrideStorage.LocalStorage().ListOverrides(apechain.Ingress, target)
if err != nil {
return nil, status.Error(getCodeByLocalStorageErr(err), err.Error())
}
@ -167,23 +123,12 @@ func (s *Server) RemoveChainLocalOverride(_ context.Context, req *control.Remove
return nil, status.Error(codes.PermissionDenied, err.Error())
}
target := req.GetBody().GetTarget()
if target.Type != control.ChainTarget_CONTAINER {
return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error())
}
cid, err := extractCID(target.GetName())
target, err := apeTarget(req.GetBody().GetTarget())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
return nil, err
}
src, err := s.apeChainSrc.GetChainSource(cid)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString())
if err = src.LocalStorage().RemoveOverride(apechain.Ingress, engine.ContainerTarget(resource), apechain.ID(req.GetBody().GetChainId())); err != nil {
if err = s.localOverrideStorage.LocalStorage().RemoveOverride(apechain.Ingress, target, apechain.ID(req.GetBody().GetChainId())); err != nil {
return nil, status.Error(getCodeByLocalStorageErr(err), err.Error())
}
resp := &control.RemoveChainLocalOverrideResponse{

View file

@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
policyengine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
)
// Server is an entity that serves
@ -51,6 +52,14 @@ type NodeState interface {
ForceMaintenance() error
}
// LocalOverrideStorageDecorator interface provides methods to decorate LocalOverrideEngine
// interface methods.
type LocalOverrideStorageDecorator interface {
dstepanov-yadro marked this conversation as resolved Outdated

interface looks redundant: using struct can be enough i think. not required to fix.

interface looks redundant: using struct can be enough i think. not required to fix.

If you are talking about using

type accessPolicyEngine struct { /*...*/ }

instead LocalOverrideStorageDecorator, then I can agree your point is fair, but I really do not want to let control service manage database, i.e. provide access to Init, Open, Close and give only LocalStorage methods

If you are talking about using ```go type accessPolicyEngine struct { /*...*/ } ``` instead `LocalOverrideStorageDecorator`, then I can agree your point is fair, but I really do not want to let control service manage database, i.e. provide access to `Init`, `Open`, `Close` and give only `LocalStorage` methods
// LocalStorage method can be decorated by using sync primitives in the case if the local
// override storage state should be consistent for chain router.
LocalStorage() policyengine.LocalOverrideStorage
}
// Option of the Server's constructor.
type Option func(*cfg)
@ -65,7 +74,7 @@ type cfg struct {
cnrSrc container.Source
apeChainSrc container.AccessPolicyEngineChainSource
localOverrideStorage LocalOverrideStorageDecorator
replicator *replicator.Replicator
@ -160,10 +169,10 @@ func WithTreeService(s TreeService) Option {
}
}
// WithAPEChainSource returns the option to set access policy engine
// chain source.
func WithAPEChainSource(apeChainSrc container.AccessPolicyEngineChainSource) Option {
// WithLocalOverrideStorage returns the option to set access policy engine
// chain override storage.
func WithLocalOverrideStorage(localOverrideStorage LocalOverrideStorageDecorator) Option {
return func(c *cfg) {
c.apeChainSrc = apeChainSrc
c.localOverrideStorage = localOverrideStorage
}
}

View file

@ -1,10 +1,8 @@
package acl
import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
v2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl/v2"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -12,34 +10,25 @@ import (
policyengine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
)
var errAPEChainNoSource = errors.New("could not get ape chain source for the container")
type apeCheckerImpl struct {
log *logger.Logger
apeSrc container.AccessPolicyEngineChainSource
log *logger.Logger
chainRouter policyengine.ChainRouter
}
func NewAPEChecker(log *logger.Logger, apeSrc container.AccessPolicyEngineChainSource) v2.APEChainChecker {
func NewAPEChecker(log *logger.Logger, chainRouter policyengine.ChainRouter) v2.APEChainChecker {
return &apeCheckerImpl{
log: log,
apeSrc: apeSrc,
log: log,
chainRouter: chainRouter,
}
}
func (c *apeCheckerImpl) CheckIfRequestPermitted(reqInfo v2.RequestInfo) error {
cnr := reqInfo.ContainerID()
chainCache, err := c.apeSrc.GetChainSource(cnr)
if err != nil {
return errAPEChainNoSource
}
request := new(Request)
request.FromRequestInfo(reqInfo)
cnrTarget := getResource(reqInfo).Name()
status, ruleFound, err := chainCache.IsAllowed(apechain.Ingress, policyengine.NewRequestTargetWithContainer(cnrTarget), request)
status, ruleFound, err := c.chainRouter.IsAllowed(apechain.Ingress, policyengine.NewRequestTargetWithContainer(cnrTarget), request)
if err != nil {
return err
}