[#1085] shard: dump data from write-cache

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
remotes/fyrchik/fix-config-wc
Evgenii Stratonikov 2022-01-18 16:40:11 +03:00 committed by LeL
parent ad01aaf8bf
commit c0f6c988f0
6 changed files with 105 additions and 12 deletions

View File

@ -72,7 +72,23 @@ func (s *Shard) Evacuate(prm *EvacuatePrm) (*EvacuateRes, error) {
var count int
if s.hasWriteCache() {
// TODO evacuate objects from write cache
err := s.writeCache.Iterate(func(data []byte) error {
var size [4]byte
binary.LittleEndian.PutUint32(size[:], uint32(len(data)))
if _, err := w.Write(size[:]); err != nil {
return err
}
if _, err := w.Write(data); err != nil {
return err
}
count++
return nil
})
if err != nil {
return nil, err
}
}
var pi blobstor.IteratePrm

View File

@ -4,21 +4,44 @@ import (
"errors"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"strconv"
"testing"
"time"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
)
func TestEvacuate(t *testing.T) {
sh := newShard(t, false)
t.Run("without write-cache", func(t *testing.T) {
testEvacuate(t, 10, false)
})
t.Run("with write-cache", func(t *testing.T) {
// Put a bit more objects to write-cache to facilitate race-conditions.
testEvacuate(t, 100, true)
})
}
func testEvacuate(t *testing.T, objCount int, hasWriteCache bool) {
const (
wcSmallObjectSize = 1024 // 1 KiB, goes to write-cache memory
wcBigObjectSize = 4 * 1024 // 4 KiB, goes to write-cache FSTree
bsSmallObjectSize = 10 * 1024 // 10 KiB, goes to blobovnicza DB
bsBigObjectSize = 1024*1024 + 1 // > 1 MiB, goes to blobovnicza FSTree
)
var sh *shard.Shard
if !hasWriteCache {
sh = newShard(t, false)
} else {
sh = newCustomShard(t, true,
writecache.WithSmallObjectSize(wcSmallObjectSize),
writecache.WithMaxObjectSize(wcBigObjectSize))
}
defer releaseShard(sh, t)
out := filepath.Join(t.TempDir(), "dump")
@ -36,12 +59,25 @@ func TestEvacuate(t *testing.T) {
require.Equal(t, 0, res.Count())
require.NoError(t, sh.SetMode(shard.ModeReadWrite))
const objCount = 10
// Approximate object header size.
const headerSize = 400
objects := make([]*object.Object, objCount)
for i := 0; i < objCount; i++ {
cid := cidtest.ID()
obj := generateRawObjectWithCID(t, cid)
addAttribute(obj, "foo", strconv.FormatUint(rand.Uint64(), 10))
var size int
switch i % 6 {
case 0, 1:
size = wcSmallObjectSize - headerSize
case 2, 3:
size = bsSmallObjectSize - headerSize
case 4:
size = wcBigObjectSize - headerSize
default:
size = bsBigObjectSize - headerSize
}
data := make([]byte, size)
obj := generateRawObjectWithPayload(cid, data)
objects[i] = obj.Object()
prm := new(shard.PutPrm).WithObject(objects[i])

View File

@ -42,7 +42,7 @@ func testShardHead(t *testing.T, hasWriteCache bool) {
res, err := testHead(t, sh, headPrm, hasWriteCache)
require.NoError(t, err)
require.Equal(t, obj.Object(), res.Object())
require.Equal(t, obj.CutPayload().Object(), res.Object())
})
t.Run("virtual object", func(t *testing.T) {
@ -75,7 +75,7 @@ func testShardHead(t *testing.T, hasWriteCache bool) {
head, err := sh.Head(headPrm)
require.NoError(t, err)
require.Equal(t, parent.Object(), head.Object())
require.Equal(t, parent.CutPayload().Object(), head.Object())
})
}

View File

@ -27,6 +27,10 @@ import (
)
func newShard(t testing.TB, enableWriteCache bool) *shard.Shard {
return newCustomShard(t, enableWriteCache, writecache.WithMaxMemSize(0))
}
func newCustomShard(t testing.TB, enableWriteCache bool, wcOpts ...writecache.Option) *shard.Shard {
rootPath := t.Name()
if enableWriteCache {
rootPath = path.Join(rootPath, "wc")
@ -46,8 +50,7 @@ func newShard(t testing.TB, enableWriteCache bool) *shard.Shard {
),
shard.WithWriteCache(enableWriteCache),
shard.WithWriteCacheOptions(
writecache.WithMaxMemSize(0), // disable memory batches
writecache.WithPath(path.Join(rootPath, "wcache")),
append(wcOpts, writecache.WithPath(path.Join(rootPath, "wcache")))...,
),
}
@ -69,12 +72,17 @@ func generateRawObject(t *testing.T) *object.RawObject {
}
func generateRawObjectWithCID(t *testing.T, cid *cid.ID) *object.RawObject {
data := owner.PublicKeyToIDBytes(&test.DecodeKey(-1).PublicKey)
return generateRawObjectWithPayload(cid, data)
}
func generateRawObjectWithPayload(cid *cid.ID, data []byte) *object.RawObject {
version := version.New()
version.SetMajor(2)
version.SetMinor(1)
csum := new(checksum.Checksum)
csum.SetSHA256(sha256.Sum256(owner.PublicKeyToIDBytes(&test.DecodeKey(-1).PublicKey)))
csum.SetSHA256(sha256.Sum256(data))
csumTZ := new(checksum.Checksum)
csumTZ.SetTillichZemor(tz.Sum(csum.Sum()))
@ -84,6 +92,7 @@ func generateRawObjectWithCID(t *testing.T, cid *cid.ID) *object.RawObject {
obj.SetOwnerID(ownertest.ID())
obj.SetContainerID(cid)
obj.SetVersion(version)
obj.SetPayload(data)
obj.SetPayloadChecksum(csum)
obj.SetPayloadHomomorphicHash(csumTZ)

View File

@ -11,6 +11,37 @@ import (
// ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing.
var ErrNoDefaultBucket = errors.New("no default bucket")
// Iterate iterates over all objects present in write cache.
// This is very difficult to do correctly unless write-cache is put in read-only mode.
// Thus we silently fail if shard is not in read-only mode to avoid reporting misleading results.
func (c *cache) Iterate(f func([]byte) error) error {
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
if c.mode != ModeReadOnly {
return nil
}
err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.ForEach(func(k, data []byte) error {
if _, ok := c.flushed.Peek(string(k)); ok {
return nil
}
return f(data)
})
})
if err != nil {
return err
}
return c.fsTree.Iterate(func(addr *object.Address, data []byte) error {
if _, ok := c.flushed.Peek(addr.String()); ok {
return nil
}
return f(data)
})
}
// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.
// It is assumed that db is an underlying database of some WriteCache instance.
//

View File

@ -20,6 +20,7 @@ type Cache interface {
Get(*objectSDK.Address) (*object.Object, error)
Head(*objectSDK.Address) (*object.Object, error)
Delete(*objectSDK.Address) error
Iterate(func(data []byte) error) error
Put(*object.Object) error
SetMode(Mode)
DumpInfo() Info