[#1806] writecache: Allow to ignore read errors during flush

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
Evgenii Stratonikov 2022-09-26 11:54:21 +03:00 committed by fyrchik
parent f2045c10d7
commit 0a411908ee
5 changed files with 204 additions and 84 deletions

View file

@ -6,7 +6,8 @@ import (
// FlushWriteCachePrm groups the parameters of FlushWriteCache operation. // FlushWriteCachePrm groups the parameters of FlushWriteCache operation.
type FlushWriteCachePrm struct { type FlushWriteCachePrm struct {
shardID *shard.ID shardID *shard.ID
ignoreErrors bool
} }
// SetShardID is an option to set shard ID. // SetShardID is an option to set shard ID.
@ -16,6 +17,11 @@ func (p *FlushWriteCachePrm) SetShardID(id *shard.ID) {
p.shardID = id p.shardID = id
} }
// SetIgnoreErrors sets errors ignore flag..
func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
p.ignoreErrors = ignore
}
// FlushWriteCacheRes groups the resulting values of FlushWriteCache operation. // FlushWriteCacheRes groups the resulting values of FlushWriteCache operation.
type FlushWriteCacheRes struct{} type FlushWriteCacheRes struct{}
@ -29,5 +35,8 @@ func (e *StorageEngine) FlushWriteCache(p FlushWriteCachePrm) (FlushWriteCacheRe
return FlushWriteCacheRes{}, errShardNotFound return FlushWriteCacheRes{}, errShardNotFound
} }
return FlushWriteCacheRes{}, sh.FlushWriteCache() var prm shard.FlushWriteCachePrm
prm.SetIgnoreErrors(p.ignoreErrors)
return FlushWriteCacheRes{}, sh.FlushWriteCache(prm)
} }

View file

@ -6,13 +6,23 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
) )
// FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation.
type FlushWriteCachePrm struct {
ignoreErrors bool
}
// SetIgnoreErrors sets the flag to ignore read-errors during flush.
func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
p.ignoreErrors = ignore
}
// errWriteCacheDisabled is returned when an operation on write-cache is performed, // errWriteCacheDisabled is returned when an operation on write-cache is performed,
// but write-cache is disabled. // but write-cache is disabled.
var errWriteCacheDisabled = errors.New("write-cache is disabled") var errWriteCacheDisabled = errors.New("write-cache is disabled")
// FlushWriteCache moves writecache in read-only mode and flushes all data from it. // FlushWriteCache moves writecache in read-only mode and flushes all data from it.
// After the operation writecache will remain read-only mode. // After the operation writecache will remain read-only mode.
func (s *Shard) FlushWriteCache() error { func (s *Shard) FlushWriteCache(p FlushWriteCachePrm) error {
if !s.hasWriteCache() { if !s.hasWriteCache() {
return errWriteCacheDisabled return errWriteCacheDisabled
} }
@ -32,5 +42,5 @@ func (s *Shard) FlushWriteCache() error {
return err return err
} }
return s.writeCache.Flush() return s.writeCache.Flush(p.ignoreErrors)
} }

View file

