[#1806] writecache: Allow to start flush manually
Allow user to initiate flushing objects from a writecache. We need this in 2 cases: 1. During writecache storage schema update, it should be flushed with the old version of node and started clean with a new one. 2. During SSD replacement, to avoid data loss. Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
parent
55148404ee
commit
0b4c867ef1
3 changed files with 206 additions and 0 deletions
|
@ -1,6 +1,7 @@
|
||||||
package writecache
|
package writecache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/mr-tron/base58"
|
"github.com/mr-tron/base58"
|
||||||
|
@ -25,6 +26,10 @@ const (
|
||||||
defaultFlushInterval = time.Second
|
defaultFlushInterval = time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// errMustBeReadOnly is returned when write-cache must be
|
||||||
|
// in read-only mode to perform an operation.
|
||||||
|
var errMustBeReadOnly = errors.New("write-cache must be in read-only mode")
|
||||||
|
|
||||||
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
||||||
func (c *cache) runFlushLoop() {
|
func (c *cache) runFlushLoop() {
|
||||||
for i := 0; i < c.workersCount; i++ {
|
for i := 0; i < c.workersCount; i++ {
|
||||||
|
@ -224,3 +229,68 @@ func (c *cache) flushObject(obj *object.Object) error {
|
||||||
_, err = c.metabase.Put(pPrm)
|
_, err = c.metabase.Put(pPrm)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Flush flushes all objects from the write-cache to the main storage.
|
||||||
|
// Write-cache must be in readonly mode to ensure correctness of an operation and
|
||||||
|
// to prevent interference with background flush workers.
|
||||||
|
func (c *cache) Flush() error {
|
||||||
|
c.modeMtx.RLock()
|
||||||
|
defer c.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if !c.mode.ReadOnly() {
|
||||||
|
return errMustBeReadOnly
|
||||||
|
}
|
||||||
|
|
||||||
|
var prm common.IteratePrm
|
||||||
|
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
|
||||||
|
_, ok := c.flushed.Peek(addr.EncodeToString())
|
||||||
|
if ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := f()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var obj object.Object
|
||||||
|
err = obj.Unmarshal(data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.flushObject(&obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := c.fsTree.Iterate(prm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.db.View(func(tx *bbolt.Tx) error {
|
||||||
|
var addr oid.Address
|
||||||
|
|
||||||
|
b := tx.Bucket(defaultBucket)
|
||||||
|
cs := b.Cursor()
|
||||||
|
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
|
||||||
|
sa := string(k)
|
||||||
|
if _, ok := c.flushed.Peek(sa); ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := addr.DecodeString(sa); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var obj object.Object
|
||||||
|
if err := obj.Unmarshal(data); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.flushObject(&obj); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
135
pkg/local_object_storage/writecache/flush_test.go
Normal file
135
pkg/local_object_storage/writecache/flush_test.go
Normal file
|
@ -0,0 +1,135 @@
|
||||||
|
package writecache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
checksumtest "github.com/nspcc-dev/neofs-sdk-go/checksum/test"
|
||||||
|
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||||
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
|
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
|
||||||
|
usertest "github.com/nspcc-dev/neofs-sdk-go/user/test"
|
||||||
|
versionSDK "github.com/nspcc-dev/neofs-sdk-go/version"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.uber.org/zap/zaptest"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFlush(t *testing.T) {
|
||||||
|
const (
|
||||||
|
objCount = 4
|
||||||
|
smallSize = 256
|
||||||
|
)
|
||||||
|
|
||||||
|
dir := t.TempDir()
|
||||||
|
mb := meta.New(
|
||||||
|
meta.WithPath(filepath.Join(dir, "meta")),
|
||||||
|
meta.WithEpochState(dummyEpoch{}))
|
||||||
|
require.NoError(t, mb.Open(false))
|
||||||
|
require.NoError(t, mb.Init())
|
||||||
|
|
||||||
|
fsTree := fstree.New(fstree.WithPath(filepath.Join(dir, "blob")))
|
||||||
|
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
|
||||||
|
{Storage: fsTree},
|
||||||
|
}))
|
||||||
|
require.NoError(t, bs.Open(false))
|
||||||
|
require.NoError(t, bs.Init())
|
||||||
|
|
||||||
|
wc := New(
|
||||||
|
WithLogger(zaptest.NewLogger(t)),
|
||||||
|
WithPath(filepath.Join(dir, "writecache")),
|
||||||
|
WithSmallObjectSize(smallSize),
|
||||||
|
WithMetabase(mb),
|
||||||
|
WithBlobstor(bs))
|
||||||
|
require.NoError(t, wc.Open(false))
|
||||||
|
require.NoError(t, wc.Init())
|
||||||
|
|
||||||
|
// First set mode for metabase and blobstor to prevent background flushes.
|
||||||
|
require.NoError(t, mb.SetMode(mode.ReadOnly))
|
||||||
|
require.NoError(t, bs.SetMode(mode.ReadOnly))
|
||||||
|
|
||||||
|
type objectPair struct {
|
||||||
|
addr oid.Address
|
||||||
|
obj *object.Object
|
||||||
|
}
|
||||||
|
|
||||||
|
objects := make([]objectPair, objCount)
|
||||||
|
for i := range objects {
|
||||||
|
obj := object.New()
|
||||||
|
ver := versionSDK.Current()
|
||||||
|
|
||||||
|
obj.SetID(oidtest.ID())
|
||||||
|
obj.SetOwnerID(usertest.ID())
|
||||||
|
obj.SetContainerID(cidtest.ID())
|
||||||
|
obj.SetType(object.TypeRegular)
|
||||||
|
obj.SetVersion(&ver)
|
||||||
|
obj.SetPayloadChecksum(checksumtest.Checksum())
|
||||||
|
obj.SetPayloadHomomorphicHash(checksumtest.Checksum())
|
||||||
|
obj.SetPayload(make([]byte, 1+(i%2)*smallSize))
|
||||||
|
|
||||||
|
addr := objectCore.AddressOf(obj)
|
||||||
|
data, err := obj.Marshal()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var prm common.PutPrm
|
||||||
|
prm.Address = objectCore.AddressOf(obj)
|
||||||
|
prm.Object = obj
|
||||||
|
prm.RawData = data
|
||||||
|
|
||||||
|
_, err = wc.Put(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
objects[i] = objectPair{addr: addr, obj: obj}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("must be read-only", func(t *testing.T) {
|
||||||
|
require.ErrorIs(t, wc.Flush(), errMustBeReadOnly)
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, wc.SetMode(mode.ReadOnly))
|
||||||
|
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||||
|
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||||
|
|
||||||
|
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
|
||||||
|
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
|
||||||
|
|
||||||
|
require.NoError(t, wc.Flush())
|
||||||
|
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
var mPrm meta.GetPrm
|
||||||
|
mPrm.SetAddress(objects[i].addr)
|
||||||
|
_, err := mb.Get(mPrm)
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
_, err = bs.Get(common.GetPrm{Address: objects[i].addr})
|
||||||
|
require.Error(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 2; i < objCount; i++ {
|
||||||
|
var mPrm meta.StorageIDPrm
|
||||||
|
mPrm.SetAddress(objects[i].addr)
|
||||||
|
|
||||||
|
mRes, err := mb.StorageID(mPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var prm common.GetPrm
|
||||||
|
prm.Address = objects[i].addr
|
||||||
|
prm.StorageID = mRes.StorageID()
|
||||||
|
|
||||||
|
res, err := bs.Get(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, objects[i].obj, res.Object)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type dummyEpoch struct{}
|
||||||
|
|
||||||
|
func (dummyEpoch) CurrentEpoch() uint64 {
|
||||||
|
return 0
|
||||||
|
}
|
|
@ -28,6 +28,7 @@ type Cache interface {
|
||||||
SetMode(mode.Mode) error
|
SetMode(mode.Mode) error
|
||||||
SetLogger(*zap.Logger)
|
SetLogger(*zap.Logger)
|
||||||
DumpInfo() Info
|
DumpInfo() Info
|
||||||
|
Flush() error
|
||||||
|
|
||||||
Init() error
|
Init() error
|
||||||
Open(readOnly bool) error
|
Open(readOnly bool) error
|
||||||
|
|
Loading…
Reference in a new issue