frostfs-node/pkg/ape/chainbase/boltdb.go
aarifullin 0f45e3d344
All checks were successful
DCO action / DCO (pull_request) Successful in 2m10s
Vulncheck / Vulncheck (pull_request) Successful in 3m26s
Build / Build Components (1.20) (pull_request) Successful in 5m41s
Build / Build Components (1.21) (pull_request) Successful in 5m44s
Tests and linters / Staticcheck (pull_request) Successful in 7m10s
Tests and linters / Lint (pull_request) Successful in 8m14s
Tests and linters / Tests (1.21) (pull_request) Successful in 14m24s
Tests and linters / Tests (1.20) (pull_request) Successful in 14m41s
Tests and linters / Tests with -race (pull_request) Successful in 14m38s
[#804] ape: Implement boltdb storage for local overrides
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-12-07 19:08:41 +03:00

250 lines
6.6 KiB
Go

package chainbase
import (
"context"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
policyengine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
"go.etcd.io/bbolt"
)
// boltLocalOverrideStorage is the bbolt-backed storage that
// NewBoltLocalOverrideDatabase returns as a LocalOverrideDatabase.
type boltLocalOverrideStorage struct {
// Embedded configuration. Open reads path, perm, noSync, maxBatchSize and
// maxBatchDelay from it.
*cfg
// db is the bbolt handle. The constructor leaves it unset; Open assigns it.
db *bbolt.DB
}
// chainBucket is the key of the root bucket under which every global
// namespace bucket of the chain storage is nested.
var chainBucket = []byte{0}
// Errors reported when a level of the override bucket hierarchy, or a chain
// stored in it, is missing.
var (
// ErrChainBucketNotFound is returned when the root chain bucket (chainBucket)
// is absent from the database.
ErrChainBucketNotFound = logicerr.New("chain root bucket has not been found")
// ErrChainNotFound is returned by GetOverride when the target bucket holds no
// value for the requested chain ID.
ErrChainNotFound = logicerr.New("chain has not been found")
// ErrGlobalNamespaceBucketNotFound is returned when the root bucket has no
// nested bucket for the requested chain name.
ErrGlobalNamespaceBucketNotFound = logicerr.New("global namespace bucket has not been found")
// ErrTargetTypeBucketNotFound is returned when the namespace bucket has no
// nested bucket for the target type.
ErrTargetTypeBucketNotFound = logicerr.New("target type bucket has not been found")
// ErrTargetNameBucketNotFound is returned when the target type bucket has no
// nested bucket for the target name.
ErrTargetNameBucketNotFound = logicerr.New("target name bucket has not been found")
)
// NewBoltLocalOverrideDatabase builds a bbolt-backed storage for the local
// overrides of the access policy engine. The default configuration is taken
// first and every option is then applied to it in order. The underlying
// database is not opened here: call Open before using the storage.
//
// Bucket layout:
//
// chain storage (chainBucket):
// -> global namespace bucket (nBucket):
// --> target type bucket (tBucket)
// ---> target name (resource) bucket (rBucket):
//
// | Key                 | Value             |
// x---------------------x-------------------x
// | chain id (string)   | serialized chain  |
// x---------------------x-------------------x
//
//nolint:godot
func NewBoltLocalOverrideDatabase(opts ...Option) LocalOverrideDatabase {
	storage := &boltLocalOverrideStorage{cfg: defaultCfg()}
	for _, apply := range opts {
		apply(storage.cfg)
	}
	return storage
}
// Init makes sure the root chain bucket exists, creating it inside a
// read-write transaction when it is missing. The database must already have
// been opened with Open.
func (cs *boltLocalOverrideStorage) Init() error {
	ensureRoot := func(tx *bbolt.Tx) error {
		if _, err := tx.CreateBucketIfNotExists(chainBucket); err != nil {
			return err
		}
		return nil
	}
	return cs.db.Update(ensureRoot)
}
// Open creates the parent directory of the configured database path if needed,
// opens the bbolt file with the configured permissions and sync mode, and
// applies the configured batch limits to the handle. The context argument is
// not used.
func (cs *boltLocalOverrideStorage) Open(context.Context) error {
	if err := util.MkdirAllX(filepath.Dir(cs.path), cs.perm); err != nil {
		return fmt.Errorf("can't create dir %s for the chain DB: %w", cs.path, err)
	}

	// Start from a copy of the library defaults so the shared value is never mutated.
	boltOpts := *bbolt.DefaultOptions
	boltOpts.Timeout = 100 * time.Millisecond
	boltOpts.NoSync = cs.noSync

	var err error
	if cs.db, err = bbolt.Open(cs.path, cs.perm, &boltOpts); err != nil {
		return fmt.Errorf("can't open the chain DB: %w", err)
	}

	cs.db.MaxBatchDelay = cs.maxBatchDelay
	cs.db.MaxBatchSize = cs.maxBatchSize
	return nil
}
// Close closes the underlying bbolt database. It is a no-op returning nil
// when no database handle has been set.
func (cs *boltLocalOverrideStorage) Close() error {
	if cs.db == nil {
		return nil
	}
	return cs.db.Close()
}
// getTargetBucket walks the bucket hierarchy root -> namespace -> target type
// -> target name inside tx and returns the innermost bucket. Nothing is
// created: the first missing level aborts the walk with the matching
// Err*NotFound error (wrapped with the offending key for the nested levels).
func getTargetBucket(tx *bbolt.Tx, name chain.Name, target policyengine.Target) (*bbolt.Bucket, error) {
	root := tx.Bucket(chainBucket)
	if root == nil {
		return nil, ErrChainBucketNotFound
	}

	namespace := root.Bucket([]byte(name))
	if namespace == nil {
		return nil, fmt.Errorf("global namespace %s: %w", name, ErrGlobalNamespaceBucketNotFound)
	}

	// The target type is stored as a single-byte key.
	typeKey := []byte{byte(target.Type)}
	byType := namespace.Bucket(typeKey)
	if byType == nil {
		return nil, fmt.Errorf("type bucket '%c': %w", target.Type, ErrTargetTypeBucketNotFound)
	}

	if resource := byType.Bucket([]byte(target.Name)); resource != nil {
		return resource, nil
	}
	return nil, fmt.Errorf("target name bucket %s: %w", target.Name, ErrTargetNameBucketNotFound)
}
// getTargetBucketCreateIfEmpty returns the target name bucket for the given
// chain name and target, creating every missing level below the root chain
// bucket (namespace -> target type -> target name) on the way. tx must be a
// writable transaction.
//
// The root chain bucket itself is never created here; ErrChainBucketNotFound
// is returned when it is missing. Bucket creation failures are wrapped with
// the level that could not be created.
func getTargetBucketCreateIfEmpty(tx *bbolt.Tx, name chain.Name, target policyengine.Target) (*bbolt.Bucket, error) {
	cbucket := tx.Bucket(chainBucket)
	if cbucket == nil {
		return nil, ErrChainBucketNotFound
	}

	nbucket, err := cbucket.CreateBucketIfNotExists([]byte(name))
	if err != nil {
		return nil, fmt.Errorf("could not create a bucket for the global chain name %s: %w", name, err)
	}

	// The type bucket must be nested in the namespace bucket, not in the root:
	// that is where both this function and getTargetBucket look it up.
	typeBucket, err := nbucket.CreateBucketIfNotExists([]byte{byte(target.Type)})
	if err != nil {
		return nil, fmt.Errorf("could not create a bucket for the target type '%c': %w", target.Type, err)
	}

	rbucket, err := typeBucket.CreateBucketIfNotExists([]byte(target.Name))
	if err != nil {
		return nil, fmt.Errorf("could not create a bucket for the target name %s: %w", target.Name, err)
	}

	return rbucket, nil
}
// AddOverride stores the serialized chain c under its ID in the bucket of the
// given chain name and target, creating missing buckets as needed. An existing
// entry with the same ID is overwritten. The chain must have a non-empty ID;
// otherwise an error and an empty ID are returned. In all other cases the
// chain ID is returned together with the result of the write transaction.
func (cs *boltLocalOverrideStorage) AddOverride(name chain.Name, target policyengine.Target, c *chain.Chain) (chain.ID, error) {
	if c.ID == "" {
		return "", fmt.Errorf("chain ID is not set")
	}

	// Serialize outside the transaction to keep the write lock short.
	key, value := []byte(c.ID), c.Bytes()

	store := func(tx *bbolt.Tx) error {
		bucket, err := getTargetBucketCreateIfEmpty(tx, name, target)
		if err != nil {
			return err
		}
		return bucket.Put(key, value)
	}

	err := cs.db.Update(store)
	return c.ID, err
}
// GetOverride reads the chain stored under chainID for the given chain name
// and target and decodes it from JSON. It returns ErrChainNotFound when the
// target bucket has no such key, the bucket lookup error when a level of the
// hierarchy is missing, or the JSON decoding error.
func (cs *boltLocalOverrideStorage) GetOverride(name chain.Name, target policyengine.Target, chainID chain.ID) (*chain.Chain, error) {
	var raw []byte

	err := cs.db.View(func(tx *bbolt.Tx) error {
		bucket, err := getTargetBucket(tx, name, target)
		if err != nil {
			return err
		}

		stored := bucket.Get([]byte(chainID))
		if stored == nil {
			return ErrChainNotFound
		}

		// Values returned by bbolt are only valid inside the transaction.
		raw = slice.Copy(stored)
		return nil
	})
	if err != nil {
		return nil, err
	}

	decoded := new(chain.Chain)
	if err := json.Unmarshal(raw, decoded); err != nil {
		return nil, err
	}
	return decoded, nil
}
// RemoveOverride deletes the entry stored under chainID from the bucket of the
// given chain name and target. A missing level of the bucket hierarchy is
// reported with the error returned by getTargetBucket.
func (cs *boltLocalOverrideStorage) RemoveOverride(name chain.Name, target policyengine.Target, chainID chain.ID) error {
	key := []byte(chainID)

	remove := func(tx *bbolt.Tx) error {
		bucket, err := getTargetBucket(tx, name, target)
		if err != nil {
			return err
		}
		return bucket.Delete(key)
	}

	return cs.db.Update(remove)
}
// ListOverrides returns every chain stored for the given chain name and
// target, decoded from JSON.
//
// When the namespace, target type or target name bucket does not exist there
// is simply nothing stored for the target, so an empty (non-nil) slice is
// returned rather than an error. A missing root chain bucket, a transaction
// failure or a JSON decoding failure is returned as an error.
func (cs *boltLocalOverrideStorage) ListOverrides(name chain.Name, target policyengine.Target) ([]*chain.Chain, error) {
	var serializedChains [][]byte

	err := cs.db.View(func(tx *bbolt.Tx) error {
		rbuck, err := getTargetBucket(tx, name, target)
		if err != nil {
			return err
		}
		return rbuck.ForEach(func(_, v []byte) error {
			// Values are only valid inside the transaction, so copy them out.
			serializedChains = append(serializedChains, slice.Copy(v))
			return nil
		})
	})
	switch {
	case err == nil:
	case errors.Is(err, ErrGlobalNamespaceBucketNotFound),
		errors.Is(err, ErrTargetTypeBucketNotFound),
		errors.Is(err, ErrTargetNameBucketNotFound):
		// Any missing level below the root means "no overrides for this target".
		return []*chain.Chain{}, nil
	default:
		return nil, err
	}

	chains := make([]*chain.Chain, 0, len(serializedChains))
	for _, serializedChain := range serializedChains {
		c := &chain.Chain{}
		if err := json.Unmarshal(serializedChain, c); err != nil {
			return nil, err
		}
		chains = append(chains, c)
	}
	return chains, nil
}
// DropAllOverrides removes the whole namespace bucket of the given chain name,
// and with it every override stored for any target in that namespace.
//
// Namespace buckets are nested in the root chain bucket, so the deletion is
// performed there and not on the transaction root. It returns
// ErrChainBucketNotFound when the root chain bucket is missing, or the error
// reported by bbolt when the namespace bucket cannot be deleted (for example
// because it does not exist).
func (cs *boltLocalOverrideStorage) DropAllOverrides(name chain.Name) error {
	return cs.db.Update(func(tx *bbolt.Tx) error {
		cbucket := tx.Bucket(chainBucket)
		if cbucket == nil {
			return ErrChainBucketNotFound
		}
		return cbucket.DeleteBucket([]byte(name))
	})
}