ape: Implement boltdb storage for local overrides #820
|
@ -29,6 +29,7 @@ import (
|
|||
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
||||
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
|
@ -68,6 +69,7 @@ import (
|
|||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
neogoutil "github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
|
@ -518,10 +520,7 @@ type cfgObject struct {
|
|||
|
||||
eaclSource container.EACLSource
|
||||
|
||||
// Access policy chain source is used by object service to
|
||||
// check for operation permissions but this source is also shared with
|
||||
// control service that dispatches local overrides.
|
||||
apeChainSource container.AccessPolicyEngineChainSource
|
||||
cfgAccessPolicyEngine cfgAccessPolicyEngine
|
||||
|
||||
pool cfgObjectRoutines
|
||||
|
||||
|
@ -542,6 +541,10 @@ type cfgLocalStorage struct {
|
|||
localStorage *engine.StorageEngine
|
||||
}
|
||||
|
||||
type cfgAccessPolicyEngine struct {
|
||||
accessPolicyEngine *accessPolicyEngine
|
||||
}
|
||||
|
||||
type cfgObjectRoutines struct {
|
||||
putRemote *ants.Pool
|
||||
|
||||
|
@ -970,6 +973,34 @@ func initLocalStorage(ctx context.Context, c *cfg) {
|
|||
})
|
||||
}
|
||||
|
||||
func initAccessPolicyEngine(_ context.Context, c *cfg) {
|
||||
var localOverrideDB chainbase.LocalOverrideDatabase
|
||||
if nodeconfig.PersistentPolicyRules(c.appCfg).Path() == "" {
|
||||
c.log.Warn(logs.FrostFSNodePersistentRuleStorageDBPathIsNotSetInmemoryWillBeUsed)
|
||||
dstepanov-yadro marked this conversation as resolved
Outdated
|
||||
localOverrideDB = chainbase.NewInmemoryLocalOverrideDatabase()
|
||||
} else {
|
||||
localOverrideDB = chainbase.NewBoltLocalOverrideDatabase(
|
||||
chainbase.WithLogger(c.log),
|
||||
chainbase.WithPath(nodeconfig.PersistentPolicyRules(c.appCfg).Path()),
|
||||
chainbase.WithPerm(nodeconfig.PersistentPolicyRules(c.appCfg).Perm()),
|
||||
chainbase.WithNoSync(nodeconfig.PersistentPolicyRules(c.appCfg).NoSync()),
|
||||
)
|
||||
}
|
||||
|
||||
morphRuleStorage := inmemory.NewInmemoryMorphRuleChainStorage()
|
||||
|
||||
ape := newAccessPolicyEngine(morphRuleStorage, localOverrideDB)
|
||||
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine = ape
|
||||
|
||||
c.onShutdown(func() {
|
||||
if err := ape.LocalOverrideDatabaseCore().Close(); err != nil {
|
||||
c.log.Warn(logs.FrostFSNodeAccessPolicyEngineClosingFailure,
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
|
||||
var err error
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package nodeconfig
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
@ -30,11 +31,18 @@ type NotificationConfig struct {
|
|||
cfg *config.Config
|
||||
}
|
||||
|
||||
// PersistentPolicyRulesConfig is a wrapper over "persistent_policy_rules" config section
|
||||
// which provides access to persistent policy rules storage configuration of node.
|
||||
type PersistentPolicyRulesConfig struct {
|
||||
cfg *config.Config
|
||||
}
|
||||
|
||||
const (
|
||||
subsection = "node"
|
||||
persistentSessionsSubsection = "persistent_sessions"
|
||||
persistentStateSubsection = "persistent_state"
|
||||
notificationSubsection = "notification"
|
||||
subsection = "node"
|
||||
persistentSessionsSubsection = "persistent_sessions"
|
||||
persistentStateSubsection = "persistent_state"
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
It is too generic for a top-level name, can we use sth like It is too generic for a top-level name, can we use sth like `persistent_policy_rules`?
aarifullin
commented
Fixed Fixed
|
||||
notificationSubsection = "notification"
|
||||
persistentPolicyRulesSubsection = "persistent_policy_rules"
|
||||
|
||||
attributePrefix = "attribute"
|
||||
|
||||
|
@ -245,3 +253,42 @@ func (n NotificationConfig) KeyPath() string {
|
|||
func (n NotificationConfig) CAPath() string {
|
||||
return config.StringSafe(n.cfg, "ca")
|
||||
}
|
||||
|
||||
const (
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why not Why not `0o640`?
aarifullin
commented
Fixed Fixed
|
||||
// PermDefault is a default permission bits for local override storage file.
|
||||
PermDefault = 0o644
|
||||
)
|
||||
|
||||
// PersistentPolicyRules returns structure that provides access to "persistent_policy_rules"
|
||||
// subsection of "node" section.
|
||||
func PersistentPolicyRules(c *config.Config) PersistentPolicyRulesConfig {
|
||||
return PersistentPolicyRulesConfig{
|
||||
c.Sub(subsection).Sub(persistentPolicyRulesSubsection),
|
||||
}
|
||||
}
|
||||
|
||||
// Path returns the value of "path" config parameter.
|
||||
//
|
||||
// Returns empty string if missing, for compatibility with older configurations.
|
||||
func (l PersistentPolicyRulesConfig) Path() string {
|
||||
return config.StringSafe(l.cfg, "path")
|
||||
}
|
||||
|
||||
// Perm returns the value of "perm" config parameter as a fs.FileMode.
|
||||
//
|
||||
// Returns PermDefault if the value is not a positive number.
|
||||
func (l PersistentPolicyRulesConfig) Perm() fs.FileMode {
|
||||
p := config.UintSafe((*config.Config)(l.cfg), "perm")
|
||||
if p == 0 {
|
||||
p = PermDefault
|
||||
}
|
||||
|
||||
return fs.FileMode(p)
|
||||
}
|
||||
|
||||
// NoSync returns the value of "no_sync" config parameter as a bool value.
|
||||
//
|
||||
// Returns false if the value is not a boolean.
|
||||
func (l PersistentPolicyRulesConfig) NoSync() bool {
|
||||
return config.BoolSafe((*config.Config)(l.cfg), "no_sync")
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ func initControlService(c *cfg) {
|
|||
controlSvc.WithTreeService(treeSynchronizer{
|
||||
c.treeService,
|
||||
}),
|
||||
controlSvc.WithAPEChainSource(c.cfgObject.apeChainSource),
|
||||
controlSvc.WithLocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine),
|
||||
)
|
||||
|
||||
lis, err := net.Listen("tcp", endpoint)
|
||||
|
|
|
@ -98,6 +98,12 @@ func initApp(ctx context.Context, c *cfg) {
|
|||
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Init(ctx))
|
||||
})
|
||||
|
||||
initAccessPolicyEngine(ctx, c)
|
||||
initAndLog(c, "access policy engine", func(c *cfg) {
|
||||
fatalOnErr(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalOverrideDatabaseCore().Open(ctx))
|
||||
fatalOnErr(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalOverrideDatabaseCore().Init())
|
||||
})
|
||||
|
||||
initAndLog(c, "gRPC", initGRPC)
|
||||
initAndLog(c, "netmap", func(c *cfg) { initNetmapService(ctx, c) })
|
||||
initAndLog(c, "accounting", func(c *cfg) { initAccountingService(ctx, c) })
|
||||
|
|
|
@ -157,8 +157,6 @@ func initObjectService(c *cfg) {
|
|||
|
||||
c.replicator = createReplicator(c, keyStorage, c.bgClientCache)
|
||||
|
||||
c.cfgObject.apeChainSource = NewAPESource()
|
||||
|
||||
addPolicer(c, keyStorage, c.bgClientCache)
|
||||
|
||||
traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
|
||||
|
@ -426,7 +424,7 @@ func createACLServiceV2(c *cfg, splitSvc *objectService.TransportSplitter, irFet
|
|||
c.cfgObject.eaclSource,
|
||||
eaclSDK.NewValidator(),
|
||||
ls),
|
||||
acl.NewAPEChecker(c.log, c.cfgObject.apeChainSource),
|
||||
acl.NewAPEChecker(c.log, c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.chainRouter),
|
||||
c.cfgObject.cnrSource,
|
||||
v2.WithLogger(c.log),
|
||||
)
|
||||
|
|
|
@ -3,33 +3,63 @@ package main
|
|||
import (
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/resource"
|
||||
)
|
||||
|
||||
type apeChainSourceImpl struct {
|
||||
mtx sync.Mutex
|
||||
localChainStorage map[cid.ID]engine.LocalOverrideEngine
|
||||
type accessPolicyEngine struct {
|
||||
mtx sync.RWMutex
|
||||
|
||||
chainRouter engine.ChainRouter
|
||||
|
||||
morphChainStorage engine.MorphRuleChainStorage
|
||||
|
||||
localOverrideDatabase chainbase.LocalOverrideDatabase
|
||||
}
|
||||
|
||||
func NewAPESource() container.AccessPolicyEngineChainSource {
|
||||
return &apeChainSourceImpl{
|
||||
localChainStorage: make(map[cid.ID]engine.LocalOverrideEngine),
|
||||
var _ engine.LocalOverrideEngine = (*accessPolicyEngine)(nil)
|
||||
|
||||
func newAccessPolicyEngine(
|
||||
morphChainStorage engine.MorphRuleChainStorage,
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
1. Why is it public?
2. What is the benefit of using functional options here?
aarifullin
commented
1. I've made it private
2. `inmemory` implementations are used as default if no options are passed
fyrchik
commented
I mean why do we use functional options and not some struct? I mean why do we use _functional_ options and not some struct?
If path to db is empty we can use in-memory and log. Frankly, I would refuse to start unless we explicitly want in-memory (most likely not).
aarifullin
commented
I liked the idea. I remove func options at all and pass only two parameters to the constructor that are initialized explicitly > If path to db is empty we can use in-memory
I liked the idea. I remove func options at all and pass only two parameters to the constructor that are initialized explicitly
|
||||
localOverrideDatabase chainbase.LocalOverrideDatabase) *accessPolicyEngine {
|
||||
return &accessPolicyEngine{
|
||||
chainRouter: engine.NewDefaultChainRouterWithLocalOverrides(
|
||||
morphChainStorage,
|
||||
localOverrideDatabase,
|
||||
),
|
||||
|
||||
morphChainStorage: morphChainStorage,
|
||||
|
||||
localOverrideDatabase: localOverrideDatabase,
|
||||
}
|
||||
}
|
||||
|
||||
var _ container.AccessPolicyEngineChainSource = (*apeChainSourceImpl)(nil)
|
||||
func (a *accessPolicyEngine) IsAllowed(name chain.Name, target engine.RequestTarget, r resource.Request) (status chain.Status, found bool, err error) {
|
||||
a.mtx.RLock()
|
||||
defer a.mtx.RUnlock()
|
||||
|
||||
func (c *apeChainSourceImpl) GetChainSource(cid cid.ID) (engine.LocalOverrideEngine, error) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
s, ok := c.localChainStorage[cid]
|
||||
if ok {
|
||||
return s, nil
|
||||
}
|
||||
c.localChainStorage[cid] = inmemory.NewInMemoryLocalOverrides()
|
||||
return c.localChainStorage[cid], nil
|
||||
return a.chainRouter.IsAllowed(name, target, r)
|
||||
}
|
||||
|
||||
func (a *accessPolicyEngine) MorphRuleChainStorage() engine.MorphRuleChainStorage {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
|
||||
return a.morphChainStorage
|
||||
}
|
||||
|
||||
dstepanov-yadro marked this conversation as resolved
Outdated
dstepanov-yadro
commented
I think it must be I think it must be `RLock` too.
aarifullin
commented
I have put If it is I have put `a.mtx.Lock()` because `MorphRuleChainStorage` is changable. We may add rules to policy-contract via `engine.MorphRuleChainStorage` methods.
If it is `RLock`-ed for adding new rules (write) then chain routing for a request will be incorrect
|
||||
func (a *accessPolicyEngine) LocalStorage() engine.LocalOverrideStorage {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
|
||||
return a.localOverrideDatabase
|
||||
}
|
||||
|
||||
func (a *accessPolicyEngine) LocalOverrideDatabaseCore() chainbase.DatabaseCore {
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.Unlock()
|
||||
|
||||
return a.localOverrideDatabase
|
||||
}
|
||||
|
|
|
@ -429,7 +429,9 @@ const (
|
|||
FrostFSNodeFailedToAttachShardToEngine = "failed to attach shard to engine"
|
||||
FrostFSNodeShardAttachedToEngine = "shard attached to engine"
|
||||
FrostFSNodeClosingComponentsOfTheStorageEngine = "closing components of the storage engine..."
|
||||
FrostFSNodeAccessPolicyEngineClosingFailure = "ape closing failure"
|
||||
FrostFSNodeStorageEngineClosingFailure = "storage engine closing failure"
|
||||
FrostFSNodePersistentRuleStorageDBPathIsNotSetInmemoryWillBeUsed = "persistent rule storage db path is not set: in-memory will be used"
|
||||
FrostFSNodeAllComponentsOfTheStorageEngineClosedSuccessfully = "all components of the storage engine closed successfully"
|
||||
FrostFSNodeBootstrappingWithTheMaintenanceState = "bootstrapping with the maintenance state"
|
||||
FrostFSNodeBootstrappingWithOnlineState = "bootstrapping with online state"
|
||||
|
|
250
pkg/ape/chainbase/boltdb.go
Normal file
|
@ -0,0 +1,250 @@
|
|||
package chainbase
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||
policyengine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
type boltLocalOverrideStorage struct {
|
||||
*cfg
|
||||
|
||||
db *bbolt.DB
|
||||
}
|
||||
|
||||
var (
|
||||
chainBucket = []byte{0}
|
||||
)
|
||||
|
||||
var (
|
||||
ErrChainBucketNotFound = logicerr.New("chain root bucket has not been found")
|
||||
|
||||
ErrChainNotFound = logicerr.New("chain has not been found")
|
||||
|
||||
ErrGlobalNamespaceBucketNotFound = logicerr.New("global namespace bucket has not been found")
|
||||
|
||||
ErrTargetTypeBucketNotFound = logicerr.New("target type bucket has not been found")
|
||||
|
||||
ErrTargetNameBucketNotFound = logicerr.New("target name bucket has not been found")
|
||||
)
|
||||
|
||||
// NewBoltLocalOverrideDatabase returns storage wrapper for storing access policy engine
|
||||
// local overrides.
|
||||
//
|
||||
// chain storage (chainBucket):
|
||||
// -> global namespace bucket (nBucket):
|
||||
// --> target bucket (tBucket)
|
||||
// ---> target name (resource) bucket (rBucket):
|
||||
//
|
||||
// | Key | Value |
|
||||
// x---------------------x-------------------x
|
||||
// | chain id (string) | serialized chain |
|
||||
// x---------------------x-------------------x
|
||||
//
|
||||
//nolint:godot
|
||||
func NewBoltLocalOverrideDatabase(opts ...Option) LocalOverrideDatabase {
|
||||
c := defaultCfg()
|
||||
|
||||
for i := range opts {
|
||||
opts[i](c)
|
||||
}
|
||||
|
||||
return &boltLocalOverrideStorage{
|
||||
cfg: c,
|
||||
}
|
||||
}
|
||||
|
||||
func (cs *boltLocalOverrideStorage) Init() error {
|
||||
return cs.db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(chainBucket)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (cs *boltLocalOverrideStorage) Open(context.Context) error {
|
||||
err := util.MkdirAllX(filepath.Dir(cs.path), cs.perm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create dir %s for the chain DB: %w", cs.path, err)
|
||||
}
|
||||
|
||||
opts := *bbolt.DefaultOptions
|
||||
opts.NoSync = cs.noSync
|
||||
opts.Timeout = 100 * time.Millisecond
|
||||
|
||||
cs.db, err = bbolt.Open(cs.path, cs.perm, &opts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't open the chain DB: %w", err)
|
||||
}
|
||||
|
||||
cs.db.MaxBatchSize = cs.maxBatchSize
|
||||
cs.db.MaxBatchDelay = cs.maxBatchDelay
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *boltLocalOverrideStorage) Close() error {
|
||||
var err error
|
||||
if cs.db != nil {
|
||||
err = cs.db.Close()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func getTargetBucket(tx *bbolt.Tx, name chain.Name, target policyengine.Target) (*bbolt.Bucket, error) {
|
||||
cbucket := tx.Bucket(chainBucket)
|
||||
if cbucket == nil {
|
||||
return nil, ErrChainBucketNotFound
|
||||
}
|
||||
|
||||
nbucket := cbucket.Bucket([]byte(name))
|
||||
if nbucket == nil {
|
||||
return nil, fmt.Errorf("global namespace %s: %w", name, ErrGlobalNamespaceBucketNotFound)
|
||||
}
|
||||
|
||||
typeBucket := nbucket.Bucket([]byte{byte(target.Type)})
|
||||
if typeBucket == nil {
|
||||
return nil, fmt.Errorf("type bucket '%c': %w", target.Type, ErrTargetTypeBucketNotFound)
|
||||
}
|
||||
|
||||
rbucket := typeBucket.Bucket([]byte(target.Name))
|
||||
if rbucket == nil {
|
||||
return nil, fmt.Errorf("target name bucket %s: %w", target.Name, ErrTargetNameBucketNotFound)
|
||||
}
|
||||
|
||||
return rbucket, nil
|
||||
}
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Marshaling inside Marshaling inside `Update` is a bad idea, can we move it outside?
aarifullin
commented
You are correct. Placing the unmarshalling within transaction is really bad idea. I've taken these out of You are correct. Placing the unmarshalling within transaction is really bad idea. I've taken these out of `Update` and `View` for all methods
fyrchik
commented
For For `View` it is not so bad, it blocks only remapping.
|
||||
|
||||
func getTargetBucketCreateIfEmpty(tx *bbolt.Tx, name chain.Name, target policyengine.Target) (*bbolt.Bucket, error) {
|
||||
cbucket := tx.Bucket(chainBucket)
|
||||
if cbucket == nil {
|
||||
return nil, ErrChainBucketNotFound
|
||||
}
|
||||
|
||||
nbucket := cbucket.Bucket([]byte(name))
|
||||
if nbucket == nil {
|
||||
var err error
|
||||
nbucket, err = cbucket.CreateBucket([]byte(name))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create a bucket for the global chain name %s: %w", name, err)
|
||||
}
|
||||
}
|
||||
|
||||
typeBucket := nbucket.Bucket([]byte{byte(target.Type)})
|
||||
if typeBucket == nil {
|
||||
var err error
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
What about storing chain by key? This way we have O(1) update and listing can be done with iteration. What about storing chain by key? This way we have O(1) update and listing can be done with iteration.
aarifullin
commented
OK. I have made OK.
I have made `Bucket: Name` -> `Bucket: Resource` -> `key (chainID): value (marshalled chain)`
|
||||
typeBucket, err = cbucket.CreateBucket([]byte{byte(target.Type)})
|
||||
if err != nil {
|
||||
dstepanov-yadro marked this conversation as resolved
Outdated
dstepanov-yadro
commented
`Get` result must be copied to other slice: returned value is valid only for transaction lifetime.
aarifullin
commented
You are correct. The doc really says
Fixed You are correct. The doc really says
> "The returned value is only valid for the life of the transaction".
Fixed
|
||||
return nil, fmt.Errorf("could not create a bucket for the target type '%c': %w", target.Type, err)
|
||||
}
|
||||
}
|
||||
|
||||
rbucket := typeBucket.Bucket([]byte(target.Name))
|
||||
if rbucket == nil {
|
||||
var err error
|
||||
rbucket, err = typeBucket.CreateBucket([]byte(target.Name))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create a bucket for the target name %s: %w", target.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
return rbucket, nil
|
||||
}
|
||||
|
||||
func (cs *boltLocalOverrideStorage) AddOverride(name chain.Name, target policyengine.Target, c *chain.Chain) (chain.ID, error) {
|
||||
if c.ID == "" {
|
||||
return "", fmt.Errorf("chain ID is not set")
|
||||
}
|
||||
|
||||
serializedChain := c.Bytes()
|
||||
|
||||
err := cs.db.Update(func(tx *bbolt.Tx) error {
|
||||
rbuck, err := getTargetBucketCreateIfEmpty(tx, name, target)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return rbuck.Put([]byte(c.ID), serializedChain)
|
||||
})
|
||||
|
||||
return c.ID, err
|
||||
}
|
||||
|
||||
func (cs *boltLocalOverrideStorage) GetOverride(name chain.Name, target policyengine.Target, chainID chain.ID) (*chain.Chain, error) {
|
||||
var serializedChain []byte
|
||||
dstepanov-yadro marked this conversation as resolved
Outdated
dstepanov-yadro
commented
This This `[]byte` slice requires to be copied too
aarifullin
commented
Oh, thanks! Fixed Oh, thanks! Fixed
|
||||
|
||||
if err := cs.db.View(func(tx *bbolt.Tx) error {
|
||||
rbuck, err := getTargetBucket(tx, name, target)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
serializedChain = rbuck.Get([]byte(chainID))
|
||||
if serializedChain == nil {
|
||||
return ErrChainNotFound
|
||||
}
|
||||
serializedChain = slice.Copy(serializedChain)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := &chain.Chain{}
|
||||
if err := json.Unmarshal(serializedChain, c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (cs *boltLocalOverrideStorage) RemoveOverride(name chain.Name, target policyengine.Target, chainID chain.ID) error {
|
||||
return cs.db.Update(func(tx *bbolt.Tx) error {
|
||||
rbuck, err := getTargetBucket(tx, name, target)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return rbuck.Delete([]byte(chainID))
|
||||
})
|
||||
}
|
||||
|
||||
func (cs *boltLocalOverrideStorage) ListOverrides(name chain.Name, target policyengine.Target) ([]*chain.Chain, error) {
|
||||
var serializedChains [][]byte
|
||||
var serializedChain []byte
|
||||
if err := cs.db.View(func(tx *bbolt.Tx) error {
|
||||
rbuck, err := getTargetBucket(tx, name, target)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return rbuck.ForEach(func(_, v []byte) error {
|
||||
serializedChain = slice.Copy(v)
|
||||
serializedChains = append(serializedChains, serializedChain)
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
if errors.Is(err, ErrGlobalNamespaceBucketNotFound) || errors.Is(err, ErrTargetNameBucketNotFound) {
|
||||
return []*chain.Chain{}, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
chains := make([]*chain.Chain, 0, len(serializedChains))
|
||||
for _, serializedChain = range serializedChains {
|
||||
c := &chain.Chain{}
|
||||
if err := json.Unmarshal(serializedChain, c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
chains = append(chains, c)
|
||||
}
|
||||
return chains, nil
|
||||
}
|
||||
|
||||
func (cs *boltLocalOverrideStorage) DropAllOverrides(name chain.Name) error {
|
||||
return cs.db.Update(func(tx *bbolt.Tx) error {
|
||||
return tx.DeleteBucket([]byte(name))
|
||||
})
|
||||
}
|
30
pkg/ape/chainbase/inmemory.go
Normal file
|
@ -0,0 +1,30 @@
|
|||
package chainbase
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
|
||||
)
|
||||
|
||||
type inmemoryLocalOverrideStorage struct {
|
||||
engine.LocalOverrideStorage
|
||||
}
|
||||
|
||||
func NewInmemoryLocalOverrideDatabase() LocalOverrideDatabase {
|
||||
return &inmemoryLocalOverrideStorage{
|
||||
LocalOverrideStorage: inmemory.NewInmemoryLocalStorage(),
|
||||
}
|
||||
}
|
||||
|
||||
func (cs *inmemoryLocalOverrideStorage) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *inmemoryLocalOverrideStorage) Open(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *inmemoryLocalOverrideStorage) Close() error {
|
||||
return nil
|
||||
}
|
22
pkg/ape/chainbase/interface.go
Normal file
|
@ -0,0 +1,22 @@
|
|||
package chainbase
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
)
|
||||
|
||||
// DatabaseCore interface provides methods to initialize and manage local override storage
|
||||
// as database.
|
||||
type DatabaseCore interface {
|
||||
Init() error
|
||||
Open(context.Context) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
// LocalOverrideDatabase interface provides methods to manage local override storage
|
||||
// as database and as the APE's local override storage.
|
||||
type LocalOverrideDatabase interface {
|
||||
DatabaseCore
|
||||
engine.LocalOverrideStorage
|
||||
}
|
67
pkg/ape/chainbase/option.go
Normal file
|
@ -0,0 +1,67 @@
|
|||
package chainbase
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type Option func(*cfg)
|
||||
|
||||
type cfg struct {
|
||||
path string
|
||||
perm fs.FileMode
|
||||
noSync bool
|
||||
maxBatchDelay time.Duration
|
||||
maxBatchSize int
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
perm: os.ModePerm,
|
||||
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
|
||||
maxBatchSize: bbolt.DefaultMaxBatchSize,
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
}
|
||||
}
|
||||
|
||||
func WithPath(path string) Option {
|
||||
return func(c *cfg) {
|
||||
c.path = path
|
||||
}
|
||||
}
|
||||
|
||||
func WithPerm(perm fs.FileMode) Option {
|
||||
return func(c *cfg) {
|
||||
c.perm = perm
|
||||
}
|
||||
}
|
||||
|
||||
func WithNoSync(noSync bool) Option {
|
||||
return func(c *cfg) {
|
||||
c.noSync = noSync
|
||||
}
|
||||
}
|
||||
|
||||
func WithMaxBatchDelay(maxBatchDelay time.Duration) Option {
|
||||
return func(c *cfg) {
|
||||
c.maxBatchDelay = maxBatchDelay
|
||||
}
|
||||
}
|
||||
|
||||
func WithMaxBatchSize(maxBatchSize int) Option {
|
||||
return func(c *cfg) {
|
||||
c.maxBatchSize = maxBatchSize
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
What is the usecase for this option? What is the usecase for this option?
aarifullin
commented
Removed Removed
|
||||
c.log = l
|
||||
}
|
||||
}
|
|
@ -7,7 +7,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
)
|
||||
|
||||
// Container groups information about the FrostFS container stored in the FrostFS network.
|
||||
|
@ -72,10 +71,3 @@ type EACLSource interface {
|
|||
// eACL table is not in source.
|
||||
GetEACL(cid.ID) (*EACL, error)
|
||||
}
|
||||
|
||||
// AccessPolicyEngineChainSource interface provides methods to access and manipulate
|
||||
// policy engine chain storage.
|
||||
type AccessPolicyEngineChainSource interface {
|
||||
// TODO (aarifullin): Better to use simpler interface instead CachedChainStorage.
|
||||
GetChainSource(cid cid.ID) (engine.LocalOverrideEngine, error)
|
||||
}
|
||||
|
|
|
@ -4,34 +4,24 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||
engine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
nativeschema "git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// extractCID extracts CID from the schema's pattern.
|
||||
// TODO (aarifullin): This is temporary solution should be replaced by
|
||||
// resource name validation.
|
||||
func extractCID(resource string) (cid.ID, error) {
|
||||
var cidStr string
|
||||
|
||||
// Sscanf requires to make tokens delimited by spaces.
|
||||
pattern := strings.Replace(nativeschema.ResourceFormatRootContainerObjects, "/", " ", -1)
|
||||
resource = strings.Replace(resource, "/", " ", -1)
|
||||
|
||||
if _, err := fmt.Sscanf(resource, pattern, &cidStr); err != nil {
|
||||
err = fmt.Errorf("could not parse the target name '%s' to CID: %w", resource, err)
|
||||
return cid.ID{}, err
|
||||
func apeTarget(chainTarget *control.ChainTarget) (engine.Target, error) {
|
||||
switch chainTarget.GetType() {
|
||||
case control.ChainTarget_CONTAINER:
|
||||
return engine.ContainerTarget(chainTarget.GetName()), nil
|
||||
case control.ChainTarget_NAMESPACE:
|
||||
return engine.NamespaceTarget(chainTarget.GetName()), nil
|
||||
default:
|
||||
}
|
||||
var cid cid.ID
|
||||
err := cid.DecodeString(cidStr)
|
||||
return cid, err
|
||||
return engine.Target{}, status.Error(codes.InvalidArgument,
|
||||
fmt.Errorf("target type is not supported: %s", chainTarget.GetType().String()).Error())
|
||||
}
|
||||
|
||||
func (s *Server) AddChainLocalOverride(_ context.Context, req *control.AddChainLocalOverrideRequest) (*control.AddChainLocalOverrideResponse, error) {
|
||||
|
@ -39,34 +29,23 @@ func (s *Server) AddChainLocalOverride(_ context.Context, req *control.AddChainL
|
|||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
target := req.GetBody().GetTarget()
|
||||
if target.Type != control.ChainTarget_CONTAINER {
|
||||
return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error())
|
||||
}
|
||||
|
||||
cid, err := extractCID(target.GetName())
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
var chain apechain.Chain
|
||||
if err = chain.DecodeBytes(req.GetBody().GetChain()); err != nil {
|
||||
if err := chain.DecodeBytes(req.GetBody().GetChain()); err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
src, err := s.apeChainSrc.GetChainSource(cid)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
s.apeChainCounter.Add(1)
|
||||
// TODO (aarifullin): the such chain id is not well-designed yet.
|
||||
if chain.ID == "" {
|
||||
chain.ID = apechain.ID(fmt.Sprintf("%s:%d", apechain.Ingress, s.apeChainCounter.Load()))
|
||||
}
|
||||
|
||||
resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString())
|
||||
if _, err = src.LocalStorage().AddOverride(apechain.Ingress, engine.ContainerTarget(resource), &chain); err != nil {
|
||||
target, err := apeTarget(req.GetBody().GetTarget())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if _, err = s.localOverrideStorage.LocalStorage().AddOverride(apechain.Ingress, target, &chain); err != nil {
|
||||
return nil, status.Error(getCodeByLocalStorageErr(err), err.Error())
|
||||
}
|
||||
|
||||
|
@ -87,23 +66,11 @@ func (s *Server) GetChainLocalOverride(_ context.Context, req *control.GetChainL
|
|||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
target := req.GetBody().GetTarget()
|
||||
if target.Type != control.ChainTarget_CONTAINER {
|
||||
return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error())
|
||||
}
|
||||
|
||||
cid, err := extractCID(target.GetName())
|
||||
target, err := apeTarget(req.GetBody().GetTarget())
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
src, err := s.apeChainSrc.GetChainSource(cid)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString())
|
||||
chain, err := src.LocalStorage().GetOverride(apechain.Ingress, engine.ContainerTarget(resource), apechain.ID(req.GetBody().GetChainId()))
|
||||
chain, err := s.localOverrideStorage.LocalStorage().GetOverride(apechain.Ingress, target, apechain.ID(req.GetBody().GetChainId()))
|
||||
if err != nil {
|
||||
return nil, status.Error(getCodeByLocalStorageErr(err), err.Error())
|
||||
}
|
||||
|
@ -125,23 +92,12 @@ func (s *Server) ListChainLocalOverrides(_ context.Context, req *control.ListCha
|
|||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
target := req.GetBody().GetTarget()
|
||||
if target.Type != control.ChainTarget_CONTAINER {
|
||||
return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error())
|
||||
}
|
||||
|
||||
cid, err := extractCID(target.GetName())
|
||||
target, err := apeTarget(req.GetBody().GetTarget())
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
src, err := s.apeChainSrc.GetChainSource(cid)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString())
|
||||
chains, err := src.LocalStorage().ListOverrides(apechain.Ingress, engine.ContainerTarget(resource))
|
||||
chains, err := s.localOverrideStorage.LocalStorage().ListOverrides(apechain.Ingress, target)
|
||||
if err != nil {
|
||||
return nil, status.Error(getCodeByLocalStorageErr(err), err.Error())
|
||||
}
|
||||
|
@ -167,23 +123,12 @@ func (s *Server) RemoveChainLocalOverride(_ context.Context, req *control.Remove
|
|||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
target := req.GetBody().GetTarget()
|
||||
if target.Type != control.ChainTarget_CONTAINER {
|
||||
return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error())
|
||||
}
|
||||
|
||||
cid, err := extractCID(target.GetName())
|
||||
target, err := apeTarget(req.GetBody().GetTarget())
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
src, err := s.apeChainSrc.GetChainSource(cid)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString())
|
||||
if err = src.LocalStorage().RemoveOverride(apechain.Ingress, engine.ContainerTarget(resource), apechain.ID(req.GetBody().GetChainId())); err != nil {
|
||||
if err = s.localOverrideStorage.LocalStorage().RemoveOverride(apechain.Ingress, target, apechain.ID(req.GetBody().GetChainId())); err != nil {
|
||||
return nil, status.Error(getCodeByLocalStorageErr(err), err.Error())
|
||||
}
|
||||
resp := &control.RemoveChainLocalOverrideResponse{
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||
policyengine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
)
|
||||
|
||||
// Server is an entity that serves
|
||||
|
@ -51,6 +52,14 @@ type NodeState interface {
|
|||
ForceMaintenance() error
|
||||
}
|
||||
|
||||
// LocalOverrideStorageDecorator interface provides methods to decorate LocalOverrideEngine
|
||||
// interface methods.
|
||||
type LocalOverrideStorageDecorator interface {
|
||||
dstepanov-yadro marked this conversation as resolved
Outdated
dstepanov-yadro
commented
interface looks redundant: using struct can be enough i think. not required to fix. interface looks redundant: using struct can be enough i think. not required to fix.
aarifullin
commented
If you are talking about using
instead If you are talking about using
```go
type accessPolicyEngine struct { /*...*/ }
```
instead `LocalOverrideStorageDecorator`, then I can agree your point is fair, but I really do not want to let control service manage database, i.e. provide access to `Init`, `Open`, `Close` and give only `LocalStorage` methods
|
||||
// LocalStorage method can be decorated by using sync primitives in the case if the local
|
||||
// override storage state should be consistent for chain router.
|
||||
LocalStorage() policyengine.LocalOverrideStorage
|
||||
}
|
||||
|
||||
// Option of the Server's constructor.
|
||||
type Option func(*cfg)
|
||||
|
||||
|
@ -65,7 +74,7 @@ type cfg struct {
|
|||
|
||||
cnrSrc container.Source
|
||||
|
||||
apeChainSrc container.AccessPolicyEngineChainSource
|
||||
localOverrideStorage LocalOverrideStorageDecorator
|
||||
|
||||
replicator *replicator.Replicator
|
||||
|
||||
|
@ -160,10 +169,10 @@ func WithTreeService(s TreeService) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithAPEChainSource returns the option to set access policy engine
|
||||
// chain source.
|
||||
func WithAPEChainSource(apeChainSrc container.AccessPolicyEngineChainSource) Option {
|
||||
// WithLocalOverrideStorage returns the option to set access policy engine
|
||||
// chain override storage.
|
||||
func WithLocalOverrideStorage(localOverrideStorage LocalOverrideStorageDecorator) Option {
|
||||
return func(c *cfg) {
|
||||
c.apeChainSrc = apeChainSrc
|
||||
c.localOverrideStorage = localOverrideStorage
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,10 +1,8 @@
|
|||
package acl
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
v2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl/v2"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
|
@ -12,34 +10,25 @@ import (
|
|||
policyengine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
)
|
||||
|
||||
var errAPEChainNoSource = errors.New("could not get ape chain source for the container")
|
||||
|
||||
type apeCheckerImpl struct {
|
||||
log *logger.Logger
|
||||
apeSrc container.AccessPolicyEngineChainSource
|
||||
log *logger.Logger
|
||||
chainRouter policyengine.ChainRouter
|
||||
}
|
||||
|
||||
func NewAPEChecker(log *logger.Logger, apeSrc container.AccessPolicyEngineChainSource) v2.APEChainChecker {
|
||||
func NewAPEChecker(log *logger.Logger, chainRouter policyengine.ChainRouter) v2.APEChainChecker {
|
||||
return &apeCheckerImpl{
|
||||
log: log,
|
||||
apeSrc: apeSrc,
|
||||
log: log,
|
||||
chainRouter: chainRouter,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *apeCheckerImpl) CheckIfRequestPermitted(reqInfo v2.RequestInfo) error {
|
||||
cnr := reqInfo.ContainerID()
|
||||
|
||||
chainCache, err := c.apeSrc.GetChainSource(cnr)
|
||||
if err != nil {
|
||||
return errAPEChainNoSource
|
||||
}
|
||||
|
||||
request := new(Request)
|
||||
request.FromRequestInfo(reqInfo)
|
||||
|
||||
cnrTarget := getResource(reqInfo).Name()
|
||||
|
||||
status, ruleFound, err := chainCache.IsAllowed(apechain.Ingress, policyengine.NewRequestTargetWithContainer(cnrTarget), request)
|
||||
status, ruleFound, err := c.chainRouter.IsAllowed(apechain.Ingress, policyengine.NewRequestTargetWithContainer(cnrTarget), request)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Info -> Warn
Fixed