From fc44ced8394c31f01c92221e37a781ab84055ecb Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 30 Mar 2023 14:49:15 +0300 Subject: [PATCH 1/3] [#191] engine: Allow to remove redundant object copies RemoveDuplicates() removes all duplicate object copies stored on multiple shards. All shards are processed and the command tries to leave a copy on the best shard according to HRW. Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/engine/engine.go | 2 + .../engine/remove_copies.go | 138 ++++++++++++ .../engine/remove_copies_test.go | 208 ++++++++++++++++++ pkg/local_object_storage/engine/shards.go | 15 +- 4 files changed, 358 insertions(+), 5 deletions(-) create mode 100644 pkg/local_object_storage/engine/remove_copies.go create mode 100644 pkg/local_object_storage/engine/remove_copies_test.go diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index 4d154d289..e0161bfe3 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -17,6 +17,8 @@ import ( type StorageEngine struct { *cfg + removeDuplicatesInProgress atomic.Bool + mtx *sync.RWMutex shards map[string]hashedShard diff --git a/pkg/local_object_storage/engine/remove_copies.go b/pkg/local_object_storage/engine/remove_copies.go new file mode 100644 index 000000000..d881a52d1 --- /dev/null +++ b/pkg/local_object_storage/engine/remove_copies.go @@ -0,0 +1,138 @@ +package engine + +import ( + "context" + "errors" + + meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/hrw" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +// errRemoveDuplicatesInProgress is returned when another rebalancing is in progress. 
+// We need it because `Rebalance` removes objects and executing it concurrently +// on 2 shards can lead to data loss. In future this restriction could be relaxed. +var errRemoveDuplicatesInProgress = errors.New("redundant copies removal is already in progress") + +const defaultRemoveDuplicatesConcurrency = 256 + +type RemoveDuplicatesPrm struct { + Concurrency int +} + +// RemoveDuplicates iterates over all objects and removes duplicate object copies +// from shards which are worse as defined by HRW sort. +// Safety: +// 1. Concurrent execution is prohibited, thus 1 object copy should always be left. +// 2. If we delete an object from another thread, this is not a problem. Currently, +// we have 2 threads that can remove "valid" (non-expired and logically non-removed) objects: +// policer and rebalance. For rebalance see (1). +// If policer removes something, we do not care if both copies are removed or one of them is left, +// as the remaining copy will be removed during the next policer iteration. +func (e *StorageEngine) RemoveDuplicates(ctx context.Context, prm RemoveDuplicatesPrm) error { + if !e.removeDuplicatesInProgress.CompareAndSwap(false, true) { + return errRemoveDuplicatesInProgress + } + defer e.removeDuplicatesInProgress.Store(false) + + if prm.Concurrency <= 0 { + prm.Concurrency = defaultRemoveDuplicatesConcurrency + } + + e.log.Info("starting removal of locally-redundant copies", + zap.Int("concurrency", prm.Concurrency)) + + // The mutex must be taken for the whole duration to avoid target shard being removed + // concurrently: this can lead to data loss. + e.mtx.RLock() + defer e.mtx.RUnlock() + + // Iterate by shards to be sure that no objects from 2 different shards are removed simultaneously. + // This is not currently the case, because `FreeSpace` metric used by weight sorting is always 0. + // However we could change weights in future and easily forget this function. 
+ for _, sh := range e.shards { + e.log.Debug("started duplicates removal routine", zap.String("shard_id", sh.ID().String())) + ch := make(chan oid.Address) + + errG, ctx := errgroup.WithContext(ctx) + errG.SetLimit(prm.Concurrency + 1) // +1 for the listing thread + + errG.Go(func() error { + defer close(ch) + + var cursor *meta.Cursor + for { + var listPrm shard.ListWithCursorPrm + listPrm.WithCount(uint32(prm.Concurrency)) + listPrm.WithCursor(cursor) + res, err := sh.ListWithCursor(listPrm) + if err != nil { + if errors.Is(err, meta.ErrEndOfListing) { + return nil + } + return err + } + for _, addr := range res.AddressList() { + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- addr.Address: + } + } + cursor = res.Cursor() + } + }) + + for i := 0; i < prm.Concurrency; i++ { + errG.Go(func() error { + return e.removeObjects(ctx, ch) + }) + } + if err := errG.Wait(); err != nil { + e.log.Error("finished removal of locally-redundant copies", zap.Error(err)) + return err + } + } + + e.log.Info("finished removal of locally-redundant copies") + return nil +} + +// removeObjects reads addresses from ch and, for each address, keeps the copy on the first shard in HRW order that has the object and deletes it from every other shard. 
+func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address) error { + shards := make([]hashedShard, 0, len(e.shards)) + for _, sh := range e.shards { + shards = append(shards, sh) + } + + for addr := range ch { + h := hrw.Hash([]byte(addr.EncodeToString())) + shards := sortShardsByWeight(shards, h) + found := false + for i := range shards { + var existsPrm shard.ExistsPrm + existsPrm.SetAddress(addr) + + res, err := shards[i].Exists(existsPrm) + if err != nil { + return err + } else if !res.Exists() { + continue + } else if !found { + found = true + continue + } + + var deletePrm shard.DeletePrm + deletePrm.SetAddresses(addr) + _, err = shards[i].Delete(deletePrm) + if err != nil { + return err + } + } + } + return nil +} diff --git a/pkg/local_object_storage/engine/remove_copies_test.go b/pkg/local_object_storage/engine/remove_copies_test.go new file mode 100644 index 000000000..4415d01c8 --- /dev/null +++ b/pkg/local_object_storage/engine/remove_copies_test.go @@ -0,0 +1,208 @@ +package engine + +import ( + "context" + "sync" + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/stretchr/testify/require" +) + +func TestRebalance(t *testing.T) { + te := newEngineWithErrorThreshold(t, "", 0) + + const ( + objCount = 20 + copyCount = (objCount + 2) / 3 + ) + + type objectWithShard struct { + bestShard shard.ID + worstShard shard.ID + object *objectSDK.Object + } + + 
objects := make([]objectWithShard, objCount) + for i := range objects { + obj := testutil.GenerateObjectWithCID(cidtest.ID()) + obj.SetPayload(make([]byte, errSmallSize)) + objects[i].object = obj + + shards := te.ng.sortShardsByWeight(object.AddressOf(obj)) + objects[i].bestShard = *shards[0].Shard.ID() + objects[i].worstShard = *shards[1].Shard.ID() + } + + for i := range objects { + var prm shard.PutPrm + prm.SetObject(objects[i].object) + + var err1, err2 error + te.ng.mtx.RLock() + // Every 3rd object (i%3 == 0) is put to both shards, others are distributed. + if i%3 != 1 { + _, err1 = te.ng.shards[te.shards[0].id.String()].Shard.Put(prm) + } + if i%3 != 2 { + _, err2 = te.ng.shards[te.shards[1].id.String()].Shard.Put(prm) + } + te.ng.mtx.RUnlock() + + require.NoError(t, err1) + require.NoError(t, err2) + } + + var removedMtx sync.Mutex + var removed []deleteEvent + for _, shard := range te.shards { + id := *shard.id + shard.largeFileStorage.SetOption(teststore.WithDelete(func(prm common.DeletePrm) (common.DeleteRes, error) { + removedMtx.Lock() + removed = append(removed, deleteEvent{shardID: id, addr: prm.Address}) + removedMtx.Unlock() + return common.DeleteRes{}, nil + })) + } + + err := te.ng.RemoveDuplicates(context.Background(), RemoveDuplicatesPrm{}) + require.NoError(t, err) + + require.Equal(t, copyCount, len(removed)) + + removedMask := make([]bool, len(objects)) +loop: + for i := range removed { + for j := range objects { + if removed[i].addr == object.AddressOf(objects[j].object) { + require.Equal(t, objects[j].worstShard, removed[i].shardID, + "object %d was expected to be removed from another shard", j) + removedMask[j] = true + continue loop + } + } + require.FailNow(t, "unexpected object was removed", removed[i].addr) + } + + for i := 0; i < copyCount; i++ { + if i%3 == 0 { + require.True(t, removedMask[i], "object %d was expected to be removed", i) + } else { + require.False(t, removedMask[i], "object %d was not expected to be removed", i) + 
} + } +} + +func TestRebalanceSingleThread(t *testing.T) { + te := newEngineWithErrorThreshold(t, "", 0) + + obj := testutil.GenerateObjectWithCID(cidtest.ID()) + obj.SetPayload(make([]byte, errSmallSize)) + + var prm shard.PutPrm + prm.SetObject(obj) + te.ng.mtx.RLock() + _, err1 := te.ng.shards[te.shards[0].id.String()].Shard.Put(prm) + _, err2 := te.ng.shards[te.shards[1].id.String()].Shard.Put(prm) + te.ng.mtx.RUnlock() + require.NoError(t, err1) + require.NoError(t, err2) + + signal := make(chan struct{}) // unblock rebalance + started := make(chan struct{}) // make sure rebalance is started + for _, shard := range te.shards { + shard.largeFileStorage.SetOption(teststore.WithDelete(func(common.DeletePrm) (common.DeleteRes, error) { + close(started) + <-signal + return common.DeleteRes{}, nil + })) + } + + var firstErr error + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + firstErr = te.ng.RemoveDuplicates(context.Background(), RemoveDuplicatesPrm{}) + }() + + <-started + secondErr := te.ng.RemoveDuplicates(context.Background(), RemoveDuplicatesPrm{}) + require.ErrorIs(t, secondErr, errRemoveDuplicatesInProgress) + + close(signal) + wg.Wait() + require.NoError(t, firstErr) +} + +type deleteEvent struct { + shardID shard.ID + addr oid.Address +} + +func TestRebalanceExitByContext(t *testing.T) { + te := newEngineWithErrorThreshold(t, "", 0) + + objects := make([]*objectSDK.Object, 4) + for i := range objects { + obj := testutil.GenerateObjectWithCID(cidtest.ID()) + obj.SetPayload(make([]byte, errSmallSize)) + objects[i] = obj + } + + for i := range objects { + var prm shard.PutPrm + prm.SetObject(objects[i]) + + te.ng.mtx.RLock() + _, err1 := te.ng.shards[te.shards[0].id.String()].Shard.Put(prm) + _, err2 := te.ng.shards[te.shards[1].id.String()].Shard.Put(prm) + te.ng.mtx.RUnlock() + + require.NoError(t, err1) + require.NoError(t, err2) + } + + var removed []deleteEvent + deleteCh := make(chan struct{}) + signal := make(chan struct{}) + for 
_, shard := range te.shards { + id := *shard.id + shard.largeFileStorage.SetOption(teststore.WithDelete(func(prm common.DeletePrm) (common.DeleteRes, error) { + deleteCh <- struct{}{} + <-signal + removed = append(removed, deleteEvent{shardID: id, addr: prm.Address}) + return common.DeleteRes{}, nil + })) + } + + ctx, cancel := context.WithCancel(context.Background()) + + var rebalanceErr error + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + rebalanceErr = te.ng.RemoveDuplicates(ctx, RemoveDuplicatesPrm{Concurrency: 1}) + }() + + const removeCount = 3 + for i := 0; i < removeCount-1; i++ { + <-deleteCh + signal <- struct{}{} + } + <-deleteCh + cancel() + close(signal) + + wg.Wait() + require.ErrorIs(t, rebalanceErr, context.Canceled) + require.Equal(t, removeCount, len(removed)) +} diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 34210d835..2b1146ff2 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -208,16 +208,21 @@ func (e *StorageEngine) sortShardsByWeight(objAddr interface{ EncodeToString() s e.mtx.RLock() defer e.mtx.RUnlock() + h := hrw.Hash([]byte(objAddr.EncodeToString())) shards := make([]hashedShard, 0, len(e.shards)) - weights := make([]float64, 0, len(e.shards)) - for _, sh := range e.shards { shards = append(shards, hashedShard(sh)) - weights = append(weights, e.shardWeight(sh.Shard)) + } + return sortShardsByWeight(shards, h) +} + +func sortShardsByWeight(shards []hashedShard, h uint64) []hashedShard { + weights := make([]float64, 0, len(shards)) + for _, sh := range shards { + weights = append(weights, float64(sh.Shard.WeightValues().FreeSpace)) } - hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(objAddr.EncodeToString()))) - + hrw.SortHasherSliceByWeightValue(shards, weights, h) return shards } -- 2.45.2 From 18568730495d5ab621e2485e109c7bd3e4ea2f50 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov 
Date: Fri, 7 Apr 2023 14:21:05 +0300 Subject: [PATCH 2/3] [#191] control: Add Doctor RPC Doctor RPC performs complex operations on the storage engine. Currently only duplicate removal is supported. Signed-off-by: Evgenii Stratonikov --- pkg/services/control/convert.go | 18 ++++++++++ pkg/services/control/rpc.go | 14 ++++++++ pkg/services/control/server/doctor.go | 37 +++++++++++++++++++++ pkg/services/control/service.pb.go | Bin 111752 -> 121755 bytes pkg/services/control/service.proto | 28 ++++++++++++++++ pkg/services/control/service_frostfs.pb.go | Bin 52798 -> 57948 bytes pkg/services/control/service_grpc.pb.go | Bin 18217 -> 20760 bytes 7 files changed, 97 insertions(+) create mode 100644 pkg/services/control/server/doctor.go diff --git a/pkg/services/control/convert.go b/pkg/services/control/convert.go index 833288bb7..f7582dd68 100644 --- a/pkg/services/control/convert.go +++ b/pkg/services/control/convert.go @@ -200,3 +200,21 @@ func (w *flushCacheResponseWrapper) FromGRPCMessage(m grpc.Message) error { w.FlushCacheResponse = r return nil } + +type doctorResponseWrapper struct { + *DoctorResponse +} + +func (w *doctorResponseWrapper) ToGRPCMessage() grpc.Message { + return w.DoctorResponse +} + +func (w *doctorResponseWrapper) FromGRPCMessage(m grpc.Message) error { + r, ok := m.(*DoctorResponse) + if !ok { + return message.NewUnexpectedMessageType(m, (*DoctorResponse)(nil)) + } + + w.DoctorResponse = r + return nil +} diff --git a/pkg/services/control/rpc.go b/pkg/services/control/rpc.go index 0779e177b..2676ea7a5 100644 --- a/pkg/services/control/rpc.go +++ b/pkg/services/control/rpc.go @@ -18,6 +18,7 @@ const ( rpcSynchronizeTree = "SynchronizeTree" rpcEvacuateShard = "EvacuateShard" rpcFlushCache = "FlushCache" + rpcDoctor = "Doctor" ) // HealthCheck executes ControlService.HealthCheck RPC. 
@@ -191,3 +192,16 @@ func FlushCache(cli *client.Client, req *FlushCacheRequest, opts ...client.CallO return wResp.FlushCacheResponse, nil } + +// Doctor executes ControlService.Doctor RPC. +func Doctor(cli *client.Client, req *DoctorRequest, opts ...client.CallOption) (*DoctorResponse, error) { + wResp := &doctorResponseWrapper{new(DoctorResponse)} + wReq := &requestWrapper{m: req} + + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcDoctor), wReq, wResp, opts...) + if err != nil { + return nil, err + } + + return wResp.DoctorResponse, nil +} diff --git a/pkg/services/control/server/doctor.go b/pkg/services/control/server/doctor.go new file mode 100644 index 000000000..2c91d4c2b --- /dev/null +++ b/pkg/services/control/server/doctor.go @@ -0,0 +1,37 @@ +package control + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func (s *Server) Doctor(ctx context.Context, req *control.DoctorRequest) (*control.DoctorResponse, error) { + err := s.isValidRequest(req) + if err != nil { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + + if !req.Body.RemoveDuplicates { + return nil, status.Error(codes.InvalidArgument, "operation not specified") + } + + var prm engine.RemoveDuplicatesPrm + prm.Concurrency = int(req.Body.Concurrency) + + err = s.s.RemoveDuplicates(ctx, prm) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + resp := &control.DoctorResponse{Body: &control.DoctorResponse_Body{}} + + err = SignMessage(s.key, resp) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return resp, nil +} diff --git a/pkg/services/control/service.pb.go b/pkg/services/control/service.pb.go index 78e7b0598d93a7a763bfc152815feab18749387b..ca3e2770e1425432e663a12d4763d7420d57f637 100644 GIT binary 
patch delta 6014 zcma(#X;@TOmftC;DypbrQ3Mfz0$kXww#MR6$h{MNlAhjC!P6BC(OC;)k9q7`-UHn zbI(2dc5mJ1zR|pVOxtJN9y7;P>2`- zeYP^6ar-J@Q+A20TRVpJYt?LL?_AavY-I64YH2;Q9ofO%gY1$<-hcK)itlbmdjPx0 zMBkNXjKE6w@m(Y9dn%aCJe$Jqb{Ku}&uKMm%1a{4WZ~@ePCbd8&&=aPBsg!kmJJ_| zVrx5NUpzn>Z>?r~4_s%bqGeLjz=r*B=IhnIg3fg8z_!0p&YtNWBq@ATEm<-w!CKaKw#xU@{$v$TQUKf3iUw#R!;V@?(%l0Iy*`BWNXdTFe zgfnOQPuRE;eok1cn`1;?>3uH;ht6!WE!rG4*NZPmTh4-$r_3wsY-fN)uwe{@? zVE=K72qIYibT#|S;knG@2w{Ei8hKD&d}k|r{LFRV`gcDFU_<{RvY98NSkLjB1P8L} zkFNUMC&U2u!SDD+Fe{p`VMV93Qe<~62$$far}wg)PX~R;XFm>L?_Cht6UAeh>7UiC z>-b!Ddl{wi!$Zve`5?<%Ea81+ACtt_e$NR**!d;Fl1aChMd0}wit??lSoZ$EwXno5 zIEPF?mEv1}ZcPAlT@rn!&mw_cyFAD~T^ot{Z-JVUkm4)5I5&XVz80D3vvAf|rtz8n zpaS;RHBKfWnCTC4qRTESYo2V;cm6+nRP4YFk;QF_VDST5@_sh^q(+7dFC)|ZX^`z1 z3}x*#YT8$`vX4`I!2`(wtof$Mb~(q%cHcZahqyvn&sS1h*|ck|?8dD@-=0BTfJ!j4 z#=}*<;Tsy@FwDHhoUI0CxWOwZg&Btoc)Nl;3HwINS}%&e!+*X&gci=khHn^H-!=^! zy7@gv$-HA|DZ67FdX}XgOYnE>(Dk2Z7d6#ah;Eas+T^Wqi{(`wlh3Fc6nX4MAJr(%bA;4If}0BgUH1s z(j%6;D{D-mb3?6D7GBlVP*+=7?j;o!8^ubm$5igDGS!LYPZGvi*GwPLROxDHCdH*< zz3WMFj>6$#!!IPwb4$pw5jmc8u6>4#zDN6jZ4Ys=#$QbE2c&anoHD{phc>7LE03vy z>F8aofV)3SRlxZ7>OtUhOhL4qu=}MsFS@g==5*5|XMU-C5|usfs%lO#%@e(&WBdH| zOX#pE|W{Fxf^ z%d$*Vwyqp5RltF+bU#(Y8%~vAW0r-3euU+$L)kj(QCTg%7lwr8X;v#mVM}EW|C%k* zw_N&G@vk88uf1CO?k|NX95g@-Udn_>m9?7P`TG*wI}c`_3exq>bJE^>-p)|Msdlw5I0U%P~{i<03s7W!j6W~M+eRAGh{qS?CdbZ!4X zR!OiF9g9^`Y^~rZvVy)ADPZ?$l)@$l%wd)tTL1=LjZrNB?`k&mg9cCPK`TrD%Tgl9D1_wu z$Tj<3B>{fQ1Pq-+(n|!;1&?w8jRh3Da|INol?96Z6iO|+G?0$-3m^X4TaQB14c69sE%kU zED`(71We;z^Y-0e1QDp2K-^ahFc$mgLrSPnAytDvHI;a#n08_!s5?I`qKJL<803=A zV=yj)jCaNI){UW(m14f57{>5~WMSU|2<9XSyw&p~Xnw@5f(&;0ZyKCi0^>L~FAl62 zw;Z&P!{F{@*84w^yriaLUlL5@@`CWoQi#Ru#pHQ=Ao)YPI?h>MLV-G!LeO6=HQYh2;d*4z`CR)%uhO+kjG}6_!6f|5)(K2%OR3^mjH-dp9G8V!@PEx@| zMc_aZrL#>%%1{#py*|Av;Sc2_wRCY9I!|}hClb`v85Q%Xs z$hGC!Q23zKLn}zef|at=utJEX5=h3&D~bGezHHHj6%@aQl@LQFhT+LV(k%7NKt2^& zXuj;nz-;n^N;T3dHRgS@fH&r-!A{YUhmzc#7P}G}oiq`GXd-ZWyK-uzY2}gWAiA-< zXHpEx89}0CGHKFKdZ=BBU}B&^eUHVfU>1*P=PHOL5Bg|nk_?e2yd*=Ahp04-?VKrm 
zHO!hIsh4#~v{q>b^TEjSzMUr}KMc~3U0n@Nt5iY?hOC8*V|&-afZBhWV#zwnasOHf zOf&JnY12$rb1q$coL;x9E>r#i=s)B2x@7-PMl4=WL*Y~{d2ziGLKRsxl_03Bxmdo5 z`s{Qiq=F4i>!`4Us=%TWa&c%Y#N%qd$dVQlSbww@d#h-Kw$M)syz2xb{;dk^;J_g# zwRC|9X2`}_F6xtCa5#sCdT#G?mT5 z7vovCPlHcagN6Dq-%b5hzkz!0i)u(F3kgB@VmE1?w1G?(aKJ;(oo)p+ws}YcVZ->{ zA8%LtHDqy;c#PvhqG}+VHCznl`hzrFzdaYvcxkqet08F)%&&n`a^Z9Zoyl}9$GQ#F zq{nMWbq+_yV=PAm*TPh~ZevF+dDp#}`eJ!4q|harW0arwc%}-X%}SmG>;I9bOn$2y zm8^~pFg`}&;^`G^53=FDTFBw)xlm6XbFG%l&ce`*@Hp79l>c+!zKxJaPW<~uSV1Ao z*aRzrsOm_XMauR>)HOmHn(IMB+fvHkNrk@3F|3=ypC2k zf>xSvLY9o0(hS);KNSrIX#nnM21^j#4LPS{R9VtIQ{zB0 zq-f=&2)yRvFmo#nsgGKyJX*HG3^cV;(3fqe1McgsQ~+MSw6)O2IS&22h2ofH!;&_d zK(|_?NcXowG)`{?YSpbIWp^v&Q)d&ljB3HW4c1^Im;U-T5On+p2-%gD!}pb^M?AU> zqN47LxL!wn!71&S&`QidY=gZ0?XqR_Ncwq>JhmP3RCME)3?R#babg=-Rd#`!w??*x z?9$_TLdzyabMuUt+zwe}95JTgv+dN_l7(Ni!yIaBNt~kt>_Jw!IUU>v|5GE*>VWaF zimh@o^*X!k)*8Dx*I~i8JLpWp4hS*G^}I&N$+ep~I&~CA$l;hs{E$x;E>8r}o81lrp6`K=aCSGL z%DU({p@&5*d<8znx*i#I4fh`;XlFNZzJExDj5v)BLH=fUEdEs(12}LHE@R_Mv`dhI zt+NFqZG42&e?>}7J7Fw4dPI#k-ype_hlz?%{Ao?n{vkg922mY3Oz(QPjt67+5n_Dj zO`;;yWgPe?f{u2Q2zo5Tt|KJE_-jH9zX?}y`VoRs--55Px|g84j69%MEqcV#aK}Ex z&Z8u{uNRbODUB@tcpp&{`YH_r({6&lJPMciiI=b&7Vw1u!`_A-9NIxOBUm|lq5SI} dXT1WS(=Y$gs53oq2>afKn;;zP`VIUE{ujNd+SdR8 delta 3502 zcmYjUd05nS8h_rIodafuVE{!CX*5??B$yL~B|X6%}@BdD2XD^Gw3*FrO4@+i}yj#PZq4){1?}nkFYOvS+1S{#OcMInk?-_%A-ctN|zt21M z?ODIV!GpQ=DM1*2Opi5ppFneBX#EilRt`pC*^yF~s<8O*m)`3~Tb20lzp-in&OK1_Q$ z2zxVi-osZmD>3gQ7yh!;gr2K%Fl>xD|3 zdRZ8TWA#5Z=>FjZt&$P(SFT^!MBY#a8veJBt4t3b}$@g-yH3o+s9L@y<_yw z`(~jMqrP%^gKvidHvP-z)!qqE;*GC!amQLc_T3fs-{HbrH?Z(Ey|?JU8x^?!J0T3A z{!c-Tg+JcGaRWjcg0=TlJir}Y_tX$OUu(C;nk{RlETNy4LIh=I!bD1O!3afQ09n>T z7%g*wQNNkLqzRraZnw1MN!XP~#9;E>j#89^$^kmKjwSe0K z7-pS7>K+A?C~g@9r~?DoWiIuH!XvaV7^Vun#$~J*91XvrgLxczFcenMk~|nq!4sKP zP6?=Y7P8wbAuwNf3KBNck|2@_ToB;r_`!1KShZZ{7)KYE%W5_G;H2oWOu@1S%)_}x z2^|S(6YN=MJUy}uj3$Ru6vnbRR-(E%)=bt3kV-RGFroE%Oj0)+a0tlkk zIBVrD9SBip82yNTDWd5VFJy|m>IWHLS86g4=kQeM`O7E!K*mP@;o-- zzD84SHuG|djYpB?N6Lj 
z%W5_sT*)rEt5a$DYI%mtS@1_Kn>%cx3pdxmG&)-fBLiJxP7L3EM6IHZeD_9h^BxakXO=@S9=@!nUp7IYDn8z9 zw+m?o54UL>JWbsjnd6+DY+AVuVwl)sVghA(DjF2v7XnNapqcv1dEDZ*!$hThN;0Lt z!DCgrT?|npUmVX?FkK7hT+P=-Qw3XD=~M+wXENGK9=GHQ=6tJ}|KwD%&_R`zus~sn zrGjgx04R0iCJ03P7 zs~VF0%Fc)Ak!pz5*d^Tr(WG%?+yO}(^2r^L$&_~OfCXAPM3U6KD9W#e$y&aB*uB*v zdKXPcYWbSW+5(XZhlSiVFprBiPy_S*mR7rHs^2nEwsZ{jStHCH9P)N8%%$&YxS})a z;8_Nm>L5e(`#Har>z|)+5>g$lhC->`+Njn<&ePOrIiH+Qy(K{j8 zPswhTq4hg?@)PL5&LOfqS3{%o7_l^wvIt&yVCVGD5b^8DK85se!2AGzHxs2{8cpXK zAtunCB$doRVI#=1hf{v1md9h@4QV`djga9V4~N)H-r6Qe9U6i3DjCsJL^Ig5@*Fva z?Py6eSheyHIHXod`=_IxW|*XPNWmQrv31cD)dDu&*v>tCZRE8;io%f~AG%v$hQg63 z4v{o=7r!zcQ)J2PHlFZJyLcBINiy=`E=Z?0TOe}hZoVgf>*jsWDuWOu-+TnSxH+%2 zLOO4W6y>v4_#M5qi@heed3PK#TJHvjk~5*_TUno(=xMqK>DAIEo94KDvyA(i=9`DqV~UtY$|kg>=QUmsqeuo0fp8F zsVMb4oS_$vGjj2-AbuLCspx%%wspf(0+K%bWv$K=tW|c3T^aZ4WUcEbS&N~5GQ7`E z`%gn3C7hC< zb+Ir6$lb$X9=pJT?6)BdJ3iwJUyy&UhoOUA%+zyLN5K~v+VG(ag;Uab=%M|+e9X86 zU()4^@{<9DID#PElo^3=p1<=1`McHStdRPi4(NfxxzKR$sN%u6mUDoV{uuAKaFsk}f? zYHog6s!M4>PG)jqNow)tjmlFP*<4`S)=!q&tjqz^GCA;|$mD=}p2@o6{$^!su&c delta 9 Qcmca}gn8c_<_&h|02tr|y#N3J diff --git a/pkg/services/control/service_grpc.pb.go b/pkg/services/control/service_grpc.pb.go index 2cfddd7f57c2371a832755a5ac9f25f7e74d1eaf..4a4fbeac1f727cc433a38e8d9ebaf1a9c4a87956 100644 GIT binary patch delta 1551 zcmZ44$2emV3Ea^QW?)r`=C}-!ntWe~7i7>|SFHY$o~$pz50YPK zBP)bq5hP?PMEO7}?CkOQ%*HLJv^c{#F*zeuiHp-YKd+=HKPNb~s4O!%HQpsZxg@_R z9>aKWz(B*F6QW3oOLOu;N8!omxY;&4GS#q8c3@?3hDa$Cq!y*+7v&Z!6a!65OixuP zN-ZuaDorjaEy~PGSI7s-CzfOaV_P9VPa!ogJu@#=j|*z{9ZR z9|QtU9~rSF6_C7z zwdiIqn|F-Z(h5lauC4Iqmv)aCu_cxd4wom}8;Qb#Fu`a#jG1U#17n`o_9H5VOa|H} u0f|G6BM5 z2`C^l*~M32@)FyU%}0#?BK)z?wqkOosletUvxi_o6wd&)PyTH#u$js78AwtMWZvXH zE1}J^t?z?XBYA7{LfdSRJrF;EEMDut3=+Wf6i70}87K+z70AdxF2b96+#W+5fb2D} ReI7BJ6TJmkCm#w30{~nCq(cAz -- 2.45.2 From 988c55c6b9c2556acbf8a0c410b0e978afe2ad86 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 7 Apr 2023 14:36:00 +0300 Subject: [PATCH 3/3] [#191] cli: Add `control 
shards doctor` command Signed-off-by: Evgenii Stratonikov --- cmd/frostfs-cli/modules/control/doctor.go | 53 +++++++++++++++++++++++ cmd/frostfs-cli/modules/control/shards.go | 2 + 2 files changed, 55 insertions(+) create mode 100644 cmd/frostfs-cli/modules/control/doctor.go diff --git a/cmd/frostfs-cli/modules/control/doctor.go b/cmd/frostfs-cli/modules/control/doctor.go new file mode 100644 index 000000000..13bb81a0a --- /dev/null +++ b/cmd/frostfs-cli/modules/control/doctor.go @@ -0,0 +1,53 @@ +package control + +import ( + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key" + commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" + "github.com/spf13/cobra" +) + +const ( + concurrencyFlag = "concurrency" + removeDuplicatesFlag = "remove-duplicates" +) + +var doctorCmd = &cobra.Command{ + Use: "doctor", + Short: "Restructure node's storage", + Long: "Restructure node's storage", + Run: doctor, +} + +func doctor(cmd *cobra.Command, _ []string) { + pk := key.Get(cmd) + + req := &control.DoctorRequest{Body: new(control.DoctorRequest_Body)} + req.Body.Concurrency, _ = cmd.Flags().GetUint32(concurrencyFlag) + req.Body.RemoveDuplicates, _ = cmd.Flags().GetBool(removeDuplicatesFlag) + + signRequest(cmd, pk, req) + + cli := getClient(cmd, pk) + + var resp *control.DoctorResponse + var err error + err = cli.ExecRaw(func(client *client.Client) error { + resp, err = control.Doctor(client, req) + return err + }) + commonCmd.ExitOnErr(cmd, "rpc error: %w", err) + + verifyResponse(cmd, resp.GetSignature(), resp.GetBody()) + + cmd.Println("Operation has finished.") +} + +func initControlDoctorCmd() { + initControlFlags(doctorCmd) + + ff := doctorCmd.Flags() + ff.Uint32(concurrencyFlag, 0, "Number of parallel threads to use") + ff.Bool(removeDuplicatesFlag, false, "Remove duplicate objects") +} diff --git 
a/cmd/frostfs-cli/modules/control/shards.go b/cmd/frostfs-cli/modules/control/shards.go index 6719a4acf..9d3eb5c01 100644 --- a/cmd/frostfs-cli/modules/control/shards.go +++ b/cmd/frostfs-cli/modules/control/shards.go @@ -17,6 +17,7 @@ func initControlShardsCmd() { shardsCmd.AddCommand(restoreShardCmd) shardsCmd.AddCommand(evacuateShardCmd) shardsCmd.AddCommand(flushCacheCmd) + shardsCmd.AddCommand(doctorCmd) initControlShardsListCmd() initControlSetShardModeCmd() @@ -24,4 +25,5 @@ func initControlShardsCmd() { initControlRestoreShardCmd() initControlEvacuateShardCmd() initControlFlushCacheCmd() + initControlDoctorCmd() } -- 2.45.2