From 0c9e4e6a356c2667923fc05fe92ff53a307c9303 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 8 Jul 2022 14:33:49 +0300 Subject: [PATCH] [#1523] blobstor: Unify request dispatch logic Signed-off-by: Evgenii Stratonikov --- .../blobstor/blobovniczatree/blobovnicza.go | 20 ++----- pkg/local_object_storage/blobstor/blobstor.go | 49 ++++++++++----- .../blobstor/blobstor_test.go | 4 +- .../blobstor/common/storage.go | 6 ++ pkg/local_object_storage/blobstor/control.go | 32 ++++++++-- pkg/local_object_storage/blobstor/delete.go | 50 +++++----------- pkg/local_object_storage/blobstor/exists.go | 40 +++++-------- .../blobstor/exists_test.go | 2 +- .../blobstor/fstree/control.go | 10 ++++ .../blobstor/fstree/fstree.go | 5 ++ pkg/local_object_storage/blobstor/get.go | 33 ++++------- .../blobstor/get_range.go | 59 ++++--------------- pkg/local_object_storage/blobstor/info.go | 2 +- pkg/local_object_storage/blobstor/iterate.go | 13 ++-- pkg/local_object_storage/blobstor/put.go | 31 +++++----- 15 files changed, 161 insertions(+), 195 deletions(-) create mode 100644 pkg/local_object_storage/blobstor/fstree/control.go diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go index 28a9c654..f5fb68c5 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go @@ -11,7 +11,6 @@ import ( "github.com/nspcc-dev/hrw" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" - storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -199,8 +198,6 @@ func (b *Blobovniczas) Put(prm common.PutPrm) (common.PutRes, error) { id = blobovnicza.NewIDFromBytes([]byte(p)) - storagelog.Write(b.log, storagelog.AddressField(prm.Address), storagelog.OpField("Blobovniczas PUT")) - return true, nil } @@ -575,17 +572,7 @@ func (b *Blobovniczas) getRangeFromLevel(prm common.GetRangePrm, blzPath string, // removes object from blobovnicza and returns common.DeleteRes. func (b *Blobovniczas) deleteObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.DeletePrm, dp common.DeletePrm) (common.DeleteRes, error) { _, err := blz.Delete(prm) - if err != nil { - return common.DeleteRes{}, err - } - - storagelog.Write(b.log, - storagelog.AddressField(dp.Address), - storagelog.OpField("Blobovniczas DELETE"), - zap.Stringer("blobovnicza ID", blobovnicza.NewIDFromBytes(dp.StorageID)), - ) - - return common.DeleteRes{}, nil + return common.DeleteRes{}, err } // reads object from blobovnicza and returns GetSmallRes. @@ -895,3 +882,8 @@ func u64FromHexString(str string) uint64 { return v } + +// Type implements common.Storage. +func (b *Blobovniczas) Type() string { + return "blobovniczas" +} diff --git a/pkg/local_object_storage/blobstor/blobstor.go b/pkg/local_object_storage/blobstor/blobstor.go index 80ace006..1bd21746 100644 --- a/pkg/local_object_storage/blobstor/blobstor.go +++ b/pkg/local_object_storage/blobstor/blobstor.go @@ -7,13 +7,21 @@ import ( "sync" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/util/logger" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" "go.uber.org/zap" ) +// SubStorage represents single storage component with some storage policy. +type SubStorage struct { + Storage common.Storage + Policy func(*objectSDK.Object, []byte) bool +} + // BlobStor represents NeoFS local BLOB storage. type BlobStor struct { cfg @@ -21,7 +29,7 @@ type BlobStor struct { modeMtx sync.RWMutex mode mode.Mode - blobovniczas *blobovniczatree.Blobovniczas + storage [2]SubStorage } type Info = fstree.Info @@ -30,10 +38,11 @@ type Info = fstree.Info type Option func(*cfg) type cfg struct { - fsTree fstree.FSTree - compression.CConfig + fsTreeDepth int + fsTreeInfo fstree.Info + smallSizeLimit uint64 log *logger.Logger @@ -52,14 +61,10 @@ const blobovniczaDir = "blobovnicza" func initConfig(c *cfg) { *c = cfg{ - fsTree: fstree.FSTree{ - Depth: defaultShallowDepth, - DirNameLen: hex.EncodedLen(fstree.DirNameLen), - CConfig: &c.CConfig, - Info: Info{ - Permissions: defaultPerm, - RootPath: "./", - }, + fsTreeDepth: defaultShallowDepth, + fsTreeInfo: Info{ + Permissions: defaultPerm, + RootPath: "./", }, smallSizeLimit: defaultSmallSizeLimit, log: zap.L(), @@ -76,7 +81,21 @@ func New(opts ...Option) *BlobStor { opts[i](&bs.cfg) } - bs.blobovniczas = blobovniczatree.NewBlobovniczaTree(bs.blzOpts...) + bs.storage[0].Storage = blobovniczatree.NewBlobovniczaTree(bs.blzOpts...) + bs.storage[0].Policy = func(_ *objectSDK.Object, data []byte) bool { + return uint64(len(data)) <= bs.cfg.smallSizeLimit + } + + bs.storage[1].Storage = &fstree.FSTree{ + Info: bs.cfg.fsTreeInfo, + Depth: bs.cfg.fsTreeDepth, + DirNameLen: hex.EncodedLen(fstree.DirNameLen), + CConfig: &bs.cfg.CConfig, + } + bs.storage[1].Policy = func(*objectSDK.Object, []byte) bool { + return true + } + bs.blzOpts = nil return bs @@ -97,7 +116,7 @@ func WithShallowDepth(depth int) Option { depth = fstree.MaxDepth } - c.fsTree.Depth = depth + c.fsTreeDepth = depth } } @@ -127,7 +146,7 @@ func WithUncompressableContentTypes(values []string) Option { // of the fs tree to write the objects. func WithRootPath(rootDir string) Option { return func(c *cfg) { - c.fsTree.RootPath = rootDir + c.fsTreeInfo.RootPath = rootDir c.blzOpts = append(c.blzOpts, blobovniczatree.WithRootPath(filepath.Join(rootDir, blobovniczaDir))) } } @@ -136,7 +155,7 @@ func WithRootPath(rootDir string) Option { // bits of the fs tree. func WithRootPerm(perm fs.FileMode) Option { return func(c *cfg) { - c.fsTree.Permissions = perm + c.fsTreeInfo.Permissions = perm c.blzOpts = append(c.blzOpts, blobovniczatree.WithPermissions(perm)) } } diff --git a/pkg/local_object_storage/blobstor/blobstor_test.go b/pkg/local_object_storage/blobstor/blobstor_test.go index b8700f20..459563af 100644 --- a/pkg/local_object_storage/blobstor/blobstor_test.go +++ b/pkg/local_object_storage/blobstor/blobstor_test.go @@ -38,11 +38,11 @@ func TestCompression(t *testing.T) { } testGet := func(t *testing.T, b *BlobStor, i int) { - res1, err := b.getSmall(common.GetPrm{Address: object.AddressOf(smallObj[i])}) + res1, err := b.Get(common.GetPrm{Address: object.AddressOf(smallObj[i])}) require.NoError(t, err) require.Equal(t, smallObj[i], res1.Object) - res2, err := b.getBig(common.GetPrm{Address: object.AddressOf(bigObj[i])}) + res2, err := b.Get(common.GetPrm{Address: object.AddressOf(bigObj[i])}) require.NoError(t, err) require.Equal(t, bigObj[i], res2.Object) } diff --git a/pkg/local_object_storage/blobstor/common/storage.go b/pkg/local_object_storage/blobstor/common/storage.go index 5674333b..13e0641a 100644 --- a/pkg/local_object_storage/blobstor/common/storage.go +++ b/pkg/local_object_storage/blobstor/common/storage.go @@ -3,6 +3,12 @@ package common // Storage represents key-value object storage. // It is used as a building block for a blobstor of a shard. type Storage interface { + Open(readOnly bool) error + Init() error + Close() error + + Type() string + Get(GetPrm) (GetRes, error) GetRange(GetRangePrm) (GetRangeRes, error) Exists(ExistsPrm) (ExistsRes, error) diff --git a/pkg/local_object_storage/blobstor/control.go b/pkg/local_object_storage/blobstor/control.go index 6c1cc70e..785f6d7b 100644 --- a/pkg/local_object_storage/blobstor/control.go +++ b/pkg/local_object_storage/blobstor/control.go @@ -3,13 +3,21 @@ package blobstor import ( "errors" "fmt" + + "go.uber.org/zap" ) // Open opens BlobStor. func (b *BlobStor) Open(readOnly bool) error { b.log.Debug("opening...") - return b.blobovniczas.Open(readOnly) + for i := range b.storage { + err := b.storage[i].Storage.Open(readOnly) + if err != nil { + return err + } + } + return nil } // ErrInitBlobovniczas is returned when blobovnicza initialization fails. @@ -23,11 +31,12 @@ var ErrInitBlobovniczas = errors.New("failure on blobovnicza initialization stag func (b *BlobStor) Init() error { b.log.Debug("initializing...") - err := b.blobovniczas.Init() - if err != nil { - return fmt.Errorf("%w: %v", ErrInitBlobovniczas, err) + for i := range b.storage { + err := b.storage[i].Storage.Init() + if err != nil { + return fmt.Errorf("%w: %v", ErrInitBlobovniczas, err) + } } - return nil } @@ -35,5 +44,16 @@ func (b *BlobStor) Init() error { func (b *BlobStor) Close() error { b.log.Debug("closing...") - return b.blobovniczas.Close() + var firstErr error + for i := range b.storage { + err := b.storage[i].Storage.Close() + if err != nil { + b.log.Info("couldn't close storage", zap.String("error", err.Error())) + if firstErr == nil { + firstErr = err + } + continue + } + } + return firstErr } diff --git a/pkg/local_object_storage/blobstor/delete.go b/pkg/local_object_storage/blobstor/delete.go index 6d9ef673..c605bb55 100644 --- a/pkg/local_object_storage/blobstor/delete.go +++ b/pkg/local_object_storage/blobstor/delete.go @@ -6,47 +6,27 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + "go.uber.org/zap" ) func (b *BlobStor) Delete(prm common.DeletePrm) (common.DeleteRes, error) { if prm.StorageID == nil { - // Nothing specified, try everything. - res, err := b.deleteBig(prm) - if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) { - return res, err + for i := range b.storage { + res, err := b.storage[i].Storage.Delete(prm) + if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) { + if err == nil { + storagelog.Write(b.log, + storagelog.AddressField(prm.Address), + storagelog.OpField("DELETE"), + zap.String("type", b.storage[i].Storage.Type()), + zap.String("storage ID", string(prm.StorageID))) + } + return res, err + } } - return b.deleteSmall(prm) } if len(prm.StorageID) == 0 { - return b.deleteBig(prm) + return b.storage[1].Storage.Delete(prm) } - return b.deleteSmall(prm) -} - -// deleteBig removes an object from shallow dir of BLOB storage. -// -// Returns any error encountered that did not allow -// to completely remove the object. -// -// Returns an error of type apistatus.ObjectNotFound if there is no object to delete. -func (b *BlobStor) deleteBig(prm common.DeletePrm) (common.DeleteRes, error) { - res, err := b.fsTree.Delete(prm) - if err == nil { - storagelog.Write(b.log, storagelog.AddressField(prm.Address), storagelog.OpField("fstree DELETE")) - } - - return res, err -} - -// deleteSmall removes an object from blobovnicza of BLOB storage. -// -// If blobovnicza ID is not set or set to nil, BlobStor tries to -// find and remove object from any blobovnicza. -// -// Returns any error encountered that did not allow -// to completely remove the object. -// -// Returns an error of type apistatus.ObjectNotFound if there is no object to delete. -func (b *BlobStor) deleteSmall(prm common.DeletePrm) (common.DeleteRes, error) { - return b.blobovniczas.Delete(prm) + return b.storage[0].Storage.Delete(prm) } diff --git a/pkg/local_object_storage/blobstor/exists.go b/pkg/local_object_storage/blobstor/exists.go index 3300e988..015d6e84 100644 --- a/pkg/local_object_storage/blobstor/exists.go +++ b/pkg/local_object_storage/blobstor/exists.go @@ -10,9 +10,6 @@ import ( // Returns any error encountered that did not allow // to completely check object existence. func (b *BlobStor) Exists(prm common.ExistsPrm) (common.ExistsRes, error) { - // check presence in shallow dir first (cheaper) - res, err := b.existsBig(prm) - // If there was an error during existence check below, // it will be returned unless object was found in blobovnicza. // Otherwise, it is logged and the latest error is returned. @@ -22,30 +19,25 @@ func (b *BlobStor) Exists(prm common.ExistsPrm) (common.ExistsRes, error) { // error | found | log the error, return true, nil // error | not found | return the error // error | error | log the first error, return the second - if !res.Exists { - var smallErr error - - res, smallErr = b.existsSmall(prm) - if err != nil && (smallErr != nil || res.Exists) { - b.log.Warn("error occured during object existence checking", - zap.Stringer("address", prm.Address), - zap.String("error", err.Error())) - err = nil - } - if err == nil { - err = smallErr + var errors []error + for i := range b.storage { + res, err := b.storage[i].Storage.Exists(prm) + if err == nil && res.Exists { + return res, nil + } else if err != nil { + errors = append(errors, err) } } - return res, err -} + if len(errors) == 0 { + return common.ExistsRes{}, nil + } -// checks if object is presented in shallow dir. -func (b *BlobStor) existsBig(prm common.ExistsPrm) (common.ExistsRes, error) { - return b.fsTree.Exists(prm) -} + for _, err := range errors[:len(errors)-1] { + b.log.Warn("error occured during object existence checking", + zap.Stringer("address", prm.Address), + zap.String("error", err.Error())) + } -// existsSmall checks if object is presented in blobovnicza. -func (b *BlobStor) existsSmall(prm common.ExistsPrm) (common.ExistsRes, error) { - return b.blobovniczas.Exists(prm) + return common.ExistsRes{}, errors[len(errors)-1] } diff --git a/pkg/local_object_storage/blobstor/exists_test.go b/pkg/local_object_storage/blobstor/exists_test.go index cb62e08a..7a1ad187 100644 --- a/pkg/local_object_storage/blobstor/exists_test.go +++ b/pkg/local_object_storage/blobstor/exists_test.go @@ -65,7 +65,7 @@ func TestExists(t *testing.T) { require.NotEmpty(t, bigDir) require.NoError(t, os.Chmod(dir, 0)) - t.Cleanup(func() { require.NoError(t, os.Chmod(dir, b.fsTree.Permissions)) }) + t.Cleanup(func() { require.NoError(t, os.Chmod(dir, b.fsTreeInfo.Permissions)) }) // Object exists, first error is logged. prm.Address = objectCore.AddressOf(objects[0]) diff --git a/pkg/local_object_storage/blobstor/fstree/control.go b/pkg/local_object_storage/blobstor/fstree/control.go new file mode 100644 index 00000000..eb82cf17 --- /dev/null +++ b/pkg/local_object_storage/blobstor/fstree/control.go @@ -0,0 +1,10 @@ +package fstree + +// Open implements common.Storage. +func (*FSTree) Open(bool) error { return nil } + +// Init implements common.Storage. +func (*FSTree) Init() error { return nil } + +// Close implements common.Storage. +func (*FSTree) Close() error { return nil } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 3865b567..f987857b 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -294,3 +294,8 @@ func (t *FSTree) NumberOfObjects() (uint64, error) { return counter, nil } + +// Type implements common.Storage. +func (*FSTree) Type() string { + return "fstree" +} diff --git a/pkg/local_object_storage/blobstor/get.go b/pkg/local_object_storage/blobstor/get.go index 97b7ebea..25bfd7b8 100644 --- a/pkg/local_object_storage/blobstor/get.go +++ b/pkg/local_object_storage/blobstor/get.go @@ -12,31 +12,18 @@ import ( // Otherwise, each sub-storage is tried in order. func (b *BlobStor) Get(prm common.GetPrm) (common.GetRes, error) { if prm.StorageID == nil { - // Nothing specified, try everything. - res, err := b.getBig(prm) - if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) { - return res, err + for i := range b.storage { + res, err := b.storage[i].Storage.Get(prm) + if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) { + return res, err + } } - return b.getSmall(prm) + + var errNotFound apistatus.ObjectNotFound + return common.GetRes{}, errNotFound } if len(prm.StorageID) == 0 { - return b.getBig(prm) + return b.storage[1].Storage.Get(prm) } - return b.getSmall(prm) -} - -// getBig reads the object from shallow dir of BLOB storage by address. -// -// Returns any error encountered that -// did not allow to completely read the object. -// -// Returns an error of type apistatus.ObjectNotFound if the requested object is not -// presented in shallow dir. -func (b *BlobStor) getBig(prm common.GetPrm) (common.GetRes, error) { - // get compressed object data - return b.fsTree.Get(prm) -} - -func (b *BlobStor) getSmall(prm common.GetPrm) (common.GetRes, error) { - return b.blobovniczas.Get(prm) + return b.storage[0].Storage.Get(prm) } diff --git a/pkg/local_object_storage/blobstor/get_range.go b/pkg/local_object_storage/blobstor/get_range.go index a161c638..c5289f87 100644 --- a/pkg/local_object_storage/blobstor/get_range.go +++ b/pkg/local_object_storage/blobstor/get_range.go @@ -12,57 +12,18 @@ import ( // Otherwise, each sub-storage is tried in order. func (b *BlobStor) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) { if prm.StorageID == nil { - // Nothing specified, try everything. - res, err := b.getRangeBig(prm) - if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) { - return res, err + for i := range b.storage { + res, err := b.storage[i].Storage.GetRange(prm) + if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) { + return res, err + } } - return b.getRangeSmall(prm) + + var errNotFound apistatus.ObjectNotFound + return common.GetRangeRes{}, errNotFound } if len(prm.StorageID) == 0 { - return b.getRangeBig(prm) + return b.storage[1].Storage.GetRange(prm) } - return b.getRangeSmall(prm) -} - -// getRangeBig reads data of object payload range from shallow dir of BLOB storage. -// -// Returns any error encountered that -// did not allow to completely read the object payload range. -// -// Returns ErrRangeOutOfBounds if the requested object range is out of bounds. -// Returns an error of type apistatus.ObjectNotFound if object is missing. -func (b *BlobStor) getRangeBig(prm common.GetRangePrm) (common.GetRangeRes, error) { - // get compressed object data - res, err := b.fsTree.Get(common.GetPrm{Address: prm.Address}) - if err != nil { - return common.GetRangeRes{}, err - } - - payload := res.Object.Payload() - ln, off := prm.Range.GetLength(), prm.Range.GetOffset() - - if pLen := uint64(len(payload)); ln+off < off || pLen < off || pLen < ln+off { - var errOutOfRange apistatus.ObjectOutOfRange - - return common.GetRangeRes{}, errOutOfRange - } - - return common.GetRangeRes{ - Data: payload[off : off+ln], - }, nil -} - -// getRangeSmall reads data of object payload range from blobovnicza of BLOB storage. -// -// If blobovnicza ID is not set or set to nil, BlobStor tries to get payload range -// from any blobovnicza. -// -// Returns any error encountered that -// did not allow to completely read the object payload range. -// -// Returns ErrRangeOutOfBounds if the requested object range is out of bounds. -// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in blobovnicza(s). -func (b *BlobStor) getRangeSmall(prm common.GetRangePrm) (common.GetRangeRes, error) { - return b.blobovniczas.GetRange(prm) + return b.storage[0].Storage.GetRange(prm) } diff --git a/pkg/local_object_storage/blobstor/info.go b/pkg/local_object_storage/blobstor/info.go index 9f24212c..87ce5606 100644 --- a/pkg/local_object_storage/blobstor/info.go +++ b/pkg/local_object_storage/blobstor/info.go @@ -4,5 +4,5 @@ import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree // DumpInfo returns information about blob stor. func (b *BlobStor) DumpInfo() fstree.Info { - return b.fsTree.Info + return b.cfg.fsTreeInfo } diff --git a/pkg/local_object_storage/blobstor/iterate.go b/pkg/local_object_storage/blobstor/iterate.go index 0a737a39..23f67b92 100644 --- a/pkg/local_object_storage/blobstor/iterate.go +++ b/pkg/local_object_storage/blobstor/iterate.go @@ -16,14 +16,11 @@ import ( // // If handler returns an error, method wraps and returns it immediately. func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) { - _, err := b.blobovniczas.Iterate(prm) - if err != nil && !prm.IgnoreErrors { - return common.IterateRes{}, fmt.Errorf("blobovnizas iterator failure: %w", err) - } - - _, err = b.fsTree.Iterate(prm) - if err != nil && !prm.IgnoreErrors { - return common.IterateRes{}, fmt.Errorf("fs tree iterator failure: %w", err) + for i := range b.storage { + _, err := b.storage[i].Storage.Iterate(prm) + if err != nil && !prm.IgnoreErrors { + return common.IterateRes{}, fmt.Errorf("blobovnizas iterator failure: %w", err) + } } return common.IterateRes{}, nil } diff --git a/pkg/local_object_storage/blobstor/put.go b/pkg/local_object_storage/blobstor/put.go index af8db586..9e7b7c86 100644 --- a/pkg/local_object_storage/blobstor/put.go +++ b/pkg/local_object_storage/blobstor/put.go @@ -1,12 +1,14 @@ package blobstor import ( + "errors" "fmt" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + "go.uber.org/zap" ) // Put saves the object in BLOB storage. @@ -30,21 +32,21 @@ func (b *BlobStor) Put(prm common.PutPrm) (common.PutRes, error) { prm.RawData = data } - big := b.isBig(prm.RawData) - - if big { - _, err := b.fsTree.Put(prm) - if err != nil { - return common.PutRes{}, err + for i := range b.storage { + if b.storage[i].Policy(prm.Object, prm.RawData) { + res, err := b.storage[i].Storage.Put(prm) + if err == nil { + storagelog.Write(b.log, + storagelog.AddressField(prm.Address), + storagelog.OpField("PUT"), + zap.String("type", b.storage[i].Storage.Type()), + zap.String("storage ID", string(res.StorageID))) + } + return res, err } - - storagelog.Write(b.log, storagelog.AddressField(prm.Address), storagelog.OpField("fstree PUT")) - - return common.PutRes{}, nil } - // save object in blobovnicza - return b.blobovniczas.Put(prm) + return common.PutRes{}, errors.New("couldn't find a place to store an object") } // NeedsCompression returns true if the object should be compressed. @@ -54,8 +56,3 @@ func (b *BlobStor) Put(prm common.PutPrm) (common.PutRes, error) { func (b *BlobStor) NeedsCompression(obj *objectSDK.Object) bool { return b.cfg.CConfig.NeedsCompression(obj) } - -// checks if object is "big". -func (b *BlobStor) isBig(data []byte) bool { - return uint64(len(data)) > b.smallSizeLimit -}