[#1985] blobstor: Allow to report multiple errors to caller
Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
parent
f2d7e65e39
commit
d65604ad30
10 changed files with 61 additions and 183 deletions
|
@ -1,9 +1,9 @@
|
||||||
package blobovnicza
|
package blobovnicza
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
@ -21,7 +21,7 @@ type PutRes struct {
|
||||||
|
|
||||||
// ErrFull is returned when trying to save an
|
// ErrFull is returned when trying to save an
|
||||||
// object to a filled blobovnicza.
|
// object to a filled blobovnicza.
|
||||||
var ErrFull = errors.New("blobovnicza is full")
|
var ErrFull = logicerr.New("blobovnicza is full")
|
||||||
|
|
||||||
// SetAddress sets the address of the saving object.
|
// SetAddress sets the address of the saving object.
|
||||||
func (p *PutPrm) SetAddress(addr oid.Address) {
|
func (p *PutPrm) SetAddress(addr oid.Address) {
|
||||||
|
@ -62,7 +62,7 @@ func (b *Blobovnicza) Put(prm PutPrm) (PutRes, error) {
|
||||||
// expected to happen:
|
// expected to happen:
|
||||||
// - before initialization step (incorrect usage by design)
|
// - before initialization step (incorrect usage by design)
|
||||||
// - if DB is corrupted (in future this case should be handled)
|
// - if DB is corrupted (in future this case should be handled)
|
||||||
return fmt.Errorf("(%T) bucket for size %d not created", b, sz)
|
return logicerr.Wrap(fmt.Errorf("(%T) bucket for size %d not created", b, sz))
|
||||||
}
|
}
|
||||||
|
|
||||||
// save the object in bucket
|
// save the object in bucket
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -157,7 +158,7 @@ func (b *Blobovniczas) updateAndGet(p string, old *uint64) (blobovniczaWithIndex
|
||||||
if ok {
|
if ok {
|
||||||
if old != nil {
|
if old != nil {
|
||||||
if active.ind == b.blzShallowWidth-1 {
|
if active.ind == b.blzShallowWidth-1 {
|
||||||
return active, errors.New("no more Blobovniczas")
|
return active, logicerr.New("no more Blobovniczas")
|
||||||
} else if active.ind != *old {
|
} else if active.ind != *old {
|
||||||
// sort of CAS in order to control concurrent
|
// sort of CAS in order to control concurrent
|
||||||
// updateActive calls
|
// updateActive calls
|
||||||
|
@ -246,3 +247,8 @@ func (b *Blobovniczas) Path() string {
|
||||||
func (b *Blobovniczas) SetCompressor(cc *compression.Config) {
|
func (b *Blobovniczas) SetCompressor(cc *compression.Config) {
|
||||||
b.compression = cc
|
b.compression = cc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetReportErrorFunc implements common.Storage.
|
||||||
|
func (b *Blobovniczas) SetReportErrorFunc(f func(string, error)) {
|
||||||
|
b.reportError = f
|
||||||
|
}
|
||||||
|
|
|
@ -1,165 +0,0 @@
|
||||||
package blobovniczatree
|
|
||||||
|
|
||||||
import (
|
|
||||||
"math"
|
|
||||||
"math/rand"
|
|
||||||
"os"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger/test"
|
|
||||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
"go.uber.org/zap/zaptest"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestOpenedAndActive(t *testing.T) {
|
|
||||||
rand.Seed(1024)
|
|
||||||
|
|
||||||
l := test.NewLogger(true)
|
|
||||||
p, err := os.MkdirTemp("", "*")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
const (
|
|
||||||
width = 2
|
|
||||||
depth = 1
|
|
||||||
dbSize = 64 * 1024
|
|
||||||
)
|
|
||||||
|
|
||||||
b := NewBlobovniczaTree(
|
|
||||||
WithLogger(l),
|
|
||||||
WithObjectSizeLimit(2048),
|
|
||||||
WithBlobovniczaShallowWidth(width),
|
|
||||||
WithBlobovniczaShallowDepth(depth),
|
|
||||||
WithRootPath(p),
|
|
||||||
WithOpenedCacheSize(1),
|
|
||||||
WithBlobovniczaSize(dbSize))
|
|
||||||
|
|
||||||
defer os.RemoveAll(p)
|
|
||||||
|
|
||||||
require.NoError(t, b.Open(false))
|
|
||||||
require.NoError(t, b.Init())
|
|
||||||
|
|
||||||
type pair struct {
|
|
||||||
obj *objectSDK.Object
|
|
||||||
sid []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
objects := make([]pair, 10)
|
|
||||||
for i := range objects {
|
|
||||||
var prm common.PutPrm
|
|
||||||
prm.Object = blobstortest.NewObject(1024)
|
|
||||||
prm.Address = object.AddressOf(prm.Object)
|
|
||||||
prm.RawData, err = prm.Object.Marshal()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
res, err := b.Put(prm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
objects[i].obj = prm.Object
|
|
||||||
objects[i].sid = res.StorageID
|
|
||||||
}
|
|
||||||
for i := range objects {
|
|
||||||
var prm common.GetPrm
|
|
||||||
prm.Address = object.AddressOf(objects[i].obj)
|
|
||||||
// It is important to provide StorageID because
|
|
||||||
// we want to open a single blobovnicza, without other
|
|
||||||
// unpredictable cache effects.
|
|
||||||
prm.StorageID = objects[i].sid
|
|
||||||
|
|
||||||
_, err := b.Get(prm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
require.NoError(t, b.Close())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBlobovniczas(t *testing.T) {
|
|
||||||
rand.Seed(1024)
|
|
||||||
|
|
||||||
l := test.NewLogger(false)
|
|
||||||
p, err := os.MkdirTemp("", "*")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
var width, depth uint64 = 2, 2
|
|
||||||
|
|
||||||
// sizeLim must be big enough, to hold at least multiple pages.
|
|
||||||
// 32 KiB is the initial size after all by-size buckets are created.
|
|
||||||
var szLim uint64 = 32*1024 + 1
|
|
||||||
|
|
||||||
b := NewBlobovniczaTree(
|
|
||||||
WithLogger(l),
|
|
||||||
WithObjectSizeLimit(szLim),
|
|
||||||
WithBlobovniczaShallowWidth(width),
|
|
||||||
WithBlobovniczaShallowDepth(depth),
|
|
||||||
WithRootPath(p),
|
|
||||||
WithBlobovniczaSize(szLim))
|
|
||||||
|
|
||||||
defer os.RemoveAll(p)
|
|
||||||
|
|
||||||
require.NoError(t, b.Init())
|
|
||||||
|
|
||||||
objSz := uint64(szLim / 2)
|
|
||||||
|
|
||||||
addrList := make([]oid.Address, 0)
|
|
||||||
minFitObjNum := width * depth * szLim / objSz
|
|
||||||
|
|
||||||
for i := uint64(0); i < minFitObjNum; i++ {
|
|
||||||
obj := blobstortest.NewObject(objSz)
|
|
||||||
addr := object.AddressOf(obj)
|
|
||||||
|
|
||||||
addrList = append(addrList, addr)
|
|
||||||
|
|
||||||
d, err := obj.Marshal()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// save object in blobovnicza
|
|
||||||
_, err = b.Put(common.PutPrm{Address: addr, RawData: d})
|
|
||||||
require.NoError(t, err, i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestFillOrder(t *testing.T) {
|
|
||||||
for _, depth := range []uint64{1, 2, 4} {
|
|
||||||
t.Run("depth="+strconv.FormatUint(depth, 10), func(t *testing.T) {
|
|
||||||
testFillOrder(t, depth)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func testFillOrder(t *testing.T, depth uint64) {
|
|
||||||
p, err := os.MkdirTemp("", "*")
|
|
||||||
require.NoError(t, err)
|
|
||||||
b := NewBlobovniczaTree(
|
|
||||||
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
|
||||||
WithObjectSizeLimit(2048),
|
|
||||||
WithBlobovniczaShallowWidth(3),
|
|
||||||
WithBlobovniczaShallowDepth(depth),
|
|
||||||
WithRootPath(p),
|
|
||||||
WithBlobovniczaSize(1024*1024)) // big enough for some objects.
|
|
||||||
require.NoError(t, b.Open(false))
|
|
||||||
require.NoError(t, b.Init())
|
|
||||||
t.Cleanup(func() {
|
|
||||||
b.Close()
|
|
||||||
})
|
|
||||||
|
|
||||||
objCount := 10 /* ~ objects per blobovnicza */ *
|
|
||||||
int(math.Pow(3, float64(depth)-1)) /* blobovniczas on a previous to last level */
|
|
||||||
|
|
||||||
for i := 0; i < objCount; i++ {
|
|
||||||
obj := blobstortest.NewObject(1024)
|
|
||||||
addr := object.AddressOf(obj)
|
|
||||||
|
|
||||||
d, err := obj.Marshal()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
res, err := b.Put(common.PutPrm{Address: addr, RawData: d, DontCompress: true})
|
|
||||||
require.NoError(t, err, i)
|
|
||||||
require.True(t, strings.HasSuffix(string(res.StorageID), "/0"))
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -3,9 +3,14 @@ package blobovniczatree
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
|
||||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
func isErrOutOfRange(err error) bool {
|
func isErrOutOfRange(err error) bool {
|
||||||
return errors.As(err, new(apistatus.ObjectOutOfRange))
|
return errors.As(err, new(apistatus.ObjectOutOfRange))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isLogical(err error) bool {
|
||||||
|
return errors.As(err, new(logicerr.Logical))
|
||||||
|
}
|
||||||
|
|
|
@ -19,6 +19,8 @@ type cfg struct {
|
||||||
blzShallowWidth uint64
|
blzShallowWidth uint64
|
||||||
compression *compression.Config
|
compression *compression.Config
|
||||||
blzOpts []blobovnicza.Option
|
blzOpts []blobovnicza.Option
|
||||||
|
// reportError is the function called when encountering disk errors.
|
||||||
|
reportError func(string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
@ -37,6 +39,7 @@ func initConfig(c *cfg) {
|
||||||
openedCacheSize: defaultOpenedCacheSize,
|
openedCacheSize: defaultOpenedCacheSize,
|
||||||
blzShallowDepth: defaultBlzShallowDepth,
|
blzShallowDepth: defaultBlzShallowDepth,
|
||||||
blzShallowWidth: defaultBlzShallowWidth,
|
blzShallowWidth: defaultBlzShallowWidth,
|
||||||
|
reportError: func(string, error) {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,9 +34,12 @@ func (b *Blobovniczas) Put(prm common.PutPrm) (common.PutRes, error) {
|
||||||
fn = func(p string) (bool, error) {
|
fn = func(p string) (bool, error) {
|
||||||
active, err := b.getActivated(p)
|
active, err := b.getActivated(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.log.Debug("could not get active blobovnicza",
|
if !isLogical(err) {
|
||||||
zap.String("error", err.Error()),
|
b.reportError("could not get active blobovnicza", err)
|
||||||
)
|
} else {
|
||||||
|
b.log.Debug("could not get active blobovnicza",
|
||||||
|
zap.String("error", err.Error()))
|
||||||
|
}
|
||||||
|
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
@ -49,10 +52,13 @@ func (b *Blobovniczas) Put(prm common.PutPrm) (common.PutRes, error) {
|
||||||
)
|
)
|
||||||
|
|
||||||
if err := b.updateActive(p, &active.ind); err != nil {
|
if err := b.updateActive(p, &active.ind); err != nil {
|
||||||
b.log.Debug("could not update active blobovnicza",
|
if !isLogical(err) {
|
||||||
zap.String("level", p),
|
b.reportError("could not update active blobovnicza", err)
|
||||||
zap.String("error", err.Error()),
|
} else {
|
||||||
)
|
b.log.Debug("could not update active blobovnicza",
|
||||||
|
zap.String("level", p),
|
||||||
|
zap.String("error", err.Error()))
|
||||||
|
}
|
||||||
|
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
@ -61,10 +67,13 @@ func (b *Blobovniczas) Put(prm common.PutPrm) (common.PutRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
allFull = false
|
allFull = false
|
||||||
b.log.Debug("could not put object to active blobovnicza",
|
if !isLogical(err) {
|
||||||
zap.String("path", filepath.Join(p, u64ToHexString(active.ind))),
|
b.reportError("could not put object to active blobovnicza", err)
|
||||||
zap.String("error", err.Error()),
|
} else {
|
||||||
)
|
b.log.Debug("could not put object to active blobovnicza",
|
||||||
|
zap.String("path", filepath.Join(p, u64ToHexString(active.ind))),
|
||||||
|
zap.String("error", err.Error()))
|
||||||
|
}
|
||||||
|
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,3 +105,11 @@ func WithUncompressableContentTypes(values []string) Option {
|
||||||
c.compression.UncompressableContentTypes = values
|
c.compression.UncompressableContentTypes = values
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetReportErrorFunc allows to provide a function to be called on disk errors.
|
||||||
|
// This function MUST be called before Open.
|
||||||
|
func (b *BlobStor) SetReportErrorFunc(f func(string, error)) {
|
||||||
|
for i := range b.storage {
|
||||||
|
b.storage[i].Storage.SetReportErrorFunc(f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -12,6 +12,9 @@ type Storage interface {
|
||||||
Type() string
|
Type() string
|
||||||
Path() string
|
Path() string
|
||||||
SetCompressor(cc *compression.Config)
|
SetCompressor(cc *compression.Config)
|
||||||
|
// SetReportErrorFunc allows to provide a function to be called on disk errors.
|
||||||
|
// This function MUST be called before Open.
|
||||||
|
SetReportErrorFunc(f func(string, error))
|
||||||
|
|
||||||
Get(GetPrm) (GetRes, error)
|
Get(GetPrm) (GetRes, error)
|
||||||
GetRange(GetRangePrm) (GetRangeRes, error)
|
GetRange(GetRangePrm) (GetRangeRes, error)
|
||||||
|
|
|
@ -379,3 +379,8 @@ func (t *FSTree) Path() string {
|
||||||
func (t *FSTree) SetCompressor(cc *compression.Config) {
|
func (t *FSTree) SetCompressor(cc *compression.Config) {
|
||||||
t.Config = cc
|
t.Config = cc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetReportErrorFunc implements common.Storage.
|
||||||
|
func (t *FSTree) SetReportErrorFunc(f func(string, error)) {
|
||||||
|
// Do nothing, FSTree can encounter only one error which is returned.
|
||||||
|
}
|
||||||
|
|
|
@ -127,12 +127,16 @@ func New(opts ...Option) *Shard {
|
||||||
tsSource: c.tsSource,
|
tsSource: c.tsSource,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reportFunc := func(msg string, err error) {
|
||||||
|
s.reportErrorFunc(s.ID().String(), msg, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.blobStor.SetReportErrorFunc(reportFunc)
|
||||||
|
|
||||||
if c.useWriteCache {
|
if c.useWriteCache {
|
||||||
s.writeCache = writecache.New(
|
s.writeCache = writecache.New(
|
||||||
append(c.writeCacheOpts,
|
append(c.writeCacheOpts,
|
||||||
writecache.WithReportErrorFunc(func(msg string, err error) {
|
writecache.WithReportErrorFunc(reportFunc),
|
||||||
s.reportErrorFunc(s.ID().String(), msg, err)
|
|
||||||
}),
|
|
||||||
writecache.WithBlobstor(bs),
|
writecache.WithBlobstor(bs),
|
||||||
writecache.WithMetabase(mb))...)
|
writecache.WithMetabase(mb))...)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue