forked from TrueCloudLab/frostfs-node
[#1085] shard: allow to dump/restore objects
Dump contains magic and a list of objects prefixed by object size in bytes. We can't use proto-marshaled list because this requires having all dump in memory. Using TAR induces 512 byte overhead for each object which can be a problem in some cases. Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
257069a132
commit
7eedb23eb7
3 changed files with 399 additions and 0 deletions
102
pkg/local_object_storage/shard/evacuate.go
Normal file
102
pkg/local_object_storage/shard/evacuate.go
Normal file
|
@ -0,0 +1,102 @@
|
||||||
|
package shard
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||||
|
)
|
||||||
|
|
||||||
|
var dumpMagic = []byte("NEOF")
|
||||||
|
|
||||||
|
// EvacuatePrm groups the parameters of Evacuate operation.
|
||||||
|
type EvacuatePrm struct {
|
||||||
|
path string
|
||||||
|
stream io.Writer
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithPath is an Evacuate option to set the destination path.
|
||||||
|
func (p *EvacuatePrm) WithPath(path string) *EvacuatePrm {
|
||||||
|
p.path = path
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithStream is an Evacuate option to set the destination stream.
|
||||||
|
// It takes priority over `path` option.
|
||||||
|
func (p *EvacuatePrm) WithStream(r io.Writer) *EvacuatePrm {
|
||||||
|
p.stream = r
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// EvacuateRes groups the result fields of Evacuate operation.
|
||||||
|
type EvacuateRes struct {
|
||||||
|
count int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Count return amount of object written.
|
||||||
|
func (r *EvacuateRes) Count() int {
|
||||||
|
return r.count
|
||||||
|
}
|
||||||
|
|
||||||
|
var ErrMustBeReadOnly = errors.New("shard must be in read-only mode")
|
||||||
|
|
||||||
|
// Evacuate dumps all objects from the shard to a file or stream.
|
||||||
|
//
|
||||||
|
// Returns any error encountered.
|
||||||
|
func (s *Shard) Evacuate(prm *EvacuatePrm) (*EvacuateRes, error) {
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode != ModeReadOnly {
|
||||||
|
return nil, ErrMustBeReadOnly
|
||||||
|
}
|
||||||
|
|
||||||
|
w := prm.stream
|
||||||
|
if w == nil {
|
||||||
|
f, err := os.OpenFile(prm.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0640)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
w = f
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := w.Write(dumpMagic)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var count int
|
||||||
|
|
||||||
|
if s.hasWriteCache() {
|
||||||
|
// TODO evacuate objects from write cache
|
||||||
|
}
|
||||||
|
|
||||||
|
var pi blobstor.IteratePrm
|
||||||
|
|
||||||
|
pi.SetIterationHandler(func(elem blobstor.IterationElement) error {
|
||||||
|
data := elem.ObjectData()
|
||||||
|
|
||||||
|
var size [4]byte
|
||||||
|
binary.LittleEndian.PutUint32(size[:], uint32(len(data)))
|
||||||
|
if _, err := w.Write(size[:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := w.Write(data); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
count++
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if _, err := s.blobStor.Iterate(pi); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &EvacuateRes{count: count}, nil
|
||||||
|
}
|
182
pkg/local_object_storage/shard/evacuate_test.go
Normal file
182
pkg/local_object_storage/shard/evacuate_test.go
Normal file
|
@ -0,0 +1,182 @@
|
||||||
|
package shard_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"math/rand"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||||
|
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestEvacuate(t *testing.T) {
|
||||||
|
sh := newShard(t, false)
|
||||||
|
defer releaseShard(sh, t)
|
||||||
|
|
||||||
|
out := filepath.Join(t.TempDir(), "dump")
|
||||||
|
prm := new(shard.EvacuatePrm).WithPath(out)
|
||||||
|
|
||||||
|
t.Run("must be read-only", func(t *testing.T) {
|
||||||
|
_, err := sh.Evacuate(prm)
|
||||||
|
require.True(t, errors.Is(err, shard.ErrMustBeReadOnly), "got: %v", err)
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, sh.SetMode(shard.ModeReadOnly))
|
||||||
|
outEmpty := out + ".empty"
|
||||||
|
res, err := sh.Evacuate(new(shard.EvacuatePrm).WithPath(outEmpty))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, res.Count())
|
||||||
|
require.NoError(t, sh.SetMode(shard.ModeReadWrite))
|
||||||
|
|
||||||
|
const objCount = 10
|
||||||
|
objects := make([]*object.Object, objCount)
|
||||||
|
for i := 0; i < objCount; i++ {
|
||||||
|
cid := cidtest.ID()
|
||||||
|
obj := generateRawObjectWithCID(t, cid)
|
||||||
|
addAttribute(obj, "foo", strconv.FormatUint(rand.Uint64(), 10))
|
||||||
|
objects[i] = obj.Object()
|
||||||
|
|
||||||
|
prm := new(shard.PutPrm).WithObject(objects[i])
|
||||||
|
_, err := sh.Put(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, sh.SetMode(shard.ModeReadOnly))
|
||||||
|
|
||||||
|
t.Run("invalid path", func(t *testing.T) {
|
||||||
|
_, err := sh.Evacuate(new(shard.EvacuatePrm).WithPath("\x00"))
|
||||||
|
require.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
res, err = sh.Evacuate(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, objCount, res.Count())
|
||||||
|
|
||||||
|
t.Run("restore", func(t *testing.T) {
|
||||||
|
sh := newShard(t, false)
|
||||||
|
defer releaseShard(sh, t)
|
||||||
|
|
||||||
|
t.Run("empty dump", func(t *testing.T) {
|
||||||
|
res, err := sh.Restore(new(shard.RestorePrm).WithPath(outEmpty))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, res.Count())
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("invalid path", func(t *testing.T) {
|
||||||
|
_, err := sh.Restore(new(shard.RestorePrm))
|
||||||
|
require.True(t, errors.Is(err, os.ErrNotExist), "got: %v", err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("invalid file", func(t *testing.T) {
|
||||||
|
t.Run("invalid magic", func(t *testing.T) {
|
||||||
|
out := out + ".wrongmagic"
|
||||||
|
require.NoError(t, ioutil.WriteFile(out, []byte{0, 0, 0, 0}, os.ModePerm))
|
||||||
|
|
||||||
|
_, err := sh.Restore(new(shard.RestorePrm).WithPath(out))
|
||||||
|
require.True(t, errors.Is(err, shard.ErrInvalidMagic), "got: %v", err)
|
||||||
|
})
|
||||||
|
|
||||||
|
fileData, err := ioutil.ReadFile(out)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
t.Run("incomplete size", func(t *testing.T) {
|
||||||
|
out := out + ".wrongsize"
|
||||||
|
fileData := append(fileData, 1)
|
||||||
|
require.NoError(t, ioutil.WriteFile(out, fileData, os.ModePerm))
|
||||||
|
|
||||||
|
_, err := sh.Restore(new(shard.RestorePrm).WithPath(out))
|
||||||
|
require.True(t, errors.Is(err, io.ErrUnexpectedEOF), "got: %v", err)
|
||||||
|
})
|
||||||
|
t.Run("incomplete object data", func(t *testing.T) {
|
||||||
|
out := out + ".wrongsize"
|
||||||
|
fileData := append(fileData, 1, 0, 0, 0)
|
||||||
|
require.NoError(t, ioutil.WriteFile(out, fileData, os.ModePerm))
|
||||||
|
|
||||||
|
_, err := sh.Restore(new(shard.RestorePrm).WithPath(out))
|
||||||
|
require.True(t, errors.Is(err, io.EOF), "got: %v", err)
|
||||||
|
})
|
||||||
|
t.Run("invalid object", func(t *testing.T) {
|
||||||
|
out := out + ".wrongobj"
|
||||||
|
fileData := append(fileData, 1, 0, 0, 0, 0xFF)
|
||||||
|
require.NoError(t, ioutil.WriteFile(out, fileData, os.ModePerm))
|
||||||
|
|
||||||
|
_, err := sh.Restore(new(shard.RestorePrm).WithPath(out))
|
||||||
|
require.Error(t, err)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
prm := new(shard.RestorePrm).WithPath(out)
|
||||||
|
t.Run("must allow write", func(t *testing.T) {
|
||||||
|
require.NoError(t, sh.SetMode(shard.ModeReadOnly))
|
||||||
|
|
||||||
|
_, err := sh.Restore(prm)
|
||||||
|
require.True(t, errors.Is(err, shard.ErrReadOnlyMode), "got: %v", err)
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, sh.SetMode(shard.ModeReadWrite))
|
||||||
|
|
||||||
|
checkRestore(t, sh, prm, objects)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStream(t *testing.T) {
|
||||||
|
sh1 := newCustomShard(t, filepath.Join(t.TempDir(), "shard1"), false, nil, nil)
|
||||||
|
defer releaseShard(sh1, t)
|
||||||
|
|
||||||
|
sh2 := newCustomShard(t, filepath.Join(t.TempDir(), "shard2"), false, nil, nil)
|
||||||
|
defer releaseShard(sh2, t)
|
||||||
|
|
||||||
|
const objCount = 5
|
||||||
|
objects := make([]*object.Object, objCount)
|
||||||
|
for i := 0; i < objCount; i++ {
|
||||||
|
cid := cidtest.ID()
|
||||||
|
obj := generateRawObjectWithCID(t, cid)
|
||||||
|
objects[i] = obj.Object()
|
||||||
|
|
||||||
|
prm := new(shard.PutPrm).WithObject(objects[i])
|
||||||
|
_, err := sh1.Put(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, sh1.SetMode(shard.ModeReadOnly))
|
||||||
|
|
||||||
|
r, w := io.Pipe()
|
||||||
|
finish := make(chan struct{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
res, err := sh1.Evacuate(new(shard.EvacuatePrm).WithStream(w))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, objCount, res.Count())
|
||||||
|
require.NoError(t, w.Close())
|
||||||
|
close(finish)
|
||||||
|
}()
|
||||||
|
|
||||||
|
checkRestore(t, sh2, new(shard.RestorePrm).WithStream(r), objects)
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
select {
|
||||||
|
case <-finish:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, time.Second, time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkRestore(t *testing.T, sh *shard.Shard, prm *shard.RestorePrm, objects []*object.Object) {
|
||||||
|
res, err := sh.Restore(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, len(objects), res.Count())
|
||||||
|
|
||||||
|
for i := range objects {
|
||||||
|
_, err := sh.Get(new(shard.GetPrm).WithAddress(objects[i].Address()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
115
pkg/local_object_storage/shard/restore.go
Normal file
115
pkg/local_object_storage/shard/restore.go
Normal file
|
@ -0,0 +1,115 @@
|
||||||
|
package shard
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ErrInvalidMagic is returned when dump format is invalid.
|
||||||
|
var ErrInvalidMagic = errors.New("invalid magic")
|
||||||
|
|
||||||
|
// RestorePrm groups the parameters of Restore operation.
|
||||||
|
type RestorePrm struct {
|
||||||
|
path string
|
||||||
|
stream io.Reader
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithPath is a Restore option to set the destination path.
|
||||||
|
func (p *RestorePrm) WithPath(path string) *RestorePrm {
|
||||||
|
p.path = path
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithStream is a Restore option to set the stream to read objects from.
|
||||||
|
// It takes priority over `WithPath` option.
|
||||||
|
func (p *RestorePrm) WithStream(r io.Reader) *RestorePrm {
|
||||||
|
p.stream = r
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// RestoreRes groups the result fields of Restore operation.
|
||||||
|
type RestoreRes struct {
|
||||||
|
count int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Count return amount of object written.
|
||||||
|
func (r *RestoreRes) Count() int {
|
||||||
|
return r.count
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restore restores objects from the dump prepared by Evacuate.
|
||||||
|
//
|
||||||
|
// Returns any error encountered.
|
||||||
|
func (s *Shard) Restore(prm *RestorePrm) (*RestoreRes, error) {
|
||||||
|
// Disallow changing mode during restore.
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode != ModeReadWrite {
|
||||||
|
return nil, ErrReadOnlyMode
|
||||||
|
}
|
||||||
|
|
||||||
|
r := prm.stream
|
||||||
|
if r == nil {
|
||||||
|
f, err := os.OpenFile(prm.path, os.O_RDONLY, os.ModeExclusive)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
r = f
|
||||||
|
}
|
||||||
|
|
||||||
|
var m [4]byte
|
||||||
|
_, _ = io.ReadFull(r, m[:])
|
||||||
|
if !bytes.Equal(m[:], dumpMagic) {
|
||||||
|
return nil, ErrInvalidMagic
|
||||||
|
}
|
||||||
|
|
||||||
|
var count int
|
||||||
|
var data []byte
|
||||||
|
var size [4]byte
|
||||||
|
for {
|
||||||
|
// If there are less than 4 bytes left, `Read` returns nil error instead of
|
||||||
|
// io.ErrUnexpectedEOF, thus `ReadFull` is used.
|
||||||
|
_, err := io.ReadFull(r, size[:])
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, io.EOF) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
sz := binary.LittleEndian.Uint32(size[:])
|
||||||
|
if uint32(cap(data)) < sz {
|
||||||
|
data = make([]byte, sz)
|
||||||
|
} else {
|
||||||
|
data = data[:sz]
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = r.Read(data)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
obj := object.New()
|
||||||
|
err = obj.Unmarshal(data)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = s.Put(new(PutPrm).WithObject(obj))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
|
||||||
|
return &RestoreRes{count: count}, nil
|
||||||
|
}
|
Loading…
Reference in a new issue