forked from TrueCloudLab/frostfs-node
[#250] control: remove DumpShard
and RestoreShard
RPC
We have `Evacuate` with a cleaner interface. Also, remove them from CLI and engine. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
070154d506
commit
8466894fdf
20 changed files with 9 additions and 1148 deletions
|
@ -8,6 +8,8 @@ import (
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const ignoreErrorsFlag = "no-errors"
|
||||||
|
|
||||||
var evacuateShardCmd = &cobra.Command{
|
var evacuateShardCmd = &cobra.Command{
|
||||||
Use: "evacuate",
|
Use: "evacuate",
|
||||||
Short: "Evacuate objects from shard",
|
Short: "Evacuate objects from shard",
|
||||||
|
@ -20,7 +22,7 @@ func evacuateShard(cmd *cobra.Command, _ []string) {
|
||||||
|
|
||||||
req := &control.EvacuateShardRequest{Body: new(control.EvacuateShardRequest_Body)}
|
req := &control.EvacuateShardRequest{Body: new(control.EvacuateShardRequest_Body)}
|
||||||
req.Body.Shard_ID = getShardIDList(cmd)
|
req.Body.Shard_ID = getShardIDList(cmd)
|
||||||
req.Body.IgnoreErrors, _ = cmd.Flags().GetBool(dumpIgnoreErrorsFlag)
|
req.Body.IgnoreErrors, _ = cmd.Flags().GetBool(ignoreErrorsFlag)
|
||||||
|
|
||||||
signRequest(cmd, pk, req)
|
signRequest(cmd, pk, req)
|
||||||
|
|
||||||
|
@ -47,7 +49,7 @@ func initControlEvacuateShardCmd() {
|
||||||
flags := evacuateShardCmd.Flags()
|
flags := evacuateShardCmd.Flags()
|
||||||
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
||||||
flags.Bool(shardAllFlag, false, "Process all shards")
|
flags.Bool(shardAllFlag, false, "Process all shards")
|
||||||
flags.Bool(dumpIgnoreErrorsFlag, false, "Skip invalid/unreadable objects")
|
flags.Bool(ignoreErrorsFlag, false, "Skip invalid/unreadable objects")
|
||||||
|
|
||||||
evacuateShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
evacuateShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,16 +13,12 @@ var shardsCmd = &cobra.Command{
|
||||||
func initControlShardsCmd() {
|
func initControlShardsCmd() {
|
||||||
shardsCmd.AddCommand(listShardsCmd)
|
shardsCmd.AddCommand(listShardsCmd)
|
||||||
shardsCmd.AddCommand(setShardModeCmd)
|
shardsCmd.AddCommand(setShardModeCmd)
|
||||||
shardsCmd.AddCommand(dumpShardCmd)
|
|
||||||
shardsCmd.AddCommand(restoreShardCmd)
|
|
||||||
shardsCmd.AddCommand(evacuateShardCmd)
|
shardsCmd.AddCommand(evacuateShardCmd)
|
||||||
shardsCmd.AddCommand(flushCacheCmd)
|
shardsCmd.AddCommand(flushCacheCmd)
|
||||||
shardsCmd.AddCommand(doctorCmd)
|
shardsCmd.AddCommand(doctorCmd)
|
||||||
|
|
||||||
initControlShardsListCmd()
|
initControlShardsListCmd()
|
||||||
initControlSetShardModeCmd()
|
initControlSetShardModeCmd()
|
||||||
initControlDumpShardCmd()
|
|
||||||
initControlRestoreShardCmd()
|
|
||||||
initControlEvacuateShardCmd()
|
initControlEvacuateShardCmd()
|
||||||
initControlFlushCacheCmd()
|
initControlFlushCacheCmd()
|
||||||
initControlDoctorCmd()
|
initControlDoctorCmd()
|
||||||
|
|
|
@ -1,66 +0,0 @@
|
||||||
package control
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
|
||||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
|
||||||
"github.com/spf13/cobra"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
dumpFilepathFlag = "path"
|
|
||||||
dumpIgnoreErrorsFlag = "no-errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
var dumpShardCmd = &cobra.Command{
|
|
||||||
Use: "dump",
|
|
||||||
Short: "Dump objects from shard",
|
|
||||||
Long: "Dump objects from shard to a file",
|
|
||||||
Run: dumpShard,
|
|
||||||
}
|
|
||||||
|
|
||||||
func dumpShard(cmd *cobra.Command, _ []string) {
|
|
||||||
pk := key.Get(cmd)
|
|
||||||
|
|
||||||
body := new(control.DumpShardRequest_Body)
|
|
||||||
body.SetShardID(getShardID(cmd))
|
|
||||||
|
|
||||||
p, _ := cmd.Flags().GetString(dumpFilepathFlag)
|
|
||||||
body.SetFilepath(p)
|
|
||||||
|
|
||||||
ignore, _ := cmd.Flags().GetBool(dumpIgnoreErrorsFlag)
|
|
||||||
body.SetIgnoreErrors(ignore)
|
|
||||||
|
|
||||||
req := new(control.DumpShardRequest)
|
|
||||||
req.SetBody(body)
|
|
||||||
|
|
||||||
signRequest(cmd, pk, req)
|
|
||||||
|
|
||||||
cli := getClient(cmd, pk)
|
|
||||||
|
|
||||||
var resp *control.DumpShardResponse
|
|
||||||
var err error
|
|
||||||
err = cli.ExecRaw(func(client *client.Client) error {
|
|
||||||
resp, err = control.DumpShard(client, req)
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
|
||||||
|
|
||||||
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
|
||||||
|
|
||||||
cmd.Println("Shard has been dumped successfully.")
|
|
||||||
}
|
|
||||||
|
|
||||||
func initControlDumpShardCmd() {
|
|
||||||
initControlFlags(dumpShardCmd)
|
|
||||||
|
|
||||||
flags := dumpShardCmd.Flags()
|
|
||||||
flags.String(shardIDFlag, "", "Shard ID in base58 encoding")
|
|
||||||
flags.String(dumpFilepathFlag, "", "File to write objects to")
|
|
||||||
flags.Bool(dumpIgnoreErrorsFlag, false, "Skip invalid/unreadable objects")
|
|
||||||
|
|
||||||
_ = dumpShardCmd.MarkFlagRequired(shardIDFlag)
|
|
||||||
_ = dumpShardCmd.MarkFlagRequired(dumpFilepathFlag)
|
|
||||||
_ = dumpShardCmd.MarkFlagRequired(controlRPC)
|
|
||||||
}
|
|
|
@ -1,66 +0,0 @@
|
||||||
package control
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
|
||||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
|
||||||
"github.com/spf13/cobra"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
restoreFilepathFlag = "path"
|
|
||||||
restoreIgnoreErrorsFlag = "no-errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
var restoreShardCmd = &cobra.Command{
|
|
||||||
Use: "restore",
|
|
||||||
Short: "Restore objects from shard",
|
|
||||||
Long: "Restore objects from shard to a file",
|
|
||||||
Run: restoreShard,
|
|
||||||
}
|
|
||||||
|
|
||||||
func restoreShard(cmd *cobra.Command, _ []string) {
|
|
||||||
pk := key.Get(cmd)
|
|
||||||
|
|
||||||
body := new(control.RestoreShardRequest_Body)
|
|
||||||
body.SetShardID(getShardID(cmd))
|
|
||||||
|
|
||||||
p, _ := cmd.Flags().GetString(restoreFilepathFlag)
|
|
||||||
body.SetFilepath(p)
|
|
||||||
|
|
||||||
ignore, _ := cmd.Flags().GetBool(restoreIgnoreErrorsFlag)
|
|
||||||
body.SetIgnoreErrors(ignore)
|
|
||||||
|
|
||||||
req := new(control.RestoreShardRequest)
|
|
||||||
req.SetBody(body)
|
|
||||||
|
|
||||||
signRequest(cmd, pk, req)
|
|
||||||
|
|
||||||
cli := getClient(cmd, pk)
|
|
||||||
|
|
||||||
var resp *control.RestoreShardResponse
|
|
||||||
var err error
|
|
||||||
err = cli.ExecRaw(func(client *client.Client) error {
|
|
||||||
resp, err = control.RestoreShard(client, req)
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
|
||||||
|
|
||||||
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
|
||||||
|
|
||||||
cmd.Println("Shard has been restored successfully.")
|
|
||||||
}
|
|
||||||
|
|
||||||
func initControlRestoreShardCmd() {
|
|
||||||
initControlFlags(restoreShardCmd)
|
|
||||||
|
|
||||||
flags := restoreShardCmd.Flags()
|
|
||||||
flags.String(shardIDFlag, "", "Shard ID in base58 encoding")
|
|
||||||
flags.String(restoreFilepathFlag, "", "File to read objects from")
|
|
||||||
flags.Bool(restoreIgnoreErrorsFlag, false, "Skip invalid/unreadable objects")
|
|
||||||
|
|
||||||
_ = restoreShardCmd.MarkFlagRequired(shardIDFlag)
|
|
||||||
_ = restoreShardCmd.MarkFlagRequired(restoreFilepathFlag)
|
|
||||||
_ = restoreShardCmd.MarkFlagRequired(controlRPC)
|
|
||||||
}
|
|
|
@ -1,19 +0,0 @@
|
||||||
package engine
|
|
||||||
|
|
||||||
import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
||||||
|
|
||||||
// DumpShard dumps objects from the shard with provided identifier.
|
|
||||||
//
|
|
||||||
// Returns an error if shard is not read-only.
|
|
||||||
func (e *StorageEngine) DumpShard(id *shard.ID, prm shard.DumpPrm) error {
|
|
||||||
e.mtx.RLock()
|
|
||||||
defer e.mtx.RUnlock()
|
|
||||||
|
|
||||||
sh, ok := e.shards[id.String()]
|
|
||||||
if !ok {
|
|
||||||
return errShardNotFound
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := sh.Dump(prm)
|
|
||||||
return err
|
|
||||||
}
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -16,6 +17,8 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var ErrMustBeReadOnly = logicerr.New("shard must be in read-only mode")
|
||||||
|
|
||||||
// EvacuateShardPrm represents parameters for the EvacuateShard operation.
|
// EvacuateShardPrm represents parameters for the EvacuateShard operation.
|
||||||
type EvacuateShardPrm struct {
|
type EvacuateShardPrm struct {
|
||||||
shardID []*shard.ID
|
shardID []*shard.ID
|
||||||
|
@ -135,7 +138,7 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !sh.GetMode().ReadOnly() {
|
if !sh.GetMode().ReadOnly() {
|
||||||
return nil, nil, shard.ErrMustBeReadOnly
|
return nil, nil, ErrMustBeReadOnly
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -103,7 +103,7 @@ func TestEvacuateShard(t *testing.T) {
|
||||||
|
|
||||||
t.Run("must be read-only", func(t *testing.T) {
|
t.Run("must be read-only", func(t *testing.T) {
|
||||||
res, err := e.Evacuate(context.Background(), prm)
|
res, err := e.Evacuate(context.Background(), prm)
|
||||||
require.ErrorIs(t, err, shard.ErrMustBeReadOnly)
|
require.ErrorIs(t, err, ErrMustBeReadOnly)
|
||||||
require.Equal(t, 0, res.Count())
|
require.Equal(t, 0, res.Count())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -1,32 +0,0 @@
|
||||||
package engine
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
||||||
"go.opentelemetry.io/otel/attribute"
|
|
||||||
"go.opentelemetry.io/otel/trace"
|
|
||||||
)
|
|
||||||
|
|
||||||
// RestoreShard restores objects from dump to the shard with provided identifier.
|
|
||||||
//
|
|
||||||
// Returns an error if shard is not read-only.
|
|
||||||
func (e *StorageEngine) RestoreShard(ctx context.Context, id *shard.ID, prm shard.RestorePrm) error {
|
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.RestoreShard",
|
|
||||||
trace.WithAttributes(
|
|
||||||
attribute.String("shard_id", id.String()),
|
|
||||||
))
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
e.mtx.RLock()
|
|
||||||
defer e.mtx.RUnlock()
|
|
||||||
|
|
||||||
sh, ok := e.shards[id.String()]
|
|
||||||
if !ok {
|
|
||||||
return errShardNotFound
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := sh.Restore(ctx, prm)
|
|
||||||
return err
|
|
||||||
}
|
|
|
@ -1,129 +0,0 @@
|
||||||
package shard
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/binary"
|
|
||||||
"io"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
|
||||||
)
|
|
||||||
|
|
||||||
var dumpMagic = []byte("NEOF")
|
|
||||||
|
|
||||||
// DumpPrm groups the parameters of Dump operation.
|
|
||||||
type DumpPrm struct {
|
|
||||||
path string
|
|
||||||
stream io.Writer
|
|
||||||
ignoreErrors bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithPath is an Dump option to set the destination path.
|
|
||||||
func (p *DumpPrm) WithPath(path string) {
|
|
||||||
p.path = path
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithStream is an Dump option to set the destination stream.
|
|
||||||
// It takes priority over `path` option.
|
|
||||||
func (p *DumpPrm) WithStream(r io.Writer) {
|
|
||||||
p.stream = r
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithIgnoreErrors is an Dump option to allow ignore all errors during iteration.
|
|
||||||
// This includes invalid blobovniczas as well as corrupted objects.
|
|
||||||
func (p *DumpPrm) WithIgnoreErrors(ignore bool) {
|
|
||||||
p.ignoreErrors = ignore
|
|
||||||
}
|
|
||||||
|
|
||||||
// DumpRes groups the result fields of Dump operation.
|
|
||||||
type DumpRes struct {
|
|
||||||
count int
|
|
||||||
}
|
|
||||||
|
|
||||||
// Count return amount of object written.
|
|
||||||
func (r DumpRes) Count() int {
|
|
||||||
return r.count
|
|
||||||
}
|
|
||||||
|
|
||||||
var ErrMustBeReadOnly = logicerr.New("shard must be in read-only mode")
|
|
||||||
|
|
||||||
// Dump dumps all objects from the shard to a file or stream.
|
|
||||||
//
|
|
||||||
// Returns any error encountered.
|
|
||||||
func (s *Shard) Dump(prm DumpPrm) (DumpRes, error) {
|
|
||||||
s.m.RLock()
|
|
||||||
defer s.m.RUnlock()
|
|
||||||
|
|
||||||
if !s.info.Mode.ReadOnly() {
|
|
||||||
return DumpRes{}, ErrMustBeReadOnly
|
|
||||||
}
|
|
||||||
|
|
||||||
w := prm.stream
|
|
||||||
if w == nil {
|
|
||||||
f, err := os.OpenFile(prm.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0640)
|
|
||||||
if err != nil {
|
|
||||||
return DumpRes{}, err
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
w = f
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := w.Write(dumpMagic)
|
|
||||||
if err != nil {
|
|
||||||
return DumpRes{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var count int
|
|
||||||
|
|
||||||
if s.hasWriteCache() {
|
|
||||||
var iterPrm writecache.IterationPrm
|
|
||||||
|
|
||||||
iterPrm.WithIgnoreErrors(prm.ignoreErrors)
|
|
||||||
iterPrm.WithHandler(func(data []byte) error {
|
|
||||||
var size [4]byte
|
|
||||||
binary.LittleEndian.PutUint32(size[:], uint32(len(data)))
|
|
||||||
if _, err := w.Write(size[:]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := w.Write(data); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
count++
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
err := s.writeCache.Iterate(iterPrm)
|
|
||||||
if err != nil {
|
|
||||||
return DumpRes{}, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var pi common.IteratePrm
|
|
||||||
pi.IgnoreErrors = prm.ignoreErrors
|
|
||||||
pi.Handler = func(elem common.IterationElement) error {
|
|
||||||
data := elem.ObjectData
|
|
||||||
|
|
||||||
var size [4]byte
|
|
||||||
binary.LittleEndian.PutUint32(size[:], uint32(len(data)))
|
|
||||||
if _, err := w.Write(size[:]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := w.Write(data); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
count++
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := s.blobStor.Iterate(pi); err != nil {
|
|
||||||
return DumpRes{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return DumpRes{count: count}, nil
|
|
||||||
}
|
|
|
@ -1,412 +0,0 @@
|
||||||
package shard_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"io"
|
|
||||||
"math/rand"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
|
||||||
"github.com/klauspost/compress/zstd"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
"go.uber.org/zap/zaptest"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestDump(t *testing.T) {
|
|
||||||
t.Run("without write-cache", func(t *testing.T) {
|
|
||||||
testDump(t, 10, false)
|
|
||||||
})
|
|
||||||
t.Run("with write-cache", func(t *testing.T) {
|
|
||||||
// Put a bit more objects to write-cache to facilitate race-conditions.
|
|
||||||
testDump(t, 100, true)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func testDump(t *testing.T, objCount int, hasWriteCache bool) {
|
|
||||||
const (
|
|
||||||
wcSmallObjectSize = 1024 // 1 KiB, goes to write-cache memory
|
|
||||||
wcBigObjectSize = 4 * 1024 // 4 KiB, goes to write-cache FSTree
|
|
||||||
bsSmallObjectSize = 10 * 1024 // 10 KiB, goes to blobovnicza DB
|
|
||||||
bsBigObjectSize = 1024*1024 + 1 // > 1 MiB, goes to blobovnicza FSTree
|
|
||||||
)
|
|
||||||
|
|
||||||
var sh *shard.Shard
|
|
||||||
if !hasWriteCache {
|
|
||||||
sh = newShard(t, false)
|
|
||||||
} else {
|
|
||||||
sh = newCustomShard(t, t.TempDir(), true,
|
|
||||||
[]writecache.Option{
|
|
||||||
writecache.WithSmallObjectSize(wcSmallObjectSize),
|
|
||||||
writecache.WithMaxObjectSize(wcBigObjectSize),
|
|
||||||
writecache.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
|
||||||
},
|
|
||||||
nil)
|
|
||||||
}
|
|
||||||
defer releaseShard(sh, t)
|
|
||||||
|
|
||||||
out := filepath.Join(t.TempDir(), "dump")
|
|
||||||
var prm shard.DumpPrm
|
|
||||||
prm.WithPath(out)
|
|
||||||
|
|
||||||
t.Run("must be read-only", func(t *testing.T) {
|
|
||||||
_, err := sh.Dump(prm)
|
|
||||||
require.ErrorIs(t, err, shard.ErrMustBeReadOnly)
|
|
||||||
})
|
|
||||||
|
|
||||||
require.NoError(t, sh.SetMode(mode.ReadOnly))
|
|
||||||
outEmpty := out + ".empty"
|
|
||||||
var dumpPrm shard.DumpPrm
|
|
||||||
dumpPrm.WithPath(outEmpty)
|
|
||||||
|
|
||||||
res, err := sh.Dump(dumpPrm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, 0, res.Count())
|
|
||||||
require.NoError(t, sh.SetMode(mode.ReadWrite))
|
|
||||||
|
|
||||||
// Approximate object header size.
|
|
||||||
const headerSize = 400
|
|
||||||
|
|
||||||
objects := make([]*objectSDK.Object, objCount)
|
|
||||||
for i := 0; i < objCount; i++ {
|
|
||||||
cnr := cidtest.ID()
|
|
||||||
var size int
|
|
||||||
switch i % 6 {
|
|
||||||
case 0, 1:
|
|
||||||
size = wcSmallObjectSize - headerSize
|
|
||||||
case 2, 3:
|
|
||||||
size = bsSmallObjectSize - headerSize
|
|
||||||
case 4:
|
|
||||||
size = wcBigObjectSize - headerSize
|
|
||||||
default:
|
|
||||||
size = bsBigObjectSize - headerSize
|
|
||||||
}
|
|
||||||
data := make([]byte, size)
|
|
||||||
rand.Read(data)
|
|
||||||
obj := testutil.GenerateObjectWithCIDWithPayload(cnr, data)
|
|
||||||
objects[i] = obj
|
|
||||||
|
|
||||||
var prm shard.PutPrm
|
|
||||||
prm.SetObject(objects[i])
|
|
||||||
_, err := sh.Put(context.Background(), prm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
require.NoError(t, sh.SetMode(mode.ReadOnly))
|
|
||||||
|
|
||||||
t.Run("invalid path", func(t *testing.T) {
|
|
||||||
var dumpPrm shard.DumpPrm
|
|
||||||
dumpPrm.WithPath("\x00")
|
|
||||||
|
|
||||||
_, err := sh.Dump(dumpPrm)
|
|
||||||
require.Error(t, err)
|
|
||||||
})
|
|
||||||
|
|
||||||
res, err = sh.Dump(prm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, objCount, res.Count())
|
|
||||||
|
|
||||||
t.Run("restore", func(t *testing.T) {
|
|
||||||
sh := newShard(t, false)
|
|
||||||
defer releaseShard(sh, t)
|
|
||||||
|
|
||||||
t.Run("empty dump", func(t *testing.T) {
|
|
||||||
var restorePrm shard.RestorePrm
|
|
||||||
restorePrm.WithPath(outEmpty)
|
|
||||||
res, err := sh.Restore(context.Background(), restorePrm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, 0, res.Count())
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("invalid path", func(t *testing.T) {
|
|
||||||
_, err := sh.Restore(context.Background(), *new(shard.RestorePrm))
|
|
||||||
require.ErrorIs(t, err, os.ErrNotExist)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("invalid file", func(t *testing.T) {
|
|
||||||
t.Run("invalid magic", func(t *testing.T) {
|
|
||||||
out := out + ".wrongmagic"
|
|
||||||
require.NoError(t, os.WriteFile(out, []byte{0, 0, 0, 0}, os.ModePerm))
|
|
||||||
|
|
||||||
var restorePrm shard.RestorePrm
|
|
||||||
restorePrm.WithPath(out)
|
|
||||||
|
|
||||||
_, err := sh.Restore(context.Background(), restorePrm)
|
|
||||||
require.ErrorIs(t, err, shard.ErrInvalidMagic)
|
|
||||||
})
|
|
||||||
|
|
||||||
fileData, err := os.ReadFile(out)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
t.Run("incomplete size", func(t *testing.T) {
|
|
||||||
out := out + ".wrongsize"
|
|
||||||
fileData := append(fileData, 1)
|
|
||||||
require.NoError(t, os.WriteFile(out, fileData, os.ModePerm))
|
|
||||||
|
|
||||||
var restorePrm shard.RestorePrm
|
|
||||||
restorePrm.WithPath(out)
|
|
||||||
|
|
||||||
_, err := sh.Restore(context.Background(), restorePrm)
|
|
||||||
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
|
|
||||||
})
|
|
||||||
t.Run("incomplete object data", func(t *testing.T) {
|
|
||||||
out := out + ".wrongsize"
|
|
||||||
fileData := append(fileData, 1, 0, 0, 0)
|
|
||||||
require.NoError(t, os.WriteFile(out, fileData, os.ModePerm))
|
|
||||||
|
|
||||||
var restorePrm shard.RestorePrm
|
|
||||||
restorePrm.WithPath(out)
|
|
||||||
|
|
||||||
_, err := sh.Restore(context.Background(), restorePrm)
|
|
||||||
require.ErrorIs(t, err, io.EOF)
|
|
||||||
})
|
|
||||||
t.Run("invalid object", func(t *testing.T) {
|
|
||||||
out := out + ".wrongobj"
|
|
||||||
fileData := append(fileData, 1, 0, 0, 0, 0xFF, 4, 0, 0, 0, 1, 2, 3, 4)
|
|
||||||
require.NoError(t, os.WriteFile(out, fileData, os.ModePerm))
|
|
||||||
|
|
||||||
var restorePrm shard.RestorePrm
|
|
||||||
restorePrm.WithPath(out)
|
|
||||||
|
|
||||||
_, err := sh.Restore(context.Background(), restorePrm)
|
|
||||||
require.Error(t, err)
|
|
||||||
|
|
||||||
t.Run("skip errors", func(t *testing.T) {
|
|
||||||
sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false, nil, nil)
|
|
||||||
t.Cleanup(func() { require.NoError(t, sh.Close()) })
|
|
||||||
|
|
||||||
var restorePrm shard.RestorePrm
|
|
||||||
restorePrm.WithPath(out)
|
|
||||||
restorePrm.WithIgnoreErrors(true)
|
|
||||||
|
|
||||||
res, err := sh.Restore(context.Background(), restorePrm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, objCount, res.Count())
|
|
||||||
require.Equal(t, 2, res.FailCount())
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
var prm shard.RestorePrm
|
|
||||||
prm.WithPath(out)
|
|
||||||
t.Run("must allow write", func(t *testing.T) {
|
|
||||||
require.NoError(t, sh.SetMode(mode.ReadOnly))
|
|
||||||
|
|
||||||
_, err := sh.Restore(context.Background(), prm)
|
|
||||||
require.ErrorIs(t, err, shard.ErrReadOnlyMode)
|
|
||||||
})
|
|
||||||
|
|
||||||
require.NoError(t, sh.SetMode(mode.ReadWrite))
|
|
||||||
|
|
||||||
checkRestore(t, sh, prm, objects)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStream(t *testing.T) {
|
|
||||||
sh1 := newCustomShard(t, filepath.Join(t.TempDir(), "shard1"), false, nil, nil)
|
|
||||||
defer releaseShard(sh1, t)
|
|
||||||
|
|
||||||
sh2 := newCustomShard(t, filepath.Join(t.TempDir(), "shard2"), false, nil, nil)
|
|
||||||
defer releaseShard(sh2, t)
|
|
||||||
|
|
||||||
const objCount = 5
|
|
||||||
objects := make([]*objectSDK.Object, objCount)
|
|
||||||
for i := 0; i < objCount; i++ {
|
|
||||||
cnr := cidtest.ID()
|
|
||||||
obj := testutil.GenerateObjectWithCID(cnr)
|
|
||||||
objects[i] = obj
|
|
||||||
|
|
||||||
var prm shard.PutPrm
|
|
||||||
prm.SetObject(objects[i])
|
|
||||||
_, err := sh1.Put(context.Background(), prm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
require.NoError(t, sh1.SetMode(mode.ReadOnly))
|
|
||||||
|
|
||||||
r, w := io.Pipe()
|
|
||||||
finish := make(chan struct{})
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
var dumpPrm shard.DumpPrm
|
|
||||||
dumpPrm.WithStream(w)
|
|
||||||
|
|
||||||
res, err := sh1.Dump(dumpPrm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, objCount, res.Count())
|
|
||||||
require.NoError(t, w.Close())
|
|
||||||
close(finish)
|
|
||||||
}()
|
|
||||||
|
|
||||||
var restorePrm shard.RestorePrm
|
|
||||||
restorePrm.WithStream(r)
|
|
||||||
|
|
||||||
checkRestore(t, sh2, restorePrm, objects)
|
|
||||||
require.Eventually(t, func() bool {
|
|
||||||
select {
|
|
||||||
case <-finish:
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}, time.Second, time.Millisecond)
|
|
||||||
}
|
|
||||||
|
|
||||||
func checkRestore(t *testing.T, sh *shard.Shard, prm shard.RestorePrm, objects []*objectSDK.Object) {
|
|
||||||
res, err := sh.Restore(context.Background(), prm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, len(objects), res.Count())
|
|
||||||
|
|
||||||
var getPrm shard.GetPrm
|
|
||||||
|
|
||||||
for i := range objects {
|
|
||||||
getPrm.SetAddress(object.AddressOf(objects[i]))
|
|
||||||
res, err := sh.Get(context.Background(), getPrm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, objects[i], res.Object())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDumpIgnoreErrors(t *testing.T) {
|
|
||||||
const (
|
|
||||||
wcSmallObjectSize = 512 // goes to write-cache memory
|
|
||||||
wcBigObjectSize = wcSmallObjectSize << 1 // goes to write-cache FSTree
|
|
||||||
bsSmallObjectSize = wcSmallObjectSize << 2 // goes to blobovnicza DB
|
|
||||||
|
|
||||||
objCount = 10
|
|
||||||
headerSize = 400
|
|
||||||
)
|
|
||||||
|
|
||||||
dir := t.TempDir()
|
|
||||||
bsPath := filepath.Join(dir, "blob")
|
|
||||||
bsOpts := func(sw uint64) []blobstor.Option {
|
|
||||||
return []blobstor.Option{
|
|
||||||
blobstor.WithCompressObjects(true),
|
|
||||||
blobstor.WithStorages([]blobstor.SubStorage{
|
|
||||||
{
|
|
||||||
Storage: blobovniczatree.NewBlobovniczaTree(
|
|
||||||
blobovniczatree.WithRootPath(filepath.Join(bsPath, "blobovnicza")),
|
|
||||||
blobovniczatree.WithBlobovniczaShallowDepth(1),
|
|
||||||
blobovniczatree.WithBlobovniczaShallowWidth(sw),
|
|
||||||
blobovniczatree.WithOpenedCacheSize(1)),
|
|
||||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
|
||||||
return len(data) < bsSmallObjectSize
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Storage: fstree.New(
|
|
||||||
fstree.WithPath(bsPath),
|
|
||||||
fstree.WithDepth(1)),
|
|
||||||
},
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
wcPath := filepath.Join(dir, "writecache")
|
|
||||||
wcOpts := []writecache.Option{
|
|
||||||
writecache.WithPath(wcPath),
|
|
||||||
writecache.WithSmallObjectSize(wcSmallObjectSize),
|
|
||||||
writecache.WithMaxObjectSize(wcBigObjectSize),
|
|
||||||
}
|
|
||||||
sh := newCustomShard(t, dir, true, wcOpts, bsOpts(2))
|
|
||||||
|
|
||||||
objects := make([]*objectSDK.Object, objCount)
|
|
||||||
for i := 0; i < objCount; i++ {
|
|
||||||
size := (wcSmallObjectSize << (i % 4)) - headerSize
|
|
||||||
obj := testutil.GenerateObjectWithCIDWithPayload(cidtest.ID(), make([]byte, size))
|
|
||||||
objects[i] = obj
|
|
||||||
|
|
||||||
var prm shard.PutPrm
|
|
||||||
prm.SetObject(objects[i])
|
|
||||||
_, err := sh.Put(context.Background(), prm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
releaseShard(sh, t)
|
|
||||||
|
|
||||||
b := bytes.NewBuffer(nil)
|
|
||||||
badObject := make([]byte, 1000)
|
|
||||||
enc, err := zstd.NewWriter(b)
|
|
||||||
require.NoError(t, err)
|
|
||||||
corruptedData := enc.EncodeAll(badObject, nil)
|
|
||||||
for i := 4; i < len(corruptedData); i++ {
|
|
||||||
corruptedData[i] ^= 0xFF
|
|
||||||
}
|
|
||||||
|
|
||||||
// There are 3 different types of errors to consider.
|
|
||||||
// To setup envirionment we use implementation details so this test must be updated
|
|
||||||
// if any of them are changed.
|
|
||||||
{
|
|
||||||
// 1. Invalid object in fs tree.
|
|
||||||
// 1.1. Invalid compressed data.
|
|
||||||
addr := cidtest.ID().EncodeToString() + "." + objecttest.ID().EncodeToString()
|
|
||||||
dirName := filepath.Join(bsPath, addr[:2])
|
|
||||||
require.NoError(t, os.MkdirAll(dirName, os.ModePerm))
|
|
||||||
require.NoError(t, os.WriteFile(filepath.Join(dirName, addr[2:]), corruptedData, os.ModePerm))
|
|
||||||
|
|
||||||
// 1.2. Unreadable file.
|
|
||||||
addr = cidtest.ID().EncodeToString() + "." + objecttest.ID().EncodeToString()
|
|
||||||
dirName = filepath.Join(bsPath, addr[:2])
|
|
||||||
require.NoError(t, os.MkdirAll(dirName, os.ModePerm))
|
|
||||||
|
|
||||||
fname := filepath.Join(dirName, addr[2:])
|
|
||||||
require.NoError(t, os.WriteFile(fname, []byte{}, 0))
|
|
||||||
|
|
||||||
// 1.3. Unreadable dir.
|
|
||||||
require.NoError(t, os.MkdirAll(filepath.Join(bsPath, "ZZ"), 0))
|
|
||||||
}
|
|
||||||
|
|
||||||
sh = newCustomShard(t, dir, true, wcOpts, bsOpts(3))
|
|
||||||
require.NoError(t, sh.SetMode(mode.ReadOnly))
|
|
||||||
|
|
||||||
{
|
|
||||||
// 2. Invalid object in blobovnicza.
|
|
||||||
// 2.1. Invalid blobovnicza.
|
|
||||||
bTree := filepath.Join(bsPath, "blobovnicza")
|
|
||||||
data := make([]byte, 1024)
|
|
||||||
rand.Read(data)
|
|
||||||
require.NoError(t, os.WriteFile(filepath.Join(bTree, "0", "2"), data, 0))
|
|
||||||
|
|
||||||
// 2.2. Invalid object in valid blobovnicza.
|
|
||||||
var prm blobovnicza.PutPrm
|
|
||||||
prm.SetAddress(oid.Address{})
|
|
||||||
prm.SetMarshaledObject(corruptedData)
|
|
||||||
b := blobovnicza.New(blobovnicza.WithPath(filepath.Join(bTree, "1", "2")))
|
|
||||||
require.NoError(t, b.Open())
|
|
||||||
_, err := b.Put(prm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.NoError(t, b.Close())
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
// 3. Invalid object in write-cache. Note that because shard is read-only
|
|
||||||
// the object won't be flushed.
|
|
||||||
addr := cidtest.ID().EncodeToString() + "." + objecttest.ID().EncodeToString()
|
|
||||||
dir := filepath.Join(wcPath, addr[:1])
|
|
||||||
require.NoError(t, os.MkdirAll(dir, os.ModePerm))
|
|
||||||
require.NoError(t, os.WriteFile(filepath.Join(dir, addr[1:]), nil, 0))
|
|
||||||
}
|
|
||||||
|
|
||||||
out := filepath.Join(t.TempDir(), "out.dump")
|
|
||||||
var dumpPrm shard.DumpPrm
|
|
||||||
dumpPrm.WithPath(out)
|
|
||||||
dumpPrm.WithIgnoreErrors(true)
|
|
||||||
res, err := sh.Dump(dumpPrm)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, objCount, res.Count())
|
|
||||||
}
|
|
|
@ -1,145 +0,0 @@
|
||||||
package shard
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"encoding/binary"
|
|
||||||
"errors"
|
|
||||||
"io"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
||||||
"go.opentelemetry.io/otel/attribute"
|
|
||||||
"go.opentelemetry.io/otel/trace"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ErrInvalidMagic is returned when dump format is invalid.
|
|
||||||
var ErrInvalidMagic = logicerr.New("invalid magic")
|
|
||||||
|
|
||||||
// RestorePrm groups the parameters of Restore operation.
|
|
||||||
type RestorePrm struct {
|
|
||||||
path string
|
|
||||||
stream io.Reader
|
|
||||||
ignoreErrors bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithPath is a Restore option to set the destination path.
|
|
||||||
func (p *RestorePrm) WithPath(path string) {
|
|
||||||
p.path = path
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithStream is a Restore option to set the stream to read objects from.
|
|
||||||
// It takes priority over `WithPath` option.
|
|
||||||
func (p *RestorePrm) WithStream(r io.Reader) {
|
|
||||||
p.stream = r
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithIgnoreErrors is a Restore option which allows to ignore errors encountered during restore.
|
|
||||||
// Corrupted objects will not be processed.
|
|
||||||
func (p *RestorePrm) WithIgnoreErrors(ignore bool) {
|
|
||||||
p.ignoreErrors = ignore
|
|
||||||
}
|
|
||||||
|
|
||||||
// RestoreRes groups the result fields of Restore operation.
|
|
||||||
type RestoreRes struct {
|
|
||||||
count int
|
|
||||||
failed int
|
|
||||||
}
|
|
||||||
|
|
||||||
// Count return amount of object written.
|
|
||||||
func (r RestoreRes) Count() int {
|
|
||||||
return r.count
|
|
||||||
}
|
|
||||||
|
|
||||||
// FailCount return amount of object skipped.
|
|
||||||
func (r RestoreRes) FailCount() int {
|
|
||||||
return r.failed
|
|
||||||
}
|
|
||||||
|
|
||||||
// Restore restores objects from the dump prepared by Dump.
|
|
||||||
//
|
|
||||||
// Returns any error encountered.
|
|
||||||
func (s *Shard) Restore(ctx context.Context, prm RestorePrm) (RestoreRes, error) {
|
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Restore",
|
|
||||||
trace.WithAttributes(
|
|
||||||
attribute.String("shard_id", s.ID().String()),
|
|
||||||
attribute.String("path", prm.path),
|
|
||||||
attribute.Bool("ignore_errors", prm.ignoreErrors),
|
|
||||||
))
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
s.m.RLock()
|
|
||||||
defer s.m.RUnlock()
|
|
||||||
|
|
||||||
if s.info.Mode.ReadOnly() {
|
|
||||||
return RestoreRes{}, ErrReadOnlyMode
|
|
||||||
}
|
|
||||||
|
|
||||||
r := prm.stream
|
|
||||||
if r == nil {
|
|
||||||
f, err := os.OpenFile(prm.path, os.O_RDONLY, os.ModeExclusive)
|
|
||||||
if err != nil {
|
|
||||||
return RestoreRes{}, err
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
r = f
|
|
||||||
}
|
|
||||||
|
|
||||||
var m [4]byte
|
|
||||||
_, _ = io.ReadFull(r, m[:])
|
|
||||||
if !bytes.Equal(m[:], dumpMagic) {
|
|
||||||
return RestoreRes{}, ErrInvalidMagic
|
|
||||||
}
|
|
||||||
|
|
||||||
var putPrm PutPrm
|
|
||||||
|
|
||||||
var count, failCount int
|
|
||||||
var data []byte
|
|
||||||
var size [4]byte
|
|
||||||
for {
|
|
||||||
// If there are less than 4 bytes left, `Read` returns nil error instead of
|
|
||||||
// io.ErrUnexpectedEOF, thus `ReadFull` is used.
|
|
||||||
_, err := io.ReadFull(r, size[:])
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, io.EOF) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
return RestoreRes{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
sz := binary.LittleEndian.Uint32(size[:])
|
|
||||||
if uint32(cap(data)) < sz {
|
|
||||||
data = make([]byte, sz)
|
|
||||||
} else {
|
|
||||||
data = data[:sz]
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = r.Read(data)
|
|
||||||
if err != nil {
|
|
||||||
return RestoreRes{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
obj := object.New()
|
|
||||||
err = obj.Unmarshal(data)
|
|
||||||
if err != nil {
|
|
||||||
if prm.ignoreErrors {
|
|
||||||
failCount++
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
return RestoreRes{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
putPrm.SetObject(obj)
|
|
||||||
_, err = s.Put(ctx, putPrm)
|
|
||||||
if err != nil && !IsErrObjectExpired(err) && !IsErrRemoved(err) {
|
|
||||||
return RestoreRes{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
count++
|
|
||||||
}
|
|
||||||
|
|
||||||
return RestoreRes{count: count, failed: failCount}, nil
|
|
||||||
}
|
|
|
@ -111,42 +111,6 @@ func (w *setShardModeResponseWrapper) FromGRPCMessage(m grpc.Message) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type dumpShardResponseWrapper struct {
|
|
||||||
*DumpShardResponse
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *dumpShardResponseWrapper) ToGRPCMessage() grpc.Message {
|
|
||||||
return w.DumpShardResponse
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *dumpShardResponseWrapper) FromGRPCMessage(m grpc.Message) error {
|
|
||||||
r, ok := m.(*DumpShardResponse)
|
|
||||||
if !ok {
|
|
||||||
return message.NewUnexpectedMessageType(m, (*DumpShardResponse)(nil))
|
|
||||||
}
|
|
||||||
|
|
||||||
w.DumpShardResponse = r
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type restoreShardResponseWrapper struct {
|
|
||||||
*RestoreShardResponse
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *restoreShardResponseWrapper) ToGRPCMessage() grpc.Message {
|
|
||||||
return w.RestoreShardResponse
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *restoreShardResponseWrapper) FromGRPCMessage(m grpc.Message) error {
|
|
||||||
r, ok := m.(*RestoreShardResponse)
|
|
||||||
if !ok {
|
|
||||||
return message.NewUnexpectedMessageType(m, (*RestoreShardResponse)(nil))
|
|
||||||
}
|
|
||||||
|
|
||||||
w.RestoreShardResponse = r
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type synchronizeTreeResponseWrapper struct {
|
type synchronizeTreeResponseWrapper struct {
|
||||||
*SynchronizeTreeResponse
|
*SynchronizeTreeResponse
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,8 +13,6 @@ const (
|
||||||
rpcDropObjects = "DropObjects"
|
rpcDropObjects = "DropObjects"
|
||||||
rpcListShards = "ListShards"
|
rpcListShards = "ListShards"
|
||||||
rpcSetShardMode = "SetShardMode"
|
rpcSetShardMode = "SetShardMode"
|
||||||
rpcDumpShard = "DumpShard"
|
|
||||||
rpcRestoreShard = "RestoreShard"
|
|
||||||
rpcSynchronizeTree = "SynchronizeTree"
|
rpcSynchronizeTree = "SynchronizeTree"
|
||||||
rpcEvacuateShard = "EvacuateShard"
|
rpcEvacuateShard = "EvacuateShard"
|
||||||
rpcFlushCache = "FlushCache"
|
rpcFlushCache = "FlushCache"
|
||||||
|
@ -128,32 +126,6 @@ func SetShardMode(
|
||||||
return wResp.m, nil
|
return wResp.m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DumpShard executes ControlService.DumpShard RPC.
|
|
||||||
func DumpShard(cli *client.Client, req *DumpShardRequest, opts ...client.CallOption) (*DumpShardResponse, error) {
|
|
||||||
wResp := &dumpShardResponseWrapper{new(DumpShardResponse)}
|
|
||||||
wReq := &requestWrapper{m: req}
|
|
||||||
|
|
||||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcDumpShard), wReq, wResp, opts...)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return wResp.DumpShardResponse, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RestoreShard executes ControlService.DumpShard RPC.
|
|
||||||
func RestoreShard(cli *client.Client, req *RestoreShardRequest, opts ...client.CallOption) (*RestoreShardResponse, error) {
|
|
||||||
wResp := &restoreShardResponseWrapper{new(RestoreShardResponse)}
|
|
||||||
wReq := &requestWrapper{m: req}
|
|
||||||
|
|
||||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcRestoreShard), wReq, wResp, opts...)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return wResp.RestoreShardResponse, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// SynchronizeTree executes ControlService.SynchronizeTree RPC.
|
// SynchronizeTree executes ControlService.SynchronizeTree RPC.
|
||||||
func SynchronizeTree(cli *client.Client, req *SynchronizeTreeRequest, opts ...client.CallOption) (*SynchronizeTreeResponse, error) {
|
func SynchronizeTree(cli *client.Client, req *SynchronizeTreeRequest, opts ...client.CallOption) (*SynchronizeTreeResponse, error) {
|
||||||
wResp := &synchronizeTreeResponseWrapper{new(SynchronizeTreeResponse)}
|
wResp := &synchronizeTreeResponseWrapper{new(SynchronizeTreeResponse)}
|
||||||
|
|
|
@ -1,37 +0,0 @@
|
||||||
package control
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
|
||||||
"google.golang.org/grpc/codes"
|
|
||||||
"google.golang.org/grpc/status"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (s *Server) DumpShard(_ context.Context, req *control.DumpShardRequest) (*control.DumpShardResponse, error) {
|
|
||||||
err := s.isValidRequest(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID())
|
|
||||||
|
|
||||||
var prm shard.DumpPrm
|
|
||||||
prm.WithPath(req.GetBody().GetFilepath())
|
|
||||||
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
|
|
||||||
|
|
||||||
err = s.s.DumpShard(shardID, prm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
resp := new(control.DumpShardResponse)
|
|
||||||
resp.SetBody(new(control.DumpShardResponse_Body))
|
|
||||||
|
|
||||||
err = SignMessage(s.key, resp)
|
|
||||||
if err != nil {
|
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
|
@ -1,37 +0,0 @@
|
||||||
package control
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
|
||||||
"google.golang.org/grpc/codes"
|
|
||||||
"google.golang.org/grpc/status"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (s *Server) RestoreShard(ctx context.Context, req *control.RestoreShardRequest) (*control.RestoreShardResponse, error) {
|
|
||||||
err := s.isValidRequest(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID())
|
|
||||||
|
|
||||||
var prm shard.RestorePrm
|
|
||||||
prm.WithPath(req.GetBody().GetFilepath())
|
|
||||||
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
|
|
||||||
|
|
||||||
err = s.s.RestoreShard(ctx, shardID, prm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
resp := new(control.RestoreShardResponse)
|
|
||||||
resp.SetBody(new(control.RestoreShardResponse_Body))
|
|
||||||
|
|
||||||
err = SignMessage(s.key, resp)
|
|
||||||
if err != nil {
|
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
|
@ -127,64 +127,6 @@ func (x *SetShardModeResponse) SetBody(v *SetShardModeResponse_Body) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetShardID sets shard ID for the dump shard request.
|
|
||||||
func (x *DumpShardRequest_Body) SetShardID(id []byte) {
|
|
||||||
x.Shard_ID = id
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetFilepath sets filepath for the dump shard request.
|
|
||||||
func (x *DumpShardRequest_Body) SetFilepath(p string) {
|
|
||||||
x.Filepath = p
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetIgnoreErrors sets ignore errors flag for the dump shard request.
|
|
||||||
func (x *DumpShardRequest_Body) SetIgnoreErrors(ignore bool) {
|
|
||||||
x.IgnoreErrors = ignore
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetBody sets request body.
|
|
||||||
func (x *DumpShardRequest) SetBody(v *DumpShardRequest_Body) {
|
|
||||||
if x != nil {
|
|
||||||
x.Body = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetBody sets response body.
|
|
||||||
func (x *DumpShardResponse) SetBody(v *DumpShardResponse_Body) {
|
|
||||||
if x != nil {
|
|
||||||
x.Body = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetShardID sets shard ID for the restore shard request.
|
|
||||||
func (x *RestoreShardRequest_Body) SetShardID(id []byte) {
|
|
||||||
x.Shard_ID = id
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetFilepath sets filepath for the restore shard request.
|
|
||||||
func (x *RestoreShardRequest_Body) SetFilepath(p string) {
|
|
||||||
x.Filepath = p
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetIgnoreErrors sets ignore errors flag for the restore shard request.
|
|
||||||
func (x *RestoreShardRequest_Body) SetIgnoreErrors(ignore bool) {
|
|
||||||
x.IgnoreErrors = ignore
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetBody sets request body.
|
|
||||||
func (x *RestoreShardRequest) SetBody(v *RestoreShardRequest_Body) {
|
|
||||||
if x != nil {
|
|
||||||
x.Body = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetBody sets response body.
|
|
||||||
func (x *RestoreShardResponse) SetBody(v *RestoreShardResponse_Body) {
|
|
||||||
if x != nil {
|
|
||||||
x.Body = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetBody sets list shards request body.
|
// SetBody sets list shards request body.
|
||||||
func (x *SynchronizeTreeRequest) SetBody(v *SynchronizeTreeRequest_Body) {
|
func (x *SynchronizeTreeRequest) SetBody(v *SynchronizeTreeRequest_Body) {
|
||||||
if x != nil {
|
if x != nil {
|
||||||
|
|
BIN
pkg/services/control/service.pb.go
generated
BIN
pkg/services/control/service.pb.go
generated
Binary file not shown.
|
@ -23,12 +23,6 @@ service ControlService {
|
||||||
// Sets mode of the shard.
|
// Sets mode of the shard.
|
||||||
rpc SetShardMode (SetShardModeRequest) returns (SetShardModeResponse);
|
rpc SetShardMode (SetShardModeRequest) returns (SetShardModeResponse);
|
||||||
|
|
||||||
// Dump objects from the shard.
|
|
||||||
rpc DumpShard (DumpShardRequest) returns (DumpShardResponse);
|
|
||||||
|
|
||||||
// Restore objects from dump.
|
|
||||||
rpc RestoreShard (RestoreShardRequest) returns (RestoreShardResponse);
|
|
||||||
|
|
||||||
// Synchronizes all log operations for the specified tree.
|
// Synchronizes all log operations for the specified tree.
|
||||||
rpc SynchronizeTree (SynchronizeTreeRequest) returns (SynchronizeTreeResponse);
|
rpc SynchronizeTree (SynchronizeTreeRequest) returns (SynchronizeTreeResponse);
|
||||||
|
|
||||||
|
@ -201,75 +195,6 @@ message SetShardModeResponse {
|
||||||
Signature signature = 2;
|
Signature signature = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
// DumpShard request.
|
|
||||||
message DumpShardRequest {
|
|
||||||
// Request body structure.
|
|
||||||
message Body {
|
|
||||||
// ID of the shard.
|
|
||||||
bytes shard_ID = 1;
|
|
||||||
|
|
||||||
// Path to the output.
|
|
||||||
string filepath = 2;
|
|
||||||
|
|
||||||
// Flag indicating whether object read errors should be ignored.
|
|
||||||
bool ignore_errors = 3;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Body of dump shard request message.
|
|
||||||
Body body = 1;
|
|
||||||
|
|
||||||
// Body signature.
|
|
||||||
Signature signature = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
// DumpShard response.
|
|
||||||
message DumpShardResponse {
|
|
||||||
// Response body structure.
|
|
||||||
message Body {
|
|
||||||
}
|
|
||||||
|
|
||||||
// Body of dump shard response message.
|
|
||||||
Body body = 1;
|
|
||||||
|
|
||||||
// Body signature.
|
|
||||||
Signature signature = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// RestoreShard request.
|
|
||||||
message RestoreShardRequest {
|
|
||||||
// Request body structure.
|
|
||||||
message Body {
|
|
||||||
// ID of the shard.
|
|
||||||
bytes shard_ID = 1;
|
|
||||||
|
|
||||||
// Path to the output.
|
|
||||||
string filepath = 2;
|
|
||||||
|
|
||||||
// Flag indicating whether object read errors should be ignored.
|
|
||||||
bool ignore_errors = 3;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Body of restore shard request message.
|
|
||||||
Body body = 1;
|
|
||||||
|
|
||||||
// Body signature.
|
|
||||||
Signature signature = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
// RestoreShard response.
|
|
||||||
message RestoreShardResponse {
|
|
||||||
// Response body structure.
|
|
||||||
message Body {
|
|
||||||
}
|
|
||||||
|
|
||||||
// Body of restore shard response message.
|
|
||||||
Body body = 1;
|
|
||||||
|
|
||||||
// Body signature.
|
|
||||||
Signature signature = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
// SynchronizeTree request.
|
// SynchronizeTree request.
|
||||||
message SynchronizeTreeRequest {
|
message SynchronizeTreeRequest {
|
||||||
// Request body structure.
|
// Request body structure.
|
||||||
|
|
BIN
pkg/services/control/service_frostfs.pb.go
generated
BIN
pkg/services/control/service_frostfs.pb.go
generated
Binary file not shown.
BIN
pkg/services/control/service_grpc.pb.go
generated
BIN
pkg/services/control/service_grpc.pb.go
generated
Binary file not shown.
Loading…
Reference in a new issue