engine: Allow to remove redundant object copies #191
cmd/frostfs-cli/modules/control/doctor.go (new file, 53 lines)
@ -0,0 +1,53 @@
package control

import (
    "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
    "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
    commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
    "github.com/spf13/cobra"
)

const (
    concurrencyFlag      = "concurrency"
    removeDuplicatesFlag = "remove-duplicates"
)

var doctorCmd = &cobra.Command{
    Use:   "doctor",
    Short: "Restructure node's storage",
    Long:  "Restructure node's storage",
    Run:   doctor,
}

func doctor(cmd *cobra.Command, _ []string) {
    pk := key.Get(cmd)

    req := &control.DoctorRequest{Body: new(control.DoctorRequest_Body)}
    req.Body.Concurrency, _ = cmd.Flags().GetUint32(concurrencyFlag)
    req.Body.RemoveDuplicates, _ = cmd.Flags().GetBool(removeDuplicatesFlag)

    signRequest(cmd, pk, req)

    cli := getClient(cmd, pk)

    var resp *control.DoctorResponse
    var err error
    err = cli.ExecRaw(func(client *client.Client) error {
        resp, err = control.Doctor(client, req)
        return err
    })
    commonCmd.ExitOnErr(cmd, "rpc error: %w", err)

    verifyResponse(cmd, resp.GetSignature(), resp.GetBody())

    cmd.Println("Operation has finished.")
}

func initControlDoctorCmd() {
    initControlFlags(doctorCmd)

    ff := doctorCmd.Flags()
    ff.Uint32(concurrencyFlag, 0, "Number of parallel threads to use")
    ff.Bool(removeDuplicatesFlag, false, "Remove duplicate objects")
}
@ -17,6 +17,7 @@ func initControlShardsCmd() {
    shardsCmd.AddCommand(restoreShardCmd)
    shardsCmd.AddCommand(evacuateShardCmd)
    shardsCmd.AddCommand(flushCacheCmd)
+   shardsCmd.AddCommand(doctorCmd)

    initControlShardsListCmd()
    initControlSetShardModeCmd()
@ -24,4 +25,5 @@ func initControlShardsCmd() {
    initControlRestoreShardCmd()
    initControlEvacuateShardCmd()
    initControlFlushCacheCmd()
+   initControlDoctorCmd()
}
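For reference, with the registration above the new subcommand is reachable as `frostfs-cli control shards doctor --remove-duplicates [--concurrency N]`; the endpoint and key flags are wired by `initControlFlags`, which is outside this diff.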
@ -17,6 +17,8 @@ import (
type StorageEngine struct {
    *cfg

+   removeDuplicatesInProgress atomic.Bool
+
    mtx *sync.RWMutex

    shards map[string]hashedShard
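The new field acts as a single-flight guard. The corresponding check in `RemoveDuplicates` (shown in remove_copies.go below) follows the usual CompareAndSwap pattern:

```go
// Only one removal run may be active at a time; a second concurrent call
// fails fast instead of racing with the first one.
if !e.removeDuplicatesInProgress.CompareAndSwap(false, true) {
    return errRemoveDuplicatesInProgress
}
defer e.removeDuplicatesInProgress.Store(false)
```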
pkg/local_object_storage/engine/remove_copies.go (new file, 138 lines)
@ -0,0 +1,138 @@
package engine

import (
    "context"
    "errors"

    meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
    "git.frostfs.info/TrueCloudLab/hrw"
    "go.uber.org/zap"
    "golang.org/x/sync/errgroup"
)

// errRemoveDuplicatesInProgress is returned when another rebalancing is in progress.
// We need it because `Rebalance` removes objects, and executing it concurrently
// on 2 shards can lead to data loss. In the future this restriction could be relaxed.
var errRemoveDuplicatesInProgress = errors.New("redundant copies removal is already in progress")

const defaultRemoveDuplicatesConcurrency = 256

type RemoveDuplicatesPrm struct {
    Concurrency int
}

// RemoveDuplicates iterates over all objects and removes duplicate object copies
// from shards which are worse as defined by the HRW sort.
// Safety:
//  1. Concurrent execution is prohibited, thus at least 1 object copy should always be left.
//  2. If we delete an object from another thread, this is not a problem. Currently,
//     we have 2 threads that can remove "valid" (non-expired and logically non-removed) objects:
//     the policer and rebalance. For rebalance see (1).
//     If the policer removes something, we do not care whether both copies are removed or one is left,
//     as the remaining copy will be removed during the next policer iteration.
func (e *StorageEngine) RemoveDuplicates(ctx context.Context, prm RemoveDuplicatesPrm) error {
    if !e.removeDuplicatesInProgress.CompareAndSwap(false, true) {
        return errRemoveDuplicatesInProgress
    }
    defer e.removeDuplicatesInProgress.Store(false)

    if prm.Concurrency <= 0 {
        prm.Concurrency = defaultRemoveDuplicatesConcurrency
    }

    e.log.Info("starting removal of locally-redundant copies",
        zap.Int("concurrency", prm.Concurrency))

    // The mutex must be taken for the whole duration to avoid the target shard being removed
    // concurrently: this could lead to data loss.
    e.mtx.RLock()
    defer e.mtx.RUnlock()

    // Iterate by shards to be sure that no objects from 2 different shards are removed simultaneously.
    // This is not currently the case, because the `FreeSpace` metric used by weight sorting is always 0.
    // However, we could change the weights in the future and easily forget about this function.
    for _, sh := range e.shards {
        e.log.Debug("started duplicates removal routine", zap.String("shard_id", sh.ID().String()))
        ch := make(chan oid.Address)

        errG, ctx := errgroup.WithContext(ctx)
        errG.SetLimit(prm.Concurrency + 1) // +1 for the listing thread

        errG.Go(func() error {
            defer close(ch)

dstepanov-yadro marked this conversation as resolved (Outdated)
dstepanov-yadro commented:
Why count = `prm.Concurrency`?
fyrchik commented:
Even a named constant would be magic in this case, and it seems logical to depend on the number of workers which process the listed objects. What else could we use here?
dstepanov-yadro commented:
If `prm.Concurrency = 1` then there will be too many bbolt requests, it seems to me.
> What else could we use here?
If I knew for sure... But if you don't have any other ideas, I agree with this approach.

            var cursor *meta.Cursor
            for {
                var listPrm shard.ListWithCursorPrm
                listPrm.WithCount(uint32(prm.Concurrency))
                listPrm.WithCursor(cursor)
                res, err := sh.ListWithCursor(listPrm)
                if err != nil {
                    if errors.Is(err, meta.ErrEndOfListing) {
                        return nil
                    }
                    return err
                }
                for _, addr := range res.AddressList() {
                    select {
                    case <-ctx.Done():
                        return ctx.Err()
                    case ch <- addr.Address:
                    }
                }

dstepanov-yadro marked this conversation as resolved (Outdated)
dstepanov-yadro commented:
Not `defaultRemoveDuplicatesConcurrency`, but `prm.Concurrency`?
fyrchik commented:
Fixed.

                cursor = res.Cursor()
            }
        })

        for i := 0; i < prm.Concurrency; i++ {
            errG.Go(func() error {
                return e.removeObjects(ctx, ch)
            })
        }
        if err := errG.Wait(); err != nil {
            e.log.Error("finished removal of locally-redundant copies", zap.Error(err))
            return err
        }
    }

    e.log.Info("finished removal of locally-redundant copies")
    return nil
}

// removeObjects reads addresses from ch and removes all copies of each object
// except the one residing on the best shard as defined by the HRW sort.
func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address) error {
    shards := make([]hashedShard, 0, len(e.shards))
    for _, sh := range e.shards {
        shards = append(shards, sh)
    }

    for addr := range ch {
        h := hrw.Hash([]byte(addr.EncodeToString()))

dstepanov-yadro marked this conversation as resolved (Outdated)
dstepanov-yadro commented:
Do we need to exclude the shard where the object was found? If an object is placed on a single shard, will it be deleted?
aarifullin commented:
> If object is placed on single shard, will it be deleted?

It won't, and I guess that is why the `found` flag is needed. The deletion of the first found copy is skipped because of the flag; if the object is met again, then `_, err = shards[i].Delete(deletePrm)` runs.
fyrchik commented:
Here is the logic:
1. Take object X from the shard A.
2. Sort shards with HRW.
3. The first shard an object is found in is considered "the best".
4. The object is removed from all other shards.

        shards := sortShardsByWeight(shards, h)
        found := false
        for i := range shards {
            var existsPrm shard.ExistsPrm
            existsPrm.SetAddress(addr)

            res, err := shards[i].Exists(existsPrm)
            if err != nil {
                return err
            } else if !res.Exists() {
                continue

aarifullin marked this conversation as resolved (Outdated)
aarifullin commented:
Wouldn't it be helpful to log the shards that had the same object?
fyrchik commented:
Given the amount of logs we have, no. The only use case I see is for testing. The deletion operation is already logged; maybe we can add a single log entry when we start processing a shard.

            } else if !found {
                found = true
                continue
            }

            var deletePrm shard.DeletePrm
            deletePrm.SetAddresses(addr)
            _, err = shards[i].Delete(deletePrm)
            if err != nil {
                return err
            }
        }
    }
    return nil
}
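To summarize the review discussion above, the per-address decision can be restated as a compact sketch. It reuses `hashedShard`, `sortShardsByWeight` and the shard `Exists`/`Delete` parameters from this file; the helper name is illustrative and not part of the PR:

```go
// removeRedundantCopiesOf restates the loop body of removeObjects: shards are
// ordered by HRW for this address, the first shard that actually has the
// object keeps its copy, and every later shard that also has it deletes its copy.
func removeRedundantCopiesOf(addr oid.Address, shards []hashedShard) error {
    sorted := sortShardsByWeight(shards, hrw.Hash([]byte(addr.EncodeToString())))

    found := false
    for i := range sorted {
        var existsPrm shard.ExistsPrm
        existsPrm.SetAddress(addr)

        res, err := sorted[i].Exists(existsPrm)
        if err != nil {
            return err
        }
        if !res.Exists() {
            continue // this shard has no copy at all
        }
        if !found {
            found = true // best shard that holds the object: keep this copy
            continue
        }

        var deletePrm shard.DeletePrm
        deletePrm.SetAddresses(addr)
        if _, err := sorted[i].Delete(deletePrm); err != nil {
            return err
        }
    }
    return nil
}
```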
pkg/local_object_storage/engine/remove_copies_test.go (new file, 208 lines)
@ -0,0 +1,208 @@
package engine

import (
    "context"
    "sync"
    "testing"

    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
    cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
    objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
    "github.com/stretchr/testify/require"
)

func TestRebalance(t *testing.T) {
    te := newEngineWithErrorThreshold(t, "", 0)

    const (
        objCount  = 20
        copyCount = (objCount + 2) / 3
    )

    type objectWithShard struct {
        bestShard  shard.ID
        worstShard shard.ID
        object     *objectSDK.Object
    }

    objects := make([]objectWithShard, objCount)
    for i := range objects {
        obj := testutil.GenerateObjectWithCID(cidtest.ID())
        obj.SetPayload(make([]byte, errSmallSize))
        objects[i].object = obj

        shards := te.ng.sortShardsByWeight(object.AddressOf(obj))
        objects[i].bestShard = *shards[0].Shard.ID()
        objects[i].worstShard = *shards[1].Shard.ID()
    }

    for i := range objects {
        var prm shard.PutPrm
        prm.SetObject(objects[i].object)

        var err1, err2 error
        te.ng.mtx.RLock()
        // Every 3rd object (i%3 == 0) is put to both shards, others are distributed.
        if i%3 != 1 {
            _, err1 = te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
        }
        if i%3 != 2 {
            _, err2 = te.ng.shards[te.shards[1].id.String()].Shard.Put(prm)
        }
        te.ng.mtx.RUnlock()

        require.NoError(t, err1)
        require.NoError(t, err2)
    }

    var removedMtx sync.Mutex
    var removed []deleteEvent
    for _, shard := range te.shards {
        id := *shard.id
        shard.largeFileStorage.SetOption(teststore.WithDelete(func(prm common.DeletePrm) (common.DeleteRes, error) {
            removedMtx.Lock()
            removed = append(removed, deleteEvent{shardID: id, addr: prm.Address})
            removedMtx.Unlock()
            return common.DeleteRes{}, nil
        }))
    }

    err := te.ng.RemoveDuplicates(context.Background(), RemoveDuplicatesPrm{})
    require.NoError(t, err)

    require.Equal(t, copyCount, len(removed))

    removedMask := make([]bool, len(objects))
loop:
    for i := range removed {
        for j := range objects {
            if removed[i].addr == object.AddressOf(objects[j].object) {
                require.Equal(t, objects[j].worstShard, removed[i].shardID,
                    "object %d was expected to be removed from another shard", j)
                removedMask[j] = true
                continue loop
            }
        }
        require.FailNow(t, "unexpected object was removed", removed[i].addr)
    }

    for i := 0; i < copyCount; i++ {
        if i%3 == 0 {
            require.True(t, removedMask[i], "object %d was expected to be removed", i)
        } else {
            require.False(t, removedMask[i], "object %d was not expected to be removed", i)
        }
    }
}

func TestRebalanceSingleThread(t *testing.T) {
    te := newEngineWithErrorThreshold(t, "", 0)

    obj := testutil.GenerateObjectWithCID(cidtest.ID())
    obj.SetPayload(make([]byte, errSmallSize))

    var prm shard.PutPrm
    prm.SetObject(obj)
    te.ng.mtx.RLock()
    _, err1 := te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
    _, err2 := te.ng.shards[te.shards[1].id.String()].Shard.Put(prm)
    te.ng.mtx.RUnlock()
    require.NoError(t, err1)
    require.NoError(t, err2)

    signal := make(chan struct{})  // unblock rebalance
    started := make(chan struct{}) // make sure rebalance is started
    for _, shard := range te.shards {
        shard.largeFileStorage.SetOption(teststore.WithDelete(func(common.DeletePrm) (common.DeleteRes, error) {
            close(started)
            <-signal
            return common.DeleteRes{}, nil
        }))
    }

    var firstErr error
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        firstErr = te.ng.RemoveDuplicates(context.Background(), RemoveDuplicatesPrm{})
    }()

    <-started
    secondErr := te.ng.RemoveDuplicates(context.Background(), RemoveDuplicatesPrm{})
    require.ErrorIs(t, secondErr, errRemoveDuplicatesInProgress)

    close(signal)
    wg.Wait()
    require.NoError(t, firstErr)
}

type deleteEvent struct {
    shardID shard.ID
    addr    oid.Address
}

func TestRebalanceExitByContext(t *testing.T) {
    te := newEngineWithErrorThreshold(t, "", 0)

    objects := make([]*objectSDK.Object, 4)
    for i := range objects {
        obj := testutil.GenerateObjectWithCID(cidtest.ID())
        obj.SetPayload(make([]byte, errSmallSize))
        objects[i] = obj
    }

    for i := range objects {
        var prm shard.PutPrm
        prm.SetObject(objects[i])

        te.ng.mtx.RLock()
        _, err1 := te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
        _, err2 := te.ng.shards[te.shards[1].id.String()].Shard.Put(prm)
        te.ng.mtx.RUnlock()

        require.NoError(t, err1)
        require.NoError(t, err2)
    }

    var removed []deleteEvent
    deleteCh := make(chan struct{})
    signal := make(chan struct{})
    for _, shard := range te.shards {
        id := *shard.id
        shard.largeFileStorage.SetOption(teststore.WithDelete(func(prm common.DeletePrm) (common.DeleteRes, error) {
            deleteCh <- struct{}{}
            <-signal
            removed = append(removed, deleteEvent{shardID: id, addr: prm.Address})
            return common.DeleteRes{}, nil
        }))
    }

    ctx, cancel := context.WithCancel(context.Background())

    var rebalanceErr error
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        rebalanceErr = te.ng.RemoveDuplicates(ctx, RemoveDuplicatesPrm{Concurrency: 1})
    }()

    const removeCount = 3
    for i := 0; i < removeCount-1; i++ {
        <-deleteCh
        signal <- struct{}{}
    }
    <-deleteCh
    cancel()
    close(signal)

    wg.Wait()
    require.ErrorIs(t, rebalanceErr, context.Canceled)
    require.Equal(t, removeCount, len(removed))
}
@ -208,16 +208,21 @@ func (e *StorageEngine) sortShardsByWeight(objAddr interface{ EncodeToString() s
    e.mtx.RLock()
    defer e.mtx.RUnlock()

+   h := hrw.Hash([]byte(objAddr.EncodeToString()))
    shards := make([]hashedShard, 0, len(e.shards))
-   weights := make([]float64, 0, len(e.shards))

    for _, sh := range e.shards {
        shards = append(shards, hashedShard(sh))
-       weights = append(weights, e.shardWeight(sh.Shard))
    }
+   return sortShardsByWeight(shards, h)
+}
+
+func sortShardsByWeight(shards []hashedShard, h uint64) []hashedShard {
+   weights := make([]float64, 0, len(shards))
+   for _, sh := range shards {
+       weights = append(weights, float64(sh.Shard.WeightValues().FreeSpace))
+   }

-   hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(objAddr.EncodeToString())))
+   hrw.SortHasherSliceByWeightValue(shards, weights, h)

    return shards
}
@ -200,3 +200,21 @@ func (w *flushCacheResponseWrapper) FromGRPCMessage(m grpc.Message) error {
    w.FlushCacheResponse = r
    return nil
}
+
+type doctorResponseWrapper struct {
+   *DoctorResponse
+}
+
+func (w *doctorResponseWrapper) ToGRPCMessage() grpc.Message {
+   return w.DoctorResponse
+}
+
+func (w *doctorResponseWrapper) FromGRPCMessage(m grpc.Message) error {
+   r, ok := m.(*DoctorResponse)
+   if !ok {
+       return message.NewUnexpectedMessageType(m, (*DoctorResponse)(nil))
+   }
+
+   w.DoctorResponse = r
+   return nil
+}
@ -18,6 +18,7 @@ const (
    rpcSynchronizeTree = "SynchronizeTree"
    rpcEvacuateShard   = "EvacuateShard"
    rpcFlushCache      = "FlushCache"
+   rpcDoctor          = "Doctor"
)

// HealthCheck executes ControlService.HealthCheck RPC.
@ -191,3 +192,16 @@ func FlushCache(cli *client.Client, req *FlushCacheRequest, opts ...client.CallO

    return wResp.FlushCacheResponse, nil
}
+
+// Doctor executes ControlService.Doctor RPC.
+func Doctor(cli *client.Client, req *DoctorRequest, opts ...client.CallOption) (*DoctorResponse, error) {
+   wResp := &doctorResponseWrapper{new(DoctorResponse)}
+   wReq := &requestWrapper{m: req}
+
+   err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcDoctor), wReq, wResp, opts...)

dstepanov-yadro marked this conversation as resolved (Outdated)
dstepanov-yadro commented:
There is no timeout for the RPC call?
fyrchik commented:
It is hidden inside the client: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/cmd/frostfs-cli/internal/client/sdk.go#L53 (yes, we could improve this).

+   if err != nil {
+       return nil, err
+   }
+
+   return wResp.DoctorResponse, nil
+}
pkg/services/control/server/doctor.go (new file, 37 lines)
@ -0,0 +1,37 @@
package control

import (
    "context"

    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

func (s *Server) Doctor(ctx context.Context, req *control.DoctorRequest) (*control.DoctorResponse, error) {

dstepanov-yadro marked this conversation as resolved (Outdated)
dstepanov-yadro commented:
Will the `context` be canceled if the command execution is interrupted?
fyrchik commented:
Haven't tested this, but I would expect this from gRPC.

    err := s.isValidRequest(req)
    if err != nil {
        return nil, status.Error(codes.PermissionDenied, err.Error())
    }

    if !req.Body.RemoveDuplicates {
        return nil, status.Error(codes.InvalidArgument, "operation not specified")
    }

    var prm engine.RemoveDuplicatesPrm
    prm.Concurrency = int(req.Body.Concurrency)

    err = s.s.RemoveDuplicates(ctx, prm)
    if err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }

    resp := &control.DoctorResponse{Body: &control.DoctorResponse_Body{}}

    err = SignMessage(s.key, resp)
    if err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }
    return resp, nil
}
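On the cancellation question above: in grpc-go the handler's context is canceled when the client disconnects or its deadline expires, so `RemoveDuplicates`, which checks the context in its listing loop and via `errgroup.WithContext`, will stop early. A generic illustration, not part of this PR (`SlowOp`, `Request`, `Response` and the `time` import are hypothetical):

```go
// Hypothetical unary handler demonstrating that the server-side context is
// canceled when the caller goes away or its deadline expires.
func (s *Server) SlowOp(ctx context.Context, _ *Request) (*Response, error) {
    select {
    case <-time.After(10 * time.Second): // simulated long-running work
        return &Response{}, nil
    case <-ctx.Done(): // client canceled, disconnected, or deadline exceeded
        return nil, status.FromContextError(ctx.Err()).Err()
    }
}
```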
pkg/services/control/service.pb.go (generated, binary diff not shown)
@ -37,6 +37,9 @@ service ControlService {

  // FlushCache moves all data from one shard to the others.
  rpc FlushCache (FlushCacheRequest) returns (FlushCacheResponse);
+
+  // Doctor performs storage restructuring operations on engine.
+  rpc Doctor (DoctorRequest) returns (DoctorResponse);
}

// Health check request.
@ -345,3 +348,28 @@ message FlushCacheResponse {
  Body body = 1;
  Signature signature = 2;
}
+
+// Doctor request.
+message DoctorRequest {
+  // Request body structure.
+  message Body {
+    // Number of threads to use for the operation.
+    uint32 concurrency = 1;
+    // Flag to search engine for duplicate objects and leave only one copy.
+    bool remove_duplicates = 2;
+  }
+
+  Body body = 1;
+  Signature signature = 2;
+}
+
+// Doctor response.
+message DoctorResponse {
+  // Response body structure.
+  message Body {
+  }
+
+  Body body = 1;
+  Signature signature = 2;
+}
[Optional]
Using `GetUint32`/`GetBool` is fine, but if new flags are added to the `doctor` command, it will not be obvious that `concurrencyFlag` is a uint32 and `someNewFlag` is `someType`. I think it is fine to use global variables to initialize flags.

We had a problem with this, because in some cases "the same" flag should have different descriptions/defaults in different commands. With many global variables this had become a mess.
Anyway, I suggest discussing it separately and implementing it in all CLI commands atomically, after reaching consensus.
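For comparison, the "global variables" alternative being discussed would look roughly like this, using the pflag `*Var` binders already available on `doctorCmd.Flags()` (variable names are illustrative, not part of this PR):

```go
// Alternative sketch of initControlDoctorCmd with flags bound to variables,
// so the flag types are visible at the declaration site at the cost of
// package-level state shared between commands.
var (
    doctorConcurrency      uint32
    doctorRemoveDuplicates bool
)

func initControlDoctorCmd() {
    initControlFlags(doctorCmd)

    ff := doctorCmd.Flags()
    ff.Uint32Var(&doctorConcurrency, concurrencyFlag, 0, "Number of parallel threads to use")
    ff.BoolVar(&doctorRemoveDuplicates, removeDuplicatesFlag, false, "Remove duplicate objects")
}
```

The command handler would then read `doctorConcurrency` and `doctorRemoveDuplicates` directly instead of calling `cmd.Flags().GetUint32(...)` and `GetBool(...)`.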