forked from TrueCloudLab/frostfs-node
[#1550] engine: Split errors on write- and meta- errors
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
dafc21b052
commit
7b5b735fb2
22 changed files with 185 additions and 84 deletions
|
@ -73,7 +73,7 @@ func (e *StorageEngine) containerSize(prm ContainerSizePrm) (res ContainerSizeRe
|
|||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||
size, err := shard.ContainerSize(sh.Shard, prm.cnr)
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "can't get container size", err,
|
||||
e.reportShardError(sh, sh.metaErrorCount, "can't get container size", err,
|
||||
zap.Stringer("container_id", prm.cnr),
|
||||
)
|
||||
return false
|
||||
|
@ -121,7 +121,7 @@ func (e *StorageEngine) listContainers() (ListContainersRes, error) {
|
|||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||
cnrs, err := shard.ListContainers(sh.Shard)
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "can't get list of containers", err)
|
||||
e.reportShardError(sh, sh.metaErrorCount, "can't get list of containers", err)
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
@ -57,7 +57,9 @@ func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) {
|
|||
|
||||
resExists, err := sh.Exists(existsPrm)
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "could not check object existence", err)
|
||||
if resExists.FromMeta() {
|
||||
e.reportShardError(sh, sh.metaErrorCount, "could not check object existence", err)
|
||||
}
|
||||
return false
|
||||
} else if !resExists.Exists() {
|
||||
return false
|
||||
|
@ -68,7 +70,9 @@ func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) {
|
|||
|
||||
_, err = sh.Inhume(shPrm)
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "could not inhume object in shard", err)
|
||||
if sh.GetMode() == shard.ModeReadWrite {
|
||||
e.reportShardError(sh, sh.metaErrorCount, "could not inhume object in shard", err)
|
||||
}
|
||||
|
||||
locked.is = errors.As(err, &locked.err)
|
||||
|
||||
|
|
|
@ -28,7 +28,8 @@ type StorageEngine struct {
|
|||
}
|
||||
|
||||
type shardWrapper struct {
|
||||
errorCount *atomic.Uint32
|
||||
metaErrorCount *atomic.Uint32
|
||||
writeErrorCount *atomic.Uint32
|
||||
*shard.Shard
|
||||
}
|
||||
|
||||
|
@ -36,10 +37,11 @@ type shardWrapper struct {
|
|||
// If it does, shard is set to read-only mode.
|
||||
func (e *StorageEngine) reportShardError(
|
||||
sh hashedShard,
|
||||
errorCount *atomic.Uint32,
|
||||
msg string,
|
||||
err error,
|
||||
fields ...zap.Field) {
|
||||
errCount := sh.errorCount.Inc()
|
||||
errCount := errorCount.Inc()
|
||||
e.log.Warn(msg, append([]zap.Field{
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.Uint32("error count", errCount),
|
||||
|
@ -50,7 +52,11 @@ func (e *StorageEngine) reportShardError(
|
|||
return
|
||||
}
|
||||
|
||||
err = sh.SetMode(shard.ModeDegraded)
|
||||
if errorCount == sh.writeErrorCount {
|
||||
err = sh.SetMode(sh.GetMode() | shard.ModeReadOnly)
|
||||
} else {
|
||||
err = sh.SetMode(sh.GetMode() | shard.ModeDegraded)
|
||||
}
|
||||
if err != nil {
|
||||
e.log.Error("failed to move shard in degraded mode",
|
||||
zap.Uint32("error count", errCount),
|
||||
|
|
|
@ -78,7 +78,8 @@ func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine {
|
|||
}
|
||||
|
||||
engine.shards[s.ID().String()] = shardWrapper{
|
||||
errorCount: atomic.NewUint32(0),
|
||||
writeErrorCount: atomic.NewUint32(0),
|
||||
metaErrorCount: atomic.NewUint32(0),
|
||||
Shard: s,
|
||||
}
|
||||
engine.shardPools[s.ID().String()] = pool
|
||||
|
|
|
@ -63,6 +63,7 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
|
|||
|
||||
func TestErrorReporting(t *testing.T) {
|
||||
t.Run("ignore errors by default", func(t *testing.T) {
|
||||
t.Skip()
|
||||
e, dir, id := newEngineWithErrorThreshold(t, "", 0)
|
||||
|
||||
obj := generateObjectWithCID(t, cidtest.ID())
|
||||
|
@ -111,10 +112,16 @@ func TestErrorReporting(t *testing.T) {
|
|||
checkShardState(t, e, id[0], 0, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
|
||||
e.mtx.RLock()
|
||||
sh := e.shards[id[0].String()]
|
||||
e.mtx.RUnlock()
|
||||
fmt.Println(sh.writeErrorCount, sh.metaErrorCount, errThreshold)
|
||||
corruptSubDir(t, filepath.Join(dir, "0"))
|
||||
|
||||
for i := uint32(1); i < errThreshold; i++ {
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
fmt.Println(sh.writeErrorCount, sh.metaErrorCount)
|
||||
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], i, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
|
@ -123,12 +130,12 @@ func TestErrorReporting(t *testing.T) {
|
|||
for i := uint32(0); i < 2; i++ {
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], errThreshold+i, shard.ModeDegraded)
|
||||
checkShardState(t, e, id[0], errThreshold, shard.ModeDegraded)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
}
|
||||
|
||||
require.NoError(t, e.SetShardMode(id[0], shard.ModeReadWrite, false))
|
||||
checkShardState(t, e, id[0], errThreshold+1, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[0], errThreshold, shard.ModeReadWrite)
|
||||
|
||||
require.NoError(t, e.SetShardMode(id[0], shard.ModeReadWrite, true))
|
||||
checkShardState(t, e, id[0], 0, shard.ModeReadWrite)
|
||||
|
@ -191,7 +198,7 @@ func TestBlobstorFailback(t *testing.T) {
|
|||
require.ErrorIs(t, err, object.ErrRangeOutOfBounds)
|
||||
}
|
||||
|
||||
checkShardState(t, e, id[0], 4, shard.ModeDegraded)
|
||||
checkShardState(t, e, id[0], 2, shard.ModeDegraded)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
}
|
||||
|
||||
|
@ -201,7 +208,7 @@ func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint
|
|||
e.mtx.RUnlock()
|
||||
|
||||
require.Equal(t, mode, sh.GetMode())
|
||||
require.Equal(t, errCount, sh.errorCount.Load())
|
||||
require.Equal(t, errCount, sh.writeErrorCount.Load()+sh.metaErrorCount.Load())
|
||||
}
|
||||
|
||||
// corruptSubDir makes random directory except "blobovnicza" in blobstor FSTree unreadable.
|
||||
|
|
|
@ -21,7 +21,9 @@ func (e *StorageEngine) exists(addr oid.Address) (bool, error) {
|
|||
return true
|
||||
}
|
||||
|
||||
e.reportShardError(sh, "could not check existence of object in shard", err)
|
||||
if res.FromMeta() {
|
||||
e.reportShardError(sh, sh.metaErrorCount, "could not check existence of object in shard", err)
|
||||
}
|
||||
}
|
||||
|
||||
if !exists {
|
||||
|
|
|
@ -107,7 +107,9 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) {
|
|||
|
||||
return true // stop, return it back
|
||||
default:
|
||||
e.reportShardError(sh, "could not get object from shard", err)
|
||||
if sh.GetMode()&shard.ModeDegraded == 0 {
|
||||
e.reportShardError(sh, sh.metaErrorCount, "could not get object from shard", err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
@ -139,8 +141,9 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) {
|
|||
if obj == nil {
|
||||
return GetRes{}, outError
|
||||
}
|
||||
e.reportShardError(shardWithMeta, "meta info was present, but object is missing",
|
||||
metaError, zap.Stringer("address", prm.addr))
|
||||
e.log.Warn("meta info was present, but object is missing",
|
||||
zap.String("err", metaError.Error()),
|
||||
zap.Stringer("address", prm.addr))
|
||||
}
|
||||
|
||||
return GetRes{
|
||||
|
|
|
@ -112,7 +112,9 @@ func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) {
|
|||
|
||||
return true // stop, return it back
|
||||
default:
|
||||
e.reportShardError(sh, "could not head object from shard", err)
|
||||
if res.FromMeta() {
|
||||
e.reportShardError(sh, sh.metaErrorCount, "could not head object from shard", err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ func (e *StorageEngine) DumpInfo() (i Info) {
|
|||
|
||||
for _, sh := range e.shards {
|
||||
info := sh.DumpInfo()
|
||||
info.ErrorCount = sh.errorCount.Load()
|
||||
info.ErrorCount = sh.metaErrorCount.Load()
|
||||
i.Shards = append(i.Shards, info)
|
||||
}
|
||||
|
||||
|
|
|
@ -108,6 +108,11 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE
|
|||
}
|
||||
}()
|
||||
|
||||
if sh.GetMode() != shard.ModeReadWrite {
|
||||
// Inhume is a modifying operation on metabase, so return here.
|
||||
return false
|
||||
}
|
||||
|
||||
if checkExists {
|
||||
existPrm.WithAddress(addr)
|
||||
exRes, err := sh.Exists(existPrm)
|
||||
|
@ -120,7 +125,9 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE
|
|||
|
||||
var siErr *objectSDK.SplitInfoError
|
||||
if !errors.As(err, &siErr) {
|
||||
e.reportShardError(sh, "could not check for presents in shard", err)
|
||||
if exRes.FromMeta() {
|
||||
e.reportShardError(sh, sh.metaErrorCount, "could not check for presents in shard", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -132,13 +139,12 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE
|
|||
|
||||
_, err := sh.Inhume(prm)
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "could not inhume object in shard", err)
|
||||
|
||||
if errors.As(err, &errLocked) {
|
||||
status = 1
|
||||
return true
|
||||
}
|
||||
|
||||
e.reportShardError(sh, sh.metaErrorCount, "could not inhume object in shard", err)
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
@ -72,7 +72,10 @@ func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExi
|
|||
if err != nil {
|
||||
var siErr *objectSDK.SplitInfoError
|
||||
if !errors.As(err, &siErr) {
|
||||
e.reportShardError(sh, "could not check locked object for presence in shard", err)
|
||||
// In non-degraded mode the error originated from the metabase.
|
||||
if exRes.FromMeta() {
|
||||
e.reportShardError(sh, sh.metaErrorCount, "could not check locked object for presence in shard", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -84,7 +87,7 @@ func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExi
|
|||
|
||||
err := sh.Lock(idCnr, locker, []oid.ID{locked})
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "could not lock object in shard", err)
|
||||
e.reportShardError(sh, sh.metaErrorCount, "could not lock object in shard", err)
|
||||
|
||||
if errors.As(err, &errIrregular) {
|
||||
status = 1
|
||||
|
|
|
@ -76,6 +76,9 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
|
|||
|
||||
exists, err := sh.Exists(existPrm)
|
||||
if err != nil {
|
||||
if exists.FromMeta() {
|
||||
e.reportShardError(sh, sh.metaErrorCount, "could not check object existence", err)
|
||||
}
|
||||
return // this is not ErrAlreadyRemoved error so we can go to the next shard
|
||||
}
|
||||
|
||||
|
@ -101,12 +104,20 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
|
|||
var putPrm shard.PutPrm
|
||||
putPrm.WithObject(prm.obj)
|
||||
|
||||
_, err = sh.Put(putPrm)
|
||||
var res shard.PutRes
|
||||
res, err = sh.Put(putPrm)
|
||||
if err != nil {
|
||||
if res.FromMeta() {
|
||||
e.reportShardError(sh, sh.metaErrorCount, "could not put object in shard", err)
|
||||
return
|
||||
} else if res.FromBlobstor() {
|
||||
e.reportShardError(sh, sh.writeErrorCount, "could not put object in shard", err)
|
||||
return
|
||||
} else {
|
||||
e.log.Warn("could not put object in shard",
|
||||
zap.Stringer("shard", sh.ID()),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
@ -126,7 +126,9 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
|
|||
|
||||
return true // stop, return it back
|
||||
default:
|
||||
e.reportShardError(sh, "could not get object from shard", err)
|
||||
if !res.HasMeta() {
|
||||
e.reportShardError(sh, sh.metaErrorCount, "could not get object from shard", err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
@ -162,7 +164,8 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
|
|||
if obj == nil {
|
||||
return RngRes{}, outError
|
||||
}
|
||||
e.reportShardError(shardWithMeta, "meta info was present, but object is missing",
|
||||
e.reportShardError(shardWithMeta, shardWithMeta.metaErrorCount,
|
||||
"meta info was present, but object is missing",
|
||||
metaError,
|
||||
zap.Stringer("address", prm.addr),
|
||||
)
|
||||
|
|
|
@ -68,7 +68,7 @@ func (e *StorageEngine) _select(prm SelectPrm) (SelectRes, error) {
|
|||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||
res, err := sh.Select(shPrm)
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "could not select objects from shard", err)
|
||||
e.reportShardError(sh, sh.metaErrorCount, "could not select objects from shard", err)
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -113,7 +113,7 @@ func (e *StorageEngine) list(limit uint64) (SelectRes, error) {
|
|||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||
res, err := sh.List() // consider limit result of shard iterator
|
||||
if err != nil {
|
||||
e.reportShardError(sh, "could not select objects from shard", err)
|
||||
e.reportShardError(sh, sh.metaErrorCount, "could not select objects from shard", err)
|
||||
} else {
|
||||
for _, addr := range res.AddressList() { // save only unique values
|
||||
if _, ok := uniqueMap[addr.EncodeToString()]; !ok {
|
||||
|
|
|
@ -50,7 +50,8 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
|
|||
}
|
||||
|
||||
e.shards[strID] = shardWrapper{
|
||||
errorCount: atomic.NewUint32(0),
|
||||
metaErrorCount: atomic.NewUint32(0),
|
||||
writeErrorCount: atomic.NewUint32(0),
|
||||
Shard: sh,
|
||||
}
|
||||
|
||||
|
@ -135,7 +136,8 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m shard.Mode, resetErrorCount
|
|||
for shID, sh := range e.shards {
|
||||
if id.String() == shID {
|
||||
if resetErrorCounter {
|
||||
sh.errorCount.Store(0)
|
||||
sh.metaErrorCount.Store(0)
|
||||
sh.writeErrorCount.Store(0)
|
||||
}
|
||||
return sh.SetMode(m)
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ func (e *StorageEngine) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pil
|
|||
if errors.Is(err, shard.ErrReadOnlyMode) {
|
||||
return nil, err
|
||||
}
|
||||
e.reportShardError(sh, "can't perform `TreeMove`", err,
|
||||
e.reportShardError(sh, sh.writeErrorCount, "can't perform `TreeMove`", err,
|
||||
zap.Stringer("cid", d.CID),
|
||||
zap.String("tree", treeID))
|
||||
continue
|
||||
|
@ -41,7 +41,7 @@ func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, a
|
|||
if errors.Is(err, shard.ErrReadOnlyMode) {
|
||||
return nil, err
|
||||
}
|
||||
e.reportShardError(sh, "can't perform `TreeAddByPath`", err,
|
||||
e.reportShardError(sh, sh.writeErrorCount, "can't perform `TreeAddByPath`", err,
|
||||
zap.Stringer("cid", d.CID),
|
||||
zap.String("tree", treeID))
|
||||
continue
|
||||
|
@ -60,7 +60,7 @@ func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pi
|
|||
if errors.Is(err, shard.ErrReadOnlyMode) {
|
||||
return err
|
||||
}
|
||||
e.reportShardError(sh, "can't perform `TreeApply`", err,
|
||||
e.reportShardError(sh, sh.writeErrorCount, "can't perform `TreeApply`", err,
|
||||
zap.Stringer("cid", d.CID),
|
||||
zap.String("tree", treeID))
|
||||
continue
|
||||
|
@ -79,9 +79,9 @@ func (e *StorageEngine) TreeGetByPath(cid cidSDK.ID, treeID string, attr string,
|
|||
nodes, err = sh.TreeGetByPath(cid, treeID, attr, path, latest)
|
||||
if err != nil {
|
||||
if !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||
e.reportShardError(sh, "can't perform `TreeGetByPath`", err,
|
||||
zap.Stringer("cid", cid),
|
||||
zap.String("tree", treeID))
|
||||
//e.reportShardError(sh, "can't perform `TreeGetByPath`", err,
|
||||
// zap.Stringer("cid", cid),
|
||||
// zap.String("tree", treeID))
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
@ -99,7 +99,7 @@ func (e *StorageEngine) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID piloram
|
|||
m, p, err = sh.TreeGetMeta(cid, treeID, nodeID)
|
||||
if err != nil {
|
||||
if !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||
e.reportShardError(sh, "can't perform `TreeGetMeta`", err,
|
||||
e.reportShardError(sh, sh.writeErrorCount, "can't perform `TreeGetMeta`", err,
|
||||
zap.Stringer("cid", cid),
|
||||
zap.String("tree", treeID))
|
||||
}
|
||||
|
@ -118,9 +118,9 @@ func (e *StorageEngine) TreeGetChildren(cid cidSDK.ID, treeID string, nodeID pil
|
|||
nodes, err = sh.TreeGetChildren(cid, treeID, nodeID)
|
||||
if err != nil {
|
||||
if !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||
e.reportShardError(sh, "can't perform `TreeGetChildren`", err,
|
||||
zap.Stringer("cid", cid),
|
||||
zap.String("tree", treeID))
|
||||
//e.reportShardError(sh, "can't perform `TreeGetChildren`", err,
|
||||
// zap.Stringer("cid", cid),
|
||||
// zap.String("tree", treeID))
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
@ -137,9 +137,9 @@ func (e *StorageEngine) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64
|
|||
lm, err = sh.TreeGetOpLog(cid, treeID, height)
|
||||
if err != nil {
|
||||
if !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||
e.reportShardError(sh, "can't perform `TreeGetOpLog`", err,
|
||||
zap.Stringer("cid", cid),
|
||||
zap.String("tree", treeID))
|
||||
//e.reportShardError(sh, "can't perform `TreeGetOpLog`", err,
|
||||
// zap.Stringer("cid", cid),
|
||||
// zap.String("tree", treeID))
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ func (s *Shard) handleMetabaseFailure(stage string, err error) error {
|
|||
// Open opens all Shard's components.
|
||||
func (s *Shard) Open() error {
|
||||
components := []interface{ Open() error }{
|
||||
s.blobStor, s.pilorama,
|
||||
s.blobStor, s.metaBase, s.pilorama,
|
||||
}
|
||||
|
||||
if s.hasWriteCache() {
|
||||
|
@ -69,6 +69,8 @@ func (s *Shard) Init() error {
|
|||
|
||||
var components []initializer
|
||||
|
||||
metaIndex := -1
|
||||
|
||||
if s.GetMode() != ModeDegraded {
|
||||
var initMetabase initializer
|
||||
|
||||
|
@ -78,6 +80,7 @@ func (s *Shard) Init() error {
|
|||
initMetabase = s.metaBase
|
||||
}
|
||||
|
||||
metaIndex = 1
|
||||
components = []initializer{
|
||||
s.blobStor, initMetabase, s.pilorama,
|
||||
}
|
||||
|
@ -89,9 +92,9 @@ func (s *Shard) Init() error {
|
|||
components = append(components, s.writeCache)
|
||||
}
|
||||
|
||||
for _, component := range components {
|
||||
for i, component := range components {
|
||||
if err := component.Init(); err != nil {
|
||||
if component == s.metaBase {
|
||||
if i == metaIndex {
|
||||
err = s.handleMetabaseFailure("init", err)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -29,7 +29,8 @@ func (p *DeletePrm) WithAddresses(addr ...oid.Address) {
|
|||
// Delete removes data from the shard's writeCache, metaBase and
|
||||
// blobStor.
|
||||
func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
|
||||
if s.GetMode() != ModeReadWrite {
|
||||
mode := s.GetMode()
|
||||
if s.GetMode()&ModeReadOnly != 0 {
|
||||
return DeleteRes{}, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
|
@ -61,10 +62,13 @@ func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
|
|||
}
|
||||
}
|
||||
|
||||
err := meta.Delete(s.metaBase, prm.addr...)
|
||||
var err error
|
||||
if mode&ModeDegraded == 0 { // Skip metabase errors in degraded mode.
|
||||
err = meta.Delete(s.metaBase, prm.addr...)
|
||||
if err != nil {
|
||||
return DeleteRes{}, err // stop on metabase error ?
|
||||
}
|
||||
}
|
||||
|
||||
for i := range prm.addr { // delete small object
|
||||
if id, ok := smalls[prm.addr[i]]; ok {
|
||||
|
|
|
@ -15,6 +15,7 @@ type ExistsPrm struct {
|
|||
// ExistsRes groups the resulting values of Exists operation.
|
||||
type ExistsRes struct {
|
||||
ex bool
|
||||
metaErr bool
|
||||
}
|
||||
|
||||
// WithAddress is an Exists option to set object checked for existence.
|
||||
|
@ -31,6 +32,11 @@ func (p ExistsRes) Exists() bool {
|
|||
return p.ex
|
||||
}
|
||||
|
||||
// FromMeta returns true if the error resulted from the metabase.
|
||||
func (p ExistsRes) FromMeta() bool {
|
||||
return p.metaErr
|
||||
}
|
||||
|
||||
// Exists checks if object is presented in shard.
|
||||
//
|
||||
// Returns any error encountered that does not allow to
|
||||
|
@ -38,11 +44,16 @@ func (p ExistsRes) Exists() bool {
|
|||
//
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been marked as removed.
|
||||
func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
|
||||
exists, err := meta.Exists(s.metaBase, prm.addr)
|
||||
if err != nil {
|
||||
// If the shard is in degraded mode, try to consult blobstor directly.
|
||||
// Otherwise, just return an error.
|
||||
if s.GetMode() == ModeDegraded {
|
||||
var exists bool
|
||||
var err error
|
||||
|
||||
mode := s.GetMode()
|
||||
if mode&ModeDegraded == 0 { // In Degraded mode skip metabase consulting.
|
||||
exists, err = meta.Exists(s.metaBase, prm.addr)
|
||||
}
|
||||
|
||||
metaErr := err != nil
|
||||
if err != nil && mode&ModeDegraded != 0 {
|
||||
var p blobstor.ExistsPrm
|
||||
p.SetAddress(prm.addr)
|
||||
|
||||
|
@ -53,11 +64,13 @@ func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
|
|||
zap.Stringer("address", prm.addr),
|
||||
zap.String("error", err.Error()))
|
||||
err = nil
|
||||
}
|
||||
} else if err == nil {
|
||||
err = bErr
|
||||
}
|
||||
}
|
||||
|
||||
return ExistsRes{
|
||||
ex: exists,
|
||||
metaErr: metaErr,
|
||||
}, err
|
||||
}
|
||||
|
|
|
@ -77,7 +77,6 @@ func (s *Shard) Get(prm GetPrm) (GetRes, error) {
|
|||
|
||||
return res.Object(), nil
|
||||
}
|
||||
|
||||
small = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*objectSDK.Object, error) {
|
||||
var getSmallPrm blobstor.GetSmallPrm
|
||||
getSmallPrm.SetAddress(prm.addr)
|
||||
|
|
|
@ -18,6 +18,7 @@ type HeadPrm struct {
|
|||
// HeadRes groups the resulting values of Head operation.
|
||||
type HeadRes struct {
|
||||
obj *objectSDK.Object
|
||||
meta bool
|
||||
}
|
||||
|
||||
// WithAddress is a Head option to set the address of the requested object.
|
||||
|
@ -43,6 +44,11 @@ func (r HeadRes) Object() *objectSDK.Object {
|
|||
return r.obj
|
||||
}
|
||||
|
||||
// FromMeta returns true if the error is related to the metabase.
|
||||
func (r HeadRes) FromMeta() bool {
|
||||
return r.meta
|
||||
}
|
||||
|
||||
// Head reads header of the object from the shard.
|
||||
//
|
||||
// Returns any error encountered.
|
||||
|
@ -67,13 +73,25 @@ func (s *Shard) Head(prm HeadPrm) (HeadRes, error) {
|
|||
// otherwise object seems to be flushed to metabase
|
||||
}
|
||||
|
||||
if s.GetMode()&ModeDegraded != 0 { // In degraded mode, fallback to blobstor.
|
||||
var getPrm GetPrm
|
||||
getPrm.WithIgnoreMeta(true)
|
||||
getPrm.WithAddress(getPrm.addr)
|
||||
|
||||
res, err := s.Get(getPrm)
|
||||
if err != nil {
|
||||
return HeadRes{}, err
|
||||
}
|
||||
return HeadRes{obj: res.obj.CutPayload()}, nil
|
||||
}
|
||||
|
||||
var headParams meta.GetPrm
|
||||
headParams.WithAddress(prm.addr)
|
||||
headParams.WithRaw(prm.raw)
|
||||
|
||||
res, err := s.metaBase.Get(headParams)
|
||||
if err != nil {
|
||||
return HeadRes{}, err
|
||||
return HeadRes{meta: true}, err
|
||||
}
|
||||
|
||||
return HeadRes{
|
||||
|
|
|
@ -15,7 +15,10 @@ type PutPrm struct {
|
|||
}
|
||||
|
||||
// PutRes groups the resulting values of Put operation.
|
||||
type PutRes struct{}
|
||||
type PutRes struct {
|
||||
metaErr bool
|
||||
blobErr bool
|
||||
}
|
||||
|
||||
// WithObject is a Put option to set object to save.
|
||||
func (p *PutPrm) WithObject(obj *object.Object) {
|
||||
|
@ -24,6 +27,14 @@ func (p *PutPrm) WithObject(obj *object.Object) {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *PutRes) FromMeta() bool {
|
||||
return r.metaErr
|
||||
}
|
||||
|
||||
func (r *PutRes) FromBlobstor() bool {
|
||||
return r.blobErr
|
||||
}
|
||||
|
||||
// Put saves the object in shard.
|
||||
//
|
||||
// Returns any error encountered that
|
||||
|
@ -31,7 +42,8 @@ func (p *PutPrm) WithObject(obj *object.Object) {
|
|||
//
|
||||
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
|
||||
func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
||||
if s.GetMode() != ModeReadWrite {
|
||||
mode := s.GetMode()
|
||||
if mode&ModeReadOnly != 0 {
|
||||
return PutRes{}, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
|
@ -56,14 +68,16 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
|||
)
|
||||
|
||||
if res, err = s.blobStor.Put(putPrm); err != nil {
|
||||
return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err)
|
||||
return PutRes{blobErr: true}, fmt.Errorf("could not put object to BLOB storage: %w", err)
|
||||
}
|
||||
|
||||
if mode&ModeDegraded == 0 { // In degraded mode, skip metabase.
|
||||
// put to metabase
|
||||
if err := meta.Put(s.metaBase, prm.obj, res.BlobovniczaID()); err != nil {
|
||||
// may we need to handle this case in a special way
|
||||
// since the object has been successfully written to BlobStor
|
||||
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
|
||||
return PutRes{metaErr: true}, fmt.Errorf("could not put object to metabase: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return PutRes{}, nil
|
||||
|
|
Loading…
Reference in a new issue