diff --git a/cmd/frostfs-cli/modules/control/evacuate_shard.go b/cmd/frostfs-cli/modules/control/evacuate_shard.go index 02ee88ce..b72ff630 100644 --- a/cmd/frostfs-cli/modules/control/evacuate_shard.go +++ b/cmd/frostfs-cli/modules/control/evacuate_shard.go @@ -8,6 +8,8 @@ import ( "github.com/spf13/cobra" ) +const ignoreErrorsFlag = "no-errors" + var evacuateShardCmd = &cobra.Command{ Use: "evacuate", Short: "Evacuate objects from shard", @@ -20,7 +22,7 @@ func evacuateShard(cmd *cobra.Command, _ []string) { req := &control.EvacuateShardRequest{Body: new(control.EvacuateShardRequest_Body)} req.Body.Shard_ID = getShardIDList(cmd) - req.Body.IgnoreErrors, _ = cmd.Flags().GetBool(dumpIgnoreErrorsFlag) + req.Body.IgnoreErrors, _ = cmd.Flags().GetBool(ignoreErrorsFlag) signRequest(cmd, pk, req) @@ -47,7 +49,7 @@ func initControlEvacuateShardCmd() { flags := evacuateShardCmd.Flags() flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding") flags.Bool(shardAllFlag, false, "Process all shards") - flags.Bool(dumpIgnoreErrorsFlag, false, "Skip invalid/unreadable objects") + flags.Bool(ignoreErrorsFlag, false, "Skip invalid/unreadable objects") evacuateShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) } diff --git a/cmd/frostfs-cli/modules/control/shards.go b/cmd/frostfs-cli/modules/control/shards.go index 9d3eb5c0..8e7ecff8 100644 --- a/cmd/frostfs-cli/modules/control/shards.go +++ b/cmd/frostfs-cli/modules/control/shards.go @@ -13,16 +13,12 @@ var shardsCmd = &cobra.Command{ func initControlShardsCmd() { shardsCmd.AddCommand(listShardsCmd) shardsCmd.AddCommand(setShardModeCmd) - shardsCmd.AddCommand(dumpShardCmd) - shardsCmd.AddCommand(restoreShardCmd) shardsCmd.AddCommand(evacuateShardCmd) shardsCmd.AddCommand(flushCacheCmd) shardsCmd.AddCommand(doctorCmd) initControlShardsListCmd() initControlSetShardModeCmd() - initControlDumpShardCmd() - initControlRestoreShardCmd() initControlEvacuateShardCmd() initControlFlushCacheCmd() initControlDoctorCmd() diff --git a/cmd/frostfs-cli/modules/control/shards_dump.go b/cmd/frostfs-cli/modules/control/shards_dump.go deleted file mode 100644 index c0d0aca9..00000000 --- a/cmd/frostfs-cli/modules/control/shards_dump.go +++ /dev/null @@ -1,66 +0,0 @@ -package control - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" - "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key" - commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" - "github.com/spf13/cobra" -) - -const ( - dumpFilepathFlag = "path" - dumpIgnoreErrorsFlag = "no-errors" -) - -var dumpShardCmd = &cobra.Command{ - Use: "dump", - Short: "Dump objects from shard", - Long: "Dump objects from shard to a file", - Run: dumpShard, -} - -func dumpShard(cmd *cobra.Command, _ []string) { - pk := key.Get(cmd) - - body := new(control.DumpShardRequest_Body) - body.SetShardID(getShardID(cmd)) - - p, _ := cmd.Flags().GetString(dumpFilepathFlag) - body.SetFilepath(p) - - ignore, _ := cmd.Flags().GetBool(dumpIgnoreErrorsFlag) - body.SetIgnoreErrors(ignore) - - req := new(control.DumpShardRequest) - req.SetBody(body) - - signRequest(cmd, pk, req) - - cli := getClient(cmd, pk) - - var resp *control.DumpShardResponse - var err error - err = cli.ExecRaw(func(client *client.Client) error { - resp, err = control.DumpShard(client, req) - return err - }) - commonCmd.ExitOnErr(cmd, "rpc error: %w", err) - - verifyResponse(cmd, resp.GetSignature(), resp.GetBody()) - - cmd.Println("Shard has been dumped successfully.") -} - -func initControlDumpShardCmd() { - initControlFlags(dumpShardCmd) - - flags := dumpShardCmd.Flags() - flags.String(shardIDFlag, "", "Shard ID in base58 encoding") - flags.String(dumpFilepathFlag, "", "File to write objects to") - flags.Bool(dumpIgnoreErrorsFlag, false, "Skip invalid/unreadable objects") - - _ = dumpShardCmd.MarkFlagRequired(shardIDFlag) - _ = dumpShardCmd.MarkFlagRequired(dumpFilepathFlag) - _ = dumpShardCmd.MarkFlagRequired(controlRPC) -} diff --git a/cmd/frostfs-cli/modules/control/shards_restore.go b/cmd/frostfs-cli/modules/control/shards_restore.go deleted file mode 100644 index edf97a73..00000000 --- a/cmd/frostfs-cli/modules/control/shards_restore.go +++ /dev/null @@ -1,66 +0,0 @@ -package control - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" - "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key" - commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" - "github.com/spf13/cobra" -) - -const ( - restoreFilepathFlag = "path" - restoreIgnoreErrorsFlag = "no-errors" -) - -var restoreShardCmd = &cobra.Command{ - Use: "restore", - Short: "Restore objects from shard", - Long: "Restore objects from shard to a file", - Run: restoreShard, -} - -func restoreShard(cmd *cobra.Command, _ []string) { - pk := key.Get(cmd) - - body := new(control.RestoreShardRequest_Body) - body.SetShardID(getShardID(cmd)) - - p, _ := cmd.Flags().GetString(restoreFilepathFlag) - body.SetFilepath(p) - - ignore, _ := cmd.Flags().GetBool(restoreIgnoreErrorsFlag) - body.SetIgnoreErrors(ignore) - - req := new(control.RestoreShardRequest) - req.SetBody(body) - - signRequest(cmd, pk, req) - - cli := getClient(cmd, pk) - - var resp *control.RestoreShardResponse - var err error - err = cli.ExecRaw(func(client *client.Client) error { - resp, err = control.RestoreShard(client, req) - return err - }) - commonCmd.ExitOnErr(cmd, "rpc error: %w", err) - - verifyResponse(cmd, resp.GetSignature(), resp.GetBody()) - - cmd.Println("Shard has been restored successfully.") -} - -func initControlRestoreShardCmd() { - initControlFlags(restoreShardCmd) - - flags := restoreShardCmd.Flags() - flags.String(shardIDFlag, "", "Shard ID in base58 encoding") - flags.String(restoreFilepathFlag, "", "File to read objects from") - flags.Bool(restoreIgnoreErrorsFlag, false, "Skip invalid/unreadable objects") - - _ = restoreShardCmd.MarkFlagRequired(shardIDFlag) - _ = restoreShardCmd.MarkFlagRequired(restoreFilepathFlag) - _ = restoreShardCmd.MarkFlagRequired(controlRPC) -} diff --git a/pkg/local_object_storage/engine/dump.go b/pkg/local_object_storage/engine/dump.go deleted file mode 100644 index f5cf8c32..00000000 --- a/pkg/local_object_storage/engine/dump.go +++ /dev/null @@ -1,19 +0,0 @@ -package engine - -import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" - -// DumpShard dumps objects from the shard with provided identifier. -// -// Returns an error if shard is not read-only. -func (e *StorageEngine) DumpShard(id *shard.ID, prm shard.DumpPrm) error { - e.mtx.RLock() - defer e.mtx.RUnlock() - - sh, ok := e.shards[id.String()] - if !ok { - return errShardNotFound - } - - _, err := sh.Dump(prm) - return err -} diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 2ec2c2b3..e212784a 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -9,6 +9,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -16,6 +17,8 @@ import ( "go.uber.org/zap" ) +var ErrMustBeReadOnly = logicerr.New("shard must be in read-only mode") + // EvacuateShardPrm represents parameters for the EvacuateShard operation. type EvacuateShardPrm struct { shardID []*shard.ID @@ -135,7 +138,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool) } if !sh.GetMode().ReadOnly() { - return nil, nil, shard.ErrMustBeReadOnly + return nil, nil, ErrMustBeReadOnly } } diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 291bc2b7..fc9da5e3 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -103,7 +103,7 @@ func TestEvacuateShard(t *testing.T) { t.Run("must be read-only", func(t *testing.T) { res, err := e.Evacuate(context.Background(), prm) - require.ErrorIs(t, err, shard.ErrMustBeReadOnly) + require.ErrorIs(t, err, ErrMustBeReadOnly) require.Equal(t, 0, res.Count()) }) diff --git a/pkg/local_object_storage/engine/restore.go b/pkg/local_object_storage/engine/restore.go deleted file mode 100644 index 7cc2eaf6..00000000 --- a/pkg/local_object_storage/engine/restore.go +++ /dev/null @@ -1,32 +0,0 @@ -package engine - -import ( - "context" - - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" -) - -// RestoreShard restores objects from dump to the shard with provided identifier. -// -// Returns an error if shard is not read-only. -func (e *StorageEngine) RestoreShard(ctx context.Context, id *shard.ID, prm shard.RestorePrm) error { - ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.RestoreShard", - trace.WithAttributes( - attribute.String("shard_id", id.String()), - )) - defer span.End() - - e.mtx.RLock() - defer e.mtx.RUnlock() - - sh, ok := e.shards[id.String()] - if !ok { - return errShardNotFound - } - - _, err := sh.Restore(ctx, prm) - return err -} diff --git a/pkg/local_object_storage/shard/dump.go b/pkg/local_object_storage/shard/dump.go deleted file mode 100644 index 8d9fe0f7..00000000 --- a/pkg/local_object_storage/shard/dump.go +++ /dev/null @@ -1,129 +0,0 @@ -package shard - -import ( - "encoding/binary" - "io" - "os" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" -) - -var dumpMagic = []byte("NEOF") - -// DumpPrm groups the parameters of Dump operation. -type DumpPrm struct { - path string - stream io.Writer - ignoreErrors bool -} - -// WithPath is an Dump option to set the destination path. -func (p *DumpPrm) WithPath(path string) { - p.path = path -} - -// WithStream is an Dump option to set the destination stream. -// It takes priority over `path` option. -func (p *DumpPrm) WithStream(r io.Writer) { - p.stream = r -} - -// WithIgnoreErrors is an Dump option to allow ignore all errors during iteration. -// This includes invalid blobovniczas as well as corrupted objects. -func (p *DumpPrm) WithIgnoreErrors(ignore bool) { - p.ignoreErrors = ignore -} - -// DumpRes groups the result fields of Dump operation. -type DumpRes struct { - count int -} - -// Count return amount of object written. -func (r DumpRes) Count() int { - return r.count -} - -var ErrMustBeReadOnly = logicerr.New("shard must be in read-only mode") - -// Dump dumps all objects from the shard to a file or stream. -// -// Returns any error encountered. -func (s *Shard) Dump(prm DumpPrm) (DumpRes, error) { - s.m.RLock() - defer s.m.RUnlock() - - if !s.info.Mode.ReadOnly() { - return DumpRes{}, ErrMustBeReadOnly - } - - w := prm.stream - if w == nil { - f, err := os.OpenFile(prm.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0640) - if err != nil { - return DumpRes{}, err - } - defer f.Close() - - w = f - } - - _, err := w.Write(dumpMagic) - if err != nil { - return DumpRes{}, err - } - - var count int - - if s.hasWriteCache() { - var iterPrm writecache.IterationPrm - - iterPrm.WithIgnoreErrors(prm.ignoreErrors) - iterPrm.WithHandler(func(data []byte) error { - var size [4]byte - binary.LittleEndian.PutUint32(size[:], uint32(len(data))) - if _, err := w.Write(size[:]); err != nil { - return err - } - - if _, err := w.Write(data); err != nil { - return err - } - - count++ - return nil - }) - - err := s.writeCache.Iterate(iterPrm) - if err != nil { - return DumpRes{}, err - } - } - - var pi common.IteratePrm - pi.IgnoreErrors = prm.ignoreErrors - pi.Handler = func(elem common.IterationElement) error { - data := elem.ObjectData - - var size [4]byte - binary.LittleEndian.PutUint32(size[:], uint32(len(data))) - if _, err := w.Write(size[:]); err != nil { - return err - } - - if _, err := w.Write(data); err != nil { - return err - } - - count++ - return nil - } - - if _, err := s.blobStor.Iterate(pi); err != nil { - return DumpRes{}, err - } - - return DumpRes{count: count}, nil -} diff --git a/pkg/local_object_storage/shard/dump_test.go b/pkg/local_object_storage/shard/dump_test.go deleted file mode 100644 index 92171720..00000000 --- a/pkg/local_object_storage/shard/dump_test.go +++ /dev/null @@ -1,412 +0,0 @@ -package shard_test - -import ( - "bytes" - "context" - "io" - "math/rand" - "os" - "path/filepath" - "testing" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" - "github.com/klauspost/compress/zstd" - "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" -) - -func TestDump(t *testing.T) { - t.Run("without write-cache", func(t *testing.T) { - testDump(t, 10, false) - }) - t.Run("with write-cache", func(t *testing.T) { - // Put a bit more objects to write-cache to facilitate race-conditions. - testDump(t, 100, true) - }) -} - -func testDump(t *testing.T, objCount int, hasWriteCache bool) { - const ( - wcSmallObjectSize = 1024 // 1 KiB, goes to write-cache memory - wcBigObjectSize = 4 * 1024 // 4 KiB, goes to write-cache FSTree - bsSmallObjectSize = 10 * 1024 // 10 KiB, goes to blobovnicza DB - bsBigObjectSize = 1024*1024 + 1 // > 1 MiB, goes to blobovnicza FSTree - ) - - var sh *shard.Shard - if !hasWriteCache { - sh = newShard(t, false) - } else { - sh = newCustomShard(t, t.TempDir(), true, - []writecache.Option{ - writecache.WithSmallObjectSize(wcSmallObjectSize), - writecache.WithMaxObjectSize(wcBigObjectSize), - writecache.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), - }, - nil) - } - defer releaseShard(sh, t) - - out := filepath.Join(t.TempDir(), "dump") - var prm shard.DumpPrm - prm.WithPath(out) - - t.Run("must be read-only", func(t *testing.T) { - _, err := sh.Dump(prm) - require.ErrorIs(t, err, shard.ErrMustBeReadOnly) - }) - - require.NoError(t, sh.SetMode(mode.ReadOnly)) - outEmpty := out + ".empty" - var dumpPrm shard.DumpPrm - dumpPrm.WithPath(outEmpty) - - res, err := sh.Dump(dumpPrm) - require.NoError(t, err) - require.Equal(t, 0, res.Count()) - require.NoError(t, sh.SetMode(mode.ReadWrite)) - - // Approximate object header size. - const headerSize = 400 - - objects := make([]*objectSDK.Object, objCount) - for i := 0; i < objCount; i++ { - cnr := cidtest.ID() - var size int - switch i % 6 { - case 0, 1: - size = wcSmallObjectSize - headerSize - case 2, 3: - size = bsSmallObjectSize - headerSize - case 4: - size = wcBigObjectSize - headerSize - default: - size = bsBigObjectSize - headerSize - } - data := make([]byte, size) - rand.Read(data) - obj := testutil.GenerateObjectWithCIDWithPayload(cnr, data) - objects[i] = obj - - var prm shard.PutPrm - prm.SetObject(objects[i]) - _, err := sh.Put(context.Background(), prm) - require.NoError(t, err) - } - - require.NoError(t, sh.SetMode(mode.ReadOnly)) - - t.Run("invalid path", func(t *testing.T) { - var dumpPrm shard.DumpPrm - dumpPrm.WithPath("\x00") - - _, err := sh.Dump(dumpPrm) - require.Error(t, err) - }) - - res, err = sh.Dump(prm) - require.NoError(t, err) - require.Equal(t, objCount, res.Count()) - - t.Run("restore", func(t *testing.T) { - sh := newShard(t, false) - defer releaseShard(sh, t) - - t.Run("empty dump", func(t *testing.T) { - var restorePrm shard.RestorePrm - restorePrm.WithPath(outEmpty) - res, err := sh.Restore(context.Background(), restorePrm) - require.NoError(t, err) - require.Equal(t, 0, res.Count()) - }) - - t.Run("invalid path", func(t *testing.T) { - _, err := sh.Restore(context.Background(), *new(shard.RestorePrm)) - require.ErrorIs(t, err, os.ErrNotExist) - }) - - t.Run("invalid file", func(t *testing.T) { - t.Run("invalid magic", func(t *testing.T) { - out := out + ".wrongmagic" - require.NoError(t, os.WriteFile(out, []byte{0, 0, 0, 0}, os.ModePerm)) - - var restorePrm shard.RestorePrm - restorePrm.WithPath(out) - - _, err := sh.Restore(context.Background(), restorePrm) - require.ErrorIs(t, err, shard.ErrInvalidMagic) - }) - - fileData, err := os.ReadFile(out) - require.NoError(t, err) - - t.Run("incomplete size", func(t *testing.T) { - out := out + ".wrongsize" - fileData := append(fileData, 1) - require.NoError(t, os.WriteFile(out, fileData, os.ModePerm)) - - var restorePrm shard.RestorePrm - restorePrm.WithPath(out) - - _, err := sh.Restore(context.Background(), restorePrm) - require.ErrorIs(t, err, io.ErrUnexpectedEOF) - }) - t.Run("incomplete object data", func(t *testing.T) { - out := out + ".wrongsize" - fileData := append(fileData, 1, 0, 0, 0) - require.NoError(t, os.WriteFile(out, fileData, os.ModePerm)) - - var restorePrm shard.RestorePrm - restorePrm.WithPath(out) - - _, err := sh.Restore(context.Background(), restorePrm) - require.ErrorIs(t, err, io.EOF) - }) - t.Run("invalid object", func(t *testing.T) { - out := out + ".wrongobj" - fileData := append(fileData, 1, 0, 0, 0, 0xFF, 4, 0, 0, 0, 1, 2, 3, 4) - require.NoError(t, os.WriteFile(out, fileData, os.ModePerm)) - - var restorePrm shard.RestorePrm - restorePrm.WithPath(out) - - _, err := sh.Restore(context.Background(), restorePrm) - require.Error(t, err) - - t.Run("skip errors", func(t *testing.T) { - sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false, nil, nil) - t.Cleanup(func() { require.NoError(t, sh.Close()) }) - - var restorePrm shard.RestorePrm - restorePrm.WithPath(out) - restorePrm.WithIgnoreErrors(true) - - res, err := sh.Restore(context.Background(), restorePrm) - require.NoError(t, err) - require.Equal(t, objCount, res.Count()) - require.Equal(t, 2, res.FailCount()) - }) - }) - }) - - var prm shard.RestorePrm - prm.WithPath(out) - t.Run("must allow write", func(t *testing.T) { - require.NoError(t, sh.SetMode(mode.ReadOnly)) - - _, err := sh.Restore(context.Background(), prm) - require.ErrorIs(t, err, shard.ErrReadOnlyMode) - }) - - require.NoError(t, sh.SetMode(mode.ReadWrite)) - - checkRestore(t, sh, prm, objects) - }) -} - -func TestStream(t *testing.T) { - sh1 := newCustomShard(t, filepath.Join(t.TempDir(), "shard1"), false, nil, nil) - defer releaseShard(sh1, t) - - sh2 := newCustomShard(t, filepath.Join(t.TempDir(), "shard2"), false, nil, nil) - defer releaseShard(sh2, t) - - const objCount = 5 - objects := make([]*objectSDK.Object, objCount) - for i := 0; i < objCount; i++ { - cnr := cidtest.ID() - obj := testutil.GenerateObjectWithCID(cnr) - objects[i] = obj - - var prm shard.PutPrm - prm.SetObject(objects[i]) - _, err := sh1.Put(context.Background(), prm) - require.NoError(t, err) - } - - require.NoError(t, sh1.SetMode(mode.ReadOnly)) - - r, w := io.Pipe() - finish := make(chan struct{}) - - go func() { - var dumpPrm shard.DumpPrm - dumpPrm.WithStream(w) - - res, err := sh1.Dump(dumpPrm) - require.NoError(t, err) - require.Equal(t, objCount, res.Count()) - require.NoError(t, w.Close()) - close(finish) - }() - - var restorePrm shard.RestorePrm - restorePrm.WithStream(r) - - checkRestore(t, sh2, restorePrm, objects) - require.Eventually(t, func() bool { - select { - case <-finish: - return true - default: - return false - } - }, time.Second, time.Millisecond) -} - -func checkRestore(t *testing.T, sh *shard.Shard, prm shard.RestorePrm, objects []*objectSDK.Object) { - res, err := sh.Restore(context.Background(), prm) - require.NoError(t, err) - require.Equal(t, len(objects), res.Count()) - - var getPrm shard.GetPrm - - for i := range objects { - getPrm.SetAddress(object.AddressOf(objects[i])) - res, err := sh.Get(context.Background(), getPrm) - require.NoError(t, err) - require.Equal(t, objects[i], res.Object()) - } -} - -func TestDumpIgnoreErrors(t *testing.T) { - const ( - wcSmallObjectSize = 512 // goes to write-cache memory - wcBigObjectSize = wcSmallObjectSize << 1 // goes to write-cache FSTree - bsSmallObjectSize = wcSmallObjectSize << 2 // goes to blobovnicza DB - - objCount = 10 - headerSize = 400 - ) - - dir := t.TempDir() - bsPath := filepath.Join(dir, "blob") - bsOpts := func(sw uint64) []blobstor.Option { - return []blobstor.Option{ - blobstor.WithCompressObjects(true), - blobstor.WithStorages([]blobstor.SubStorage{ - { - Storage: blobovniczatree.NewBlobovniczaTree( - blobovniczatree.WithRootPath(filepath.Join(bsPath, "blobovnicza")), - blobovniczatree.WithBlobovniczaShallowDepth(1), - blobovniczatree.WithBlobovniczaShallowWidth(sw), - blobovniczatree.WithOpenedCacheSize(1)), - Policy: func(_ *objectSDK.Object, data []byte) bool { - return len(data) < bsSmallObjectSize - }, - }, - { - Storage: fstree.New( - fstree.WithPath(bsPath), - fstree.WithDepth(1)), - }, - }), - } - } - wcPath := filepath.Join(dir, "writecache") - wcOpts := []writecache.Option{ - writecache.WithPath(wcPath), - writecache.WithSmallObjectSize(wcSmallObjectSize), - writecache.WithMaxObjectSize(wcBigObjectSize), - } - sh := newCustomShard(t, dir, true, wcOpts, bsOpts(2)) - - objects := make([]*objectSDK.Object, objCount) - for i := 0; i < objCount; i++ { - size := (wcSmallObjectSize << (i % 4)) - headerSize - obj := testutil.GenerateObjectWithCIDWithPayload(cidtest.ID(), make([]byte, size)) - objects[i] = obj - - var prm shard.PutPrm - prm.SetObject(objects[i]) - _, err := sh.Put(context.Background(), prm) - require.NoError(t, err) - } - - releaseShard(sh, t) - - b := bytes.NewBuffer(nil) - badObject := make([]byte, 1000) - enc, err := zstd.NewWriter(b) - require.NoError(t, err) - corruptedData := enc.EncodeAll(badObject, nil) - for i := 4; i < len(corruptedData); i++ { - corruptedData[i] ^= 0xFF - } - - // There are 3 different types of errors to consider. - // To setup envirionment we use implementation details so this test must be updated - // if any of them are changed. - { - // 1. Invalid object in fs tree. - // 1.1. Invalid compressed data. - addr := cidtest.ID().EncodeToString() + "." + objecttest.ID().EncodeToString() - dirName := filepath.Join(bsPath, addr[:2]) - require.NoError(t, os.MkdirAll(dirName, os.ModePerm)) - require.NoError(t, os.WriteFile(filepath.Join(dirName, addr[2:]), corruptedData, os.ModePerm)) - - // 1.2. Unreadable file. - addr = cidtest.ID().EncodeToString() + "." + objecttest.ID().EncodeToString() - dirName = filepath.Join(bsPath, addr[:2]) - require.NoError(t, os.MkdirAll(dirName, os.ModePerm)) - - fname := filepath.Join(dirName, addr[2:]) - require.NoError(t, os.WriteFile(fname, []byte{}, 0)) - - // 1.3. Unreadable dir. - require.NoError(t, os.MkdirAll(filepath.Join(bsPath, "ZZ"), 0)) - } - - sh = newCustomShard(t, dir, true, wcOpts, bsOpts(3)) - require.NoError(t, sh.SetMode(mode.ReadOnly)) - - { - // 2. Invalid object in blobovnicza. - // 2.1. Invalid blobovnicza. - bTree := filepath.Join(bsPath, "blobovnicza") - data := make([]byte, 1024) - rand.Read(data) - require.NoError(t, os.WriteFile(filepath.Join(bTree, "0", "2"), data, 0)) - - // 2.2. Invalid object in valid blobovnicza. - var prm blobovnicza.PutPrm - prm.SetAddress(oid.Address{}) - prm.SetMarshaledObject(corruptedData) - b := blobovnicza.New(blobovnicza.WithPath(filepath.Join(bTree, "1", "2"))) - require.NoError(t, b.Open()) - _, err := b.Put(prm) - require.NoError(t, err) - require.NoError(t, b.Close()) - } - - { - // 3. Invalid object in write-cache. Note that because shard is read-only - // the object won't be flushed. - addr := cidtest.ID().EncodeToString() + "." + objecttest.ID().EncodeToString() - dir := filepath.Join(wcPath, addr[:1]) - require.NoError(t, os.MkdirAll(dir, os.ModePerm)) - require.NoError(t, os.WriteFile(filepath.Join(dir, addr[1:]), nil, 0)) - } - - out := filepath.Join(t.TempDir(), "out.dump") - var dumpPrm shard.DumpPrm - dumpPrm.WithPath(out) - dumpPrm.WithIgnoreErrors(true) - res, err := sh.Dump(dumpPrm) - require.NoError(t, err) - require.Equal(t, objCount, res.Count()) -} diff --git a/pkg/local_object_storage/shard/restore.go b/pkg/local_object_storage/shard/restore.go deleted file mode 100644 index 2cb64a51..00000000 --- a/pkg/local_object_storage/shard/restore.go +++ /dev/null @@ -1,145 +0,0 @@ -package shard - -import ( - "bytes" - "context" - "encoding/binary" - "errors" - "io" - "os" - - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" -) - -// ErrInvalidMagic is returned when dump format is invalid. -var ErrInvalidMagic = logicerr.New("invalid magic") - -// RestorePrm groups the parameters of Restore operation. -type RestorePrm struct { - path string - stream io.Reader - ignoreErrors bool -} - -// WithPath is a Restore option to set the destination path. -func (p *RestorePrm) WithPath(path string) { - p.path = path -} - -// WithStream is a Restore option to set the stream to read objects from. -// It takes priority over `WithPath` option. -func (p *RestorePrm) WithStream(r io.Reader) { - p.stream = r -} - -// WithIgnoreErrors is a Restore option which allows to ignore errors encountered during restore. -// Corrupted objects will not be processed. -func (p *RestorePrm) WithIgnoreErrors(ignore bool) { - p.ignoreErrors = ignore -} - -// RestoreRes groups the result fields of Restore operation. -type RestoreRes struct { - count int - failed int -} - -// Count return amount of object written. -func (r RestoreRes) Count() int { - return r.count -} - -// FailCount return amount of object skipped. -func (r RestoreRes) FailCount() int { - return r.failed -} - -// Restore restores objects from the dump prepared by Dump. -// -// Returns any error encountered. -func (s *Shard) Restore(ctx context.Context, prm RestorePrm) (RestoreRes, error) { - ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Restore", - trace.WithAttributes( - attribute.String("shard_id", s.ID().String()), - attribute.String("path", prm.path), - attribute.Bool("ignore_errors", prm.ignoreErrors), - )) - defer span.End() - - s.m.RLock() - defer s.m.RUnlock() - - if s.info.Mode.ReadOnly() { - return RestoreRes{}, ErrReadOnlyMode - } - - r := prm.stream - if r == nil { - f, err := os.OpenFile(prm.path, os.O_RDONLY, os.ModeExclusive) - if err != nil { - return RestoreRes{}, err - } - defer f.Close() - - r = f - } - - var m [4]byte - _, _ = io.ReadFull(r, m[:]) - if !bytes.Equal(m[:], dumpMagic) { - return RestoreRes{}, ErrInvalidMagic - } - - var putPrm PutPrm - - var count, failCount int - var data []byte - var size [4]byte - for { - // If there are less than 4 bytes left, `Read` returns nil error instead of - // io.ErrUnexpectedEOF, thus `ReadFull` is used. - _, err := io.ReadFull(r, size[:]) - if err != nil { - if errors.Is(err, io.EOF) { - break - } - return RestoreRes{}, err - } - - sz := binary.LittleEndian.Uint32(size[:]) - if uint32(cap(data)) < sz { - data = make([]byte, sz) - } else { - data = data[:sz] - } - - _, err = r.Read(data) - if err != nil { - return RestoreRes{}, err - } - - obj := object.New() - err = obj.Unmarshal(data) - if err != nil { - if prm.ignoreErrors { - failCount++ - continue - } - return RestoreRes{}, err - } - - putPrm.SetObject(obj) - _, err = s.Put(ctx, putPrm) - if err != nil && !IsErrObjectExpired(err) && !IsErrRemoved(err) { - return RestoreRes{}, err - } - - count++ - } - - return RestoreRes{count: count, failed: failCount}, nil -} diff --git a/pkg/services/control/convert.go b/pkg/services/control/convert.go index f7582dd6..84bde31d 100644 --- a/pkg/services/control/convert.go +++ b/pkg/services/control/convert.go @@ -111,42 +111,6 @@ func (w *setShardModeResponseWrapper) FromGRPCMessage(m grpc.Message) error { return nil } -type dumpShardResponseWrapper struct { - *DumpShardResponse -} - -func (w *dumpShardResponseWrapper) ToGRPCMessage() grpc.Message { - return w.DumpShardResponse -} - -func (w *dumpShardResponseWrapper) FromGRPCMessage(m grpc.Message) error { - r, ok := m.(*DumpShardResponse) - if !ok { - return message.NewUnexpectedMessageType(m, (*DumpShardResponse)(nil)) - } - - w.DumpShardResponse = r - return nil -} - -type restoreShardResponseWrapper struct { - *RestoreShardResponse -} - -func (w *restoreShardResponseWrapper) ToGRPCMessage() grpc.Message { - return w.RestoreShardResponse -} - -func (w *restoreShardResponseWrapper) FromGRPCMessage(m grpc.Message) error { - r, ok := m.(*RestoreShardResponse) - if !ok { - return message.NewUnexpectedMessageType(m, (*RestoreShardResponse)(nil)) - } - - w.RestoreShardResponse = r - return nil -} - type synchronizeTreeResponseWrapper struct { *SynchronizeTreeResponse } diff --git a/pkg/services/control/rpc.go b/pkg/services/control/rpc.go index 2676ea7a..625f485c 100644 --- a/pkg/services/control/rpc.go +++ b/pkg/services/control/rpc.go @@ -13,8 +13,6 @@ const ( rpcDropObjects = "DropObjects" rpcListShards = "ListShards" rpcSetShardMode = "SetShardMode" - rpcDumpShard = "DumpShard" - rpcRestoreShard = "RestoreShard" rpcSynchronizeTree = "SynchronizeTree" rpcEvacuateShard = "EvacuateShard" rpcFlushCache = "FlushCache" @@ -128,32 +126,6 @@ func SetShardMode( return wResp.m, nil } -// DumpShard executes ControlService.DumpShard RPC. -func DumpShard(cli *client.Client, req *DumpShardRequest, opts ...client.CallOption) (*DumpShardResponse, error) { - wResp := &dumpShardResponseWrapper{new(DumpShardResponse)} - wReq := &requestWrapper{m: req} - - err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcDumpShard), wReq, wResp, opts...) - if err != nil { - return nil, err - } - - return wResp.DumpShardResponse, nil -} - -// RestoreShard executes ControlService.DumpShard RPC. -func RestoreShard(cli *client.Client, req *RestoreShardRequest, opts ...client.CallOption) (*RestoreShardResponse, error) { - wResp := &restoreShardResponseWrapper{new(RestoreShardResponse)} - wReq := &requestWrapper{m: req} - - err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcRestoreShard), wReq, wResp, opts...) - if err != nil { - return nil, err - } - - return wResp.RestoreShardResponse, nil -} - // SynchronizeTree executes ControlService.SynchronizeTree RPC. func SynchronizeTree(cli *client.Client, req *SynchronizeTreeRequest, opts ...client.CallOption) (*SynchronizeTreeResponse, error) { wResp := &synchronizeTreeResponseWrapper{new(SynchronizeTreeResponse)} diff --git a/pkg/services/control/server/dump.go b/pkg/services/control/server/dump.go deleted file mode 100644 index 28be02aa..00000000 --- a/pkg/services/control/server/dump.go +++ /dev/null @@ -1,37 +0,0 @@ -package control - -import ( - "context" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -func (s *Server) DumpShard(_ context.Context, req *control.DumpShardRequest) (*control.DumpShardResponse, error) { - err := s.isValidRequest(req) - if err != nil { - return nil, status.Error(codes.PermissionDenied, err.Error()) - } - - shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID()) - - var prm shard.DumpPrm - prm.WithPath(req.GetBody().GetFilepath()) - prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors()) - - err = s.s.DumpShard(shardID, prm) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - resp := new(control.DumpShardResponse) - resp.SetBody(new(control.DumpShardResponse_Body)) - - err = SignMessage(s.key, resp) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - return resp, nil -} diff --git a/pkg/services/control/server/restore.go b/pkg/services/control/server/restore.go deleted file mode 100644 index dba186f5..00000000 --- a/pkg/services/control/server/restore.go +++ /dev/null @@ -1,37 +0,0 @@ -package control - -import ( - "context" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -func (s *Server) RestoreShard(ctx context.Context, req *control.RestoreShardRequest) (*control.RestoreShardResponse, error) { - err := s.isValidRequest(req) - if err != nil { - return nil, status.Error(codes.PermissionDenied, err.Error()) - } - - shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID()) - - var prm shard.RestorePrm - prm.WithPath(req.GetBody().GetFilepath()) - prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors()) - - err = s.s.RestoreShard(ctx, shardID, prm) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - resp := new(control.RestoreShardResponse) - resp.SetBody(new(control.RestoreShardResponse_Body)) - - err = SignMessage(s.key, resp) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - return resp, nil -} diff --git a/pkg/services/control/service.go b/pkg/services/control/service.go index dd349dc5..ef0c0a8d 100644 --- a/pkg/services/control/service.go +++ b/pkg/services/control/service.go @@ -127,64 +127,6 @@ func (x *SetShardModeResponse) SetBody(v *SetShardModeResponse_Body) { } } -// SetShardID sets shard ID for the dump shard request. -func (x *DumpShardRequest_Body) SetShardID(id []byte) { - x.Shard_ID = id -} - -// SetFilepath sets filepath for the dump shard request. -func (x *DumpShardRequest_Body) SetFilepath(p string) { - x.Filepath = p -} - -// SetIgnoreErrors sets ignore errors flag for the dump shard request. -func (x *DumpShardRequest_Body) SetIgnoreErrors(ignore bool) { - x.IgnoreErrors = ignore -} - -// SetBody sets request body. -func (x *DumpShardRequest) SetBody(v *DumpShardRequest_Body) { - if x != nil { - x.Body = v - } -} - -// SetBody sets response body. -func (x *DumpShardResponse) SetBody(v *DumpShardResponse_Body) { - if x != nil { - x.Body = v - } -} - -// SetShardID sets shard ID for the restore shard request. -func (x *RestoreShardRequest_Body) SetShardID(id []byte) { - x.Shard_ID = id -} - -// SetFilepath sets filepath for the restore shard request. -func (x *RestoreShardRequest_Body) SetFilepath(p string) { - x.Filepath = p -} - -// SetIgnoreErrors sets ignore errors flag for the restore shard request. -func (x *RestoreShardRequest_Body) SetIgnoreErrors(ignore bool) { - x.IgnoreErrors = ignore -} - -// SetBody sets request body. -func (x *RestoreShardRequest) SetBody(v *RestoreShardRequest_Body) { - if x != nil { - x.Body = v - } -} - -// SetBody sets response body. -func (x *RestoreShardResponse) SetBody(v *RestoreShardResponse_Body) { - if x != nil { - x.Body = v - } -} - // SetBody sets list shards request body. func (x *SynchronizeTreeRequest) SetBody(v *SynchronizeTreeRequest_Body) { if x != nil { diff --git a/pkg/services/control/service.pb.go b/pkg/services/control/service.pb.go index ca3e2770..d713bb38 100644 Binary files a/pkg/services/control/service.pb.go and b/pkg/services/control/service.pb.go differ diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index 7c661e66..32a87c74 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -23,12 +23,6 @@ service ControlService { // Sets mode of the shard. rpc SetShardMode (SetShardModeRequest) returns (SetShardModeResponse); - // Dump objects from the shard. - rpc DumpShard (DumpShardRequest) returns (DumpShardResponse); - - // Restore objects from dump. - rpc RestoreShard (RestoreShardRequest) returns (RestoreShardResponse); - // Synchronizes all log operations for the specified tree. rpc SynchronizeTree (SynchronizeTreeRequest) returns (SynchronizeTreeResponse); @@ -201,75 +195,6 @@ message SetShardModeResponse { Signature signature = 2; } -// DumpShard request. -message DumpShardRequest { - // Request body structure. - message Body { - // ID of the shard. - bytes shard_ID = 1; - - // Path to the output. - string filepath = 2; - - // Flag indicating whether object read errors should be ignored. - bool ignore_errors = 3; - } - - // Body of dump shard request message. - Body body = 1; - - // Body signature. - Signature signature = 2; -} - -// DumpShard response. -message DumpShardResponse { - // Response body structure. - message Body { - } - - // Body of dump shard response message. - Body body = 1; - - // Body signature. - Signature signature = 2; -} - - -// RestoreShard request. -message RestoreShardRequest { - // Request body structure. - message Body { - // ID of the shard. - bytes shard_ID = 1; - - // Path to the output. - string filepath = 2; - - // Flag indicating whether object read errors should be ignored. - bool ignore_errors = 3; - } - - // Body of restore shard request message. - Body body = 1; - - // Body signature. - Signature signature = 2; -} - -// RestoreShard response. -message RestoreShardResponse { - // Response body structure. - message Body { - } - - // Body of restore shard response message. - Body body = 1; - - // Body signature. - Signature signature = 2; -} - // SynchronizeTree request. message SynchronizeTreeRequest { // Request body structure. diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go index 0f50d589..b9b865a9 100644 Binary files a/pkg/services/control/service_frostfs.pb.go and b/pkg/services/control/service_frostfs.pb.go differ diff --git a/pkg/services/control/service_grpc.pb.go b/pkg/services/control/service_grpc.pb.go index 4a4fbeac..1e8dd9e3 100644 Binary files a/pkg/services/control/service_grpc.pb.go and b/pkg/services/control/service_grpc.pb.go differ