[#804] ape: Implement boltdb storage for local overrides

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
Airat Arifullin 2023-11-20 19:35:16 +03:00 committed by Airat Arifullin
parent e361e017f3
commit 0f45e3d344
15 changed files with 560 additions and 142 deletions

250
pkg/ape/chainbase/boltdb.go Normal file
View file

@ -0,0 +1,250 @@
package chainbase
import (
"context"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
policyengine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
"go.etcd.io/bbolt"
)
type boltLocalOverrideStorage struct {
*cfg
db *bbolt.DB
}
var (
chainBucket = []byte{0}
)
var (
ErrChainBucketNotFound = logicerr.New("chain root bucket has not been found")
ErrChainNotFound = logicerr.New("chain has not been found")
ErrGlobalNamespaceBucketNotFound = logicerr.New("global namespace bucket has not been found")
ErrTargetTypeBucketNotFound = logicerr.New("target type bucket has not been found")
ErrTargetNameBucketNotFound = logicerr.New("target name bucket has not been found")
)
// NewBoltLocalOverrideDatabase returns storage wrapper for storing access policy engine
// local overrides.
//
// chain storage (chainBucket):
// -> global namespace bucket (nBucket):
// --> target bucket (tBucket)
// ---> target name (resource) bucket (rBucket):
//
// | Key | Value |
// x---------------------x-------------------x
// | chain id (string) | serialized chain |
// x---------------------x-------------------x
//
//nolint:godot
func NewBoltLocalOverrideDatabase(opts ...Option) LocalOverrideDatabase {
c := defaultCfg()
for i := range opts {
opts[i](c)
}
return &boltLocalOverrideStorage{
cfg: c,
}
}
func (cs *boltLocalOverrideStorage) Init() error {
return cs.db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(chainBucket)
return err
})
}
func (cs *boltLocalOverrideStorage) Open(context.Context) error {
err := util.MkdirAllX(filepath.Dir(cs.path), cs.perm)
if err != nil {
return fmt.Errorf("can't create dir %s for the chain DB: %w", cs.path, err)
}
opts := *bbolt.DefaultOptions
opts.NoSync = cs.noSync
opts.Timeout = 100 * time.Millisecond
cs.db, err = bbolt.Open(cs.path, cs.perm, &opts)
if err != nil {
return fmt.Errorf("can't open the chain DB: %w", err)
}
cs.db.MaxBatchSize = cs.maxBatchSize
cs.db.MaxBatchDelay = cs.maxBatchDelay
return nil
}
func (cs *boltLocalOverrideStorage) Close() error {
var err error
if cs.db != nil {
err = cs.db.Close()
}
return err
}
func getTargetBucket(tx *bbolt.Tx, name chain.Name, target policyengine.Target) (*bbolt.Bucket, error) {
cbucket := tx.Bucket(chainBucket)
if cbucket == nil {
return nil, ErrChainBucketNotFound
}
nbucket := cbucket.Bucket([]byte(name))
if nbucket == nil {
return nil, fmt.Errorf("global namespace %s: %w", name, ErrGlobalNamespaceBucketNotFound)
}
typeBucket := nbucket.Bucket([]byte{byte(target.Type)})
if typeBucket == nil {
return nil, fmt.Errorf("type bucket '%c': %w", target.Type, ErrTargetTypeBucketNotFound)
}
rbucket := typeBucket.Bucket([]byte(target.Name))
if rbucket == nil {
return nil, fmt.Errorf("target name bucket %s: %w", target.Name, ErrTargetNameBucketNotFound)
}
return rbucket, nil
}
func getTargetBucketCreateIfEmpty(tx *bbolt.Tx, name chain.Name, target policyengine.Target) (*bbolt.Bucket, error) {
cbucket := tx.Bucket(chainBucket)
if cbucket == nil {
return nil, ErrChainBucketNotFound
}
nbucket := cbucket.Bucket([]byte(name))
if nbucket == nil {
var err error
nbucket, err = cbucket.CreateBucket([]byte(name))
if err != nil {
return nil, fmt.Errorf("could not create a bucket for the global chain name %s: %w", name, err)
}
}
typeBucket := nbucket.Bucket([]byte{byte(target.Type)})
if typeBucket == nil {
var err error
typeBucket, err = cbucket.CreateBucket([]byte{byte(target.Type)})
if err != nil {
return nil, fmt.Errorf("could not create a bucket for the target type '%c': %w", target.Type, err)
}
}
rbucket := typeBucket.Bucket([]byte(target.Name))
if rbucket == nil {
var err error
rbucket, err = typeBucket.CreateBucket([]byte(target.Name))
if err != nil {
return nil, fmt.Errorf("could not create a bucket for the target name %s: %w", target.Name, err)
}
}
return rbucket, nil
}
func (cs *boltLocalOverrideStorage) AddOverride(name chain.Name, target policyengine.Target, c *chain.Chain) (chain.ID, error) {
if c.ID == "" {
return "", fmt.Errorf("chain ID is not set")
}
serializedChain := c.Bytes()
err := cs.db.Update(func(tx *bbolt.Tx) error {
rbuck, err := getTargetBucketCreateIfEmpty(tx, name, target)
if err != nil {
return err
}
return rbuck.Put([]byte(c.ID), serializedChain)
})
return c.ID, err
}
func (cs *boltLocalOverrideStorage) GetOverride(name chain.Name, target policyengine.Target, chainID chain.ID) (*chain.Chain, error) {
var serializedChain []byte
if err := cs.db.View(func(tx *bbolt.Tx) error {
rbuck, err := getTargetBucket(tx, name, target)
if err != nil {
return err
}
serializedChain = rbuck.Get([]byte(chainID))
if serializedChain == nil {
return ErrChainNotFound
}
serializedChain = slice.Copy(serializedChain)
return nil
}); err != nil {
return nil, err
}
c := &chain.Chain{}
if err := json.Unmarshal(serializedChain, c); err != nil {
return nil, err
}
return c, nil
}
func (cs *boltLocalOverrideStorage) RemoveOverride(name chain.Name, target policyengine.Target, chainID chain.ID) error {
return cs.db.Update(func(tx *bbolt.Tx) error {
rbuck, err := getTargetBucket(tx, name, target)
if err != nil {
return err
}
return rbuck.Delete([]byte(chainID))
})
}
func (cs *boltLocalOverrideStorage) ListOverrides(name chain.Name, target policyengine.Target) ([]*chain.Chain, error) {
var serializedChains [][]byte
var serializedChain []byte
if err := cs.db.View(func(tx *bbolt.Tx) error {
rbuck, err := getTargetBucket(tx, name, target)
if err != nil {
return err
}
return rbuck.ForEach(func(_, v []byte) error {
serializedChain = slice.Copy(v)
serializedChains = append(serializedChains, serializedChain)
return nil
})
}); err != nil {
if errors.Is(err, ErrGlobalNamespaceBucketNotFound) || errors.Is(err, ErrTargetNameBucketNotFound) {
return []*chain.Chain{}, nil
}
return nil, err
}
chains := make([]*chain.Chain, 0, len(serializedChains))
for _, serializedChain = range serializedChains {
c := &chain.Chain{}
if err := json.Unmarshal(serializedChain, c); err != nil {
return nil, err
}
chains = append(chains, c)
}
return chains, nil
}
func (cs *boltLocalOverrideStorage) DropAllOverrides(name chain.Name) error {
return cs.db.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(name))
})
}

