engine: Allow to remove redundant object copies #191

Merged
fyrchik merged 3 commits from fyrchik/frostfs-node:shard-reinsertion into master 2023-04-07 17:25:51 +00:00
13 changed files with 510 additions and 5 deletions

View file

@ -0,0 +1,53 @@
package control
import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"github.com/spf13/cobra"
)
const (
concurrencyFlag = "concurrency"
removeDuplicatesFlag = "remove-duplicates"
)
var doctorCmd = &cobra.Command{
Use: "doctor",
Short: "Restructure node's storage",
Long: "Restructure node's storage",
Run: doctor,
}
func doctor(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
req := &control.DoctorRequest{Body: new(control.DoctorRequest_Body)}
req.Body.Concurrency, _ = cmd.Flags().GetUint32(concurrencyFlag)
aarifullin marked this conversation as resolved Outdated

[Optional]

Using `GetUint32`, `GetBool` is fine, but if new flags are added to the `doctor` command, it will not be obvious that `concurrencyFlag` is a `uint32` and `someNewFlag` is a `someType`.

I think it is fine to use global variables to initialize flags:

```
ff := doctorCmd.Flags()
ff.BoolVar(&concurrencyVarFlag, concurrencyFlag, false, "Remove duplicate objects")
```

We had a problem with this, because in some cases "the same" flag should have different descriptions/defaults in different commands. With many global variables this had become a mess.

Anyway, I suggest discussing it separately and implementing in all CLI commands atomically, after reaching consensus.

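For illustration only, a minimal sketch of the package-level variable approach being discussed. It assumes the `doctorCmd`, `concurrencyFlag`, and `removeDuplicatesFlag` declarations from this file and the standard pflag `Uint32Var`/`BoolVar` binders; the variable names are hypothetical and not part of this PR.

```go
var (
	doctorConcurrency      uint32
	doctorRemoveDuplicates bool
)

func initControlDoctorCmd() {
	initControlFlags(doctorCmd)

	ff := doctorCmd.Flags()
	// Bind each flag to a typed package-level variable, so the flag's type
	// is visible at its declaration rather than at every Get* call site.
	ff.Uint32Var(&doctorConcurrency, concurrencyFlag, 0, "Number of parallel threads to use")
	ff.BoolVar(&doctorRemoveDuplicates, removeDuplicatesFlag, false, "Remove duplicate objects")
}
```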
req.Body.RemoveDuplicates, _ = cmd.Flags().GetBool(removeDuplicatesFlag)
signRequest(cmd, pk, req)
cli := getClient(cmd, pk)
var resp *control.DoctorResponse
var err error
err = cli.ExecRaw(func(client *client.Client) error {
resp, err = control.Doctor(client, req)
return err
})
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
cmd.Println("Operation has finished.")
}
func initControlDoctorCmd() {
initControlFlags(doctorCmd)
ff := doctorCmd.Flags()
ff.Uint32(concurrencyFlag, 0, "Number of parallel threads to use")
ff.Bool(removeDuplicatesFlag, false, "Remove duplicate objects")
}
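For reference, once the command is registered under `control shards` (see the next diff), the invocation would presumably look like `frostfs-cli control shards doctor --remove-duplicates --concurrency 16`, plus the usual control-service connection flags added by `initControlFlags`; the exact set of those flags is an assumption, as they are not shown in this diff.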

View file

@ -17,6 +17,7 @@ func initControlShardsCmd() {
shardsCmd.AddCommand(restoreShardCmd)
shardsCmd.AddCommand(evacuateShardCmd)
shardsCmd.AddCommand(flushCacheCmd)
shardsCmd.AddCommand(doctorCmd)
initControlShardsListCmd()
initControlSetShardModeCmd()
@ -24,4 +25,5 @@ func initControlShardsCmd() {
initControlRestoreShardCmd()
initControlEvacuateShardCmd()
initControlFlushCacheCmd()
initControlDoctorCmd()
}

View file

@ -17,6 +17,8 @@ import (
type StorageEngine struct {
*cfg
removeDuplicatesInProgress atomic.Bool
mtx *sync.RWMutex
shards map[string]hashedShard

View file

@ -0,0 +1,138 @@
package engine
import (
"context"
"errors"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/hrw"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// errRemoveDuplicatesInProgress is returned when another rebalancing is in progress.
// We need it because `Rebalance` removes objects, and executing it concurrently
// on 2 shards can lead to data loss. In the future this restriction could be relaxed.
var errRemoveDuplicatesInProgress = errors.New("redundant copies removal is already in progress")
const defaultRemoveDuplicatesConcurrency = 256
type RemoveDuplicatesPrm struct {
Concurrency int
}
// RemoveDuplicates iterates over all objects and removes duplicate object copies
// from shards which are worse as defined by HRW sort.
// Safety:
// 1. Concurrent execution is prohibited, thus 1 object copy should always be left.
// 2. If we delete an object from another thread, this is not a problem. Currently,
// we have 2 threads that can remove "valid" (non-expired and logically non-removed) objects:
// policer and rebalance. For rebalance see (1).
// If policer removes something, we do not care if both copies are removed or one of them is left,
// as the remaining copy will be removed during the next policer iteration.
func (e *StorageEngine) RemoveDuplicates(ctx context.Context, prm RemoveDuplicatesPrm) error {
if !e.removeDuplicatesInProgress.CompareAndSwap(false, true) {
return errRemoveDuplicatesInProgress
}
defer e.removeDuplicatesInProgress.Store(false)
if prm.Concurrency <= 0 {
prm.Concurrency = defaultRemoveDuplicatesConcurrency
}
e.log.Info("starting removal of locally-redundant copies",
zap.Int("concurrency", prm.Concurrency))
// The mutex must be held for the whole duration to avoid the target shard being removed
// concurrently: this can lead to data loss.
e.mtx.RLock()
defer e.mtx.RUnlock()
// Iterate over shards to be sure that no objects from 2 different shards are removed simultaneously.
// This is not currently the case, because the `FreeSpace` metric used by weight sorting is always 0.
// However, we could change the weights in the future and easily forget about this function.
for _, sh := range e.shards {
e.log.Debug("started duplicates removal routine", zap.String("shard_id", sh.ID().String()))
ch := make(chan oid.Address)
errG, ctx := errgroup.WithContext(ctx)
errG.SetLimit(prm.Concurrency + 1) // +1 for the listing thread
errG.Go(func() error {
defer close(ch)
dstepanov-yadro marked this conversation as resolved Outdated

Why count = `prm.Concurrency`?

Even a named constant is magic in this case, and it seems logical to depend on the number of workers which process the listed objects.

What else could we use here?

If `prm.Concurrency = 1`, then there will be too many bbolt requests, it seems to me.

> What else could we use here?

If I knew for sure... But if you don't have any other ideas, I agree with this approach.
var cursor *meta.Cursor
for {
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(uint32(prm.Concurrency))
listPrm.WithCursor(cursor)
res, err := sh.ListWithCursor(listPrm)
if err != nil {
if errors.Is(err, meta.ErrEndOfListing) {
return nil
}
return err
}
for _, addr := range res.AddressList() {
select {
case <-ctx.Done():
return ctx.Err()
case ch <- addr.Address:
}
}
dstepanov-yadro marked this conversation as resolved Outdated

Not `defaultRemoveDuplicatesConcurrency`, but `prm.Concurrency`?

Fixed.
cursor = res.Cursor()
}
})
for i := 0; i < prm.Concurrency; i++ {
errG.Go(func() error {
return e.removeObjects(ctx, ch)
})
}
if err := errG.Wait(); err != nil {
e.log.Error("finished removal of locally-redundant copies", zap.Error(err))
return err
}
}
e.log.Info("finished removal of locally-redundant copies")
return nil
}
// removeObjects reads addresses from ch and removes every copy of each object except the one on the best shard as defined by HRW sort.
func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address) error {
shards := make([]hashedShard, 0, len(e.shards))
for _, sh := range e.shards {
shards = append(shards, sh)
}
for addr := range ch {
h := hrw.Hash([]byte(addr.EncodeToString()))
dstepanov-yadro marked this conversation as resolved Outdated

Do we need to exclude the shard where the object was found? If an object is placed on a single shard, will it be deleted?

> If an object is placed on a single shard, will it be deleted?

It won't, and I guess that is why the `found` flag is needed. The deletion of the first object is ignored by the flag. If the object is met again, then `_, err = shards[i].Delete(deletePrm)`.

Here is the logic:

1. Take object X from shard A.
2. Sort shards with HRW.
3. The first shard an object is found in is considered "the best".
4. The object is removed from all other shards.
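To illustrate that logic with a hypothetical case: suppose object X is stored on shards A and C, and the HRW order for X is [B, A, C]. B is skipped because it holds no copy, the copy on A is the first one found and is kept, and the copy on C is deleted.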
shards := sortShardsByWeight(shards, h)
found := false
for i := range shards {
var existsPrm shard.ExistsPrm
existsPrm.SetAddress(addr)
res, err := shards[i].Exists(existsPrm)
if err != nil {
return err
} else if !res.Exists() {
continue
aarifullin marked this conversation as resolved Outdated

Wouldn't it be helpful to log the shards that had the same object?

Given the amount of logs we have, no. The only use-case I see is for testing.
The deletion operation is already logged; maybe we can add a single log entry when we start processing a shard.
} else if !found {
found = true
continue
}
var deletePrm shard.DeletePrm
deletePrm.SetAddresses(addr)
_, err = shards[i].Delete(deletePrm)
if err != nil {
return err
}
}
}
return nil
}

View file

@ -0,0 +1,208 @@
package engine
import (
"context"
"sync"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
)
func TestRebalance(t *testing.T) {
te := newEngineWithErrorThreshold(t, "", 0)
const (
objCount = 20
copyCount = (objCount + 2) / 3
)
type objectWithShard struct {
bestShard shard.ID
worstShard shard.ID
object *objectSDK.Object
}
objects := make([]objectWithShard, objCount)
for i := range objects {
obj := testutil.GenerateObjectWithCID(cidtest.ID())
obj.SetPayload(make([]byte, errSmallSize))
objects[i].object = obj
shards := te.ng.sortShardsByWeight(object.AddressOf(obj))
objects[i].bestShard = *shards[0].Shard.ID()
objects[i].worstShard = *shards[1].Shard.ID()
}
for i := range objects {
var prm shard.PutPrm
prm.SetObject(objects[i].object)
var err1, err2 error
te.ng.mtx.RLock()
// Every 3rd object (i%3 == 0) is put to both shards, others are distributed.
if i%3 != 1 {
_, err1 = te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
}
if i%3 != 2 {
_, err2 = te.ng.shards[te.shards[1].id.String()].Shard.Put(prm)
}
te.ng.mtx.RUnlock()
require.NoError(t, err1)
require.NoError(t, err2)
}
var removedMtx sync.Mutex
var removed []deleteEvent
for _, shard := range te.shards {
id := *shard.id
shard.largeFileStorage.SetOption(teststore.WithDelete(func(prm common.DeletePrm) (common.DeleteRes, error) {
removedMtx.Lock()
removed = append(removed, deleteEvent{shardID: id, addr: prm.Address})
removedMtx.Unlock()
return common.DeleteRes{}, nil
}))
}
err := te.ng.RemoveDuplicates(context.Background(), RemoveDuplicatesPrm{})
require.NoError(t, err)
require.Equal(t, copyCount, len(removed))
removedMask := make([]bool, len(objects))
loop:
for i := range removed {
for j := range objects {
if removed[i].addr == object.AddressOf(objects[j].object) {
require.Equal(t, objects[j].worstShard, removed[i].shardID,
"object %d was expected to be removed from another shard", j)
removedMask[j] = true
continue loop
}
}
require.FailNow(t, "unexpected object was removed", removed[i].addr)
}
for i := 0; i < copyCount; i++ {
if i%3 == 0 {
require.True(t, removedMask[i], "object %d was expected to be removed", i)
} else {
require.False(t, removedMask[i], "object %d was not expected to be removed", i)
}
}
}
func TestRebalanceSingleThread(t *testing.T) {
te := newEngineWithErrorThreshold(t, "", 0)
obj := testutil.GenerateObjectWithCID(cidtest.ID())
obj.SetPayload(make([]byte, errSmallSize))
var prm shard.PutPrm
prm.SetObject(obj)
te.ng.mtx.RLock()
_, err1 := te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
_, err2 := te.ng.shards[te.shards[1].id.String()].Shard.Put(prm)
te.ng.mtx.RUnlock()
require.NoError(t, err1)
require.NoError(t, err2)
signal := make(chan struct{}) // unblock rebalance
started := make(chan struct{}) // make sure rebalance is started
for _, shard := range te.shards {
shard.largeFileStorage.SetOption(teststore.WithDelete(func(common.DeletePrm) (common.DeleteRes, error) {
close(started)
<-signal
return common.DeleteRes{}, nil
}))
}
var firstErr error
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
firstErr = te.ng.RemoveDuplicates(context.Background(), RemoveDuplicatesPrm{})
}()
<-started
secondErr := te.ng.RemoveDuplicates(context.Background(), RemoveDuplicatesPrm{})
require.ErrorIs(t, secondErr, errRemoveDuplicatesInProgress)
close(signal)
wg.Wait()
require.NoError(t, firstErr)
}
type deleteEvent struct {
shardID shard.ID
addr oid.Address
}
func TestRebalanceExitByContext(t *testing.T) {
te := newEngineWithErrorThreshold(t, "", 0)
objects := make([]*objectSDK.Object, 4)
for i := range objects {
obj := testutil.GenerateObjectWithCID(cidtest.ID())
obj.SetPayload(make([]byte, errSmallSize))
objects[i] = obj
}
for i := range objects {
var prm shard.PutPrm
prm.SetObject(objects[i])
te.ng.mtx.RLock()
_, err1 := te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
_, err2 := te.ng.shards[te.shards[1].id.String()].Shard.Put(prm)
te.ng.mtx.RUnlock()
require.NoError(t, err1)
require.NoError(t, err2)
}
var removed []deleteEvent
deleteCh := make(chan struct{})
signal := make(chan struct{})
for _, shard := range te.shards {
id := *shard.id
shard.largeFileStorage.SetOption(teststore.WithDelete(func(prm common.DeletePrm) (common.DeleteRes, error) {
deleteCh <- struct{}{}
<-signal
removed = append(removed, deleteEvent{shardID: id, addr: prm.Address})
return common.DeleteRes{}, nil
}))
}
ctx, cancel := context.WithCancel(context.Background())
var rebalanceErr error
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
rebalanceErr = te.ng.RemoveDuplicates(ctx, RemoveDuplicatesPrm{Concurrency: 1})
}()
const removeCount = 3
for i := 0; i < removeCount-1; i++ {
<-deleteCh
signal <- struct{}{}
}
<-deleteCh
cancel()
close(signal)
wg.Wait()
require.ErrorIs(t, rebalanceErr, context.Canceled)
require.Equal(t, removeCount, len(removed))
}

View file

@ -208,16 +208,21 @@ func (e *StorageEngine) sortShardsByWeight(objAddr interface{ EncodeToString() s
e.mtx.RLock()
defer e.mtx.RUnlock()
h := hrw.Hash([]byte(objAddr.EncodeToString()))
shards := make([]hashedShard, 0, len(e.shards))
for _, sh := range e.shards {
shards = append(shards, hashedShard(sh))
}
return sortShardsByWeight(shards, h)
}
func sortShardsByWeight(shards []hashedShard, h uint64) []hashedShard {
weights := make([]float64, 0, len(shards))
for _, sh := range shards {
weights = append(weights, float64(sh.Shard.WeightValues().FreeSpace))
}
hrw.SortHasherSliceByWeightValue(shards, weights, h)
return shards
}

View file

@ -200,3 +200,21 @@ func (w *flushCacheResponseWrapper) FromGRPCMessage(m grpc.Message) error {
w.FlushCacheResponse = r
return nil
}
type doctorResponseWrapper struct {
*DoctorResponse
}
func (w *doctorResponseWrapper) ToGRPCMessage() grpc.Message {
return w.DoctorResponse
}
func (w *doctorResponseWrapper) FromGRPCMessage(m grpc.Message) error {
r, ok := m.(*DoctorResponse)
if !ok {
return message.NewUnexpectedMessageType(m, (*DoctorResponse)(nil))
}
w.DoctorResponse = r
return nil
}

View file

@ -18,6 +18,7 @@ const (
rpcSynchronizeTree = "SynchronizeTree"
rpcEvacuateShard = "EvacuateShard"
rpcFlushCache = "FlushCache"
rpcDoctor = "Doctor"
)
// HealthCheck executes ControlService.HealthCheck RPC.
@ -191,3 +192,16 @@ func FlushCache(cli *client.Client, req *FlushCacheRequest, opts ...client.CallO
return wResp.FlushCacheResponse, nil
}
// Doctor executes ControlService.Doctor RPC.
func Doctor(cli *client.Client, req *DoctorRequest, opts ...client.CallOption) (*DoctorResponse, error) {
wResp := &doctorResponseWrapper{new(DoctorResponse)}
wReq := &requestWrapper{m: req}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcDoctor), wReq, wResp, opts...)
dstepanov-yadro marked this conversation as resolved Outdated

There is no timeout for the RPC call?

It is hidden inside the client: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/cmd/frostfs-cli/internal/client/sdk.go#L53 (yes, we could improve this)
if err != nil {
return nil, err
}
return wResp.DoctorResponse, nil
}

View file

@ -0,0 +1,37 @@
package control
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (s *Server) Doctor(ctx context.Context, req *control.DoctorRequest) (*control.DoctorResponse, error) {
dstepanov-yadro marked this conversation as resolved Outdated

Will the `context` be canceled if the command execution is interrupted?

Haven't tested this, but I would expect this from gRPC.
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
if !req.Body.RemoveDuplicates {
return nil, status.Error(codes.InvalidArgument, "operation not specified")
}
var prm engine.RemoveDuplicatesPrm
prm.Concurrency = int(req.Body.Concurrency)
err = s.s.RemoveDuplicates(ctx, prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
resp := &control.DoctorResponse{Body: &control.DoctorResponse_Body{}}
err = SignMessage(s.key, resp)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}

Binary file not shown.

View file

@ -37,6 +37,9 @@ service ControlService {
// FlushCache moves all data from one shard to the others.
rpc FlushCache (FlushCacheRequest) returns (FlushCacheResponse);
// Doctor performs storage restructuring operations on engine.
rpc Doctor (DoctorRequest) returns (DoctorResponse);
}
// Health check request.
@ -345,3 +348,28 @@ message FlushCacheResponse {
Body body = 1;
Signature signature = 2;
}
// Doctor request.
message DoctorRequest {
// Request body structure.
message Body {
// Number of threads to use for the operation.
uint32 concurrency = 1;
// Flag to search the engine for duplicate objects and leave only one copy.
bool remove_duplicates = 2;
}
Body body = 1;
Signature signature = 2;
}
// Doctor response.
message DoctorResponse {
// Response body structure.
message Body {
}
Body body = 1;
Signature signature = 2;
}

Binary file not shown.

Binary file not shown.