From de2934aeaaf0223b4d97a18d5cbccc03b9a743af Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 9 Nov 2022 13:59:24 +0300 Subject: [PATCH] [#1985] blobstor: Allow to report multiple errors to caller Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/blobovnicza/put.go | 6 +- .../blobstor/blobovniczatree/blobovnicza.go | 8 +- .../blobovniczatree/blobovnicza_test.go | 165 ------------------ .../blobstor/blobovniczatree/errors.go | 5 + .../blobstor/blobovniczatree/option.go | 3 + .../blobstor/blobovniczatree/put.go | 31 ++-- pkg/local_object_storage/blobstor/blobstor.go | 8 + .../blobstor/common/storage.go | 3 + .../blobstor/fstree/fstree.go | 5 + pkg/local_object_storage/shard/shard.go | 10 +- 10 files changed, 61 insertions(+), 183 deletions(-) delete mode 100644 pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza_test.go diff --git a/pkg/local_object_storage/blobovnicza/put.go b/pkg/local_object_storage/blobovnicza/put.go index dbbec6d76..a2b4d61fd 100644 --- a/pkg/local_object_storage/blobovnicza/put.go +++ b/pkg/local_object_storage/blobovnicza/put.go @@ -1,9 +1,9 @@ package blobovnicza import ( - "errors" "fmt" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.etcd.io/bbolt" ) @@ -21,7 +21,7 @@ type PutRes struct { // ErrFull is returned when trying to save an // object to a filled blobovnicza. -var ErrFull = errors.New("blobovnicza is full") +var ErrFull = logicerr.New("blobovnicza is full") // SetAddress sets the address of the saving object. func (p *PutPrm) SetAddress(addr oid.Address) { @@ -62,7 +62,7 @@ func (b *Blobovnicza) Put(prm PutPrm) (PutRes, error) { // expected to happen: // - before initialization step (incorrect usage by design) // - if DB is corrupted (in future this case should be handled) - return fmt.Errorf("(%T) bucket for size %d not created", b, sz) + return logicerr.Wrap(fmt.Errorf("(%T) bucket for size %d not created", b, sz)) } // save the object in bucket diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go index e70ed06ae..d106932f7 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go @@ -12,6 +12,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) @@ -157,7 +158,7 @@ func (b *Blobovniczas) updateAndGet(p string, old *uint64) (blobovniczaWithIndex if ok { if old != nil { if active.ind == b.blzShallowWidth-1 { - return active, errors.New("no more Blobovniczas") + return active, logicerr.New("no more Blobovniczas") } else if active.ind != *old { // sort of CAS in order to control concurrent // updateActive calls @@ -246,3 +247,8 @@ func (b *Blobovniczas) Path() string { func (b *Blobovniczas) SetCompressor(cc *compression.Config) { b.compression = cc } + +// SetReportErrorFunc implements common.Storage. +func (b *Blobovniczas) SetReportErrorFunc(f func(string, error)) { + b.reportError = f +} diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza_test.go b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza_test.go deleted file mode 100644 index b331a13b0..000000000 --- a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza_test.go +++ /dev/null @@ -1,165 +0,0 @@ -package blobovniczatree - -import ( - "math" - "math/rand" - "os" - "strconv" - "strings" - "testing" - - "github.com/nspcc-dev/neofs-node/pkg/core/object" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/internal/blobstortest" - "github.com/nspcc-dev/neofs-node/pkg/util/logger" - "github.com/nspcc-dev/neofs-node/pkg/util/logger/test" - objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" -) - -func TestOpenedAndActive(t *testing.T) { - rand.Seed(1024) - - l := test.NewLogger(true) - p, err := os.MkdirTemp("", "*") - require.NoError(t, err) - - const ( - width = 2 - depth = 1 - dbSize = 64 * 1024 - ) - - b := NewBlobovniczaTree( - WithLogger(l), - WithObjectSizeLimit(2048), - WithBlobovniczaShallowWidth(width), - WithBlobovniczaShallowDepth(depth), - WithRootPath(p), - WithOpenedCacheSize(1), - WithBlobovniczaSize(dbSize)) - - defer os.RemoveAll(p) - - require.NoError(t, b.Open(false)) - require.NoError(t, b.Init()) - - type pair struct { - obj *objectSDK.Object - sid []byte - } - - objects := make([]pair, 10) - for i := range objects { - var prm common.PutPrm - prm.Object = blobstortest.NewObject(1024) - prm.Address = object.AddressOf(prm.Object) - prm.RawData, err = prm.Object.Marshal() - require.NoError(t, err) - - res, err := b.Put(prm) - require.NoError(t, err) - - objects[i].obj = prm.Object - objects[i].sid = res.StorageID - } - for i := range objects { - var prm common.GetPrm - prm.Address = object.AddressOf(objects[i].obj) - // It is important to provide StorageID because - // we want to open a single blobovnicza, without other - // unpredictable cache effects. - prm.StorageID = objects[i].sid - - _, err := b.Get(prm) - require.NoError(t, err) - } - require.NoError(t, b.Close()) -} - -func TestBlobovniczas(t *testing.T) { - rand.Seed(1024) - - l := test.NewLogger(false) - p, err := os.MkdirTemp("", "*") - require.NoError(t, err) - - var width, depth uint64 = 2, 2 - - // sizeLim must be big enough, to hold at least multiple pages. - // 32 KiB is the initial size after all by-size buckets are created. - var szLim uint64 = 32*1024 + 1 - - b := NewBlobovniczaTree( - WithLogger(l), - WithObjectSizeLimit(szLim), - WithBlobovniczaShallowWidth(width), - WithBlobovniczaShallowDepth(depth), - WithRootPath(p), - WithBlobovniczaSize(szLim)) - - defer os.RemoveAll(p) - - require.NoError(t, b.Init()) - - objSz := uint64(szLim / 2) - - addrList := make([]oid.Address, 0) - minFitObjNum := width * depth * szLim / objSz - - for i := uint64(0); i < minFitObjNum; i++ { - obj := blobstortest.NewObject(objSz) - addr := object.AddressOf(obj) - - addrList = append(addrList, addr) - - d, err := obj.Marshal() - require.NoError(t, err) - - // save object in blobovnicza - _, err = b.Put(common.PutPrm{Address: addr, RawData: d}) - require.NoError(t, err, i) - } -} - -func TestFillOrder(t *testing.T) { - for _, depth := range []uint64{1, 2, 4} { - t.Run("depth="+strconv.FormatUint(depth, 10), func(t *testing.T) { - testFillOrder(t, depth) - }) - } -} - -func testFillOrder(t *testing.T, depth uint64) { - p, err := os.MkdirTemp("", "*") - require.NoError(t, err) - b := NewBlobovniczaTree( - WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), - WithObjectSizeLimit(2048), - WithBlobovniczaShallowWidth(3), - WithBlobovniczaShallowDepth(depth), - WithRootPath(p), - WithBlobovniczaSize(1024*1024)) // big enough for some objects. - require.NoError(t, b.Open(false)) - require.NoError(t, b.Init()) - t.Cleanup(func() { - b.Close() - }) - - objCount := 10 /* ~ objects per blobovnicza */ * - int(math.Pow(3, float64(depth)-1)) /* blobovniczas on a previous to last level */ - - for i := 0; i < objCount; i++ { - obj := blobstortest.NewObject(1024) - addr := object.AddressOf(obj) - - d, err := obj.Marshal() - require.NoError(t, err) - - res, err := b.Put(common.PutPrm{Address: addr, RawData: d, DontCompress: true}) - require.NoError(t, err, i) - require.True(t, strings.HasSuffix(string(res.StorageID), "/0")) - } -} diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/errors.go b/pkg/local_object_storage/blobstor/blobovniczatree/errors.go index 22381a897..a78ff7d87 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/errors.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/errors.go @@ -3,9 +3,14 @@ package blobovniczatree import ( "errors" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" ) func isErrOutOfRange(err error) bool { return errors.As(err, new(apistatus.ObjectOutOfRange)) } + +func isLogical(err error) bool { + return errors.As(err, new(logicerr.Logical)) +} diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/option.go b/pkg/local_object_storage/blobstor/blobovniczatree/option.go index dca755692..840d2df45 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/option.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/option.go @@ -19,6 +19,8 @@ type cfg struct { blzShallowWidth uint64 compression *compression.Config blzOpts []blobovnicza.Option + // reportError is the function called when encountering disk errors. + reportError func(string, error) } type Option func(*cfg) @@ -37,6 +39,7 @@ func initConfig(c *cfg) { openedCacheSize: defaultOpenedCacheSize, blzShallowDepth: defaultBlzShallowDepth, blzShallowWidth: defaultBlzShallowWidth, + reportError: func(string, error) {}, } } diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/put.go b/pkg/local_object_storage/blobstor/blobovniczatree/put.go index 2f11f6f49..3589456ab 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/put.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/put.go @@ -34,9 +34,12 @@ func (b *Blobovniczas) Put(prm common.PutPrm) (common.PutRes, error) { fn = func(p string) (bool, error) { active, err := b.getActivated(p) if err != nil { - b.log.Debug("could not get active blobovnicza", - zap.String("error", err.Error()), - ) + if !isLogical(err) { + b.reportError("could not get active blobovnicza", err) + } else { + b.log.Debug("could not get active blobovnicza", + zap.String("error", err.Error())) + } return false, nil } @@ -49,10 +52,13 @@ func (b *Blobovniczas) Put(prm common.PutPrm) (common.PutRes, error) { ) if err := b.updateActive(p, &active.ind); err != nil { - b.log.Debug("could not update active blobovnicza", - zap.String("level", p), - zap.String("error", err.Error()), - ) + if !isLogical(err) { + b.reportError("could not update active blobovnicza", err) + } else { + b.log.Debug("could not update active blobovnicza", + zap.String("level", p), + zap.String("error", err.Error())) + } return false, nil } @@ -61,10 +67,13 @@ func (b *Blobovniczas) Put(prm common.PutPrm) (common.PutRes, error) { } allFull = false - b.log.Debug("could not put object to active blobovnicza", - zap.String("path", filepath.Join(p, u64ToHexString(active.ind))), - zap.String("error", err.Error()), - ) + if !isLogical(err) { + b.reportError("could not put object to active blobovnicza", err) + } else { + b.log.Debug("could not put object to active blobovnicza", + zap.String("path", filepath.Join(p, u64ToHexString(active.ind))), + zap.String("error", err.Error())) + } return false, nil } diff --git a/pkg/local_object_storage/blobstor/blobstor.go b/pkg/local_object_storage/blobstor/blobstor.go index fdd1949b0..717ba893b 100644 --- a/pkg/local_object_storage/blobstor/blobstor.go +++ b/pkg/local_object_storage/blobstor/blobstor.go @@ -105,3 +105,11 @@ func WithUncompressableContentTypes(values []string) Option { c.compression.UncompressableContentTypes = values } } + +// SetReportErrorFunc allows to provide a function to be called on disk errors. +// This function MUST be called before Open. +func (b *BlobStor) SetReportErrorFunc(f func(string, error)) { + for i := range b.storage { + b.storage[i].Storage.SetReportErrorFunc(f) + } +} diff --git a/pkg/local_object_storage/blobstor/common/storage.go b/pkg/local_object_storage/blobstor/common/storage.go index 85f8039a3..b66b41200 100644 --- a/pkg/local_object_storage/blobstor/common/storage.go +++ b/pkg/local_object_storage/blobstor/common/storage.go @@ -12,6 +12,9 @@ type Storage interface { Type() string Path() string SetCompressor(cc *compression.Config) + // SetReportErrorFunc allows to provide a function to be called on disk errors. + // This function MUST be called before Open. + SetReportErrorFunc(f func(string, error)) Get(GetPrm) (GetRes, error) GetRange(GetRangePrm) (GetRangeRes, error) diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 9254095f7..aef171d09 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -379,3 +379,8 @@ func (t *FSTree) Path() string { func (t *FSTree) SetCompressor(cc *compression.Config) { t.Config = cc } + +// SetReportErrorFunc implements common.Storage. +func (t *FSTree) SetReportErrorFunc(f func(string, error)) { + // Do nothing, FSTree can encounter only one error which is returned. +} diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index fb5ba833e..ede56e912 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -127,12 +127,16 @@ func New(opts ...Option) *Shard { tsSource: c.tsSource, } + reportFunc := func(msg string, err error) { + s.reportErrorFunc(s.ID().String(), msg, err) + } + + s.blobStor.SetReportErrorFunc(reportFunc) + if c.useWriteCache { s.writeCache = writecache.New( append(c.writeCacheOpts, - writecache.WithReportErrorFunc(func(msg string, err error) { - s.reportErrorFunc(s.ID().String(), msg, err) - }), + writecache.WithReportErrorFunc(reportFunc), writecache.WithBlobstor(bs), writecache.WithMetabase(mb))...) }