forked from TrueCloudLab/frostfs-node
[#1118] engine: allow to set error threshold
There are certain errors which are not expected during usual node operation and which tell us that something is wrong with the shard. To prevent possible data corruption, move shard in read-only mode after amount of errors exceeded some threshold. By default no actions are performed. Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
4f3323f084
commit
6ad2624552
13 changed files with 243 additions and 113 deletions
|
@ -72,14 +72,11 @@ func (e *StorageEngine) containerSize(prm ContainerSizePrm) (*ContainerSizeRes,
|
|||
|
||||
var res ContainerSizeRes
|
||||
|
||||
e.iterateOverUnsortedShards(func(s *shard.Shard) (stop bool) {
|
||||
size, err := shard.ContainerSize(s, prm.cid)
|
||||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||
size, err := shard.ContainerSize(sh.Shard, prm.cid)
|
||||
if err != nil {
|
||||
e.log.Warn("can't get container size",
|
||||
zap.Stringer("shard_id", s.ID()),
|
||||
zap.Stringer("container_id", prm.cid),
|
||||
zap.String("error", err.Error()))
|
||||
|
||||
e.reportShardError(sh, "can't get container size", err,
|
||||
zap.Stringer("container_id", prm.cid))
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -122,13 +119,10 @@ func (e *StorageEngine) listContainers() (*ListContainersRes, error) {
|
|||
|
||||
uniqueIDs := make(map[string]*cid.ID)
|
||||
|
||||
e.iterateOverUnsortedShards(func(s *shard.Shard) (stop bool) {
|
||||
cnrs, err := shard.ListContainers(s)
|
||||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||
cnrs, err := shard.ListContainers(sh.Shard)
|
||||
if err != nil {
|
||||
e.log.Warn("can't get list of containers",
|
||||
zap.Stringer("shard_id", s.ID()),
|
||||
zap.String("error", err.Error()))
|
||||
|
||||
e.reportShardError(sh, "can't get list of containers", err)
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
@ -3,7 +3,6 @@ package engine
|
|||
import (
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// DeletePrm groups the parameters of Delete operation.
|
||||
|
@ -46,15 +45,10 @@ func (e *StorageEngine) delete(prm *DeletePrm) (*DeleteRes, error) {
|
|||
existsPrm := new(shard.ExistsPrm)
|
||||
|
||||
for i := range prm.addr {
|
||||
e.iterateOverSortedShards(prm.addr[i], func(_ int, sh *shard.Shard) (stop bool) {
|
||||
e.iterateOverSortedShards(prm.addr[i], func(_ int, sh hashedShard) (stop bool) {
|
||||
resExists, err := sh.Exists(existsPrm.WithAddress(prm.addr[i]))
|
||||
if err != nil {
|
||||
// TODO: smth wrong with shard, need to be processed
|
||||
e.log.Warn("could not check object existence",
|
||||
zap.Stringer("shard", sh.ID()),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
e.reportShardError(sh, "could not check object existence", err)
|
||||
return false
|
||||
} else if !resExists.Exists() {
|
||||
return false
|
||||
|
@ -62,11 +56,7 @@ func (e *StorageEngine) delete(prm *DeletePrm) (*DeleteRes, error) {
|
|||
|
||||
_, err = sh.Inhume(shPrm.MarkAsGarbage(prm.addr[i]))
|
||||
if err != nil {
|
||||
// TODO: smth wrong with shard, need to be processed
|
||||
e.log.Warn("could not inhume object in shard",
|
||||
zap.Stringer("shard", sh.ID()),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
e.reportShardError(sh, "could not inhume object in shard", err)
|
||||
}
|
||||
|
||||
return err == nil
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -15,7 +16,7 @@ type StorageEngine struct {
|
|||
|
||||
mtx *sync.RWMutex
|
||||
|
||||
shards map[string]*shard.Shard
|
||||
shards map[string]shardWrapper
|
||||
|
||||
shardPools map[string]util.WorkerPool
|
||||
|
||||
|
@ -26,12 +27,49 @@ type StorageEngine struct {
|
|||
}
|
||||
}
|
||||
|
||||
type shardWrapper struct {
|
||||
errorCount *atomic.Uint32
|
||||
*shard.Shard
|
||||
}
|
||||
|
||||
// reportShardError checks that amount of errors doesn't exceed configured threshold.
|
||||
// If it does, shard is set to read-only mode.
|
||||
func (e *StorageEngine) reportShardError(
|
||||
sh hashedShard,
|
||||
msg string,
|
||||
err error,
|
||||
fields ...zap.Field) {
|
||||
errCount := sh.errorCount.Inc()
|
||||
e.log.Warn(msg, append([]zap.Field{
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.Uint32("error count", errCount),
|
||||
zap.String("error", err.Error()),
|
||||
}, fields...)...)
|
||||
|
||||
if e.errorsThreshold == 0 || errCount < e.errorsThreshold {
|
||||
return
|
||||
}
|
||||
|
||||
err = sh.SetMode(shard.ModeReadOnly)
|
||||
if err != nil {
|
||||
e.log.Error("failed to move shard in read-only mode",
|
||||
zap.Uint32("error count", errCount),
|
||||
zap.Error(err))
|
||||
} else {
|
||||
e.log.Info("shard is moved in read-only due to error threshold",
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.Uint32("error count", errCount))
|
||||
}
|
||||
}
|
||||
|
||||
// Option represents StorageEngine's constructor option.
|
||||
type Option func(*cfg)
|
||||
|
||||
type cfg struct {
|
||||
log *logger.Logger
|
||||
|
||||
errorsThreshold uint32
|
||||
|
||||
metrics MetricRegister
|
||||
|
||||
shardPoolSize uint32
|
||||
|
@ -56,7 +94,7 @@ func New(opts ...Option) *StorageEngine {
|
|||
return &StorageEngine{
|
||||
cfg: c,
|
||||
mtx: new(sync.RWMutex),
|
||||
shards: make(map[string]*shard.Shard),
|
||||
shards: make(map[string]shardWrapper),
|
||||
shardPools: make(map[string]util.WorkerPool),
|
||||
}
|
||||
}
|
||||
|
@ -80,3 +118,11 @@ func WithShardPoolSize(sz uint32) Option {
|
|||
c.shardPoolSize = sz
|
||||
}
|
||||
}
|
||||
|
||||
// WithErrorThreshold returns an option to specify size amount of errors after which
|
||||
// shard is moved to read-only mode.
|
||||
func WithErrorThreshold(sz uint32) Option {
|
||||
return func(c *cfg) {
|
||||
c.errorsThreshold = sz
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/nspcc-dev/tzhash/tz"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -32,7 +33,7 @@ func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine {
|
|||
log: zap.L(),
|
||||
},
|
||||
mtx: new(sync.RWMutex),
|
||||
shards: make(map[string]*shard.Shard, len(shards)),
|
||||
shards: make(map[string]shardWrapper, len(shards)),
|
||||
shardPools: make(map[string]util.WorkerPool, len(shards)),
|
||||
}
|
||||
|
||||
|
@ -42,7 +43,10 @@ func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
engine.shards[s.ID().String()] = s
|
||||
engine.shards[s.ID().String()] = shardWrapper{
|
||||
errorCount: atomic.NewUint32(0),
|
||||
Shard: s,
|
||||
}
|
||||
engine.shardPools[s.ID().String()] = pool
|
||||
}
|
||||
|
||||
|
|
139
pkg/local_object_storage/engine/error_test.go
Normal file
139
pkg/local_object_storage/engine/error_test.go
Normal file
|
@ -0,0 +1,139 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func TestErrorReporting(t *testing.T) {
|
||||
const smallSize = 100
|
||||
|
||||
log := zaptest.NewLogger(t)
|
||||
newEngine := func(t *testing.T, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
|
||||
dir, err := os.MkdirTemp("", "*")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = os.RemoveAll(dir) })
|
||||
|
||||
e := New(
|
||||
WithLogger(log),
|
||||
WithShardPoolSize(1),
|
||||
WithErrorThreshold(errThreshold))
|
||||
|
||||
var ids [2]*shard.ID
|
||||
|
||||
for i := range ids {
|
||||
ids[i], err = e.AddShard(
|
||||
shard.WithLogger(log),
|
||||
shard.WithBlobStorOptions(
|
||||
blobstor.WithRootPath(filepath.Join(dir, strconv.Itoa(i))),
|
||||
blobstor.WithShallowDepth(1),
|
||||
blobstor.WithBlobovniczaShallowWidth(1),
|
||||
blobstor.WithBlobovniczaShallowDepth(1),
|
||||
blobstor.WithSmallSizeLimit(100),
|
||||
blobstor.WithRootPerm(0700)),
|
||||
shard.WithMetaBaseOptions(
|
||||
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
|
||||
meta.WithPermissions(0700)))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Init())
|
||||
|
||||
return e, dir, ids
|
||||
}
|
||||
|
||||
t.Run("ignore errors by default", func(t *testing.T) {
|
||||
e, dir, id := newEngine(t, 0)
|
||||
|
||||
obj := generateRawObjectWithCID(t, cidtest.ID())
|
||||
obj.SetPayload(make([]byte, smallSize))
|
||||
|
||||
prm := new(shard.PutPrm).WithObject(obj.Object())
|
||||
e.mtx.RLock()
|
||||
_, err := e.shards[id[0].String()].Shard.Put(prm)
|
||||
e.mtx.RUnlock()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = e.Get(&GetPrm{addr: obj.Object().Address()})
|
||||
require.NoError(t, err)
|
||||
|
||||
checkShardState(t, e, id[0], 0, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
|
||||
corruptSubDir(t, filepath.Join(dir, "0"))
|
||||
|
||||
for i := uint32(1); i < 3; i++ {
|
||||
_, err = e.Get(&GetPrm{addr: obj.Object().Address()})
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], i, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
}
|
||||
})
|
||||
t.Run("with error threshold", func(t *testing.T) {
|
||||
const errThreshold = 3
|
||||
|
||||
e, dir, id := newEngine(t, errThreshold)
|
||||
|
||||
obj := generateRawObjectWithCID(t, cidtest.ID())
|
||||
obj.SetPayload(make([]byte, smallSize))
|
||||
|
||||
prm := new(shard.PutPrm).WithObject(obj.Object())
|
||||
e.mtx.RLock()
|
||||
_, err := e.shards[id[0].String()].Put(prm)
|
||||
e.mtx.RUnlock()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = e.Get(&GetPrm{addr: obj.Object().Address()})
|
||||
require.NoError(t, err)
|
||||
|
||||
checkShardState(t, e, id[0], 0, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
|
||||
corruptSubDir(t, filepath.Join(dir, "0"))
|
||||
|
||||
for i := uint32(1); i < errThreshold; i++ {
|
||||
_, err = e.Get(&GetPrm{addr: obj.Object().Address()})
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], i, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
}
|
||||
|
||||
for i := uint32(0); i < 2; i++ {
|
||||
_, err = e.Get(&GetPrm{addr: obj.Object().Address()})
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], errThreshold+i, shard.ModeReadOnly)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode shard.Mode) {
|
||||
e.mtx.RLock()
|
||||
sh := e.shards[id.String()]
|
||||
e.mtx.RUnlock()
|
||||
|
||||
require.Equal(t, mode, sh.GetMode())
|
||||
require.Equal(t, errCount, sh.errorCount.Load())
|
||||
}
|
||||
|
||||
// corruptSubDir makes random directory in blobstor FSTree unreadable.
|
||||
func corruptSubDir(t *testing.T, dir string) {
|
||||
de, err := os.ReadDir(dir)
|
||||
require.NoError(t, err)
|
||||
for i := range de {
|
||||
if de[i].IsDir() {
|
||||
require.NoError(t, os.Chmod(filepath.Join(dir, de[i].Name()), 0))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
|
@ -6,7 +6,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (e *StorageEngine) exists(addr *objectSDK.Address) (bool, error) {
|
||||
|
@ -14,7 +13,7 @@ func (e *StorageEngine) exists(addr *objectSDK.Address) (bool, error) {
|
|||
alreadyRemoved := false
|
||||
exists := false
|
||||
|
||||
e.iterateOverSortedShards(addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
|
||||
res, err := sh.Exists(shPrm)
|
||||
if err != nil {
|
||||
if errors.Is(err, object.ErrAlreadyRemoved) {
|
||||
|
@ -23,11 +22,7 @@ func (e *StorageEngine) exists(addr *objectSDK.Address) (bool, error) {
|
|||
return true
|
||||
}
|
||||
|
||||
// TODO: smth wrong with shard, need to be processed
|
||||
e.log.Warn("could not check existence of object in shard",
|
||||
zap.Stringer("shard", sh.ID()),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
e.reportShardError(sh, "could not check existence of object in shard", err)
|
||||
}
|
||||
|
||||
if res != nil && !exists {
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// GetPrm groups the parameters of Get operation.
|
||||
|
@ -69,7 +68,7 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
|
|||
shPrm := new(shard.GetPrm).
|
||||
WithAddress(prm.addr)
|
||||
|
||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
||||
res, err := sh.Get(shPrm)
|
||||
if err != nil {
|
||||
switch {
|
||||
|
@ -95,13 +94,7 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
|
|||
|
||||
return true // stop, return it back
|
||||
default:
|
||||
// TODO: smth wrong with shard, need to be processed, but
|
||||
// still go to next shard
|
||||
e.log.Warn("could not get object from shard",
|
||||
zap.Stringer("shard", sh.ID()),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
e.reportShardError(sh, "could not get object from shard", err)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// HeadPrm groups the parameters of Head operation.
|
||||
|
@ -85,7 +84,7 @@ func (e *StorageEngine) head(prm *HeadPrm) (*HeadRes, error) {
|
|||
WithAddress(prm.addr).
|
||||
WithRaw(prm.raw)
|
||||
|
||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
||||
res, err := sh.Head(shPrm)
|
||||
if err != nil {
|
||||
switch {
|
||||
|
@ -111,13 +110,7 @@ func (e *StorageEngine) head(prm *HeadPrm) (*HeadRes, error) {
|
|||
|
||||
return true // stop, return it back
|
||||
default:
|
||||
// TODO: smth wrong with shard, need to be processed, but
|
||||
// still go to next shard
|
||||
e.log.Warn("could not head object from shard",
|
||||
zap.Stringer("shard", sh.ID()),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
e.reportShardError(sh, "could not head object from shard", err)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// InhumePrm encapsulates parameters for inhume operation.
|
||||
|
@ -89,7 +88,7 @@ func (e *StorageEngine) inhume(prm *InhumePrm) (*InhumeRes, error) {
|
|||
func (e *StorageEngine) inhumeAddr(addr *objectSDK.Address, prm *shard.InhumePrm, checkExists bool) (ok bool) {
|
||||
root := false
|
||||
|
||||
e.iterateOverSortedShards(addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
|
||||
defer func() {
|
||||
// if object is root we continue since information about it
|
||||
// can be presented in other shards
|
||||
|
@ -111,12 +110,7 @@ func (e *StorageEngine) inhumeAddr(addr *objectSDK.Address, prm *shard.InhumePrm
|
|||
|
||||
var siErr *objectSDK.SplitInfoError
|
||||
if !errors.As(err, &siErr) {
|
||||
// TODO: smth wrong with shard, need to be processed
|
||||
e.log.Warn("could not check for presents in shard",
|
||||
zap.Stringer("shard", sh.ID()),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
e.reportShardError(sh, "could not check for presents in shard", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -128,11 +122,7 @@ func (e *StorageEngine) inhumeAddr(addr *objectSDK.Address, prm *shard.InhumePrm
|
|||
|
||||
_, err := sh.Inhume(prm)
|
||||
if err != nil {
|
||||
// TODO: smth wrong with shard, need to be processed
|
||||
e.log.Warn("could not inhume object in shard",
|
||||
zap.Stringer("shard", sh.ID()),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
e.reportShardError(sh, "could not inhume object in shard", err)
|
||||
} else {
|
||||
ok = true
|
||||
}
|
||||
|
@ -150,7 +140,7 @@ func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []*o
|
|||
tss[addrs[i].String()] = struct{}{}
|
||||
}
|
||||
|
||||
e.iterateOverUnsortedShards(func(sh *shard.Shard) (stop bool) {
|
||||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||
sh.HandleExpiredTombstones(tss)
|
||||
|
||||
select {
|
||||
|
|
|
@ -59,9 +59,9 @@ func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) {
|
|||
|
||||
finished := false
|
||||
|
||||
e.iterateOverSortedShards(prm.obj.Address(), func(ind int, s *shard.Shard) (stop bool) {
|
||||
e.iterateOverSortedShards(prm.obj.Address(), func(ind int, sh hashedShard) (stop bool) {
|
||||
e.mtx.RLock()
|
||||
pool := e.shardPools[s.ID().String()]
|
||||
pool := e.shardPools[sh.ID().String()]
|
||||
e.mtx.RUnlock()
|
||||
|
||||
exitCh := make(chan struct{})
|
||||
|
@ -69,7 +69,7 @@ func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) {
|
|||
if err := pool.Submit(func() {
|
||||
defer close(exitCh)
|
||||
|
||||
exists, err := s.Exists(existPrm)
|
||||
exists, err := sh.Exists(existPrm)
|
||||
if err != nil {
|
||||
return // this is not ErrAlreadyRemoved error so we can go to the next shard
|
||||
}
|
||||
|
@ -79,10 +79,10 @@ func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) {
|
|||
toMoveItPrm := new(shard.ToMoveItPrm)
|
||||
toMoveItPrm.WithAddress(prm.obj.Address())
|
||||
|
||||
_, err = s.ToMoveIt(toMoveItPrm)
|
||||
_, err = sh.ToMoveIt(toMoveItPrm)
|
||||
if err != nil {
|
||||
e.log.Warn("could not mark object for shard relocation",
|
||||
zap.Stringer("shard", s.ID()),
|
||||
zap.Stringer("shard", sh.ID()),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
@ -96,10 +96,10 @@ func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) {
|
|||
putPrm := new(shard.PutPrm)
|
||||
putPrm.WithObject(prm.obj)
|
||||
|
||||
_, err = s.Put(putPrm)
|
||||
_, err = sh.Put(putPrm)
|
||||
if err != nil {
|
||||
e.log.Warn("could not put object in shard",
|
||||
zap.Stringer("shard", s.ID()),
|
||||
zap.Stringer("shard", sh.ID()),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// RngPrm groups the parameters of GetRange operation.
|
||||
|
@ -88,7 +87,7 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
|
|||
WithAddress(prm.addr).
|
||||
WithRange(prm.off, prm.ln)
|
||||
|
||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
||||
res, err := sh.GetRange(shPrm)
|
||||
if err != nil {
|
||||
switch {
|
||||
|
@ -116,13 +115,7 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
|
|||
|
||||
return true // stop, return it back
|
||||
default:
|
||||
// TODO: smth wrong with shard, need to be processed, but
|
||||
// still go to next shard
|
||||
e.log.Warn("could not get object from shard",
|
||||
zap.Stringer("shard", sh.ID()),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
e.reportShardError(sh, "could not get object from shard", err)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// SelectPrm groups the parameters of Select operation.
|
||||
|
@ -72,7 +71,7 @@ func (e *StorageEngine) _select(prm *SelectPrm) (*SelectRes, error) {
|
|||
WithContainerID(prm.cid).
|
||||
WithFilters(prm.filters)
|
||||
|
||||
e.iterateOverUnsortedShards(func(sh *shard.Shard) (stop bool) {
|
||||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||
res, err := sh.Select(shPrm)
|
||||
if err != nil {
|
||||
switch {
|
||||
|
@ -82,11 +81,7 @@ func (e *StorageEngine) _select(prm *SelectPrm) (*SelectRes, error) {
|
|||
|
||||
return true
|
||||
default:
|
||||
// TODO: smth wrong with shard, need to be processed
|
||||
e.log.Warn("could not select objects from shard",
|
||||
zap.Stringer("shard", sh.ID()),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
e.reportShardError(sh, "could not select objects from shard", err)
|
||||
return false
|
||||
}
|
||||
} else {
|
||||
|
@ -129,14 +124,10 @@ func (e *StorageEngine) list(limit uint64) (*SelectRes, error) {
|
|||
ln := uint64(0)
|
||||
|
||||
// consider iterating over shuffled shards
|
||||
e.iterateOverUnsortedShards(func(sh *shard.Shard) (stop bool) {
|
||||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||
res, err := sh.List() // consider limit result of shard iterator
|
||||
if err != nil {
|
||||
// TODO: smth wrong with shard, need to be processed
|
||||
e.log.Warn("could not select objects from shard",
|
||||
zap.Stringer("shard", sh.ID()),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
e.reportShardError(sh, "could not select objects from shard", err)
|
||||
} else {
|
||||
for _, addr := range res.AddressList() { // save only unique values
|
||||
if _, ok := uniqueMap[addr.String()]; !ok {
|
||||
|
|
|
@ -9,13 +9,12 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
||||
var errShardNotFound = errors.New("shard not found")
|
||||
|
||||
type hashedShard struct {
|
||||
sh *shard.Shard
|
||||
}
|
||||
type hashedShard shardWrapper
|
||||
|
||||
// AddShard adds a new shard to the storage engine.
|
||||
//
|
||||
|
@ -37,10 +36,13 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
|
|||
|
||||
strID := id.String()
|
||||
|
||||
e.shards[strID] = shard.New(append(opts,
|
||||
shard.WithID(id),
|
||||
shard.WithExpiredObjectsCallback(e.processExpiredTombstones),
|
||||
)...)
|
||||
e.shards[strID] = shardWrapper{
|
||||
errorCount: atomic.NewUint32(0),
|
||||
Shard: shard.New(append(opts,
|
||||
shard.WithID(id),
|
||||
shard.WithExpiredObjectsCallback(e.processExpiredTombstones),
|
||||
)...),
|
||||
}
|
||||
|
||||
e.shardPools[strID] = pool
|
||||
|
||||
|
@ -75,8 +77,8 @@ func (e *StorageEngine) sortShardsByWeight(objAddr fmt.Stringer) []hashedShard {
|
|||
weights := make([]float64, 0, len(e.shards))
|
||||
|
||||
for _, sh := range e.shards {
|
||||
shards = append(shards, hashedShard{sh})
|
||||
weights = append(weights, e.shardWeight(sh))
|
||||
shards = append(shards, hashedShard(sh))
|
||||
weights = append(weights, e.shardWeight(sh.Shard))
|
||||
}
|
||||
|
||||
hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(objAddr.String())))
|
||||
|
@ -91,23 +93,23 @@ func (e *StorageEngine) unsortedShards() []hashedShard {
|
|||
shards := make([]hashedShard, 0, len(e.shards))
|
||||
|
||||
for _, sh := range e.shards {
|
||||
shards = append(shards, hashedShard{sh})
|
||||
shards = append(shards, hashedShard(sh))
|
||||
}
|
||||
|
||||
return shards
|
||||
}
|
||||
|
||||
func (e *StorageEngine) iterateOverSortedShards(addr *object.Address, handler func(int, *shard.Shard) (stop bool)) {
|
||||
func (e *StorageEngine) iterateOverSortedShards(addr *object.Address, handler func(int, hashedShard) (stop bool)) {
|
||||
for i, sh := range e.sortShardsByWeight(addr) {
|
||||
if handler(i, sh.sh) {
|
||||
if handler(i, sh) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *StorageEngine) iterateOverUnsortedShards(handler func(*shard.Shard) (stop bool)) {
|
||||
func (e *StorageEngine) iterateOverUnsortedShards(handler func(hashedShard) (stop bool)) {
|
||||
for _, sh := range e.unsortedShards() {
|
||||
if handler(sh.sh) {
|
||||
if handler(sh) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -131,6 +133,6 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m shard.Mode) error {
|
|||
|
||||
func (s hashedShard) Hash() uint64 {
|
||||
return hrw.Hash(
|
||||
[]byte(s.sh.ID().String()),
|
||||
[]byte(s.Shard.ID().String()),
|
||||
)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue