[#1186] engine: Read object directly from blobstor in case of conflicts

Metabase is expected to contain actual information about objects stored
in shard. If the object is present in metabase but is missing from
blobstor, peform an additional attempt to fetch it directly without
consulting metabase. Such a situation is unexpected, so error counter
is increased for the shard which has the object in the metabase. We
don't increase error counter for the shard which has the object in
blobstor, because some garbage can be expected there. In this
implementation there is no overhead for objects which are really
missing, i.e. are not present in any metabase.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-02-22 10:20:33 +03:00 committed by Alex Vanin
parent 6e6f3648d2
commit 69e1e6ca20
4 changed files with 151 additions and 48 deletions

View file

@ -7,6 +7,7 @@ import (
"strconv"
"testing"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
@ -15,48 +16,52 @@ import (
"go.uber.org/zap/zaptest"
)
func TestErrorReporting(t *testing.T) {
const smallSize = 100
const errSmallSize = 256
log := zaptest.NewLogger(t)
newEngine := func(t *testing.T, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
dir, err := os.MkdirTemp("", "*")
func newEngineWithErrorThreshold(t *testing.T, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
if dir == "" {
var err error
dir, err = os.MkdirTemp("", "*")
require.NoError(t, err)
t.Cleanup(func() { _ = os.RemoveAll(dir) })
e := New(
WithLogger(log),
WithShardPoolSize(1),
WithErrorThreshold(errThreshold))
var ids [2]*shard.ID
for i := range ids {
ids[i], err = e.AddShard(
shard.WithLogger(log),
shard.WithBlobStorOptions(
blobstor.WithRootPath(filepath.Join(dir, strconv.Itoa(i))),
blobstor.WithShallowDepth(1),
blobstor.WithBlobovniczaShallowWidth(1),
blobstor.WithBlobovniczaShallowDepth(1),
blobstor.WithSmallSizeLimit(100),
blobstor.WithRootPerm(0700)),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
meta.WithPermissions(0700)))
require.NoError(t, err)
}
require.NoError(t, e.Open())
require.NoError(t, e.Init())
return e, dir, ids
}
e := New(
WithLogger(zaptest.NewLogger(t)),
WithShardPoolSize(1),
WithErrorThreshold(errThreshold))
var ids [2]*shard.ID
var err error
for i := range ids {
ids[i], err = e.AddShard(
shard.WithLogger(zaptest.NewLogger(t)),
shard.WithBlobStorOptions(
blobstor.WithRootPath(filepath.Join(dir, strconv.Itoa(i))),
blobstor.WithShallowDepth(1),
blobstor.WithBlobovniczaShallowWidth(1),
blobstor.WithBlobovniczaShallowDepth(1),
blobstor.WithSmallSizeLimit(errSmallSize),
blobstor.WithRootPerm(0700)),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
meta.WithPermissions(0700)))
require.NoError(t, err)
}
require.NoError(t, e.Open())
require.NoError(t, e.Init())
return e, dir, ids
}
func TestErrorReporting(t *testing.T) {
t.Run("ignore errors by default", func(t *testing.T) {
e, dir, id := newEngine(t, 0)
e, dir, id := newEngineWithErrorThreshold(t, "", 0)
obj := generateRawObjectWithCID(t, cidtest.ID())
obj.SetPayload(make([]byte, smallSize))
obj.SetPayload(make([]byte, errSmallSize))
prm := new(shard.PutPrm).WithObject(obj.Object())
e.mtx.RLock()
@ -82,10 +87,10 @@ func TestErrorReporting(t *testing.T) {
t.Run("with error threshold", func(t *testing.T) {
const errThreshold = 3
e, dir, id := newEngine(t, errThreshold)
e, dir, id := newEngineWithErrorThreshold(t, "", errThreshold)
obj := generateRawObjectWithCID(t, cidtest.ID())
obj.SetPayload(make([]byte, smallSize))
obj.SetPayload(make([]byte, errSmallSize))
prm := new(shard.PutPrm).WithObject(obj.Object())
e.mtx.RLock()
@ -123,6 +128,54 @@ func TestErrorReporting(t *testing.T) {
})
}
// Issue #1186.
func TestBlobstorFailback(t *testing.T) {
dir, err := os.MkdirTemp("", "*")
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, os.RemoveAll(dir)) })
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
objs := make([]*object.Object, 0, 2)
for _, size := range []int{1, errSmallSize + 1} {
obj := generateRawObjectWithCID(t, cidtest.ID())
obj.SetPayload(make([]byte, size))
prm := new(shard.PutPrm).WithObject(obj.Object())
e.mtx.RLock()
_, err = e.shards[id[0].String()].Shard.Put(prm)
e.mtx.RUnlock()
require.NoError(t, err)
objs = append(objs, obj.Object())
}
for i := range objs {
_, err = e.Get(&GetPrm{addr: objs[i].Address()})
require.NoError(t, err)
}
checkShardState(t, e, id[0], 0, shard.ModeReadWrite)
require.NoError(t, e.Close())
p1 := e.shards[id[0].String()].Shard.DumpInfo().BlobStorInfo.RootPath
p2 := e.shards[id[1].String()].Shard.DumpInfo().BlobStorInfo.RootPath
tmp := filepath.Join(dir, "tmp")
require.NoError(t, os.Rename(p1, tmp))
require.NoError(t, os.Rename(p2, p1))
require.NoError(t, os.Rename(tmp, p2))
e, _, id = newEngineWithErrorThreshold(t, dir, 1)
for i := range objs {
actual, err := e.Get(&GetPrm{addr: objs[i].Address()})
require.NoError(t, err)
require.Equal(t, objs[i], actual.Object())
}
checkShardState(t, e, id[0], 2, shard.ModeReadOnly)
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
}
func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode shard.Mode) {
e.mtx.RLock()
sh := e.shards[id.String()]