From d2cce62934f5dbec7363e15c8ed66cd6838b18c8 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 20 Oct 2022 13:40:25 +0300 Subject: [PATCH] [#1818] writecache: Increase error counter on background errors Signed-off-by: Evgenii Stratonikov --- CHANGELOG.md | 1 + pkg/local_object_storage/engine/engine.go | 19 ++++++ pkg/local_object_storage/engine/shards.go | 1 + pkg/local_object_storage/shard/shard.go | 42 ++++++++----- pkg/local_object_storage/writecache/flush.go | 59 +++++++++++-------- .../writecache/flush_test.go | 22 ++++--- .../writecache/options.go | 9 +++ 7 files changed, 105 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e25baccd..54abe04f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Changelog for NeoFS Node - Do not panic with bad inputs for `GET_RANGE` (#2007) - Correctly select the shard for applying tree service operations (#1996) - Physical child object removal by GC (#1699) +- Increase error counter for write-cache flush errors (#1818) ### Removed ### Updated diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index 579d0d8ee..cf61253b4 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -35,6 +35,25 @@ type shardWrapper struct { *shard.Shard } +// reportShardErrorBackground increases shard error counter and logs an error. +// It is intended to be used from background workers and +// doesn't change shard mode because of possible deadlocks. +func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err error) { + e.mtx.RLock() + sh, ok := e.shards[id] + e.mtx.RUnlock() + + if !ok { + return + } + + errCount := sh.errorCount.Inc() + e.log.Warn(msg, + zap.String("shard_id", id), + zap.Uint32("error count", errCount), + zap.String("error", err.Error())) +} + // reportShardError checks that the amount of errors doesn't exceed the configured threshold. // If it does, shard is set to read-only mode. func (e *StorageEngine) reportShardError( diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 4f275ac05..8604b773e 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -87,6 +87,7 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) { shard.WithExpiredTombstonesCallback(e.processExpiredTombstones), shard.WithExpiredLocksCallback(e.processExpiredLocks), shard.WithDeletedLockCallback(e.processDeletedLocks), + shard.WithReportErrorFunc(e.reportShardErrorBackground), )...) if err := sh.UpdateID(); err != nil { diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index d36416004..fb5ba833e 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -96,13 +96,16 @@ type cfg struct { tsSource TombstoneSource metricsWriter MetricsWriter + + reportErrorFunc func(selfID string, message string, err error) } func defaultCfg() *cfg { return &cfg{ - rmBatchSize: 100, - log: &logger.Logger{Logger: zap.L()}, - gcCfg: defaultGCCfg(), + rmBatchSize: 100, + log: &logger.Logger{Logger: zap.L()}, + gcCfg: defaultGCCfg(), + reportErrorFunc: func(string, string, error) {}, } } @@ -117,20 +120,21 @@ func New(opts ...Option) *Shard { bs := blobstor.New(c.blobOpts...) mb := meta.New(c.metaOpts...) - var writeCache writecache.Cache - if c.useWriteCache { - writeCache = writecache.New( - append(c.writeCacheOpts, - writecache.WithBlobstor(bs), - writecache.WithMetabase(mb))...) + s := &Shard{ + cfg: c, + blobStor: bs, + metaBase: mb, + tsSource: c.tsSource, } - s := &Shard{ - cfg: c, - blobStor: bs, - metaBase: mb, - writeCache: writeCache, - tsSource: c.tsSource, + if c.useWriteCache { + s.writeCache = writecache.New( + append(c.writeCacheOpts, + writecache.WithReportErrorFunc(func(msg string, err error) { + s.reportErrorFunc(s.ID().String(), msg, err) + }), + writecache.WithBlobstor(bs), + writecache.WithMetabase(mb))...) } if s.piloramaOpts != nil { @@ -281,6 +285,14 @@ func WithMetricsWriter(v MetricsWriter) Option { } } +// WithReportErrorFunc returns option to specify callback for handling storage-related errors +// in the background workers. +func WithReportErrorFunc(f func(selfID string, message string, err error)) Option { + return func(c *cfg) { + c.reportErrorFunc = f + } +} + func (s *Shard) fillInfo() { s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo() diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 9957fda05..346f4613f 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -7,6 +7,7 @@ import ( "github.com/mr-tron/base58" "github.com/nspcc-dev/neo-go/pkg/util/slice" objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -146,6 +147,16 @@ func (c *cache) flushBigObjects() { } } +func (c *cache) reportFlushError(msg string, addr string, err error) { + if c.reportError != nil { + c.reportError(msg, err) + } else { + c.log.Error(msg, + zap.String("address", addr), + zap.Error(err)) + } +} + func (c *cache) flushFSTree(ignoreErrors bool) error { var prm common.IteratePrm prm.IgnoreErrors = ignoreErrors @@ -158,8 +169,8 @@ func (c *cache) flushFSTree(ignoreErrors bool) error { data, err := f() if err != nil { + c.reportFlushError("can't read a file", sAddr, err) if ignoreErrors { - c.log.Error("can't read a file", zap.Stringer("address", addr)) return nil } return err @@ -168,37 +179,21 @@ func (c *cache) flushFSTree(ignoreErrors bool) error { var obj object.Object err = obj.Unmarshal(data) if err != nil { + c.reportFlushError("can't unmarshal an object", sAddr, err) if ignoreErrors { - c.log.Error("can't unmarshal an object", zap.Stringer("address", addr)) return nil } return err } - var prm common.PutPrm - prm.Address = addr - prm.Object = &obj - prm.RawData = data - - res, err := c.blobstor.Put(prm) + err = c.flushObject(&obj, data) if err != nil { if ignoreErrors { - c.log.Error("cant flush object to blobstor", zap.Error(err)) return nil } return err } - var updPrm meta.UpdateStorageIDPrm - updPrm.SetAddress(addr) - updPrm.SetStorageID(res.StorageID) - - _, err = c.metabase.UpdateStorageID(updPrm) - if err != nil { - c.log.Error("failed to update storage ID in metabase", zap.Error(err)) - return nil - } - // mark object as flushed c.flushed.Add(sAddr, false) @@ -222,30 +217,40 @@ func (c *cache) flushWorker(_ int) { return } - err := c.flushObject(obj) - if err != nil { - c.log.Error("can't flush object to the main storage", zap.Error(err)) - } else { + err := c.flushObject(obj, nil) + if err == nil { c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true) } } } // flushObject is used to write object directly to the main storage. -func (c *cache) flushObject(obj *object.Object) error { +func (c *cache) flushObject(obj *object.Object, data []byte) error { + addr := objectCore.AddressOf(obj) + var prm common.PutPrm prm.Object = obj + prm.RawData = data res, err := c.blobstor.Put(prm) if err != nil { + if !errors.Is(err, common.ErrNoSpace) && !errors.Is(err, common.ErrReadOnly) && + !errors.Is(err, blobstor.ErrNoPlaceFound) { + c.reportFlushError("can't flush an object to blobstor", + addr.EncodeToString(), err) + } return err } var updPrm meta.UpdateStorageIDPrm - updPrm.SetAddress(objectCore.AddressOf(obj)) + updPrm.SetAddress(addr) updPrm.SetStorageID(res.StorageID) _, err = c.metabase.UpdateStorageID(updPrm) + if err != nil { + c.reportFlushError("can't update object storage ID", + addr.EncodeToString(), err) + } return err } @@ -280,6 +285,7 @@ func (c *cache) flush(ignoreErrors bool) error { } if err := addr.DecodeString(sa); err != nil { + c.reportFlushError("can't decode object address from the DB", sa, err) if ignoreErrors { continue } @@ -288,13 +294,14 @@ func (c *cache) flush(ignoreErrors bool) error { var obj object.Object if err := obj.Unmarshal(data); err != nil { + c.reportFlushError("can't unmarshal an object from the DB", sa, err) if ignoreErrors { continue } return err } - if err := c.flushObject(&obj); err != nil { + if err := c.flushObject(&obj, data); err != nil { return err } } diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index e98cc941a..dea042e12 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -21,6 +21,7 @@ import ( versionSDK "github.com/nspcc-dev/neofs-sdk-go/version" "github.com/stretchr/testify/require" "go.etcd.io/bbolt" + "go.uber.org/atomic" "go.uber.org/zap/zaptest" ) @@ -35,7 +36,7 @@ func TestFlush(t *testing.T) { obj *object.Object } - newCache := func(t *testing.T) (Cache, *blobstor.BlobStor, *meta.DB) { + newCache := func(t *testing.T, opts ...Option) (Cache, *blobstor.BlobStor, *meta.DB) { dir := t.TempDir() mb := meta.New( meta.WithPath(filepath.Join(dir, "meta")), @@ -54,11 +55,13 @@ func TestFlush(t *testing.T) { require.NoError(t, bs.Init()) wc := New( - WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), - WithPath(filepath.Join(dir, "writecache")), - WithSmallObjectSize(smallSize), - WithMetabase(mb), - WithBlobstor(bs)) + append([]Option{ + WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), + WithPath(filepath.Join(dir, "writecache")), + WithSmallObjectSize(smallSize), + WithMetabase(mb), + WithBlobstor(bs), + }, opts...)...) require.NoError(t, wc.Open(false)) require.NoError(t, wc.Init()) @@ -164,7 +167,10 @@ func TestFlush(t *testing.T) { t.Run("ignore errors", func(t *testing.T) { testIgnoreErrors := func(t *testing.T, f func(*cache)) { - wc, bs, mb := newCache(t) + var errCount atomic.Uint32 + wc, bs, mb := newCache(t, WithReportErrorFunc(func(message string, err error) { + errCount.Inc() + })) objects := putObjects(t, wc) f(wc.(*cache)) @@ -172,7 +178,9 @@ func TestFlush(t *testing.T) { require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite)) + require.Equal(t, uint32(0), errCount.Load()) require.Error(t, wc.Flush(false)) + require.True(t, errCount.Load() > 0) require.NoError(t, wc.Flush(true)) check(t, mb, bs, objects) diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index 76714f6d1..dec5c1b1a 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -52,6 +52,8 @@ type options struct { maxBatchDelay time.Duration // noSync is true iff FSTree allows unsynchronized writes. noSync bool + // reportError is the function called when encountering disk errors in background workers. + reportError func(string, error) } // WithLogger sets logger. @@ -142,3 +144,10 @@ func WithNoSync(noSync bool) Option { o.noSync = noSync } } + +// WithReportErrorFunc sets error reporting function. +func WithReportErrorFunc(f func(string, error)) Option { + return func(o *options) { + o.reportError = f + } +}