View file

@ -0,0 +1,30 @@
package chainbase
import (
"context"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
)
type inmemoryLocalOverrideStorage struct {
engine.LocalOverrideStorage
}
func NewInmemoryLocalOverrideDatabase() LocalOverrideDatabase {
return &inmemoryLocalOverrideStorage{
LocalOverrideStorage: inmemory.NewInmemoryLocalStorage(),
}
}
func (cs *inmemoryLocalOverrideStorage) Init() error {
return nil
}
func (cs *inmemoryLocalOverrideStorage) Open(_ context.Context) error {
return nil
}
func (cs *inmemoryLocalOverrideStorage) Close() error {
return nil
}

View file

@ -0,0 +1,22 @@
package chainbase
import (
"context"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
)
// DatabaseCore interface provides methods to initialize and manage local override storage
// as database.
type DatabaseCore interface {
Init() error
Open(context.Context) error
Close() error
}
// LocalOverrideDatabase interface provides methods to manage local override storage
// as database and as the APE's local override storage.
type LocalOverrideDatabase interface {
DatabaseCore
engine.LocalOverrideStorage
}

View file

@ -0,0 +1,67 @@
package chainbase
import (
"io/fs"
"os"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
type Option func(*cfg)
type cfg struct {
path string
perm fs.FileMode
noSync bool
maxBatchDelay time.Duration
maxBatchSize int
log *logger.Logger
}
func defaultCfg() *cfg {
return &cfg{
perm: os.ModePerm,
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
maxBatchSize: bbolt.DefaultMaxBatchSize,
log: &logger.Logger{Logger: zap.L()},
}
}
func WithPath(path string) Option {
return func(c *cfg) {
c.path = path
}
}
func WithPerm(perm fs.FileMode) Option {
return func(c *cfg) {
c.perm = perm
}
}
func WithNoSync(noSync bool) Option {
return func(c *cfg) {
c.noSync = noSync
}
}
func WithMaxBatchDelay(maxBatchDelay time.Duration) Option {
return func(c *cfg) {
c.maxBatchDelay = maxBatchDelay
}
}
func WithMaxBatchSize(maxBatchSize int) Option {
return func(c *cfg) {
c.maxBatchSize = maxBatchSize
}
}
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l
}
}

