From dbc3811ff4653c9262b0b9c31b19ab4655eef046 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 30 Mar 2023 14:49:15 +0300 Subject: [PATCH] [#191] engine: Allow to remove redundant object copies RemoveDuplicates() removes all duplicate object copies stored on multiple shards. All shards are processed and the command tries to leave a copy on the best shard according to HRW. Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/engine/engine.go | 2 + .../engine/remove_copies.go | 138 ++++++++++++ .../engine/remove_copies_test.go | 208 ++++++++++++++++++ pkg/local_object_storage/engine/shards.go | 15 +- 4 files changed, 358 insertions(+), 5 deletions(-) create mode 100644 pkg/local_object_storage/engine/remove_copies.go create mode 100644 pkg/local_object_storage/engine/remove_copies_test.go diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index 4d154d2899..e0161bfe39 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -17,6 +17,8 @@ import ( type StorageEngine struct { *cfg + removeDuplicatesInProgress atomic.Bool + mtx *sync.RWMutex shards map[string]hashedShard diff --git a/pkg/local_object_storage/engine/remove_copies.go b/pkg/local_object_storage/engine/remove_copies.go new file mode 100644 index 0000000000..d881a52d1b --- /dev/null +++ b/pkg/local_object_storage/engine/remove_copies.go @@ -0,0 +1,138 @@ +package engine + +import ( + "context" + "errors" + + meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/hrw" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +// errRemoveDuplicatesInProgress is returned when another rebalancing is in progress. +// We need it because `Rebalance` removes objects and executing it concurrently +// on 2 shards can lead to data loss. In future this restriction could be relaxed. +var errRemoveDuplicatesInProgress = errors.New("redundant copies removal is already in progress") + +const defaultRemoveDuplicatesConcurrency = 256 + +type RemoveDuplicatesPrm struct { + Concurrency int +} + +// RemoveDuplicates iterates over all objects and removes duplicate object copies +// from shards which are worse as defined by HRW sort. +// Safety: +// 1. Concurrent execution is prohibited, thus 1 object copy should always be left. +// 2. If we delete an object from another thread, this is not a problem. Currently, +// we have 2 thread that can remove "valid" (non-expired and logically non-removed) objects: +// policer and rebalance. For rebalance see (1). +// If policer removes something, we do not care if both copies are removed or one of them is left, +// as the remaining copy will be removed during the next policer iteration. +func (e *StorageEngine) RemoveDuplicates(ctx context.Context, prm RemoveDuplicatesPrm) error { + if !e.removeDuplicatesInProgress.CompareAndSwap(false, true) { + return errRemoveDuplicatesInProgress + } + defer e.removeDuplicatesInProgress.Store(false) + + if prm.Concurrency <= 0 { + prm.Concurrency = defaultRemoveDuplicatesConcurrency + } + + e.log.Info("starting removal of locally-redundant copies", + zap.Int("concurrency", prm.Concurrency)) + + // The mutext must be taken for the whole duration to avoid target shard being removed + // concurrently: this can lead to data loss. + e.mtx.RLock() + defer e.mtx.RUnlock() + + // Iterate by shards to be sure that no objects from 2 different shards are removed simultaneously. + // This is not currently the case, because `FreeSpace` metric used by weight sorting is always 0. + // However we could change weights in future and easily forget this function. + for _, sh := range e.shards { + e.log.Debug("started duplicates removal routine", zap.String("shard_id", sh.ID().String())) + ch := make(chan oid.Address) + + errG, ctx := errgroup.WithContext(ctx) + errG.SetLimit(prm.Concurrency + 1) // +1 for the listing thread + + errG.Go(func() error { + defer close(ch) + + var cursor *meta.Cursor + for { + var listPrm shard.ListWithCursorPrm + listPrm.WithCount(uint32(prm.Concurrency)) + listPrm.WithCursor(cursor) + res, err := sh.ListWithCursor(listPrm) + if err != nil { + if errors.Is(err, meta.ErrEndOfListing) { + return nil + } + return err + } + for _, addr := range res.AddressList() { + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- addr.Address: + } + } + cursor = res.Cursor() + } + }) + + for i := 0; i < prm.Concurrency; i++ { + errG.Go(func() error { + return e.removeObjects(ctx, ch) + }) + } + if err := errG.Wait(); err != nil { + e.log.Error("finished removal of locally-redundant copies", zap.Error(err)) + return err + } + } + + e.log.Info("finished removal of locally-redundant copies") + return nil +} + +// removeObjects reads addresses from ch and removes all objects from other shards, excluding excludeID. +func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address) error { + shards := make([]hashedShard, 0, len(e.shards)) + for _, sh := range e.shards { + shards = append(shards, sh) + } + + for addr := range ch { + h := hrw.Hash([]byte(addr.EncodeToString())) + shards := sortShardsByWeight(shards, h) + found := false + for i := range shards { + var existsPrm shard.ExistsPrm + existsPrm.SetAddress(addr) + + res, err := shards[i].Exists(existsPrm) + if err != nil { + return err + } else if !res.Exists() { + continue + } else if !found { + found = true + continue + } + + var deletePrm shard.DeletePrm + deletePrm.SetAddresses(addr) + _, err = shards[i].Delete(deletePrm) + if err != nil { + return err + } + } + } + return nil +} diff --git a/pkg/local_object_storage/engine/remove_copies_test.go b/pkg/local_object_storage/engine/remove_copies_test.go new file mode 100644 index 0000000000..4415d01c88 --- /dev/null +++ b/pkg/local_object_storage/engine/remove_copies_test.go @@ -0,0 +1,208 @@ +package engine + +import ( + "context" + "sync" + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/stretchr/testify/require" +) + +func TestRebalance(t *testing.T) { + te := newEngineWithErrorThreshold(t, "", 0) + + const ( + objCount = 20 + copyCount = (objCount + 2) / 3 + ) + + type objectWithShard struct { + bestShard shard.ID + worstShard shard.ID + object *objectSDK.Object + } + + objects := make([]objectWithShard, objCount) + for i := range objects { + obj := testutil.GenerateObjectWithCID(cidtest.ID()) + obj.SetPayload(make([]byte, errSmallSize)) + objects[i].object = obj + + shards := te.ng.sortShardsByWeight(object.AddressOf(obj)) + objects[i].bestShard = *shards[0].Shard.ID() + objects[i].worstShard = *shards[1].Shard.ID() + } + + for i := range objects { + var prm shard.PutPrm + prm.SetObject(objects[i].object) + + var err1, err2 error + te.ng.mtx.RLock() + // Every 3rd object (i%3 == 0) is put to both shards, others are distributed. + if i%3 != 1 { + _, err1 = te.ng.shards[te.shards[0].id.String()].Shard.Put(prm) + } + if i%3 != 2 { + _, err2 = te.ng.shards[te.shards[1].id.String()].Shard.Put(prm) + } + te.ng.mtx.RUnlock() + + require.NoError(t, err1) + require.NoError(t, err2) + } + + var removedMtx sync.Mutex + var removed []deleteEvent + for _, shard := range te.shards { + id := *shard.id + shard.largeFileStorage.SetOption(teststore.WithDelete(func(prm common.DeletePrm) (common.DeleteRes, error) { + removedMtx.Lock() + removed = append(removed, deleteEvent{shardID: id, addr: prm.Address}) + removedMtx.Unlock() + return common.DeleteRes{}, nil + })) + } + + err := te.ng.RemoveDuplicates(context.Background(), RemoveDuplicatesPrm{}) + require.NoError(t, err) + + require.Equal(t, copyCount, len(removed)) + + removedMask := make([]bool, len(objects)) +loop: + for i := range removed { + for j := range objects { + if removed[i].addr == object.AddressOf(objects[j].object) { + require.Equal(t, objects[j].worstShard, removed[i].shardID, + "object %d was expected to be removed from another shard", j) + removedMask[j] = true + continue loop + } + } + require.FailNow(t, "unexpected object was removed", removed[i].addr) + } + + for i := 0; i < copyCount; i++ { + if i%3 == 0 { + require.True(t, removedMask[i], "object %d was expected to be removed", i) + } else { + require.False(t, removedMask[i], "object %d was not expected to be removed", i) + } + } +} + +func TestRebalanceSingleThread(t *testing.T) { + te := newEngineWithErrorThreshold(t, "", 0) + + obj := testutil.GenerateObjectWithCID(cidtest.ID()) + obj.SetPayload(make([]byte, errSmallSize)) + + var prm shard.PutPrm + prm.SetObject(obj) + te.ng.mtx.RLock() + _, err1 := te.ng.shards[te.shards[0].id.String()].Shard.Put(prm) + _, err2 := te.ng.shards[te.shards[1].id.String()].Shard.Put(prm) + te.ng.mtx.RUnlock() + require.NoError(t, err1) + require.NoError(t, err2) + + signal := make(chan struct{}) // unblock rebalance + started := make(chan struct{}) // make sure rebalance is started + for _, shard := range te.shards { + shard.largeFileStorage.SetOption(teststore.WithDelete(func(common.DeletePrm) (common.DeleteRes, error) { + close(started) + <-signal + return common.DeleteRes{}, nil + })) + } + + var firstErr error + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + firstErr = te.ng.RemoveDuplicates(context.Background(), RemoveDuplicatesPrm{}) + }() + + <-started + secondErr := te.ng.RemoveDuplicates(context.Background(), RemoveDuplicatesPrm{}) + require.ErrorIs(t, secondErr, errRemoveDuplicatesInProgress) + + close(signal) + wg.Wait() + require.NoError(t, firstErr) +} + +type deleteEvent struct { + shardID shard.ID + addr oid.Address +} + +func TestRebalanceExitByContext(t *testing.T) { + te := newEngineWithErrorThreshold(t, "", 0) + + objects := make([]*objectSDK.Object, 4) + for i := range objects { + obj := testutil.GenerateObjectWithCID(cidtest.ID()) + obj.SetPayload(make([]byte, errSmallSize)) + objects[i] = obj + } + + for i := range objects { + var prm shard.PutPrm + prm.SetObject(objects[i]) + + te.ng.mtx.RLock() + _, err1 := te.ng.shards[te.shards[0].id.String()].Shard.Put(prm) + _, err2 := te.ng.shards[te.shards[1].id.String()].Shard.Put(prm) + te.ng.mtx.RUnlock() + + require.NoError(t, err1) + require.NoError(t, err2) + } + + var removed []deleteEvent + deleteCh := make(chan struct{}) + signal := make(chan struct{}) + for _, shard := range te.shards { + id := *shard.id + shard.largeFileStorage.SetOption(teststore.WithDelete(func(prm common.DeletePrm) (common.DeleteRes, error) { + deleteCh <- struct{}{} + <-signal + removed = append(removed, deleteEvent{shardID: id, addr: prm.Address}) + return common.DeleteRes{}, nil + })) + } + + ctx, cancel := context.WithCancel(context.Background()) + + var rebalanceErr error + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + rebalanceErr = te.ng.RemoveDuplicates(ctx, RemoveDuplicatesPrm{Concurrency: 1}) + }() + + const removeCount = 3 + for i := 0; i < removeCount-1; i++ { + <-deleteCh + signal <- struct{}{} + } + <-deleteCh + cancel() + close(signal) + + wg.Wait() + require.ErrorIs(t, rebalanceErr, context.Canceled) + require.Equal(t, removeCount, len(removed)) +} diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 34210d8354..2b1146ff22 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -208,16 +208,21 @@ func (e *StorageEngine) sortShardsByWeight(objAddr interface{ EncodeToString() s e.mtx.RLock() defer e.mtx.RUnlock() + h := hrw.Hash([]byte(objAddr.EncodeToString())) shards := make([]hashedShard, 0, len(e.shards)) - weights := make([]float64, 0, len(e.shards)) - for _, sh := range e.shards { shards = append(shards, hashedShard(sh)) - weights = append(weights, e.shardWeight(sh.Shard)) + } + return sortShardsByWeight(shards, h) +} + +func sortShardsByWeight(shards []hashedShard, h uint64) []hashedShard { + weights := make([]float64, 0, len(shards)) + for _, sh := range shards { + weights = append(weights, float64(sh.Shard.WeightValues().FreeSpace)) } - hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(objAddr.EncodeToString()))) - + hrw.SortHasherSliceByWeightValue(shards, weights, h) return shards }