engine: Evacuate object from shards concurrently #1356

Merged
acid-ant merged 1 commit from acid-ant/frostfs-node:feat/evac-parallel into master 2024-09-24 12:02:17 +00:00
8 changed files with 468 additions and 108 deletions

View file

@@ -21,6 +21,9 @@ const (
	noProgressFlag = "no-progress"
	scopeFlag      = "scope"
containerWorkerCountFlag = "container-worker-count"
dstepanov-yadro marked this conversation as resolved (Outdated)

I have doubts that this parameter is necessary. Can you test with a value of 1 and a value of 10, for example, for 100 containers?

It will be helpful for the case when we need to evacuate containers with a `REP 1` policy only.
In a bad case, we need ~100 ms to get info about a container from `neo-go`.

For 1_000 containers, we need around 1m to iterate over containers with `container-worker-count` = 1.
For 1_000 containers, we need around 10s to iterate over containers with `container-worker-count` = 10.

For 10_000 containers, we need around 16m to iterate over containers with `container-worker-count` = 1.
For 10_000 containers, we need around 1m40s to iterate over containers with `container-worker-count` = 10.

Measured with a unit test: https://git.frostfs.info/TrueCloudLab/frostfs-node/commit/2218e42a10518d6be4d4375003a0a0dbe8e348d6
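The numbers above come down to how many of those ~100 ms container lookups are allowed in flight at once. Below is a minimal, standalone sketch (not the PR's code; `fetchContainerInfo` is a hypothetical stand-in that just sleeps for the quoted latency) of bounding that concurrency with `errgroup.SetLimit`, the same mechanism the new worker-count options feed into:

```
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// fetchContainerInfo is a stand-in for the ~100 ms per-container neo-go call.
func fetchContainerInfo(ctx context.Context, id int) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(100 * time.Millisecond):
		return nil
	}
}

func iterate(ctx context.Context, containers, workers int) error {
	eg, egCtx := errgroup.WithContext(ctx)
	eg.SetLimit(workers) // plays the role of container-worker-count
	for i := 0; i < containers; i++ {
		id := i // capture per-iteration value (needed before Go 1.22)
		eg.Go(func() error { return fetchContainerInfo(egCtx, id) })
	}
	return eg.Wait()
}

func main() {
	start := time.Now()
	_ = iterate(context.Background(), 100, 10)
	// With a limit of 1 the lookups run back to back; with 10 they overlap,
	// which is roughly the 10x difference measured above.
	fmt.Println("elapsed:", time.Since(start))
}
```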
objectWorkerCountFlag = "object-worker-count"
scopeAll = "all" scopeAll = "all"
scopeObjects = "objects" scopeObjects = "objects"
scopeTrees = "trees" scopeTrees = "trees"
@@ -64,12 +67,16 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
	pk := key.Get(cmd)
	ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
	containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag)
	objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag)
	req := &control.StartShardEvacuationRequest{
		Body: &control.StartShardEvacuationRequest_Body{
			Shard_ID:             getShardIDList(cmd),
			IgnoreErrors:         ignoreErrors,
			Scope:                getEvacuationScope(cmd),
			ContainerWorkerCount: containerWorkerCount,
			ObjectWorkerCount:    objectWorkerCount,
		},
	}
@@ -371,6 +378,8 @@ func initControlStartEvacuationShardCmd() {
	flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll))
	flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
	flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers")
dstepanov-yadro marked this conversation as resolved (Outdated)

Maybe set default values here, but not on the server side?

No, that would lead to a resource leak on the server side when another client sets zero values.

On the server side, just check that those values are greater than zero.
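That is what the engine ends up doing (see `createErrorGroupsForEvacuation` further down): a zero worker count from any client is replaced with the compiled-in default before it reaches `errgroup.SetLimit`. A minimal sketch of that guard, with an assumed constant name:

```
package evacuation

// workerCountDefault is an assumed default mirroring the PR's
// containerWorkerCountDefault / objectWorkerCountDefault constants.
const workerCountDefault = 10

// effectiveWorkerCount treats a zero value from any client as "use the default",
// so the server never configures a worker pool with a zero limit.
func effectiveWorkerCount(requested uint32) int {
	if requested == 0 {
		return workerCountDefault
	}
	return int(requested)
}
```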
flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers")
	startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
}

View file

@@ -10,7 +10,6 @@ import (
	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
-	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@@ -24,6 +23,16 @@ import (
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
const (
// containerWorkerCountDefault is a default value of the count of
// concurrent container evacuation workers.
containerWorkerCountDefault = 10
// objectWorkerCountDefault is a default value of the count of
// concurrent object evacuation workers.
objectWorkerCountDefault = 10
)

var (
@@ -79,6 +88,9 @@ type EvacuateShardPrm struct {
	IgnoreErrors bool
	Async        bool
	Scope        EvacuateScope
ContainerWorkerCount uint32
ObjectWorkerCount uint32
}
// EvacuateShardRes represents result of the EvacuateShard operation.

@@ -189,8 +201,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
	return res
}

-const defaultEvacuateBatchSize = 100

type pooledShard struct {
	hashedShard
	pool util.WorkerPool
@@ -242,8 +252,16 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
		return nil, err
	}
var mtx sync.RWMutex
copyShards := func() []pooledShard {
mtx.RLock()
defer mtx.RUnlock()
t := make([]pooledShard, len(shards))
copy(t, shards)
return t
}
	eg.Go(func() error {
-		return e.evacuateShards(egCtx, shardIDs, prm, res, shards, shardsToEvacuate)
+		return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate)
	})

	if prm.Async {
@@ -261,7 +279,7 @@ func ctxOrBackground(ctx context.Context, background bool) context.Context {
}

func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
-	shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
+	shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
	var err error
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
@@ -287,13 +305,39 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
		return err
	}

-	for _, shardID := range shardIDs {
-		if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
	ctx, cancel, egShard, egContainer, egObject := e.createErrorGroupsForEvacuation(ctx, prm)
	continueLoop := true
	for i := 0; continueLoop && i < len(shardIDs); i++ {
		select {
		case <-ctx.Done():
			continueLoop = false
		default:
			egShard.Go(func() error {
				err := e.evacuateShard(ctx, cancel, shardIDs[i], prm, res, shards, shardsToEvacuate, egContainer, egObject)
				if err != nil {
					cancel(err)
				}
				return err
			})
		}
	}
	err = egShard.Wait()
	if err != nil {
		err = fmt.Errorf("shard error: %w", err)
	}
	errContainer := egContainer.Wait()
	errObject := egObject.Wait()
	if errContainer != nil {
		err = errors.Join(err, fmt.Errorf("container error: %w", errContainer))
	}
	if errObject != nil {
		err = errors.Join(err, fmt.Errorf("object error: %w", errObject))
	}
	if err != nil {
		e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
			zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
		return err
	}
-	}

	e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
		zap.Strings("shard_ids", shardIDs),
@@ -309,6 +353,27 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
	return nil
}
func (e *StorageEngine) createErrorGroupsForEvacuation(ctx context.Context, prm EvacuateShardPrm) (
context.Context, context.CancelCauseFunc, *errgroup.Group, *errgroup.Group, *errgroup.Group,
) {
operationCtx, cancel := context.WithCancelCause(ctx)
dstepanov-yadro marked this conversation as resolved (Outdated)

It is OK, but looks redundant.
This code would do almost the same:

    egShard, ctx := errgroup.WithContext(ctx)
    egContainer, ctx := errgroup.WithContext(ctx)
    egObject, ctx := errgroup.WithContext(ctx)

`eg.Wait()` cancels the context. The goroutines that process containers and shards finish earlier than the goroutines for objects, so the groups cannot depend on each other's contexts.
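The reply is the key design point: the context returned by `errgroup.WithContext` is canceled as soon as that group's `Wait` returns, not only on error. A standalone sketch (assumed example, not code from this PR) of what would go wrong if the object group were derived from the shard group's context:

```
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	egShard, shardCtx := errgroup.WithContext(context.Background())
	// Hypothetical object group chained off the shard group's context.
	egObject, objCtx := errgroup.WithContext(shardCtx)

	egObject.Go(func() error {
		select {
		case <-objCtx.Done():
			return fmt.Errorf("object worker canceled early: %w", objCtx.Err())
		case <-time.After(time.Second):
			return nil // the "real" work would finish here
		}
	})

	egShard.Go(func() error { return nil }) // shard-level work finishes immediately
	_ = egShard.Wait()                      // Wait returns -> shardCtx (and objCtx) are canceled

	fmt.Println(egObject.Wait()) // object worker canceled early: context canceled
}
```

Deriving all three groups from a single `context.WithCancelCause` context, as `createErrorGroupsForEvacuation` does, keeps the groups independent while still letting any worker abort the whole evacuation by calling `cancel(err)`.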
egObject, _ := errgroup.WithContext(operationCtx)
objectWorkerCount := prm.ObjectWorkerCount
if objectWorkerCount == 0 {
objectWorkerCount = objectWorkerCountDefault
}
egObject.SetLimit(int(objectWorkerCount))
egContainer, _ := errgroup.WithContext(operationCtx)
containerWorkerCount := prm.ContainerWorkerCount
if containerWorkerCount == 0 {
containerWorkerCount = containerWorkerCountDefault
}
egContainer.SetLimit(int(containerWorkerCount))
egShard, _ := errgroup.WithContext(operationCtx)
return operationCtx, cancel, egShard, egContainer, egObject
}
func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotals")
	defer span.End()

@@ -335,8 +400,9 @@ func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, sha
	return nil
}
-func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
-	shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
+func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
+	shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
	egContainer *errgroup.Group, egObject *errgroup.Group,
) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
		trace.WithAttributes(

@@ -345,11 +411,10 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
	defer span.End()

	if prm.Scope.WithObjects() {
-		if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
+		if err := e.evacuateShardObjects(ctx, cancel, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject); err != nil {
			return err
		}
	}
	if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
		if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
			return err

@@ -359,44 +424,60 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
	return nil
}
-func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
-	shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
+func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
+	shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
	egContainer *errgroup.Group, egObject *errgroup.Group,
) error {
-	var listPrm shard.ListWithCursorPrm
-	listPrm.WithCount(defaultEvacuateBatchSize)
	sh := shardsToEvacuate[shardID]
-	sh.SetEvacuationInProgress(true)
-	var c *meta.Cursor
-	for {
-		listPrm.WithCursor(c)
-		// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
-		// because ListWithCursor works only with the metabase.
-		listRes, err := sh.ListWithCursor(ctx, listPrm)
-		if err != nil {
-			if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
-				break
-			}
	var cntPrm shard.IterateOverContainersPrm
	cntPrm.Handler = func(ctx context.Context, name []byte, _ cid.ID) error {
		select {
		case <-ctx.Done():
			return context.Cause(ctx)
		default:
		}
		egContainer.Go(func() error {
			var objPrm shard.IterateOverObjectsInContainerPrm
			objPrm.BucketName = name
			objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {
				select {
				case <-ctx.Done():
					return context.Cause(ctx)
				default:
				}
				egObject.Go(func() error {
					err := e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate)
					if err != nil {
						cancel(err)
					}
					return err
				})
				return nil
			}
			err := sh.IterateOverObjectsInContainer(ctx, objPrm)
			if err != nil {
				cancel(err)
			}
			return err
		})
		return nil
	}

	sh.SetEvacuationInProgress(true)
	err := sh.IterateOverContainers(ctx, cntPrm)
	if err != nil {
		cancel(err)
		e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
			zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
	}
	return err
-		}
-		if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil {
-			return err
-		}
-		c = listRes.Cursor()
-	}
-	return nil
}
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
-	shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
+	getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
	sh := shardsToEvacuate[shardID]
	shards := getShards()

	var listPrm pilorama.TreeListTreesPrm
	first := true
@@ -637,51 +718,49 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
	return shards, nil
}

-func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
-	shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
+func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
+	getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
-	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
-		trace.WithAttributes(
-			attribute.Int("objects_count", len(toEvacuate)),
-		))
+	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects")
	defer span.End()

-	for i := range toEvacuate {
	select {
	case <-ctx.Done():
-		return ctx.Err()
+		return context.Cause(ctx)
	default:
	}

-	addr := toEvacuate[i].Address
	shards := getShards()

	addr := objInfo.Address
	var getPrm shard.GetPrm
	getPrm.SetAddress(addr)
	getPrm.SkipEvacCheck(true)

-	getRes, err := sh.Get(ctx, getPrm)
+	getRes, err := shardsToEvacuate[shardID].Get(ctx, getPrm)
	if err != nil {
		if prm.IgnoreErrors {
			res.objFailed.Add(1)
-			continue
+			return nil
		}
		e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
			zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		return err
	}

-	evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, shardsToEvacuate, res)
+	evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res)
	if err != nil {
		return err
	}

	if evacuatedLocal {
-		continue
+		return nil
	}

	if prm.ObjectsHandler == nil {
		// Do not check ignoreErrors flag here because
		// ignoring errors on put make this command kinda useless.
-		return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i])
+		return fmt.Errorf("%w: %s", errPutShard, objInfo)
	}

	moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())

@@ -699,7 +778,6 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
	} else {
		return fmt.Errorf("object %s was not replicated", addr)
	}
-	}

	return nil
}

View file

@@ -6,6 +6,8 @@ import (
	"fmt"
	"path/filepath"
	"strconv"
	"sync"
	"sync/atomic"
	"testing"
	"time"
@@ -174,13 +176,13 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
	errReplication := errors.New("handler error")
	acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
-		var n uint64
+		var n atomic.Uint64
		return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
-			if n == max {
+			if n.Load() == max {
				return false, errReplication
			}
-			n++
+			n.Add(1)

			for i := range objects {
				if addr == objectCore.AddressOf(objects[i]) {
					require.Equal(t, objects[i], obj)
@@ -314,6 +316,36 @@ func TestEvacuateCancellation(t *testing.T) {
	require.Equal(t, uint64(0), res.ObjectsEvacuated())
}
func TestEvacuateCancellationByError(t *testing.T) {
t.Parallel()
e, ids, _ := newEngineEvacuate(t, 2, 10)
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm
prm.ShardID = ids[1:2]
var once atomic.Bool
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
var err error
flag := true
if once.CompareAndSwap(false, true) {
err = errors.New("test error")
flag = false
}
return flag, err
}
prm.Scope = EvacuateScopeObjects
prm.ObjectWorkerCount = 2
prm.ContainerWorkerCount = 2
_, err := e.Evacuate(context.Background(), prm)
require.ErrorContains(t, err, "test error")
}
func TestEvacuateSingleProcess(t *testing.T) {
	e, ids, _ := newEngineEvacuate(t, 2, 3)
	defer func() {
@@ -531,6 +563,7 @@ func TestEvacuateTreesRemote(t *testing.T) {
	require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
	require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))

	mutex := sync.Mutex{}
	evacuatedTreeOps := make(map[string][]*pilorama.Move)
	var prm EvacuateShardPrm
	prm.ShardID = ids

@@ -545,7 +578,9 @@ func TestEvacuateTreesRemote(t *testing.T) {
			if op.Time == 0 {
				return true, "", nil
			}
			mutex.Lock()
			evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op)
			mutex.Unlock()
			height = op.Time + 1
		}
	}

View file

@@ -1,6 +1,7 @@
package meta

import (
	"bytes"
	"context"
	"time"

@@ -61,6 +62,20 @@ func (l ListRes) Cursor() *Cursor {
	return l.cursor
}
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
type IterateOverContainersPrm struct {
// Handler function executed upon containers in db.
Handler func(context.Context, []byte, cid.ID) error
}
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type IterateOverObjectsInContainerPrm struct {

The sentence must end with a period
// BucketName container's bucket name.
BucketName []byte
// Handler function executed upon objects in db.
Handler func(context.Context, *objectcore.Info) error
}
// ListWithCursor lists physical objects available in metabase starting from
// cursor. Includes objects of all types. Does not include inhumed objects.
// Use cursor value from response for consecutive requests.

@@ -259,3 +274,155 @@ func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte)
	return rawID, name[0]
}
// IterateOverContainers lists physical containers available in metabase starting from first.
func (db *DB) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("IterateOverContainers", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverContainers",
trace.WithAttributes(
attribute.Bool("has_handler", prm.Handler != nil),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
}
err := db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateOverContainers(ctx, tx, prm)
})
success = err == nil
return metaerr.Wrap(err)
}
func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm IterateOverContainersPrm) error {
var containerID cid.ID
for _, prefix := range [][]byte{{byte(primaryPrefix)}, {byte(lockersPrefix)}, {byte(tombstonePrefix)}} {
c := tx.Cursor()
for name, _ := c.Seek(prefix); name != nil && bytes.HasPrefix(name, prefix); name, _ = c.Next() {
cidRaw, _ := parseContainerIDWithPrefix(&containerID, name)
if cidRaw == nil {
continue
}
dstepanov-yadro marked this conversation as resolved (Outdated)

Replace with

    for _, prefix := range []int{primaryPrefix, lockersPrefix, tombstonePrefix} {
        c := tx.Cursor()
        for name, _ := c.Seek(prefix); name != nil && bytes.HasPrefix(name, prefix); name, _ = c.Next() {
            ...
        }
    }

to reduce seeks.

I don't see the benefit of this; could you point out where I'm wrong? Even if we start from a particular prefix, we still need to iterate over everything that follows it, and with your approach we need to do that for every prefix. In the current implementation, we do that iteration only once.

Currently the iteration goes over all buckets with all prefixes: small, root, owner, user attributes and others. After the fix, iteration is performed only for the particular prefixes: `name != nil && bytes.HasPrefix(name, prefix)`.

Gotcha, fixed.
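For reference, this is the pattern the final version of `iterateOverContainers` uses. A minimal, self-contained sketch (hypothetical package and prefix values) of the prefix-bounded scan over top-level bbolt buckets:

```
package example

import (
	"bytes"

	"go.etcd.io/bbolt"
)

// iterateBucketsWithPrefixes seeks the root cursor to each prefix and stops as
// soon as bucket names no longer share it, instead of walking every top-level
// bucket in the database.
func iterateBucketsWithPrefixes(db *bbolt.DB, prefixes [][]byte, handle func(name []byte) error) error {
	return db.View(func(tx *bbolt.Tx) error {
		for _, prefix := range prefixes {
			c := tx.Cursor() // the root cursor iterates over top-level bucket names
			for name, _ := c.Seek(prefix); name != nil && bytes.HasPrefix(name, prefix); name, _ = c.Next() {
				if err := handle(name); err != nil {
					return err
				}
			}
		}
		return nil
	})
}
```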
bktName := make([]byte, len(name))
copy(bktName, name)
var cnt cid.ID
copy(cnt[:], containerID[:])
dstepanov-yadro marked this conversation as resolved (Outdated)

To check that the cursor's current item is a bucket, just compare the value with nil. Also, the root cursor always iterates over buckets, and `bkt := tx.Bucket(name)` makes another cursor seek inside, so this looks redundant.

Thanks, fixed. That was a copy-paste from `listWithCursor`.
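The point about the nil value generalizes to nested buckets as well. A small sketch (hypothetical helper, same `go.etcd.io/bbolt` import as above) of using `v == nil` as the bucket test on a non-root cursor:

```
package example

import "go.etcd.io/bbolt"

// listNestedBuckets returns the names of buckets nested under parent. On a bbolt
// cursor a nested bucket is reported with a nil value, so `v == nil` is the bucket
// test; the root cursor from tx.Cursor() only ever yields top-level buckets, which
// is why an extra check there is redundant.
func listNestedBuckets(db *bbolt.DB, parent []byte) ([][]byte, error) {
	var names [][]byte
	err := db.View(func(tx *bbolt.Tx) error {
		bkt := tx.Bucket(parent)
		if bkt == nil {
			return nil // parent bucket does not exist
		}
		c := bkt.Cursor()
		for k, v := c.First(); k != nil; k, v = c.Next() {
			if v != nil {
				continue // regular key-value pair
			}
			name := make([]byte, len(k))
			copy(name, k)
			names = append(names, name) // k names a nested bucket
		}
		return nil
	})
	return names, err
}
```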
err := prm.Handler(ctx, bktName, cnt)
if err != nil {
return err
}
}
}
return nil
}
// IterateOverObjectsInContainer iterates over physical objects available in metabase starting from first.
func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("IterateOverObjectsInContainer", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverObjectsInContainer",
trace.WithAttributes(
attribute.Bool("has_handler", prm.Handler != nil),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
}
var containerID cid.ID
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName)
if cidRaw == nil {
return nil
}
err := db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateOverObjectsInContainer(ctx, tx, cidRaw, prefix, containerID, prm)
})
success = err == nil
return metaerr.Wrap(err)
}
dstepanov-yadro marked this conversation as resolved (Outdated)

Add check against nil.

Added.
func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, cidRaw []byte, prefix byte,
containerID cid.ID, prm IterateOverObjectsInContainerPrm,
) error {
bkt := tx.Bucket(prm.BucketName)
if bkt == nil {
return nil
dstepanov-yadro marked this conversation as resolved (Outdated)

Could be done before transaction start.

Done.
}
graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
c := bkt.Cursor()
k, v := c.First()
var objType objectSDK.Type
switch prefix {
case primaryPrefix:
objType = objectSDK.TypeRegular
case lockersPrefix:
objType = objectSDK.TypeLock
case tombstonePrefix:
objType = objectSDK.TypeTombstone
default:
return nil
}
for ; k != nil; k, v = c.Next() {
var obj oid.ID
if err := obj.Decode(k); err != nil {
break
}
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
continue
}
var isLinkingObj bool
var ecInfo *objectcore.ECInfo
if objType == objectSDK.TypeRegular {
var o objectSDK.Object
if err := o.Unmarshal(v); err != nil {
return err
}
isLinkingObj = isLinkObject(&o)
ecHeader := o.ECHeader()
if ecHeader != nil {
ecInfo = &objectcore.ECInfo{
ParentID: ecHeader.Parent(),
Index: ecHeader.Index(),
Total: ecHeader.Total(),
}
}
}
var a oid.Address
a.SetContainer(containerID)
a.SetObject(obj)
objInfo := objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}
err := prm.Handler(ctx, &objInfo)
if err != nil {
return err
}
}
return nil
}

View file

@@ -34,6 +34,20 @@ func (r ListContainersRes) Containers() []cid.ID {
	return r.containers
}
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
type IterateOverContainersPrm struct {
// Handler function executed upon containers in db.
Handler func(context.Context, []byte, cid.ID) error
}
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type IterateOverObjectsInContainerPrm struct {
// BucketName container's bucket name.
BucketName []byte
// Handler function executed upon objects in db.
Handler func(context.Context, *objectcore.Info) error
}
// ListWithCursorPrm contains parameters for ListWithCursor operation.
type ListWithCursorPrm struct {
	count uint32

@@ -164,3 +178,54 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
		cursor: res.Cursor(),
	}, nil
}
// IterateOverContainers lists physical containers presented in shard.
func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverContainers",
trace.WithAttributes(
attribute.Bool("has_handler", prm.Handler != nil),
))
defer span.End()
s.m.RLock()
dstepanov-yadro marked this conversation as resolved (Outdated)

It is necessary to hold a read lock on the mutex for the whole operation, not just to get the mode. Or am I missing something?

You are right, `RLock` here and in `IterateOverObjectsInContainer` is required. Fixed.
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
var metaPrm meta.IterateOverContainersPrm
metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverContainers(ctx, metaPrm)
if err != nil {
return fmt.Errorf("could not iterate over containers: %w", err)
}
return nil
}
// IterateOverObjectsInContainer lists physical objects presented in shard for provided container's bucket name.
func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverObjectsInContainer",
trace.WithAttributes(
attribute.Bool("has_handler", prm.Handler != nil),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
var metaPrm meta.IterateOverObjectsInContainerPrm
metaPrm.BucketName = prm.BucketName
metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
if err != nil {
return fmt.Errorf("could not iterate over objects: %w", err)
}
return nil
}

View file

@@ -29,6 +29,8 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
		TreeHandler:          s.replicateTree,
		Async:                true,
		Scope:                engine.EvacuateScope(req.GetBody().GetScope()),
		ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(),
		ObjectWorkerCount:    req.GetBody().GetObjectWorkerCount(),
	}

	_, err = s.s.Evacuate(ctx, prm)

View file

@@ -394,6 +394,10 @@ message StartShardEvacuationRequest {
    bool ignore_errors = 2;
    // Evacuation scope.
    uint32 scope = 3;
    // Count of concurrent container evacuation workers.
    uint32 container_worker_count = 4;
    // Count of concurrent object evacuation workers.
    uint32 object_worker_count = 5;
  }

  Body body = 1;

Binary file not shown.