View file

@ -7,7 +7,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
)
// Container groups information about the FrostFS container stored in the FrostFS network.
@ -72,10 +71,3 @@ type EACLSource interface {
// eACL table is not in source.
GetEACL(cid.ID) (*EACL, error)
}
// AccessPolicyEngineChainSource interface provides methods to access and manipulate
// policy engine chain storage.
type AccessPolicyEngineChainSource interface {
// TODO (aarifullin): Better to use simpler interface instead CachedChainStorage.
GetChainSource(cid cid.ID) (engine.LocalOverrideEngine, error)
}

View file

@ -4,34 +4,24 @@ import (
"context"
"errors"
"fmt"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
engine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
nativeschema "git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// extractCID extracts CID from the schema's pattern.
// TODO (aarifullin): This is temporary solution should be replaced by
// resource name validation.
func extractCID(resource string) (cid.ID, error) {
var cidStr string
// Sscanf requires to make tokens delimited by spaces.
pattern := strings.Replace(nativeschema.ResourceFormatRootContainerObjects, "/", " ", -1)
resource = strings.Replace(resource, "/", " ", -1)
if _, err := fmt.Sscanf(resource, pattern, &cidStr); err != nil {
err = fmt.Errorf("could not parse the target name '%s' to CID: %w", resource, err)
return cid.ID{}, err
func apeTarget(chainTarget *control.ChainTarget) (engine.Target, error) {
switch chainTarget.GetType() {
case control.ChainTarget_CONTAINER:
return engine.ContainerTarget(chainTarget.GetName()), nil
case control.ChainTarget_NAMESPACE:
return engine.NamespaceTarget(chainTarget.GetName()), nil
default:
}
var cid cid.ID
err := cid.DecodeString(cidStr)
return cid, err
return engine.Target{}, status.Error(codes.InvalidArgument,
fmt.Errorf("target type is not supported: %s", chainTarget.GetType().String()).Error())
}
func (s *Server) AddChainLocalOverride(_ context.Context, req *control.AddChainLocalOverrideRequest) (*control.AddChainLocalOverrideResponse, error) {
@ -39,34 +29,23 @@ func (s *Server) AddChainLocalOverride(_ context.Context, req *control.AddChainL
return nil, status.Error(codes.PermissionDenied, err.Error())
}
target := req.GetBody().GetTarget()
if target.Type != control.ChainTarget_CONTAINER {
return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error())
}
cid, err := extractCID(target.GetName())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
var chain apechain.Chain
if err = chain.DecodeBytes(req.GetBody().GetChain()); err != nil {
if err := chain.DecodeBytes(req.GetBody().GetChain()); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
src, err := s.apeChainSrc.GetChainSource(cid)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
s.apeChainCounter.Add(1)
// TODO (aarifullin): the such chain id is not well-designed yet.
if chain.ID == "" {
chain.ID = apechain.ID(fmt.Sprintf("%s:%d", apechain.Ingress, s.apeChainCounter.Load()))
}
resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString())
if _, err = src.LocalStorage().AddOverride(apechain.Ingress, engine.ContainerTarget(resource), &chain); err != nil {
target, err := apeTarget(req.GetBody().GetTarget())
if err != nil {
return nil, err
}
if _, err = s.localOverrideStorage.LocalStorage().AddOverride(apechain.Ingress, target, &chain); err != nil {
return nil, status.Error(getCodeByLocalStorageErr(err), err.Error())
}
@ -87,23 +66,11 @@ func (s *Server) GetChainLocalOverride(_ context.Context, req *control.GetChainL
return nil, status.Error(codes.PermissionDenied, err.Error())
}
target := req.GetBody().GetTarget()
if target.Type != control.ChainTarget_CONTAINER {
return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error())
}
cid, err := extractCID(target.GetName())
target, err := apeTarget(req.GetBody().GetTarget())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
return nil, err
}
src, err := s.apeChainSrc.GetChainSource(cid)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString())
chain, err := src.LocalStorage().GetOverride(apechain.Ingress, engine.ContainerTarget(resource), apechain.ID(req.GetBody().GetChainId()))
chain, err := s.localOverrideStorage.LocalStorage().GetOverride(apechain.Ingress, target, apechain.ID(req.GetBody().GetChainId()))
if err != nil {
return nil, status.Error(getCodeByLocalStorageErr(err), err.Error())
}
@ -125,23 +92,12 @@ func (s *Server) ListChainLocalOverrides(_ context.Context, req *control.ListCha
return nil, status.Error(codes.PermissionDenied, err.Error())
}
target := req.GetBody().GetTarget()
if target.Type != control.ChainTarget_CONTAINER {
return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error())
}
cid, err := extractCID(target.GetName())
target, err := apeTarget(req.GetBody().GetTarget())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
return nil, err
}
src, err := s.apeChainSrc.GetChainSource(cid)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString())
chains, err := src.LocalStorage().ListOverrides(apechain.Ingress, engine.ContainerTarget(resource))
chains, err := s.localOverrideStorage.LocalStorage().ListOverrides(apechain.Ingress, target)
if err != nil {
return nil, status.Error(getCodeByLocalStorageErr(err), err.Error())
}
@ -167,23 +123,12 @@ func (s *Server) RemoveChainLocalOverride(_ context.Context, req *control.Remove
return nil, status.Error(codes.PermissionDenied, err.Error())
}
target := req.GetBody().GetTarget()
if target.Type != control.ChainTarget_CONTAINER {
return nil, status.Error(codes.Internal, fmt.Errorf("target type is not supported: %s", target.Type.String()).Error())
}
cid, err := extractCID(target.GetName())
target, err := apeTarget(req.GetBody().GetTarget())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
return nil, err
}
src, err := s.apeChainSrc.GetChainSource(cid)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
resource := fmt.Sprintf(nativeschema.ResourceFormatRootContainerObjects, cid.EncodeToString())
if err = src.LocalStorage().RemoveOverride(apechain.Ingress, engine.ContainerTarget(resource), apechain.ID(req.GetBody().GetChainId())); err != nil {
if err = s.localOverrideStorage.LocalStorage().RemoveOverride(apechain.Ingress, target, apechain.ID(req.GetBody().GetChainId())); err != nil {
return nil, status.Error(getCodeByLocalStorageErr(err), err.Error())
}
resp := &control.RemoveChainLocalOverrideResponse{

View file

@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
policyengine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
)
// Server is an entity that serves
@ -51,6 +52,14 @@ type NodeState interface {
ForceMaintenance() error
}
// LocalOverrideStorageDecorator interface provides methods to decorate LocalOverrideEngine
// interface methods.
type LocalOverrideStorageDecorator interface {
// LocalStorage method can be decorated by using sync primitives in the case if the local
// override storage state should be consistent for chain router.
LocalStorage() policyengine.LocalOverrideStorage
}
// Option of the Server's constructor.
type Option func(*cfg)
@ -65,7 +74,7 @@ type cfg struct {
cnrSrc container.Source
apeChainSrc container.AccessPolicyEngineChainSource
localOverrideStorage LocalOverrideStorageDecorator
replicator *replicator.Replicator
@ -160,10 +169,10 @@ func WithTreeService(s TreeService) Option {
}
}
// WithAPEChainSource returns the option to set access policy engine
// chain source.
func WithAPEChainSource(apeChainSrc container.AccessPolicyEngineChainSource) Option {
// WithLocalOverrideStorage returns the option to set access policy engine
// chain override storage.
func WithLocalOverrideStorage(localOverrideStorage LocalOverrideStorageDecorator) Option {
return func(c *cfg) {
c.apeChainSrc = apeChainSrc
c.localOverrideStorage = localOverrideStorage
}
}

View file

@ -1,10 +1,8 @@
package acl
import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
v2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl/v2"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -12,34 +10,25 @@ import (
policyengine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
)
var errAPEChainNoSource = errors.New("could not get ape chain source for the container")
type apeCheckerImpl struct {
log *logger.Logger
apeSrc container.AccessPolicyEngineChainSource
log *logger.Logger
chainRouter policyengine.ChainRouter
}
func NewAPEChecker(log *logger.Logger, apeSrc container.AccessPolicyEngineChainSource) v2.APEChainChecker {
func NewAPEChecker(log *logger.Logger, chainRouter policyengine.ChainRouter) v2.APEChainChecker {
return &apeCheckerImpl{
log: log,
apeSrc: apeSrc,
log: log,
chainRouter: chainRouter,
}
}
func (c *apeCheckerImpl) CheckIfRequestPermitted(reqInfo v2.RequestInfo) error {
cnr := reqInfo.ContainerID()
chainCache, err := c.apeSrc.GetChainSource(cnr)
if err != nil {
return errAPEChainNoSource
}
request := new(Request)
request.FromRequestInfo(reqInfo)
cnrTarget := getResource(reqInfo).Name()
status, ruleFound, err := chainCache.IsAllowed(apechain.Ingress, policyengine.NewRequestTargetWithContainer(cnrTarget), request)
status, ruleFound, err := c.chainRouter.IsAllowed(apechain.Ingress, policyengine.NewRequestTargetWithContainer(cnrTarget), request)
if err != nil {
return err
}