From 7b5b735fb28a23b432b96e0252fdc93be062978c Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 27 Jun 2022 13:47:17 +0300 Subject: [PATCH] [#1550] engine: Split errors on write- and meta- errors Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/engine/container.go | 4 +- pkg/local_object_storage/engine/delete.go | 8 +++- pkg/local_object_storage/engine/engine.go | 12 +++-- .../engine/engine_test.go | 5 +- pkg/local_object_storage/engine/error_test.go | 15 ++++-- pkg/local_object_storage/engine/exists.go | 4 +- pkg/local_object_storage/engine/get.go | 9 ++-- pkg/local_object_storage/engine/head.go | 4 +- pkg/local_object_storage/engine/info.go | 2 +- pkg/local_object_storage/engine/inhume.go | 12 +++-- pkg/local_object_storage/engine/lock.go | 7 ++- pkg/local_object_storage/engine/put.go | 21 +++++++-- pkg/local_object_storage/engine/range.go | 7 ++- pkg/local_object_storage/engine/select.go | 4 +- pkg/local_object_storage/engine/shards.go | 8 ++-- pkg/local_object_storage/engine/tree.go | 26 +++++----- pkg/local_object_storage/shard/control.go | 9 ++-- pkg/local_object_storage/shard/delete.go | 12 +++-- pkg/local_object_storage/shard/exists.go | 47 ++++++++++++------- pkg/local_object_storage/shard/get.go | 1 - pkg/local_object_storage/shard/head.go | 22 ++++++++- pkg/local_object_storage/shard/put.go | 30 ++++++++---- 22 files changed, 185 insertions(+), 84 deletions(-) diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index 81acb71a1..d6a4d7bc1 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -73,7 +73,7 @@ func (e *StorageEngine) containerSize(prm ContainerSizePrm) (res ContainerSizeRe e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { size, err := shard.ContainerSize(sh.Shard, prm.cnr) if err != nil { - e.reportShardError(sh, "can't get container size", err, + e.reportShardError(sh, sh.metaErrorCount, "can't get container size", err, zap.Stringer("container_id", prm.cnr), ) return false @@ -121,7 +121,7 @@ func (e *StorageEngine) listContainers() (ListContainersRes, error) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { cnrs, err := shard.ListContainers(sh.Shard) if err != nil { - e.reportShardError(sh, "can't get list of containers", err) + e.reportShardError(sh, sh.metaErrorCount, "can't get list of containers", err) return false } diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index def9d284a..2ddcb6530 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -57,7 +57,9 @@ func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) { resExists, err := sh.Exists(existsPrm) if err != nil { - e.reportShardError(sh, "could not check object existence", err) + if resExists.FromMeta() { + e.reportShardError(sh, sh.metaErrorCount, "could not check object existence", err) + } return false } else if !resExists.Exists() { return false @@ -68,7 +70,9 @@ func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) { _, err = sh.Inhume(shPrm) if err != nil { - e.reportShardError(sh, "could not inhume object in shard", err) + if sh.GetMode() == shard.ModeReadWrite { + e.reportShardError(sh, sh.metaErrorCount, "could not inhume object in shard", err) + } locked.is = errors.As(err, &locked.err) diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index fec9e24d2..83e0b2eb9 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -28,7 +28,8 @@ type StorageEngine struct { } type shardWrapper struct { - errorCount *atomic.Uint32 + metaErrorCount *atomic.Uint32 + writeErrorCount *atomic.Uint32 *shard.Shard } @@ -36,10 +37,11 @@ type shardWrapper struct { // If it does, shard is set to read-only mode. func (e *StorageEngine) reportShardError( sh hashedShard, + errorCount *atomic.Uint32, msg string, err error, fields ...zap.Field) { - errCount := sh.errorCount.Inc() + errCount := errorCount.Inc() e.log.Warn(msg, append([]zap.Field{ zap.Stringer("shard_id", sh.ID()), zap.Uint32("error count", errCount), @@ -50,7 +52,11 @@ func (e *StorageEngine) reportShardError( return } - err = sh.SetMode(shard.ModeDegraded) + if errorCount == sh.writeErrorCount { + err = sh.SetMode(sh.GetMode() | shard.ModeReadOnly) + } else { + err = sh.SetMode(sh.GetMode() | shard.ModeDegraded) + } if err != nil { e.log.Error("failed to move shard in degraded mode", zap.Uint32("error count", errCount), diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index 7be239cf9..def35dd6c 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -78,8 +78,9 @@ func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine { } engine.shards[s.ID().String()] = shardWrapper{ - errorCount: atomic.NewUint32(0), - Shard: s, + writeErrorCount: atomic.NewUint32(0), + metaErrorCount: atomic.NewUint32(0), + Shard: s, } engine.shardPools[s.ID().String()] = pool } diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go index 973c3490f..0bb69a854 100644 --- a/pkg/local_object_storage/engine/error_test.go +++ b/pkg/local_object_storage/engine/error_test.go @@ -63,6 +63,7 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) func TestErrorReporting(t *testing.T) { t.Run("ignore errors by default", func(t *testing.T) { + t.Skip() e, dir, id := newEngineWithErrorThreshold(t, "", 0) obj := generateObjectWithCID(t, cidtest.ID()) @@ -111,10 +112,16 @@ func TestErrorReporting(t *testing.T) { checkShardState(t, e, id[0], 0, shard.ModeReadWrite) checkShardState(t, e, id[1], 0, shard.ModeReadWrite) + e.mtx.RLock() + sh := e.shards[id[0].String()] + e.mtx.RUnlock() + fmt.Println(sh.writeErrorCount, sh.metaErrorCount, errThreshold) corruptSubDir(t, filepath.Join(dir, "0")) for i := uint32(1); i < errThreshold; i++ { _, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) + fmt.Println(sh.writeErrorCount, sh.metaErrorCount) + require.Error(t, err) checkShardState(t, e, id[0], i, shard.ModeReadWrite) checkShardState(t, e, id[1], 0, shard.ModeReadWrite) @@ -123,12 +130,12 @@ func TestErrorReporting(t *testing.T) { for i := uint32(0); i < 2; i++ { _, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) require.Error(t, err) - checkShardState(t, e, id[0], errThreshold+i, shard.ModeDegraded) + checkShardState(t, e, id[0], errThreshold, shard.ModeDegraded) checkShardState(t, e, id[1], 0, shard.ModeReadWrite) } require.NoError(t, e.SetShardMode(id[0], shard.ModeReadWrite, false)) - checkShardState(t, e, id[0], errThreshold+1, shard.ModeReadWrite) + checkShardState(t, e, id[0], errThreshold, shard.ModeReadWrite) require.NoError(t, e.SetShardMode(id[0], shard.ModeReadWrite, true)) checkShardState(t, e, id[0], 0, shard.ModeReadWrite) @@ -191,7 +198,7 @@ func TestBlobstorFailback(t *testing.T) { require.ErrorIs(t, err, object.ErrRangeOutOfBounds) } - checkShardState(t, e, id[0], 4, shard.ModeDegraded) + checkShardState(t, e, id[0], 2, shard.ModeDegraded) checkShardState(t, e, id[1], 0, shard.ModeReadWrite) } @@ -201,7 +208,7 @@ func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint e.mtx.RUnlock() require.Equal(t, mode, sh.GetMode()) - require.Equal(t, errCount, sh.errorCount.Load()) + require.Equal(t, errCount, sh.writeErrorCount.Load()+sh.metaErrorCount.Load()) } // corruptSubDir makes random directory except "blobovnicza" in blobstor FSTree unreadable. diff --git a/pkg/local_object_storage/engine/exists.go b/pkg/local_object_storage/engine/exists.go index 3127bf68e..0b975fa72 100644 --- a/pkg/local_object_storage/engine/exists.go +++ b/pkg/local_object_storage/engine/exists.go @@ -21,7 +21,9 @@ func (e *StorageEngine) exists(addr oid.Address) (bool, error) { return true } - e.reportShardError(sh, "could not check existence of object in shard", err) + if res.FromMeta() { + e.reportShardError(sh, sh.metaErrorCount, "could not check existence of object in shard", err) + } } if !exists { diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index 9bf42a3f6..4747c5184 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -107,7 +107,9 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) { return true // stop, return it back default: - e.reportShardError(sh, "could not get object from shard", err) + if sh.GetMode()&shard.ModeDegraded == 0 { + e.reportShardError(sh, sh.metaErrorCount, "could not get object from shard", err) + } return false } } @@ -139,8 +141,9 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) { if obj == nil { return GetRes{}, outError } - e.reportShardError(shardWithMeta, "meta info was present, but object is missing", - metaError, zap.Stringer("address", prm.addr)) + e.log.Warn("meta info was present, but object is missing", + zap.String("err", metaError.Error()), + zap.Stringer("address", prm.addr)) } return GetRes{ diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index d17acc1d3..5da0be28a 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -112,7 +112,9 @@ func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) { return true // stop, return it back default: - e.reportShardError(sh, "could not head object from shard", err) + if res.FromMeta() { + e.reportShardError(sh, sh.metaErrorCount, "could not head object from shard", err) + } return false } } diff --git a/pkg/local_object_storage/engine/info.go b/pkg/local_object_storage/engine/info.go index 396663b4d..4470688ae 100644 --- a/pkg/local_object_storage/engine/info.go +++ b/pkg/local_object_storage/engine/info.go @@ -18,7 +18,7 @@ func (e *StorageEngine) DumpInfo() (i Info) { for _, sh := range e.shards { info := sh.DumpInfo() - info.ErrorCount = sh.errorCount.Load() + info.ErrorCount = sh.metaErrorCount.Load() i.Shards = append(i.Shards, info) } diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 85c33bbaf..1aba0039e 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -108,6 +108,11 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE } }() + if sh.GetMode() != shard.ModeReadWrite { + // Inhume is a modifying operation on metabase, so return here. + return false + } + if checkExists { existPrm.WithAddress(addr) exRes, err := sh.Exists(existPrm) @@ -120,7 +125,9 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE var siErr *objectSDK.SplitInfoError if !errors.As(err, &siErr) { - e.reportShardError(sh, "could not check for presents in shard", err) + if exRes.FromMeta() { + e.reportShardError(sh, sh.metaErrorCount, "could not check for presents in shard", err) + } return } @@ -132,13 +139,12 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE _, err := sh.Inhume(prm) if err != nil { - e.reportShardError(sh, "could not inhume object in shard", err) - if errors.As(err, &errLocked) { status = 1 return true } + e.reportShardError(sh, sh.metaErrorCount, "could not inhume object in shard", err) return false } diff --git a/pkg/local_object_storage/engine/lock.go b/pkg/local_object_storage/engine/lock.go index 6fd2eadde..6bb629551 100644 --- a/pkg/local_object_storage/engine/lock.go +++ b/pkg/local_object_storage/engine/lock.go @@ -72,7 +72,10 @@ func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExi if err != nil { var siErr *objectSDK.SplitInfoError if !errors.As(err, &siErr) { - e.reportShardError(sh, "could not check locked object for presence in shard", err) + // In non-degraded mode the error originated from the metabase. + if exRes.FromMeta() { + e.reportShardError(sh, sh.metaErrorCount, "could not check locked object for presence in shard", err) + } return } @@ -84,7 +87,7 @@ func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExi err := sh.Lock(idCnr, locker, []oid.ID{locked}) if err != nil { - e.reportShardError(sh, "could not lock object in shard", err) + e.reportShardError(sh, sh.metaErrorCount, "could not lock object in shard", err) if errors.As(err, &errIrregular) { status = 1 diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 16d05ee31..1ffec8a4a 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -76,6 +76,9 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { exists, err := sh.Exists(existPrm) if err != nil { + if exists.FromMeta() { + e.reportShardError(sh, sh.metaErrorCount, "could not check object existence", err) + } return // this is not ErrAlreadyRemoved error so we can go to the next shard } @@ -101,12 +104,20 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { var putPrm shard.PutPrm putPrm.WithObject(prm.obj) - _, err = sh.Put(putPrm) + var res shard.PutRes + res, err = sh.Put(putPrm) if err != nil { - e.log.Warn("could not put object in shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) + if res.FromMeta() { + e.reportShardError(sh, sh.metaErrorCount, "could not put object in shard", err) + return + } else if res.FromBlobstor() { + e.reportShardError(sh, sh.writeErrorCount, "could not put object in shard", err) + return + } else { + e.log.Warn("could not put object in shard", + zap.Stringer("shard", sh.ID()), + zap.String("error", err.Error())) + } return } diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index 721ad713c..b64ada1be 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -126,7 +126,9 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) { return true // stop, return it back default: - e.reportShardError(sh, "could not get object from shard", err) + if !res.HasMeta() { + e.reportShardError(sh, sh.metaErrorCount, "could not get object from shard", err) + } return false } } @@ -162,7 +164,8 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) { if obj == nil { return RngRes{}, outError } - e.reportShardError(shardWithMeta, "meta info was present, but object is missing", + e.reportShardError(shardWithMeta, shardWithMeta.metaErrorCount, + "meta info was present, but object is missing", metaError, zap.Stringer("address", prm.addr), ) diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go index 8142e300b..4d5260d95 100644 --- a/pkg/local_object_storage/engine/select.go +++ b/pkg/local_object_storage/engine/select.go @@ -68,7 +68,7 @@ func (e *StorageEngine) _select(prm SelectPrm) (SelectRes, error) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { res, err := sh.Select(shPrm) if err != nil { - e.reportShardError(sh, "could not select objects from shard", err) + e.reportShardError(sh, sh.metaErrorCount, "could not select objects from shard", err) return false } @@ -113,7 +113,7 @@ func (e *StorageEngine) list(limit uint64) (SelectRes, error) { e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { res, err := sh.List() // consider limit result of shard iterator if err != nil { - e.reportShardError(sh, "could not select objects from shard", err) + e.reportShardError(sh, sh.metaErrorCount, "could not select objects from shard", err) } else { for _, addr := range res.AddressList() { // save only unique values if _, ok := uniqueMap[addr.EncodeToString()]; !ok { diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 9f68e06ae..dafca0b14 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -50,8 +50,9 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { } e.shards[strID] = shardWrapper{ - errorCount: atomic.NewUint32(0), - Shard: sh, + metaErrorCount: atomic.NewUint32(0), + writeErrorCount: atomic.NewUint32(0), + Shard: sh, } e.shardPools[strID] = pool @@ -135,7 +136,8 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m shard.Mode, resetErrorCount for shID, sh := range e.shards { if id.String() == shID { if resetErrorCounter { - sh.errorCount.Store(0) + sh.metaErrorCount.Store(0) + sh.writeErrorCount.Store(0) } return sh.SetMode(m) } diff --git a/pkg/local_object_storage/engine/tree.go b/pkg/local_object_storage/engine/tree.go index fbc4e27c6..4af255649 100644 --- a/pkg/local_object_storage/engine/tree.go +++ b/pkg/local_object_storage/engine/tree.go @@ -21,7 +21,7 @@ func (e *StorageEngine) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pil if errors.Is(err, shard.ErrReadOnlyMode) { return nil, err } - e.reportShardError(sh, "can't perform `TreeMove`", err, + e.reportShardError(sh, sh.writeErrorCount, "can't perform `TreeMove`", err, zap.Stringer("cid", d.CID), zap.String("tree", treeID)) continue @@ -41,7 +41,7 @@ func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, a if errors.Is(err, shard.ErrReadOnlyMode) { return nil, err } - e.reportShardError(sh, "can't perform `TreeAddByPath`", err, + e.reportShardError(sh, sh.writeErrorCount, "can't perform `TreeAddByPath`", err, zap.Stringer("cid", d.CID), zap.String("tree", treeID)) continue @@ -60,7 +60,7 @@ func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pi if errors.Is(err, shard.ErrReadOnlyMode) { return err } - e.reportShardError(sh, "can't perform `TreeApply`", err, + e.reportShardError(sh, sh.writeErrorCount, "can't perform `TreeApply`", err, zap.Stringer("cid", d.CID), zap.String("tree", treeID)) continue @@ -79,9 +79,9 @@ func (e *StorageEngine) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, nodes, err = sh.TreeGetByPath(cid, treeID, attr, path, latest) if err != nil { if !errors.Is(err, pilorama.ErrTreeNotFound) { - e.reportShardError(sh, "can't perform `TreeGetByPath`", err, - zap.Stringer("cid", cid), - zap.String("tree", treeID)) + //e.reportShardError(sh, "can't perform `TreeGetByPath`", err, + // zap.Stringer("cid", cid), + // zap.String("tree", treeID)) } continue } @@ -99,7 +99,7 @@ func (e *StorageEngine) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID piloram m, p, err = sh.TreeGetMeta(cid, treeID, nodeID) if err != nil { if !errors.Is(err, pilorama.ErrTreeNotFound) { - e.reportShardError(sh, "can't perform `TreeGetMeta`", err, + e.reportShardError(sh, sh.writeErrorCount, "can't perform `TreeGetMeta`", err, zap.Stringer("cid", cid), zap.String("tree", treeID)) } @@ -118,9 +118,9 @@ func (e *StorageEngine) TreeGetChildren(cid cidSDK.ID, treeID string, nodeID pil nodes, err = sh.TreeGetChildren(cid, treeID, nodeID) if err != nil { if !errors.Is(err, pilorama.ErrTreeNotFound) { - e.reportShardError(sh, "can't perform `TreeGetChildren`", err, - zap.Stringer("cid", cid), - zap.String("tree", treeID)) + //e.reportShardError(sh, "can't perform `TreeGetChildren`", err, + // zap.Stringer("cid", cid), + // zap.String("tree", treeID)) } continue } @@ -137,9 +137,9 @@ func (e *StorageEngine) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64 lm, err = sh.TreeGetOpLog(cid, treeID, height) if err != nil { if !errors.Is(err, pilorama.ErrTreeNotFound) { - e.reportShardError(sh, "can't perform `TreeGetOpLog`", err, - zap.Stringer("cid", cid), - zap.String("tree", treeID)) + //e.reportShardError(sh, "can't perform `TreeGetOpLog`", err, + // zap.Stringer("cid", cid), + // zap.String("tree", treeID)) } continue } diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index f1b44c7e3..4f23e9852 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -30,7 +30,7 @@ func (s *Shard) handleMetabaseFailure(stage string, err error) error { // Open opens all Shard's components. func (s *Shard) Open() error { components := []interface{ Open() error }{ - s.blobStor, s.pilorama, + s.blobStor, s.metaBase, s.pilorama, } if s.hasWriteCache() { @@ -69,6 +69,8 @@ func (s *Shard) Init() error { var components []initializer + metaIndex := -1 + if s.GetMode() != ModeDegraded { var initMetabase initializer @@ -78,6 +80,7 @@ func (s *Shard) Init() error { initMetabase = s.metaBase } + metaIndex = 1 components = []initializer{ s.blobStor, initMetabase, s.pilorama, } @@ -89,9 +92,9 @@ func (s *Shard) Init() error { components = append(components, s.writeCache) } - for _, component := range components { + for i, component := range components { if err := component.Init(); err != nil { - if component == s.metaBase { + if i == metaIndex { err = s.handleMetabaseFailure("init", err) if err != nil { return err diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index 239fe2ebe..2b30eacb7 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -29,7 +29,8 @@ func (p *DeletePrm) WithAddresses(addr ...oid.Address) { // Delete removes data from the shard's writeCache, metaBase and // blobStor. func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) { - if s.GetMode() != ModeReadWrite { + mode := s.GetMode() + if s.GetMode()&ModeReadOnly != 0 { return DeleteRes{}, ErrReadOnlyMode } @@ -61,9 +62,12 @@ func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) { } } - err := meta.Delete(s.metaBase, prm.addr...) - if err != nil { - return DeleteRes{}, err // stop on metabase error ? + var err error + if mode&ModeDegraded == 0 { // Skip metabase errors in degraded mode. + err = meta.Delete(s.metaBase, prm.addr...) + if err != nil { + return DeleteRes{}, err // stop on metabase error ? + } } for i := range prm.addr { // delete small object diff --git a/pkg/local_object_storage/shard/exists.go b/pkg/local_object_storage/shard/exists.go index 6cb481f11..f9a887aef 100644 --- a/pkg/local_object_storage/shard/exists.go +++ b/pkg/local_object_storage/shard/exists.go @@ -14,7 +14,8 @@ type ExistsPrm struct { // ExistsRes groups the resulting values of Exists operation. type ExistsRes struct { - ex bool + ex bool + metaErr bool } // WithAddress is an Exists option to set object checked for existence. @@ -31,6 +32,11 @@ func (p ExistsRes) Exists() bool { return p.ex } +// FromMeta returns true if the error resulted from the metabase. +func (p ExistsRes) FromMeta() bool { + return p.metaErr +} + // Exists checks if object is presented in shard. // // Returns any error encountered that does not allow to @@ -38,26 +44,33 @@ func (p ExistsRes) Exists() bool { // // Returns an error of type apistatus.ObjectAlreadyRemoved if object has been marked as removed. func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) { - exists, err := meta.Exists(s.metaBase, prm.addr) - if err != nil { - // If the shard is in degraded mode, try to consult blobstor directly. - // Otherwise, just return an error. - if s.GetMode() == ModeDegraded { - var p blobstor.ExistsPrm - p.SetAddress(prm.addr) + var exists bool + var err error - res, bErr := s.blobStor.Exists(p) - if bErr == nil { - exists = res.Exists() - s.log.Warn("metabase existence check finished with error", - zap.Stringer("address", prm.addr), - zap.String("error", err.Error())) - err = nil - } + mode := s.GetMode() + if mode&ModeDegraded == 0 { // In Degraded mode skip metabase consulting. + exists, err = meta.Exists(s.metaBase, prm.addr) + } + + metaErr := err != nil + if err != nil && mode&ModeDegraded != 0 { + var p blobstor.ExistsPrm + p.SetAddress(prm.addr) + + res, bErr := s.blobStor.Exists(p) + if bErr == nil { + exists = res.Exists() + s.log.Warn("metabase existence check finished with error", + zap.Stringer("address", prm.addr), + zap.String("error", err.Error())) + err = nil + } else if err == nil { + err = bErr } } return ExistsRes{ - ex: exists, + ex: exists, + metaErr: metaErr, }, err } diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index 6c28c7c02..888f381d6 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -77,7 +77,6 @@ func (s *Shard) Get(prm GetPrm) (GetRes, error) { return res.Object(), nil } - small = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*objectSDK.Object, error) { var getSmallPrm blobstor.GetSmallPrm getSmallPrm.SetAddress(prm.addr) diff --git a/pkg/local_object_storage/shard/head.go b/pkg/local_object_storage/shard/head.go index 416100f38..e3513696b 100644 --- a/pkg/local_object_storage/shard/head.go +++ b/pkg/local_object_storage/shard/head.go @@ -17,7 +17,8 @@ type HeadPrm struct { // HeadRes groups the resulting values of Head operation. type HeadRes struct { - obj *objectSDK.Object + obj *objectSDK.Object + meta bool } // WithAddress is a Head option to set the address of the requested object. @@ -43,6 +44,11 @@ func (r HeadRes) Object() *objectSDK.Object { return r.obj } +// FromMeta returns true if the error is related to the metabase. +func (r HeadRes) FromMeta() bool { + return r.meta +} + // Head reads header of the object from the shard. // // Returns any error encountered. @@ -67,13 +73,25 @@ func (s *Shard) Head(prm HeadPrm) (HeadRes, error) { // otherwise object seems to be flushed to metabase } + if s.GetMode()&ModeDegraded != 0 { // In degraded mode, fallback to blobstor. + var getPrm GetPrm + getPrm.WithIgnoreMeta(true) + getPrm.WithAddress(getPrm.addr) + + res, err := s.Get(getPrm) + if err != nil { + return HeadRes{}, err + } + return HeadRes{obj: res.obj.CutPayload()}, nil + } + var headParams meta.GetPrm headParams.WithAddress(prm.addr) headParams.WithRaw(prm.raw) res, err := s.metaBase.Get(headParams) if err != nil { - return HeadRes{}, err + return HeadRes{meta: true}, err } return HeadRes{ diff --git a/pkg/local_object_storage/shard/put.go b/pkg/local_object_storage/shard/put.go index cdc582d56..6b9619981 100644 --- a/pkg/local_object_storage/shard/put.go +++ b/pkg/local_object_storage/shard/put.go @@ -15,7 +15,10 @@ type PutPrm struct { } // PutRes groups the resulting values of Put operation. -type PutRes struct{} +type PutRes struct { + metaErr bool + blobErr bool +} // WithObject is a Put option to set object to save. func (p *PutPrm) WithObject(obj *object.Object) { @@ -24,6 +27,14 @@ func (p *PutPrm) WithObject(obj *object.Object) { } } +func (r *PutRes) FromMeta() bool { + return r.metaErr +} + +func (r *PutRes) FromBlobstor() bool { + return r.blobErr +} + // Put saves the object in shard. // // Returns any error encountered that @@ -31,7 +42,8 @@ func (p *PutPrm) WithObject(obj *object.Object) { // // Returns ErrReadOnlyMode error if shard is in "read-only" mode. func (s *Shard) Put(prm PutPrm) (PutRes, error) { - if s.GetMode() != ModeReadWrite { + mode := s.GetMode() + if mode&ModeReadOnly != 0 { return PutRes{}, ErrReadOnlyMode } @@ -56,14 +68,16 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) { ) if res, err = s.blobStor.Put(putPrm); err != nil { - return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err) + return PutRes{blobErr: true}, fmt.Errorf("could not put object to BLOB storage: %w", err) } - // put to metabase - if err := meta.Put(s.metaBase, prm.obj, res.BlobovniczaID()); err != nil { - // may we need to handle this case in a special way - // since the object has been successfully written to BlobStor - return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err) + if mode&ModeDegraded == 0 { // In degraded mode, skip metabase. + // put to metabase + if err := meta.Put(s.metaBase, prm.obj, res.BlobovniczaID()); err != nil { + // may we need to handle this case in a special way + // since the object has been successfully written to BlobStor + return PutRes{metaErr: true}, fmt.Errorf("could not put object to metabase: %w", err) + } } return PutRes{}, nil