[] blobstor: Unify request dispatch logic

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-07-08 14:33:49 +03:00 committed by fyrchik
parent 266458fe5c
commit 0c9e4e6a35
15 changed files with 161 additions and 195 deletions

View file

@ -11,7 +11,6 @@ import (
"github.com/nspcc-dev/hrw"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
@ -199,8 +198,6 @@ func (b *Blobovniczas) Put(prm common.PutPrm) (common.PutRes, error) {
id = blobovnicza.NewIDFromBytes([]byte(p))
storagelog.Write(b.log, storagelog.AddressField(prm.Address), storagelog.OpField("Blobovniczas PUT"))
return true, nil
}
@ -575,17 +572,7 @@ func (b *Blobovniczas) getRangeFromLevel(prm common.GetRangePrm, blzPath string,
// removes object from blobovnicza and returns common.DeleteRes.
func (b *Blobovniczas) deleteObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.DeletePrm, dp common.DeletePrm) (common.DeleteRes, error) {
_, err := blz.Delete(prm)
if err != nil {
return common.DeleteRes{}, err
}
storagelog.Write(b.log,
storagelog.AddressField(dp.Address),
storagelog.OpField("Blobovniczas DELETE"),
zap.Stringer("blobovnicza ID", blobovnicza.NewIDFromBytes(dp.StorageID)),
)
return common.DeleteRes{}, nil
return common.DeleteRes{}, err
}
// reads object from blobovnicza and returns GetSmallRes.
@ -895,3 +882,8 @@ func u64FromHexString(str string) uint64 {
return v
}
// Type implements common.Storage.
func (b *Blobovniczas) Type() string {
return "blobovniczas"
}

View file

@ -7,13 +7,21 @@ import (
"sync"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
"go.uber.org/zap"
)
// SubStorage represents single storage component with some storage policy.
type SubStorage struct {
Storage common.Storage
Policy func(*objectSDK.Object, []byte) bool
}
// BlobStor represents NeoFS local BLOB storage.
type BlobStor struct {
cfg
@ -21,7 +29,7 @@ type BlobStor struct {
modeMtx sync.RWMutex
mode mode.Mode
blobovniczas *blobovniczatree.Blobovniczas
storage [2]SubStorage
}
type Info = fstree.Info
@ -30,10 +38,11 @@ type Info = fstree.Info
type Option func(*cfg)
type cfg struct {
fsTree fstree.FSTree
compression.CConfig
fsTreeDepth int
fsTreeInfo fstree.Info
smallSizeLimit uint64
log *logger.Logger
@ -52,14 +61,10 @@ const blobovniczaDir = "blobovnicza"
func initConfig(c *cfg) {
*c = cfg{
fsTree: fstree.FSTree{
Depth: defaultShallowDepth,
DirNameLen: hex.EncodedLen(fstree.DirNameLen),
CConfig: &c.CConfig,
Info: Info{
Permissions: defaultPerm,
RootPath: "./",
},
fsTreeDepth: defaultShallowDepth,
fsTreeInfo: Info{
Permissions: defaultPerm,
RootPath: "./",
},
smallSizeLimit: defaultSmallSizeLimit,
log: zap.L(),
@ -76,7 +81,21 @@ func New(opts ...Option) *BlobStor {
opts[i](&bs.cfg)
}
bs.blobovniczas = blobovniczatree.NewBlobovniczaTree(bs.blzOpts...)
bs.storage[0].Storage = blobovniczatree.NewBlobovniczaTree(bs.blzOpts...)
bs.storage[0].Policy = func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) <= bs.cfg.smallSizeLimit
}
bs.storage[1].Storage = &fstree.FSTree{
Info: bs.cfg.fsTreeInfo,
Depth: bs.cfg.fsTreeDepth,
DirNameLen: hex.EncodedLen(fstree.DirNameLen),
CConfig: &bs.cfg.CConfig,
}
bs.storage[1].Policy = func(*objectSDK.Object, []byte) bool {
return true
}
bs.blzOpts = nil
return bs
@ -97,7 +116,7 @@ func WithShallowDepth(depth int) Option {
depth = fstree.MaxDepth
}
c.fsTree.Depth = depth
c.fsTreeDepth = depth
}
}
@ -127,7 +146,7 @@ func WithUncompressableContentTypes(values []string) Option {
// of the fs tree to write the objects.
func WithRootPath(rootDir string) Option {
return func(c *cfg) {
c.fsTree.RootPath = rootDir
c.fsTreeInfo.RootPath = rootDir
c.blzOpts = append(c.blzOpts, blobovniczatree.WithRootPath(filepath.Join(rootDir, blobovniczaDir)))
}
}
@ -136,7 +155,7 @@ func WithRootPath(rootDir string) Option {
// bits of the fs tree.
func WithRootPerm(perm fs.FileMode) Option {
return func(c *cfg) {
c.fsTree.Permissions = perm
c.fsTreeInfo.Permissions = perm
c.blzOpts = append(c.blzOpts, blobovniczatree.WithPermissions(perm))
}
}

View file

@ -38,11 +38,11 @@ func TestCompression(t *testing.T) {
}
testGet := func(t *testing.T, b *BlobStor, i int) {
res1, err := b.getSmall(common.GetPrm{Address: object.AddressOf(smallObj[i])})
res1, err := b.Get(common.GetPrm{Address: object.AddressOf(smallObj[i])})
require.NoError(t, err)
require.Equal(t, smallObj[i], res1.Object)
res2, err := b.getBig(common.GetPrm{Address: object.AddressOf(bigObj[i])})
res2, err := b.Get(common.GetPrm{Address: object.AddressOf(bigObj[i])})
require.NoError(t, err)
require.Equal(t, bigObj[i], res2.Object)
}

View file

@ -3,6 +3,12 @@ package common
// Storage represents key-value object storage.
// It is used as a building block for a blobstor of a shard.
type Storage interface {
Open(readOnly bool) error
Init() error
Close() error
Type() string
Get(GetPrm) (GetRes, error)
GetRange(GetRangePrm) (GetRangeRes, error)
Exists(ExistsPrm) (ExistsRes, error)

View file

@ -3,13 +3,21 @@ package blobstor
import (
"errors"
"fmt"
"go.uber.org/zap"
)
// Open opens BlobStor.
func (b *BlobStor) Open(readOnly bool) error {
b.log.Debug("opening...")
return b.blobovniczas.Open(readOnly)
for i := range b.storage {
err := b.storage[i].Storage.Open(readOnly)
if err != nil {
return err
}
}
return nil
}
// ErrInitBlobovniczas is returned when blobovnicza initialization fails.
@ -23,11 +31,12 @@ var ErrInitBlobovniczas = errors.New("failure on blobovnicza initialization stag
func (b *BlobStor) Init() error {
b.log.Debug("initializing...")
err := b.blobovniczas.Init()
if err != nil {
return fmt.Errorf("%w: %v", ErrInitBlobovniczas, err)
for i := range b.storage {
err := b.storage[i].Storage.Init()
if err != nil {
return fmt.Errorf("%w: %v", ErrInitBlobovniczas, err)
}
}
return nil
}
@ -35,5 +44,16 @@ func (b *BlobStor) Init() error {
func (b *BlobStor) Close() error {
b.log.Debug("closing...")
return b.blobovniczas.Close()
var firstErr error
for i := range b.storage {
err := b.storage[i].Storage.Close()
if err != nil {
b.log.Info("couldn't close storage", zap.String("error", err.Error()))
if firstErr == nil {
firstErr = err
}
continue
}
}
return firstErr
}

View file

@ -6,47 +6,27 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
"go.uber.org/zap"
)
func (b *BlobStor) Delete(prm common.DeletePrm) (common.DeleteRes, error) {
if prm.StorageID == nil {
// Nothing specified, try everything.
res, err := b.deleteBig(prm)
if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
return res, err
for i := range b.storage {
res, err := b.storage[i].Storage.Delete(prm)
if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
if err == nil {
storagelog.Write(b.log,
storagelog.AddressField(prm.Address),
storagelog.OpField("DELETE"),
zap.String("type", b.storage[i].Storage.Type()),
zap.String("storage ID", string(prm.StorageID)))
}
return res, err
}
}
return b.deleteSmall(prm)
}
if len(prm.StorageID) == 0 {
return b.deleteBig(prm)
return b.storage[1].Storage.Delete(prm)
}
return b.deleteSmall(prm)
}
// deleteBig removes an object from shallow dir of BLOB storage.
//
// Returns any error encountered that did not allow
// to completely remove the object.
//
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
func (b *BlobStor) deleteBig(prm common.DeletePrm) (common.DeleteRes, error) {
res, err := b.fsTree.Delete(prm)
if err == nil {
storagelog.Write(b.log, storagelog.AddressField(prm.Address), storagelog.OpField("fstree DELETE"))
}
return res, err
}
// deleteSmall removes an object from blobovnicza of BLOB storage.
//
// If blobovnicza ID is not set or set to nil, BlobStor tries to
// find and remove object from any blobovnicza.
//
// Returns any error encountered that did not allow
// to completely remove the object.
//
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
func (b *BlobStor) deleteSmall(prm common.DeletePrm) (common.DeleteRes, error) {
return b.blobovniczas.Delete(prm)
return b.storage[0].Storage.Delete(prm)
}

View file

@ -10,9 +10,6 @@ import (
// Returns any error encountered that did not allow
// to completely check object existence.
func (b *BlobStor) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
// check presence in shallow dir first (cheaper)
res, err := b.existsBig(prm)
// If there was an error during existence check below,
// it will be returned unless object was found in blobovnicza.
// Otherwise, it is logged and the latest error is returned.
@ -22,30 +19,25 @@ func (b *BlobStor) Exists(prm common.ExistsPrm) (common.ExistsRes, error) {
// error | found | log the error, return true, nil
// error | not found | return the error
// error | error | log the first error, return the second
if !res.Exists {
var smallErr error
res, smallErr = b.existsSmall(prm)
if err != nil && (smallErr != nil || res.Exists) {
b.log.Warn("error occured during object existence checking",
zap.Stringer("address", prm.Address),
zap.String("error", err.Error()))
err = nil
}
if err == nil {
err = smallErr
var errors []error
for i := range b.storage {
res, err := b.storage[i].Storage.Exists(prm)
if err == nil && res.Exists {
return res, nil
} else if err != nil {
errors = append(errors, err)
}
}
return res, err
}
if len(errors) == 0 {
return common.ExistsRes{}, nil
}
// checks if object is presented in shallow dir.
func (b *BlobStor) existsBig(prm common.ExistsPrm) (common.ExistsRes, error) {
return b.fsTree.Exists(prm)
}
for _, err := range errors[:len(errors)-1] {
b.log.Warn("error occured during object existence checking",
zap.Stringer("address", prm.Address),
zap.String("error", err.Error()))
}
// existsSmall checks if object is presented in blobovnicza.
func (b *BlobStor) existsSmall(prm common.ExistsPrm) (common.ExistsRes, error) {
return b.blobovniczas.Exists(prm)
return common.ExistsRes{}, errors[len(errors)-1]
}

View file

@ -65,7 +65,7 @@ func TestExists(t *testing.T) {
require.NotEmpty(t, bigDir)
require.NoError(t, os.Chmod(dir, 0))
t.Cleanup(func() { require.NoError(t, os.Chmod(dir, b.fsTree.Permissions)) })
t.Cleanup(func() { require.NoError(t, os.Chmod(dir, b.fsTreeInfo.Permissions)) })
// Object exists, first error is logged.
prm.Address = objectCore.AddressOf(objects[0])

View file

@ -0,0 +1,10 @@
package fstree
// Open implements common.Storage.
func (*FSTree) Open(bool) error { return nil }
// Init implements common.Storage.
func (*FSTree) Init() error { return nil }
// Close implements common.Storage.
func (*FSTree) Close() error { return nil }

View file

@ -294,3 +294,8 @@ func (t *FSTree) NumberOfObjects() (uint64, error) {
return counter, nil
}
// Type implements common.Storage.
func (*FSTree) Type() string {
return "fstree"
}

View file

@ -12,31 +12,18 @@ import (
// Otherwise, each sub-storage is tried in order.
func (b *BlobStor) Get(prm common.GetPrm) (common.GetRes, error) {
if prm.StorageID == nil {
// Nothing specified, try everything.
res, err := b.getBig(prm)
if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
return res, err
for i := range b.storage {
res, err := b.storage[i].Storage.Get(prm)
if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
return res, err
}
}
return b.getSmall(prm)
var errNotFound apistatus.ObjectNotFound
return common.GetRes{}, errNotFound
}
if len(prm.StorageID) == 0 {
return b.getBig(prm)
return b.storage[1].Storage.Get(prm)
}
return b.getSmall(prm)
}
// getBig reads the object from shallow dir of BLOB storage by address.
//
// Returns any error encountered that
// did not allow to completely read the object.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is not
// presented in shallow dir.
func (b *BlobStor) getBig(prm common.GetPrm) (common.GetRes, error) {
// get compressed object data
return b.fsTree.Get(prm)
}
func (b *BlobStor) getSmall(prm common.GetPrm) (common.GetRes, error) {
return b.blobovniczas.Get(prm)
return b.storage[0].Storage.Get(prm)
}

View file

@ -12,57 +12,18 @@ import (
// Otherwise, each sub-storage is tried in order.
func (b *BlobStor) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) {
if prm.StorageID == nil {
// Nothing specified, try everything.
res, err := b.getRangeBig(prm)
if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
return res, err
for i := range b.storage {
res, err := b.storage[i].Storage.GetRange(prm)
if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
return res, err
}
}
return b.getRangeSmall(prm)
var errNotFound apistatus.ObjectNotFound
return common.GetRangeRes{}, errNotFound
}
if len(prm.StorageID) == 0 {
return b.getRangeBig(prm)
return b.storage[1].Storage.GetRange(prm)
}
return b.getRangeSmall(prm)
}
// getRangeBig reads data of object payload range from shallow dir of BLOB storage.
//
// Returns any error encountered that
// did not allow to completely read the object payload range.
//
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
// Returns an error of type apistatus.ObjectNotFound if object is missing.
func (b *BlobStor) getRangeBig(prm common.GetRangePrm) (common.GetRangeRes, error) {
// get compressed object data
res, err := b.fsTree.Get(common.GetPrm{Address: prm.Address})
if err != nil {
return common.GetRangeRes{}, err
}
payload := res.Object.Payload()
ln, off := prm.Range.GetLength(), prm.Range.GetOffset()
if pLen := uint64(len(payload)); ln+off < off || pLen < off || pLen < ln+off {
var errOutOfRange apistatus.ObjectOutOfRange
return common.GetRangeRes{}, errOutOfRange
}
return common.GetRangeRes{
Data: payload[off : off+ln],
}, nil
}
// getRangeSmall reads data of object payload range from blobovnicza of BLOB storage.
//
// If blobovnicza ID is not set or set to nil, BlobStor tries to get payload range
// from any blobovnicza.
//
// Returns any error encountered that
// did not allow to completely read the object payload range.
//
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in blobovnicza(s).
func (b *BlobStor) getRangeSmall(prm common.GetRangePrm) (common.GetRangeRes, error) {
return b.blobovniczas.GetRange(prm)
return b.storage[0].Storage.GetRange(prm)
}

View file

@ -4,5 +4,5 @@ import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree
// DumpInfo returns information about blob stor.
func (b *BlobStor) DumpInfo() fstree.Info {
return b.fsTree.Info
return b.cfg.fsTreeInfo
}

View file

@ -16,14 +16,11 @@ import (
//
// If handler returns an error, method wraps and returns it immediately.
func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
_, err := b.blobovniczas.Iterate(prm)
if err != nil && !prm.IgnoreErrors {
return common.IterateRes{}, fmt.Errorf("blobovnizas iterator failure: %w", err)
}
_, err = b.fsTree.Iterate(prm)
if err != nil && !prm.IgnoreErrors {
return common.IterateRes{}, fmt.Errorf("fs tree iterator failure: %w", err)
for i := range b.storage {
_, err := b.storage[i].Storage.Iterate(prm)
if err != nil && !prm.IgnoreErrors {
return common.IterateRes{}, fmt.Errorf("blobovnizas iterator failure: %w", err)
}
}
return common.IterateRes{}, nil
}

View file

@ -1,12 +1,14 @@
package blobstor
import (
"errors"
"fmt"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
"go.uber.org/zap"
)
// Put saves the object in BLOB storage.
@ -30,21 +32,21 @@ func (b *BlobStor) Put(prm common.PutPrm) (common.PutRes, error) {
prm.RawData = data
}
big := b.isBig(prm.RawData)
if big {
_, err := b.fsTree.Put(prm)
if err != nil {
return common.PutRes{}, err
for i := range b.storage {
if b.storage[i].Policy(prm.Object, prm.RawData) {
res, err := b.storage[i].Storage.Put(prm)
if err == nil {
storagelog.Write(b.log,
storagelog.AddressField(prm.Address),
storagelog.OpField("PUT"),
zap.String("type", b.storage[i].Storage.Type()),
zap.String("storage ID", string(res.StorageID)))
}
return res, err
}
storagelog.Write(b.log, storagelog.AddressField(prm.Address), storagelog.OpField("fstree PUT"))
return common.PutRes{}, nil
}
// save object in blobovnicza
return b.blobovniczas.Put(prm)
return common.PutRes{}, errors.New("couldn't find a place to store an object")
}
// NeedsCompression returns true if the object should be compressed.
@ -54,8 +56,3 @@ func (b *BlobStor) Put(prm common.PutPrm) (common.PutRes, error) {
func (b *BlobStor) NeedsCompression(obj *objectSDK.Object) bool {
return b.cfg.CConfig.NeedsCompression(obj)
}
// checks if object is "big".
func (b *BlobStor) isBig(data []byte) bool {
return uint64(len(data)) > b.smallSizeLimit
}