@ -233,7 +233,7 @@ func (c *cache) flushObject(obj *object.Object) error {
// Flush flushes all objects from the write-cache to the main storage. // Flush flushes all objects from the write-cache to the main storage.
// Write-cache must be in readonly mode to ensure correctness of an operation and // Write-cache must be in readonly mode to ensure correctness of an operation and
// to prevent interference with background flush workers. // to prevent interference with background flush workers.
func (c *cache) Flush() error { func (c *cache) Flush(ignoreErrors bool) error {
c.modeMtx.RLock() c.modeMtx.RLock()
defer c.modeMtx.RUnlock() defer c.modeMtx.RUnlock()
@ -242,6 +242,7 @@ func (c *cache) Flush() error {
} }
var prm common.IteratePrm var prm common.IteratePrm
prm.IgnoreErrors = ignoreErrors
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
_, ok := c.flushed.Peek(addr.EncodeToString()) _, ok := c.flushed.Peek(addr.EncodeToString())
if ok { if ok {
@ -250,12 +251,18 @@ func (c *cache) Flush() error {
data, err := f() data, err := f()
if err != nil { if err != nil {
if ignoreErrors {
return nil
}
return err return err
} }
var obj object.Object var obj object.Object
err = obj.Unmarshal(data) err = obj.Unmarshal(data)
if err != nil { if err != nil {
if ignoreErrors {
return nil
}
return err return err
} }
@ -279,11 +286,17 @@ func (c *cache) Flush() error {
} }
if err := addr.DecodeString(sa); err != nil { if err := addr.DecodeString(sa); err != nil {
if ignoreErrors {
continue
}
return err return err
} }
var obj object.Object var obj object.Object
if err := obj.Unmarshal(data); err != nil { if err := obj.Unmarshal(data); err != nil {
if ignoreErrors {
continue
}
return err return err
} }

View file

@ -1,6 +1,7 @@
package writecache package writecache
import ( import (
"os"
"path/filepath" "path/filepath"
"testing" "testing"
@ -18,6 +19,7 @@ import (
usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" usertest "github.com/nspcc-dev/neofs-sdk-go/user/test"
versionSDK "github.com/nspcc-dev/neofs-sdk-go/version" versionSDK "github.com/nspcc-dev/neofs-sdk-go/version"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
) )
@ -27,105 +29,191 @@ func TestFlush(t *testing.T) {
smallSize = 256 smallSize = 256
) )
dir := t.TempDir()
mb := meta.New(
meta.WithPath(filepath.Join(dir, "meta")),
meta.WithEpochState(dummyEpoch{}))
require.NoError(t, mb.Open(false))
require.NoError(t, mb.Init())
fsTree := fstree.New(fstree.WithPath(filepath.Join(dir, "blob")))
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
{Storage: fsTree},
}))
require.NoError(t, bs.Open(false))
require.NoError(t, bs.Init())
wc := New(
WithLogger(zaptest.NewLogger(t)),
WithPath(filepath.Join(dir, "writecache")),
WithSmallObjectSize(smallSize),
WithMetabase(mb),
WithBlobstor(bs))
require.NoError(t, wc.Open(false))
require.NoError(t, wc.Init())
// First set mode for metabase and blobstor to prevent background flushes.
require.NoError(t, mb.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadOnly))
type objectPair struct { type objectPair struct {
addr oid.Address addr oid.Address
obj *object.Object obj *object.Object
} }
objects := make([]objectPair, objCount) newCache := func(t *testing.T) (Cache, *blobstor.BlobStor, *meta.DB) {
for i := range objects { dir := t.TempDir()
obj := object.New() mb := meta.New(
ver := versionSDK.Current() meta.WithPath(filepath.Join(dir, "meta")),
meta.WithEpochState(dummyEpoch{}))
require.NoError(t, mb.Open(false))
require.NoError(t, mb.Init())
obj.SetID(oidtest.ID()) fsTree := fstree.New(
obj.SetOwnerID(usertest.ID()) fstree.WithPath(filepath.Join(dir, "blob")),
obj.SetContainerID(cidtest.ID()) fstree.WithDepth(0),
obj.SetType(object.TypeRegular) fstree.WithDirNameLen(1))
obj.SetVersion(&ver) bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
obj.SetPayloadChecksum(checksumtest.Checksum()) {Storage: fsTree},
obj.SetPayloadHomomorphicHash(checksumtest.Checksum()) }))
obj.SetPayload(make([]byte, 1+(i%2)*smallSize)) require.NoError(t, bs.Open(false))
require.NoError(t, bs.Init())
addr := objectCore.AddressOf(obj) wc := New(
data, err := obj.Marshal() WithLogger(zaptest.NewLogger(t)),
require.NoError(t, err) WithPath(filepath.Join(dir, "writecache")),
WithSmallObjectSize(smallSize),
WithMetabase(mb),
WithBlobstor(bs))
require.NoError(t, wc.Open(false))
require.NoError(t, wc.Init())
var prm common.PutPrm // First set mode for metabase and blobstor to prevent background flushes.
prm.Address = objectCore.AddressOf(obj) require.NoError(t, mb.SetMode(mode.ReadOnly))
prm.Object = obj require.NoError(t, bs.SetMode(mode.ReadOnly))
prm.RawData = data
_, err = wc.Put(prm) return wc, bs, mb
require.NoError(t, err)
objects[i] = objectPair{addr: addr, obj: obj}
} }
t.Run("must be read-only", func(t *testing.T) { putObjects := func(t *testing.T, c Cache) []objectPair {
require.ErrorIs(t, wc.Flush(), errMustBeReadOnly) objects := make([]objectPair, objCount)
for i := range objects {
obj, data := newObject(t, 1+(i%2)*smallSize)
addr := objectCore.AddressOf(obj)
var prm common.PutPrm
prm.Address = objectCore.AddressOf(obj)
prm.Object = obj
prm.RawData = data
_, err := c.Put(prm)
require.NoError(t, err)
objects[i] = objectPair{addr: addr, obj: obj}
}
return objects
}
check := func(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) {
for i := range objects {
var mPrm meta.StorageIDPrm
mPrm.SetAddress(objects[i].addr)
mRes, err := mb.StorageID(mPrm)
require.NoError(t, err)
var prm common.GetPrm
prm.Address = objects[i].addr
prm.StorageID = mRes.StorageID()
res, err := bs.Get(prm)
require.NoError(t, err)
require.Equal(t, objects[i].obj, res.Object)
}
}
t.Run("no errors", func(t *testing.T) {
wc, bs, mb := newCache(t)
objects := putObjects(t, wc)
t.Run("must be read-only", func(t *testing.T) {
require.ErrorIs(t, wc.Flush(false), errMustBeReadOnly)
})
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
require.NoError(t, wc.Flush(false))
for i := 0; i < 2; i++ {
var mPrm meta.GetPrm
mPrm.SetAddress(objects[i].addr)
_, err := mb.Get(mPrm)
require.Error(t, err)
_, err = bs.Get(common.GetPrm{Address: objects[i].addr})
require.Error(t, err)
}
check(t, mb, bs, objects[2:])
}) })
require.NoError(t, wc.SetMode(mode.ReadOnly)) t.Run("ignore errors", func(t *testing.T) {
require.NoError(t, bs.SetMode(mode.ReadWrite)) testIgnoreErrors := func(t *testing.T, f func(*cache)) {
require.NoError(t, mb.SetMode(mode.ReadWrite)) wc, bs, mb := newCache(t)
objects := putObjects(t, wc)
f(wc.(*cache))
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true) require.NoError(t, wc.SetMode(mode.ReadOnly))
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false) require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
require.NoError(t, wc.Flush()) require.Error(t, wc.Flush(false))
require.NoError(t, wc.Flush(true))
for i := 0; i < 2; i++ { check(t, mb, bs, objects)
var mPrm meta.GetPrm }
mPrm.SetAddress(objects[i].addr) t.Run("db, invalid address", func(t *testing.T) {
_, err := mb.Get(mPrm) testIgnoreErrors(t, func(c *cache) {
require.Error(t, err) _, data := newObject(t, 1)
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.Put([]byte{1, 2, 3}, data)
}))
})
})
t.Run("db, invalid object", func(t *testing.T) {
testIgnoreErrors(t, func(c *cache) {
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.Put([]byte(oidtest.Address().EncodeToString()), []byte{1, 2, 3})
}))
})
})
t.Run("fs, read error", func(t *testing.T) {
testIgnoreErrors(t, func(c *cache) {
obj, data := newObject(t, 1)
_, err = bs.Get(common.GetPrm{Address: objects[i].addr}) var prm common.PutPrm
require.Error(t, err) prm.Address = objectCore.AddressOf(obj)
} prm.RawData = data
for i := 2; i < objCount; i++ { _, err := c.fsTree.Put(prm)
var mPrm meta.StorageIDPrm require.NoError(t, err)
mPrm.SetAddress(objects[i].addr)
mRes, err := mb.StorageID(mPrm) p := prm.Address.Object().EncodeToString() + "." + prm.Address.Container().EncodeToString()
require.NoError(t, err) p = filepath.Join(c.fsTree.RootPath, p[:1], p[1:])
var prm common.GetPrm _, err = os.Stat(p) // sanity check
prm.Address = objects[i].addr require.NoError(t, err)
prm.StorageID = mRes.StorageID() require.NoError(t, os.Chmod(p, 0))
})
})
t.Run("fs, invalid object", func(t *testing.T) {
testIgnoreErrors(t, func(c *cache) {
var prm common.PutPrm
prm.Address = oidtest.Address()
prm.RawData = []byte{1, 2, 3}
_, err := c.fsTree.Put(prm)
require.NoError(t, err)
})
})
})
}
res, err := bs.Get(prm) func newObject(t *testing.T, size int) (*object.Object, []byte) {
require.NoError(t, err) obj := object.New()
require.Equal(t, objects[i].obj, res.Object) ver := versionSDK.Current()
}
obj.SetID(oidtest.ID())
obj.SetOwnerID(usertest.ID())
obj.SetContainerID(cidtest.ID())
obj.SetType(object.TypeRegular)
obj.SetVersion(&ver)
obj.SetPayloadChecksum(checksumtest.Checksum())
obj.SetPayloadHomomorphicHash(checksumtest.Checksum())
obj.SetPayload(make([]byte, size))
data, err := obj.Marshal()
require.NoError(t, err)
return obj, data
} }
type dummyEpoch struct{} type dummyEpoch struct{}

View file

@ -28,7 +28,7 @@ type Cache interface {
SetMode(mode.Mode) error SetMode(mode.Mode) error
SetLogger(*zap.Logger) SetLogger(*zap.Logger)
DumpInfo() Info DumpInfo() Info
Flush() error Flush(bool) error
Init() error Init() error
Open(readOnly bool) error Open(readOnly bool) error