From 7eedb23eb7ae4e86cff401b5ff2a7ac03457c6f2 Mon Sep 17 00:00:00 2001
From: Evgenii Stratonikov
Date: Tue, 18 Jan 2022 14:39:55 +0300
Subject: [PATCH] [#1085] shard: allow to dump/restore objects

The dump contains a magic value and a list of objects, each prefixed by
its size in bytes. We can't use a proto-marshaled list because that
would require keeping the whole dump in memory. Using TAR induces a
512-byte overhead per object, which can be a problem in some cases.

Signed-off-by: Evgenii Stratonikov
---
 pkg/local_object_storage/shard/evacuate.go    | 102 ++++++++++
 .../shard/evacuate_test.go                    | 182 ++++++++++++++++++
 pkg/local_object_storage/shard/restore.go     | 115 +++++++++++
 3 files changed, 399 insertions(+)
 create mode 100644 pkg/local_object_storage/shard/evacuate.go
 create mode 100644 pkg/local_object_storage/shard/evacuate_test.go
 create mode 100644 pkg/local_object_storage/shard/restore.go

diff --git a/pkg/local_object_storage/shard/evacuate.go b/pkg/local_object_storage/shard/evacuate.go
new file mode 100644
index 000000000..303bbaf10
--- /dev/null
+++ b/pkg/local_object_storage/shard/evacuate.go
@@ -0,0 +1,102 @@
+package shard
+
+import (
+	"encoding/binary"
+	"errors"
+	"io"
+	"os"
+
+	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
+)
+
+var dumpMagic = []byte("NEOF")
+
+// EvacuatePrm groups the parameters of Evacuate operation.
+type EvacuatePrm struct {
+	path   string
+	stream io.Writer
+}
+
+// WithPath is an Evacuate option to set the destination path.
+func (p *EvacuatePrm) WithPath(path string) *EvacuatePrm {
+	p.path = path
+	return p
+}
+
+// WithStream is an Evacuate option to set the destination stream.
+// It takes priority over the `path` option.
+func (p *EvacuatePrm) WithStream(w io.Writer) *EvacuatePrm {
+	p.stream = w
+	return p
+}
+
+// EvacuateRes groups the result fields of Evacuate operation.
+type EvacuateRes struct {
+	count int
+}
+
+// Count returns the number of objects written.
+func (r *EvacuateRes) Count() int {
+	return r.count
+}
+
+var ErrMustBeReadOnly = errors.New("shard must be in read-only mode")
+
+// Evacuate dumps all objects from the shard to a file or stream.
+//
+// Returns any error encountered.
+func (s *Shard) Evacuate(prm *EvacuatePrm) (*EvacuateRes, error) {
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	if s.info.Mode != ModeReadOnly {
+		return nil, ErrMustBeReadOnly
+	}
+
+	w := prm.stream
+	if w == nil {
+		f, err := os.OpenFile(prm.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0640)
+		if err != nil {
+			return nil, err
+		}
+		defer f.Close()
+
+		w = f
+	}
+
+	_, err := w.Write(dumpMagic)
+	if err != nil {
+		return nil, err
+	}
+
+	var count int
+
+	if s.hasWriteCache() {
+		// TODO evacuate objects from write cache
+	}
+
+	var pi blobstor.IteratePrm
+
+	pi.SetIterationHandler(func(elem blobstor.IterationElement) error {
+		data := elem.ObjectData()
+
+		var size [4]byte
+		binary.LittleEndian.PutUint32(size[:], uint32(len(data)))
+		if _, err := w.Write(size[:]); err != nil {
+			return err
+		}
+
+		if _, err := w.Write(data); err != nil {
+			return err
+		}
+
+		count++
+		return nil
+	})
+
+	if _, err := s.blobStor.Iterate(pi); err != nil {
+		return nil, err
+	}
+
+	return &EvacuateRes{count: count}, nil
+}
diff --git a/pkg/local_object_storage/shard/evacuate_test.go b/pkg/local_object_storage/shard/evacuate_test.go
new file mode 100644
index 000000000..6dab5e706
--- /dev/null
+++ b/pkg/local_object_storage/shard/evacuate_test.go
@@ -0,0 +1,182 @@
+package shard_test
+
+import (
+	"errors"
+	"io"
+	"io/ioutil"
+	"math/rand"
+	"os"
+	"path/filepath"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/nspcc-dev/neofs-node/pkg/core/object"
+	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
+	cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
+	"github.com/stretchr/testify/require"
+)
+
+func TestEvacuate(t *testing.T) {
+	sh := newShard(t, false)
+	defer releaseShard(sh, t)
+
+	out := filepath.Join(t.TempDir(), "dump")
+	prm := new(shard.EvacuatePrm).WithPath(out)
+
+	t.Run("must be read-only", func(t *testing.T) {
+		_, err := sh.Evacuate(prm)
+		require.True(t, errors.Is(err, shard.ErrMustBeReadOnly), "got: %v", err)
+	})
+
+	require.NoError(t, sh.SetMode(shard.ModeReadOnly))
+	outEmpty := out + ".empty"
+	res, err := sh.Evacuate(new(shard.EvacuatePrm).WithPath(outEmpty))
+	require.NoError(t, err)
+	require.Equal(t, 0, res.Count())
+	require.NoError(t, sh.SetMode(shard.ModeReadWrite))
+
+	const objCount = 10
+	objects := make([]*object.Object, objCount)
+	for i := 0; i < objCount; i++ {
+		cid := cidtest.ID()
+		obj := generateRawObjectWithCID(t, cid)
+		addAttribute(obj, "foo", strconv.FormatUint(rand.Uint64(), 10))
+		objects[i] = obj.Object()
+
+		prm := new(shard.PutPrm).WithObject(objects[i])
+		_, err := sh.Put(prm)
+		require.NoError(t, err)
+	}
+
+	require.NoError(t, sh.SetMode(shard.ModeReadOnly))
+
+	t.Run("invalid path", func(t *testing.T) {
+		_, err := sh.Evacuate(new(shard.EvacuatePrm).WithPath("\x00"))
+		require.Error(t, err)
+	})
+
+	res, err = sh.Evacuate(prm)
+	require.NoError(t, err)
+	require.Equal(t, objCount, res.Count())
+
+	t.Run("restore", func(t *testing.T) {
+		sh := newShard(t, false)
+		defer releaseShard(sh, t)
+
+		t.Run("empty dump", func(t *testing.T) {
+			res, err := sh.Restore(new(shard.RestorePrm).WithPath(outEmpty))
+			require.NoError(t, err)
+			require.Equal(t, 0, res.Count())
+		})
+
+		t.Run("invalid path", func(t *testing.T) {
+			_, err := sh.Restore(new(shard.RestorePrm))
+			require.True(t, errors.Is(err, os.ErrNotExist), "got: %v", err)
+		})
+
+		t.Run("invalid file", func(t *testing.T) {
+			t.Run("invalid magic", func(t *testing.T) {
+				out := out + ".wrongmagic"
+				require.NoError(t, ioutil.WriteFile(out, []byte{0, 0, 0, 0}, os.ModePerm))
+
+				_, err := sh.Restore(new(shard.RestorePrm).WithPath(out))
+				require.True(t, errors.Is(err, shard.ErrInvalidMagic), "got: %v", err)
+			})
+
+			fileData, err := ioutil.ReadFile(out)
+			require.NoError(t, err)
+
+			t.Run("incomplete size", func(t *testing.T) {
+				out := out + ".wrongsize"
+				fileData := append(fileData, 1)
+				require.NoError(t, ioutil.WriteFile(out, fileData, os.ModePerm))
+
+				_, err := sh.Restore(new(shard.RestorePrm).WithPath(out))
+				require.True(t, errors.Is(err, io.ErrUnexpectedEOF), "got: %v", err)
+			})
+			t.Run("incomplete object data", func(t *testing.T) {
+				out := out + ".wrongdata"
+				fileData := append(fileData, 1, 0, 0, 0)
+				require.NoError(t, ioutil.WriteFile(out, fileData, os.ModePerm))
+
+				_, err := sh.Restore(new(shard.RestorePrm).WithPath(out))
+				require.True(t, errors.Is(err, io.EOF), "got: %v", err)
+			})
+			t.Run("invalid object", func(t *testing.T) {
+				out := out + ".wrongobj"
+				fileData := append(fileData, 1, 0, 0, 0, 0xFF)
+				require.NoError(t, ioutil.WriteFile(out, fileData, os.ModePerm))
+
+				_, err := sh.Restore(new(shard.RestorePrm).WithPath(out))
+				require.Error(t, err)
+			})
+		})
+
+		prm := new(shard.RestorePrm).WithPath(out)
+		t.Run("must allow write", func(t *testing.T) {
+			require.NoError(t, sh.SetMode(shard.ModeReadOnly))
+
+			_, err := sh.Restore(prm)
+			require.True(t, errors.Is(err, shard.ErrReadOnlyMode), "got: %v", err)
+		})
+
+		require.NoError(t, sh.SetMode(shard.ModeReadWrite))
+
+		checkRestore(t, sh, prm, objects)
+	})
+}
+
+func TestStream(t *testing.T) {
+	sh1 := newCustomShard(t, filepath.Join(t.TempDir(), "shard1"), false, nil, nil)
+	defer releaseShard(sh1, t)
+
+	sh2 := newCustomShard(t, filepath.Join(t.TempDir(), "shard2"), false, nil, nil)
+	defer releaseShard(sh2, t)
+
+	const objCount = 5
+	objects := make([]*object.Object, objCount)
+	for i := 0; i < objCount; i++ {
+		cid := cidtest.ID()
+		obj := generateRawObjectWithCID(t, cid)
+		objects[i] = obj.Object()
+
+		prm := new(shard.PutPrm).WithObject(objects[i])
+		_, err := sh1.Put(prm)
+		require.NoError(t, err)
+	}
+
+	require.NoError(t, sh1.SetMode(shard.ModeReadOnly))
+
+	r, w := io.Pipe()
+	finish := make(chan struct{})
+
+	go func() {
+		res, err := sh1.Evacuate(new(shard.EvacuatePrm).WithStream(w))
+		require.NoError(t, err)
+		require.Equal(t, objCount, res.Count())
+		require.NoError(t, w.Close())
+		close(finish)
+	}()
+
+	checkRestore(t, sh2, new(shard.RestorePrm).WithStream(r), objects)
+	require.Eventually(t, func() bool {
+		select {
+		case <-finish:
+			return true
+		default:
+			return false
+		}
+	}, time.Second, time.Millisecond)
+}
+
+func checkRestore(t *testing.T, sh *shard.Shard, prm *shard.RestorePrm, objects []*object.Object) {
+	res, err := sh.Restore(prm)
+	require.NoError(t, err)
+	require.Equal(t, len(objects), res.Count())
+
+	for i := range objects {
+		_, err := sh.Get(new(shard.GetPrm).WithAddress(objects[i].Address()))
+		require.NoError(t, err)
+	}
+}
diff --git a/pkg/local_object_storage/shard/restore.go b/pkg/local_object_storage/shard/restore.go
new file mode 100644
index 000000000..e3eac3a10
--- /dev/null
+++ b/pkg/local_object_storage/shard/restore.go
@@ -0,0 +1,115 @@
+package shard
+
+import (
+	"bytes"
+	"encoding/binary"
+	"errors"
+	"io"
+	"os"
+
+	"github.com/nspcc-dev/neofs-node/pkg/core/object"
+)
+
+// ErrInvalidMagic is returned when the dump file has an invalid magic.
+var ErrInvalidMagic = errors.New("invalid magic")
+
+// RestorePrm groups the parameters of Restore operation.
+type RestorePrm struct {
+	path   string
+	stream io.Reader
+}
+
+// WithPath is a Restore option to set the path of the dump to restore from.
+func (p *RestorePrm) WithPath(path string) *RestorePrm {
+	p.path = path
+	return p
+}
+
+// WithStream is a Restore option to set the stream to read objects from.
+// It takes priority over the `WithPath` option.
+func (p *RestorePrm) WithStream(r io.Reader) *RestorePrm {
+	p.stream = r
+	return p
+}
+
+// RestoreRes groups the result fields of Restore operation.
+type RestoreRes struct {
+	count int
+}
+
+// Count returns the number of objects restored.
+func (r *RestoreRes) Count() int {
+	return r.count
+}
+
+// Restore restores objects from the dump prepared by Evacuate.
+//
+// Returns any error encountered.
+func (s *Shard) Restore(prm *RestorePrm) (*RestoreRes, error) {
+	// Disallow changing mode during restore.
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	if s.info.Mode != ModeReadWrite {
+		return nil, ErrReadOnlyMode
+	}
+
+	r := prm.stream
+	if r == nil {
+		f, err := os.Open(prm.path)
+		if err != nil {
+			return nil, err
+		}
+		defer f.Close()
+
+		r = f
+	}
+
+	var m [4]byte
+	_, _ = io.ReadFull(r, m[:])
+	if !bytes.Equal(m[:], dumpMagic) {
+		return nil, ErrInvalidMagic
+	}
+
+	var count int
+	var data []byte
+	var size [4]byte
+	for {
+		// If there are less than 4 bytes left, `Read` returns nil error instead of
+		// io.ErrUnexpectedEOF, thus `ReadFull` is used.
+		_, err := io.ReadFull(r, size[:])
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				break
+			}
+			return nil, err
+		}
+
+		sz := binary.LittleEndian.Uint32(size[:])
+		if uint32(cap(data)) < sz {
+			data = make([]byte, sz)
+		} else {
+			data = data[:sz]
+		}
+
+		// Use ReadFull here as well: a bare Read may return fewer bytes than sz
+		// with a nil error, which would corrupt the object payload.
+		_, err = io.ReadFull(r, data)
+		if err != nil {
+			return nil, err
+		}
+
+		obj := object.New()
+		err = obj.Unmarshal(data)
+		if err != nil {
+			return nil, err
+		}
+
+		_, err = s.Put(new(PutPrm).WithObject(obj))
+		if err != nil {
+			return nil, err
+		}
+
+		count++
+	}
+
+	return &RestoreRes{count: count}, nil
+}
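
Note on the dump layout: it is simply the 4-byte magic "NEOF" followed by repeated records of a 4-byte little-endian object size and the marshaled object bytes, which is why Restore can stream it without holding the whole dump in memory. Below is a minimal standalone sketch of a reader for this format; it is not part of the patch, the inspectDump helper and program name are hypothetical, and it assumes Go 1.16+ for io.Discard.

// inspectdump.go: illustrative-only reader for the Evacuate dump format.
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"os"
)

var dumpMagic = []byte("NEOF")

// inspectDump walks a dump produced by Shard.Evacuate and prints the size of
// every object record without unmarshaling it. It returns the record count.
func inspectDump(r io.Reader) (int, error) {
	var m [4]byte
	if _, err := io.ReadFull(r, m[:]); err != nil {
		return 0, err
	}
	if !bytes.Equal(m[:], dumpMagic) {
		return 0, fmt.Errorf("invalid magic %q", m[:])
	}

	var size [4]byte
	for count := 0; ; count++ {
		// Each record is a 4-byte little-endian size followed by that many
		// bytes of marshaled object data.
		if _, err := io.ReadFull(r, size[:]); err != nil {
			if err == io.EOF {
				return count, nil // clean end of dump
			}
			return count, err
		}

		sz := binary.LittleEndian.Uint32(size[:])
		fmt.Printf("object %d: %d bytes\n", count, sz)

		// Skip the payload; a real consumer would read and unmarshal it.
		if _, err := io.CopyN(io.Discard, r, int64(sz)); err != nil {
			return count, err
		}
	}
}

func main() {
	if len(os.Args) != 2 {
		fmt.Fprintln(os.Stderr, "usage: inspectdump <dump-file>")
		os.Exit(1)
	}

	f, err := os.Open(os.Args[1])
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer f.Close()

	count, err := inspectDump(f)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("objects:", count)
}

Running it as `go run inspectdump.go /path/to/dump` against a file produced by Evacuate prints one line per record, which is a quick way to sanity-check the size-prefixed framing the tests above rely on.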