diff --git a/pkg/local_object_storage/shard/evacuate.go b/pkg/local_object_storage/shard/evacuate.go index 303bbaf10..4dc425a4c 100644 --- a/pkg/local_object_storage/shard/evacuate.go +++ b/pkg/local_object_storage/shard/evacuate.go @@ -72,7 +72,23 @@ func (s *Shard) Evacuate(prm *EvacuatePrm) (*EvacuateRes, error) { var count int if s.hasWriteCache() { - // TODO evacuate objects from write cache + err := s.writeCache.Iterate(func(data []byte) error { + var size [4]byte + binary.LittleEndian.PutUint32(size[:], uint32(len(data))) + if _, err := w.Write(size[:]); err != nil { + return err + } + + if _, err := w.Write(data); err != nil { + return err + } + + count++ + return nil + }) + if err != nil { + return nil, err + } } var pi blobstor.IteratePrm diff --git a/pkg/local_object_storage/shard/evacuate_test.go b/pkg/local_object_storage/shard/evacuate_test.go index 6dab5e706..7c5c2b20f 100644 --- a/pkg/local_object_storage/shard/evacuate_test.go +++ b/pkg/local_object_storage/shard/evacuate_test.go @@ -4,21 +4,44 @@ import ( "errors" "io" "io/ioutil" - "math/rand" "os" "path/filepath" - "strconv" "testing" "time" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" "github.com/stretchr/testify/require" ) func TestEvacuate(t *testing.T) { - sh := newShard(t, false) + t.Run("without write-cache", func(t *testing.T) { + testEvacuate(t, 10, false) + }) + t.Run("with write-cache", func(t *testing.T) { + // Put a bit more objects to write-cache to facilitate race-conditions. + testEvacuate(t, 100, true) + }) +} + +func testEvacuate(t *testing.T, objCount int, hasWriteCache bool) { + const ( + wcSmallObjectSize = 1024 // 1 KiB, goes to write-cache memory + wcBigObjectSize = 4 * 1024 // 4 KiB, goes to write-cache FSTree + bsSmallObjectSize = 10 * 1024 // 10 KiB, goes to blobovnicza DB + bsBigObjectSize = 1024*1024 + 1 // > 1 MiB, goes to blobovnicza FSTree + ) + + var sh *shard.Shard + if !hasWriteCache { + sh = newShard(t, false) + } else { + sh = newCustomShard(t, true, + writecache.WithSmallObjectSize(wcSmallObjectSize), + writecache.WithMaxObjectSize(wcBigObjectSize)) + } defer releaseShard(sh, t) out := filepath.Join(t.TempDir(), "dump") @@ -36,12 +59,25 @@ func TestEvacuate(t *testing.T) { require.Equal(t, 0, res.Count()) require.NoError(t, sh.SetMode(shard.ModeReadWrite)) - const objCount = 10 + // Approximate object header size. + const headerSize = 400 + objects := make([]*object.Object, objCount) for i := 0; i < objCount; i++ { cid := cidtest.ID() - obj := generateRawObjectWithCID(t, cid) - addAttribute(obj, "foo", strconv.FormatUint(rand.Uint64(), 10)) + var size int + switch i % 6 { + case 0, 1: + size = wcSmallObjectSize - headerSize + case 2, 3: + size = bsSmallObjectSize - headerSize + case 4: + size = wcBigObjectSize - headerSize + default: + size = bsBigObjectSize - headerSize + } + data := make([]byte, size) + obj := generateRawObjectWithPayload(cid, data) objects[i] = obj.Object() prm := new(shard.PutPrm).WithObject(objects[i]) diff --git a/pkg/local_object_storage/shard/head_test.go b/pkg/local_object_storage/shard/head_test.go index ead199d1c..cf3cfce9c 100644 --- a/pkg/local_object_storage/shard/head_test.go +++ b/pkg/local_object_storage/shard/head_test.go @@ -42,7 +42,7 @@ func testShardHead(t *testing.T, hasWriteCache bool) { res, err := testHead(t, sh, headPrm, hasWriteCache) require.NoError(t, err) - require.Equal(t, obj.Object(), res.Object()) + require.Equal(t, obj.CutPayload().Object(), res.Object()) }) t.Run("virtual object", func(t *testing.T) { @@ -75,7 +75,7 @@ func testShardHead(t *testing.T, hasWriteCache bool) { head, err := sh.Head(headPrm) require.NoError(t, err) - require.Equal(t, parent.Object(), head.Object()) + require.Equal(t, parent.CutPayload().Object(), head.Object()) }) } diff --git a/pkg/local_object_storage/shard/shard_test.go b/pkg/local_object_storage/shard/shard_test.go index d605fb006..e4e4ae3b3 100644 --- a/pkg/local_object_storage/shard/shard_test.go +++ b/pkg/local_object_storage/shard/shard_test.go @@ -27,6 +27,10 @@ import ( ) func newShard(t testing.TB, enableWriteCache bool) *shard.Shard { + return newCustomShard(t, enableWriteCache, writecache.WithMaxMemSize(0)) +} + +func newCustomShard(t testing.TB, enableWriteCache bool, wcOpts ...writecache.Option) *shard.Shard { rootPath := t.Name() if enableWriteCache { rootPath = path.Join(rootPath, "wc") @@ -46,8 +50,7 @@ func newShard(t testing.TB, enableWriteCache bool) *shard.Shard { ), shard.WithWriteCache(enableWriteCache), shard.WithWriteCacheOptions( - writecache.WithMaxMemSize(0), // disable memory batches - writecache.WithPath(path.Join(rootPath, "wcache")), + append(wcOpts, writecache.WithPath(path.Join(rootPath, "wcache")))..., ), } @@ -69,12 +72,17 @@ func generateRawObject(t *testing.T) *object.RawObject { } func generateRawObjectWithCID(t *testing.T, cid *cid.ID) *object.RawObject { + data := owner.PublicKeyToIDBytes(&test.DecodeKey(-1).PublicKey) + return generateRawObjectWithPayload(cid, data) +} + +func generateRawObjectWithPayload(cid *cid.ID, data []byte) *object.RawObject { version := version.New() version.SetMajor(2) version.SetMinor(1) csum := new(checksum.Checksum) - csum.SetSHA256(sha256.Sum256(owner.PublicKeyToIDBytes(&test.DecodeKey(-1).PublicKey))) + csum.SetSHA256(sha256.Sum256(data)) csumTZ := new(checksum.Checksum) csumTZ.SetTillichZemor(tz.Sum(csum.Sum())) @@ -84,6 +92,7 @@ func generateRawObjectWithCID(t *testing.T, cid *cid.ID) *object.RawObject { obj.SetOwnerID(ownertest.ID()) obj.SetContainerID(cid) obj.SetVersion(version) + obj.SetPayload(data) obj.SetPayloadChecksum(csum) obj.SetPayloadHomomorphicHash(csumTZ) diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go index cfa2518cb..3bdf87763 100644 --- a/pkg/local_object_storage/writecache/iterate.go +++ b/pkg/local_object_storage/writecache/iterate.go @@ -11,6 +11,37 @@ import ( // ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing. var ErrNoDefaultBucket = errors.New("no default bucket") +// Iterate iterates over all objects present in write cache. +// This is very difficult to do correctly unless write-cache is put in read-only mode. +// Thus we silently fail if shard is not in read-only mode to avoid reporting misleading results. +func (c *cache) Iterate(f func([]byte) error) error { + c.modeMtx.RLock() + defer c.modeMtx.RUnlock() + if c.mode != ModeReadOnly { + return nil + } + + err := c.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + return b.ForEach(func(k, data []byte) error { + if _, ok := c.flushed.Peek(string(k)); ok { + return nil + } + return f(data) + }) + }) + if err != nil { + return err + } + + return c.fsTree.Iterate(func(addr *object.Address, data []byte) error { + if _, ok := c.flushed.Peek(addr.String()); ok { + return nil + } + return f(data) + }) +} + // IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return. // It is assumed that db is an underlying database of some WriteCache instance. // diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index acd6578b2..f3f5c0852 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -20,6 +20,7 @@ type Cache interface { Get(*objectSDK.Address) (*object.Object, error) Head(*objectSDK.Address) (*object.Object, error) Delete(*objectSDK.Address) error + Iterate(func(data []byte) error) error Put(*object.Object) error SetMode(Mode) DumpInfo() Info