engine: Evacuate object from shards concurrently #1356

Merged
acid-ant merged 1 commit from acid-ant/frostfs-node:feat/evac-parallel into master 2024-09-24 12:02:17 +00:00
8 changed files with 468 additions and 108 deletions

View file

@ -21,6 +21,9 @@ const (
noProgressFlag = "no-progress"
scopeFlag = "scope"
containerWorkerCountFlag = "container-worker-count"
dstepanov-yadro marked this conversation as resolved Outdated

I have doubts that this parameter is necessary. Can you test with a value of 1 and a value of 10, for example, for 100 containers?

I have doubts that this parameter is necessary. Can you test with a value of 1 and a value of 10, for example, for 100 containers?

It will be helpful for case when we need to evacuate containers with policy REP 1 only.
In a bad case, we need ~100ms to get info about container from neo-go.

For 1_000 containers, we need around 1m to iterate over containers with container-worker-count = 1.
For 1_000 containers, we need around 10s to iterate over containers with container-worker-count = 10.

For 10_000 containers, we need around 16m to iterate over containers with container-worker-count = 1.
For 10_000 containers, we need around 1m40s to iterate over containers with container-worker-count = 10.

Measured with unit test here.

It will be helpful for the case when we need to evacuate containers with policy `REP 1` only. In a bad case, we need ~100ms to get info about a container from `neo-go`. For 1_000 containers, we need around 1m to iterate over containers with `container-worker-count` = 1. For 1_000 containers, we need around 10s to iterate over containers with `container-worker-count` = 10. For 10_000 containers, we need around 16m to iterate over containers with `container-worker-count` = 1. For 10_000 containers, we need around 1m40s to iterate over containers with `container-worker-count` = 10. Measured with the unit test [here](https://git.frostfs.info/TrueCloudLab/frostfs-node/commit/2218e42a10518d6be4d4375003a0a0dbe8e348d6).
objectWorkerCountFlag = "object-worker-count"
scopeAll = "all"
scopeObjects = "objects"
scopeTrees = "trees"
@ -64,12 +67,16 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag)
objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag)
req := &control.StartShardEvacuationRequest{
Body: &control.StartShardEvacuationRequest_Body{
Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors,
Scope: getEvacuationScope(cmd),
Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors,
Scope: getEvacuationScope(cmd),
ContainerWorkerCount: containerWorkerCount,
ObjectWorkerCount: objectWorkerCount,
},
}
@ -371,6 +378,8 @@ func initControlStartEvacuationShardCmd() {
flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll))
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers")
dstepanov-yadro marked this conversation as resolved Outdated

maybe set default values here, but not on the server side?

maybe set default values here, but not on the server side?

No, that would lead to a resource leak on the server side, since another client may still send zero values.

No, that would lead to a resource leak on the server side, since another client may still send zero values.

On server side just check that those values are greater than zero.

On server side just check that those values are greater than zero.
flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers")
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
}

View file

@ -10,7 +10,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@ -24,6 +23,16 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
const (
// containerWorkerCountDefault is the number of concurrent container
// evacuation workers used when EvacuateShardPrm.ContainerWorkerCount
// is left at zero.
containerWorkerCountDefault = 10
// objectWorkerCountDefault is the number of concurrent object
// evacuation workers used when EvacuateShardPrm.ObjectWorkerCount
// is left at zero.
objectWorkerCountDefault = 10
)
var (
@ -79,6 +88,9 @@ type EvacuateShardPrm struct {
IgnoreErrors bool
Async bool
Scope EvacuateScope
ContainerWorkerCount uint32
ObjectWorkerCount uint32
}
// EvacuateShardRes represents result of the EvacuateShard operation.
@ -189,8 +201,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
return res
}
const defaultEvacuateBatchSize = 100
type pooledShard struct {
hashedShard
pool util.WorkerPool
@ -242,8 +252,16 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
return nil, err
}
var mtx sync.RWMutex
copyShards := func() []pooledShard {
mtx.RLock()
defer mtx.RUnlock()
t := make([]pooledShard, len(shards))
copy(t, shards)
return t
}
eg.Go(func() error {
return e.evacuateShards(egCtx, shardIDs, prm, res, shards, shardsToEvacuate)
return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate)
})
if prm.Async {
@ -261,7 +279,7 @@ func ctxOrBackground(ctx context.Context, background bool) context.Context {
}
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
var err error
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
@ -287,13 +305,39 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
return err
}
for _, shardID := range shardIDs {
if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err
ctx, cancel, egShard, egContainer, egObject := e.createErrorGroupsForEvacuation(ctx, prm)
continueLoop := true
for i := 0; continueLoop && i < len(shardIDs); i++ {
select {
case <-ctx.Done():
continueLoop = false
default:
egShard.Go(func() error {
err := e.evacuateShard(ctx, cancel, shardIDs[i], prm, res, shards, shardsToEvacuate, egContainer, egObject)
if err != nil {
cancel(err)
}
return err
})
}
}
err = egShard.Wait()
if err != nil {
err = fmt.Errorf("shard error: %w", err)
}
errContainer := egContainer.Wait()
errObject := egObject.Wait()
if errContainer != nil {
err = errors.Join(err, fmt.Errorf("container error: %w", errContainer))
}
if errObject != nil {
err = errors.Join(err, fmt.Errorf("object error: %w", errObject))
}
if err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err
}
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
zap.Strings("shard_ids", shardIDs),
@ -309,6 +353,27 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
return nil
}
func (e *StorageEngine) createErrorGroupsForEvacuation(ctx context.Context, prm EvacuateShardPrm) (
context.Context, context.CancelCauseFunc, *errgroup.Group, *errgroup.Group, *errgroup.Group,
) {
operationCtx, cancel := context.WithCancelCause(ctx)
dstepanov-yadro marked this conversation as resolved Outdated

It is ok, but looks redundant.
This code will do almost the same:

egShard, ctx := errgroup.WithContext(ctx)
egContainer, ctx := errgroup.WithContext(ctx)
egObject, ctx := errgroup.WithContext(ctx)
It is ok, but looks redundant. This code will do almost the same: ``` egShard, ctx := errgroup.WithContext(ctx) egContainer, ctx := errgroup.WithContext(ctx) egObject, ctx := errgroup.WithContext(ctx) ```

eg.Wait() cancels the context. The routines that process containers and shards finish earlier than the routines that process objects. That is why the groups can't depend on each other's context.

`eg.Wait()` cancels the context. The routines that process containers and shards finish earlier than the routines that process objects. That is why the groups can't depend on each other's context.
egObject, _ := errgroup.WithContext(operationCtx)
objectWorkerCount := prm.ObjectWorkerCount
if objectWorkerCount == 0 {
objectWorkerCount = objectWorkerCountDefault
}
egObject.SetLimit(int(objectWorkerCount))
egContainer, _ := errgroup.WithContext(operationCtx)
containerWorkerCount := prm.ContainerWorkerCount
if containerWorkerCount == 0 {
containerWorkerCount = containerWorkerCountDefault
}
egContainer.SetLimit(int(containerWorkerCount))
egShard, _ := errgroup.WithContext(operationCtx)
return operationCtx, cancel, egShard, egContainer, egObject
}
func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotals")
defer span.End()
@ -335,8 +400,9 @@ func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, sha
return nil
}
func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
egContainer *errgroup.Group, egObject *errgroup.Group,
) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
trace.WithAttributes(
@ -345,11 +411,10 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
defer span.End()
if prm.Scope.WithObjects() {
if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
if err := e.evacuateShardObjects(ctx, cancel, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject); err != nil {
return err
}
}
if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
return err
@ -359,44 +424,60 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
return nil
}
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
egContainer *errgroup.Group, egObject *errgroup.Group,
) error {
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(defaultEvacuateBatchSize)
sh := shardsToEvacuate[shardID]
sh.SetEvacuationInProgress(true)
var c *meta.Cursor
for {
listPrm.WithCursor(c)
// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
// because ListWithCursor works only with the metabase.
listRes, err := sh.ListWithCursor(ctx, listPrm)
if err != nil {
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
break
var cntPrm shard.IterateOverContainersPrm
cntPrm.Handler = func(ctx context.Context, name []byte, _ cid.ID) error {
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
}
egContainer.Go(func() error {
var objPrm shard.IterateOverObjectsInContainerPrm
objPrm.BucketName = name
objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
}
egObject.Go(func() error {
err := e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate)
if err != nil {
cancel(err)
}
return err
})
return nil
}
err := sh.IterateOverObjectsInContainer(ctx, objPrm)
if err != nil {
cancel(err)
}
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil {
return err
}
c = listRes.Cursor()
})
return nil
}
return nil
sh.SetEvacuationInProgress(true)
err := sh.IterateOverContainers(ctx, cntPrm)
if err != nil {
cancel(err)
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return err
}
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
sh := shardsToEvacuate[shardID]
shards := getShards()
var listPrm pilorama.TreeListTreesPrm
first := true
@ -637,68 +718,65 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
return shards, nil
}
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
trace.WithAttributes(
attribute.Int("objects_count", len(toEvacuate)),
))
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects")
defer span.End()
for i := range toEvacuate {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
addr := toEvacuate[i].Address
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
}
var getPrm shard.GetPrm
getPrm.SetAddress(addr)
getPrm.SkipEvacCheck(true)
shards := getShards()
addr := objInfo.Address
getRes, err := sh.Get(ctx, getPrm)
if err != nil {
if prm.IgnoreErrors {
res.objFailed.Add(1)
continue
}
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
var getPrm shard.GetPrm
getPrm.SetAddress(addr)
getPrm.SkipEvacCheck(true)
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, shardsToEvacuate, res)
if err != nil {
return err
}
if evacuatedLocal {
continue
}
if prm.ObjectsHandler == nil {
// Do not check ignoreErrors flag here because
// ignoring errors on put make this command kinda useless.
return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i])
}
moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
if moved {
res.objEvacuated.Add(1)
} else if prm.IgnoreErrors {
getRes, err := shardsToEvacuate[shardID].Get(ctx, getPrm)
if err != nil {
if prm.IgnoreErrors {
res.objFailed.Add(1)
e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} else {
return fmt.Errorf("object %s was not replicated", addr)
return nil
}
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res)
if err != nil {
return err
}
if evacuatedLocal {
return nil
}
if prm.ObjectsHandler == nil {
// Do not check ignoreErrors flag here because
// ignoring errors on put make this command kinda useless.
return fmt.Errorf("%w: %s", errPutShard, objInfo)
}
moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
if moved {
res.objEvacuated.Add(1)
} else if prm.IgnoreErrors {
res.objFailed.Add(1)
e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} else {
return fmt.Errorf("object %s was not replicated", addr)
}
return nil
}

