engine: Evacuate object from shards concurrently #1356
|
@ -21,6 +21,9 @@ const (
|
|||
noProgressFlag = "no-progress"
|
||||
scopeFlag = "scope"
|
||||
|
||||
containerWorkerCountFlag = "container-worker-count"
|
||||
|
||||
objectWorkerCountFlag = "object-worker-count"
|
||||
|
||||
scopeAll = "all"
|
||||
scopeObjects = "objects"
|
||||
scopeTrees = "trees"
|
||||
|
@ -64,12 +67,16 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
|
|||
pk := key.Get(cmd)
|
||||
|
||||
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
|
||||
containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag)
|
||||
objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag)
|
||||
|
||||
req := &control.StartShardEvacuationRequest{
|
||||
Body: &control.StartShardEvacuationRequest_Body{
|
||||
Shard_ID: getShardIDList(cmd),
|
||||
IgnoreErrors: ignoreErrors,
|
||||
Scope: getEvacuationScope(cmd),
|
||||
Shard_ID: getShardIDList(cmd),
|
||||
IgnoreErrors: ignoreErrors,
|
||||
Scope: getEvacuationScope(cmd),
|
||||
ContainerWorkerCount: containerWorkerCount,
|
||||
ObjectWorkerCount: objectWorkerCount,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -371,6 +378,8 @@ func initControlStartEvacuationShardCmd() {
|
|||
flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll))
|
||||
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
|
||||
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
|
||||
flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers")
|
||||
dstepanov-yadro commented
Maybe set the default values here, rather than on the server side?
acid-ant commented
No, that will lead to a resource leak on the server side, since another client may still send zero values.
dstepanov-yadro commented
Then on the server side just check that those values are greater than zero.
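A minimal sketch of the guard dstepanov-yadro suggests (hypothetical, not part of this PR; the PR instead falls back to engine-side defaults when the request carries zero, see `createErrorGroupsForEvacuation` below; assumes the standard `errors` package):
```go
// validateWorkerCounts sketches the suggested server-side check: treat zero
// worker counts as a client error instead of substituting defaults for them.
func validateWorkerCounts(containerWorkers, objectWorkers uint32) error {
	if containerWorkers == 0 || objectWorkers == 0 {
		return errors.New("worker counts must be greater than zero")
	}
	return nil
}
```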
|
||||
flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers")
|
||||
|
||||
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
||||
}
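For reference, a hypothetical invocation exercising the new flags (the command path and the shard ID, endpoint, and wallet flags are placeholders assumed from the surrounding frostfs-cli control commands; only the flag names registered above come from this diff):
```
frostfs-cli control shards evacuation start \
    --endpoint <control-endpoint> --wallet <wallet.json> \
    --id <shard-id> \
    --scope objects \
    --container-worker-count 10 \
    --object-worker-count 10 \
    --await
```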
|
||||
|
|
|
@ -10,7 +10,6 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
|
@ -24,6 +23,16 @@ import (
|
|||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const (
|
||||
// containerWorkerCountDefault is a default value of the count of
|
||||
// concurrent container evacuation workers.
|
||||
containerWorkerCountDefault = 10
|
||||
// objectWorkerCountDefault is a default value of the count of
|
||||
// concurrent object evacuation workers.
|
||||
objectWorkerCountDefault = 10
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -79,6 +88,9 @@ type EvacuateShardPrm struct {
|
|||
IgnoreErrors bool
|
||||
Async bool
|
||||
Scope EvacuateScope
|
||||
|
||||
ContainerWorkerCount uint32
|
||||
ObjectWorkerCount uint32
|
||||
}
|
||||
|
||||
// EvacuateShardRes represents result of the EvacuateShard operation.
|
||||
|
@ -189,8 +201,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
|
|||
return res
|
||||
}
|
||||
|
||||
const defaultEvacuateBatchSize = 100
|
||||
|
||||
type pooledShard struct {
|
||||
hashedShard
|
||||
pool util.WorkerPool
|
||||
|
@ -242,8 +252,16 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
|
|||
return nil, err
|
||||
}
|
||||
|
||||
var mtx sync.RWMutex
|
||||
copyShards := func() []pooledShard {
|
||||
mtx.RLock()
|
||||
defer mtx.RUnlock()
|
||||
t := make([]pooledShard, len(shards))
|
||||
copy(t, shards)
|
||||
return t
|
||||
}
|
||||
eg.Go(func() error {
|
||||
return e.evacuateShards(egCtx, shardIDs, prm, res, shards, shardsToEvacuate)
|
||||
return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate)
|
||||
})
|
||||
|
||||
if prm.Async {
|
||||
|
@ -261,7 +279,7 @@ func ctxOrBackground(ctx context.Context, background bool) context.Context {
|
|||
}
|
||||
|
||||
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
) error {
|
||||
var err error
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
|
||||
|
@ -287,13 +305,39 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
|
|||
return err
|
||||
}
|
||||
|
||||
for _, shardID := range shardIDs {
|
||||
if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
|
||||
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
|
||||
return err
|
||||
ctx, cancel, egShard, egContainer, egObject := e.createErrorGroupsForEvacuation(ctx, prm)
|
||||
continueLoop := true
|
||||
for i := 0; continueLoop && i < len(shardIDs); i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
continueLoop = false
|
||||
default:
|
||||
egShard.Go(func() error {
|
||||
err := e.evacuateShard(ctx, cancel, shardIDs[i], prm, res, shards, shardsToEvacuate, egContainer, egObject)
|
||||
if err != nil {
|
||||
cancel(err)
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
}
|
||||
err = egShard.Wait()
|
||||
if err != nil {
|
||||
err = fmt.Errorf("shard error: %w", err)
|
||||
}
|
||||
errContainer := egContainer.Wait()
|
||||
errObject := egObject.Wait()
|
||||
if errContainer != nil {
|
||||
err = errors.Join(err, fmt.Errorf("container error: %w", errContainer))
|
||||
}
|
||||
if errObject != nil {
|
||||
err = errors.Join(err, fmt.Errorf("object error: %w", errObject))
|
||||
}
|
||||
if err != nil {
|
||||
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
|
||||
return err
|
||||
}
|
||||
|
||||
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
|
||||
zap.Strings("shard_ids", shardIDs),
|
||||
|
@ -309,6 +353,27 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) createErrorGroupsForEvacuation(ctx context.Context, prm EvacuateShardPrm) (
|
||||
context.Context, context.CancelCauseFunc, *errgroup.Group, *errgroup.Group, *errgroup.Group,
|
||||
) {
|
||||
operationCtx, cancel := context.WithCancelCause(ctx)
|
||||
dstepanov-yadro commented
It is ok, but looks redundant. This code would do almost the same:
```
egShard, ctx := errgroup.WithContext(ctx)
egContainer, ctx := errgroup.WithContext(ctx)
egObject, ctx := errgroup.WithContext(ctx)
```
acid-ant commented
`eg.Wait()` cancels the group's context. The goroutines that process shards and containers finish earlier than the ones that process objects, which is why the groups cannot depend on each other's contexts.
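To illustrate why the PR combines `context.WithCancelCause` with independent errgroups, here is a small self-contained sketch (simplified names, not the PR code): any task can cancel the shared context with a cause, yet waiting on the producer group does not tear down the context the consumer group is still using.
```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	// One cancellable context shared by both groups. Unlike a context derived
	// from errgroup.WithContext, it is not cancelled by any group's Wait().
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)

	egProducer, _ := errgroup.WithContext(ctx)
	egConsumer, _ := errgroup.WithContext(ctx)
	egConsumer.SetLimit(2) // bounded concurrency, like ObjectWorkerCount

	egProducer.Go(func() error {
		for i := 0; i < 5; i++ {
			select {
			case <-ctx.Done():
				return context.Cause(ctx)
			default:
			}
			i := i
			egConsumer.Go(func() error {
				if i == 3 {
					err := errors.New("task failed")
					cancel(err) // propagate the failure to every group
					return err
				}
				return nil
			})
		}
		return nil
	})

	// Wait for the producer first, then for the consumers it spawned.
	fmt.Println(errors.Join(egProducer.Wait(), egConsumer.Wait()))
}
```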
|
||||
egObject, _ := errgroup.WithContext(operationCtx)
|
||||
objectWorkerCount := prm.ObjectWorkerCount
|
||||
if objectWorkerCount == 0 {
|
||||
objectWorkerCount = objectWorkerCountDefault
|
||||
}
|
||||
egObject.SetLimit(int(objectWorkerCount))
|
||||
egContainer, _ := errgroup.WithContext(operationCtx)
|
||||
containerWorkerCount := prm.ContainerWorkerCount
|
||||
if containerWorkerCount == 0 {
|
||||
containerWorkerCount = containerWorkerCountDefault
|
||||
}
|
||||
egContainer.SetLimit(int(containerWorkerCount))
|
||||
egShard, _ := errgroup.WithContext(operationCtx)
|
||||
|
||||
return operationCtx, cancel, egShard, egContainer, egObject
|
||||
}
|
||||
|
||||
func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotals")
|
||||
defer span.End()
|
||||
|
@ -335,8 +400,9 @@ func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, sha
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
egContainer *errgroup.Group, egObject *errgroup.Group,
|
||||
) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
|
||||
trace.WithAttributes(
|
||||
|
@ -345,11 +411,10 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
|
|||
defer span.End()
|
||||
|
||||
if prm.Scope.WithObjects() {
|
||||
if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
|
||||
if err := e.evacuateShardObjects(ctx, cancel, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
|
||||
if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
|
||||
return err
|
||||
|
@ -359,44 +424,60 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
egContainer *errgroup.Group, egObject *errgroup.Group,
|
||||
) error {
|
||||
var listPrm shard.ListWithCursorPrm
|
||||
listPrm.WithCount(defaultEvacuateBatchSize)
|
||||
|
||||
sh := shardsToEvacuate[shardID]
|
||||
sh.SetEvacuationInProgress(true)
|
||||
|
||||
var c *meta.Cursor
|
||||
for {
|
||||
listPrm.WithCursor(c)
|
||||
|
||||
// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
|
||||
// because ListWithCursor works only with the metabase.
|
||||
listRes, err := sh.ListWithCursor(ctx, listPrm)
|
||||
if err != nil {
|
||||
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
|
||||
break
|
||||
var cntPrm shard.IterateOverContainersPrm
|
||||
cntPrm.Handler = func(ctx context.Context, name []byte, _ cid.ID) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return context.Cause(ctx)
|
||||
default:
|
||||
}
|
||||
egContainer.Go(func() error {
|
||||
var objPrm shard.IterateOverObjectsInContainerPrm
|
||||
objPrm.BucketName = name
|
||||
objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return context.Cause(ctx)
|
||||
default:
|
||||
}
|
||||
egObject.Go(func() error {
|
||||
err := e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate)
|
||||
if err != nil {
|
||||
cancel(err)
|
||||
}
|
||||
return err
|
||||
})
|
||||
return nil
|
||||
}
|
||||
err := sh.IterateOverObjectsInContainer(ctx, objPrm)
|
||||
if err != nil {
|
||||
cancel(err)
|
||||
}
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
return err
|
||||
}
|
||||
|
||||
if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c = listRes.Cursor()
|
||||
})
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
|
||||
sh.SetEvacuationInProgress(true)
|
||||
err := sh.IterateOverContainers(ctx, cntPrm)
|
||||
if err != nil {
|
||||
cancel(err)
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
) error {
|
||||
sh := shardsToEvacuate[shardID]
|
||||
shards := getShards()
|
||||
|
||||
var listPrm pilorama.TreeListTreesPrm
|
||||
first := true
|
||||
|
@ -637,68 +718,65 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
|
|||
return shards, nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
|
||||
trace.WithAttributes(
|
||||
attribute.Int("objects_count", len(toEvacuate)),
|
||||
))
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects")
|
||||
defer span.End()
|
||||
|
||||
for i := range toEvacuate {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
addr := toEvacuate[i].Address
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return context.Cause(ctx)
|
||||
default:
|
||||
}
|
||||
|
||||
var getPrm shard.GetPrm
|
||||
getPrm.SetAddress(addr)
|
||||
getPrm.SkipEvacCheck(true)
|
||||
shards := getShards()
|
||||
addr := objInfo.Address
|
||||
|
||||
getRes, err := sh.Get(ctx, getPrm)
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
res.objFailed.Add(1)
|
||||
continue
|
||||
}
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
return err
|
||||
}
|
||||
var getPrm shard.GetPrm
|
||||
getPrm.SetAddress(addr)
|
||||
getPrm.SkipEvacCheck(true)
|
||||
|
||||
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, shardsToEvacuate, res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if evacuatedLocal {
|
||||
continue
|
||||
}
|
||||
|
||||
if prm.ObjectsHandler == nil {
|
||||
// Do not check ignoreErrors flag here because
|
||||
// ignoring errors on put make this command kinda useless.
|
||||
return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i])
|
||||
}
|
||||
|
||||
moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
|
||||
if err != nil {
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
return err
|
||||
}
|
||||
if moved {
|
||||
res.objEvacuated.Add(1)
|
||||
} else if prm.IgnoreErrors {
|
||||
getRes, err := shardsToEvacuate[shardID].Get(ctx, getPrm)
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
res.objFailed.Add(1)
|
||||
e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
} else {
|
||||
return fmt.Errorf("object %s was not replicated", addr)
|
||||
return nil
|
||||
}
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
return err
|
||||
}
|
||||
|
||||
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if evacuatedLocal {
|
||||
return nil
|
||||
}
|
||||
|
||||
if prm.ObjectsHandler == nil {
|
||||
// Do not check ignoreErrors flag here because
|
||||
// ignoring errors on put make this command kinda useless.
|
||||
return fmt.Errorf("%w: %s", errPutShard, objInfo)
|
||||
}
|
||||
|
||||
moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
|
||||
if err != nil {
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
return err
|
||||
}
|
||||
if moved {
|
||||
res.objEvacuated.Add(1)
|
||||
} else if prm.IgnoreErrors {
|
||||
res.objFailed.Add(1)
|
||||
e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
} else {
|
||||
return fmt.Errorf("object %s was not replicated", addr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -6,6 +6,8 @@ import (
|
|||
"fmt"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -174,13 +176,13 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
|
|||
errReplication := errors.New("handler error")
|
||||
|
||||
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
|
||||
var n uint64
|
||||
var n atomic.Uint64
|
||||
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
|
||||
if n == max {
|
||||
if n.Load() == max {
|
||||
return false, errReplication
|
||||
}
|
||||
|
||||
n++
|
||||
n.Add(1)
|
||||
for i := range objects {
|
||||
if addr == objectCore.AddressOf(objects[i]) {
|
||||
require.Equal(t, objects[i], obj)
|
||||
|
@ -314,6 +316,36 @@ func TestEvacuateCancellation(t *testing.T) {
|
|||
require.Equal(t, uint64(0), res.ObjectsEvacuated())
|
||||
}
|
||||
|
||||
func TestEvacuateCancellationByError(t *testing.T) {
|
||||
t.Parallel()
|
||||
e, ids, _ := newEngineEvacuate(t, 2, 10)
|
||||
defer func() {
|
||||
require.NoError(t, e.Close(context.Background()))
|
||||
}()
|
||||
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
||||
|
||||
var prm EvacuateShardPrm
|
||||
prm.ShardID = ids[1:2]
|
||||
var once atomic.Bool
|
||||
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
|
||||
var err error
|
||||
flag := true
|
||||
if once.CompareAndSwap(false, true) {
|
||||
err = errors.New("test error")
|
||||
flag = false
|
||||
}
|
||||
return flag, err
|
||||
}
|
||||
prm.Scope = EvacuateScopeObjects
|
||||
prm.ObjectWorkerCount = 2
|
||||
prm.ContainerWorkerCount = 2
|
||||
|
||||
_, err := e.Evacuate(context.Background(), prm)
|
||||
require.ErrorContains(t, err, "test error")
|
||||
}
|
||||
|
||||
func TestEvacuateSingleProcess(t *testing.T) {
|
||||
e, ids, _ := newEngineEvacuate(t, 2, 3)
|
||||
defer func() {
|
||||
|
@ -531,6 +563,7 @@ func TestEvacuateTreesRemote(t *testing.T) {
|
|||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
||||
|
||||
mutex := sync.Mutex{}
|
||||
evacuatedTreeOps := make(map[string][]*pilorama.Move)
|
||||
var prm EvacuateShardPrm
|
||||
prm.ShardID = ids
|
||||
|
@ -545,7 +578,9 @@ func TestEvacuateTreesRemote(t *testing.T) {
|
|||
if op.Time == 0 {
|
||||
return true, "", nil
|
||||
}
|
||||
mutex.Lock()
|
||||
evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op)
|
||||
mutex.Unlock()
|
||||
height = op.Time + 1
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"time"
|
||||
|
||||
|
@ -61,6 +62,20 @@ func (l ListRes) Cursor() *Cursor {
|
|||
return l.cursor
|
||||
}
|
||||
|
||||
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
|
||||
type IterateOverContainersPrm struct {
|
||||
// Handler function executed upon containers in db.
|
||||
Handler func(context.Context, []byte, cid.ID) error
|
||||
}
|
||||
|
||||
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
|
||||
type IterateOverObjectsInContainerPrm struct {
|
||||
dstepanov-yadro
commented
The sentence must end with a period.
|
||||
// BucketName container's bucket name.
|
||||
BucketName []byte
|
||||
// Handler function executed upon objects in db.
|
||||
Handler func(context.Context, *objectcore.Info) error
|
||||
}
|
||||
|
||||
// ListWithCursor lists physical objects available in metabase starting from
|
||||
// cursor. Includes objects of all types. Does not include inhumed objects.
|
||||
// Use cursor value from response for consecutive requests.
|
||||
|
@ -259,3 +274,155 @@ func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte)
|
|||
|
||||
return rawID, name[0]
|
||||
}
|
||||
|
||||
// IterateOverContainers lists physical containers available in metabase starting from first.
|
||||
func (db *DB) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
db.metrics.AddMethodDuration("IterateOverContainers", time.Since(startedAt), success)
|
||||
}()
|
||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverContainers",
|
||||
trace.WithAttributes(
|
||||
attribute.Bool("has_handler", prm.Handler != nil),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateOverContainers(ctx, tx, prm)
|
||||
})
|
||||
success = err == nil
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm IterateOverContainersPrm) error {
|
||||
var containerID cid.ID
|
||||
for _, prefix := range [][]byte{{byte(primaryPrefix)}, {byte(lockersPrefix)}, {byte(tombstonePrefix)}} {
|
||||
c := tx.Cursor()
|
||||
for name, _ := c.Seek(prefix); name != nil && bytes.HasPrefix(name, prefix); name, _ = c.Next() {
|
||||
cidRaw, _ := parseContainerIDWithPrefix(&containerID, name)
|
||||
if cidRaw == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
dstepanov-yadro commented
Replace with the following to reduce seeks:
```
for _, prefix := range []int{primaryPrefix, lockersPrefix, tombstonePrefix} {
    c := tx.Cursor()
    for name, _ := c.Seek(prefix); name != nil && bytes.HasPrefix(name, prefix); name, _ = c.Next() {
        ...
    }
}
```
acid-ant commented
I don't see the benefit, could you point out where I'm wrong? Even if we start from a particular prefix, we still have to iterate over everything that follows it, and with your approach we would have to do that for every prefix. In the current implementation we do that iteration only once.
dstepanov-yadro commented
Right now the iteration goes over the buckets of all prefixes: small, root, owner, user attributes and others. After the fix the iteration only touches the relevant prefixes: `name != nil && bytes.HasPrefix(name, prefix)`.
acid-ant commented
Gotcha, fixed.
|
||||
bktName := make([]byte, len(name))
|
||||
copy(bktName, name)
|
||||
var cnt cid.ID
|
||||
copy(cnt[:], containerID[:])
|
||||
dstepanov-yadro commented
To check that the cursor's current item is a bucket, just compare its value with nil. Also, the root cursor always iterates over buckets, and `bkt := tx.Bucket(name)` performs another cursor seek inside, so this looks redundant.
acid-ant commented
Thanks, fixed. That was a copy-paste from `listWithCursor`.
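A standalone bbolt sketch of the rule referenced above (example only, not PR code): at the root level every key the cursor yields is a bucket name and its value is nil, so no extra `tx.Bucket(name)` lookup is needed to confirm that.
```go
package main

import (
	"fmt"
	"log"

	"go.etcd.io/bbolt"
)

func main() {
	db, err := bbolt.Open("example.db", 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	err = db.View(func(tx *bbolt.Tx) error {
		c := tx.Cursor()
		// Root-level cursor: each key is a bucket name, each value is nil.
		for name, v := c.First(); name != nil; name, v = c.Next() {
			if v == nil {
				fmt.Printf("bucket: %x\n", name)
			}
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}
```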
|
||||
err := prm.Handler(ctx, bktName, cnt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// IterateOverObjectsInContainer iterates over physical objects available in metabase starting from first.
|
||||
func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
db.metrics.AddMethodDuration("IterateOverObjectsInContainer", time.Since(startedAt), success)
|
||||
}()
|
||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverObjectsInContainer",
|
||||
trace.WithAttributes(
|
||||
attribute.Bool("has_handler", prm.Handler != nil),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
var containerID cid.ID
|
||||
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName)
|
||||
if cidRaw == nil {
|
||||
return nil
|
||||
}
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateOverObjectsInContainer(ctx, tx, cidRaw, prefix, containerID, prm)
|
||||
})
|
||||
success = err == nil
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
dstepanov-yadro commented
Add a check against nil.
acid-ant commented
Added.
|
||||
|
||||
func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, cidRaw []byte, prefix byte,
|
||||
containerID cid.ID, prm IterateOverObjectsInContainerPrm,
|
||||
) error {
|
||||
bkt := tx.Bucket(prm.BucketName)
|
||||
if bkt == nil {
|
||||
return nil
|
||||
dstepanov-yadro commented
Could be done before the transaction starts.
acid-ant commented
Done.
|
||||
}
|
||||
graveyardBkt := tx.Bucket(graveyardBucketName)
|
||||
garbageBkt := tx.Bucket(garbageBucketName)
|
||||
c := bkt.Cursor()
|
||||
k, v := c.First()
|
||||
|
||||
var objType objectSDK.Type
|
||||
|
||||
switch prefix {
|
||||
case primaryPrefix:
|
||||
objType = objectSDK.TypeRegular
|
||||
case lockersPrefix:
|
||||
objType = objectSDK.TypeLock
|
||||
case tombstonePrefix:
|
||||
objType = objectSDK.TypeTombstone
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
for ; k != nil; k, v = c.Next() {
|
||||
var obj oid.ID
|
||||
if err := obj.Decode(k); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
var isLinkingObj bool
|
||||
var ecInfo *objectcore.ECInfo
|
||||
if objType == objectSDK.TypeRegular {
|
||||
var o objectSDK.Object
|
||||
if err := o.Unmarshal(v); err != nil {
|
||||
return err
|
||||
}
|
||||
isLinkingObj = isLinkObject(&o)
|
||||
ecHeader := o.ECHeader()
|
||||
if ecHeader != nil {
|
||||
ecInfo = &objectcore.ECInfo{
|
||||
ParentID: ecHeader.Parent(),
|
||||
Index: ecHeader.Index(),
|
||||
Total: ecHeader.Total(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var a oid.Address
|
||||
a.SetContainer(containerID)
|
||||
a.SetObject(obj)
|
||||
objInfo := objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}
|
||||
err := prm.Handler(ctx, &objInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -34,6 +34,20 @@ func (r ListContainersRes) Containers() []cid.ID {
|
|||
return r.containers
|
||||
}
|
||||
|
||||
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
|
||||
type IterateOverContainersPrm struct {
|
||||
// Handler function executed upon containers in db.
|
||||
Handler func(context.Context, []byte, cid.ID) error
|
||||
}
|
||||
|
||||
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
|
||||
type IterateOverObjectsInContainerPrm struct {
|
||||
// BucketName container's bucket name.
|
||||
BucketName []byte
|
||||
// Handler function executed upon objects in db.
|
||||
Handler func(context.Context, *objectcore.Info) error
|
||||
}
|
||||
|
||||
// ListWithCursorPrm contains parameters for ListWithCursor operation.
|
||||
type ListWithCursorPrm struct {
|
||||
count uint32
|
||||
|
@ -164,3 +178,54 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
|
|||
cursor: res.Cursor(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// IterateOverContainers lists physical containers presented in shard.
|
||||
func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverContainers",
|
||||
trace.WithAttributes(
|
||||
attribute.Bool("has_handler", prm.Handler != nil),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
s.m.RLock()
|
||||
dstepanov-yadro commented
It is necessary to hold a read lock on the mutex for the whole operation, not just to read the mode. Or am I missing something?
acid-ant commented
You are right, `RLock` here and in `IterateOverObjectsInContainer` is required. Fixed.
|
||||
defer s.m.RUnlock()
|
||||
|
||||
if s.info.Mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
var metaPrm meta.IterateOverContainersPrm
|
||||
metaPrm.Handler = prm.Handler
|
||||
err := s.metaBase.IterateOverContainers(ctx, metaPrm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not iterate over containers: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// IterateOverObjectsInContainer lists physical objects presented in shard for provided container's bucket name.
|
||||
func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverObjectsInContainer",
|
||||
trace.WithAttributes(
|
||||
attribute.Bool("has_handler", prm.Handler != nil),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
if s.info.Mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
var metaPrm meta.IterateOverObjectsInContainerPrm
|
||||
metaPrm.BucketName = prm.BucketName
|
||||
metaPrm.Handler = prm.Handler
|
||||
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not iterate over objects: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
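To make the new two-level API easier to follow, here is a simplified, sequential sketch of the container-then-object walk that the evacuation code above parallelises with errgroups (import paths are assumed from the repository layout; error handling, worker pools, and graveyard checks omitted):
```go
import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)

// iterateShardObjects visits every live object of every container in sh.
func iterateShardObjects(ctx context.Context, sh *shard.Shard, visit func(*object.Info) error) error {
	var cntPrm shard.IterateOverContainersPrm
	cntPrm.Handler = func(ctx context.Context, bucketName []byte, _ cid.ID) error {
		var objPrm shard.IterateOverObjectsInContainerPrm
		objPrm.BucketName = bucketName
		objPrm.Handler = func(_ context.Context, objInfo *object.Info) error {
			return visit(objInfo)
		}
		return sh.IterateOverObjectsInContainer(ctx, objPrm)
	}
	return sh.IterateOverContainers(ctx, cntPrm)
}
```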
|
||||
|
|
|
@ -23,12 +23,14 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
|
|||
}
|
||||
|
||||
prm := engine.EvacuateShardPrm{
|
||||
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
|
||||
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
|
||||
ObjectsHandler: s.replicateObject,
|
||||
TreeHandler: s.replicateTree,
|
||||
Async: true,
|
||||
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
|
||||
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
|
||||
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
|
||||
ObjectsHandler: s.replicateObject,
|
||||
TreeHandler: s.replicateTree,
|
||||
Async: true,
|
||||
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
|
||||
ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(),
|
||||
ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(),
|
||||
}
|
||||
|
||||
_, err = s.s.Evacuate(ctx, prm)
|
||||
|
|
|
@ -394,6 +394,10 @@ message StartShardEvacuationRequest {
|
|||
bool ignore_errors = 2;
|
||||
// Evacuation scope.
|
||||
uint32 scope = 3;
|
||||
// Count of concurrent container evacuation workers.
|
||||
uint32 container_worker_count = 4;
|
||||
// Count of concurrent object evacuation workers.
|
||||
uint32 object_worker_count = 5;
|
||||
}
|
||||
|
||||
Body body = 1;
|
||||
|
|
I have doubts that this parameter is necessary. Can you test with a value of 1 and a value of 10, for example, for 100 containers?

It will be helpful for the case when we need to evacuate containers with policy `REP 1` only. In a bad case we need ~100ms to get info about a container from neo-go.
For 1,000 containers, we need around 1m to iterate over containers with `container-worker-count` = 1.
For 1,000 containers, we need around 10s to iterate over containers with `container-worker-count` = 10.
For 10,000 containers, we need around 16m to iterate over containers with `container-worker-count` = 1.
For 10,000 containers, we need around 1m40s to iterate over containers with `container-worker-count` = 10.
Measured with a unit test here.
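As a rough cross-check of those figures (assuming the quoted ~100ms per neo-go container lookup dominates and the work is spread evenly across workers), the expected duration is about N × 100ms / worker count:
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const perContainer = 100 * time.Millisecond // assumed neo-go lookup cost
	for _, n := range []int{1_000, 10_000} {
		for _, workers := range []int{1, 10} {
			est := time.Duration(n/workers) * perContainer
			fmt.Printf("%6d containers, %2d workers: ~%s\n", n, workers, est)
		}
	}
}
```
This lines up with the measurements above, with the 1,000-container single-worker case coming out closer to 1m40s than 1m.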