diff --git a/cmd/frostfs-cli/modules/control/doctor.go b/cmd/frostfs-cli/modules/control/doctor.go
new file mode 100644
index 000000000..13bb81a0a
--- /dev/null
+++ b/cmd/frostfs-cli/modules/control/doctor.go
@@ -0,0 +1,53 @@
+package control
+
+import (
+	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
+	commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
+	"github.com/spf13/cobra"
+)
+
+const (
+	concurrencyFlag      = "concurrency"
+	removeDuplicatesFlag = "remove-duplicates"
+)
+
+var doctorCmd = &cobra.Command{
+	Use:   "doctor",
+	Short: "Restructure node's storage",
+	Long:  "Restructure node's storage",
+	Run:   doctor,
+}
+
+func doctor(cmd *cobra.Command, _ []string) {
+	pk := key.Get(cmd)
+
+	req := &control.DoctorRequest{Body: new(control.DoctorRequest_Body)}
+	req.Body.Concurrency, _ = cmd.Flags().GetUint32(concurrencyFlag)
+	req.Body.RemoveDuplicates, _ = cmd.Flags().GetBool(removeDuplicatesFlag)
+
+	signRequest(cmd, pk, req)
+
+	cli := getClient(cmd, pk)
+
+	var resp *control.DoctorResponse
+	var err error
+	err = cli.ExecRaw(func(client *client.Client) error {
+		resp, err = control.Doctor(client, req)
+		return err
+	})
+	commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
+
+	verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
+
+	cmd.Println("Operation has finished.")
+}
+
+func initControlDoctorCmd() {
+	initControlFlags(doctorCmd)
+
+	ff := doctorCmd.Flags()
+	ff.Uint32(concurrencyFlag, 0, "Number of parallel threads to use")
+	ff.Bool(removeDuplicatesFlag, false, "Remove duplicate objects")
+}
diff --git a/cmd/frostfs-cli/modules/control/shards.go b/cmd/frostfs-cli/modules/control/shards.go
index 6719a4acf..9d3eb5c01 100644
--- a/cmd/frostfs-cli/modules/control/shards.go
+++ b/cmd/frostfs-cli/modules/control/shards.go
@@ -17,6 +17,7 @@ func initControlShardsCmd() {
 	shardsCmd.AddCommand(restoreShardCmd)
 	shardsCmd.AddCommand(evacuateShardCmd)
 	shardsCmd.AddCommand(flushCacheCmd)
+	shardsCmd.AddCommand(doctorCmd)
 
 	initControlShardsListCmd()
 	initControlSetShardModeCmd()
@@ -24,4 +25,5 @@ func initControlShardsCmd() {
 	initControlRestoreShardCmd()
 	initControlEvacuateShardCmd()
 	initControlFlushCacheCmd()
+	initControlDoctorCmd()
 }
diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go
index 4d154d289..e0161bfe3 100644
--- a/pkg/local_object_storage/engine/engine.go
+++ b/pkg/local_object_storage/engine/engine.go
@@ -17,6 +17,8 @@ import (
 type StorageEngine struct {
 	*cfg
 
+	removeDuplicatesInProgress atomic.Bool
+
 	mtx *sync.RWMutex
 
 	shards map[string]hashedShard
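Note on the engine change above: `removeDuplicatesInProgress` is a single-flight guard. `RemoveDuplicates` (introduced in the next file) takes it with a compare-and-swap on entry and releases it on exit, so a second concurrent call fails fast instead of blocking. A minimal, runnable sketch of the pattern, assuming `atomic.Bool` from `sync/atomic` (Go 1.19+); the engine may wire this up through a different atomic package:

	package main

	import (
		"errors"
		"fmt"
		"sync/atomic"
	)

	var inProgress atomic.Bool

	func run() error {
		// Only one caller proceeds; concurrent callers get an error instead of waiting.
		if !inProgress.CompareAndSwap(false, true) {
			return errors.New("already in progress")
		}
		defer inProgress.Store(false)
		// ... the actual work goes here ...
		return nil
	}

	func main() {
		fmt.Println(run()) // <nil>
	}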
diff --git a/pkg/local_object_storage/engine/remove_copies.go b/pkg/local_object_storage/engine/remove_copies.go
new file mode 100644
index 000000000..d881a52d1
--- /dev/null
+++ b/pkg/local_object_storage/engine/remove_copies.go
@@ -0,0 +1,138 @@
+package engine
+
+import (
+	"context"
+	"errors"
+
+	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
+	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"git.frostfs.info/TrueCloudLab/hrw"
+	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
+)
+
+// errRemoveDuplicatesInProgress is returned when another rebalancing is in progress.
+// We need it because `Rebalance` removes objects and executing it concurrently
+// on two shards can lead to data loss. In the future this restriction could be relaxed.
+var errRemoveDuplicatesInProgress = errors.New("redundant copies removal is already in progress")
+
+const defaultRemoveDuplicatesConcurrency = 256
+
+type RemoveDuplicatesPrm struct {
+	Concurrency int
+}
+
+// RemoveDuplicates iterates over all objects and removes duplicate object copies
+// from the shards that rank lower in the HRW sort.
+// Safety:
+// 1. Concurrent execution is prohibited, thus exactly one object copy should always be left.
+// 2. If we delete an object from another thread, this is not a problem. Currently,
+// we have two threads that can remove "valid" (non-expired and logically non-removed) objects:
+// the policer and rebalance. For rebalance, see (1).
+// If the policer removes something, we do not care whether both copies are removed or one is left,
+// as the remaining copy will be removed during the next policer iteration.
+func (e *StorageEngine) RemoveDuplicates(ctx context.Context, prm RemoveDuplicatesPrm) error {
+	if !e.removeDuplicatesInProgress.CompareAndSwap(false, true) {
+		return errRemoveDuplicatesInProgress
+	}
+	defer e.removeDuplicatesInProgress.Store(false)
+
+	if prm.Concurrency <= 0 {
+		prm.Concurrency = defaultRemoveDuplicatesConcurrency
+	}
+
+	e.log.Info("starting removal of locally-redundant copies",
+		zap.Int("concurrency", prm.Concurrency))
+
+	// The mutex must be held for the whole duration to avoid the target shard being removed
+	// concurrently: that could lead to data loss.
+	e.mtx.RLock()
+	defer e.mtx.RUnlock()
+
+	// Iterate over shards to be sure that no objects from two different shards are removed simultaneously.
+	// This is not currently possible anyway, because the `FreeSpace` metric used by weight sorting is always 0.
+	// However, we could change the weights in the future and easily forget about this function.
+	for _, sh := range e.shards {
+		e.log.Debug("started duplicates removal routine", zap.String("shard_id", sh.ID().String()))
+		ch := make(chan oid.Address)
+
+		errG, ctx := errgroup.WithContext(ctx)
+		errG.SetLimit(prm.Concurrency + 1) // +1 for the listing thread
+
+		errG.Go(func() error {
+			defer close(ch)
+
+			var cursor *meta.Cursor
+			for {
+				var listPrm shard.ListWithCursorPrm
+				listPrm.WithCount(uint32(prm.Concurrency))
+				listPrm.WithCursor(cursor)
+				res, err := sh.ListWithCursor(listPrm)
+				if err != nil {
+					if errors.Is(err, meta.ErrEndOfListing) {
+						return nil
+					}
+					return err
+				}
+				for _, addr := range res.AddressList() {
+					select {
+					case <-ctx.Done():
+						return ctx.Err()
+					case ch <- addr.Address:
+					}
+				}
+				cursor = res.Cursor()
+			}
+		})
+
+		for i := 0; i < prm.Concurrency; i++ {
+			errG.Go(func() error {
+				return e.removeObjects(ctx, ch)
+			})
+		}
+		if err := errG.Wait(); err != nil {
+			e.log.Error("finished removal of locally-redundant copies", zap.Error(err))
+			return err
+		}
+	}
+
+	e.log.Info("finished removal of locally-redundant copies")
+	return nil
+}
+
+// removeObjects reads addresses from ch and removes all copies of each object except the one on the most preferred (HRW-first) shard.
+func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address) error {
+	shards := make([]hashedShard, 0, len(e.shards))
+	for _, sh := range e.shards {
+		shards = append(shards, sh)
+	}
+
+	for addr := range ch {
+		h := hrw.Hash([]byte(addr.EncodeToString()))
+		shards := sortShardsByWeight(shards, h)
+		found := false
+		for i := range shards {
+			var existsPrm shard.ExistsPrm
+			existsPrm.SetAddress(addr)
+
+			res, err := shards[i].Exists(existsPrm)
+			if err != nil {
+				return err
+			} else if !res.Exists() {
+				continue
+			} else if !found {
+				found = true
+				continue
+			}
+
+			var deletePrm shard.DeletePrm
+			deletePrm.SetAddresses(addr)
+			_, err = shards[i].Delete(deletePrm)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
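`RemoveDuplicates` above runs one listing goroutine that feeds `prm.Concurrency` workers through an unbuffered channel, all inside a single `errgroup` limited to `Concurrency + 1` slots. A stripped-down, runnable illustration of that producer/worker shape (the names and the payload are illustrative, not part of the engine API):

	package main

	import (
		"context"
		"fmt"

		"golang.org/x/sync/errgroup"
	)

	func main() {
		const workers = 4
		ch := make(chan int)

		eg, ctx := errgroup.WithContext(context.Background())
		eg.SetLimit(workers + 1) // +1 for the producer, as in RemoveDuplicates

		eg.Go(func() error {
			defer close(ch) // lets the workers drain the channel and exit
			for i := 0; i < 16; i++ {
				select {
				case <-ctx.Done(): // a worker error or cancellation stops the producer
					return ctx.Err()
				case ch <- i:
				}
			}
			return nil
		})

		for i := 0; i < workers; i++ {
			eg.Go(func() error {
				for v := range ch {
					fmt.Println("processed", v)
				}
				return nil
			})
		}

		if err := eg.Wait(); err != nil {
			fmt.Println("error:", err)
		}
	}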
diff --git a/pkg/local_object_storage/engine/remove_copies_test.go b/pkg/local_object_storage/engine/remove_copies_test.go
new file mode 100644
index 000000000..4415d01c8
--- /dev/null
+++ b/pkg/local_object_storage/engine/remove_copies_test.go
@@ -0,0 +1,208 @@
+package engine
+
+import (
+	"context"
+	"sync"
+	"testing"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
+	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
+	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"github.com/stretchr/testify/require"
+)
+
+func TestRebalance(t *testing.T) {
+	te := newEngineWithErrorThreshold(t, "", 0)
+
+	const (
+		objCount  = 20
+		copyCount = (objCount + 2) / 3
+	)
+
+	type objectWithShard struct {
+		bestShard  shard.ID
+		worstShard shard.ID
+		object     *objectSDK.Object
+	}
+
+	objects := make([]objectWithShard, objCount)
+	for i := range objects {
+		obj := testutil.GenerateObjectWithCID(cidtest.ID())
+		obj.SetPayload(make([]byte, errSmallSize))
+		objects[i].object = obj
+
+		shards := te.ng.sortShardsByWeight(object.AddressOf(obj))
+		objects[i].bestShard = *shards[0].Shard.ID()
+		objects[i].worstShard = *shards[1].Shard.ID()
+	}
+
+	for i := range objects {
+		var prm shard.PutPrm
+		prm.SetObject(objects[i].object)
+
+		var err1, err2 error
+		te.ng.mtx.RLock()
+		// Every third object (i%3 == 0) is put to both shards; the rest are put to exactly one of them.
+		if i%3 != 1 {
+			_, err1 = te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
+		}
+		if i%3 != 2 {
+			_, err2 = te.ng.shards[te.shards[1].id.String()].Shard.Put(prm)
+		}
+		te.ng.mtx.RUnlock()
+
+		require.NoError(t, err1)
+		require.NoError(t, err2)
+	}
+
+	var removedMtx sync.Mutex
+	var removed []deleteEvent
+	for _, shard := range te.shards {
+		id := *shard.id
+		shard.largeFileStorage.SetOption(teststore.WithDelete(func(prm common.DeletePrm) (common.DeleteRes, error) {
+			removedMtx.Lock()
+			removed = append(removed, deleteEvent{shardID: id, addr: prm.Address})
+			removedMtx.Unlock()
+			return common.DeleteRes{}, nil
+		}))
+	}
+
+	err := te.ng.RemoveDuplicates(context.Background(), RemoveDuplicatesPrm{})
+	require.NoError(t, err)
+
+	require.Equal(t, copyCount, len(removed))
+
+	removedMask := make([]bool, len(objects))
+loop:
+	for i := range removed {
+		for j := range objects {
+			if removed[i].addr == object.AddressOf(objects[j].object) {
+				require.Equal(t, objects[j].worstShard, removed[i].shardID,
+					"object %d was expected to be removed from another shard", j)
+				removedMask[j] = true
+				continue loop
+			}
+		}
+		require.FailNow(t, "unexpected object was removed", removed[i].addr)
+	}
+
+	for i := 0; i < copyCount; i++ {
+		if i%3 == 0 {
+			require.True(t, removedMask[i], "object %d was expected to be removed", i)
+		} else {
+			require.False(t, removedMask[i], "object %d was not expected to be removed", i)
+		}
+	}
+}
+
+func TestRebalanceSingleThread(t *testing.T) {
+	te := newEngineWithErrorThreshold(t, "", 0)
+
+	obj := testutil.GenerateObjectWithCID(cidtest.ID())
+	obj.SetPayload(make([]byte, errSmallSize))
+
+	var prm shard.PutPrm
+	prm.SetObject(obj)
+	te.ng.mtx.RLock()
+	_, err1 := te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
+	_, err2 := te.ng.shards[te.shards[1].id.String()].Shard.Put(prm)
+	te.ng.mtx.RUnlock()
+	require.NoError(t, err1)
+	require.NoError(t, err2)
+
+	signal := make(chan struct{})  // unblock rebalance
+	started := make(chan struct{}) // make sure rebalance is started
+	for _, shard := range te.shards {
+		shard.largeFileStorage.SetOption(teststore.WithDelete(func(common.DeletePrm) (common.DeleteRes, error) {
+			close(started)
+			<-signal
+			return common.DeleteRes{}, nil
+		}))
+	}
+
+	var firstErr error
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		firstErr = te.ng.RemoveDuplicates(context.Background(), RemoveDuplicatesPrm{})
+	}()
+
+	<-started
+	secondErr := te.ng.RemoveDuplicates(context.Background(), RemoveDuplicatesPrm{})
+	require.ErrorIs(t, secondErr, errRemoveDuplicatesInProgress)
+
+	close(signal)
+	wg.Wait()
+	require.NoError(t, firstErr)
+}
+
+type deleteEvent struct {
+	shardID shard.ID
+	addr    oid.Address
+}
+
+func TestRebalanceExitByContext(t *testing.T) {
+	te := newEngineWithErrorThreshold(t, "", 0)
+
+	objects := make([]*objectSDK.Object, 4)
+	for i := range objects {
+		obj := testutil.GenerateObjectWithCID(cidtest.ID())
+		obj.SetPayload(make([]byte, errSmallSize))
+		objects[i] = obj
+	}
+
+	for i := range objects {
+		var prm shard.PutPrm
+		prm.SetObject(objects[i])
+
+		te.ng.mtx.RLock()
+		_, err1 := te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
+		_, err2 := te.ng.shards[te.shards[1].id.String()].Shard.Put(prm)
+		te.ng.mtx.RUnlock()
+
+		require.NoError(t, err1)
+		require.NoError(t, err2)
+	}
+
+	var removed []deleteEvent
+	deleteCh := make(chan struct{})
+	signal := make(chan struct{})
+	for _, shard := range te.shards {
+		id := *shard.id
+		shard.largeFileStorage.SetOption(teststore.WithDelete(func(prm common.DeletePrm) (common.DeleteRes, error) {
+			deleteCh <- struct{}{}
+			<-signal
+			removed = append(removed, deleteEvent{shardID: id, addr: prm.Address})
+			return common.DeleteRes{}, nil
+		}))
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	var rebalanceErr error
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		rebalanceErr = te.ng.RemoveDuplicates(ctx, RemoveDuplicatesPrm{Concurrency: 1})
+	}()
+
+	const removeCount = 3
+	for i := 0; i < removeCount-1; i++ {
+		<-deleteCh
+		signal <- struct{}{}
+	}
+	<-deleteCh
+	cancel()
+	close(signal)
+
+	wg.Wait()
+	require.ErrorIs(t, rebalanceErr, context.Canceled)
+	require.Equal(t, removeCount, len(removed))
+}
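The tests above can precompute `bestShard`/`worstShard` because HRW ordering is a pure function of the object address and the shard set: the same address always yields the same preferred shard. A small sketch of that property with the same `hrw` package (the `node` type and the values are made up for illustration; it assumes `SortHasherSliceByWeightValue` accepts any slice whose elements have a `Hash() uint64` method, as `hashedShard` does in shards.go below):

	package main

	import (
		"fmt"

		"git.frostfs.info/TrueCloudLab/hrw"
	)

	type node string

	func (n node) Hash() uint64 { return hrw.Hash([]byte(n)) }

	func main() {
		nodes := []node{"shard-1", "shard-2", "shard-3"}
		weights := []float64{1, 1, 1} // equal weights: the ordering depends only on the hashes

		// Re-running this with the same address always produces the same ordering,
		// so the copy that survives deduplication is deterministic.
		h := hrw.Hash([]byte("container/object"))
		hrw.SortHasherSliceByWeightValue(nodes, weights, h)
		fmt.Println("kept on:", nodes[0], "removed from:", nodes[1:])
	}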
diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go
index 34210d835..2b1146ff2 100644
--- a/pkg/local_object_storage/engine/shards.go
+++ b/pkg/local_object_storage/engine/shards.go
@@ -208,16 +208,21 @@ func (e *StorageEngine) sortShardsByWeight(objAddr interface{ EncodeToString() s
 	e.mtx.RLock()
 	defer e.mtx.RUnlock()
 
+	h := hrw.Hash([]byte(objAddr.EncodeToString()))
 	shards := make([]hashedShard, 0, len(e.shards))
-	weights := make([]float64, 0, len(e.shards))
-
 	for _, sh := range e.shards {
 		shards = append(shards, hashedShard(sh))
-		weights = append(weights, e.shardWeight(sh.Shard))
+	}
+	return sortShardsByWeight(shards, h)
+}
+
+func sortShardsByWeight(shards []hashedShard, h uint64) []hashedShard {
+	weights := make([]float64, 0, len(shards))
+	for _, sh := range shards {
+		weights = append(weights, float64(sh.Shard.WeightValues().FreeSpace))
 	}
 
-	hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(objAddr.EncodeToString())))
-
+	hrw.SortHasherSliceByWeightValue(shards, weights, h)
 	return shards
 }
diff --git a/pkg/services/control/convert.go b/pkg/services/control/convert.go
index 833288bb7..f7582dd68 100644
--- a/pkg/services/control/convert.go
+++ b/pkg/services/control/convert.go
@@ -200,3 +200,21 @@ func (w *flushCacheResponseWrapper) FromGRPCMessage(m grpc.Message) error {
 	w.FlushCacheResponse = r
 	return nil
 }
+
+type doctorResponseWrapper struct {
+	*DoctorResponse
+}
+
+func (w *doctorResponseWrapper) ToGRPCMessage() grpc.Message {
+	return w.DoctorResponse
+}
+
+func (w *doctorResponseWrapper) FromGRPCMessage(m grpc.Message) error {
+	r, ok := m.(*DoctorResponse)
+	if !ok {
+		return message.NewUnexpectedMessageType(m, (*DoctorResponse)(nil))
+	}
+
+	w.DoctorResponse = r
+	return nil
+}
diff --git a/pkg/services/control/rpc.go b/pkg/services/control/rpc.go
index 0779e177b..2676ea7a5 100644
--- a/pkg/services/control/rpc.go
+++ b/pkg/services/control/rpc.go
@@ -18,6 +18,7 @@ const (
 	rpcSynchronizeTree = "SynchronizeTree"
 	rpcEvacuateShard   = "EvacuateShard"
 	rpcFlushCache      = "FlushCache"
+	rpcDoctor          = "Doctor"
 )
 
 // HealthCheck executes ControlService.HealthCheck RPC.
@@ -191,3 +192,16 @@ func FlushCache(cli *client.Client, req *FlushCacheRequest, opts ...client.CallO
 
 	return wResp.FlushCacheResponse, nil
 }
+
+// Doctor executes ControlService.Doctor RPC.
+func Doctor(cli *client.Client, req *DoctorRequest, opts ...client.CallOption) (*DoctorResponse, error) {
+	wResp := &doctorResponseWrapper{new(DoctorResponse)}
+	wReq := &requestWrapper{m: req}
+
+	err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcDoctor), wReq, wResp, opts...)
+	if err != nil {
+		return nil, err
+	}
+
+	return wResp.DoctorResponse, nil
+}
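For context, this is roughly how the CLI command from the beginning of this diff drives the helper above (abridged from cmd/frostfs-cli/modules/control/doctor.go; `cmd`, `pk`, `cli`, `resp` and `err` are declared there):

	req := &control.DoctorRequest{Body: new(control.DoctorRequest_Body)}
	req.Body.RemoveDuplicates = true // the only operation Doctor currently supports
	req.Body.Concurrency = 16        // zero falls back to the engine default (256)

	signRequest(cmd, pk, req) // control requests must be signed

	err = cli.ExecRaw(func(client *client.Client) error {
		resp, err = control.Doctor(client, req)
		return err
	})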
diff --git a/pkg/services/control/server/doctor.go b/pkg/services/control/server/doctor.go
new file mode 100644
index 000000000..2c91d4c2b
--- /dev/null
+++ b/pkg/services/control/server/doctor.go
@@ -0,0 +1,37 @@
+package control
+
+import (
+	"context"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+func (s *Server) Doctor(ctx context.Context, req *control.DoctorRequest) (*control.DoctorResponse, error) {
+	err := s.isValidRequest(req)
+	if err != nil {
+		return nil, status.Error(codes.PermissionDenied, err.Error())
+	}
+
+	if !req.Body.RemoveDuplicates {
+		return nil, status.Error(codes.InvalidArgument, "operation not specified")
+	}
+
+	var prm engine.RemoveDuplicatesPrm
+	prm.Concurrency = int(req.Body.Concurrency)
+
+	err = s.s.RemoveDuplicates(ctx, prm)
+	if err != nil {
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
+	resp := &control.DoctorResponse{Body: &control.DoctorResponse_Body{}}
+
+	err = SignMessage(s.key, resp)
+	if err != nil {
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+	return resp, nil
+}
diff --git a/pkg/services/control/service.pb.go b/pkg/services/control/service.pb.go
index 78e7b0598..ca3e2770e 100644
Binary files a/pkg/services/control/service.pb.go and b/pkg/services/control/service.pb.go differ
diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto
index 5a09a74a5..7c661e661 100644
--- a/pkg/services/control/service.proto
+++ b/pkg/services/control/service.proto
@@ -37,6 +37,9 @@ service ControlService {
 
     // FlushCache moves all data from one shard to the others.
     rpc FlushCache (FlushCacheRequest) returns (FlushCacheResponse);
+
+    // Doctor performs storage restructuring operations on the engine.
+    rpc Doctor (DoctorRequest) returns (DoctorResponse);
 }
 
 // Health check request.
@@ -345,3 +348,28 @@ message FlushCacheResponse {
     Body body = 1;
     Signature signature = 2;
 }
+
+
+// Doctor request.
+message DoctorRequest {
+    // Request body structure.
+    message Body {
+        // Number of threads to use for the operation.
+        uint32 concurrency = 1;
+        // Flag to search engine for duplicate objects and leave only one copy.
+        bool remove_duplicates = 2;
+    }
+
+    Body body = 1;
+    Signature signature = 2;
+}
+
+// Doctor response.
+message DoctorResponse {
+    // Response body structure.
+    message Body {
+    }
+
+    Body body = 1;
+    Signature signature = 2;
+}
diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go
index 5bb119090..0f50d5893 100644
Binary files a/pkg/services/control/service_frostfs.pb.go and b/pkg/services/control/service_frostfs.pb.go differ
diff --git a/pkg/services/control/service_grpc.pb.go b/pkg/services/control/service_grpc.pb.go
index 2cfddd7f5..4a4fbeac1 100644
Binary files a/pkg/services/control/service_grpc.pb.go and b/pkg/services/control/service_grpc.pb.go differ
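An illustrative session with the new subcommand (`--remove-duplicates` and `--concurrency` are defined in this diff; the endpoint and wallet flags come from the common control-command flag set added by `initControlFlags`, and the values are placeholders):

	$ frostfs-cli control shards doctor \
		--endpoint <node-control-endpoint> \
		--wallet <path-to-wallet> \
		--remove-duplicates --concurrency 16
	Operation has finished.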