View file

@ -6,6 +6,8 @@ import (
"fmt"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
@ -174,13 +176,13 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
errReplication := errors.New("handler error")
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
var n uint64
var n atomic.Uint64
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
if n == max {
if n.Load() == max {
return false, errReplication
}
n++
n.Add(1)
for i := range objects {
if addr == objectCore.AddressOf(objects[i]) {
require.Equal(t, objects[i], obj)
@ -314,6 +316,36 @@ func TestEvacuateCancellation(t *testing.T) {
require.Equal(t, uint64(0), res.ObjectsEvacuated())
}
// TestEvacuateCancellationByError checks that an error returned by the objects
// handler is propagated out of Evacuate when objects and containers are
// processed by several concurrent workers.
func TestEvacuateCancellationByError(t *testing.T) {
	t.Parallel()

	e, ids, _ := newEngineEvacuate(t, 2, 10)
	defer func() {
		require.NoError(t, e.Close(context.Background()))
	}()

	// Both shards are switched to read-only before evacuation starts.
	for _, id := range ids[:2] {
		require.NoError(t, e.shards[id.String()].SetMode(mode.ReadOnly))
	}

	// The handler fails exactly once (on whichever call wins the CAS) and
	// reports success for every other object.
	var failedOnce atomic.Bool
	prm := EvacuateShardPrm{
		ShardID:              ids[1:2],
		Scope:                EvacuateScopeObjects,
		ObjectWorkerCount:    2,
		ContainerWorkerCount: 2,
		ObjectsHandler: func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
			if failedOnce.CompareAndSwap(false, true) {
				return false, errors.New("test error")
			}
			return true, nil
		},
	}

	_, evacErr := e.Evacuate(context.Background(), prm)
	require.ErrorContains(t, evacErr, "test error")
}
func TestEvacuateSingleProcess(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3)
defer func() {
@ -531,6 +563,7 @@ func TestEvacuateTreesRemote(t *testing.T) {
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
mutex := sync.Mutex{}
evacuatedTreeOps := make(map[string][]*pilorama.Move)
var prm EvacuateShardPrm
prm.ShardID = ids
@ -545,7 +578,9 @@ func TestEvacuateTreesRemote(t *testing.T) {
if op.Time == 0 {
return true, "", nil
}
mutex.Lock()
evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op)
mutex.Unlock()
height = op.Time + 1
}
}

View file

@ -1,6 +1,7 @@
package meta
import (
"bytes"
"context"
"time"
@ -61,6 +62,20 @@ func (l ListRes) Cursor() *Cursor {
return l.cursor
}
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
type IterateOverContainersPrm struct {
// Handler is called for every container bucket found in the metabase.
// It receives a copy of the bucket name and the container ID parsed from it.
// Buckets are visited per bucket kind (primary, lockers, tombstone), so the
// same container ID may be passed more than once with different bucket names.
// A non-nil error returned by Handler stops the iteration and is returned
// to the caller.
Handler func(context.Context, []byte, cid.ID) error
}
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type IterateOverObjectsInContainerPrm struct {

The sentence must end with a period

The sentence must end with a period
// BucketName container's bucket name.
BucketName []byte
// Handler function executed upon objects in db.
Handler func(context.Context, *objectcore.Info) error
}
// ListWithCursor lists physical objects available in metabase starting from
// cursor. Includes objects of all types. Does not include inhumed objects.
// Use cursor value from response for consecutive requests.
@ -259,3 +274,155 @@ func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte)
return rawID, name[0]
}
// IterateOverContainers walks over the container buckets stored in the
// metabase inside a single read-only transaction and passes each of them
// to prm.Handler.
//
// Returns ErrDegradedMode if the metabase is not available in the current mode.
// The call duration and outcome are reported to the metabase metrics.
func (db *DB) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
	startedAt := time.Now()
	var success bool
	defer func() {
		db.metrics.AddMethodDuration("IterateOverContainers", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverContainers",
		trace.WithAttributes(attribute.Bool("has_handler", prm.Handler != nil)))
	defer span.End()

	// The mode lock is held for the whole iteration.
	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if db.mode.NoMetabase() {
		return ErrDegradedMode
	}

	iterate := func(tx *bbolt.Tx) error {
		return db.iterateOverContainers(ctx, tx, prm)
	}
	viewErr := db.boltDB.View(iterate)
	success = viewErr == nil
	return metaerr.Wrap(viewErr)
}
func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm IterateOverContainersPrm) error {
var containerID cid.ID
for _, prefix := range [][]byte{{byte(primaryPrefix)}, {byte(lockersPrefix)}, {byte(tombstonePrefix)}} {
c := tx.Cursor()
for name, _ := c.Seek(prefix); name != nil && bytes.HasPrefix(name, prefix); name, _ = c.Next() {
cidRaw, _ := parseContainerIDWithPrefix(&containerID, name)
if cidRaw == nil {
continue
}
dstepanov-yadro marked this conversation as resolved Outdated

Replace with

for _, prefix := range []int{primaryPrefix, lockersPrefix, tombstonePrefix} {
   c := tx.Cursor()
   for name, _ := c.Seek(prefix); name != nil && bytes.HasPrefix(name, prefix); name, _ = c.Next() {
   ...
   }
}

to reduce seeks

Replace with ``` for _, prefix := range []int{primaryPrefix, lockersPrefix, tombstonePrefix} { c := tx.Cursor() for name, _ := c.Seek(prefix); name != nil && bytes.HasPrefix(name, prefix); name, _ = c.Next() { ... } } ``` to reduce seeks

I don't see the benefit of this; could you point out where I'm wrong? Even if we start from a particular prefix, we need to iterate over everything that follows it, and with your approach we need to do that for all the other prefixes as well. In the current implementation, we do that iteration only once.

I don't see the benefit of this; could you point out where I'm wrong? Even if we start from a particular prefix, we need to iterate over everything that follows it, and with your approach we need to do that for all the other prefixes as well. In the current implementation, we do that iteration only once.

Currently the iteration is performed over all buckets with all prefixes: small, root, owner, user attributes and others.
After the fix, the iteration will be performed only over the particular prefixes: name != nil && bytes.HasPrefix(name, prefix);

Currently the iteration is performed over all buckets with all prefixes: small, root, owner, user attributes and others. After the fix, the iteration will be performed only over the particular prefixes: `name != nil && bytes.HasPrefix(name, prefix);`

Gotcha, fixed.

Gotcha, fixed.
bktName := make([]byte, len(name))
copy(bktName, name)
var cnt cid.ID
copy(cnt[:], containerID[:])
dstepanov-yadro marked this conversation as resolved Outdated

To check that the cursor's current item is a bucket, just compare the value with nil. Also, the root cursor always iterates over buckets. Also, bkt := tx.Bucket(name) performs one more cursor seek inside, so this looks redundant.

To check that the cursor's current item is a bucket, just compare the value with nil. Also, the root cursor always iterates over buckets. Also, `bkt := tx.Bucket(name)` performs one more cursor seek inside, so this looks redundant.

Thanks, fixed. That was copy-paste from listWithCursor.

Thanks, fixed. That was copy-paste from `listWithCursor`.
err := prm.Handler(ctx, bktName, cnt)
if err != nil {
return err
}
}
}
return nil
}
// IterateOverObjectsInContainer iterates over the physical objects stored in
// the metabase bucket named by prm.BucketName, starting from the first one,
// and passes each of them to prm.Handler inside a single read-only transaction.
//
// Returns ErrDegradedMode if the metabase is not available in the current mode.
// If no container ID can be parsed from prm.BucketName, there is nothing to
// iterate over: nil is returned and the handler is never called.
// The call duration and outcome are reported to the metabase metrics.
func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		db.metrics.AddMethodDuration("IterateOverObjectsInContainer", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverObjectsInContainer",
		trace.WithAttributes(
			attribute.Bool("has_handler", prm.Handler != nil),
		))
	defer span.End()

	// The mode lock is held for the whole iteration.
	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if db.mode.NoMetabase() {
		return ErrDegradedMode
	}

	// The bucket name is validated before the transaction is opened.
	var containerID cid.ID
	cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName)
	if cidRaw == nil {
		// The caller gets a nil error here, so the metric must not record
		// this call as a failure.
		success = true
		return nil
	}

	err := db.boltDB.View(func(tx *bbolt.Tx) error {
		return db.iterateOverObjectsInContainer(ctx, tx, cidRaw, prefix, containerID, prm)
	})
	success = err == nil
	return metaerr.Wrap(err)
}
dstepanov-yadro marked this conversation as resolved Outdated

Add check against nil

Add check against nil

Added.

Added.
func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, cidRaw []byte, prefix byte,
containerID cid.ID, prm IterateOverObjectsInContainerPrm,
) error {
bkt := tx.Bucket(prm.BucketName)
if bkt == nil {
return nil
dstepanov-yadro marked this conversation as resolved Outdated

Could be done before transaction start

Could be done before transaction start

Done.

Done.
}
graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
c := bkt.Cursor()
k, v := c.First()
var objType objectSDK.Type
switch prefix {
case primaryPrefix:
objType = objectSDK.TypeRegular
case lockersPrefix:
objType = objectSDK.TypeLock
case tombstonePrefix:
objType = objectSDK.TypeTombstone
default:
return nil
}
for ; k != nil; k, v = c.Next() {
var obj oid.ID
if err := obj.Decode(k); err != nil {
break
}
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
continue
}
var isLinkingObj bool
var ecInfo *objectcore.ECInfo
if objType == objectSDK.TypeRegular {
var o objectSDK.Object
if err := o.Unmarshal(v); err != nil {
return err
}
isLinkingObj = isLinkObject(&o)
ecHeader := o.ECHeader()
if ecHeader != nil {
ecInfo = &objectcore.ECInfo{
ParentID: ecHeader.Parent(),
Index: ecHeader.Index(),
Total: ecHeader.Total(),
}
}
}
var a oid.Address
a.SetContainer(containerID)
a.SetObject(obj)
objInfo := objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}
err := prm.Handler(ctx, &objInfo)
if err != nil {
return err
}
}
return nil
}

View file

@ -34,6 +34,20 @@ func (r ListContainersRes) Containers() []cid.ID {
return r.containers
}
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
type IterateOverContainersPrm struct {
// Handler is called for the containers found during the iteration.
// It is passed unchanged to the metabase iteration and receives the
// container's bucket name together with the container ID.
Handler func(context.Context, []byte, cid.ID) error
}
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type IterateOverObjectsInContainerPrm struct {
// BucketName is the name of the container's bucket whose objects are
// iterated over.
BucketName []byte
// Handler is called for the objects found in the bucket.
// It is passed unchanged to the metabase iteration.
Handler func(context.Context, *objectcore.Info) error
}
// ListWithCursorPrm contains parameters for ListWithCursor operation.
type ListWithCursorPrm struct {
count uint32
@ -164,3 +178,54 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
cursor: res.Cursor(),
}, nil
}
// IterateOverContainers lists physical containers presented in shard.
func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverContainers",
trace.WithAttributes(
attribute.Bool("has_handler", prm.Handler != nil),
))
defer span.End()
s.m.RLock()
dstepanov-yadro marked this conversation as resolved Outdated

It is necessary to hold a read lock on the mutex for the whole operation, not just to get the mode. Or am I missing something?

It is necessary to hold a read lock on the mutex for the whole operation, not just to get the mode. Or am I missing something?

You are right, RLock here and in IterateOverObjectsInContainer is required. Fixed.

You are right, `RLock` here and in `IterateOverObjectsInContainer` is required. Fixed.
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
var metaPrm meta.IterateOverContainersPrm
metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverContainers(ctx, metaPrm)
if err != nil {
return fmt.Errorf("could not iterate over containers: %w", err)
}
return nil
}
// IterateOverObjectsInContainer walks over the physical objects kept in the
// shard's metabase for the container bucket named by prm.BucketName and
// passes each of them to prm.Handler.
//
// Returns ErrDegradedMode if the shard mode has no metabase. Any error from
// the metabase iteration is returned wrapped.
func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
	_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverObjectsInContainer",
		trace.WithAttributes(attribute.Bool("has_handler", prm.Handler != nil)))
	defer span.End()

	// The read lock is held for the whole iteration, not only for the mode check.
	s.m.RLock()
	defer s.m.RUnlock()

	if s.info.Mode.NoMetabase() {
		return ErrDegradedMode
	}

	metaPrm := meta.IterateOverObjectsInContainerPrm{
		BucketName: prm.BucketName,
		Handler:    prm.Handler,
	}
	if err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm); err != nil {
		return fmt.Errorf("could not iterate over objects: %w", err)
	}
	return nil
}

View file

@ -23,12 +23,14 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
}
prm := engine.EvacuateShardPrm{
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
ObjectsHandler: s.replicateObject,
TreeHandler: s.replicateTree,
Async: true,
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
ObjectsHandler: s.replicateObject,
TreeHandler: s.replicateTree,
Async: true,
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(),
ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(),
}
_, err = s.s.Evacuate(ctx, prm)

View file

@ -394,6 +394,10 @@ message StartShardEvacuationRequest {
bool ignore_errors = 2;
// Evacuation scope.
uint32 scope = 3;
// Count of concurrent container evacuation workers.
uint32 container_worker_count = 4;
// Count of concurrent object evacuation workers.
uint32 object_worker_count = 5;
}
Body body = 1;

Binary file not shown.