forked from TrueCloudLab/frostfs-node
[#1819] engine: Increase error counter for PUT errors
Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
parent
b89e71fa78
commit
2e3ef817f4
3 changed files with 18 additions and 7 deletions
|
@ -11,6 +11,10 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// ErrNoPlaceFound is returned when object can't be saved to any sub-storage component
|
||||
// because of the policy.
|
||||
var ErrNoPlaceFound = errors.New("couldn't find a place to store an object")
|
||||
|
||||
// Put saves the object in BLOB storage.
|
||||
//
|
||||
// If object is "big", BlobStor saves the object in shallow dir.
|
||||
|
@ -46,7 +50,7 @@ func (b *BlobStor) Put(prm common.PutPrm) (common.PutRes, error) {
|
|||
}
|
||||
}
|
||||
|
||||
return common.PutRes{}, errors.New("couldn't find a place to store an object")
|
||||
return common.PutRes{}, ErrNoPlaceFound
|
||||
}
|
||||
|
||||
// NeedsCompression returns true if the object should be compressed.
|
||||
|
|
|
@ -133,7 +133,7 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error)
|
|||
if shards[j].ID().String() == sid {
|
||||
continue
|
||||
}
|
||||
putDone, exists := e.putToShard(shards[j].Shard, j, shards[j].pool, lst[i], getRes.Object())
|
||||
putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, lst[i], getRes.Object())
|
||||
if putDone || exists {
|
||||
if putDone {
|
||||
e.log.Debug("object is moved to another shard",
|
||||
|
|
|
@ -4,6 +4,8 @@ import (
|
|||
"errors"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
|
@ -66,7 +68,7 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
|
|||
pool := e.shardPools[sh.ID().String()]
|
||||
e.mtx.RUnlock()
|
||||
|
||||
putDone, exists := e.putToShard(sh.Shard, ind, pool, addr, prm.obj)
|
||||
putDone, exists := e.putToShard(sh, ind, pool, addr, prm.obj)
|
||||
finished = putDone || exists
|
||||
return finished
|
||||
})
|
||||
|
@ -81,7 +83,7 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
|
|||
// putToShard puts object to sh.
|
||||
// First return value is true iff put has been successfully done.
|
||||
// Second return value is true iff object already exists.
|
||||
func (e *StorageEngine) putToShard(sh *shard.Shard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) {
|
||||
func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) {
|
||||
var putSuccess, alreadyExists bool
|
||||
|
||||
exitCh := make(chan struct{})
|
||||
|
@ -126,10 +128,15 @@ func (e *StorageEngine) putToShard(sh *shard.Shard, ind int, pool util.WorkerPoo
|
|||
|
||||
_, err = sh.Put(putPrm)
|
||||
if err != nil {
|
||||
e.log.Warn("could not put object in shard",
|
||||
zap.Stringer("shard", sh.ID()),
|
||||
zap.String("error", err.Error()))
|
||||
if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, blobstor.ErrNoPlaceFound) ||
|
||||
errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) {
|
||||
e.log.Warn("could not put object to shard",
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.String("error", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
e.reportShardError(sh, "could not put object to shard", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue